Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/database/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod models;
pub use models::*;

mod operations;
pub use operations::DatabaseOperations;
pub use operations::{DatabaseOperations, UnwindResult};

mod transaction;
pub use transaction::DatabaseTransaction;
Expand Down
81 changes: 74 additions & 7 deletions crates/database/db/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,20 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
async fn insert_batch(&self, batch_commit: BatchCommitData) -> Result<(), DatabaseError> {
tracing::trace!(target: "scroll::db", batch_hash = ?batch_commit.hash, batch_index = batch_commit.index, "Inserting batch input into database.");
let batch_commit: models::batch_commit::ActiveModel = batch_commit.into();
match models::batch_commit::Entity::insert(batch_commit)
Ok(models::batch_commit::Entity::insert(batch_commit)
.on_conflict(
OnConflict::column(models::batch_commit::Column::Index).do_nothing().to_owned(),
OnConflict::column(models::batch_commit::Column::Index)
.update_columns(vec![
models::batch_commit::Column::Hash,
models::batch_commit::Column::BlockNumber,
models::batch_commit::Column::BlockTimestamp,
models::batch_commit::Column::FinalizedBlockNumber,
])
.to_owned(),
)
.exec(self.get_connection())
.await
{
Err(sea_orm::DbErr::RecordNotInserted) => Ok(()),
x => Ok(x.map(|_| ())?),
}
.map(|_| ())?)
}

/// Finalize a [`BatchCommitData`] with the provided `batch_hash` in the database and set the
Expand Down Expand Up @@ -300,8 +304,13 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
/// L1 block number at which this batch was produced. It then retrieves the latest block for
/// the previous batch (i.e., the batch before the latest safe block). It then returns a tuple
/// of this latest fetched block and the L1 block number of the batch.
async fn get_startup_data(&self) -> Result<(Option<BlockInfo>, Option<u64>), DatabaseError> {
async fn get_startup_data(
&self,
genesis_hash: B256,
) -> Result<(Option<BlockInfo>, Option<u64>), DatabaseError> {
tracing::trace!(target: "scroll::db", "Fetching startup safe block from database.");
let finalized_block_number = self.get_finalized_l1_block_number().await?.unwrap_or(0);
self.unwind(genesis_hash, finalized_block_number).await?;
let safe = if let Some(batch_info) = self
.get_latest_safe_l2_info()
.await?
Expand Down Expand Up @@ -408,6 +417,64 @@ pub trait DatabaseOperations: DatabaseConnectionProvider {
Ok(None)
}
}

/// Unwinds the indexer by deleting all indexed data greater than the provided L1 block number.
/// Unwinds the indexer by deleting all indexed data derived from L1 blocks
/// greater than the provided `l1_block_number`.
///
/// Deletes batches and L1 messages above the unwind point, reorgs the L2
/// head back past any removed *executed* L1 message, and recomputes the L2
/// safe block when batches were removed. Returns an [`UnwindResult`]
/// describing the resulting state; the caller is responsible for running
/// this inside (and committing) a database transaction.
async fn unwind(
    &self,
    genesis_hash: B256,
    l1_block_number: u64,
) -> Result<UnwindResult, DatabaseError> {
    // Remove batch inputs and L1 messages indexed above the unwind point.
    let batches_removed = self.delete_batches_gt(l1_block_number).await?;
    let deleted_messages = self.delete_l1_messages_gt(l1_block_number).await?;

    // Keep only the removed messages that were already executed on L2, sorted
    // by queue index so `first()` yields the earliest affected message.
    let mut removed_executed_l1_messages: Vec<_> =
        deleted_messages.into_iter().filter(|x| x.l2_block_number.is_some()).collect();
    removed_executed_l1_messages.sort_by_key(|msg| msg.transaction.queue_index);

    // If an executed message was removed, the L2 head must reorg to the block
    // preceding the earliest L2 block that contained a removed message.
    let (queue_index, l2_head_block_info) = match removed_executed_l1_messages.first() {
        Some(msg) => {
            // NOTE(review): assumes an executed L1 message never lands in L2
            // block 0 — this subtraction would underflow otherwise; confirm.
            let l2_reorg_block_number = msg
                .l2_block_number
                .expect("we guarantee that this is Some(u64) due to the filter above") -
                1;
            let l2_block_info = self.get_l2_block_info_by_number(l2_reorg_block_number).await?;
            self.delete_l2_blocks_gt(l2_reorg_block_number).await?;
            (Some(msg.transaction.queue_index), l2_block_info)
        }
        None => (None, None),
    };

    // If batches were removed, the safe head may have moved: fall back to the
    // latest remaining safe block, or to genesis when none remains.
    let l2_safe_block_info = if batches_removed > 0 {
        Some(
            self.get_latest_safe_l2_info()
                .await?
                .map_or_else(|| BlockInfo::new(0, genesis_hash), |x| x.0),
        )
    } else {
        None
    };

    Ok(UnwindResult { l1_block_number, queue_index, l2_head_block_info, l2_safe_block_info })
}
}

