Skip to content

Commit b79a68d

Browse files
vidhyav authored and facebook-github-bot committed
Fixed a bug where the channel was not detecting out-of-sequence messages (#663)
Summary: We now break the session when we detect an out-of-sequence message. This check was previously disabled because of a failing test, but it turns out the test itself is wrong. The test stops NetRx, but that should also kill NetTx, because a NetRx failure causes the entire session state to be nullified on the receiver side. Reviewed By: pzhan9, mariusae Differential Revision: D79044526
1 parent 45c5574 commit b79a68d

File tree

1 file changed

+5
-36
lines changed

1 file changed

+5
-36
lines changed

hyperactor/src/channel/net.rs

Lines changed: 5 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -981,14 +981,14 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
981981
},
982982
// Ignore retransmits.
983983
Ok(Frame::Message(seq, _)) if seq < next.seq => (),
984+
// The following segment ensures exactly-once semantics.
985+
// That means No out-of-order delivery and no duplicate delivery.
984986
Ok(Frame::Message(seq, message)) => {
987+
// received seq should be equal to next seq. Else error out!
985988
if seq > next.seq {
986989
tracing::error!("out-of-sequence message from {}", self.source);
987-
// TODO: T217549393: Enabling
988-
// this next line causes test
989-
// `channel::net::tests::test_tcp_reconnect`
990-
// to unexpectedly fail.
991-
// break (next, Err(anyhow::anyhow!("out-of-sequence message from {}", self.source)))
990+
let next_seq = next.seq;
991+
break (next, Err(anyhow::anyhow!("out-of-sequence message from {}, expected seq {}, got {}", self.source, next_seq, seq)))
992992
}
993993
match tx.send(message).await {
994994
Ok(()) => {
@@ -2040,37 +2040,6 @@ mod tests {
20402040
}
20412041
}
20422042

2043-
#[tracing_test::traced_test]
2044-
#[async_timed_test(timeout_secs = 60)]
2045-
async fn test_tcp_reconnect() {
2046-
// Use temporary config for this test
2047-
let config = config::global::lock();
2048-
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
2049-
let socket_addr: SocketAddr = "[::1]:0".parse().unwrap();
2050-
let (local_addr, mut rx1) = tcp::serve::<u64>(socket_addr).await.unwrap();
2051-
let local_socket = match local_addr {
2052-
ChannelAddr::Tcp(socket) => socket,
2053-
_ => panic!("unexpected channel type"),
2054-
};
2055-
let tx = dial::<u64>(local_addr).unwrap();
2056-
tx.try_post(101, unused_return_channel()).unwrap();
2057-
assert_eq!(rx1.recv().await.unwrap(), 101);
2058-
// Wait long enough to ensure message is acked.
2059-
RealClock.sleep(Duration::from_secs(5)).await;
2060-
2061-
// Stop the server.
2062-
rx1.2.stop("from testing");
2063-
assert_matches!(rx1.recv().await.unwrap_err(), ChannelError::Closed);
2064-
2065-
// Send the message is allowed even when the server is down.
2066-
tx.try_post(102, unused_return_channel()).unwrap();
2067-
2068-
// Start the server again. Need to serve on the same socket address
2069-
// because that is what tx knows.
2070-
let (_, mut rx2) = tcp::serve::<u64>(local_socket).await.unwrap();
2071-
assert_eq!(rx2.recv().await.unwrap(), 102);
2072-
}
2073-
20742043
#[async_timed_test(timeout_secs = 30)]
20752044
async fn test_ack_flush() {
20762045
let config = config::global::lock();

0 commit comments

Comments
 (0)