From 5ea4e7fb2421561fad418286c76e4b077c5d7876 Mon Sep 17 00:00:00 2001
From: AshinGau
Date: Mon, 30 Mar 2026 09:39:02 +0800
Subject: [PATCH] feat(unwind): add unwind operation for sdk&greth

---
 .../src/consensusdb/consensusdb_test.rs       | 88 ++++++++++++++++++
 aptos-core/consensus/src/consensusdb/mod.rs   | 91 +++++++++++++++++++
 .../src/persistent_liveness_storage.rs        | 37 ++++++--
 .../src/quorum_store/quorum_store_db.rs       |  7 +-
 bin/gravity_cli/src/command.rs                |  4 +-
 bin/gravity_cli/src/main.rs                   |  3 +
 bin/gravity_cli/src/unwind.rs                 | 54 +++++++++++
 bin/gravity_node/src/cli.rs                   | 16 +++-
 bin/gravity_node/src/main.rs                  | 24 +++++
 crates/api/src/consensus_mempool_handler.rs   | 19 +++-
 10 files changed, 325 insertions(+), 18 deletions(-)
 create mode 100644 bin/gravity_cli/src/unwind.rs

diff --git a/aptos-core/consensus/src/consensusdb/consensusdb_test.rs b/aptos-core/consensus/src/consensusdb/consensusdb_test.rs
index 0cbf2aa12..fc05017e4 100644
--- a/aptos-core/consensus/src/consensusdb/consensusdb_test.rs
+++ b/aptos-core/consensus/src/consensusdb/consensusdb_test.rs
@@ -115,3 +115,91 @@ fn test_dag() {
     let vote = Vote::new(node.metadata().clone(), Signature::dummy_signature());
     test_dag_type::<Vote, <DagVoteSchema as Schema>::Key>(node.id(), vote, &db);
 }
+
+#[test]
+fn test_unwind_to_block() {
+    use aptos_consensus_types::{
+        block::block_test_utils::placeholder_certificate_for_block,
+        common::Payload,
+    };
+
+    let tmp_dir = TempPath::new();
+    let db = ConsensusDB::new(&tmp_dir, &PathBuf::new());
+
+    let epoch = 1u64;
+    let genesis = Block::make_genesis_block();
+    let genesis_qc = certificate_for_genesis();
+
+    // Save genesis block and its block_number mapping
+    db.save_blocks_and_quorum_certificates(vec![genesis.clone()], vec![genesis_qc.clone()])
+        .unwrap();
+    db.save_block_numbers(vec![(epoch, 0, genesis.id())]).unwrap();
+
+    // Create 5 blocks (block_number 1..=5) in epoch 1
+    let signer = gaptos::aptos_types::validator_signer::ValidatorSigner::random(None);
+    let mut parent_qc = genesis_qc;
+    let mut blocks = Vec::new();
+    let mut qcs = Vec::new();
+    let mut block_numbers = Vec::new();
+
+    for i in 1..=5u64 {
+        let block = Block::new_proposal(
+            Payload::empty(false, true),
+            i,
+            gaptos::aptos_infallible::duration_since_epoch().as_micros() as u64,
+            parent_qc.clone(),
+            &signer,
+            Vec::new(),
+        )
+        .unwrap();
+        block.set_block_number(i);
+        block_numbers.push((epoch, i, block.id()));
+
+        let qc = placeholder_certificate_for_block(
+            &[signer.clone()],
+            block.id(),
+            i,
+            if i == 1 { genesis.id() } else { blocks.last().map(|b: &Block| b.id()).unwrap() },
+            i - 1,
+        );
+
+        blocks.push(block);
+        qcs.push(qc.clone());
+        parent_qc = qc;
+    }
+
+    db.save_blocks_and_quorum_certificates(blocks.clone(), qcs.clone()).unwrap();
+    db.save_block_numbers(block_numbers).unwrap();
+
+    // Save randomness for blocks 1..=5
+    let randomness: Vec<(u64, Vec<u8>)> = (1..=5).map(|i| (i, vec![i as u8; 32])).collect();
+    db.put_randomness(&randomness).unwrap();
+
+    // Save vote and timeout cert
+    db.save_vote(vec![1, 2, 3]).unwrap();
+    db.save_highest_2chain_timeout_certificate(vec![4, 5, 6]).unwrap();
+
+    // Verify initial state: genesis + 5 = 6 blocks
+    assert_eq!(db.get_all::<BlockSchema>().unwrap().len(), 6);
+    assert_eq!(db.get_all::<QCSchema>().unwrap().len(), 6);
+
+    // === Unwind to block 3 ===
+    db.unwind_to_block(3).unwrap();
+
+    // Blocks: genesis + 1,2,3 = 4 remaining
+    assert_eq!(db.get_all::<BlockSchema>().unwrap().len(), 4);
+    assert_eq!(db.get_all::<QCSchema>().unwrap().len(), 4);
+
+    // BlockNumbers: 0,1,2,3 remaining; 4,5 deleted
+    let bns: Vec<u64> =
+        db.get_all::<BlockNumberSchema>().unwrap().into_iter().map(|(_, v)| v).collect();
+    for i in 0..=3 { assert!(bns.contains(&i), "block_number {} should remain", i); }
+    for i in 4..=5 { assert!(!bns.contains(&i), "block_number {} should be deleted", i); }
+
+    // Randomness: 1,2,3 remain; 4,5 deleted
+    for i in 1..=3 { assert!(db.get_randomness(i).unwrap().is_some()); }
+    for i in 4..=5 { assert!(db.get_randomness(i).unwrap().is_none()); }
+
+    // Vote and timeout cert cleared
+    assert!(db.get_last_vote().unwrap().is_none());
+    assert!(db.get_highest_2chain_timeout_certificate().unwrap().is_none());
+}
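
For orientation, the column families touched by the test above and by unwind_to_block
below appear to have the following key/value shapes. This is inferred from the calls
in this patch; the value types are assumptions, since the schema definitions
themselves are not part of the diff:

    // Inferred schema shapes (value types are assumptions):
    //   BlockSchema:              (epoch: u64, block_id: HashValue) -> Block
    //   QCSchema:                 (epoch: u64, block_id: HashValue) -> QuorumCert
    //   BlockNumberSchema:        (epoch: u64, block_id: HashValue) -> block_number: u64
    //   LedgerInfoSchema:         block_number: u64 -> LedgerInfoWithSignatures
    //   EpochByBlockNumberSchema: block_number: u64 -> epoch: u64
    //   RandomnessSchema:         block_number: u64 -> Vec<u8>
    //   SingleEntrySchema:        SingleEntryKey (LastVote, Highest2ChainTimeoutCert) -> raw bytes
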
diff --git a/aptos-core/consensus/src/consensusdb/mod.rs b/aptos-core/consensus/src/consensusdb/mod.rs
index d977ace35..cc9dce94c 100644
--- a/aptos-core/consensus/src/consensusdb/mod.rs
+++ b/aptos-core/consensus/src/consensusdb/mod.rs
@@ -415,6 +415,97 @@ impl ConsensusDB {
     pub fn get_randomness(&self, block_number: u64) -> Result<Option<Vec<u8>>, DbError> {
         Ok(self.get::<RandomnessSchema>(&block_number)?)
     }
+
+    /// Unwind the consensus DB to the given target block number.
+    /// All data for blocks with block_number > target_block_number will be deleted.
+    /// This includes: blocks, QCs, block numbers, ledger info, epoch-by-block-number,
+    /// randomness, last vote, and highest 2-chain timeout certificate.
+    pub fn unwind_to_block(&self, target_block_number: u64) -> Result<(), DbError> {
+        info!(
+            "ConsensusDB::unwind_to_block: unwinding to block {}",
+            target_block_number
+        );
+
+        let mut batch = SchemaBatch::new();
+        let mut deleted_blocks = 0u64;
+
+        // Step 1: Delete (epoch, block_id)-keyed CFs (Block, QC, BlockNumber).
+        // Iterate epochs from max_epoch downward. Within each epoch, scan BlockNumberSchema
+        // to find entries with block_number > target. Stop when an entire epoch has
+        // all block_numbers <= target (no more to delete in earlier epochs).
+        let max_epoch = self.get_max_epoch();
+        for epoch in (1..=max_epoch).rev() {
+            let start_key = (epoch, HashValue::zero());
+            let end_key = (epoch, HashValue::new([u8::MAX; HashValue::LENGTH]));
+
+            let entries = self.get_range_with_filter::<BlockNumberSchema, _>(
+                &start_key,
+                &end_key,
+                |(_, block_number)| *block_number > target_block_number,
+            )?;
+
+            if entries.is_empty() {
+                // All blocks in this epoch are <= target, no need to check earlier epochs.
+                break;
+            }
+
+            for ((ep, block_id), _) in &entries {
+                batch.delete::<BlockSchema>(&(*ep, *block_id))?;
+                batch.delete::<QCSchema>(&(*ep, *block_id))?;
+                batch.delete::<BlockNumberSchema>(&(*ep, *block_id))?;
+                deleted_blocks += 1;
+            }
+        }
+
+        // Step 2: Delete block_number-keyed CFs by range query (target+1, u64::MAX).
+        let range_start = target_block_number.saturating_add(1);
+
+        // LedgerInfoSchema
+        let ledger_entries = self.get_range::<LedgerInfoSchema>(&range_start, &u64::MAX)?;
+        for (bn, _) in &ledger_entries {
+            batch.delete::<LedgerInfoSchema>(bn)?;
+        }
+
+        // EpochByBlockNumberSchema
+        let epoch_entries =
+            self.get_range::<EpochByBlockNumberSchema>(&range_start, &u64::MAX)?;
+        for (bn, _) in &epoch_entries {
+            batch.delete::<EpochByBlockNumberSchema>(bn)?;
+        }
+
+        // RandomnessSchema
+        let randomness_entries =
+            self.get_range::<RandomnessSchema>(&range_start, &u64::MAX)?;
+        for (bn, _) in &randomness_entries {
+            batch.delete::<RandomnessSchema>(bn)?;
+        }
+
+        // Step 3: Clear stale vote and timeout certificate.
+        batch.delete::<SingleEntrySchema>(
+            &schema::single_entry::SingleEntryKey::LastVote,
+        )?;
+        batch.delete::<SingleEntrySchema>(
+            &schema::single_entry::SingleEntryKey::Highest2ChainTimeoutCert,
+        )?;
+
+        // Step 4: Commit all deletions atomically.
+        self.commit(batch)?;
+
+        // Step 5: Update the in-memory latest_ledger_info cache.
+        self.ledger_db.metadata_db().update_latest_ledger_info();
+
+        info!(
+            "ConsensusDB::unwind_to_block complete: deleted {} blocks, \
+             {} ledger_infos, {} epoch_entries, {} randomness entries. Target: {}",
+            deleted_blocks,
+            ledger_entries.len(),
+            epoch_entries.len(),
+            randomness_entries.len(),
+            target_block_number
+        );
+
+        Ok(())
+    }
 }
 
 include!("include/reader.rs");
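
For reference, a minimal sketch of driving the unwind directly from Rust, mirroring
what the gravity_cli wrapper added below does (the node must be stopped first; as in
unwind.rs, the second argument of ConsensusDB::new is assumed to be unused for an
offline unwind):

    use aptos_consensus::consensusdb::ConsensusDB;
    use std::path::PathBuf;

    fn main() {
        // Open the consensus DB offline and delete everything above block 19700
        // in one atomic SchemaBatch commit.
        let db = ConsensusDB::new(&PathBuf::from("./data/consensus_db"), &PathBuf::new());
        db.unwind_to_block(19700).expect("unwind failed");
    }
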
diff --git a/aptos-core/consensus/src/persistent_liveness_storage.rs b/aptos-core/consensus/src/persistent_liveness_storage.rs
index 3711f712e..fa982e990 100644
--- a/aptos-core/consensus/src/persistent_liveness_storage.rs
+++ b/aptos-core/consensus/src/persistent_liveness_storage.rs
@@ -227,6 +227,7 @@ impl RecoveryData {
         blocks: &mut Vec<Block>,
         quorum_certs: &mut Vec<QuorumCert>,
         order_vote_enabled: bool,
+        storage_ledger: &LedgerInfoWithSignatures,
     ) -> Result<RootInfo> {
         // sort by (epoch, round) to guarantee the topological order of parent <- child
         blocks.sort_by_key(|b| (b.epoch(), b.round()));
@@ -254,16 +255,35 @@
             WrappedLedgerInfo::new(VoteData::dummy(), root_quorum_cert.ledger_info().clone());
             (root_ordered_cert.clone(), root_ordered_cert)
         } else {
-            let root_ordered_cert = quorum_certs
+            match quorum_certs
                 .iter()
                 .find(|qc| qc.commit_info().id() == root_block.id())
-                .ok_or_else(|| format_err!("No LI found for root: {}", root_block.id()))?
-                .clone()
-                .into_wrapped_ledger_info();
-            let root_commit_cert = root_ordered_cert
-                .create_merged_with_executed_state(root_ordered_cert.ledger_info().clone())
-                .expect("Inconsistent commit proof and evaluation decision, cannot commit block");
-            (root_ordered_cert, root_commit_cert)
+            {
+                Some(qc) => {
+                    let root_ordered_cert = qc.clone().into_wrapped_ledger_info();
+                    let root_commit_cert = root_ordered_cert
+                        .create_merged_with_executed_state(root_ordered_cert.ledger_info().clone())
+                        .expect(
+                            "Inconsistent commit proof and evaluation decision, cannot commit block",
+                        );
+                    (root_ordered_cert, root_commit_cert)
+                }
+                None => {
+                    // After an unwind, the committing QC (which certifies a block 2 rounds ahead)
+                    // may have been deleted. Fall back to the storage_ledger from
+                    // LedgerInfoSchema, whose commit_info().id() correctly points to the root block.
+                    warn!(
+                        "No committing QC found for root block {} (expected after unwind), \
+                         using storage_ledger fallback",
+                        root_block.id()
+                    );
+                    let root_ordered_cert = WrappedLedgerInfo::new(
+                        VoteData::dummy(),
+                        storage_ledger.clone(),
+                    );
+                    (root_ordered_cert.clone(), root_ordered_cert)
+                }
+            }
         };
         info!("Consensus root block is {}", root_block);
         Ok(RootInfo(Box::new(root_block), root_quorum_cert, root_ordered_cert, root_commit_cert))
@@ -289,6 +309,7 @@ impl RecoveryData {
                 &mut blocks,
                 &mut quorum_certs,
                 order_vote_enabled,
+                &ledger_recovery_data.storage_ledger,
             )?;
         } else {
             root = ledger_recovery_data.find_root(
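
To make the fallback concrete, a worked example of the situation the None branch
handles, following the patch's own claim that the committing QC certifies a block
two rounds ahead of the root (round numbers are illustrative):

    // round:   r        r+1      r+2
    // block:   B   <--  B'  <--  B''
    //
    // The QC that commits B certifies B'' and carries commit_info().id() == B.id().
    // Unwinding to B's block_number deletes B' and B'' (block_number > target) and,
    // with them, that committing QC. Recovery therefore cannot find any QC whose
    // commit_info() points at the root, and falls back to the LedgerInfo persisted
    // in LedgerInfoSchema, which still names B as the committed block.
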
diff --git a/aptos-core/consensus/src/quorum_store/quorum_store_db.rs b/aptos-core/consensus/src/quorum_store/quorum_store_db.rs
index 2ed6b11f2..8fe410df6 100644
--- a/aptos-core/consensus/src/quorum_store/quorum_store_db.rs
+++ b/aptos-core/consensus/src/quorum_store/quorum_store_db.rs
@@ -106,8 +106,11 @@ impl QuorumStoreStorage for QuorumStoreDB {
             iter.map(|res| res.map_err(Into::into)).collect::<Result<Vec<_>>>()?;
         let mut ret = None;
         for (epoch, batch_id) in epoch_batch_id {
-            assert!(current_epoch >= epoch);
-            if epoch < current_epoch {
+            if epoch != current_epoch {
+                // Delete batch IDs from other epochs
+                if current_epoch < epoch {
+                    warn!("Current epoch ({}) is less than epoch ({}) in the quorum store DB, possibly due to a cross-epoch unwind", current_epoch, epoch);
+                }
                 self.delete_batch_id(epoch)?;
             } else {
                 ret = Some(batch_id);
diff --git a/bin/gravity_cli/src/command.rs b/bin/gravity_cli/src/command.rs
index 001803a03..833a13964 100644
--- a/bin/gravity_cli/src/command.rs
+++ b/bin/gravity_cli/src/command.rs
@@ -1,6 +1,6 @@
 use crate::{
     dkg::DKGCommand, genesis::GenesisCommand, node::NodeCommand, stake::StakeCommand,
-    validator::ValidatorCommand,
+    unwind::UnwindCommand, validator::ValidatorCommand,
 };
 use build_info::{build_information, BUILD_PKG_VERSION};
 use clap::{Parser, Subcommand};
@@ -44,6 +44,8 @@ pub enum SubCommands {
     Stake(StakeCommand),
     Node(NodeCommand),
     Dkg(DKGCommand),
+    /// Unwind consensus state to a specific block number
+    Unwind(UnwindCommand),
 }
 
 pub trait Executable {
diff --git a/bin/gravity_cli/src/main.rs b/bin/gravity_cli/src/main.rs
index 459a91e3b..472760510 100644
--- a/bin/gravity_cli/src/main.rs
+++ b/bin/gravity_cli/src/main.rs
@@ -4,6 +4,7 @@ pub mod dkg;
 pub mod genesis;
 pub mod node;
 pub mod stake;
+pub mod unwind;
 pub mod util;
 pub mod validator;
 
@@ -46,6 +47,8 @@ fn main() {
             // Example: gravity-cli dkg randomness --server-url="127.0.0.1:1024" --block-number=100
             dkg::SubCommands::Randomness(randomness_cmd) => randomness_cmd.execute(),
         },
+        // Example: gravity-cli unwind --consensus-db-path="./data/consensus_db" --target=19700
+        command::SubCommands::Unwind(unwind_cmd) => unwind_cmd.execute(),
    };
 
    if let Err(e) = result {
diff --git a/bin/gravity_cli/src/unwind.rs b/bin/gravity_cli/src/unwind.rs
new file mode 100644
index 000000000..e7eca8699
--- /dev/null
+++ b/bin/gravity_cli/src/unwind.rs
@@ -0,0 +1,54 @@
+use anyhow::Result;
+use aptos_consensus::consensusdb::ConsensusDB;
+use clap::Parser;
+use std::path::PathBuf;
+
+/// Unwind consensus state to a specific block number.
+/// This deletes all consensus data (blocks, QCs, ledger info, randomness, etc.)
+/// for blocks with block_number > target.
+#[derive(Debug, Parser)]
+pub struct UnwindCommand {
+    /// Path to the consensus DB data directory.
+    /// This is typically `/data/consensus_db`.
+    #[arg(long)]
+    consensus_db_path: PathBuf,
+
+    /// Target block number to unwind to.
+    /// All data for blocks with block_number > target will be deleted.
+    /// The target block itself will be kept.
+    #[arg(long)]
+    target: u64,
+}
+
+impl super::command::Executable for UnwindCommand {
+    fn execute(self) -> Result<(), anyhow::Error> {
+        println!(
+            "Unwinding consensus DB to block {}...",
+            self.target
+        );
+        println!("Consensus DB path: {:?}", self.consensus_db_path);
+
+        if !self.consensus_db_path.exists() {
+            return Err(anyhow::anyhow!(
+                "Consensus DB path does not exist: {:?}",
+                self.consensus_db_path
+            ));
+        }
+
+        // Open ConsensusDB. The second argument is the node config path,
+        // which is not needed for unwind operations.
+        let consensus_db = ConsensusDB::new(&self.consensus_db_path, &PathBuf::new());
+
+        // Perform the unwind
+        consensus_db
+            .unwind_to_block(self.target)
+            .map_err(|e| anyhow::anyhow!("Failed to unwind consensus DB: {:?}", e))?;
+
+        println!(
+            "Successfully unwound consensus DB to block {}.",
+            self.target
+        );
+
+        Ok(())
+    }
+}
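
A quick way to exercise the new command from a test or another tool, assuming only
the clap derive above (parse_from comes from clap::Parser; execute from the
Executable trait in command.rs; the path and target are placeholders):

    use clap::Parser;

    use crate::command::Executable;
    use crate::unwind::UnwindCommand;

    fn run_unwind() -> anyhow::Result<()> {
        // Equivalent to: gravity-cli unwind --consensus-db-path="./data/consensus_db" --target=19700
        let cmd = UnwindCommand::parse_from([
            "unwind",
            "--consensus-db-path", "./data/consensus_db",
            "--target", "19700",
        ]);
        cmd.execute()
    }
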
diff --git a/bin/gravity_node/src/cli.rs b/bin/gravity_node/src/cli.rs
index f977fba7f..b37dd460d 100644
--- a/bin/gravity_node/src/cli.rs
+++ b/bin/gravity_node/src/cli.rs
@@ -94,6 +94,12 @@ pub(crate) struct Cli<
 }
 
 impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cli<C, Ext> {
+    /// Returns true if the CLI command is the `node` subcommand,
+    /// which requires full node initialization (consensus, relayer, etc.).
+    pub(crate) fn is_node_command(&self) -> bool {
+        matches!(self.command, Commands::Node(_))
+    }
+
     /// Execute the configured cli command.
     ///
     /// This accepts a closure that is used to launch the node via the
@@ -152,8 +158,8 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
         debug!(target: "reth::cli", "Initialized tracing, log directory: {}, log level {:?}", self.logs.log_file_directory, self.logs.verbosity);
 
         let runner = CliRunner::try_default_runtime()?;
-        let _components = |spec: Arc<ChainSpec>| {
-            (EthEvmConfig::ethereum(spec.clone()), EthBeaconConsensus::new(spec))
+        let components = |spec: Arc<ChainSpec>| {
+            (EthEvmConfig::ethereum(spec.clone()), Arc::new(EthBeaconConsensus::new(spec)))
         };
         match self.command {
             Commands::Node(command) => {
@@ -176,6 +182,12 @@
             Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<EthNetworkPrimitives>()),
             Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
             Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<EthereumNode>()),
+            Commands::Stage(command) => {
+                println!("Running stage command");
+                runner.run_command_until_exit(|ctx| {
+                    command.execute::<EthereumNode, _, _>(ctx, components)
+                })
+            }
             _ => todo!("not implemented"),
         }
     }
diff --git a/bin/gravity_node/src/main.rs b/bin/gravity_node/src/main.rs
index c8d78f948..81c4f45fe 100644
--- a/bin/gravity_node/src/main.rs
+++ b/bin/gravity_node/src/main.rs
@@ -226,6 +226,30 @@ fn main() {
     let _profiling_state =
         if std::env::var("ENABLE_PPROF").is_ok() { Some(setup_pprof_profiler()) } else { None };
     let cli = Cli::parse();
+
+    // For utility subcommands (stage, db, init, config, etc.), skip full node initialization
+    // and just run the CLI command directly.
+    if !cli.is_node_command() {
+        reth_cli_util::sigsegv_handler::install();
+        let res = cli.run(
+            |_builder: WithLaunchContext<
+                NodeBuilder<
+                    Arc<DatabaseEnv>,
+                    <EthereumChainSpecParser as ChainSpecParser>::ChainSpec,
+                >,
+            >,
+             _| {
+                async move { unreachable!("launcher should not be called for utility commands") }
+            },
+        );
+        if let Err(err) = res {
+            eprintln!("Error: {err:?}");
+            std::process::exit(1);
+        }
+        return;
+    }
+
+    // Full node path: requires config, consensus, relayer, etc.
     let relayer_config_path = cli.gravity_node_config.relayer_config_path.clone();
     let gcei_config = check_bootstrap_config(cli.gravity_node_config.node_config_path.clone());
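
With is_node_command gating the full node path, utility subcommands now run without a
relayer or consensus config, so the execution layer can be unwound alongside the
consensus DB. Following the "// Example:" convention used elsewhere in this patch
(the `stage unwind to-block` syntax is an assumption about the vendored reth CLI,
which this patch wires up but does not define):

    // Example: gravity_node node --datadir ./data ...
    //   full node path: node config, consensus, and relayer are all required
    // Example: gravity_node stage unwind to-block 19700 --datadir ./data
    //   utility path: early-exits before consensus setup; pairs with
    //   `gravity-cli unwind --target=19700` on the consensus DB
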
diff --git a/crates/api/src/consensus_mempool_handler.rs b/crates/api/src/consensus_mempool_handler.rs
index 7648d6fe6..1cc5215e6 100644
--- a/crates/api/src/consensus_mempool_handler.rs
+++ b/crates/api/src/consensus_mempool_handler.rs
@@ -97,13 +97,22 @@ impl ConsensusToMempoolHandler {
                 self.handle_consensus_commit_notification(commit_notification).await
             }
             ConsensusNotification::SyncToTarget(sync_notification) => {
-                self.event_subscription_service
+                let target_block = sync_notification.get_target().ledger_info().block_number();
+                match self.event_subscription_service
                     .lock()
                     .await
-                    .notify_initial_configs(
-                        sync_notification.get_target().ledger_info().block_number(),
-                    )
-                    .unwrap();
+                    .notify_initial_configs(target_block)
+                {
+                    Ok(_) => {},
+                    Err(e) => {
+                        warn!(
+                            "Failed to notify initial configs for block {}: {:?}. \
+                             This is expected after a cross-epoch unwind; \
+                             the node will re-sync missing blocks via block sync.",
+                            target_block, e
+                        );
+                    }
+                }
                 let _ = self
                     .consensus_notification_listener
                     .respond_to_sync_target_notification(sync_notification, Ok(()))