Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ members = [
"tooling/loc",
"tooling/archive_sync",
"crates/common/config",
"tooling/prune-blocks",
]
resolver = "2"

Expand Down
4 changes: 4 additions & 0 deletions crates/storage/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ pub trait StoreEngine: Debug + Send + Sync + RefUnwindSafe {
/// Remove canonical block
async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError>;

/// Removes a block and all associated data (receipts, transaction locations, etc)
/// Doesn't fail if the block doesn't exist or has missing data
async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError>;

/// Obtain canonical block bodies in from..=to
async fn get_block_bodies(
&self,
Expand Down
6 changes: 6 additions & 0 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ impl Store {
self.engine.remove_block(block_number).await
}

/// Removes a block and all associated data (receipts, transaction locations, etc).
/// Doesn't fail if the block doesn't exist or has missing data.
/// Delegates to the underlying storage engine's `purge_block` implementation.
pub async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
    self.engine.purge_block(block_number).await
}

pub async fn get_block_bodies(
&self,
from: BlockNumber,
Expand Down
20 changes: 20 additions & 0 deletions crates/storage/store_db/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ impl StoreEngine for Store {
Ok(())
}

/// Removes a block and all associated data (receipts, transaction locations, etc).
/// Doesn't fail if the block doesn't exist or has missing data.
async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
    let mut inner = self.inner()?;
    // The canonical-hash entry is the anchor: if it is already gone the
    // block must have been purged before, so there is nothing to do.
    let hash = match inner.canonical_hashes.remove(&block_number) {
        Some(hash) => hash,
        None => return Ok(()),
    };
    // Remove the body first so its transaction list can be used to clear
    // per-transaction data. Blocks obtained via snap sync are not
    // guaranteed to have receipts or transaction locations; removing a
    // missing entry is simply a no-op.
    if let Some(body) = inner.bodies.remove(&hash) {
        for tx in &body.transactions {
            inner.transaction_locations.remove(&tx.hash());
        }
        inner.receipts.remove(&hash);
    }
    // Finally drop the header and the hash -> number mapping.
    inner.headers.remove(&hash);
    inner.block_numbers.remove(&hash);
    Ok(())
}

async fn get_block_bodies(
&self,
from: BlockNumber,
Expand Down
47 changes: 47 additions & 0 deletions crates/storage/store_db/libmdbx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,53 @@ impl StoreEngine for Store {
txn.commit().map_err(StoreError::LibmdbxError)
}

async fn purge_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
let txn = self
.db
.begin_readwrite()
.map_err(StoreError::LibmdbxError)?;
let Some(block_hash) = txn
.get::<CanonicalBlockHashes>(block_number)
.map_err(StoreError::LibmdbxError)?
else {
// Block must have been already purged
txn.commit().map_err(StoreError::LibmdbxError)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to commit here? Can't we just abort or discard the tx?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had some problems because of this when testing it

return Ok(());
};
let block_hash = block_hash.to()?;
// Obtain block hash & block body so we can use it to remove receipts & transactions
if let Some(block_body) = txn
.get::<Bodies>(block_hash.into())
.map_err(StoreError::LibmdbxError)?
{
let block_body = block_body.to()?;
// Remove transaction location and receipts. Note that if the block was obtained via snap sync these are not guaranteed to exist
for (idx, tx_hash) in block_body
.transactions
.iter()
.map(|tx| tx.hash())
.enumerate()
{
txn.delete::<TransactionLocations>(tx_hash.into(), None)
.map_err(StoreError::LibmdbxError)?;
txn.delete::<Receipts>((block_hash, idx as u64).into(), None)
.map_err(StoreError::LibmdbxError)?;
Comment on lines +345 to +355
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these aren't guaranteed to exist, should we ignore any errors then?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't get an error if they don't exist

}
// Remove body
txn.delete::<Bodies>(block_hash.into(), None)
.map_err(StoreError::LibmdbxError)?;
}
// Remove block header, hash & number
txn.delete::<CanonicalBlockHashes>(block_number, None)
.map_err(StoreError::LibmdbxError)?;
txn.delete::<Headers>(block_hash.into(), None)
.map_err(StoreError::LibmdbxError)?;
txn.delete::<BlockNumbers>(block_hash.into(), None)
.map_err(StoreError::LibmdbxError)?;

txn.commit().map_err(StoreError::LibmdbxError)
}

async fn get_block_bodies(
&self,
from: BlockNumber,
Expand Down
13 changes: 13 additions & 0 deletions tooling/prune-blocks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "prune-blocks"
version.workspace = true
edition.workspace = true

[dependencies]
ethrex-storage.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber = { version = "0.3", features = ["fmt"] }
clap = { workspace = true, features = ["string"] }
eyre.workspace = true
ethrex.workspace = true
15 changes: 15 additions & 0 deletions tooling/prune-blocks/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Prune Blocks

This tool can be used to reduce the DB size of the node by removing old blocks and their associated data. Note that this is counter-spec and will hinder the node's ability to provide data to other nodes. It also does not perform state pruning.

## Usage

The tool takes two optional arguments:
* `datadir`: The path to the DB location; the default one is used if not provided.
* `blocks_to_keep`: The number of latest blocks that will be kept in the DB. This value must be at least 128 and lower than the current number of blocks in the chain.

And should be run like this:

```bash
cargo run --release -- --datadir DATADIR --blocks-to-keep BLOCKS_TO_KEEP
```
95 changes: 95 additions & 0 deletions tooling/prune-blocks/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::time::{Duration, Instant};

use clap::Parser;
use ethrex::{
initializers::load_store,
utils::{default_datadir, init_datadir},
};
use tracing::info;
use tracing_subscriber::FmtSubscriber;

const MIN_BLOCKS_TO_KEEP: u64 = 128;

/// Formats a duration given in milliseconds as a compact human-readable
/// string such as `1d2h3m4s5ms`. Zero-valued units are omitted; a zero
/// input yields `"0ms"` so the result is never empty.
fn mseconds_to_readable(mut mseconds: u128) -> String {
    const DAY: u128 = 24 * HOUR;
    const HOUR: u128 = 60 * MINUTE;
    const MINUTE: u128 = 60 * SECOND;
    const SECOND: u128 = 1000 * MSECOND;
    const MSECOND: u128 = 1;
    let mut res = String::new();
    let mut apply_time_unit = |unit_in_ms: u128, unit_str: &str| {
        // `>=` (not `>`) so an exact multiple of a unit is rendered with
        // that unit (e.g. 1000ms -> "1s" rather than "1000ms", and
        // 1ms -> "1ms" rather than an empty string).
        if mseconds >= unit_in_ms {
            let amount_of_unit = mseconds / unit_in_ms;
            res.push_str(&format!("{amount_of_unit}{unit_str}"));
            mseconds -= unit_in_ms * amount_of_unit
        }
    };
    apply_time_unit(DAY, "d");
    apply_time_unit(HOUR, "h");
    apply_time_unit(MINUTE, "m");
    apply_time_unit(SECOND, "s");
    apply_time_unit(MSECOND, "ms");

    // All units were zero: report an explicit zero instead of "".
    if res.is_empty() {
        res.push_str("0ms");
    }
    res
}

/// Command-line arguments for the block-pruning tool.
#[derive(Parser)]
struct Args {
    // Number of most-recent blocks to preserve; everything older is
    // purged. main() rejects values below MIN_BLOCKS_TO_KEEP.
    #[arg(
        long = "blocks-to-keep",
        value_name = "NUMBER",
        help = "Amount of blocks to keep",
        long_help = "Cannot be smaller than 128",
        default_value_t = MIN_BLOCKS_TO_KEEP,
    )]
    blocks_to_keep: u64,
    // Database location; falls back to the node's default datadir and can
    // also be supplied via the ETHREX_DATADIR environment variable.
    #[arg(
        long = "datadir",
        value_name = "DATABASE_DIRECTORY",
        default_value_t = default_datadir(),
        help = "Receives the name of the directory where the Database is located.",
        long_help = "If the datadir is the word `memory`, ethrex will use the `InMemory Engine`.",
        env = "ETHREX_DATADIR"
    )]
    pub datadir: String,
}

