Skip to content

apollo_consensus_orchestrator: remove some panics from context #8047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main-v0.14.0
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 105 additions & 34 deletions crates/apollo_consensus_orchestrator/src/sequencer_consensus_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use apollo_batcher_types::batcher_types::{
ProposalId,
StartHeightInput,
};
use apollo_batcher_types::communication::BatcherClient;
use apollo_batcher_types::communication::{BatcherClient, BatcherClientError};
use apollo_class_manager_types::transaction_converter::TransactionConverterTrait;
use apollo_consensus::types::{
ConsensusContext,
Expand All @@ -25,7 +25,11 @@ use apollo_consensus::types::{
Round,
ValidatorId,
};
use apollo_l1_gas_price_types::{EthToStrkOracleClientTrait, L1GasPriceProviderClient};
use apollo_l1_gas_price_types::{
EthToStrkOracleClientTrait,
L1GasPriceProviderClient,
DEFAULT_ETH_TO_FRI_RATE,
};
use apollo_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use apollo_protobuf::consensus::{
ConsensusBlockInfo,
Expand All @@ -37,7 +41,7 @@ use apollo_protobuf::consensus::{
Vote,
DEFAULT_VALIDATOR_ID,
};
use apollo_state_sync_types::communication::StateSyncClient;
use apollo_state_sync_types::communication::{StateSyncClient, StateSyncClientError};
use apollo_state_sync_types::state_sync_types::SyncBlock;
use apollo_time::time::Clock;
use async_trait::async_trait;
Expand Down Expand Up @@ -378,8 +382,13 @@ impl ConsensusContext for SequencerConsensusContext {
}))
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.expect("Failed converting transaction during repropose");
.collect::<Result<Vec<_>, _>>();
let Ok(transactions) = transactions else {
// transaction_converter is an external dependency (class manager) and so
// we can't assume success on reproposal.
error!("Failed converting transaction during repropose: {transactions:?}");
return;
};

