Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions doc/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,40 @@ Each block results in the following new rows:

* `"M{blockhash}" → "{metadata}"` (block weight, size and number of txs)

* `"D{blockhash}" → ""` (signifies the block is done processing)
* `"D{blockhash}" → ""` (signifies the block was added)

Each transaction results in the following new rows:
Each transaction results in the following new row:

* `"T{txid}" → "{serialized-transaction}"`

* `"C{txid}{confirmed-blockhash}" → ""` (a list of blockhashes where `txid` was seen to be confirmed)

Each output results in the following new row:
Each output results in the following new rows:

* `"O{txid}{vout}" → "{scriptpubkey}{value}"`
* `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled)

When the indexer is synced up to the tip of the chain, the hash of the tip is saved as following:

* `"t" → "{blockhash}"`

### `history`

Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new rows (`H` is for history, `F` is for funding):
Each transaction results in the following new row:

* `"C{txid}" → "{confirmed-height}"`

Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new row (`H` is for history, `F` is for funding):

* `"H{funding-scripthash}{funding-height}F{funding-txid:vout}{value}" → ""`
* `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled)

Each spending input (except the coinbase) results in the following new rows (`S` is for spending):

* `"H{funding-scripthash}{spending-height}S{spending-txid:vin}{funding-txid:vout}{value}" → ""`

* `"S{funding-txid:vout}{spending-txid:vin}" → ""`
* `"S{funding-txid:vout}" → "{spending-txid:vin}{spending-height}"`

Each block results in the following new row:

* `"D{blockhash}" → ""` (signifies the block was indexed)

#### Elements only

Expand Down
287 changes: 287 additions & 0 deletions src/bin/db-migrate-v1-to-v2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
use std::collections::BTreeSet;
use std::convert::TryInto;
use std::str;

use itertools::Itertools;
use log::{debug, info, trace};
use rocksdb::WriteBatch;

use bitcoin::hashes::Hash;

use electrs::chain::{BlockHash, Txid};
use electrs::new_index::db::DBFlush;
use electrs::new_index::schema::{
lookup_confirmations, FullHash, Store, TxConfRow as V2TxConfRow, TxEdgeRow as V2TxEdgeRow,
TxHistoryKey,
};
use electrs::util::bincode::{deserialize_big, deserialize_little, serialize_little};
use electrs::{config::Config, metrics::Metrics};

const FROM_DB_VERSION: u32 = 1;
const TO_DB_VERSION: u32 = 2;

const BATCH_SIZE: usize = 15000;
const PROGRESS_EVERY: usize = BATCH_SIZE * 50;

// For Elements-based chains the 'I' asset history index is migrated too
#[cfg(not(feature = "liquid"))]
const HISTORY_PREFIXES: [u8; 1] = [b'H'];
#[cfg(feature = "liquid")]
const HISTORY_PREFIXES: [u8; 2] = [b'H', b'I'];

