Skip to content

Commit f0b8b45

Browse files
vidhyavfacebook-github-bot
authored andcommitted
Fixed a bug in the channel that was not detecting out of seq messages (meta-pytorch#663)
Summary: This was because we need to check for out of seq messages only if it is not the first message of the connection (even if it is for subsequent sessions). Differential Revision: D79044526
1 parent 43569d1 commit f0b8b45

File tree

1 file changed

+11
-15
lines changed

1 file changed

+11
-15
lines changed

hyperactor/src/channel/net.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -961,13 +961,14 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
961961
// Ignore retransmits.
962962
Ok(Frame::Message(seq, _)) if seq < next.seq => (),
963963
Ok(Frame::Message(seq, message)) => {
964-
if seq > next.seq {
964+
// received seq should be either equal to next seq or
965+
// it is the first ever payload for this connection (in which case,
966+
// the other side might be sending a payload from a previous lost connection
967+
// for the same session). Else error out!
968+
if seq > next.seq && next.seq != 0 {
965969
tracing::error!("out-of-sequence message from {}", self.source);
966-
// TODO: T217549393: Enabling
967-
// this next line causes test
968-
// `channel::net::tests::test_tcp_reconnect`
969-
// to unexpectedly fail.
970-
// break (next, Err(anyhow::anyhow!("out-of-sequence message from {}", self.source)))
970+
let next_seq = next.seq;
971+
break (next, Err(anyhow::anyhow!("out-of-sequence message from {}, expected seq {}, got {}", self.source, next_seq, seq)))
971972
}
972973
match tx.send(message).await {
973974
Ok(()) => {
@@ -1173,15 +1174,10 @@ impl SessionManager {
11731174
{
11741175
let session_id = conn.handshake::<M>().await?;
11751176

1176-
let session_var = match self.sessions.entry(session_id) {
1177-
Entry::Occupied(entry) => entry.get().clone(),
1178-
Entry::Vacant(entry) => {
1179-
// We haven't seen this session before. We begin with seq=0 and ack=0.
1180-
let var = MVar::full(Next { seq: 0, ack: 0 });
1181-
entry.insert(var.clone());
1182-
var
1183-
}
1184-
};
1177+
let session_var = self
1178+
.sessions
1179+
.entry(session_id)
1180+
.or_insert_with(|| MVar::full(Next { seq: 0, ack: 0 }));
11851181

11861182
let next = session_var.take().await;
11871183
let (next, res) = conn.process(session_id, tx, cancel_token, next).await;

0 commit comments

Comments
 (0)