Skip to content

Commit 4c34b64

Browse files
authored
fix: streamable http client close on response (#268)
1 parent b9bba0b commit 4c34b64

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
211211
+ Send
212212
+ 'static,
213213
sse_worker_tx: tokio::sync::mpsc::Sender<ServerJsonRpcMessage>,
214+
close_on_response: bool,
214215
ct: CancellationToken,
215216
) -> Result<(), StreamableHttpError<C::Error>> {
216217
let mut sse_stream = std::pin::pin!(sse_stream);
@@ -227,12 +228,16 @@ impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
227228
let Some(message) = message.transpose()? else {
228229
break;
229230
};
230-
231+
let is_response = matches!(message, ServerJsonRpcMessage::Response(_));
231232
let yield_result = sse_worker_tx.send(message).await;
232233
if yield_result.is_err() {
233234
tracing::trace!("streamable http transport worker dropped, exiting");
234235
break;
235236
}
237+
if close_on_response && is_response {
238+
tracing::debug!("got response, closing sse stream");
239+
break;
240+
}
236241
}
237242
Ok(())
238243
}
@@ -363,6 +368,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
363368
streams.spawn(Self::execute_sse_stream(
364369
sse_stream,
365370
sse_worker_tx.clone(),
371+
false,
366372
transport_task_ct.child_token(),
367373
));
368374
tracing::debug!("got common stream");
@@ -439,6 +445,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
439445
streams.spawn(Self::execute_sse_stream(
440446
sse_stream,
441447
sse_worker_tx.clone(),
448+
true,
442449
transport_task_ct.child_token(),
443450
));
444451
} else {
@@ -449,6 +456,7 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
449456
streams.spawn(Self::execute_sse_stream(
450457
sse_stream,
451458
sse_worker_tx.clone(),
459+
true,
452460
transport_task_ct.child_token(),
453461
));
454462
}

0 commit comments

Comments
 (0)