/// Entry point: validates arguments, opens the store, and purges every
/// block from 1 (genesis is kept) up to `latest - blocks_to_keep`
/// inclusive, logging progress every few seconds.
#[tokio::main]
pub async fn main() -> eyre::Result<()> {
    let args = Args::parse();
    tracing::subscriber::set_global_default(FmtSubscriber::new())
        .expect("setting default subscriber failed");
    if args.blocks_to_keep < MIN_BLOCKS_TO_KEEP {
        return Err(eyre::ErrReport::msg(format!(
            "Must keep at least {MIN_BLOCKS_TO_KEEP} blocks in store"
        )));
    }
    let data_dir = init_datadir(&args.datadir);
    let store = load_store(&data_dir).await;
    let latest_number = store.get_latest_block_number().await?;
    if latest_number <= args.blocks_to_keep {
        return Err(eyre::ErrReport::msg(format!(
            "Only have {latest_number} blocks in store, cannot prune"
        )));
    }
    let last_block_to_prune = latest_number - args.blocks_to_keep;
    let prune_start = Instant::now();
    let mut last_show_progress = Instant::now();
    const SHOW_PROGRESS_INTERVAL: Duration = Duration::from_secs(5);
    // Skip the genesis block and prune up to and including
    // `last_block_to_prune`, so exactly `blocks_to_keep` post-genesis
    // blocks remain. The range must be inclusive: `1..last_block_to_prune`
    // would leave one extra block and prune one fewer block than the
    // final log message claims.
    for block_number in 1..=last_block_to_prune {
        if last_show_progress.elapsed() > SHOW_PROGRESS_INTERVAL {
            last_show_progress = Instant::now();
            // `block_number - 1` blocks have completed at this point.
            info!(
                "Pruned {} blocks, {}% done",
                block_number - 1,
                (block_number * 100) / last_block_to_prune
            )
        }
        store.purge_block(block_number).await?;
    }
    info!(
        "Successfully pruned {last_block_to_prune} blocks in {}",
        mseconds_to_readable(prune_start.elapsed().as_millis())
    );
    Ok(())
}
Loading