diff --git a/Cargo.lock b/Cargo.lock index ec23c90882..1c86e106e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8564,6 +8564,19 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "prune-blocks" +version = "0.1.0" +dependencies = [ + "clap 4.5.40", + "ethrex", + "ethrex-storage", + "eyre", + "tokio", + "tracing", + "tracing-subscriber 0.3.19", +] + [[package]] name = "ptr_meta" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 3a452e91a7..5b238de089 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ "tooling/loc", "tooling/archive_sync", "crates/common/config", + "tooling/prune-blocks", ] resolver = "2" diff --git a/crates/storage/api.rs b/crates/storage/api.rs index e0d9cbf4a4..50f28f2f72 100644 --- a/crates/storage/api.rs +++ b/crates/storage/api.rs @@ -53,6 +53,10 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe { /// Remove canonical block async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError>; + /// Removes Block and all asociated data (receipts, transaction locations, etc) + /// Doesn't fail if the block doesn't exist or has missing data + async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError>; + /// Obtain canonical block bodies in from..=to async fn get_block_bodies( &self, diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 5100c7d39a..23fd5cc1e3 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -219,6 +219,12 @@ impl Store { self.engine.remove_block(block_number).await } + /// Removes Block and all asociated data (receipts, transaction locations, etc) + /// Doesn't fail if the block doesn't exist or has missing data + pub async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError> { + self.engine.purge_block(block_number).await + } + pub async fn get_block_bodies( &self, from: BlockNumber, diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index e0500ca970..a4110408ae 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -177,6 +177,26 @@ impl StoreEngine for Store { Ok(()) } + async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError> { + let mut store = self.inner()?; + let Some(block_hash) = store.canonical_hashes.remove(&block_number) else { + // Block must have been already purged + return Ok(()); + }; + // Obtain block hash & block body so we can use it to remove receipts & transactions + if let Some(block_body) = store.bodies.remove(&block_hash) { + // Remove transaction location and receipts. Note that if the block was obtained via snap sync these are not guaranteed to exist + for tx_hash in block_body.transactions.iter().map(|tx| tx.hash()) { + store.transaction_locations.remove(&tx_hash); + } + store.receipts.remove(&block_hash); + } + // Remove block header & number + store.headers.remove(&block_hash); + store.block_numbers.remove(&block_hash); + Ok(()) + } + async fn get_block_bodies( &self, from: BlockNumber, diff --git a/crates/storage/store_db/libmdbx.rs b/crates/storage/store_db/libmdbx.rs index 9a469aa6a0..57afd7fef1 100644 --- a/crates/storage/store_db/libmdbx.rs +++ b/crates/storage/store_db/libmdbx.rs @@ -322,6 +322,53 @@ impl StoreEngine for Store { txn.commit().map_err(StoreError::LibmdbxError) } + async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError> { + let txn = self + .db + .begin_readwrite() + .map_err(StoreError::LibmdbxError)?; + let Some(block_hash) = txn + .get::(block_number) + .map_err(StoreError::LibmdbxError)? + else { + // Block must have been already purged + txn.commit().map_err(StoreError::LibmdbxError)?; + return Ok(()); + }; + let block_hash = block_hash.to()?; + // Obtain block hash & block body so we can use it to remove receipts & transactions + if let Some(block_body) = txn + .get::(block_hash.into()) + .map_err(StoreError::LibmdbxError)? + { + let block_body = block_body.to()?; + // Remove transaction location and receipts. Note that if the block was obtained via snap sync these are not guaranteed to exist + for (idx, tx_hash) in block_body + .transactions + .iter() + .map(|tx| tx.hash()) + .enumerate() + { + txn.delete::(tx_hash.into(), None) + .map_err(StoreError::LibmdbxError)?; + txn.delete::((block_hash, idx as u64).into(), None) + .map_err(StoreError::LibmdbxError)?; + } + // Remove body + txn.delete::(block_hash.into(), None) + .map_err(StoreError::LibmdbxError)?; + } + // Remove block header, hash & number + txn.delete::(block_number, None) + .map_err(StoreError::LibmdbxError)?; + txn.delete::(block_hash.into(), None) + .map_err(StoreError::LibmdbxError)?; + txn.delete::(block_hash.into(), None) + .map_err(StoreError::LibmdbxError)?; + + txn.commit().map_err(StoreError::LibmdbxError) + } + async fn get_block_bodies( &self, from: BlockNumber, diff --git a/tooling/prune-blocks/Cargo.toml b/tooling/prune-blocks/Cargo.toml new file mode 100644 index 0000000000..25be056774 --- /dev/null +++ b/tooling/prune-blocks/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "prune-blocks" +version.workspace = true +edition.workspace = true + +[dependencies] +ethrex-storage.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber = { version = "0.3", features = ["fmt"] } +clap = { workspace = true, features = ["string"] } +eyre.workspace = true +ethrex.workspace = true diff --git a/tooling/prune-blocks/src/README.md b/tooling/prune-blocks/src/README.md new file mode 100644 index 0000000000..daeff4d09d --- /dev/null +++ b/tooling/prune-blocks/src/README.md @@ -0,0 +1,15 @@ +# Prune Blocks + +This tool can be used to reduce the DB size of the node by removing old blocks and their associated data. Note that this is counter-spec and will hinder the node's ability to provide data to other nodes. It also does not perform state pruning. + +## Usage + +The tool takes two optional arguments: + *`datadir`: The path to the DB location, will use the default one if not provided + *`blocks_to_keep`: The amount of latest blocks that will be kept in the DB. This value must be at least 128 and lower than the current amount of blocks in the chain. + +And should be run like this: + +```bash +cargo run --release -- --datadir DATADIR --blocks-to-keep BLOCKS_TO_KEEP +``` diff --git a/tooling/prune-blocks/src/main.rs b/tooling/prune-blocks/src/main.rs new file mode 100644 index 0000000000..7818d9c602 --- /dev/null +++ b/tooling/prune-blocks/src/main.rs @@ -0,0 +1,95 @@ +use std::time::{Duration, Instant}; + +use clap::Parser; +use ethrex::{ + initializers::load_store, + utils::{default_datadir, init_datadir}, +}; +use tracing::info; +use tracing_subscriber::FmtSubscriber; + +const MIN_BLOCKS_TO_KEEP: u64 = 128; + +fn mseconds_to_readable(mut mseconds: u128) -> String { + const DAY: u128 = 24 * HOUR; + const HOUR: u128 = 60 * MINUTE; + const MINUTE: u128 = 60 * SECOND; + const SECOND: u128 = 1000 * MSECOND; + const MSECOND: u128 = 1; + let mut res = String::new(); + let mut apply_time_unit = |unit_in_ms: u128, unit_str: &str| { + if mseconds > unit_in_ms { + let amount_of_unit = mseconds / unit_in_ms; + res.push_str(&format!("{amount_of_unit}{unit_str}")); + mseconds -= unit_in_ms * amount_of_unit + } + }; + apply_time_unit(DAY, "d"); + apply_time_unit(HOUR, "h"); + apply_time_unit(MINUTE, "m"); + apply_time_unit(SECOND, "s"); + apply_time_unit(MSECOND, "ms"); + + res +} + +#[derive(Parser)] +struct Args { + #[arg( + long = "blocks-to-keep", + value_name = "NUMBER", + help = "Amount of blocks to keep", + long_help = "Cannot be smaller than 128", + default_value_t = MIN_BLOCKS_TO_KEEP, + )] + blocks_to_keep: u64, + #[arg( + long = "datadir", + value_name = "DATABASE_DIRECTORY", + default_value_t = default_datadir(), + help = "Receives the name of the directory where the Database is located.", + long_help = "If the datadir is the word `memory`, ethrex will use the `InMemory Engine`.", + env = "ETHREX_DATADIR" + )] + pub datadir: String, +} + +#[tokio::main] +pub async fn main() -> eyre::Result<()> { + let args = Args::parse(); + tracing::subscriber::set_global_default(FmtSubscriber::new()) + .expect("setting default subscriber failed"); + if args.blocks_to_keep < MIN_BLOCKS_TO_KEEP { + return Err(eyre::ErrReport::msg(format!( + "Must keep at least {MIN_BLOCKS_TO_KEEP} in store" + ))); + } + let data_dir = init_datadir(&args.datadir); + let store = load_store(&data_dir).await; + let latest_number = store.get_latest_block_number().await?; + if latest_number <= args.blocks_to_keep { + return Err(eyre::ErrReport::msg(format!( + "Only have {latest_number} blocks in store, cannot prune" + ))); + } + let last_block_to_prune = latest_number - args.blocks_to_keep; + let prune_start = Instant::now(); + let mut last_show_progress = Instant::now(); + const SHOW_PROGRESS_INTERVAL: Duration = Duration::from_secs(5); + // Skip the genesis block + for block_number in 1..last_block_to_prune { + if last_show_progress.elapsed() > SHOW_PROGRESS_INTERVAL { + last_show_progress = Instant::now(); + info!( + "Pruned {block_number} blocks, {}% done", + (block_number * 100) / last_block_to_prune + ) + } + store.purge_block(block_number).await?; + } + info!( + "Succesfully pruned {last_block_to_prune} blocks in {}", + mseconds_to_readable(prune_start.elapsed().as_millis()) + ); + Ok(()) +}