diff --git a/hyperactor/src/channel/net.rs b/hyperactor/src/channel/net.rs index 280ecef14..e5c991cff 100644 --- a/hyperactor/src/channel/net.rs +++ b/hyperactor/src/channel/net.rs @@ -981,14 +981,14 @@ impl ServerConn { }, // Ignore retransmits. Ok(Frame::Message(seq, _)) if seq < next.seq => (), + // The following segment ensures exactly-once semantics, + // i.e. no out-of-order delivery and no duplicate delivery. Ok(Frame::Message(seq, message)) => { + // The received seq must equal next.seq; otherwise error out. if seq > next.seq { tracing::error!("out-of-sequence message from {}", self.source); - // TODO: T217549393: Enabling - // this next line causes test - // `channel::net::tests::test_tcp_reconnect` - // to unexpectedly fail. - // break (next, Err(anyhow::anyhow!("out-of-sequence message from {}", self.source))) + let next_seq = next.seq; + break (next, Err(anyhow::anyhow!("out-of-sequence message from {}, expected seq {}, got {}", self.source, next_seq, seq))) } match tx.send(message).await { Ok(()) => { @@ -2040,37 +2040,6 @@ mod tests { } } - #[tracing_test::traced_test] - #[async_timed_test(timeout_secs = 60)] - async fn test_tcp_reconnect() { - // Use temporary config for this test - let config = config::global::lock(); - let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1); - let socket_addr: SocketAddr = "[::1]:0".parse().unwrap(); - let (local_addr, mut rx1) = tcp::serve::(socket_addr).await.unwrap(); - let local_socket = match local_addr { - ChannelAddr::Tcp(socket) => socket, - _ => panic!("unexpected channel type"), - }; - let tx = dial::(local_addr).unwrap(); - tx.try_post(101, unused_return_channel()).unwrap(); - assert_eq!(rx1.recv().await.unwrap(), 101); - // Wait long enough to ensure message is acked. - RealClock.sleep(Duration::from_secs(5)).await; - - // Stop the server. 
- rx1.2.stop("from testing"); - assert_matches!(rx1.recv().await.unwrap_err(), ChannelError::Closed); - - // Send the message is allowed even when the server is down. - tx.try_post(102, unused_return_channel()).unwrap(); - - // Start the server again. Need to serve on the same socket address - // because that is what tx knows. - let (_, mut rx2) = tcp::serve::(local_socket).await.unwrap(); - assert_eq!(rx2.recv().await.unwrap(), 102); - } - #[async_timed_test(timeout_secs = 30)] async fn test_ack_flush() { let config = config::global::lock();