diff --git a/Cargo.lock b/Cargo.lock index 99efcd8..2e5b670 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1111,7 +1111,7 @@ dependencies = [ "idna", "ipnet", "once_cell", - "rand 0.9.1", + "rand 0.9.2", "ring", "thiserror 2.0.12", "tinyvec", @@ -1133,7 +1133,7 @@ dependencies = [ "moka", "once_cell", "parking_lot", - "rand 0.9.1", + "rand 0.9.2", "resolv-conf", "smallvec", "thiserror 2.0.12", @@ -1437,7 +1437,7 @@ dependencies = [ "hyper", "hyper-util", "log", - "rand 0.9.1", + "rand 0.9.2", "tokio", "url", "xmltree", @@ -1769,6 +1769,7 @@ dependencies = [ "irpc-derive", "n0-future", "postcard", + "rand 0.8.5", "serde", "tokio", "tracing", @@ -2483,7 +2484,7 @@ dependencies = [ "nested_enum_utils", "netwatch", "num_enum", - "rand 0.9.1", + "rand 0.9.2", "serde", "smallvec", "snafu", @@ -2626,7 +2627,7 @@ dependencies = [ "bytes", "getrandom 0.3.3", "lru-slab", - "rand 0.9.1", + "rand 0.9.2", "ring", "rustc-hash", "rustls", @@ -2690,9 +2691,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.1" +version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.3", @@ -3252,7 +3253,7 @@ dependencies = [ "precis-core", "precis-profiles", "quoted-string-parser", - "rand 0.9.1", + "rand 0.9.2", ] [[package]] @@ -3270,7 +3271,7 @@ dependencies = [ "hex", "parking_lot", "pnet_packet", - "rand 0.9.1", + "rand 0.9.2", "socket2 0.5.10", "thiserror 1.0.69", "tokio", @@ -3548,7 +3549,7 @@ dependencies = [ "getrandom 0.3.3", "http 1.3.1", "httparse", - "rand 0.9.1", + "rand 0.9.2", "ring", "rustls-pki-types", "simdutf8", diff --git a/irpc-iroh/Cargo.toml b/irpc-iroh/Cargo.toml index 8e62940..bda1751 100644 --- a/irpc-iroh/Cargo.toml +++ b/irpc-iroh/Cargo.toml @@ -27,3 +27,4 @@ n0-future = { workspace = true } tracing-subscriber = { workspace = true, features = ["fmt"] } irpc-derive = { version = "0.5.0", path = "../irpc-derive" } clap = { version = "4.5.41", features = ["derive"] } +rand = "0.8" diff --git a/irpc-iroh/examples/auth.rs b/irpc-iroh/examples/auth.rs index d7c2c92..78c52ff 100644 --- a/irpc-iroh/examples/auth.rs +++ b/irpc-iroh/examples/auth.rs @@ -3,8 +3,10 @@ //! * Manually implementing the connection loop //! * Authenticating peers +use std::time::Duration; + use anyhow::Result; -use iroh::{protocol::Router, Endpoint, Watcher}; +use iroh::{protocol::Router, Endpoint, NodeAddr, SecretKey, Watcher}; use self::storage::{StorageClient, StorageServer}; @@ -17,20 +19,28 @@ async fn main() -> Result<()> { } async fn remote() -> Result<()> { - let (server_router, server_addr) = { - let endpoint = Endpoint::builder().discovery_n0().bind().await?; + let server_secret_key = SecretKey::generate(&mut rand::rngs::OsRng); + let server_addr = NodeAddr::new(server_secret_key.public()); + let start_server = async move || { + let endpoint = Endpoint::builder() + .secret_key(server_secret_key.clone()) + .discovery_n0() + .bind() + .await?; let server = StorageServer::new("secret".to_string()); let router = Router::builder(endpoint.clone()) .accept(StorageServer::ALPN, server.clone()) .spawn(); - let addr = endpoint.node_addr().initialized().await; - (router, addr) + let _ = endpoint.home_relay().initialized().await; + // wait a bit for publishing to complete.. + tokio::time::sleep(Duration::from_millis(500)).await; + anyhow::Ok(router) }; + let mut server_router = (start_server)().await?; // correct authentication - let client_endpoint = Endpoint::builder().bind().await?; - let api = StorageClient::connect(client_endpoint, server_addr.clone()); - api.auth("secret").await?; + let client_endpoint = Endpoint::builder().discovery_n0().bind().await?; + let api = StorageClient::connect(client_endpoint, server_addr.clone(), "secret"); api.set("hello".to_string(), "world".to_string()).await?; api.set("goodbye".to_string(), "world".to_string()).await?; let value = api.get("hello".to_string()).await?; @@ -40,15 +50,21 @@ async fn remote() -> Result<()> { println!("list value = {value:?}"); } - // invalid authentication - let client_endpoint = Endpoint::builder().bind().await?; - let api = StorageClient::connect(client_endpoint, server_addr.clone()); - assert!(api.auth("bad").await.is_err()); - assert!(api.get("hello".to_string()).await.is_err()); + // restart server + server_router.shutdown().await?; + server_router = (start_server)().await?; + + // reconnections work: client will transparently reauthenticate + println!("restarting server"); + let value = api.get("hello".to_string()).await?; + println!("value = {value:?}"); + api.set("hello".to_string(), "world".to_string()).await?; + let value = api.get("hello".to_string()).await?; + println!("value = {value:?}"); - // no authentication + // invalid authentication let client_endpoint = Endpoint::builder().bind().await?; - let api = StorageClient::connect(client_endpoint, server_addr); + let api = StorageClient::connect(client_endpoint, server_addr.clone(), "bad"); assert!(api.get("hello".to_string()).await.is_err()); drop(server_router); @@ -65,7 +81,7 @@ mod storage { sync::{Arc, Mutex}, }; - use anyhow::Result; + use anyhow::{anyhow, Result}; use iroh::{ endpoint::Connection, protocol::{AcceptError, ProtocolHandler}, @@ -73,9 +89,8 @@ mod storage { }; use irpc::{ channel::{mpsc, oneshot}, - rpc_requests, Client, WithChannels, + rpc_requests, Client, Request, RequestError, WithChannels, }; - // Import the macro use irpc_iroh::{read_request, IrohRemoteConnection}; use serde::{Deserialize, Serialize}; use tracing::info; @@ -109,7 +124,8 @@ mod storage { #[rpc_requests(message = StorageMessage)] #[derive(Serialize, Deserialize, Debug)] enum StorageProtocol { - #[rpc(tx=oneshot::Sender>)] + // Connection will be closed if auth fails. + #[rpc(tx=oneshot::Sender<()>)] Auth(Auth), #[rpc(tx=oneshot::Sender>)] Get(Get), @@ -129,31 +145,29 @@ mod storage { impl ProtocolHandler for StorageServer { async fn accept(&self, conn: Connection) -> Result<(), AcceptError> { - let mut authed = false; - while let Some(msg) = read_request::(&conn).await? { - match msg { - StorageMessage::Auth(msg) => { - let WithChannels { inner, tx, .. } = msg; - if authed { - conn.close(1u32.into(), b"invalid message"); - break; - } else if inner.token != self.auth_token { - conn.close(1u32.into(), b"permission denied"); - break; - } else { - authed = true; - tx.send(Ok(())).await.ok(); - } - } - msg => { - if !authed { - conn.close(1u32.into(), b"permission denied"); - break; - } else { - self.handle_authenticated(msg).await; - } - } + // read first message: must be auth! + let msg = read_request::(&conn).await?; + let auth_ok = if let Some(StorageMessage::Auth(msg)) = msg { + let WithChannels { inner, tx, .. } = msg; + if inner.token == self.auth_token { + tx.send(()).await.ok(); + true + } else { + false } + } else { + false + }; + + // if not authenticated: close connection immediately. + if !auth_ok { + conn.close(1u32.into(), b"permission denied"); + return Ok(()); + } + + // now the connection is authenticated and we can handle all subsequent requests. + while let Some(msg) = read_request::(&conn).await? { + self.handle_request(msg).await; } conn.closed().await; Ok(()) @@ -170,7 +184,7 @@ mod storage { } } - async fn handle_authenticated(&self, msg: StorageMessage) { + async fn handle_request(&self, msg: StorageMessage) { match msg { StorageMessage::Auth(_) => unreachable!("handled in ProtocolHandler::accept"), StorageMessage::Get(get) => { @@ -218,39 +232,63 @@ mod storage { } pub struct StorageClient { + api_token: String, inner: Client, } impl StorageClient { pub const ALPN: &[u8] = ALPN; - pub fn connect(endpoint: Endpoint, addr: impl Into) -> StorageClient { + pub fn connect( + endpoint: Endpoint, + addr: impl Into, + api_token: &str, + ) -> StorageClient { let conn = IrohRemoteConnection::new(endpoint, addr.into(), Self::ALPN.to_vec()); StorageClient { + api_token: api_token.to_string(), inner: Client::boxed(conn), } } - pub async fn auth(&self, token: &str) -> Result<(), anyhow::Error> { - self.inner + async fn authenticated_request(&self) -> Result, irpc::Error> { + let request = self.inner.request().await?; + + // if the connection is not new: no need to reauthenticate. + if !request.is_new_connection() { + return Ok(request); + } + + // if this is a new connection: use this request to send an auth message. + request .rpc(Auth { - token: token.to_string(), + token: self.api_token.clone(), }) - .await? - .map_err(|err| anyhow::anyhow!(err)) + .await?; + // and create a new request for the actual call. + let request = self.inner.request().await?; + // if this *again* created a new connection, we error out. + if request.is_new_connection() { + Err(RequestError::Other(anyhow!("Connection is reconnecting too often")).into()) + } else { + Ok(request) + } } pub async fn get(&self, key: String) -> Result, irpc::Error> { - self.inner.rpc(Get { key }).await + self.authenticated_request().await?.rpc(Get { key }).await } pub async fn list(&self) -> Result, irpc::Error> { - self.inner.server_streaming(List, 10).await + self.authenticated_request() + .await? + .server_streaming(List, 10) + .await } pub async fn set(&self, key: String, value: String) -> Result<(), irpc::Error> { let msg = Set { key, value }; - self.inner.rpc(msg).await + self.authenticated_request().await?.rpc(msg).await } } } diff --git a/irpc-iroh/src/lib.rs b/irpc-iroh/src/lib.rs index 3d0d747..465d071 100644 --- a/irpc-iroh/src/lib.rs +++ b/irpc-iroh/src/lib.rs @@ -10,8 +10,8 @@ use iroh::{ use irpc::{ channel::RecvError, rpc::{ - Handler, RemoteConnection, RemoteService, ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, - MAX_MESSAGE_SIZE, + Handler, RemoteConnection, RemoteService, RemoteStreams, + ERROR_CODE_MAX_MESSAGE_SIZE_EXCEEDED, MAX_MESSAGE_SIZE, }, util::AsyncReadVarintExt, LocalSender, RequestError, @@ -60,7 +60,7 @@ impl RemoteConnection for IrohRemoteConnection { Box::new(self.clone()) } - fn open_bi(&self) -> BoxFuture> { + fn open_bi(&self) -> BoxFuture> { let this = self.0.clone(); Box::pin(async move { let mut guard = this.connection.lock().await; @@ -68,19 +68,24 @@ impl RemoteConnection for IrohRemoteConnection { Some(conn) => { // try to reuse the connection match conn.open_bi().await { - Ok(pair) => pair, + Ok(pair) => RemoteStreams::with_reused(pair), Err(_) => { // try with a new connection, just once *guard = None; - connect_and_open_bi(&this.endpoint, &this.addr, &this.alpn, guard) - .await - .map_err(RequestError::Other)? + let pair = + connect_and_open_bi(&this.endpoint, &this.addr, &this.alpn, guard) + .await + .map_err(RequestError::Other)?; + RemoteStreams::with_new(pair) } } } - None => connect_and_open_bi(&this.endpoint, &this.addr, &this.alpn, guard) - .await - .map_err(RequestError::Other)?, + None => { + let pair = connect_and_open_bi(&this.endpoint, &this.addr, &this.alpn, guard) + .await + .map_err(RequestError::Other)?; + RemoteStreams::with_new(pair) + } }; Ok(pair) }) diff --git a/src/lib.rs b/src/lib.rs index c2da344..198ad9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1060,21 +1060,19 @@ impl Client { #[allow(clippy::type_complexity)] pub fn request( &self, - ) -> impl Future< - Output = result::Result, rpc::RemoteSender>, RequestError>, - > + 'static { + ) -> impl Future, RequestError>> + 'static { #[cfg(feature = "rpc")] { let cloned = match &self.0 { - ClientInner::Local(tx) => Request::Local(tx.clone()), - ClientInner::Remote(connection) => Request::Remote(connection.clone_boxed()), + ClientInner::Local(tx) => Either::Local(tx.clone()), + ClientInner::Remote(connection) => Either::Remote(connection.clone_boxed()), }; async move { match cloned { - Request::Local(tx) => Ok(Request::Local(tx.into())), - Request::Remote(conn) => { - let (send, recv) = conn.open_bi().await?; - Ok(Request::Remote(rpc::RemoteSender::new(send, recv))) + Either::Local(tx) => Ok(Request::Local(tx.into())), + Either::Remote(conn) => { + let remote_streams = conn.open_bi().await?; + Ok(Request::Remote(rpc::RemoteSender::new(remote_streams))) } } } @@ -1098,24 +1096,7 @@ impl Client { Res: RpcMessage, { let request = self.request(); - async move { - let recv: oneshot::Receiver = match request.await? { - Request::Local(request) => { - let (tx, rx) = oneshot::channel(); - request.send((msg, tx)).await?; - rx - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - rx.into() - } - }; - let res = recv.await?; - Ok(res) - } + async move { request.await?.rpc(msg).await } } /// Performs a request for which the server returns a mpsc receiver. @@ -1132,21 +1113,10 @@ impl Client { { let request = self.request(); async move { - let recv: mpsc::Receiver = match request.await? { - Request::Local(request) => { - let (tx, rx) = mpsc::channel(local_response_cap); - request.send((msg, tx)).await?; - rx - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, rx) = request.write(msg).await?; - rx.into() - } - }; - Ok(recv) + request + .await? + .server_streaming(msg, local_response_cap) + .await } } @@ -1164,25 +1134,7 @@ impl Client { Res: RpcMessage, { let request = self.request(); - async move { - let (update_tx, res_rx): (mpsc::Sender, oneshot::Receiver) = - match request.await? { - Request::Local(request) => { - let (req_tx, req_rx) = mpsc::channel(local_update_cap); - let (res_tx, res_rx) = oneshot::channel(); - request.send((msg, res_tx, req_rx)).await?; - (req_tx, res_rx) - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (tx, rx) = request.write(msg).await?; - (tx.into(), rx.into()) - } - }; - Ok((update_tx, res_rx)) - } + async move { request.await?.client_streaming(msg, local_update_cap).await } } /// Performs a request for which the client can send updates, and the server returns a mpsc receiver. @@ -1201,23 +1153,10 @@ impl Client { { let request = self.request(); async move { - let (update_tx, res_rx): (mpsc::Sender, mpsc::Receiver) = - match request.await? { - Request::Local(request) => { - let (update_tx, update_rx) = mpsc::channel(local_update_cap); - let (res_tx, res_rx) = mpsc::channel(local_response_cap); - request.send((msg, res_tx, update_rx)).await?; - (update_tx, res_rx) - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (tx, rx) = request.write(msg).await?; - (tx.into(), rx.into()) - } - }; - Ok((update_tx, res_rx)) + request + .await? + .bidi_streaming(msg, local_update_cap, local_response_cap) + .await } } @@ -1231,20 +1170,7 @@ impl Client { Req: Channels, { let request = self.request(); - async move { - match request.await? { - Request::Local(request) => { - request.send((msg,)).await?; - } - #[cfg(not(feature = "rpc"))] - Request::Remote(_request) => unreachable!(), - #[cfg(feature = "rpc")] - Request::Remote(request) => { - let (_tx, _rx) = request.write(msg).await?; - } - }; - Ok(()) - } + async move { request.await?.notify(msg).await } } } @@ -1466,9 +1392,31 @@ pub mod rpc { fn clone_boxed(&self) -> Box; /// Open a bidirectional stream to the remote service. - fn open_bi( - &self, - ) -> BoxFuture>; + fn open_bi(&self) -> BoxFuture>; + } + + #[derive(Debug)] + pub struct RemoteStreams { + pub send: quinn::SendStream, + pub recv: quinn::RecvStream, + pub is_new_connection: bool, + } + + impl RemoteStreams { + pub fn with_reused(pair: (quinn::SendStream, quinn::RecvStream)) -> Self { + Self { + send: pair.0, + recv: pair.1, + is_new_connection: false, + } + } + pub fn with_new(pair: (quinn::SendStream, quinn::RecvStream)) -> Self { + Self { + send: pair.0, + recv: pair.1, + is_new_connection: true, + } + } } /// A connection to a remote service. @@ -1500,28 +1448,30 @@ pub mod rpc { Box::new(self.clone()) } - fn open_bi( - &self, - ) -> BoxFuture> - { + fn open_bi(&self) -> BoxFuture> { let this = self.0.clone(); Box::pin(async move { let mut guard = this.connection.lock().await; - let pair = match guard.as_mut() { + let streams = match guard.as_mut() { Some(conn) => { // try to reuse the connection match conn.open_bi().await { - Ok(pair) => pair, + Ok(pair) => RemoteStreams::with_reused(pair), Err(_) => { // try with a new connection, just once *guard = None; - connect_and_open_bi(&this.endpoint, &this.addr, guard).await? + let pair = + connect_and_open_bi(&this.endpoint, &this.addr, guard).await?; + RemoteStreams::with_new(pair) } } } - None => connect_and_open_bi(&this.endpoint, &this.addr, guard).await?, + None => { + let pair = connect_and_open_bi(&this.endpoint, &this.addr, guard).await?; + RemoteStreams::with_new(pair) + } }; - Ok(pair) + Ok(streams) }) } } @@ -1539,22 +1489,22 @@ pub mod rpc { /// A connection to a remote service that can be used to send the initial message. #[derive(Debug)] - pub struct RemoteSender( - quinn::SendStream, - quinn::RecvStream, - std::marker::PhantomData, - ); + pub struct RemoteSender(RemoteStreams, std::marker::PhantomData); impl RemoteSender { - pub fn new(send: quinn::SendStream, recv: quinn::RecvStream) -> Self { - Self(send, recv, PhantomData) + pub fn new(remote_streams: RemoteStreams) -> Self { + Self(remote_streams, PhantomData) + } + + pub fn is_new_connection(&self) -> bool { + self.0.is_new_connection } pub async fn write( self, msg: impl Into, ) -> std::result::Result<(quinn::SendStream, quinn::RecvStream), WriteError> { - let RemoteSender(mut send, recv, _) = self; + let RemoteStreams { mut send, recv, .. } = self.0; let msg = msg.into(); if postcard::experimental::serialized_size(&msg)? as u64 > MAX_MESSAGE_SIZE { return Err(WriteError::MaxMessageSizeExceeded); @@ -1970,13 +1920,173 @@ pub mod rpc { } } +pub enum Either { + Local(L), + Remote(R), +} + /// A request to a service. This can be either local or remote. -#[derive(Debug)] -pub enum Request { +pub enum Request { /// Local in memory request - Local(L), + Local(LocalSender), /// Remote cross process request - Remote(R), + #[cfg(feature = "rpc")] + Remote(crate::rpc::RemoteSender), + #[cfg(not(feature = "rpc"))] + Remote(()), +} + +impl Request { + pub fn is_new_connection(&self) -> bool { + match self { + Self::Local(_) => false, + #[cfg(feature = "rpc")] + Self::Remote(s) => s.is_new_connection(), + #[cfg(not(feature = "rpc"))] + Self::Remote(()) => false, + } + } + + /// Performs a request for which the server returns a oneshot receiver. + pub async fn rpc(self, msg: Req) -> Result + where + S: From, + S::Message: From>, + Req: Channels, Rx = NoReceiver>, + Res: RpcMessage, + { + let recv: oneshot::Receiver = match self { + Request::Local(request) => { + let (tx, rx) = oneshot::channel(); + request.send((msg, tx)).await?; + rx + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (_tx, rx) = request.write(msg).await?; + rx.into() + } + }; + let res = recv.await?; + Ok(res) + } + + /// Performs a request for which the server returns a mpsc receiver. + pub async fn server_streaming( + self, + msg: Req, + local_response_cap: usize, + ) -> Result> + where + S: From, + S::Message: From>, + Req: Channels, Rx = NoReceiver>, + Res: RpcMessage, + { + let recv: mpsc::Receiver = match self { + Request::Local(request) => { + let (tx, rx) = mpsc::channel(local_response_cap); + request.send((msg, tx)).await?; + rx + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (_tx, rx) = request.write(msg).await?; + rx.into() + } + }; + Ok(recv) + } + + /// Performs a request for which the client can send updates. + pub async fn client_streaming( + self, + msg: Req, + local_update_cap: usize, + ) -> Result<(mpsc::Sender, oneshot::Receiver)> + where + S: From, + S::Message: From>, + Req: Channels, Rx = mpsc::Receiver>, + Update: RpcMessage, + Res: RpcMessage, + { + let (update_tx, res_rx): (mpsc::Sender, oneshot::Receiver) = match self { + Request::Local(request) => { + let (req_tx, req_rx) = mpsc::channel(local_update_cap); + let (res_tx, res_rx) = oneshot::channel(); + request.send((msg, res_tx, req_rx)).await?; + (req_tx, res_rx) + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (tx, rx) = request.write(msg).await?; + (tx.into(), rx.into()) + } + }; + Ok((update_tx, res_rx)) + } + + /// Performs a request for which the client can send updates, and the server returns a mpsc receiver. + pub async fn bidi_streaming( + self, + msg: Req, + local_update_cap: usize, + local_response_cap: usize, + ) -> Result<(mpsc::Sender, mpsc::Receiver)> + where + S: From, + S::Message: From>, + Req: Channels, Rx = mpsc::Receiver>, + Update: RpcMessage, + Res: RpcMessage, + { + let (update_tx, res_rx): (mpsc::Sender, mpsc::Receiver) = match self { + Request::Local(request) => { + let (update_tx, update_rx) = mpsc::channel(local_update_cap); + let (res_tx, res_rx) = mpsc::channel(local_response_cap); + request.send((msg, res_tx, update_rx)).await?; + (update_tx, res_rx) + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (tx, rx) = request.write(msg).await?; + (tx.into(), rx.into()) + } + }; + Ok((update_tx, res_rx)) + } + + /// Performs a request for which the server returns nothing. + /// + /// The returned future completes once the message is sent. + pub async fn notify(self, msg: Req) -> Result<()> + where + S: From, + S::Message: From>, + Req: Channels, + { + match self { + Request::Local(request) => { + request.send((msg,)).await?; + } + #[cfg(not(feature = "rpc"))] + Request::Remote(_request) => unreachable!(), + #[cfg(feature = "rpc")] + Request::Remote(request) => { + let (_tx, _rx) = request.write(msg).await?; + } + }; + Ok(()) + } } impl LocalSender {