fn main() {
let config = Config::from_args();
let metrics = Metrics::new(config.monitoring_addr);
let store = Store::open(&config, &metrics, false);

let txstore_db = store.txstore_db();
let history_db = store.history_db();
let cache_db = store.cache_db();
let headers = store.headers();
let tip_height = headers.best_height() as u32;

// Check the DB version under `V` matches the expected version
for db in [txstore_db, history_db, cache_db] {
let ver_bytes = db.get(b"V").expect("missing DB version");
let ver: u32 = deserialize_little(&ver_bytes[0..4]).unwrap();
assert_eq!(ver, FROM_DB_VERSION, "unexpected DB version {}", ver);
}

// Utility to log progress once every PROGRESS_EVERY ticks
let mut tick = 0usize;
macro_rules! progress {
($($arg:tt)+) => {{
tick = tick.wrapping_add(1);
if tick % PROGRESS_EVERY == 0 {
debug!($($arg)+);
}
}};
}

// 1. Migrate the address prefix search index
// Moved as-is from the history db to the txstore db
info!("[1/4] migrating address prefix search index...");
let address_iter = history_db.iter_scan(b"a");
for chunk in &address_iter.chunks(BATCH_SIZE) {
let mut batch = WriteBatch::default();
for row in chunk {
progress!("[1/4] at {}", str::from_utf8(&row.key[1..]).unwrap());
batch.put(row.key, row.value);
}
// Write batches without flushing (sync and WAL disabled)
trace!("[1/4] writing batch of {} ops", batch.len());
txstore_db.write_batch(batch, DBFlush::Disable);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going to test the in-memory impact of queued batches on some dev deployments to see if we can get the additional memory requirements for a migration

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The migration script itself shouldn't be overly memory-intensive, its set to buffer batches in chunks of 15,000 (BATCH_SIZE) to keep them reasonably sized and avoid excessive memory use.

My understanding is that there isn't much to be gained by making the batches gigantic anyway, since they're written with WAL disabled and the per-batch overhead is very small.

It could however make sense run the migration with more memory for RocksDB's write_buffer_size (configurable via --db-write-buffer-size-mb), so that RocksDB can buffer writes more efficiently internally and won't have to flush to SST as often.

}
// Flush the txstore db, only then delete the original rows from the history db
info!("[1/4] flushing V2 address index to txstore db");
txstore_db.flush();
info!("[1/4] deleting V1 address index from history db");
history_db.delete_range(b"a", b"b", DBFlush::Enable);

// 2. Migrate the TxConf transaction confirmation index
// - Moved from the txstore db to the history db
// - Changed from a set of blocks seen to include the tx to a single block (that is part of the best chain)
// - Changed from the block hash to the block height
// - Entries originating from stale blocks are removed
// Steps 3/4 depend on this index getting migrated first
info!("[2/4] migrating TxConf index...");
let txconf_iter = txstore_db.iter_scan(b"C");
for chunk in &txconf_iter.chunks(BATCH_SIZE) {
let mut batch = WriteBatch::default();
for v1_row in chunk {
let v1_txconf: V1TxConfKey =
deserialize_little(&v1_row.key).expect("invalid TxConfKey");
let blockhash = BlockHash::from_byte_array(v1_txconf.blockhash);
if let Some(header) = headers.header_by_blockhash(&blockhash) {
// The blockhash is still part of the best chain, use its height to construct the V2 row
let v2_row = V2TxConfRow::new(v1_txconf.txid, header.height() as u32).into_row();
batch.put(v2_row.key, v2_row.value);
} else {
// The transaction was reorged, don't write the V2 entry
// trace!("[2/4] skipping reorged TxConf for {}", Txid::from_byte_array(txconf.txid));
}
progress!(
"[2/4] migrating TxConf index ~{:.2}%",
est_hash_progress(&v1_txconf.txid)
);
}
// Write batches without flushing (sync and WAL disabled)
trace!("[2/4] writing batch of {} ops", batch.len());
history_db.write_batch(batch, DBFlush::Disable);
}
// Flush the history db, only then delete the original rows from the txstore db
info!("[2/4] flushing V2 TxConf to history db");
history_db.flush();
info!("[2/4] deleting V1 TxConf from txstore db");
txstore_db.delete_range(b"C", b"D", DBFlush::Enable);

// 3. Migrate the TxEdge spending index
// - Changed from a set of inputs seen to spend the outpoint to a single spending input (that is part of the best chain)
// - Keep the height of the spending tx
// - Entries originating from stale blocks are removed
info!("[3/4] migrating TxEdge index...");
let txedge_iter = history_db.iter_scan(b"S");
for chunk in &txedge_iter.chunks(BATCH_SIZE) {
let mut v1_edges = Vec::with_capacity(BATCH_SIZE);
let mut spending_txids = BTreeSet::new();
for v1_row in chunk {
if let Ok(v1_edge) = deserialize_little::<V1TxEdgeKey>(&v1_row.key) {
spending_txids.insert(Txid::from_byte_array(v1_edge.spending_txid));
v1_edges.push((v1_edge, v1_row.key));
}
// Rows with keys that cannot be deserialized into V1TxEdgeKey are assumed to already be upgraded, and skipped
// This is necessary to properly recover if the migration stops halfway through.
}

// Lookup the confirmation status for the entire chunk using a MultiGet operation
let confirmations = lookup_confirmations(history_db, tip_height, spending_txids);

let mut batch = WriteBatch::default();
for (v1_edge, v1_db_key) in v1_edges {
let spending_txid = Txid::from_byte_array(v1_edge.spending_txid);

// Remove the old V1 entry. V2 entries use a different key.
batch.delete(v1_db_key);

if let Some(spending_height) = confirmations.get(&spending_txid) {
// Re-add the V2 entry if it is still part of the best chain
let v2_row = V2TxEdgeRow::new(
v1_edge.funding_txid,
v1_edge.funding_vout,
v1_edge.spending_txid,
v1_edge.spending_vin,
*spending_height, // now with the height included
)
.into_row();
batch.put(v2_row.key, v2_row.value);
} else {
// The spending transaction was reorged, don't write the V2 entry
//trace!("[3/4] skipping reorged TxEdge for {}", spending_txid);
}

progress!(
"[3/4] migrating TxEdge index ~{:.2}%",
est_hash_progress(&v1_edge.funding_txid)
);
}
// Write batches without flushing (sync and WAL disabled)
trace!("[3/4] writing batch of {} ops", batch.len());
history_db.write_batch(batch, DBFlush::Disable);
}
info!("[3/4] flushing V2 TxEdge index to history db");
history_db.flush();

// 4. Migrate the TxHistory index
// Entries originating from stale blocks are removed, with no other changes
info!("[4/4] migrating TxHistory index...");
for prefix in HISTORY_PREFIXES {
let txhistory_iter = history_db.iter_scan(&[prefix]);
info!("[4/4] migrating TxHistory index {}", prefix as char);
for chunk in &txhistory_iter.chunks(BATCH_SIZE) {
let mut history_entries = Vec::with_capacity(BATCH_SIZE);
let mut history_txids = BTreeSet::new();
for row in chunk {
let hist: TxHistoryKey = deserialize_big(&row.key).expect("invalid TxHistoryKey");
history_txids.insert(hist.txinfo.get_txid());
history_entries.push((hist, row.key));
}

// Lookup the confirmation status for the entire chunk using a MultiGet operation
let confirmations = lookup_confirmations(history_db, tip_height, history_txids);

let mut batch = WriteBatch::default();
for (hist, db_key) in history_entries {
let hist_txid = hist.txinfo.get_txid();
if confirmations.get(&hist_txid) != Some(&hist.confirmed_height) {
// The history entry originated from a stale block, remove it
batch.delete(db_key);
// trace!("[4/4] removing reorged TxHistory for {}", hist.txinfo.get_txid());
}
progress!(
"[4/4] migrating TxHistory index {} ~{:.2}%",
prefix as char,
est_hash_progress(&hist.hash)
);
}
// Write batches without flushing (sync and WAL disabled)
trace!("[4/4] writing batch of {} deletions", batch.len());
if !batch.is_empty() {
history_db.write_batch(batch, DBFlush::Disable);
}
}
}
info!("[4/4] flushing TxHistory deletions to history db");
history_db.flush();

// Update the DB version under `V`
let ver_bytes = serialize_little(&(TO_DB_VERSION, config.light_mode)).unwrap();
for db in [txstore_db, history_db, cache_db] {
db.put_sync(b"V", &ver_bytes);
}

// Compact everything once at the end
txstore_db.full_compaction();
history_db.full_compaction();
}

