Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions aptos-core/consensus/src/consensusdb/consensusdb_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,91 @@ fn test_dag() {
let vote = Vote::new(node.metadata().clone(), Signature::dummy_signature());
test_dag_type::<DagVoteSchema, <DagVoteSchema as Schema>::Key>(node.id(), vote, &db);
}

#[test]
fn test_unwind_to_block() {
    use aptos_consensus_types::{
        block::block_test_utils::placeholder_certificate_for_block,
        common::Payload,
    };

    let tmp_dir = TempPath::new();
    let db = ConsensusDB::new(&tmp_dir, &PathBuf::new());

    let epoch = 1u64;

    // Persist the genesis block, its QC, and its block-number mapping (number 0).
    let genesis_block = Block::make_genesis_block();
    let genesis_cert = certificate_for_genesis();
    db.save_blocks_and_quorum_certificates(vec![genesis_block.clone()], vec![genesis_cert.clone()])
        .unwrap();
    db.save_block_numbers(vec![(epoch, 0, genesis_block.id())]).unwrap();

    // Build a linear chain of five proposals (block_number 1..=5) on top of genesis.
    let signer = gaptos::aptos_types::validator_signer::ValidatorSigner::random(None);
    let mut chain: Vec<Block> = Vec::new();
    let mut certs = Vec::new();
    let mut number_mappings = Vec::new();
    let mut prev_qc = genesis_cert;

    for round in 1..=5u64 {
        let proposal = Block::new_proposal(
            Payload::empty(false, true),
            round,
            gaptos::aptos_infallible::duration_since_epoch().as_micros() as u64,
            prev_qc.clone(),
            &signer,
            Vec::new(),
        )
        .unwrap();
        proposal.set_block_number(round);
        number_mappings.push((epoch, round, proposal.id()));

        // The first proposal extends genesis; each later one extends the chain tip.
        let parent_id = match chain.last() {
            Some(parent) => parent.id(),
            None => genesis_block.id(),
        };
        let cert = placeholder_certificate_for_block(
            &[signer.clone()],
            proposal.id(),
            round,
            parent_id,
            round - 1,
        );

        chain.push(proposal);
        certs.push(cert.clone());
        prev_qc = cert;
    }

    db.save_blocks_and_quorum_certificates(chain.clone(), certs.clone()).unwrap();
    db.save_block_numbers(number_mappings).unwrap();

    // Randomness for block numbers 1..=5.
    let rand_entries: Vec<(u64, Vec<u8>)> = (1..=5).map(|n| (n, vec![n as u8; 32])).collect();
    db.put_randomness(&rand_entries).unwrap();

    // Stash a vote and a 2-chain timeout certificate so unwind can clear them.
    db.save_vote(vec![1, 2, 3]).unwrap();
    db.save_highest_2chain_timeout_certificate(vec![4, 5, 6]).unwrap();

    // Sanity check before unwinding: genesis plus five proposals.
    assert_eq!(db.get_all::<BlockSchema>().unwrap().len(), 6);
    assert_eq!(db.get_all::<QCSchema>().unwrap().len(), 6);

    // === Unwind everything above block number 3 ===
    db.unwind_to_block(3).unwrap();

    // Genesis plus blocks 1..=3 should survive.
    assert_eq!(db.get_all::<BlockSchema>().unwrap().len(), 4);
    assert_eq!(db.get_all::<QCSchema>().unwrap().len(), 4);

    // Block-number mappings 0..=3 remain; 4 and 5 are gone.
    let remaining_numbers: Vec<u64> = db
        .get_all::<BlockNumberSchema>()
        .unwrap()
        .into_iter()
        .map(|(_, number)| number)
        .collect();
    for n in 0..=5u64 {
        if n <= 3 {
            assert!(remaining_numbers.contains(&n), "block_number {} should remain", n);
        } else {
            assert!(!remaining_numbers.contains(&n), "block_number {} should be deleted", n);
        }
    }

    // Randomness follows the same cut-off: 1..=3 remain, 4..=5 deleted.
    for n in 1..=5u64 {
        let entry = db.get_randomness(n).unwrap();
        if n <= 3 {
            assert!(entry.is_some());
        } else {
            assert!(entry.is_none());
        }
    }

    // The stale vote and timeout certificate must have been cleared.
    assert!(db.get_last_vote().unwrap().is_none());
    assert!(db.get_highest_2chain_timeout_certificate().unwrap().is_none());
}
91 changes: 91 additions & 0 deletions aptos-core/consensus/src/consensusdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,97 @@ impl ConsensusDB {
/// Look up the stored randomness blob for `block_number`.
///
/// Returns `Ok(None)` when no randomness has been persisted for that
/// block number, and propagates any underlying storage error as `DbError`.
pub fn get_randomness(&self, block_number: u64) -> Result<Option<Vec<u8>>, DbError> {
    self.get::<schema::randomness::RandomnessSchema>(&block_number)
        .map_err(Into::into)
}

/// Unwind the consensus DB to the given target block number.
/// All data for blocks with block_number > target_block_number will be deleted.
/// This includes: blocks, QCs, block numbers, ledger info, epoch-by-block-number,
/// randomness, last vote, and highest 2-chain timeout certificate.
/// Unwind the consensus DB to the given target block number.
/// All data for blocks with block_number > target_block_number will be deleted.
/// This includes: blocks, QCs, block numbers, ledger info, epoch-by-block-number,
/// randomness, last vote, and highest 2-chain timeout certificate.
///
/// All deletions are staged into a single `SchemaBatch` and committed in one
/// write (Step 4), so the unwind is atomic with respect to the underlying DB.
/// The target block itself is kept (only strictly greater block numbers are
/// removed).
pub fn unwind_to_block(&self, target_block_number: u64) -> Result<(), DbError> {
    info!(
        "ConsensusDB::unwind_to_block: unwinding to block {}",
        target_block_number
    );

    let mut batch = SchemaBatch::new();
    // Counts deletions for the final log line only; not used for control flow.
    let mut deleted_blocks = 0u64;

    // Step 1: Delete (epoch, block_id)-keyed CFs (Block, QC, BlockNumber).
    // Iterate epochs from max_epoch downward. Within each epoch, scan BlockNumberSchema
    // to find entries with block_number > target. Stop when an entire epoch has
    // all block_numbers <= target (no more to delete in earlier epochs).
    //
    // NOTE(review): the early `break` assumes (a) block numbers never decrease
    // across epoch boundaries, and (b) every epoch in 1..=max_epoch has at
    // least one BlockNumberSchema entry. An epoch with zero entries would also
    // yield an empty `entries` and stop the scan early — confirm that cannot
    // occur in practice.
    let max_epoch = self.get_max_epoch();
    for epoch in (1..=max_epoch).rev() {
        // Key range covering every block id within this epoch; relies on the
        // schema's key encoding ordering by (epoch, block_id).
        let start_key = (epoch, HashValue::zero());
        let end_key = (epoch, HashValue::new([u8::MAX; HashValue::LENGTH]));

        let entries = self.get_range_with_filter::<BlockNumberSchema, _>(
            &start_key,
            &end_key,
            |(_, block_number)| *block_number > target_block_number,
        )?;

        if entries.is_empty() {
            // All blocks in this epoch are <= target, no need to check earlier epochs.
            break;
        }

        // The three CFs share the same (epoch, block_id) key, so one scan of
        // BlockNumberSchema drives deletion in all of them.
        for ((ep, block_id), _) in &entries {
            batch.delete::<BlockSchema>(&(*ep, *block_id))?;
            batch.delete::<QCSchema>(&(*ep, *block_id))?;
            batch.delete::<BlockNumberSchema>(&(*ep, *block_id))?;
            deleted_blocks += 1;
        }
    }

    // Step 2: Delete block_number-keyed CFs by range query (target+1, u64::MAX).
    // saturating_add guards the degenerate target_block_number == u64::MAX case.
    // NOTE(review): assumes `get_range` treats the end key inclusively (or that
    // no entry ever sits at exactly u64::MAX) — TODO confirm against the
    // get_range implementation.
    let range_start = target_block_number.saturating_add(1);

    // LedgerInfoSchema
    let ledger_entries = self.get_range::<LedgerInfoSchema>(&range_start, &u64::MAX)?;
    for (bn, _) in &ledger_entries {
        batch.delete::<LedgerInfoSchema>(bn)?;
    }

    // EpochByBlockNumberSchema
    let epoch_entries =
        self.get_range::<EpochByBlockNumberSchema>(&range_start, &u64::MAX)?;
    for (bn, _) in &epoch_entries {
        batch.delete::<EpochByBlockNumberSchema>(bn)?;
    }

    // RandomnessSchema
    let randomness_entries =
        self.get_range::<schema::randomness::RandomnessSchema>(&range_start, &u64::MAX)?;
    for (bn, _) in &randomness_entries {
        batch.delete::<schema::randomness::RandomnessSchema>(bn)?;
    }

    // Step 3: Clear stale vote and timeout certificate. These are deleted
    // unconditionally: after rewinding they may refer to rounds that no
    // longer exist, so recovery must not replay them.
    batch.delete::<schema::single_entry::SingleEntrySchema>(
        &schema::single_entry::SingleEntryKey::LastVote,
    )?;
    batch.delete::<schema::single_entry::SingleEntrySchema>(
        &schema::single_entry::SingleEntryKey::Highest2ChainTimeoutCert,
    )?;

    // Step 4: Commit all deletions atomically.
    self.commit(batch)?;

    // Step 5: Update the in-memory latest_ledger_info cache so readers do not
    // observe a cached ledger info pointing past the unwind target.
    self.ledger_db.metadata_db().update_latest_ledger_info();

    info!(
        "ConsensusDB::unwind_to_block complete: deleted {} blocks, \
        {} ledger_infos, {} epoch_entries, {} randomness entries. Target: {}",
        deleted_blocks,
        ledger_entries.len(),
        epoch_entries.len(),
        randomness_entries.len(),
        target_block_number
    );

    Ok(())
}
}

