Skip to content

Fixed a bug in the channel that was not detecting out of seq messages #663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 5 additions & 36 deletions hyperactor/src/channel/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,14 +981,14 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
},
// Ignore retransmits.
Ok(Frame::Message(seq, _)) if seq < next.seq => (),
// The following segment ensures exactly-once semantics.
// That means No out-of-order delivery and no duplicate delivery.
Ok(Frame::Message(seq, message)) => {
// received seq should be equal to next seq. Else error out!
if seq > next.seq {
tracing::error!("out-of-sequence message from {}", self.source);
// TODO: T217549393: Enabling
// this next line causes test
// `channel::net::tests::test_tcp_reconnect`
// to unexpectedly fail.
// break (next, Err(anyhow::anyhow!("out-of-sequence message from {}", self.source)))
let next_seq = next.seq;
break (next, Err(anyhow::anyhow!("out-of-sequence message from {}, expected seq {}, got {}", self.source, next_seq, seq)))
}
match tx.send(message).await {
Ok(()) => {
Expand Down Expand Up @@ -2040,37 +2040,6 @@ mod tests {
}
}

#[tracing_test::traced_test]
#[async_timed_test(timeout_secs = 60)]
async fn test_tcp_reconnect() {
// Use temporary config for this test
let config = config::global::lock();
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
let socket_addr: SocketAddr = "[::1]:0".parse().unwrap();
let (local_addr, mut rx1) = tcp::serve::<u64>(socket_addr).await.unwrap();
let local_socket = match local_addr {
ChannelAddr::Tcp(socket) => socket,
_ => panic!("unexpected channel type"),
};
let tx = dial::<u64>(local_addr).unwrap();
tx.try_post(101, unused_return_channel()).unwrap();
assert_eq!(rx1.recv().await.unwrap(), 101);
// Wait long enough to ensure message is acked.
RealClock.sleep(Duration::from_secs(5)).await;

// Stop the server.
rx1.2.stop("from testing");
assert_matches!(rx1.recv().await.unwrap_err(), ChannelError::Closed);

// Send the message is allowed even when the server is down.
tx.try_post(102, unused_return_channel()).unwrap();

// Start the server again. Need to serve on the same socket address
// because that is what tx knows.
let (_, mut rx2) = tcp::serve::<u64>(local_socket).await.unwrap();
assert_eq!(rx2.recv().await.unwrap(), 102);
}

#[async_timed_test(timeout_secs = 30)]
async fn test_ack_flush() {
let config = config::global::lock();
Expand Down