Skip to content

Add external_state_root and ignore_unhealthy_builders #381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/flashblocks-rpc/src/flashblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl FlashblocksOverlay {
match msg {
Ok(Message::Binary(bytes)) => match try_decode_message(&bytes) {
Ok(payload) => {
info!("Received payload: {:?}", payload);
debug!("Received payload: {:?}", payload);

let _ = sender
.send(InternalMessage::NewPayload(payload))
Expand Down
13 changes: 13 additions & 0 deletions crates/rollup-boost/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ pub struct RollupBoostArgs {
#[arg(long, env)]
pub block_selection_policy: Option<BlockSelectionPolicy>,

/// Should we use the l2 client for computing state root
#[arg(long, env, default_value = "false")]
pub external_state_root: bool,

/// Allow all engine API calls to builder even when marked as unhealthy
/// This is default true assuming no builder CL set up
#[arg(long, env, default_value = "false")]
pub ignore_unhealthy_builders: bool,

#[clap(flatten)]
pub flashblocks: FlashblocksArgs,
}
Expand Down Expand Up @@ -160,6 +169,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand All @@ -175,6 +186,8 @@ impl RollupBoostArgs {
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
self.external_state_root,
self.ignore_unhealthy_builders,
);

let health_handle = rollup_boost
Expand Down
63 changes: 62 additions & 1 deletion crates/rollup-boost/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy_primitives::{B256, Bytes};
use futures::{StreamExt as _, stream};
use moka::future::Cache;

use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadId};
use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadV3, PayloadAttributes, PayloadId};
use op_alloy_rpc_types_engine::{
OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
};
Expand Down Expand Up @@ -61,6 +61,67 @@ impl OpExecutionPayloadEnvelope {
.len(),
}
}

pub fn transactions(&self) -> Vec<Bytes> {
match self {
OpExecutionPayloadEnvelope::V3(payload) => payload
.execution_payload
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks scary

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's a better way to structure this though, seems like the only way to fetch the data

.payload_inner
.payload_inner
.transactions
.clone(),
OpExecutionPayloadEnvelope::V4(payload) => payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.transactions
.clone(),
}
}

pub fn payload_attributes(&self) -> PayloadAttributes {
match self {
OpExecutionPayloadEnvelope::V3(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(payload.execution_payload.withdrawals().clone()),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
OpExecutionPayloadEnvelope::V4(payload) => PayloadAttributes {
timestamp: payload.execution_payload.payload_inner.timestamp(),
prev_randao: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.prev_randao,
suggested_fee_recipient: payload
.execution_payload
.payload_inner
.payload_inner
.payload_inner
.fee_recipient,
withdrawals: Some(
payload
.execution_payload
.payload_inner
.withdrawals()
.clone(),
),
parent_beacon_block_root: Some(payload.parent_beacon_block_root),
},
}
}
}

impl From<OpExecutionPayloadEnvelope> for ExecutionPayload {
Expand Down
125 changes: 116 additions & 9 deletions crates/rollup-boost/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ use op_alloy_rpc_types_engine::{
};
use opentelemetry::trace::SpanKind;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tracing::{error, info, instrument};
use tracing::{debug, error, info, instrument};

pub type Request = HttpRequest;
pub type Response = HttpResponse;
Expand All @@ -49,8 +50,12 @@ pub struct RollupBoostServer<T: EngineApiExt> {
pub builder_client: Arc<T>,
pub payload_trace_context: Arc<PayloadTraceContext>,
block_selection_policy: Option<BlockSelectionPolicy>,
external_state_root: bool,
ignore_unhealthy_builders: bool,
execution_mode: Arc<Mutex<ExecutionMode>>,
probes: Arc<Probes>,
payload_to_fcu_request:
Arc<Mutex<HashMap<PayloadId, (ForkchoiceState, Option<OpPayloadAttributes>)>>>,
}

impl<T> RollupBoostServer<T>
Expand All @@ -63,6 +68,8 @@ where
initial_execution_mode: Arc<Mutex<ExecutionMode>>,
block_selection_policy: Option<BlockSelectionPolicy>,
probes: Arc<Probes>,
external_state_root: bool,
ignore_unhealthy_builders: bool,
) -> Self {
Self {
l2_client: Arc::new(l2_client),
Expand All @@ -71,6 +78,9 @@ where
payload_trace_context: Arc::new(PayloadTraceContext::new()),
execution_mode: initial_execution_mode,
probes,
external_state_root,
ignore_unhealthy_builders,
payload_to_fcu_request: Arc::new(Mutex::new(HashMap::new())),
}
}

Expand Down Expand Up @@ -121,7 +131,7 @@ where
.await;