include!("include/reader.rs");
Expand Down
37 changes: 29 additions & 8 deletions aptos-core/consensus/src/persistent_liveness_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl RecoveryData {
blocks: &mut Vec<Block>,
quorum_certs: &mut Vec<QuorumCert>,
order_vote_enabled: bool,
storage_ledger: &LedgerInfoWithSignatures,
) -> Result<RootInfo> {
// sort by (epoch, round) to guarantee the topological order of parent <- child
blocks.sort_by_key(|b| (b.epoch(), b.round()));
Expand Down Expand Up @@ -254,16 +255,35 @@ impl RecoveryData {
WrappedLedgerInfo::new(VoteData::dummy(), root_quorum_cert.ledger_info().clone());
(root_ordered_cert.clone(), root_ordered_cert)
} else {
let root_ordered_cert = quorum_certs
match quorum_certs
.iter()
.find(|qc| qc.commit_info().id() == root_block.id())
.ok_or_else(|| format_err!("No LI found for root: {}", root_block.id()))?
.clone()
.into_wrapped_ledger_info();
let root_commit_cert = root_ordered_cert
.create_merged_with_executed_state(root_ordered_cert.ledger_info().clone())
.expect("Inconsistent commit proof and evaluation decision, cannot commit block");
(root_ordered_cert, root_commit_cert)
{
Some(qc) => {
let root_ordered_cert = qc.clone().into_wrapped_ledger_info();
let root_commit_cert = root_ordered_cert
.create_merged_with_executed_state(root_ordered_cert.ledger_info().clone())
.expect(
"Inconsistent commit proof and evaluation decision, cannot commit block",
);
(root_ordered_cert, root_commit_cert)
}
None => {
// After an unwind, the committing QC (which certifies a block 2 rounds ahead)
// may have been deleted. Fall back to using the storage_ledger from
// LedgerInfoSchema, whose commit_info().id() correctly points to root block.
warn!(
"No committing QC found for root block {} (expected after unwind), \
using storage_ledger fallback",
root_block.id()
);
let root_ordered_cert = WrappedLedgerInfo::new(
VoteData::dummy(),
storage_ledger.clone(),
);
(root_ordered_cert.clone(), root_ordered_cert)
}
}
};
info!("Consensus root block is {}", root_block);
Ok(RootInfo(Box::new(root_block), root_quorum_cert, root_ordered_cert, root_commit_cert))
Expand All @@ -289,6 +309,7 @@ impl RecoveryData {
&mut blocks,
&mut quorum_certs,
order_vote_enabled,
&ledger_recovery_data.storage_ledger,
)?;
} else {
root = ledger_recovery_data.find_root(
Expand Down
7 changes: 5 additions & 2 deletions aptos-core/consensus/src/quorum_store/quorum_store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ impl QuorumStoreStorage for QuorumStoreDB {
iter.map(|res| res.map_err(Into::into)).collect::<Result<HashMap<u64, BatchId>>>()?;
let mut ret = None;
for (epoch, batch_id) in epoch_batch_id {
assert!(current_epoch >= epoch);
if epoch < current_epoch {
if epoch != current_epoch {
// Delete batch IDs from other epochs
if current_epoch < epoch {
warn!("Current epoch({}) is less than epoch({}) in quorum db, maybe cross-epoch unwind", current_epoch, epoch);
}
self.delete_batch_id(epoch)?;
} else {
ret = Some(batch_id);
Expand Down
4 changes: 3 additions & 1 deletion bin/gravity_cli/src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
dkg::DKGCommand, genesis::GenesisCommand, node::NodeCommand, stake::StakeCommand,
validator::ValidatorCommand,
unwind::UnwindCommand, validator::ValidatorCommand,
};
use build_info::{build_information, BUILD_PKG_VERSION};
use clap::{Parser, Subcommand};
Expand Down Expand Up @@ -44,6 +44,8 @@ pub enum SubCommands {
Stake(StakeCommand),
Node(NodeCommand),
Dkg(DKGCommand),
/// Unwind consensus state to a specific block number
Unwind(UnwindCommand),
}

pub trait Executable {
Expand Down
3 changes: 3 additions & 0 deletions bin/gravity_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod dkg;
pub mod genesis;
pub mod node;
pub mod stake;
pub mod unwind;
pub mod util;
pub mod validator;

Expand Down Expand Up @@ -46,6 +47,8 @@ fn main() {
// Example: gravity-cli dkg randomness --server-url="127.0.0.1:1024" --block-number=100
dkg::SubCommands::Randomness(randomness_cmd) => randomness_cmd.execute(),
},
// Example: gravity-cli unwind --consensus-db-path="./data/consensus_db" --target=19700
command::SubCommands::Unwind(unwind_cmd) => unwind_cmd.execute(),
};

if let Err(e) = result {
Expand Down
54 changes: 54 additions & 0 deletions bin/gravity_cli/src/unwind.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use anyhow::Result;
use aptos_consensus::consensusdb::ConsensusDB;
use clap::Parser;
use std::path::PathBuf;

/// Unwind consensus state to a specific block number.
/// This deletes all consensus data (blocks, QCs, ledger info, randomness, etc.)
/// for blocks with block_number > target.
/// Unwind consensus state to a specific block number.
/// This deletes all consensus data (blocks, QCs, ledger info, randomness, etc.)
/// for blocks with block_number > target.
#[derive(Debug, Parser)]
pub struct UnwindCommand {
    // NOTE: the `///` doc comments on this struct and its fields double as
    // clap's generated --help text, so they are user-facing strings; change
    // them only when the CLI help should change.

    /// Path to the consensus DB data directory.
    /// This is typically `<deploy-path>/data/consensus_db`.
    #[arg(long)]
    consensus_db_path: PathBuf,

    /// Target block number to unwind to.
    /// All data for blocks with block_number > target will be deleted.
    /// The target block itself will be kept.
    #[arg(long)]
    target: u64,
}

impl super::command::Executable for UnwindCommand {
    /// Run the unwind: validate that the DB path exists, open the consensus
    /// DB, and delete all consensus data above the target block number.
    fn execute(self) -> Result<(), anyhow::Error> {
        println!("Unwinding consensus DB to block {}...", self.target);
        println!("Consensus DB path: {:?}", self.consensus_db_path);

        // Refuse to proceed against a non-existent directory rather than
        // letting the DB layer create an empty one.
        anyhow::ensure!(
            self.consensus_db_path.exists(),
            "Consensus DB path does not exist: {:?}",
            self.consensus_db_path
        );

        // Open ConsensusDB. The second argument is the node config path,
        // which is not needed for unwind operations, so an empty path is fine.
        let consensus_db = ConsensusDB::new(&self.consensus_db_path, &PathBuf::new());

        // Perform the unwind, surfacing any storage error with context.
        consensus_db
            .unwind_to_block(self.target)
            .map_err(|e| anyhow::anyhow!("Failed to unwind consensus DB: {:?}", e))?;

        println!(
            "Successfully unwound consensus DB to block {}.",
            self.target
        );

        Ok(())
    }
}
16 changes: 14 additions & 2 deletions bin/gravity_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ pub(crate) struct Cli<
}

impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cli<C, Ext> {
/// Returns true if the CLI command is the `node` subcommand,
/// which requires full node initialization (consensus, relayer, etc.).
pub(crate) fn is_node_command(&self) -> bool {
matches!(self.command, Commands::Node(_))
}

/// Execute the configured cli command.
///
/// This accepts a closure that is used to launch the node via the
Expand Down Expand Up @@ -152,8 +158,8 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
debug!(target: "reth::cli", "Initialized tracing, log directory: {}, log level {:?}", self.logs.log_file_directory, self.logs.verbosity);

let runner = CliRunner::try_default_runtime()?;
let _components = |spec: Arc<C::ChainSpec>| {
(EthEvmConfig::ethereum(spec.clone()), EthBeaconConsensus::new(spec))
let components = |spec: Arc<C::ChainSpec>| {
(EthEvmConfig::ethereum(spec.clone()), Arc::new(EthBeaconConsensus::new(spec)))
};
match self.command {
Commands::Node(command) => {
Expand All @@ -176,6 +182,12 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>, Ext: clap::Args + fmt::Debug> Cl
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute::<EthereumNode>()),
Commands::Config(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Prune(command) => runner.run_until_ctrl_c(command.execute::<EthereumNode>()),
Commands::Stage(command) => {
println!("Running stage command");
runner.run_command_until_exit(|ctx| {
command.execute::<EthereumNode, _>(ctx, components)
})
}
_ => todo!("not implemented"),
}
}
Expand Down
Loading
Loading