// Estimates progress using the first 4 bytes, relying on RocksDB's lexicographic key ordering and uniform hash distribution
fn est_hash_progress(hash: &FullHash) -> f32 {
u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32
}

#[derive(Debug, serde::Deserialize)]
struct V1TxConfKey {
#[allow(dead_code)]
code: u8,
txid: FullHash,
blockhash: FullHash,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct V1TxEdgeKey {
code: u8,
funding_txid: FullHash,
funding_vout: u16,
spending_txid: FullHash,
spending_vin: u16,
}

/*
use bitcoin::hex::DisplayHex;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug code not removed yet

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured it doesn't really hurt to keep what I used to debug the migration script in there for future reference, not really in the way since that file isn't going to be touched much once we're migrated. I can remove it if it seems unnecessary :)


fn dump_db(db: &DB, label: &str, prefix: &[u8]) {
debug!("dumping {}", label);
for item in db.iter_scan(prefix) {
trace!(
"[{}] {} => {}",
label,
fmt_key(&item.key),
&item.value.to_lower_hex_string()
);
}
}

fn debug_batch(batch: &WriteBatch, label: &'static str) {
debug!("batch {} with {} ops", label, batch.len());
batch.iterate(&mut WriteBatchLogIterator(label));
}

struct WriteBatchLogIterator(&'static str);
impl rocksdb::WriteBatchIterator for WriteBatchLogIterator {
fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) {
trace!(
"[batch {}] PUT {} => {}",
self.0,
fmt_key(&key),
value.to_lower_hex_string()
);
}
fn delete(&mut self, key: Box<[u8]>) {
trace!("[batch {}] DELETE {}", self.0, fmt_key(&key));
}
}

fn fmt_key(key: &[u8]) -> String {
format!("{}-{}", key[0] as char, &key[1..].to_lower_hex_string())
}
*/
2 changes: 1 addition & 1 deletion src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<(
signal.clone(),
&metrics,
)?);
let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
let store = Arc::new(Store::open(&config, &metrics, true));
let mut indexer = Indexer::open(
Arc::clone(&store),
fetch_from(&config, &store),
Expand Down
2 changes: 1 addition & 1 deletion src/bin/popular-scripts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use electrs::{
fn main() {
let config = Config::from_args();
let metrics = Metrics::new(config.monitoring_addr);
let store = Store::open(&config.db_path.join("newindex"), &config, &metrics);
let store = Store::open(&config, &metrics, true);

let mut iter = store.history_db().raw_iterator();
iter.seek(b"H");
Expand Down
2 changes: 1 addition & 1 deletion src/bin/tx-fingerprint-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
let signal = Waiter::start(crossbeam_channel::never());
let config = Config::from_args();
let metrics = Metrics::new(config.monitoring_addr);
let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));
let store = Arc::new(Store::open(&config, &metrics, true));

let metrics = Metrics::new(config.monitoring_addr);
metrics.start();
Expand Down
11 changes: 6 additions & 5 deletions src/elements/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::elements::registry::{AssetMeta, AssetRegistry};
use crate::errors::*;
use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow};
use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query};
use crate::util::{bincode, full_hash, Bytes, FullHash, TransactionStatus, TxInput};
use crate::util::{bincode, full_hash, BlockId, Bytes, FullHash, TransactionStatus, TxInput};