// async call to builder to sync the builder node
if !self.execution_mode().is_disabled() {
if !self.execution_mode().is_disabled() && !self.should_skip_unhealthy_builder() {
let builder = self.builder_client.clone();
let new_payload_clone = new_payload.clone();
tokio::spawn(async move { builder.new_payload(new_payload_clone).await });
Expand Down Expand Up @@ -182,20 +192,84 @@ where
return RpcResult::Ok(None);
}

if self.should_skip_unhealthy_builder() {
info!(message = "builder is unhealthy, skipping get_payload call to builder");
return RpcResult::Ok(None);
}

// Get payload and validate with the local l2 client
tracing::Span::current().record("builder_has_payload", true);
info!(message = "builder has payload, calling get_payload on builder");

let payload = self.builder_client.get_payload(payload_id, version).await?;
let _ = self

if !self.external_state_root {
let _ = self
.l2_client
.new_payload(NewPayload::from(payload.clone()))
.await?;

return Ok(Some(payload));
}

let fcu_info = self
.payload_to_fcu_request
.lock()
.remove(&payload_id)
.unwrap()
.to_owned()
.clone();

let new_payload_attrs = match fcu_info.1.as_ref() {
Some(attrs) => OpPayloadAttributes {
payload_attributes: attrs.payload_attributes.clone(),
transactions: Some(payload.transactions()),
no_tx_pool: Some(true),
gas_limit: attrs.gas_limit,
eip_1559_params: attrs.eip_1559_params,
},
None => OpPayloadAttributes {
payload_attributes: payload.payload_attributes(),
transactions: Some(payload.transactions()),
no_tx_pool: Some(true),
gas_limit: None,
eip_1559_params: None,
},
};

let l2_result = self
.l2_client
.new_payload(NewPayload::from(payload.clone()))
.fork_choice_updated_v3(fcu_info.0, Some(new_payload_attrs))
.await?;

Ok(Some(payload))
if let Some(new_payload_id) = l2_result.payload_id {
debug!(
message = "sent FCU to l2 to calculate new state root",
"returned_payload_id" = %new_payload_id,
"old_payload_id" = %payload_id,
);
let l2_payload = self.l2_client.get_payload(new_payload_id, version).await;

match l2_payload {
Ok(new_payload) => {
debug!(
message = "received new state root payload from l2",
payload = ?new_payload,
);
return Ok(Some(new_payload));
}

Err(e) => {
error!(message = "error getting new state root payload from l2", error = %e);
return Err(e.into());
}
}
}

Ok(None)
};

let (l2_payload, builder_payload) = tokio::join!(l2_fut, builder_fut);
let (builder_payload, l2_payload) = tokio::join!(builder_fut, l2_fut);

// Evaluate the builder and l2 response and select the final payload
let (payload, context) = {
Expand Down Expand Up @@ -252,6 +326,7 @@ where
let inner_payload = ExecutionPayload::from(payload.clone());
let block_hash = inner_payload.block_hash();
let block_number = inner_payload.block_number();
let state_root = inner_payload.as_v1().state_root;

// Note: This log message is used by integration tests to track payload context.
// While not ideal to rely on log parsing, it provides a reliable way to verify behavior.
Expand All @@ -260,11 +335,16 @@ where
message = "returning block",
"hash" = %block_hash,
"number" = %block_number,
"state_root" = %state_root,
%context,
%payload_id,
);
Ok(payload)
}

fn should_skip_unhealthy_builder(&self) -> bool {
self.ignore_unhealthy_builders && !matches!(self.probes.health(), Health::Healthy)
}
}

impl<T> TryInto<RpcModule<()>> for RollupBoostServer<T>
Expand Down Expand Up @@ -357,6 +437,12 @@ where
return Ok(l2_fut.await?);
}

// If traffic to the unhealthy builder is not allowed and the builder is unhealthy,
if self.should_skip_unhealthy_builder() {
info!(message = "builder is unhealthy, skipping FCU to builder");
return Ok(l2_fut.await?);
}

let span = tracing::Span::current();
// If the fcu contains payload attributes and the tx pool is disabled,
// only forward the FCU to the default l2 client
Expand All @@ -378,6 +464,10 @@ where
span.id(),
)
.await;

self.payload_to_fcu_request
.lock()
.insert(payload_id, (fork_choice_state, payload_attributes));
}

// We always return the value from the l2 client
Expand All @@ -387,7 +477,7 @@ where
// to both the builder and the default l2 client
let builder_fut = self
.builder_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes);
.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone());

let (l2_result, builder_result) = tokio::join!(l2_fut, builder_fut);
let l2_response = l2_result?;
Expand All @@ -407,6 +497,10 @@ where
span.id(),
)
.await;

self.payload_to_fcu_request
.lock()
.insert(payload_id, (fork_choice_state, payload_attributes));
}

return Ok(l2_response);
Expand All @@ -416,15 +510,23 @@ where
// forward the fcu to the builder to keep it synced and immediately return the l2
// response without awaiting the builder
let builder_client = self.builder_client.clone();
let attrs_clone = payload_attributes.clone();
tokio::spawn(async move {
// It is not critical to wait for the builder response here
// During moments of high load, Op-node can send hundreds of FCU requests
// and we want to ensure that we don't block the main thread in those scenarios
builder_client
.fork_choice_updated_v3(fork_choice_state, payload_attributes)
.fork_choice_updated_v3(fork_choice_state, attrs_clone)
.await
});
return Ok(l2_fut.await?);
let l2_response = l2_fut.await?;
if let Some(payload_id) = l2_response.payload_id {
self.payload_to_fcu_request
.lock()
.insert(payload_id, (fork_choice_state, payload_attributes));
}

return Ok(l2_response);
}
}

Expand Down Expand Up @@ -669,12 +771,17 @@ pub mod tests {
let (probe_layer, probes) = ProbeLayer::new();
let execution_mode = Arc::new(Mutex::new(ExecutionMode::Enabled));

// For tests, set initial health to Healthy since we don't run health checks
probes.set_health(Health::Healthy);

let rollup_boost = RollupBoostServer::new(
l2_client,
builder_client,
execution_mode.clone(),
None,
probes.clone(),
false,
true,
);

let module: RpcModule<()> = rollup_boost.try_into().unwrap();
Expand Down
Loading
Loading