stream_sender
.send(ProposalPart::Transactions(TransactionBatch { transactions }))
Expand Down Expand Up @@ -437,10 +446,7 @@ impl ConsensusContext for SequencerConsensusContext {
let block_info;
{
let height = BlockNumber(height);
let mut proposals = self
.valid_proposals
.lock()
.expect("Lock on active proposals was poisoned due to a previous panic");
let mut proposals = self.valid_proposals.lock().unwrap();
(block_info, transactions, proposal_id) =
proposals.get_proposal(&height, &block).clone();

Expand All @@ -449,12 +455,8 @@ impl ConsensusContext for SequencerConsensusContext {

// TODO(dvir): return from the batcher's 'decision_reached' function the relevant data to
// build a blob.
let DecisionReachedResponse { state_diff, l2_gas_used, central_objects } = self
.deps
.batcher
.decision_reached(DecisionReachedInput { proposal_id })
.await
.expect("Failed to get state diff.");
let DecisionReachedResponse { state_diff, l2_gas_used, central_objects } =
self.batcher_decision_reached(proposal_id).await;

// Remove transactions that were not accepted by the Batcher, so `transactions` and
// `central_objects.execution_infos` correspond to the same list of (only accepted)
Expand Down Expand Up @@ -528,9 +530,7 @@ impl ConsensusContext for SequencerConsensusContext {
l1_transaction_hashes,
block_header_without_hash,
};
let state_sync_client = self.deps.state_sync_client.clone();
// `add_new_block` returns immediately, it doesn't wait for sync to fully process the block.
state_sync_client.add_new_block(sync_block).await.expect("Failed to add new block.");
self.sync_add_new_block(sync_block).await;

// Strip the transaction hashes from `execution_infos`, since we don't use it in the blob
// version of `execution_infos`.
Expand Down Expand Up @@ -602,15 +602,7 @@ impl ConsensusContext for SequencerConsensusContext {
);
return false;
}
let eth_to_fri_rate = sync_block
.block_header_without_hash
.l1_gas_price
.price_in_fri
.checked_mul_u128(WEI_PER_ETH)
.expect("Gas price overflow")
.checked_div(sync_block.block_header_without_hash.l1_gas_price.price_in_wei.0)
.expect("Price in wei should be non-zero")
.0;
let eth_to_fri_rate = get_eth_to_fri_rate(&sync_block);
self.previous_block_info = Some(ConsensusBlockInfo {
height,
timestamp: timestamp.0,
Expand All @@ -625,7 +617,7 @@ impl ConsensusContext for SequencerConsensusContext {
eth_to_fri_rate,
});
self.interrupt_active_proposal().await;
self.deps.batcher.add_sync_block(sync_block).await.unwrap();
self.batcher_add_sync_block(sync_block).await;
true
}

Expand All @@ -639,11 +631,7 @@ impl ConsensusContext for SequencerConsensusContext {
// that consensus works on a given height until it is done (either a decision is reached
// or sync causes us to move on) and then moves on to a different height, never to
// return to the old height.
self.deps
.batcher
.start_height(StartHeightInput { height })
.await
.expect("Batcher should be ready to start the next height");
self.batcher_start_height(height).await;
return;
}
assert_eq!(Some(height), self.current_height);
Expand Down Expand Up @@ -749,7 +737,73 @@ impl SequencerConsensusContext {
async fn interrupt_active_proposal(&mut self) {
if let Some((token, handle)) = self.active_proposal.take() {
token.cancel();
handle.await.expect("Proposal task failed");
handle.await.expect("Proposal task failed, propogating panic");
}
}

async fn batcher_decision_reached(
&mut self,
proposal_id: ProposalId,
) -> DecisionReachedResponse {
loop {
let input = DecisionReachedInput { proposal_id };
match self.deps.batcher.decision_reached(input).await {
Ok(response) => break response,
Err(BatcherClientError::BatcherError(e)) => {
panic!("Failed to add decision due to batcher error: {e:?}");
}
Err(BatcherClientError::ClientError(e)) => {
error!("Failed to add decision due to client error: {e:?}");
}
}
tokio::task::yield_now().await;
}
}

async fn batcher_add_sync_block(&mut self, sync_block: SyncBlock) {
loop {
match self.deps.batcher.add_sync_block(sync_block.clone()).await {
Ok(_) => break,
Err(BatcherClientError::BatcherError(e)) => {
panic!("Failed to add sync block due to batcher error: {e:?}");
}
Err(BatcherClientError::ClientError(e)) => {
error!("Failed to add sync block due to client error: {e:?}");
}
}
tokio::task::yield_now().await;
}
}

// `add_new_block` returns immediately, it doesn't wait for sync to fully process the block.
async fn sync_add_new_block(&mut self, sync_block: SyncBlock) {
loop {
match self.deps.state_sync_client.add_new_block(sync_block.clone()).await {
Ok(_) => break,
Err(StateSyncClientError::StateSyncError(e)) => {
panic!("Failed to add new block due to sync error: {e:?}");
}
Err(StateSyncClientError::ClientError(e)) => {
error!("Failed to add new block due to client error: {e:?}");
}
}
tokio::task::yield_now().await;
}
}

async fn batcher_start_height(&mut self, height: BlockNumber) {
loop {
let input = StartHeightInput { height };
match self.deps.batcher.start_height(input).await {
Ok(_) => break,
Err(BatcherClientError::BatcherError(e)) => {
panic!("Failed to start height due to batcher error: height={height} {e:?}");
}
Err(BatcherClientError::ClientError(e)) => {
error!("Failed to start height due to client error: height={height} {e:?}");
}
}
tokio::task::yield_now().await;
}
}
}
Expand All @@ -764,3 +818,20 @@ async fn validate_and_send(
.map_err(|_| ValidateProposalError::SendError(proposal_commitment))?;
Ok(proposal_commitment)
}

fn get_eth_to_fri_rate(sync_block: &SyncBlock) -> u128 {
let price_in_fri = sync_block.block_header_without_hash.l1_gas_price.price_in_fri;
let price_in_wei = sync_block.block_header_without_hash.l1_gas_price.price_in_wei.0;
price_in_fri
.checked_mul_u128(WEI_PER_ETH)
.map(|x| x.0)
.unwrap_or_else(|| {
error!("Gas price overflow");
u128::MAX
})
.checked_div(price_in_wei)
.unwrap_or_else(|| {
error!("Zero gas price");
DEFAULT_ETH_TO_FRI_RATE
})
}