@@ -209,13 +209,21 @@ impl<M: RemoteMessage> NetTx<M> {
209
209
// If we can't deliver a message within this limit consider
210
210
// `link` broken and return.
211
211
212
+ #[ derive( Debug ) ]
213
+ struct QueuedMessage < M : RemoteMessage > {
214
+ seq : u64 ,
215
+ data : Bytes ,
216
+ received_at : Instant ,
217
+ return_channel : oneshot:: Sender < M > ,
218
+ }
219
+
212
220
#[ derive( Debug ) ]
213
221
struct Outbox < ' a , M : RemoteMessage > {
214
222
// The seq number of the next new message put into outbox. Requeued
215
223
// unacked messages should still use their already assigned seq
216
224
// numbers.
217
225
next_seq : u64 ,
218
- deque : VecDeque < ( u64 , Bytes , Instant , oneshot :: Sender < M > ) > ,
226
+ deque : VecDeque < QueuedMessage < M > > ,
219
227
log_id : & ' a str ,
220
228
}
221
229
@@ -231,8 +239,9 @@ impl<M: RemoteMessage> NetTx<M> {
231
239
fn is_expired ( & self ) -> bool {
232
240
match self . deque . front ( ) {
233
241
None => false ,
234
- Some ( ( _, _, since, _) ) => {
235
- since. elapsed ( ) > config:: global:: get ( config:: MESSAGE_DELIVERY_TIMEOUT )
242
+ Some ( msg) => {
243
+ msg. received_at . elapsed ( )
244
+ > config:: global:: get ( config:: MESSAGE_DELIVERY_TIMEOUT )
236
245
}
237
246
}
238
247
}
@@ -256,17 +265,17 @@ impl<M: RemoteMessage> NetTx<M> {
256
265
self . log_id,
257
266
)
258
267
} ) ?
259
- . 1
268
+ . data
260
269
. clone ( ) ;
261
270
sink. send ( data) . await . map_err ( |e| e. to_string ( ) ) ?;
262
271
Ok ( ( ) )
263
272
}
264
273
265
274
fn front_size ( & self ) -> Option < usize > {
266
- self . deque . front ( ) . map ( |( _ , bytes , _ , _ ) | bytes . len ( ) )
275
+ self . deque . front ( ) . map ( |msg| msg . data . len ( ) )
267
276
}
268
277
269
- fn pop_front ( & mut self ) -> Option < ( u64 , Bytes , Instant , oneshot :: Sender < M > ) > {
278
+ fn pop_front ( & mut self ) -> Option < QueuedMessage < M > > {
270
279
self . deque . pop_front ( )
271
280
}
272
281
@@ -275,7 +284,7 @@ impl<M: RemoteMessage> NetTx<M> {
275
284
( message, return_channel, received_at) : ( M , oneshot:: Sender < M > , Instant ) ,
276
285
) -> Result < ( ) , String > {
277
286
assert ! (
278
- self . deque. back( ) . is_none_or( |msg| msg. 0 < self . next_seq) ,
287
+ self . deque. back( ) . is_none_or( |msg| msg. seq < self . next_seq) ,
279
288
"{}: unexpected: seq should be in ascending order, but got {:?} vs {}" ,
280
289
self . log_id,
281
290
self . deque. back( ) ,
@@ -287,8 +296,12 @@ impl<M: RemoteMessage> NetTx<M> {
287
296
. map_err ( |e| format ! ( "serialization error: {e}" ) ) ?
288
297
. into ( ) ;
289
298
REMOTE_MESSAGE_SEND_SIZE . record ( data. len ( ) as f64 , & [ ] ) ;
290
- self . deque
291
- . push_back ( ( self . next_seq , data, received_at, return_channel) ) ;
299
+ self . deque . push_back ( QueuedMessage {
300
+ seq : self . next_seq ,
301
+ data,
302
+ received_at,
303
+ return_channel,
304
+ } ) ;
292
305
self . next_seq += 1 ;
293
306
Ok ( ( ) )
294
307
}
@@ -297,11 +310,11 @@ impl<M: RemoteMessage> NetTx<M> {
297
310
match ( unacked. deque . back ( ) , self . deque . front ( ) ) {
298
311
( Some ( last) , Some ( first) ) => {
299
312
assert ! (
300
- last. 0 < first. 0 ,
313
+ last. seq < first. seq ,
301
314
"{}: seq should be in ascending order, but got {} vs {:?}" ,
302
315
self . log_id,
303
- last. 0 ,
304
- first. 0 ,
316
+ last. seq ,
317
+ first. seq ,
305
318
) ;
306
319
}
307
320
_ => ( ) ,
@@ -315,7 +328,7 @@ impl<M: RemoteMessage> NetTx<M> {
315
328
316
329
#[ derive( Debug ) ]
317
330
struct Unacked < ' a , M : RemoteMessage > {
318
- deque : VecDeque < ( u64 , Bytes , Instant , oneshot :: Sender < M > ) > ,
331
+ deque : VecDeque < QueuedMessage < M > > ,
319
332
largest_acked : Option < u64 > ,
320
333
log_id : & ' a str ,
321
334
}
@@ -329,13 +342,13 @@ impl<M: RemoteMessage> NetTx<M> {
329
342
}
330
343
}
331
344
332
- fn push_back ( & mut self , message : ( u64 , Bytes , Instant , oneshot :: Sender < M > ) ) {
345
+ fn push_back ( & mut self , message : QueuedMessage < M > ) {
333
346
assert ! (
334
- self . deque. back( ) . is_none_or( |msg| msg. 0 < message. 0 ) ,
347
+ self . deque. back( ) . is_none_or( |msg| msg. seq < message. seq ) ,
335
348
"{}: seq should be in ascending order, but got {:?} vs {}" ,
336
349
self . log_id,
337
350
self . deque. back( ) ,
338
- message. 0
351
+ message. seq
339
352
) ;
340
353
341
354
if let Some ( largest) = self . largest_acked {
@@ -370,7 +383,7 @@ impl<M: RemoteMessage> NetTx<M> {
370
383
// Tx resends. As a result, this message's ack would be
371
384
// recorded already by `largest_acked` before it is put into
372
385
// unacked queue.
373
- if message. 0 <= largest {
386
+ if message. seq <= largest {
374
387
// since the message is already delivered and acked, it
375
388
// does need to be put in the queue again.
376
389
return ;
@@ -392,8 +405,8 @@ impl<M: RemoteMessage> NetTx<M> {
392
405
393
406
self . largest_acked = Some ( acked) ;
394
407
let deque = & mut self . deque ;
395
- while let Some ( ( seq , _ , _ , _ ) ) = deque. front ( ) {
396
- if * seq <= acked {
408
+ while let Some ( msg ) = deque. front ( ) {
409
+ if msg . seq <= acked {
397
410
deque. pop_front ( ) ;
398
411
} else {
399
412
// Messages in the deque are orderd by seq in ascending
@@ -407,7 +420,7 @@ impl<M: RemoteMessage> NetTx<M> {
407
420
fn is_expired ( & self ) -> bool {
408
421
matches ! (
409
422
self . deque. front( ) ,
410
- Some ( ( _ , _ , received_at , _ ) ) if received_at. elapsed( ) > config:: global:: get( config:: MESSAGE_DELIVERY_TIMEOUT )
423
+ Some ( msg ) if msg . received_at. elapsed( ) > config:: global:: get( config:: MESSAGE_DELIVERY_TIMEOUT )
411
424
)
412
425
}
413
426
@@ -416,10 +429,10 @@ impl<M: RemoteMessage> NetTx<M> {
416
429
/// branches.
417
430
async fn wait_for_timeout ( & self ) {
418
431
match self . deque . front ( ) {
419
- Some ( ( _ , _ , received_at , _ ) ) => {
432
+ Some ( msg ) => {
420
433
RealClock
421
434
. sleep_until (
422
- received_at. clone ( )
435
+ msg . received_at . clone ( )
423
436
+ config:: global:: get ( config:: MESSAGE_DELIVERY_TIMEOUT ) ,
424
437
)
425
438
. await
@@ -728,11 +741,11 @@ impl<M: RemoteMessage> NetTx<M> {
728
741
. deque
729
742
. drain ( ..)
730
743
. chain ( outbox. deque . drain ( ..) )
731
- . filter_map ( |( _ , bytes , _ , return_channel ) | {
732
- bincode:: deserialize ( & bytes )
744
+ . filter_map ( |queued_msg | {
745
+ bincode:: deserialize ( & queued_msg . data )
733
746
. ok ( )
734
747
. and_then ( |frame| match frame {
735
- Frame :: Message ( _, msg) => Some ( ( return_channel, msg) ) ,
748
+ Frame :: Message ( _, msg) => Some ( ( queued_msg . return_channel , msg) ) ,
736
749
_ => None ,
737
750
} )
738
751
} )
0 commit comments