lazy_static! {
pub static ref NATIVE_ASSET_ID: AssetId =
Expand Down Expand Up @@ -509,7 +509,7 @@ where

// save updated stats to cache
if let Some(lastblock) = lastblock {
chain.store().cache_db().write(
chain.store().cache_db().write_rows(
vec![asset_cache_row(asset_id, &newstats, &lastblock)],
DBFlush::Enable,
);
Expand All @@ -526,13 +526,14 @@ fn chain_asset_stats_delta<T>(
start_height: usize,
apply_fn: AssetStatApplyFn<T>,
) -> (T, Option<BlockHash>) {
let headers = chain.store().headers();
let history_iter = chain
.history_iter_scan(b'I', &asset_id.into_inner()[..], start_height)
.map(TxHistoryRow::from_row)
.filter_map(|history| {
chain
.tx_confirming_block(&history.get_txid())
.map(|blockid| (history, blockid))
// skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed)
let header = headers.header_by_height(history.key.confirmed_height as usize)?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Liquid can have, at most, one reorged block. Not sure if it's worth changing this part.

If we do want to handle it, couldn't the confirmed_height be stale? Another block could have re-orged the tip.

This does save a lookup in tx_confirming_block (self.store.history_db.get(&TxConfRow::key(txid))?;) though since we can get the confirmed_height directly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Liquid can have, at most, one reorged block.

The comment is actually misleading, this can also happen while new blocks are being added, not just with reorged blocks. So even on Liquid there could be multiple in-progress blocks that are being indexed into the DB but not yet part of the HeaderList (e.g. if connectivity is lost or electrs is restarted and we need to catch up).

I updated the comment(s) to be more accurate in 416cb4c.

couldn't the confirmed_height be stale?

Under the new design, if the confirmed_height exists in the HeaderList then it is guaranteed not to be a stale.

This does save a lookup in tx_confirming_block though since we can get the confirmed_height directly

Yes :) Avoiding the tx_confirming_block calls (and its random access DB reads) is the major win enabled by this PR.

Some((history, BlockId::from(header)))
});

let mut stats = init_stats;
Expand Down
Loading