Skip to content

Commit 7f20a87

Browse files
authored
feat: include reqwest in transport-streamble-http-client feature (#376)
* fix: add reqwest dependency to transport-streamable-http-client feature - Fix compilation error when using transport-streamable-http-client feature due to missing dependency - Move From<reqwest::Error> implementation to reqwest module * feat(rmcp): enhance transport features by decoupling reqwest - Added reqwest features for reqwest-based implementations. - Updated documentation - Modified error handling in SSE transport to use `String` for content type. - Updated examples to include new features * feat(rmcp): enhance transport features by decoupling reqwest - Added reqwest features for reqwest-based implementations - Updated documentation - Modified error handling in SSE transport to use `String` - Updated examples to include new features
1 parent 7135f25 commit 7f20a87

File tree

10 files changed

+326
-29
lines changed

10 files changed

+326
-29
lines changed

crates/rmcp/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,14 @@ server-side-http = [
103103
client-side-sse = ["dep:sse-stream", "dep:http"]
104104

105105
transport-sse-client = ["client-side-sse", "transport-worker"]
106+
transport-sse-client-reqwest = ["transport-sse-client", "reqwest"]
106107

107108
transport-worker = ["dep:tokio-stream"]
108109

109110

110111
# Streamable HTTP client
111112
transport-streamable-http-client = ["client-side-sse", "transport-worker"]
113+
transport-streamable-http-client-reqwest = ["transport-streamable-http-client", "reqwest"]
112114

113115

114116
transport-async-rw = ["tokio/io-util", "tokio-util/codec"]

crates/rmcp/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ RMCP uses feature flags to control which components are included:
199199
- `transport-async-rw`: Async read/write support
200200
- `transport-io`: I/O stream support
201201
- `transport-child-process`: Child process support
202-
- `transport-sse-client` / `transport-sse-server`: SSE support
203-
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming
202+
- `transport-sse-client` / `transport-sse-server`: SSE support (client agnostic)
203+
- `transport-sse-client-reqwest`: a default `reqwest` implementation of the SSE client
204+
- `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming (client agnostic, see [`StreamableHttpClientTransport`] for details)
205+
- `transport-streamable-http-client-reqwest`: a default `reqwest` implementation of the streamable http client
204206
- `auth`: OAuth2 authentication support
205207
- `schemars`: JSON Schema generation (for tool definitions)
206208

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#[cfg(feature = "transport-streamable-http-client")]
2-
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client")))]
1+
#[cfg(feature = "transport-streamable-http-client-reqwest")]
2+
#[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client-reqwest")))]
33
mod streamable_http_client;
44

5-
#[cfg(feature = "transport-sse-client")]
6-
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))]
5+
#[cfg(feature = "transport-sse-client-reqwest")]
6+
#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client-reqwest")))]
77
mod sse_client;

crates/rmcp/src/transport/common/reqwest/sse_client.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ use crate::transport::{
1111
sse_client::{SseClient, SseClientConfig, SseTransportError},
1212
};
1313

14+
impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
15+
fn from(e: reqwest::Error) -> Self {
16+
SseTransportError::Client(e)
17+
}
18+
}
19+
1420
impl SseClient for reqwest::Client {
1521
type Error = reqwest::Error;
1622

@@ -55,7 +61,9 @@ impl SseClient for reqwest::Client {
5561
match response.headers().get(reqwest::header::CONTENT_TYPE) {
5662
Some(ct) => {
5763
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
58-
return Err(SseTransportError::UnexpectedContentType(Some(ct.clone())));
64+
return Err(SseTransportError::UnexpectedContentType(Some(
65+
String::from_utf8_lossy(ct.as_bytes()).to_string(),
66+
)));
5967
}
6068
}
6169
None => {
@@ -68,6 +76,33 @@ impl SseClient for reqwest::Client {
6876
}
6977

7078
impl SseClientTransport<reqwest::Client> {
79+
/// Creates a new transport using reqwest with the specified SSE endpoint.
80+
///
81+
/// This is a convenience method that creates a transport using the default
82+
/// reqwest client. This method is only available when the
83+
/// `transport-sse-client-reqwest` feature is enabled.
84+
///
85+
/// # Arguments
86+
///
87+
/// * `uri` - The SSE endpoint to connect to
88+
///
89+
/// # Example
90+
///
91+
/// ```rust
92+
/// use rmcp::transport::SseClientTransport;
93+
///
94+
/// // Enable the reqwest feature in Cargo.toml:
95+
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
96+
///
97+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
98+
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
99+
/// # Ok(())
100+
/// # }
101+
/// ```
102+
///
103+
/// # Feature requirement
104+
///
105+
/// This method requires the `transport-sse-client-reqwest` feature.
71106
pub async fn start(
72107
uri: impl Into<Arc<str>>,
73108
) -> Result<Self, SseTransportError<reqwest::Error>> {

crates/rmcp/src/transport/common/reqwest/streamable_http_client.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ use crate::{
1414
},
1515
};
1616

17+
impl From<reqwest::Error> for StreamableHttpError<reqwest::Error> {
18+
fn from(e: reqwest::Error) -> Self {
19+
StreamableHttpError::Client(e)
20+
}
21+
}
22+
1723
impl StreamableHttpClient for reqwest::Client {
1824
type Error = reqwest::Error;
1925

@@ -125,6 +131,30 @@ impl StreamableHttpClient for reqwest::Client {
125131
}
126132

127133
impl StreamableHttpClientTransport<reqwest::Client> {
134+
/// Creates a new transport using reqwest with the specified URI.
135+
///
136+
/// This is a convenience method that creates a transport using the default
137+
/// reqwest client. This method is only available when the
138+
/// `transport-streamable-http-client-reqwest` feature is enabled.
139+
///
140+
/// # Arguments
141+
///
142+
/// * `uri` - The server URI to connect to
143+
///
144+
/// # Example
145+
///
146+
/// ```rust,no_run
147+
/// use rmcp::transport::StreamableHttpClientTransport;
148+
///
149+
/// // Enable the reqwest feature in Cargo.toml:
150+
/// // rmcp = { version = "0.5", features = ["transport-streamable-http-client-reqwest"] }
151+
///
152+
/// let transport = StreamableHttpClientTransport::from_uri("http://localhost:8000/mcp");
153+
/// ```
154+
///
155+
/// # Feature requirement
156+
///
157+
/// This method requires the `transport-streamable-http-client-reqwest` feature.
128158
pub fn from_uri(uri: impl Into<Arc<str>>) -> Self {
129159
StreamableHttpClientTransport::with_client(
130160
reqwest::Client::default(),

crates/rmcp/src/transport/sse_client.rs

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::{pin::Pin, sync::Arc};
33

44
use futures::{StreamExt, future::BoxFuture};
55
use http::Uri;
6-
use reqwest::header::HeaderValue;
76
use sse_stream::Error as SseError;
87
use thiserror::Error;
98

@@ -28,7 +27,7 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
2827
#[error("unexpected end of stream")]
2928
UnexpectedEndOfStream,
3029
#[error("Unexpected content type: {0:?}")]
31-
UnexpectedContentType(Option<HeaderValue>),
30+
UnexpectedContentType(Option<String>),
3231
#[cfg(feature = "auth")]
3332
#[cfg_attr(docsrs, doc(cfg(feature = "auth")))]
3433
#[error("Auth error: {0}")]
@@ -39,12 +38,6 @@ pub enum SseTransportError<E: std::error::Error + Send + Sync + 'static> {
3938
InvalidUriParts(#[from] http::uri::InvalidUriParts),
4039
}
4140

42-
impl From<reqwest::Error> for SseTransportError<reqwest::Error> {
43-
fn from(e: reqwest::Error) -> Self {
44-
SseTransportError::Client(e)
45-
}
46-
}
47-
4841
pub trait SseClient: Clone + Send + Sync + 'static {
4942
type Error: std::error::Error + Send + Sync + 'static;
5043
fn post_message(
@@ -77,6 +70,87 @@ impl<C: SseClient> SseStreamReconnect for SseClientReconnect<C> {
7770
}
7871
}
7972
type ServerMessageStream<C> = Pin<Box<SseAutoReconnectStream<SseClientReconnect<C>>>>;
73+
74+
/// A client-agnostic SSE transport for RMCP that supports Server-Sent Events.
75+
///
76+
/// This transport allows you to choose your preferred HTTP client implementation
77+
/// by implementing the [`SseClient`] trait. The transport handles SSE streaming
78+
/// and automatic reconnection.
79+
///
80+
/// # Usage
81+
///
82+
/// ## Using reqwest
83+
///
84+
/// ```rust
85+
/// use rmcp::transport::SseClientTransport;
86+
///
87+
/// // Enable the reqwest feature in Cargo.toml:
88+
/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] }
89+
///
90+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
91+
/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?;
92+
/// # Ok(())
93+
/// # }
94+
/// ```
95+
///
96+
/// ## Using a custom HTTP client
97+
///
98+
/// ```rust
99+
/// use rmcp::transport::sse_client::{SseClient, SseClientTransport, SseClientConfig};
100+
/// use std::sync::Arc;
101+
/// use futures::stream::BoxStream;
102+
/// use rmcp::model::ClientJsonRpcMessage;
103+
/// use sse_stream::{Sse, Error as SseError};
104+
/// use http::Uri;
105+
///
106+
/// #[derive(Clone)]
107+
/// struct MyHttpClient;
108+
///
109+
/// #[derive(Debug, thiserror::Error)]
110+
/// struct MyError;
111+
///
112+
/// impl std::fmt::Display for MyError {
113+
/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114+
/// write!(f, "MyError")
115+
/// }
116+
/// }
117+
///
118+
/// impl SseClient for MyHttpClient {
119+
/// type Error = MyError;
120+
///
121+
/// async fn post_message(
122+
/// &self,
123+
/// _uri: Uri,
124+
/// _message: ClientJsonRpcMessage,
125+
/// _auth_token: Option<String>,
126+
/// ) -> Result<(), rmcp::transport::sse_client::SseTransportError<Self::Error>> {
127+
/// todo!()
128+
/// }
129+
///
130+
/// async fn get_stream(
131+
/// &self,
132+
/// _uri: Uri,
133+
/// _last_event_id: Option<String>,
134+
/// _auth_token: Option<String>,
135+
/// ) -> Result<BoxStream<'static, Result<Sse, SseError>>, rmcp::transport::sse_client::SseTransportError<Self::Error>> {
136+
/// todo!()
137+
/// }
138+
/// }
139+
///
140+
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
141+
/// let config = SseClientConfig {
142+
/// sse_endpoint: "http://localhost:8000/sse".into(),
143+
/// ..Default::default()
144+
/// };
145+
/// let transport = SseClientTransport::start_with_client(MyHttpClient, config).await?;
146+
/// # Ok(())
147+
/// # }
148+
/// ```
149+
///
150+
/// # Feature Flags
151+
///
152+
/// - `transport-sse-client`: Base feature providing the generic transport infrastructure
153+
/// - `transport-sse-client-reqwest`: Includes reqwest HTTP client support with convenience methods
80154
pub struct SseClientTransport<C: SseClient> {
81155
client: C,
82156
config: SseClientConfig,

0 commit comments

Comments
 (0)