diff --git a/Cargo.lock b/Cargo.lock index cf4bed5a888..5dcc79f1818 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7195,6 +7195,7 @@ name = "reth" version = "1.4.8" dependencies = [ "alloy-rpc-types", + "alloy-transport", "aquamarine", "backon", "clap", @@ -10393,11 +10394,16 @@ name = "reth-scroll-rpc" version = "1.4.8" dependencies = [ "alloy-consensus", + "alloy-json-rpc", "alloy-primitives", + "alloy-rpc-client", "alloy-rpc-types-eth", + "alloy-transport", + "alloy-transport-http", "eyre", "jsonrpsee-types", "parking_lot", + "reqwest", "reth-chainspec", "reth-evm", "reth-network-api", @@ -10422,6 +10428,7 @@ dependencies = [ "scroll-alloy-rpc-types", "thiserror 2.0.12", "tokio", + "tracing", ] [[package]] diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 4d93ca5d73c..0c758f71c78 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -61,6 +61,7 @@ tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thre aquamarine.workspace = true clap = { workspace = true, features = ["derive", "env"] } eyre.workspace = true +alloy-transport.workspace = true [dev-dependencies] backon.workspace = true diff --git a/crates/scroll/rpc/Cargo.toml b/crates/scroll/rpc/Cargo.toml index f1fbbc9289c..e43bfba1338 100644 --- a/crates/scroll/rpc/Cargo.toml +++ b/crates/scroll/rpc/Cargo.toml @@ -42,6 +42,16 @@ alloy-primitives.workspace = true alloy-rpc-types-eth.workspace = true alloy-consensus.workspace = true revm.workspace = true +alloy-transport.workspace = true +alloy-json-rpc.workspace = true +alloy-rpc-client.workspace = true +alloy-transport-http.workspace = true + +# reqwest +reqwest = { workspace = true, default-features = false, features = ["rustls-tls-native-roots"] } + +# tracing +tracing.workspace = true # async parking_lot.workspace = true diff --git a/crates/scroll/rpc/src/error.rs b/crates/scroll/rpc/src/error.rs index 63570531669..a9f9a09227f 100644 --- a/crates/scroll/rpc/src/error.rs +++ b/crates/scroll/rpc/src/error.rs @@ -1,6 +1,9 @@ //! RPC errors specific to Scroll. +use alloy_json_rpc::ErrorPayload; use alloy_rpc_types_eth::BlockError; +use alloy_transport::{RpcError, TransportErrorKind}; +use jsonrpsee_types::error::{INTERNAL_ERROR_CODE}; use reth_evm::execute::ProviderError; use reth_rpc_eth_api::{AsEthApiError, TransactionConversionError}; use reth_rpc_eth_types::{error::api::FromEvmHalt, EthApiError}; @@ -12,12 +15,16 @@ pub enum ScrollEthApiError { /// L1 ethereum error. #[error(transparent)] Eth(#[from] EthApiError), + /// Sequencer client error. + #[error(transparent)] + Sequencer(#[from] SequencerClientError), } impl AsEthApiError for ScrollEthApiError { fn as_err(&self) -> Option<&EthApiError> { match self { Self::Eth(err) => Some(err), + _ => None, } } } @@ -26,6 +33,7 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { fn from(err: ScrollEthApiError) -> Self { match err { ScrollEthApiError::Eth(err) => err.into(), + ScrollEthApiError::Sequencer(err) => err.into(), } } } @@ -62,3 +70,31 @@ impl From for ScrollEthApiError { Self::Eth(EthApiError::from(value)) } } + +/// Error type when interacting with the Sequencer +#[derive(Debug, thiserror::Error)] +pub enum SequencerClientError { + /// Wrapper around an [`RpcError`]. + #[error(transparent)] + HttpError(#[from] RpcError), + /// Thrown when serializing transaction to forward to sequencer + #[error("invalid sequencer transaction")] + InvalidSequencerTransaction, +} + +impl From for jsonrpsee_types::error::ErrorObject<'static> { + fn from(err: SequencerClientError) -> Self { + match err { + SequencerClientError::HttpError(RpcError::ErrorResp(ErrorPayload { + code, + message, + data, + })) => jsonrpsee_types::error::ErrorObject::owned(code as i32, message, data), + err => jsonrpsee_types::error::ErrorObject::owned( + INTERNAL_ERROR_CODE, + err.to_string(), + None::, + ), + } + } +} diff --git a/crates/scroll/rpc/src/eth/mod.rs b/crates/scroll/rpc/src/eth/mod.rs index c89c1509fbb..c8bbf000f4b 100644 --- a/crates/scroll/rpc/src/eth/mod.rs +++ b/crates/scroll/rpc/src/eth/mod.rs @@ -1,6 +1,7 @@ //! Scroll-Reth `eth_` endpoint implementation. use alloy_primitives::U256; +use eyre::WrapErr; use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_evm::ConfigureEvm; use reth_network_api::NetworkInfo; @@ -40,6 +41,8 @@ mod pending_block; pub mod receipt; pub mod transaction; +use crate::SequencerClient; + /// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API. pub type EthApiNodeBackend = EthApiInner< ::Provider, @@ -74,8 +77,8 @@ pub struct ScrollEthApi { impl ScrollEthApi { /// Creates a new [`ScrollEthApi`]. - pub fn new(eth_api: EthApiNodeBackend) -> Self { - let inner = Arc::new(ScrollEthApiInner { eth_api }); + pub fn new(eth_api: EthApiNodeBackend, sequencer_client: Option) -> Self { + let inner = Arc::new(ScrollEthApiInner { eth_api, sequencer_client }); Self { inner: inner.clone(), _nt: PhantomData, @@ -99,6 +102,11 @@ where self.inner.eth_api() } + /// Returns the configured sequencer client, if any. + pub fn sequencer_client(&self) -> Option<&SequencerClient> { + self.inner.sequencer_client() + } + /// Return a builder for the [`ScrollEthApi`]. pub const fn builder() -> ScrollEthApiBuilder { ScrollEthApiBuilder::new() @@ -302,6 +310,9 @@ impl fmt::Debug for ScrollEthApi { pub struct ScrollEthApiInner { /// Gateway to node's core components. pub eth_api: EthApiNodeBackend, + /// Sequencer client, configured to forward submitted transactions to sequencer of given Scroll + /// network. + sequencer_client: Option, } impl ScrollEthApiInner { @@ -309,16 +320,31 @@ impl ScrollEthApiInner { const fn eth_api(&self) -> &EthApiNodeBackend { &self.eth_api } + + /// Returns the configured sequencer client, if any. + const fn sequencer_client(&self) -> Option<&SequencerClient> { + self.sequencer_client.as_ref() + } } /// A type that knows how to build a [`ScrollEthApi`]. #[derive(Debug, Default)] -pub struct ScrollEthApiBuilder {} +pub struct ScrollEthApiBuilder { + /// Sequencer client, configured to forward submitted transactions to sequencer of given Scroll + /// network. + sequencer_url: Option, +} impl ScrollEthApiBuilder { /// Creates a [`ScrollEthApiBuilder`] instance. pub const fn new() -> Self { - Self {} + Self { sequencer_url: None } + } + + /// With a [`SequencerClient`]. + pub fn with_sequencer(mut self, sequencer_url: Option) -> Self { + self.sequencer_url = sequencer_url; + self } } @@ -330,6 +356,7 @@ where type EthApi = ScrollEthApi; async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result { + let Self { sequencer_url } = self; let eth_api = reth_rpc::EthApiBuilder::new( ctx.components.provider().clone(), ctx.components.pool().clone(), @@ -345,6 +372,16 @@ where .proof_permits(ctx.config.proof_permits) .build_inner(); - Ok(ScrollEthApi::new(eth_api)) + let sequencer_client = if let Some(url) = sequencer_url { + Some( + SequencerClient::new(&url) + .await + .wrap_err_with(|| "Failed to init sequencer client with: {url}")?, + ) + } else { + None + }; + + Ok(ScrollEthApi::new(eth_api, sequencer_client)) } } diff --git a/crates/scroll/rpc/src/eth/transaction.rs b/crates/scroll/rpc/src/eth/transaction.rs index d044b1c7d33..5e492ea0955 100644 --- a/crates/scroll/rpc/src/eth/transaction.rs +++ b/crates/scroll/rpc/src/eth/transaction.rs @@ -2,7 +2,7 @@ use crate::{ eth::{ScrollEthApiInner, ScrollNodeCore}, - ScrollEthApi, + ScrollEthApi, ScrollEthApiError, SequencerClient, }; use alloy_consensus::transaction::TransactionInfo; use alloy_primitives::{Bytes, B256}; @@ -13,7 +13,7 @@ use reth_provider::{ }; use reth_rpc_eth_api::{ helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking}, - try_into_scroll_tx_info, FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt, + try_into_scroll_tx_info, EthApiTypes, FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt, TxInfoMapper, }; use reth_rpc_eth_types::utils::recover_raw_transaction; @@ -27,7 +27,7 @@ use std::{ impl EthTransactions for ScrollEthApi where - Self: LoadTransaction, + Self: LoadTransaction + EthApiTypes, N: ScrollNodeCore>>, { fn signers(&self) -> &parking_lot::RwLock>>>> { @@ -41,6 +41,36 @@ where let recovered = recover_raw_transaction(&tx)?; let pool_transaction = ::Transaction::from_pooled(recovered); + // On scroll, transactions are forwarded directly to the sequencer to be included in + // blocks that it builds. + if let Some(client) = self.raw_tx_forwarder().as_ref() { + tracing::debug!(target: "rpc::eth", hash = %pool_transaction.hash(), "forwarding raw transaction to sequencer"); + + match client.forward_raw_transaction(&tx).await { + Ok(hash) => { + // Sequencer succeeded, try to add to local pool too + let _ = self + .pool() + .add_transaction(TransactionOrigin::Local, pool_transaction) + .await.inspect_err(|err| { + tracing::debug!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool"); + }); + return Ok(hash); + } + Err(err) => { + tracing::warn!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction to sequencer"); + // Sequencer failed, try local pool instead + let hash = self + .pool() + .add_transaction(TransactionOrigin::Local, pool_transaction) + .await + .map_err(Self::Error::from_eth_err)?; + tracing::debug!(target: "rpc::eth", %hash, "failed to forward tx to sequencer, but successfully added to local tx pool"); + return Ok(hash); + } + } + } + // submit the transaction to the pool with a `Local` origin let hash = self .pool() @@ -60,6 +90,16 @@ where { } +impl ScrollEthApi +where + N: ScrollNodeCore, +{ + /// Returns the [`SequencerClient`] if one is set. + pub fn raw_tx_forwarder(&self) -> Option { + self.inner.sequencer_client.clone() + } +} + /// Scroll implementation of [`TxInfoMapper`]. /// /// Receipt is fetched to extract the `l1_fee` for all transactions but L1 messages. diff --git a/crates/scroll/rpc/src/lib.rs b/crates/scroll/rpc/src/lib.rs index a3058ffee02..116bc181c81 100644 --- a/crates/scroll/rpc/src/lib.rs +++ b/crates/scroll/rpc/src/lib.rs @@ -10,6 +10,8 @@ pub mod error; pub mod eth; +pub mod sequencer; -pub use error::ScrollEthApiError; +pub use error::{ScrollEthApiError, SequencerClientError}; pub use eth::{ScrollEthApi, ScrollReceiptBuilder}; +pub use sequencer::SequencerClient; \ No newline at end of file diff --git a/crates/scroll/rpc/src/sequencer.rs b/crates/scroll/rpc/src/sequencer.rs new file mode 100644 index 00000000000..5c7c5e610c9 --- /dev/null +++ b/crates/scroll/rpc/src/sequencer.rs @@ -0,0 +1,207 @@ +//! Helpers for scroll specific RPC implementations. + +use crate::SequencerClientError; +use alloy_json_rpc::{RpcRecv, RpcSend}; +use alloy_primitives::{hex, B256}; +use alloy_rpc_client::{BuiltInConnectionString, ClientBuilder, RpcClient as Client}; +use alloy_transport_http::Http; +use std::{str::FromStr, sync::Arc}; +use thiserror::Error; +use tracing::warn; + +/// Sequencer client error +#[derive(Error, Debug)] +pub enum Error { + /// Invalid scheme + #[error("Invalid scheme of sequencer url: {0}")] + InvalidScheme(String), + /// Invalid url + #[error("Invalid sequencer url: {0}")] + InvalidUrl(String), + /// Establishing a connection to the sequencer endpoint resulted in an error. + #[error("Failed to connect to sequencer: {0}")] + TransportError( + #[from] + #[source] + alloy_transport::TransportError, + ), + /// Reqwest failed to init client + #[error("Failed to init reqwest client for sequencer: {0}")] + ReqwestError( + #[from] + #[source] + reqwest::Error, + ), +} + +/// A client to interact with a Sequencer +#[derive(Debug, Clone)] +pub struct SequencerClient { + inner: Arc, +} + +impl SequencerClient { + /// Creates a new [`SequencerClient`] for the given URL. + /// + /// If the URL is a websocket endpoint we connect a websocket instance. + pub async fn new(sequencer_endpoint: impl Into) -> Result { + let sequencer_endpoint = sequencer_endpoint.into(); + let endpoint = BuiltInConnectionString::from_str(&sequencer_endpoint)?; + if let BuiltInConnectionString::Http(url) = endpoint { + let client = reqwest::Client::builder() + // we force use tls to prevent native issues + .use_rustls_tls() + .build()?; + Self::with_http_client(url, client) + } else { + let client = ClientBuilder::default().connect_with(endpoint).await?; + let inner = SequencerClientInner { sequencer_endpoint, client }; + Ok(Self { inner: Arc::new(inner) }) + } + } + + /// Creates a new [`SequencerClient`] with http transport with the given http client. + pub fn with_http_client( + sequencer_endpoint: impl Into, + client: reqwest::Client, + ) -> Result { + let sequencer_endpoint: String = sequencer_endpoint.into(); + let url = sequencer_endpoint + .parse() + .map_err(|_| Error::InvalidUrl(sequencer_endpoint.clone()))?; + + let http_client = Http::with_client(client, url); + let is_local = http_client.guess_local(); + let client = ClientBuilder::default().transport(http_client, is_local); + + let inner = SequencerClientInner { sequencer_endpoint, client }; + Ok(Self { inner: Arc::new(inner) }) + } + + /// Returns the network of the client + pub fn endpoint(&self) -> &str { + &self.inner.sequencer_endpoint + } + + /// Returns the client + pub fn client(&self) -> &Client { + &self.inner.client + } + + /// Sends a [`alloy_rpc_client::RpcCall`] request to the sequencer endpoint. + async fn send_rpc_call( + &self, + method: &str, + params: Params, + ) -> Result { + let resp = + self.client().request::(method.to_string(), params).await.inspect_err( + |err| { + warn!( + target: "rpc::sequencer", + %err, + "HTTP request to sequencer failed", + ); + }, + )?; + Ok(resp) + } + + /// Forwards a transaction to the sequencer endpoint. + pub async fn forward_raw_transaction(&self, tx: &[u8]) -> Result { + let rlp_hex = hex::encode_prefixed(tx); + let tx_hash = + self.send_rpc_call("eth_sendRawTransaction", (rlp_hex,)).await.inspect_err(|err| { + warn!( + target: "rpc::eth", + %err, + "Failed to forward transaction to sequencer", + ); + })?; + + Ok(tx_hash) + } +} + +#[derive(Debug)] +struct SequencerClientInner { + /// The endpoint of the sequencer + sequencer_endpoint: String, + /// The client + client: Client, +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::U64; + + #[tokio::test] + async fn test_http_body_str() { + let client = SequencerClient::new("http://localhost:8545").await.unwrap(); + + let request = client + .client() + .make_request("eth_getBlockByNumber", (U64::from(10),)) + .serialize() + .unwrap() + .take_request(); + let body = request.get(); + + assert_eq!( + body, + r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"# + ); + + let request = client + .client() + .make_request( + "eth_sendRawTransaction", + format!("0x{}", hex::encode("abcd")), + ) + .serialize() + .unwrap() + .take_request(); + let body = request.get(); + + assert_eq!( + body, + r#"{"method":"eth_sendRawTransaction","params":"0x61626364","id":1,"jsonrpc":"2.0"}"# + ); + } + + #[tokio::test] + #[ignore = "Start if WS is reachable at ws://localhost:8546"] + async fn test_ws_body_str() { + let client = SequencerClient::new("ws://localhost:8546").await.unwrap(); + + let request = client + .client() + .make_request("eth_getBlockByNumber", (U64::from(10),)) + .serialize() + .unwrap() + .take_request(); + let body = request.get(); + + assert_eq!( + body, + r#"{"method":"eth_getBlockByNumber","params":["0xa"],"id":0,"jsonrpc":"2.0"}"# + ); + + let request = client + .client() + .make_request( + "eth_sendRawTransaction", + format!("0x{}", hex::encode("abcd")), + ) + .serialize() + .unwrap() + .take_request(); + let body = request.get(); + + assert_eq!( + body, + r#"{"method":"eth_sendRawTransaction","params":"0x61626364","id":1,"jsonrpc":"2.0"}"# + ); + } +}