diff --git a/crates/flashblocks-rpc/src/flashblocks.rs b/crates/flashblocks-rpc/src/flashblocks.rs index 689d7ea5..424c182a 100644 --- a/crates/flashblocks-rpc/src/flashblocks.rs +++ b/crates/flashblocks-rpc/src/flashblocks.rs @@ -46,7 +46,7 @@ impl FlashblocksOverlay { match msg { Ok(Message::Binary(bytes)) => match try_decode_message(&bytes) { Ok(payload) => { - info!("Received payload: {:?}", payload); + debug!("Received payload: {:?}", payload); let _ = sender .send(InternalMessage::NewPayload(payload)) diff --git a/crates/rollup-boost/src/cli.rs b/crates/rollup-boost/src/cli.rs index 3eb4d3d6..914a770a 100644 --- a/crates/rollup-boost/src/cli.rs +++ b/crates/rollup-boost/src/cli.rs @@ -93,6 +93,15 @@ pub struct RollupBoostArgs { #[arg(long, env)] pub block_selection_policy: Option, + /// Should we use the l2 client for computing state root + #[arg(long, env, default_value = "false")] + pub external_state_root: bool, + + /// Allow all engine API calls to builder even when marked as unhealthy + /// This is default true assuming no builder CL set up + #[arg(long, env, default_value = "false")] + pub ignore_unhealthy_builders: bool, + #[clap(flatten)] pub flashblocks: FlashblocksArgs, } @@ -160,6 +169,8 @@ impl RollupBoostArgs { execution_mode.clone(), self.block_selection_policy, probes.clone(), + self.external_state_root, + self.ignore_unhealthy_builders, ); let health_handle = rollup_boost @@ -175,6 +186,8 @@ impl RollupBoostArgs { execution_mode.clone(), self.block_selection_policy, probes.clone(), + self.external_state_root, + self.ignore_unhealthy_builders, ); let health_handle = rollup_boost diff --git a/crates/rollup-boost/src/payload.rs b/crates/rollup-boost/src/payload.rs index 1ed6707e..818c59d1 100644 --- a/crates/rollup-boost/src/payload.rs +++ b/crates/rollup-boost/src/payload.rs @@ -2,7 +2,7 @@ use alloy_primitives::{B256, Bytes}; use futures::{StreamExt as _, stream}; use moka::future::Cache; -use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadId}; +use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadAttributes, PayloadId}; use op_alloy_rpc_types_engine::{ OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4, }; @@ -61,6 +61,67 @@ impl OpExecutionPayloadEnvelope { .len(), } } + + pub fn transactions(&self) -> Vec { + match self { + OpExecutionPayloadEnvelope::V3(payload) => payload + .execution_payload + .payload_inner + .payload_inner + .transactions + .clone(), + OpExecutionPayloadEnvelope::V4(payload) => payload + .execution_payload + .payload_inner + .payload_inner + .payload_inner + .transactions + .clone(), + } + } + + pub fn payload_attributes(&self) -> PayloadAttributes { + match self { + OpExecutionPayloadEnvelope::V3(payload) => PayloadAttributes { + timestamp: payload.execution_payload.payload_inner.timestamp(), + prev_randao: payload + .execution_payload + .payload_inner + .payload_inner + .prev_randao, + suggested_fee_recipient: payload + .execution_payload + .payload_inner + .payload_inner + .fee_recipient, + withdrawals: Some(payload.execution_payload.withdrawals().clone()), + parent_beacon_block_root: Some(payload.parent_beacon_block_root), + }, + OpExecutionPayloadEnvelope::V4(payload) => PayloadAttributes { + timestamp: payload.execution_payload.payload_inner.timestamp(), + prev_randao: payload + .execution_payload + .payload_inner + .payload_inner + .payload_inner + .prev_randao, + suggested_fee_recipient: payload + .execution_payload + .payload_inner + .payload_inner + .payload_inner + .fee_recipient, + withdrawals: Some( + payload + .execution_payload + .payload_inner + .withdrawals() + .clone(), + ), + parent_beacon_block_root: Some(payload.parent_beacon_block_root), + }, + } + } } impl From for ExecutionPayload { diff --git a/crates/rollup-boost/src/server.rs b/crates/rollup-boost/src/server.rs index d6f315a2..e2852542 100644 --- a/crates/rollup-boost/src/server.rs +++ b/crates/rollup-boost/src/server.rs @@ -33,10 +33,11 @@ use op_alloy_rpc_types_engine::{ }; use opentelemetry::trace::SpanKind; use parking_lot::Mutex; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinHandle; -use tracing::{error, info, instrument}; +use tracing::{debug, error, info, instrument}; pub type Request = HttpRequest; pub type Response = HttpResponse; @@ -49,8 +50,12 @@ pub struct RollupBoostServer { pub builder_client: Arc, pub payload_trace_context: Arc, block_selection_policy: Option, + external_state_root: bool, + ignore_unhealthy_builders: bool, execution_mode: Arc>, probes: Arc, + payload_to_fcu_request: + Arc)>>>, } impl RollupBoostServer @@ -63,6 +68,8 @@ where initial_execution_mode: Arc>, block_selection_policy: Option, probes: Arc, + external_state_root: bool, + ignore_unhealthy_builders: bool, ) -> Self { Self { l2_client: Arc::new(l2_client), @@ -71,6 +78,9 @@ where payload_trace_context: Arc::new(PayloadTraceContext::new()), execution_mode: initial_execution_mode, probes, + external_state_root, + ignore_unhealthy_builders, + payload_to_fcu_request: Arc::new(Mutex::new(HashMap::new())), } } @@ -121,7 +131,7 @@ where .await; // async call to builder to sync the builder node - if !self.execution_mode().is_disabled() { + if !self.execution_mode().is_disabled() && !self.should_skip_unhealthy_builder() { let builder = self.builder_client.clone(); let new_payload_clone = new_payload.clone(); tokio::spawn(async move { builder.new_payload(new_payload_clone).await }); @@ -182,20 +192,84 @@ where return RpcResult::Ok(None); } + if self.should_skip_unhealthy_builder() { + info!(message = "builder is unhealthy, skipping get_payload call to builder"); + return RpcResult::Ok(None); + } + // Get payload and validate with the local l2 client tracing::Span::current().record("builder_has_payload", true); info!(message = "builder has payload, calling get_payload on builder"); let payload = self.builder_client.get_payload(payload_id, version).await?; - let _ = self + + if !self.external_state_root { + let _ = self + .l2_client + .new_payload(NewPayload::from(payload.clone())) + .await?; + + return Ok(Some(payload)); + } + + let fcu_info = self + .payload_to_fcu_request + .lock() + .remove(&payload_id) + .unwrap() + .to_owned() + .clone(); + + let new_payload_attrs = match fcu_info.1.as_ref() { + Some(attrs) => OpPayloadAttributes { + payload_attributes: attrs.payload_attributes.clone(), + transactions: Some(payload.transactions()), + no_tx_pool: Some(true), + gas_limit: attrs.gas_limit, + eip_1559_params: attrs.eip_1559_params, + }, + None => OpPayloadAttributes { + payload_attributes: payload.payload_attributes(), + transactions: Some(payload.transactions()), + no_tx_pool: Some(true), + gas_limit: None, + eip_1559_params: None, + }, + }; + + let l2_result = self .l2_client - .new_payload(NewPayload::from(payload.clone())) + .fork_choice_updated_v3(fcu_info.0, Some(new_payload_attrs)) .await?; - Ok(Some(payload)) + if let Some(new_payload_id) = l2_result.payload_id { + debug!( + message = "sent FCU to l2 to calculate new state root", + "returned_payload_id" = %new_payload_id, + "old_payload_id" = %payload_id, + ); + let l2_payload = self.l2_client.get_payload(new_payload_id, version).await; + + match l2_payload { + Ok(new_payload) => { + debug!( + message = "received new state root payload from l2", + payload = ?new_payload, + ); + return Ok(Some(new_payload)); + } + + Err(e) => { + error!(message = "error getting new state root payload from l2", error = %e); + return Err(e.into()); + } + } + } + + Ok(None) }; - let (l2_payload, builder_payload) = tokio::join!(l2_fut, builder_fut); + let (builder_payload, l2_payload) = tokio::join!(builder_fut, l2_fut); // Evaluate the builder and l2 response and select the final payload let (payload, context) = { @@ -252,6 +326,7 @@ where let inner_payload = ExecutionPayload::from(payload.clone()); let block_hash = inner_payload.block_hash(); let block_number = inner_payload.block_number(); + let state_root = inner_payload.as_v1().state_root; // Note: This log message is used by integration tests to track payload context. // While not ideal to rely on log parsing, it provides a reliable way to verify behavior. @@ -260,11 +335,16 @@ where message = "returning block", "hash" = %block_hash, "number" = %block_number, + "state_root" = %state_root, %context, %payload_id, ); Ok(payload) } + + fn should_skip_unhealthy_builder(&self) -> bool { + self.ignore_unhealthy_builders && !matches!(self.probes.health(), Health::Healthy) + } } impl TryInto> for RollupBoostServer @@ -357,6 +437,12 @@ where return Ok(l2_fut.await?); } + // If traffic to the unhealthy builder is not allowed and the builder is unhealthy, + if self.should_skip_unhealthy_builder() { + info!(message = "builder is unhealthy, skipping FCU to builder"); + return Ok(l2_fut.await?); + } + let span = tracing::Span::current(); // If the fcu contains payload attributes and the tx pool is disabled, // only forward the FCU to the default l2 client @@ -378,6 +464,10 @@ where span.id(), ) .await; + + self.payload_to_fcu_request + .lock() + .insert(payload_id, (fork_choice_state, payload_attributes)); } // We always return the value from the l2 client @@ -387,7 +477,7 @@ where // to both the builder and the default l2 client let builder_fut = self .builder_client - .fork_choice_updated_v3(fork_choice_state, payload_attributes); + .fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()); let (l2_result, builder_result) = tokio::join!(l2_fut, builder_fut); let l2_response = l2_result?; @@ -407,6 +497,10 @@ where span.id(), ) .await; + + self.payload_to_fcu_request + .lock() + .insert(payload_id, (fork_choice_state, payload_attributes)); } return Ok(l2_response); @@ -416,15 +510,23 @@ where // forward the fcu to the builder to keep it synced and immediately return the l2 // response without awaiting the builder let builder_client = self.builder_client.clone(); + let attrs_clone = payload_attributes.clone(); tokio::spawn(async move { // It is not critical to wait for the builder response here // During moments of high load, Op-node can send hundreds of FCU requests // and we want to ensure that we don't block the main thread in those scenarios builder_client - .fork_choice_updated_v3(fork_choice_state, payload_attributes) + .fork_choice_updated_v3(fork_choice_state, attrs_clone) .await }); - return Ok(l2_fut.await?); + let l2_response = l2_fut.await?; + if let Some(payload_id) = l2_response.payload_id { + self.payload_to_fcu_request + .lock() + .insert(payload_id, (fork_choice_state, payload_attributes)); + } + + return Ok(l2_response); } } @@ -669,12 +771,17 @@ pub mod tests { let (probe_layer, probes) = ProbeLayer::new(); let execution_mode = Arc::new(Mutex::new(ExecutionMode::Enabled)); + // For tests, set initial health to Healthy since we don't run health checks + probes.set_health(Health::Healthy); + let rollup_boost = RollupBoostServer::new( l2_client, builder_client, execution_mode.clone(), None, probes.clone(), + false, + true, ); let module: RpcModule<()> = rollup_boost.try_into().unwrap(); diff --git a/crates/rollup-boost/src/tests/common/mod.rs b/crates/rollup-boost/src/tests/common/mod.rs index 7d6f2a13..791d004b 100644 --- a/crates/rollup-boost/src/tests/common/mod.rs +++ b/crates/rollup-boost/src/tests/common/mod.rs @@ -231,6 +231,9 @@ pub struct RollupBoostTestHarnessBuilder { proxy_handler: Option>, isthmus_block: Option, block_time: u64, + external_state_root: bool, + ignore_unhealthy_builders: Option, + max_unsafe_interval: Option, } impl RollupBoostTestHarnessBuilder { @@ -240,6 +243,9 @@ impl RollupBoostTestHarnessBuilder { proxy_handler: None, isthmus_block: None, block_time: 1, + external_state_root: false, + ignore_unhealthy_builders: None, + max_unsafe_interval: None, } } @@ -290,6 +296,21 @@ impl RollupBoostTestHarnessBuilder { self } + pub fn with_l2_state_root_computation(mut self, enabled: bool) -> Self { + self.external_state_root = enabled; + self + } + + pub fn with_ignore_unhealthy_builders(mut self, enabled: bool) -> Self { + self.ignore_unhealthy_builders = Some(enabled); + self + } + + pub fn with_max_unsafe_interval(mut self, interval_secs: u64) -> Self { + self.max_unsafe_interval = Some(interval_secs); + self + } + pub async fn build(self) -> eyre::Result { let network = rand::random::().to_string(); let l2_log_consumer = self.log_consumer("l2").await?; @@ -363,6 +384,13 @@ impl RollupBoostTestHarnessBuilder { rollup_boost.args.l2_client.l2_url = l2.auth_rpc().await?; rollup_boost.args.builder.builder_url = builder_url.try_into().unwrap(); rollup_boost.args.log_file = Some(rollup_boost_log_file_path); + rollup_boost.args.external_state_root = self.external_state_root; + if let Some(allow_traffic) = self.ignore_unhealthy_builders { + rollup_boost.args.ignore_unhealthy_builders = allow_traffic; + } + if let Some(interval) = self.max_unsafe_interval { + rollup_boost.args.max_unsafe_interval = interval; + } let rollup_boost = rollup_boost.start().await; println!("rollup-boost authrpc: {}", rollup_boost.rpc_endpoint()); println!("rollup-boost metrics: {}", rollup_boost.metrics_endpoint()); diff --git a/crates/rollup-boost/src/tests/common/services/rollup_boost.rs b/crates/rollup-boost/src/tests/common/services/rollup_boost.rs index 4d68da29..dad579c8 100644 --- a/crates/rollup-boost/src/tests/common/services/rollup_boost.rs +++ b/crates/rollup-boost/src/tests/common/services/rollup_boost.rs @@ -51,6 +51,8 @@ impl Default for RollupBoostConfig { &format!("--l2-jwt-path={}/jwt_secret.hex", *TEST_DATA), &format!("--builder-jwt-path={}/jwt_secret.hex", *TEST_DATA), "--log-level=trace", + "--health-check-interval=1", // Set health check interval to 1 second for tests + "--max-unsafe-interval=60", // Increase max unsafe interval for tests ]); args.rpc_port = get_available_port(); diff --git a/crates/rollup-boost/src/tests/execution_mode.rs b/crates/rollup-boost/src/tests/execution_mode.rs index 8aa17ed4..3f2f4337 100644 --- a/crates/rollup-boost/src/tests/execution_mode.rs +++ b/crates/rollup-boost/src/tests/execution_mode.rs @@ -13,11 +13,17 @@ struct CounterHandler { impl BuilderProxyHandler for CounterHandler { fn handle( &self, - _method: String, + method: String, _params: Value, _result: Value, ) -> Pin> + Send>> { - *self.counter.lock().unwrap() += 1; + // Only count Engine API calls, not health check calls + if method != "eth_getBlockByNumber" { + *self.counter.lock().unwrap() += 1; + tracing::info!("Proxy handler intercepted Engine API call: {}", method); + } else { + tracing::debug!("Proxy handler intercepted health check call: {}", method); + } async move { None }.boxed() } } diff --git a/crates/rollup-boost/src/tests/l2_state_root_computation.rs b/crates/rollup-boost/src/tests/l2_state_root_computation.rs new file mode 100644 index 00000000..4112a02e --- /dev/null +++ b/crates/rollup-boost/src/tests/l2_state_root_computation.rs @@ -0,0 +1,75 @@ +use std::{pin::Pin, sync::Arc}; + +use super::common::{RollupBoostTestHarnessBuilder, proxy::BuilderProxyHandler}; +use alloy_primitives::B256; +use futures::FutureExt as _; +use serde_json::Value; + +use op_alloy_rpc_types_engine::OpExecutionPayloadEnvelopeV3; + +struct ZeroStateRootHandler; + +impl BuilderProxyHandler for ZeroStateRootHandler { + fn handle( + &self, + method: String, + _params: Value, + _result: Value, + ) -> Pin> + Send>> { + async move { + if method != "engine_getPayloadV3" { + return None; + } + + let mut payload = + serde_json::from_value::(_result).unwrap(); + + // Set state root to zero to simulate builder payload without computed state root + payload + .execution_payload + .payload_inner + .payload_inner + .state_root = B256::ZERO; + + let result = serde_json::to_value(&payload).unwrap(); + Some(result) + } + .boxed() + } +} + +#[tokio::test] +async fn test_l2_state_root_computation() -> eyre::Result<()> { + // Test that when builder returns payload with zero state root and L2 state root computation + // is enabled, rollup-boost uses L2 client to compute the correct state root + let harness = RollupBoostTestHarnessBuilder::new("l2_state_root_computation") + .proxy_handler(Arc::new(ZeroStateRootHandler)) + .with_l2_state_root_computation(true) + .build() + .await?; + + let mut block_generator = harness.block_generator().await?; + + // Generate blocks and verify they are processed successfully by the builder + // with L2 computing the correct state root + for _ in 0..3 { + let (_block, block_creator) = block_generator.generate_block(false).await?; + assert!( + block_creator.is_builder(), + "Block creator should be the builder" + ); + } + + // Check logs to verify L2 state root computation was used + let logs = std::fs::read_to_string(harness.rollup_boost.args().log_file.clone().unwrap())?; + assert!( + logs.contains("sent FCU to l2 to calculate new state root"), + "Logs should contain message indicating L2 state root computation was used" + ); + assert!( + logs.contains("received new state root payload from l2"), + "Logs should contain message indicating L2 returned corrected payload" + ); + + Ok(()) +} diff --git a/crates/rollup-boost/src/tests/mod.rs b/crates/rollup-boost/src/tests/mod.rs index 6a4d924e..bd0facbd 100644 --- a/crates/rollup-boost/src/tests/mod.rs +++ b/crates/rollup-boost/src/tests/mod.rs @@ -4,8 +4,10 @@ mod builder_full_delay; mod builder_returns_incorrect_block; mod execution_mode; mod fcu_no_block_time_delay; +mod l2_state_root_computation; mod no_tx_pool; mod remote_builder_down; mod simple; mod simple_isthmus; mod simple_isthmus_transition; +mod unhealthy_builder_traffic; diff --git a/crates/rollup-boost/src/tests/unhealthy_builder_traffic.rs b/crates/rollup-boost/src/tests/unhealthy_builder_traffic.rs new file mode 100644 index 00000000..126650d3 --- /dev/null +++ b/crates/rollup-boost/src/tests/unhealthy_builder_traffic.rs @@ -0,0 +1,110 @@ +use super::common::{RollupBoostTestHarnessBuilder, proxy::BuilderProxyHandler}; +use crate::ExecutionMode; +use futures::FutureExt as _; +use serde_json::Value; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::time::Duration; + +struct CounterHandler { + counter: Arc>, +} + +impl BuilderProxyHandler for CounterHandler { + fn handle( + &self, + method: String, + _params: Value, + _result: Value, + ) -> Pin> + Send>> { + // Only count Engine API calls, not health check calls + if method != "eth_getBlockByNumber" { + *self.counter.lock().unwrap() += 1; + tracing::info!("Proxy handler intercepted Engine API call: {}", method); + } else { + tracing::debug!("Proxy handler intercepted health check call: {}", method); + } + async move { None }.boxed() + } +} + +#[tokio::test] +async fn no_traffic_to_unhealthy_builder_when_flag_disabled() -> eyre::Result<()> { + // Create a counter that tracks Engine API calls to the builder + let counter = Arc::new(Mutex::new(0)); + let handler = Arc::new(CounterHandler { + counter: counter.clone(), + }); + + // Create test harness with: + // - ignore_unhealthy_builders=true (key test parameter) + // - short max_unsafe_interval=1 to make builder unhealthy quickly + let harness = RollupBoostTestHarnessBuilder::new("no_traffic_unhealthy") + .proxy_handler(handler) + .with_ignore_unhealthy_builders(true) + .with_max_unsafe_interval(1) + .build() + .await?; + + // Override max_unsafe_interval to 1 second for this test + // We'll need to modify the config after build - let's access the rollup boost args + + let mut block_generator = harness.block_generator().await?; + let client = harness.debug_client().await; + + // Step 1: Disable execution mode so L2 moves ahead and builder falls behind + let response = client + .set_execution_mode(ExecutionMode::Disabled) + .await + .unwrap(); + assert_eq!(response.execution_mode, ExecutionMode::Disabled); + + // Step 2: Let L2 move ahead by generating some blocks + for _ in 0..3 { + let (_block, block_creator) = block_generator.generate_block(false).await?; + assert!( + block_creator.is_l2(), + "Blocks should be created by L2 when execution disabled" + ); + } + + // Step 3: Re-enable execution mode + let response = client + .set_execution_mode(ExecutionMode::Enabled) + .await + .unwrap(); + assert_eq!(response.execution_mode, ExecutionMode::Enabled); + + // Step 4:Wait for health check to run again and mark builder as unhealthy + // since execution mode is now enabled and builder timestamp is stale + tokio::time::sleep(Duration::from_secs(2)).await; + + // Reset counter after builder should be marked unhealthy + *counter.lock().unwrap() = 0; + + // Step 5: Generate blocks - builder should now be unhealthy and with flag=false, + // no engine API calls should go to the builder + for i in 0..5 { + let (_block, block_creator) = block_generator.generate_block(false).await?; + println!("Block {}: created by {:?}", i + 1, block_creator); + // Blocks should be created by L2 since builder is unhealthy and flag is false + assert!( + block_creator.is_l2(), + "Block creator should be L2 when builder is unhealthy and flag is false" + ); + } + + let final_count = *counter.lock().unwrap(); + println!( + "Engine API calls to builder with ignore_unhealthy_builders=true: {}", + final_count + ); + + // With flag=false and unhealthy builder, should see zero engine API calls + assert_eq!( + final_count, 0, + "Should see no Engine API calls to unhealthy builder when flag is false" + ); + + Ok(()) +}