Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 57 additions & 7 deletions cores/portduino/AsyncUDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
#include <sys/socket.h>
#include "Utility.h"

#define LOOP_TIMER_CHECK_TIMEOUT_MS 100

void _asyncudp_async_cb(uv_async_t *handle) {
AsyncUDP *udp = (AsyncUDP *)handle->data;
udp->_DO_NOT_CALL_async_cb();
}

void _asyncudp_timer_cb(uv_timer_t *handle) {
AsyncUDP *udp = (AsyncUDP *)handle->data;
udp->_DO_NOT_CALL_timer_cb();
}

AsyncUDP::AsyncUDP() {
_handler = NULL;
_connected = false;
Expand All @@ -17,6 +24,8 @@ AsyncUDP::AsyncUDP() {
uv_loop_init(&_loop);
_async.data = this;
uv_async_init(&_loop, &_async, _asyncudp_async_cb);
_timer.data = this;
uv_timer_init(&_loop, &_timer);
}

AsyncUDP::~AsyncUDP() {
Expand Down Expand Up @@ -49,12 +58,27 @@ void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
}

void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
if (nread <= 0) {
if (_waitingToBeLooped) {
// we are waiting to receive a packet yet we just exhausted the receive buffer.
// This can happen if we are unlucky and this happens to run while we were sending a packet.
// Or this happens because the receive buffer was full and the packet we are waiting for was dropped.
_emptiedBuffer = true;
}
return;
}
_handlerMutex.lock();
auto h = _handler;
_handlerMutex.unlock();
if (h) {
AsyncUDPPacket packet((uint8_t*)buf->base, nread);
h(packet);
if (_waitingToBeLooped && nread == _waitingToBeLooped->len && memcmp(buf->base, _waitingToBeLooped->data, nread) == 0) {
_waitingToBeLooped = std::unique_ptr<asyncUDPSendTask>();
uv_timer_stop(&_timer);
_attemptWrite();
} else {
if (h) {
AsyncUDPPacket packet((uint8_t*)buf->base, nread);
h(packet);
}
}
free(buf->base);
}
Expand Down Expand Up @@ -156,16 +180,26 @@ size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr,
return len;
}

void AsyncUDP::_DO_NOT_CALL_async_cb() {
void AsyncUDP::_attemptWrite() {
if (_waitingToBeLooped) {
return;
}
_sendQueueMutex.lock();
while (!_sendQueue.empty()) {
if (!_sendQueue.empty()) {
auto task = std::move(_sendQueue.back());
_sendQueue.pop_back();
_sendQueueMutex.unlock();
_doWrite(task->data, task->len, task->addr, task->port);
_sendQueueMutex.lock();
_waitingToBeLooped = std::move(task);
_emptiedBuffer = false;
uv_timer_start(&_timer, _asyncudp_timer_cb, LOOP_TIMER_CHECK_TIMEOUT_MS, LOOP_TIMER_CHECK_TIMEOUT_MS);
} else {
_sendQueueMutex.unlock();
}
_sendQueueMutex.unlock();
}

void AsyncUDP::_DO_NOT_CALL_async_cb() {
_attemptWrite();
if (_quit.load()) {
uv_udp_recv_stop(&_socket);
// FIXME: don't do bytes → string → bytes IP conversion
Expand All @@ -178,6 +212,22 @@ void AsyncUDP::_DO_NOT_CALL_async_cb() {
}
}

void AsyncUDP::_DO_NOT_CALL_timer_cb() {
if (!_waitingToBeLooped) {
uv_timer_stop(&_timer);
return;
}
if (_emptiedBuffer) {
// We waited for LOOP_TIMER_CHECK_TIMEOUT_MS; we exhausted the receive buffer yet we did not receive the LOOPed packet we were waiting for.
// It is fair to say we will most likely never receive it.
// Probably it was dropped by the kernel because the receive buffer was full.
_waitingToBeLooped = std::unique_ptr<asyncUDPSendTask>();
uv_timer_stop(&_timer);
} else {
// We are still waiting for the packet to be LOOPed back but we havn't yet exhausted the receive buffer.
}
}

void _asyncudp_send_cb(uv_udp_send_t *req, int status) {
free(req);
}
Expand Down
11 changes: 11 additions & 0 deletions cores/portduino/AsyncUDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ class AsyncUDP final
// the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback
std::vector<std::unique_ptr<asyncUDPSendTask>> _sendQueue;

// Theses must be accessed from the uv loop.
// _waitingToBeLooped is used to wait for a sent packet to be looped back before we send an other one.
// This allows the recv callback to omit sent packets.
std::unique_ptr<asyncUDPSendTask> _waitingToBeLooped;
bool _emptiedBuffer = false;
uv_timer_t _timer;

std::atomic<bool> _quit;
std::thread _ioThread;

Expand Down Expand Up @@ -107,8 +114,12 @@ class AsyncUDP final
// do not call, used internally as callback from libuv's C callback
void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags);
void _DO_NOT_CALL_async_cb();
void _DO_NOT_CALL_timer_cb();

private:
// _attemptWrite must be accessed from the uv loop.
void _attemptWrite();
// _doWrite must be accessed from the uv loop.
void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
};

Expand Down