From 98f692386a497808dec0ff0034c86ce540b681b5 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 18 Aug 2025 03:34:24 +0200 Subject: [PATCH 1/2] AsyncUDP: keep track of the last sent packet and skip it while LOOPing This means we don't invoke the receive handler for packets we send. --- cores/portduino/AsyncUDP.cpp | 26 +++++++++++++++++++++----- cores/portduino/AsyncUDP.h | 8 ++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/cores/portduino/AsyncUDP.cpp b/cores/portduino/AsyncUDP.cpp index d9b0de8..96ba9e8 100644 --- a/cores/portduino/AsyncUDP.cpp +++ b/cores/portduino/AsyncUDP.cpp @@ -49,12 +49,20 @@ void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, } void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { + if (nread <= 0) { + return; + } _handlerMutex.lock(); auto h = _handler; _handlerMutex.unlock(); - if (h) { - AsyncUDPPacket packet((uint8_t*)buf->base, nread); - h(packet); + if (_waitingToBeLooped && nread == _waitingToBeLooped->len && memcmp(buf->base, _waitingToBeLooped->data, nread) == 0) { + _waitingToBeLooped = std::unique_ptr(); + _attemptWrite(); + } else { + if (h) { + AsyncUDPPacket packet((uint8_t*)buf->base, nread); + h(packet); + } } free(buf->base); } @@ -156,16 +164,24 @@ size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, return len; } -void AsyncUDP::_DO_NOT_CALL_async_cb() { +void AsyncUDP::_attemptWrite() { + if (_waitingToBeLooped) { + return; + } _sendQueueMutex.lock(); - while (!_sendQueue.empty()) { + if (!_sendQueue.empty()) { auto task = std::move(_sendQueue.back()); _sendQueue.pop_back(); _sendQueueMutex.unlock(); _doWrite(task->data, task->len, task->addr, task->port); _sendQueueMutex.lock(); + _waitingToBeLooped = std::move(task); } _sendQueueMutex.unlock(); +} + +void AsyncUDP::_DO_NOT_CALL_async_cb() { + _attemptWrite(); if (_quit.load()) { uv_udp_recv_stop(&_socket); // FIXME: don't do bytes → string → bytes IP conversion diff --git a/cores/portduino/AsyncUDP.h b/cores/portduino/AsyncUDP.h index d2d7051..c6db52b 100644 --- a/cores/portduino/AsyncUDP.h +++ b/cores/portduino/AsyncUDP.h @@ -69,6 +69,11 @@ class AsyncUDP final // the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback std::vector> _sendQueue; + // _waitingToBeLooped is used to wait for a sent packet to be looped back before we send an other one. + // This allows the recv callback to omit sent packets. + // It must be accessed from the uv loop. + std::unique_ptr _waitingToBeLooped; + std::atomic _quit; std::thread _ioThread; @@ -109,6 +114,9 @@ class AsyncUDP final void _DO_NOT_CALL_async_cb(); private: + // _attemptWrite must be accessed from the uv loop. + void _attemptWrite(); + // _doWrite must be accessed from the uv loop. void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port); }; From 6ea9cd0ebee08f67875daeed190f72c72b13fab5 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Mon, 18 Aug 2025 04:14:15 +0200 Subject: [PATCH 2/2] AsyncUDP: timeout waiting for looped packets after 100ms --- cores/portduino/AsyncUDP.cpp | 38 ++++++++++++++++++++++++++++++++++-- cores/portduino/AsyncUDP.h | 5 ++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/cores/portduino/AsyncUDP.cpp b/cores/portduino/AsyncUDP.cpp index 96ba9e8..b469bf9 100644 --- a/cores/portduino/AsyncUDP.cpp +++ b/cores/portduino/AsyncUDP.cpp @@ -3,11 +3,18 @@ #include #include "Utility.h" +#define LOOP_TIMER_CHECK_TIMEOUT_MS 100 + void _asyncudp_async_cb(uv_async_t *handle) { AsyncUDP *udp = (AsyncUDP *)handle->data; udp->_DO_NOT_CALL_async_cb(); } +void _asyncudp_timer_cb(uv_timer_t *handle) { + AsyncUDP *udp = (AsyncUDP *)handle->data; + udp->_DO_NOT_CALL_timer_cb(); +} + AsyncUDP::AsyncUDP() { _handler = NULL; _connected = false; @@ -17,6 +24,8 @@ AsyncUDP::AsyncUDP() { uv_loop_init(&_loop); _async.data = this; uv_async_init(&_loop, &_async, _asyncudp_async_cb); + _timer.data = this; + uv_timer_init(&_loop, &_timer); } AsyncUDP::~AsyncUDP() { @@ -50,6 +59,12 @@ void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { if (nread <= 0) { + if (_waitingToBeLooped) { + // we are waiting to receive a packet yet we just exhausted the receive buffer. + // This can happen if we are unlucky and this happens to run while we were sending a packet. + // Or this happens because the receive buffer was full and the packet we are waiting for was dropped. + _emptiedBuffer = true; + } return; } _handlerMutex.lock(); @@ -57,6 +72,7 @@ void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv _handlerMutex.unlock(); if (_waitingToBeLooped && nread == _waitingToBeLooped->len && memcmp(buf->base, _waitingToBeLooped->data, nread) == 0) { _waitingToBeLooped = std::unique_ptr(); + uv_timer_stop(&_timer); _attemptWrite(); } else { if (h) { @@ -174,10 +190,12 @@ void AsyncUDP::_attemptWrite() { _sendQueue.pop_back(); _sendQueueMutex.unlock(); _doWrite(task->data, task->len, task->addr, task->port); - _sendQueueMutex.lock(); _waitingToBeLooped = std::move(task); + _emptiedBuffer = false; + uv_timer_start(&_timer, _asyncudp_timer_cb, LOOP_TIMER_CHECK_TIMEOUT_MS, LOOP_TIMER_CHECK_TIMEOUT_MS); + } else { + _sendQueueMutex.unlock(); } - _sendQueueMutex.unlock(); } void AsyncUDP::_DO_NOT_CALL_async_cb() { @@ -194,6 +212,22 @@ void AsyncUDP::_DO_NOT_CALL_async_cb() { } } +void AsyncUDP::_DO_NOT_CALL_timer_cb() { + if (!_waitingToBeLooped) { + uv_timer_stop(&_timer); + return; + } + if (_emptiedBuffer) { + // We waited for LOOP_TIMER_CHECK_TIMEOUT_MS; we exhausted the receive buffer yet we did not receive the LOOPed packet we were waiting for. + // It is fair to say we will most likely never receive it. + // Probably it was dropped by the kernel because the receive buffer was full. + _waitingToBeLooped = std::unique_ptr(); + uv_timer_stop(&_timer); + } else { + // We are still waiting for the packet to be LOOPed back but we havn't yet exhausted the receive buffer. + } +} + void _asyncudp_send_cb(uv_udp_send_t *req, int status) { free(req); } diff --git a/cores/portduino/AsyncUDP.h b/cores/portduino/AsyncUDP.h index c6db52b..253f539 100644 --- a/cores/portduino/AsyncUDP.h +++ b/cores/portduino/AsyncUDP.h @@ -69,10 +69,12 @@ class AsyncUDP final // the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback std::vector> _sendQueue; + // Theses must be accessed from the uv loop. // _waitingToBeLooped is used to wait for a sent packet to be looped back before we send an other one. // This allows the recv callback to omit sent packets. - // It must be accessed from the uv loop. std::unique_ptr _waitingToBeLooped; + bool _emptiedBuffer = false; + uv_timer_t _timer; std::atomic _quit; std::thread _ioThread; @@ -112,6 +114,7 @@ class AsyncUDP final // do not call, used internally as callback from libuv's C callback void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags); void _DO_NOT_CALL_async_cb(); + void _DO_NOT_CALL_timer_cb(); private: // _attemptWrite must be accessed from the uv loop.