/// The result of [`DatabaseOperations::unwind`].
///
/// Describes the database state after unwinding all indexed data above an L1
/// block number. The optional fields are populated only when the unwind
/// actually affected the corresponding part of the L2 chain.
#[derive(Debug)]
pub struct UnwindResult {
    /// The L1 block number that we unwinded to.
    pub l1_block_number: u64,
    /// The latest unconsumed queue index after the unwind.
    pub queue_index: Option<u64>,
    /// The L2 head block info after the unwind. This is only populated if the L2 head has reorged.
    pub l2_head_block_info: Option<BlockInfo>,
    /// The L2 safe block info after the unwind. This is only populated if the L2 safe has reorged.
    pub l2_safe_block_info: Option<BlockInfo>,
}

impl<T> DatabaseOperations for T where T: DatabaseConnectionProvider {}
13 changes: 11 additions & 2 deletions crates/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use rollup_node_primitives::{
use rollup_node_watcher::L1Notification;
use scroll_alloy_consensus::TxL1Message;
use scroll_alloy_hardforks::{ScrollHardfork, ScrollHardforks};
use scroll_db::{Database, DatabaseError, DatabaseOperations};
use scroll_db::{Database, DatabaseError, DatabaseOperations, UnwindResult};
use std::{
collections::{HashMap, VecDeque},
pin::Pin,
Expand Down Expand Up @@ -171,7 +171,16 @@ impl<ChainSpec: ScrollHardforks + EthChainSpec + Send + Sync + 'static> Indexer<
chain_spec: Arc<ChainSpec>,
l1_block_number: u64,
) -> Result<IndexerEvent, IndexerError> {
unwind(database, chain_spec, l1_block_number).await
let txn = database.tx().await?;
let UnwindResult { l1_block_number, queue_index, l2_head_block_info, l2_safe_block_info } =
txn.unwind(chain_spec.genesis_hash(), l1_block_number).await?;
txn.commit().await?;
Ok(IndexerEvent::UnwindIndexed {
l1_block_number,
queue_index,
l2_head_block_info,
l2_safe_block_info,
})
}

/// Handles a finalized event by updating the indexer L1 finalized block and returning the new
Expand Down
5 changes: 2 additions & 3 deletions crates/node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,8 @@ impl ScrollRollupNodeConfig {
// On startup we replay the latest batch of blocks from the database as such we set the safe
// block hash to the latest block hash associated with the previous consolidated
// batch in the database.
let finalized_block_number = db.get_finalized_l1_block_number().await?.unwrap_or(0);
rollup_node_indexer::unwind(db.clone(), chain_spec.clone(), finalized_block_number).await?;
let (startup_safe_block, l1_start_block_number) = db.get_startup_data().await?;
let (startup_safe_block, l1_start_block_number) =
db.get_startup_data(chain_spec.genesis_hash()).await?;
if let Some(block_info) = startup_safe_block {
fcs.update_safe_block_info(block_info);
} else {
Expand Down