@@ -49,12 +49,20 @@ void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
49
49
}
50
50
51
51
void AsyncUDP::_DO_NOT_CALL_uv_on_read (uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
52
+ if (nread <= 0 ) {
53
+ return ;
54
+ }
52
55
_handlerMutex.lock ();
53
56
auto h = _handler;
54
57
_handlerMutex.unlock ();
55
- if (h) {
56
- AsyncUDPPacket packet ((uint8_t *)buf->base , nread);
57
- h (packet);
58
+ if (_waitingToBeLooped && nread == _waitingToBeLooped->len && memcmp (buf->base , _waitingToBeLooped->data , nread) == 0 ) {
59
+ _waitingToBeLooped = std::unique_ptr<asyncUDPSendTask>();
60
+ _attemptWrite ();
61
+ } else {
62
+ if (h) {
63
+ AsyncUDPPacket packet ((uint8_t *)buf->base , nread);
64
+ h (packet);
65
+ }
58
66
}
59
67
free (buf->base );
60
68
}
@@ -156,16 +164,21 @@ size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr,
156
164
return len;
157
165
}
158
166
159
- void AsyncUDP::_DO_NOT_CALL_async_cb () {
167
+ void AsyncUDP::_attemptWrite () {
160
168
_sendQueueMutex.lock ();
161
- while ( !_sendQueue.empty ()) {
169
+ if (!_waitingToBeLooped && !_sendQueue.empty ()) {
162
170
auto task = std::move (_sendQueue.back ());
163
171
_sendQueue.pop_back ();
164
172
_sendQueueMutex.unlock ();
165
173
_doWrite (task->data , task->len , task->addr , task->port );
166
174
_sendQueueMutex.lock ();
175
+ _waitingToBeLooped = std::move (task);
167
176
}
168
177
_sendQueueMutex.unlock ();
178
+ }
179
+
180
+ void AsyncUDP::_DO_NOT_CALL_async_cb () {
181
+ _attemptWrite ();
169
182
if (_quit.load ()) {
170
183
uv_udp_recv_stop (&_socket);
171
184
// FIXME: don't do bytes → string → bytes IP conversion
0 commit comments