diff --git a/doc/schema.md b/doc/schema.md index 4875cb4df..d9dabf089 100644 --- a/doc/schema.md +++ b/doc/schema.md @@ -25,17 +25,16 @@ Each block results in the following new rows: * `"M{blockhash}" → "{metadata}"` (block weight, size and number of txs) - * `"D{blockhash}" → ""` (signifies the block is done processing) + * `"D{blockhash}" → ""` (signifies the block was added) -Each transaction results in the following new rows: +Each transaction results in the following new row: * `"T{txid}" → "{serialized-transaction}"` - * `"C{txid}{confirmed-blockhash}" → ""` (a list of blockhashes where `txid` was seen to be confirmed) - -Each output results in the following new row: +Each output results in the following new rows: * `"O{txid}{vout}" → "{scriptpubkey}{value}"` + * `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled) When the indexer is synced up to the tip of the chain, the hash of the tip is saved as following: @@ -43,16 +42,23 @@ When the indexer is synced up to the tip of the chain, the hash of the tip is sa ### `history` -Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new rows (`H` is for history, `F` is for funding): +Each transaction results in the following new row: + + * `"C{txid}" → "{confirmed-height}"` + +Each funding output (except for provably unspendable ones when `--index-unspendables` is not enabled) results in the following new row (`H` is for history, `F` is for funding): * `"H{funding-scripthash}{funding-height}F{funding-txid:vout}{value}" → ""` - * `"a{funding-address-str}" → ""` (for prefix address search, only saved when `--address-search` is enabled) Each spending input (except the coinbase) results in the following new rows (`S` is for spending): * `"H{funding-scripthash}{spending-height}S{spending-txid:vin}{funding-txid:vout}{value}" → ""` - * `"S{funding-txid:vout}{spending-txid:vin}" → ""` + * `"S{funding-txid:vout}" → "{spending-txid:vin}{spending-height}"` + +Each block results in the following new row: + + * `"D{blockhash}" → ""` (signifies the block was indexed) #### Elements only diff --git a/src/bin/db-migrate-v1-to-v2.rs b/src/bin/db-migrate-v1-to-v2.rs new file mode 100644 index 000000000..9289f1d77 --- /dev/null +++ b/src/bin/db-migrate-v1-to-v2.rs @@ -0,0 +1,287 @@ +use std::collections::BTreeSet; +use std::convert::TryInto; +use std::str; + +use itertools::Itertools; +use log::{debug, info, trace}; +use rocksdb::WriteBatch; + +use bitcoin::hashes::Hash; + +use electrs::chain::{BlockHash, Txid}; +use electrs::new_index::db::DBFlush; +use electrs::new_index::schema::{ + lookup_confirmations, FullHash, Store, TxConfRow as V2TxConfRow, TxEdgeRow as V2TxEdgeRow, + TxHistoryKey, +}; +use electrs::util::bincode::{deserialize_big, deserialize_little, serialize_little}; +use electrs::{config::Config, metrics::Metrics}; + +const FROM_DB_VERSION: u32 = 1; +const TO_DB_VERSION: u32 = 2; + +const BATCH_SIZE: usize = 15000; +const PROGRESS_EVERY: usize = BATCH_SIZE * 50; + +// For Elements-based chains the 'I' asset history index is migrated too +#[cfg(not(feature = "liquid"))] +const HISTORY_PREFIXES: [u8; 1] = [b'H']; +#[cfg(feature = "liquid")] +const HISTORY_PREFIXES: [u8; 2] = [b'H', b'I']; + +fn main() { + let config = Config::from_args(); + let metrics = Metrics::new(config.monitoring_addr); + let store = Store::open(&config, &metrics, false); + + let txstore_db = store.txstore_db(); + let history_db = store.history_db(); 
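+    // Rough map of the four migration steps below, summarized from the doc/schema.md
+    // changes in this same patch (the exact row formats live in new_index/schema.rs):
+    //   1. move the 'a' address-prefix search rows from the history db into the txstore db
+    //   2. rewrite 'C' TxConf rows: "C{txid}{blockhash}" -> "" in the txstore db becomes
+    //      "C{txid}" -> "{height}" in the history db, dropping entries from stale blocks
+    //   3. rewrite 'S' TxEdge rows: "S{funding-outpoint}{spending-input}" -> "" becomes
+    //      "S{funding-outpoint}" -> "{spending-input}{spending-height}", dropping stale entries
+    //   4. delete 'H' (and Elements 'I') TxHistory rows that originate from stale blocks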
+ let cache_db = store.cache_db(); + let headers = store.headers(); + let tip_height = headers.best_height() as u32; + + // Check the DB version under `V` matches the expected version + for db in [txstore_db, history_db, cache_db] { + let ver_bytes = db.get(b"V").expect("missing DB version"); + let ver: u32 = deserialize_little(&ver_bytes[0..4]).unwrap(); + assert_eq!(ver, FROM_DB_VERSION, "unexpected DB version {}", ver); + } + + // Utility to log progress once every PROGRESS_EVERY ticks + let mut tick = 0usize; + macro_rules! progress { + ($($arg:tt)+) => {{ + tick = tick.wrapping_add(1); + if tick % PROGRESS_EVERY == 0 { + debug!($($arg)+); + } + }}; + } + + // 1. Migrate the address prefix search index + // Moved as-is from the history db to the txstore db + info!("[1/4] migrating address prefix search index..."); + let address_iter = history_db.iter_scan(b"a"); + for chunk in &address_iter.chunks(BATCH_SIZE) { + let mut batch = WriteBatch::default(); + for row in chunk { + progress!("[1/4] at {}", str::from_utf8(&row.key[1..]).unwrap()); + batch.put(row.key, row.value); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[1/4] writing batch of {} ops", batch.len()); + txstore_db.write_batch(batch, DBFlush::Disable); + } + // Flush the txstore db, only then delete the original rows from the history db + info!("[1/4] flushing V2 address index to txstore db"); + txstore_db.flush(); + info!("[1/4] deleting V1 address index from history db"); + history_db.delete_range(b"a", b"b", DBFlush::Enable); + + // 2. Migrate the TxConf transaction confirmation index + // - Moved from the txstore db to the history db + // - Changed from a set of blocks seen to include the tx to a single block (that is part of the best chain) + // - Changed from the block hash to the block height + // - Entries originating from stale blocks are removed + // Steps 3/4 depend on this index getting migrated first + info!("[2/4] migrating TxConf index..."); + let txconf_iter = txstore_db.iter_scan(b"C"); + for chunk in &txconf_iter.chunks(BATCH_SIZE) { + let mut batch = WriteBatch::default(); + for v1_row in chunk { + let v1_txconf: V1TxConfKey = + deserialize_little(&v1_row.key).expect("invalid TxConfKey"); + let blockhash = BlockHash::from_byte_array(v1_txconf.blockhash); + if let Some(header) = headers.header_by_blockhash(&blockhash) { + // The blockhash is still part of the best chain, use its height to construct the V2 row + let v2_row = V2TxConfRow::new(v1_txconf.txid, header.height() as u32).into_row(); + batch.put(v2_row.key, v2_row.value); + } else { + // The transaction was reorged, don't write the V2 entry + // trace!("[2/4] skipping reorged TxConf for {}", Txid::from_byte_array(txconf.txid)); + } + progress!( + "[2/4] migrating TxConf index ~{:.2}%", + est_hash_progress(&v1_txconf.txid) + ); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[2/4] writing batch of {} ops", batch.len()); + history_db.write_batch(batch, DBFlush::Disable); + } + // Flush the history db, only then delete the original rows from the txstore db + info!("[2/4] flushing V2 TxConf to history db"); + history_db.flush(); + info!("[2/4] deleting V1 TxConf from txstore db"); + txstore_db.delete_range(b"C", b"D", DBFlush::Enable); + + // 3. 
Migrate the TxEdge spending index + // - Changed from a set of inputs seen to spend the outpoint to a single spending input (that is part of the best chain) + // - Keep the height of the spending tx + // - Entries originating from stale blocks are removed + info!("[3/4] migrating TxEdge index..."); + let txedge_iter = history_db.iter_scan(b"S"); + for chunk in &txedge_iter.chunks(BATCH_SIZE) { + let mut v1_edges = Vec::with_capacity(BATCH_SIZE); + let mut spending_txids = BTreeSet::new(); + for v1_row in chunk { + if let Ok(v1_edge) = deserialize_little::(&v1_row.key) { + spending_txids.insert(Txid::from_byte_array(v1_edge.spending_txid)); + v1_edges.push((v1_edge, v1_row.key)); + } + // Rows with keys that cannot be deserialized into V1TxEdgeKey are assumed to already be upgraded, and skipped + // This is necessary to properly recover if the migration stops halfway through. + } + + // Lookup the confirmation status for the entire chunk using a MultiGet operation + let confirmations = lookup_confirmations(history_db, tip_height, spending_txids); + + let mut batch = WriteBatch::default(); + for (v1_edge, v1_db_key) in v1_edges { + let spending_txid = Txid::from_byte_array(v1_edge.spending_txid); + + // Remove the old V1 entry. V2 entries use a different key. + batch.delete(v1_db_key); + + if let Some(spending_height) = confirmations.get(&spending_txid) { + // Re-add the V2 entry if it is still part of the best chain + let v2_row = V2TxEdgeRow::new( + v1_edge.funding_txid, + v1_edge.funding_vout, + v1_edge.spending_txid, + v1_edge.spending_vin, + *spending_height, // now with the height included + ) + .into_row(); + batch.put(v2_row.key, v2_row.value); + } else { + // The spending transaction was reorged, don't write the V2 entry + //trace!("[3/4] skipping reorged TxEdge for {}", spending_txid); + } + + progress!( + "[3/4] migrating TxEdge index ~{:.2}%", + est_hash_progress(&v1_edge.funding_txid) + ); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[3/4] writing batch of {} ops", batch.len()); + history_db.write_batch(batch, DBFlush::Disable); + } + info!("[3/4] flushing V2 TxEdge index to history db"); + history_db.flush(); + + // 4. 
Migrate the TxHistory index + // Entries originating from stale blocks are removed, with no other changes + info!("[4/4] migrating TxHistory index..."); + for prefix in HISTORY_PREFIXES { + let txhistory_iter = history_db.iter_scan(&[prefix]); + info!("[4/4] migrating TxHistory index {}", prefix as char); + for chunk in &txhistory_iter.chunks(BATCH_SIZE) { + let mut history_entries = Vec::with_capacity(BATCH_SIZE); + let mut history_txids = BTreeSet::new(); + for row in chunk { + let hist: TxHistoryKey = deserialize_big(&row.key).expect("invalid TxHistoryKey"); + history_txids.insert(hist.txinfo.get_txid()); + history_entries.push((hist, row.key)); + } + + // Lookup the confirmation status for the entire chunk using a MultiGet operation + let confirmations = lookup_confirmations(history_db, tip_height, history_txids); + + let mut batch = WriteBatch::default(); + for (hist, db_key) in history_entries { + let hist_txid = hist.txinfo.get_txid(); + if confirmations.get(&hist_txid) != Some(&hist.confirmed_height) { + // The history entry originated from a stale block, remove it + batch.delete(db_key); + // trace!("[4/4] removing reorged TxHistory for {}", hist.txinfo.get_txid()); + } + progress!( + "[4/4] migrating TxHistory index {} ~{:.2}%", + prefix as char, + est_hash_progress(&hist.hash) + ); + } + // Write batches without flushing (sync and WAL disabled) + trace!("[4/4] writing batch of {} deletions", batch.len()); + if !batch.is_empty() { + history_db.write_batch(batch, DBFlush::Disable); + } + } + } + info!("[4/4] flushing TxHistory deletions to history db"); + history_db.flush(); + + // Update the DB version under `V` + let ver_bytes = serialize_little(&(TO_DB_VERSION, config.light_mode)).unwrap(); + for db in [txstore_db, history_db, cache_db] { + db.put_sync(b"V", &ver_bytes); + } + + // Compact everything once at the end + txstore_db.full_compaction(); + history_db.full_compaction(); +} + +// Estimates progress using the first 4 bytes, relying on RocksDB's lexicographic key ordering and uniform hash distribution +fn est_hash_progress(hash: &FullHash) -> f32 { + u32::from_be_bytes(hash[0..4].try_into().unwrap()) as f32 / u32::MAX as f32 * 100f32 +} + +#[derive(Debug, serde::Deserialize)] +struct V1TxConfKey { + #[allow(dead_code)] + code: u8, + txid: FullHash, + blockhash: FullHash, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct V1TxEdgeKey { + code: u8, + funding_txid: FullHash, + funding_vout: u16, + spending_txid: FullHash, + spending_vin: u16, +} + +/* +use bitcoin::hex::DisplayHex; + +fn dump_db(db: &DB, label: &str, prefix: &[u8]) { + debug!("dumping {}", label); + for item in db.iter_scan(prefix) { + trace!( + "[{}] {} => {}", + label, + fmt_key(&item.key), + &item.value.to_lower_hex_string() + ); + } +} + +fn debug_batch(batch: &WriteBatch, label: &'static str) { + debug!("batch {} with {} ops", label, batch.len()); + batch.iterate(&mut WriteBatchLogIterator(label)); +} + +struct WriteBatchLogIterator(&'static str); +impl rocksdb::WriteBatchIterator for WriteBatchLogIterator { + fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>) { + trace!( + "[batch {}] PUT {} => {}", + self.0, + fmt_key(&key), + value.to_lower_hex_string() + ); + } + fn delete(&mut self, key: Box<[u8]>) { + trace!("[batch {}] DELETE {}", self.0, fmt_key(&key)); + } +} + +fn fmt_key(key: &[u8]) -> String { + format!("{}-{}", key[0] as char, &key[1..].to_lower_hex_string()) +} +*/ diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 59f957ae5..f8178fbf7 100644 --- 
a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -68,7 +68,7 @@ fn run_server(config: Arc, salt_rwlock: Arc>) -> Result<( signal.clone(), &metrics, )?); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); + let store = Arc::new(Store::open(&config, &metrics, true)); let mut indexer = Indexer::open( Arc::clone(&store), fetch_from(&config, &store), diff --git a/src/bin/popular-scripts.rs b/src/bin/popular-scripts.rs index a7b245817..6ad39f667 100644 --- a/src/bin/popular-scripts.rs +++ b/src/bin/popular-scripts.rs @@ -8,7 +8,7 @@ use electrs::{ fn main() { let config = Config::from_args(); let metrics = Metrics::new(config.monitoring_addr); - let store = Store::open(&config.db_path.join("newindex"), &config, &metrics); + let store = Store::open(&config, &metrics, true); let mut iter = store.history_db().raw_iterator(); iter.seek(b"H"); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 83b3f213a..f96c7e7e4 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -24,7 +24,7 @@ fn main() { let signal = Waiter::start(crossbeam_channel::never()); let config = Config::from_args(); let metrics = Metrics::new(config.monitoring_addr); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); + let store = Arc::new(Store::open(&config, &metrics, true)); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); diff --git a/src/elements/asset.rs b/src/elements/asset.rs index 726431b54..149ebd703 100644 --- a/src/elements/asset.rs +++ b/src/elements/asset.rs @@ -13,7 +13,7 @@ use crate::elements::registry::{AssetMeta, AssetRegistry}; use crate::errors::*; use crate::new_index::schema::{TxHistoryInfo, TxHistoryKey, TxHistoryRow}; use crate::new_index::{db::DBFlush, ChainQuery, DBRow, Mempool, Query}; -use crate::util::{bincode, full_hash, Bytes, FullHash, TransactionStatus, TxInput}; +use crate::util::{bincode, full_hash, BlockId, Bytes, FullHash, TransactionStatus, TxInput}; lazy_static! 
{ pub static ref NATIVE_ASSET_ID: AssetId = @@ -509,7 +509,7 @@ where // save updated stats to cache if let Some(lastblock) = lastblock { - chain.store().cache_db().write( + chain.store().cache_db().write_rows( vec![asset_cache_row(asset_id, &newstats, &lastblock)], DBFlush::Enable, ); @@ -526,13 +526,14 @@ fn chain_asset_stats_delta( start_height: usize, apply_fn: AssetStatApplyFn, ) -> (T, Option) { + let headers = chain.store().headers(); let history_iter = chain .history_iter_scan(b'I', &asset_id.into_inner()[..], start_height) .map(TxHistoryRow::from_row) .filter_map(|history| { - chain - .tx_confirming_block(&history.get_txid()) - .map(|blockid| (history, blockid)) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + let header = headers.header_by_height(history.key.confirmed_height as usize)?; + Some((history, BlockId::from(header))) }); let mut stats = init_stats; diff --git a/src/new_index/db.rs b/src/new_index/db.rs index e889aad63..dc37a483e 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -11,7 +11,7 @@ use crate::config::Config; use crate::new_index::db_metrics::RocksDbMetrics; use crate::util::{bincode, spawn_thread, Bytes}; -static DB_VERSION: u32 = 1; +static DB_VERSION: u32 = 2; #[derive(Debug, Eq, PartialEq)] pub struct DBRow { @@ -38,8 +38,8 @@ impl<'a> Iterator for ScanIterator<'a> { return None; } Some(DBRow { - key: key.to_vec(), - value: value.to_vec(), + key: key.into_vec(), + value: value.into_vec(), }) } } @@ -87,7 +87,7 @@ pub enum DBFlush { } impl DB { - pub fn open(path: &Path, config: &Config) -> DB { + pub fn open(path: &Path, config: &Config, verify_compat: bool) -> DB { debug!("opening DB at {:?}", path); let mut db_opts = rocksdb::Options::default(); db_opts.create_if_missing(true); @@ -119,7 +119,9 @@ impl DB { let db = DB { db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB")) }; - db.verify_compatibility(config); + if verify_compat { + db.verify_compatibility(config); + } db } @@ -170,7 +172,7 @@ impl DB { } } - pub fn write(&self, mut rows: Vec, flush: DBFlush) { + pub fn write_rows(&self, mut rows: Vec, flush: DBFlush) { log::trace!( "writing {} rows to {:?}, flush={:?}", rows.len(), @@ -182,6 +184,20 @@ impl DB { for row in rows { batch.put(&row.key, &row.value); } + self.write_batch(batch, flush) + } + + pub fn delete_rows(&self, mut rows: Vec, flush: DBFlush) { + log::trace!("deleting {} rows from {:?}", rows.len(), self.db,); + rows.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + let mut batch = rocksdb::WriteBatch::default(); + for row in rows { + batch.delete(&row.key); + } + self.write_batch(batch, flush) + } + + pub fn write_batch(&self, batch: rocksdb::WriteBatch, flush: DBFlush) { let do_flush = match flush { DBFlush::Enable => true, DBFlush::Disable => false, @@ -218,21 +234,20 @@ impl DB { self.db.multi_get(keys) } + /// Remove database entries in the range [from, to) + pub fn delete_range>(&self, from: K, to: K, flush: DBFlush) { + let mut batch = rocksdb::WriteBatch::default(); + batch.delete_range(from, to); + self.write_batch(batch, flush); + } + fn verify_compatibility(&self, config: &Config) { - let mut compatibility_bytes = bincode::serialize_little(&DB_VERSION).unwrap(); - - if config.light_mode { - // append a byte to indicate light_mode is enabled. - // we're not letting bincode serialize this so that the compatiblity bytes won't change - // (and require a reindex) when light_mode is disabled. 
this should be chagned the next - // time we bump DB_VERSION and require a re-index anyway. - compatibility_bytes.push(1); - } + let compatibility_bytes = bincode::serialize_little(&(DB_VERSION, config.light_mode)).unwrap(); match self.get(b"V") { None => self.put(b"V", &compatibility_bytes), - Some(ref x) if x != &compatibility_bytes => { - panic!("Incompatible database found. Please reindex.") + Some(x) if x != compatibility_bytes => { + panic!("Incompatible database found. Please reindex or migrate.") } Some(_) => (), } diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 03c5d201f..712fed330 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -1,5 +1,3 @@ -use rayon::prelude::*; - use std::collections::{BTreeSet, HashMap}; use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; @@ -153,18 +151,29 @@ impl Query { } #[trace] - pub fn lookup_tx_spends(&self, tx: Transaction) -> Vec> { + pub fn lookup_tx_spends(&self, tx: &Transaction) -> Vec> { let txid = tx.compute_txid(); + let outpoints = tx + .output + .iter() + .enumerate() + .filter(|(_, txout)| is_spendable(txout)) + .map(|(vout, _)| OutPoint::new(txid, vout as u32)) + .collect::>(); + // First fetch all confirmed spends using a MultiGet operation, + // then fall back to the mempool for any outpoints not spent on-chain + let mut chain_spends = self.chain.lookup_spends(outpoints); + let mempool = self.mempool(); tx.output - .par_iter() + .iter() .enumerate() .map(|(vout, txout)| { if is_spendable(txout) { - self.lookup_spend(&OutPoint { - txid, - vout: vout as u32, - }) + let outpoint = OutPoint::new(txid, vout as u32); + chain_spends + .remove(&outpoint) + .or_else(|| mempool.lookup_spend(&outpoint)) } else { None } diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 7fb74e825..e35d15c9b 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -18,8 +18,8 @@ use elements::{ }; use std::collections::{BTreeSet, HashMap, HashSet}; -use std::path::Path; -use std::sync::{Arc, RwLock}; +use std::convert::TryInto; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use crate::{chain::{ BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, @@ -58,16 +58,18 @@ pub struct Store { } impl Store { - pub fn open(path: &Path, config: &Config, metrics: &Metrics) -> Self { - let txstore_db = DB::open(&path.join("txstore"), config); + pub fn open(config: &Config, metrics: &Metrics, verify_compat: bool) -> Self { + let path = config.db_path.join("newindex"); + + let txstore_db = DB::open(&path.join("txstore"), config, verify_compat); let added_blockhashes = load_blockhashes(&txstore_db, &BlockRow::done_filter()); debug!("{} blocks were added", added_blockhashes.len()); - let history_db = DB::open(&path.join("history"), config); + let history_db = DB::open(&path.join("history"), config, verify_compat); let indexed_blockhashes = load_blockhashes(&history_db, &BlockRow::done_filter()); debug!("{} blocks were indexed", indexed_blockhashes.len()); - let cache_db = DB::open(&path.join("cache"), config); + let cache_db = DB::open(&path.join("cache"), config, verify_compat); let db_metrics = Arc::new(RocksDbMetrics::new(&metrics)); txstore_db.start_stats_exporter(Arc::clone(&db_metrics), "txstore_db"); @@ -75,8 +77,21 @@ impl Store { cache_db.start_stats_exporter(Arc::clone(&db_metrics), "cache_db"); let headers = if let Some(tip_hash) = txstore_db.get(b"t") { - let tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); + let mut 
tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); let headers_map = load_blockheaders(&txstore_db); + + // Move the tip back until we reach a block that is indexed in the history db. + // It is possible for the tip recorded under the db "t" key to be un-indexed if electrs + // shuts down during reorg handling. Normally this wouldn't matter because the non-indexed + // block would be stale, but it could matter if the chain later re-orged back to + // include the previously stale block because more blocks were built on top of it. + // Without this, the stale-then-not-stale block(s) would not get re-indexed correctly. + while !indexed_blockhashes.contains(&tip_hash) { + tip_hash = headers_map + .get(&tip_hash) + .expect("invalid header chain") + .prev_blockhash; + } debug!( "{} headers were loaded, tip at {:?}", headers_map.len(), @@ -109,6 +124,10 @@ impl Store { &self.cache_db } + pub fn headers(&self) -> RwLockReadGuard { + self.indexed_headers.read().unwrap() + } + pub fn done_initial_sync(&self) -> bool { self.txstore_db.get(b"t").is_some() } @@ -259,22 +278,62 @@ impl Indexer { db.enable_auto_compaction(); } - fn get_new_headers(&self, daemon: &Daemon, tip: &BlockHash) -> Result> { - let headers = self.store.indexed_headers.read().unwrap(); - let new_headers = daemon.get_new_headers(&headers, &tip)?; - let result = headers.order(new_headers); - - if let Some(tip) = result.last() { - info!("{:?} ({} left to index)", tip, result.len()); - }; - Ok(result) + fn get_new_headers( + &self, + daemon: &Daemon, + tip: &BlockHash, + ) -> Result<(Vec, Option)> { + let indexed_headers = self.store.indexed_headers.read().unwrap(); + let raw_new_headers = daemon.get_new_headers(&indexed_headers, tip)?; + let (new_headers, reorged_since) = indexed_headers.preprocess(raw_new_headers, tip); + + if let Some(tip) = new_headers.last() { + info!("{:?} ({} left to index)", tip, new_headers.len()); + } + Ok((new_headers, reorged_since)) } pub fn update(&mut self, daemon: &Daemon) -> Result { let daemon = daemon.reconnect()?; let tip = daemon.getbestblockhash()?; - let new_headers = self.get_new_headers(&daemon, &tip)?; + let (new_headers, reorged_since) = self.get_new_headers(&daemon, &tip)?; + + // Handle reorgs by undoing the reorged (stale) blocks first + if let Some(reorged_since) = reorged_since { + // Remove reorged headers from the in-memory HeaderList. + // This will also immediately invalidate all the history db entries originating from those blocks + // (even before the rows are deleted below), since they reference block heights that will no longer exist. + // This ensures consistency - it is not possible for blocks to be available (e.g. in GET /blocks/tip or /block/:hash) + // without the corresponding history entries for these blocks (e.g. in GET /address/:address/txs), or vice-versa. + let mut reorged_headers = self + .store + .indexed_headers + .write() + .unwrap() + .pop(reorged_since); + // The chain tip will temporarily drop to the common ancestor (at height reorged_since-1), + // until the new headers are `append()`ed (below). + + info!( + "processing reorg of depth {} since height {}", + reorged_headers.len(), + reorged_since, + ); + + // Reorged blocks are undone in chunks of 100, processed in serial, each as an atomic batch. + // Reverse them so that chunks closest to the chain tip are processed first, + // which is necessary to properly recover from crashes during reorg handling. + // Also see the comment under `Store::open()`. 
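+            // For example (hypothetical heights): if blocks 100..=349 were reorged, reversing
+            // yields 349, 348, ..., 100, so the ~100-block chunks mentioned above are undone as
+            // 349..=250, then 249..=150, then 149..=100 (always from the tip downwards).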
+ reorged_headers.reverse(); + + // Fetch the reorged blocks, then undo their history index db rows. + // The txstore db rows are kept for reorged blocks/transactions. + start_fetcher(self.from, &daemon, reorged_headers)? + .map(|blocks| self.undo_index(&blocks)); + } + + // Add new blocks to the txstore db let to_add = self.headers_to_add(&new_headers); debug!( "adding transactions from {} blocks using {:?}", @@ -284,6 +343,7 @@ impl Indexer { start_fetcher(self.from, &daemon, to_add)?.map(|blocks| self.add(&blocks)); self.start_auto_compactions(&self.store.txstore_db); + // Index new blocks to the history db let to_index = self.headers_to_index(&new_headers); debug!( "indexing history from {} blocks using {:?}", @@ -301,19 +361,21 @@ impl Indexer { self.flush = DBFlush::Enable; } - // update the synced tip *after* the new data is flushed to disk + // Update the synced tip after all db writes are flushed debug!("updating synced tip to {:?}", tip); self.store.txstore_db.put_sync(b"t", &serialize(&tip)); + // Finally, append the new headers to the in-memory HeaderList. + // This will make both the headers and the history entries visible in the public APIs, consistently with each-other. let mut headers = self.store.indexed_headers.write().unwrap(); - headers.apply(new_headers); + headers.append(new_headers); assert_eq!(tip, *headers.tip()); if let FetchFrom::BlkFiles = self.from { self.from = FetchFrom::Bitcoind; } - self.tip_metric.set(headers.len() as i64 - 1); + self.tip_metric.set(headers.best_height() as i64); Ok(tip) } @@ -326,7 +388,7 @@ impl Indexer { }; { let _timer = self.start_timer("add_write"); - self.store.txstore_db.write(rows, self.flush); + self.store.txstore_db.write_rows(rows, self.flush); } self.store @@ -337,6 +399,37 @@ impl Indexer { } fn index(&self, blocks: &[BlockEntry]) { + self.store + .history_db + .write_rows(self._index(blocks), self.flush); + + let mut indexed_blockhashes = self.store.indexed_blockhashes.write().unwrap(); + indexed_blockhashes.extend(blocks.iter().map(|b| b.entry.hash())); + } + + // Undo the history db entries previously written for the given blocks (that were reorged). + // This includes the TxHistory, TxEdge, TxConf and BlockDone rows ('H', 'S', 'C' and 'D'), + // as well as the Elements history rows ('I' and 'i'). + // + // This does *not* remove any txstore db entries, which are intentionally kept + // even for reorged blocks. + fn undo_index(&self, blocks: &[BlockEntry]) { + self.store + .history_db + .delete_rows(self._index(blocks), self.flush); + // Note this doesn't actually "undo" the rows - the keys are simply deleted, and won't get + // reverted back to their prior value (if there was one). It is expected that the history db + // keys created by blocks are always unique and impossible to already exist from a prior block. + // This is true for all history keys (which always include the height or txid), but for example + // not true for the address prefix search index (in the txstore). 
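+        // Mirror the on-disk 'D' (BlockDone) row deletion in the in-memory indexed set,
+        // so these blocks get re-indexed if a later reorg puts them back on the best chain.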
+ + let mut indexed_blockhashes = self.store.indexed_blockhashes.write().unwrap(); + for block in blocks { + indexed_blockhashes.remove(block.entry.hash()); + } + } + + fn _index(&self, blocks: &[BlockEntry]) -> Vec { let previous_txos_map = { let _timer = self.start_timer("index_lookup"); lookup_txos(&self.store.txstore_db, get_previous_txos(blocks)).unwrap() @@ -353,7 +446,7 @@ impl Indexer { } index_blocks(blocks, &previous_txos_map, &self.iconfig) }; - self.store.history_db.write(rows, self.flush); + rows } pub fn fetch_from(&mut self, from: FetchFrom) { @@ -401,6 +494,28 @@ impl ChainQuery { } } + pub fn get_block_txs( + &self, + hash: &BlockHash, + start_index: usize, + limit: usize, + ) -> Result> { + let txids = self.get_block_txids(hash).chain_err(|| "block not found")?; + ensure!(start_index < txids.len(), "start index out of range"); + + let txids_with_blockhash = txids + .into_iter() + .skip(start_index) + .take(limit) + .map(|txid| (txid, *hash)) + .collect::>(); + + self.lookup_txns(&txids_with_blockhash) + + // XXX use getblock in lightmode? a single RPC call, but would fetch all txs to get one page + // self.daemon.getblock(hash)?.txdata.into_iter().skip(start_index).take(limit).collect() + } + pub fn get_block_meta(&self, hash: &BlockHash) -> Option { let _timer = self.start_timer("get_block_meta"); @@ -426,17 +541,19 @@ impl ChainQuery { let entry = self.header_by_hash(hash)?; let meta = self.get_block_meta(hash)?; let txids = self.get_block_txids(hash)?; + let txids_with_blockhash: Vec<_> = + txids.into_iter().map(|txid| (txid, *hash)).collect(); + let raw_txs = self.lookup_raw_txns(&txids_with_blockhash).ok()?; // TODO avoid hiding all errors as None, return a Result // Reconstruct the raw block using the header and txids, // as let mut raw = Vec::with_capacity(meta.size as usize); raw.append(&mut serialize(entry.header())); - raw.append(&mut serialize(&VarInt(txids.len() as u64))); + raw.append(&mut serialize(&VarInt(raw_txs.len() as u64))); - for txid in txids { - // we don't need to provide the blockhash because we know we're not in light mode - raw.append(&mut self.lookup_raw_txn(&txid, None)?); + for mut raw_tx in raw_txs { + raw.append(&mut raw_tx); } Some(raw) @@ -495,13 +612,15 @@ impl ChainQuery { limit: usize, ) -> Vec<(Transaction, BlockId)> { let _timer_scan = self.start_timer("history"); - let txs_conf = self + let headers = self.store.indexed_headers.read().unwrap(); + let history_iter = self .history_iter_scan_reverse(code, hash) - .map(|row| TxHistoryRow::from_row(row).get_txid()) + .map(TxHistoryRow::from_row) + .map(|row| (row.get_txid(), row.key.confirmed_height as usize)) // XXX: unique() requires keeping an in-memory list of all txids, can we avoid that? 
.unique() // TODO seek directly to last seen tx without reading earlier rows - .skip_while(|txid| { + .skip_while(|(txid, _)| { // skip until we reach the last_seen_txid last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) }) @@ -509,15 +628,23 @@ impl ChainQuery { Some(_) => 1, // skip the last_seen_txid itself None => 0, }) - .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) - .take(limit) - .collect::>(); + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?))) + .take(limit); + + let mut txids_with_blockhash = Vec::with_capacity(limit); + let mut blockids = Vec::with_capacity(limit); + for (txid, header) in history_iter { + txids_with_blockhash.push((txid, *header.hash())); + blockids.push(BlockId::from(header)); + } + drop(headers); - self.lookup_txns(&txs_conf) + self.lookup_txns(&txids_with_blockhash) .expect("failed looking up txs in history index") .into_iter() - .zip(txs_conf) - .map(|(tx, (_, blockid))| (tx, blockid)) + .zip(blockids) + .map(|(tx, blockid)| (tx, blockid)) .collect() } @@ -528,10 +655,13 @@ impl ChainQuery { fn _history_txids(&self, code: u8, hash: &[u8], limit: usize) -> Vec<(Txid, BlockId)> { let _timer = self.start_timer("history_txids"); + let headers = self.store.indexed_headers.read().unwrap(); self.history_iter_scan(code, hash, 0) - .map(|row| TxHistoryRow::from_row(row).get_txid()) + .map(TxHistoryRow::from_row) + .map(|row| (row.get_txid(), row.key.confirmed_height as usize)) .unique() - .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + .filter_map(|(txid, height)| Some((txid, headers.header_by_height(height)?.into()))) .take(limit) .collect() } @@ -563,7 +693,7 @@ impl ChainQuery { // save updated utxo set to cache if let Some(lastblock) = lastblock { if had_cache || processed_items > MIN_HISTORY_ITEMS_TO_CACHE { - self.store.cache_db.write( + self.store.cache_db.write_rows( vec![UtxoCacheRow::new(scripthash, &newutxos, &lastblock).into_row()], DBFlush::Enable, ); @@ -605,12 +735,14 @@ impl ChainQuery { limit: usize, ) -> Result<(UtxoMap, Option, usize)> { let _timer = self.start_timer("utxo_delta"); + let headers = self.store.indexed_headers.read().unwrap(); let history_iter = self .history_iter_scan(b'H', scripthash, start_height) .map(TxHistoryRow::from_row) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) .filter_map(|history| { - self.tx_confirming_block(&history.get_txid()) - .map(|b| (history, b)) + let header = headers.header_by_height(history.key.confirmed_height as usize)?; + Some((history, BlockId::from(header))) }); let mut utxos = init_utxos; @@ -666,7 +798,7 @@ impl ChainQuery { // save updated stats to cache if let Some(lastblock) = lastblock { if newstats.funded_txo_count + newstats.spent_txo_count > MIN_HISTORY_ITEMS_TO_CACHE { - self.store.cache_db.write( + self.store.cache_db.write_rows( vec![StatsCacheRow::new(scripthash, &newstats, &lastblock).into_row()], DBFlush::Enable, ); @@ -683,15 +815,14 @@ impl ChainQuery { start_height: usize, ) -> (ScriptStats, Option) { let _timer = self.start_timer("stats_delta"); // TODO: measure also the number of txns processed. 
+ let headers = self.store.indexed_headers.read().unwrap(); let history_iter = self .history_iter_scan(b'H', scripthash, start_height) .map(TxHistoryRow::from_row) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) .filter_map(|history| { - self.tx_confirming_block(&history.get_txid()) - // drop history entries that were previously confirmed in a re-orged block and later - // confirmed again at a different height - .filter(|blockid| blockid.height == history.key.confirmed_height as usize) - .map(|blockid| (history, blockid)) + let header = headers.header_by_height(history.key.confirmed_height as usize)?; + Some((history, BlockId::from(header))) }); let mut stats = init_stats; @@ -746,7 +877,7 @@ impl ChainQuery { pub fn address_search(&self, prefix: &str, limit: usize) -> Vec { let _timer_scan = self.start_timer("address_search"); self.store - .history_db + .txstore_db .iter_scan(&addr_search_filter(prefix)) .take(limit) .map(|row| std::str::from_utf8(&row.key[1..]).unwrap().to_string()) @@ -809,8 +940,9 @@ impl ChainQuery { .map(BlockId::from) } + /// Get the chain tip height. Panics if called on an empty HeaderList. pub fn best_height(&self) -> usize { - self.store.indexed_headers.read().unwrap().len() - 1 + self.store.indexed_headers.read().unwrap().best_height() } pub fn best_hash(&self) -> BlockHash { @@ -825,26 +957,40 @@ impl ChainQuery { .clone() } - // TODO: can we pass txids as a "generic iterable"? - // TODO: should also use a custom ThreadPoolBuilder? - pub fn lookup_txns(&self, txids: &[(Txid, BlockId)]) -> Result> { + pub fn lookup_txns(&self, txids: &[(Txid, BlockHash)]) -> Result> { let _timer = self.start_timer("lookup_txns"); - txids - .par_iter() - .map(|(txid, blockid)| { - self.lookup_txn(txid, Some(&blockid.hash)) - .chain_err(|| "missing tx") - }) - .collect::>>() + Ok(self + .lookup_raw_txns(txids)? 
+ .into_iter() + .map(|rawtx| deserialize(&rawtx).expect("failed to parse Transaction")) + .collect()) } pub fn lookup_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { let _timer = self.start_timer("lookup_txn"); - self.lookup_raw_txn(txid, blockhash).map(|rawtx| { - let txn: Transaction = deserialize(&rawtx).expect("failed to parse Transaction"); - assert_eq!(*txid, txn.compute_txid()); - txn - }) + let rawtx = self.lookup_raw_txn(txid, blockhash)?; + Some(deserialize(&rawtx).expect("failed to parse Transaction")) + } + + pub fn lookup_raw_txns(&self, txids: &[(Txid, BlockHash)]) -> Result> { + let _timer = self.start_timer("lookup_raw_txns"); + if self.light_mode { + txids + .par_iter() + .map(|(txid, blockhash)| { + self.lookup_raw_txn(txid, Some(blockhash)) + .chain_err(|| "missing tx") + }) + .collect() + } else { + let keys = txids.iter().map(|(txid, _)| TxRow::key(&txid[..])); + self.store + .txstore_db + .multi_get(keys) + .into_iter() + .map(|val| val.unwrap().chain_err(|| "missing tx")) + .collect() + } } pub fn lookup_raw_txn(&self, txid: &Txid, blockhash: Option<&BlockHash>) -> Option { @@ -878,34 +1024,52 @@ impl ChainQuery { pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { let _timer = self.start_timer("lookup_spend"); + let edge = TxEdgeValue::from_bytes(&self.store.history_db.get(&TxEdgeRow::key(outpoint))?); + let headers = self.store.indexed_headers.read().unwrap(); + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + let header = headers.header_by_height(edge.spending_height as usize)?; + Some(SpendingInput { + txid: deserialize(&edge.spending_txid).expect("failed to parse Txid"), + vin: edge.spending_vin as u32, + confirmed: Some(header.into()), + }) + } + + pub fn lookup_spends(&self, outpoints: BTreeSet) -> HashMap { + let _timer = self.start_timer("lookup_spends"); + let headers = self.store.indexed_headers.read().unwrap(); self.store .history_db - .iter_scan(&TxEdgeRow::filter(&outpoint)) - .map(TxEdgeRow::from_row) - .find_map(|edge| { - let txid: Txid = deserialize(&edge.key.spending_txid).unwrap(); - self.tx_confirming_block(&txid).map(|b| SpendingInput { - txid, - vin: edge.key.spending_vin as u32, - confirmed: Some(b), - }) + .multi_get(outpoints.iter().map(TxEdgeRow::key)) + .into_iter() + .zip(outpoints) + .filter_map(|(edge_val, outpoint)| { + let edge = TxEdgeValue::from_bytes(&edge_val.unwrap()?); + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + let header = headers.header_by_height(edge.spending_height as usize)?; + Some(( + outpoint, + SpendingInput { + txid: deserialize(&edge.spending_txid).expect("failed to parse Txid"), + vin: edge.spending_vin as u32, + confirmed: Some(header.into()), + }, + )) }) + .collect() } pub fn tx_confirming_block(&self, txid: &Txid) -> Option { let _timer = self.start_timer("tx_confirming_block"); + let row_value = self.store.history_db.get(&TxConfRow::key(txid))?; + let height = TxConfRow::height_from_val(&row_value); let headers = self.store.indexed_headers.read().unwrap(); - self.store - .txstore_db - .iter_scan(&TxConfRow::filter(&txid[..])) - .map(TxConfRow::from_row) - // header_by_blockhash only returns blocks that are part of the best chain, - // or None for orphaned blocks. 
- .filter_map(|conf| { - headers.header_by_blockhash(&deserialize(&conf.key.blockhash).unwrap()) - }) - .next() - .map(BlockId::from) + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + Some(headers.header_by_height(height as usize)?.into()) + } + + pub fn lookup_confirmations(&self, txids: BTreeSet) -> HashMap { + lookup_confirmations(&self.store.history_db, self.best_height() as u32, txids) } pub fn get_block_status(&self, hash: &BlockHash) -> BlockStatus { @@ -979,7 +1143,6 @@ fn load_blockheaders(db: &DB) -> HashMap { fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec { // persist individual transactions: // T{txid} → {rawtx} - // C{txid}{blockhash}{height} → // O{txid}{index} → {txout} // persist block headers', block txids' and metadata rows: // B{blockhash} → {header} @@ -992,7 +1155,7 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec = b.block.txdata.iter().map(|tx| tx.compute_txid()).collect(); for (tx, txid) in b.block.txdata.iter().zip(txids.iter()) { - add_transaction(*txid, tx, blockhash, &mut rows, iconfig); + add_transaction(*txid, tx, &mut rows, iconfig); } if !iconfig.light_mode { @@ -1008,15 +1171,7 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec, - iconfig: &IndexerConfig, -) { - rows.push(TxConfRow::new(txid, blockhash).into_row()); - +fn add_transaction(txid: Txid, tx: &Transaction, rows: &mut Vec, iconfig: &IndexerConfig) { if !iconfig.light_mode { rows.push(TxRow::new(txid, tx).into_row()); } @@ -1026,6 +1181,12 @@ fn add_transaction( if is_spendable(txo) { rows.push(TxOutRow::new(&txid, txo_index, txo).into_row()); } + + if iconfig.address_search { + if let Some(row) = addr_search_row(&txo.script_pubkey, iconfig.network) { + rows.push(row); + } + } } } @@ -1063,6 +1224,23 @@ fn lookup_txo(txstore_db: &DB, outpoint: &OutPoint) -> Option { .map(|val| deserialize(&val).expect("failed to parse TxOut")) } +pub fn lookup_confirmations( + history_db: &DB, + tip_height: u32, + txids: BTreeSet, +) -> HashMap { + history_db + .multi_get(txids.iter().map(TxConfRow::key)) + .into_iter() + .zip(txids) + .filter_map(|(res, txid)| { + let confirmation_height = u32::from_le_bytes(res.unwrap()?.try_into().unwrap()); + // skip over entries that point to non-existing heights (may happen while new/reorged blocks are being processed) + (confirmation_height <= tip_height).then_some((txid, confirmation_height)) + }) + .collect() +} + fn index_blocks( block_entries: &[BlockEntry], previous_txos_map: &HashMap, @@ -1091,12 +1269,17 @@ fn index_transaction( rows: &mut Vec, iconfig: &IndexerConfig, ) { + let txid = full_hash(&tx.compute_txid()[..]); + + // persist tx confirmation row: + // C{txid} → "{block_height}" + rows.push(TxConfRow::new(txid, confirmed_height).into_row()); + // persist history index: // H{funding-scripthash}{funding-height}F{funding-txid:vout} → "" // H{funding-scripthash}{spending-height}S{spending-txid:vin}{funding-txid:vout} → "" // persist "edges" for fast is-this-TXO-spent check // S{funding-txid:vout}{spending-txid:vin} → "" - let txid = full_hash(&tx.compute_txid()[..]); for (txo_index, txo) in tx.output.iter().enumerate() { if is_spendable(txo) || iconfig.index_unspendables { let history = TxHistoryRow::new( @@ -1109,12 +1292,6 @@ fn index_transaction( }), ); rows.push(history.into_row()); - - if iconfig.address_search { - if let Some(row) = addr_search_row(&txo.script_pubkey, iconfig.network) { - rows.push(row); - 
} - } } } for (txi_index, txi) in tx.input.iter().enumerate() { @@ -1143,6 +1320,7 @@ fn index_transaction( txi.previous_output.vout as u16, txid, txi_index as u16, + confirmed_height, ); rows.push(edge.into_row()); } @@ -1218,43 +1396,42 @@ impl TxRow { } #[derive(Serialize, Deserialize)] -struct TxConfKey { +pub struct TxConfKey { code: u8, txid: FullHash, - blockhash: FullHash, } -struct TxConfRow { +pub struct TxConfRow { key: TxConfKey, + value: u32, // the confirmation height } impl TxConfRow { - fn new(txid: Txid, blockhash: FullHash) -> TxConfRow { + pub fn new(txid: FullHash, height: u32) -> TxConfRow { let txid = full_hash(&txid[..]); TxConfRow { - key: TxConfKey { - code: b'C', - txid, - blockhash, - }, + key: TxConfKey { code: b'C', txid }, + value: height, } } - fn filter(prefix: &[u8]) -> Bytes { - [b"C", prefix].concat() + pub fn key(txid: &Txid) -> Bytes { + bincode::serialize_little(&TxConfKey { + code: b'C', + txid: full_hash(&txid[..]), + }) + .unwrap() } - fn into_row(self) -> DBRow { + pub fn into_row(self) -> DBRow { DBRow { key: bincode::serialize_little(&self.key).unwrap(), - value: vec![], + value: self.value.to_le_bytes().to_vec(), } } - fn from_row(row: DBRow) -> Self { - TxConfRow { - key: bincode::deserialize_little(&row.key).expect("failed to parse TxConfKey"), - } + fn height_from_val(val: &[u8]) -> u32 { + u32::from_le_bytes(val.try_into().expect("invalid TxConf value")) } } @@ -1497,52 +1674,61 @@ impl TxHistoryInfo { } #[derive(Serialize, Deserialize)] -struct TxEdgeKey { +pub struct TxEdgeKey { code: u8, funding_txid: FullHash, funding_vout: u16, +} + +#[derive(Serialize, Deserialize)] +pub struct TxEdgeValue { spending_txid: FullHash, spending_vin: u16, + spending_height: u32, } -struct TxEdgeRow { +pub struct TxEdgeRow { key: TxEdgeKey, + value: TxEdgeValue, } impl TxEdgeRow { - fn new( + pub fn new( funding_txid: FullHash, funding_vout: u16, spending_txid: FullHash, spending_vin: u16, + spending_height: u32, ) -> Self { let key = TxEdgeKey { code: b'S', funding_txid, funding_vout, + }; + let value = TxEdgeValue { spending_txid, spending_vin, + spending_height, }; - TxEdgeRow { key } + TxEdgeRow { key, value } } - fn filter(outpoint: &OutPoint) -> Bytes { - // TODO build key without using bincode? [ b"S", &outpoint.txid[..], outpoint.vout?? 
].concat() + fn key(outpoint: &OutPoint) -> Bytes { bincode::serialize_little(&(b'S', full_hash(&outpoint.txid[..]), outpoint.vout as u16)) .unwrap() } - fn into_row(self) -> DBRow { + pub fn into_row(self) -> DBRow { DBRow { key: bincode::serialize_little(&self.key).unwrap(), - value: vec![], + value: bincode::serialize_little(&self.value).unwrap(), } } +} - fn from_row(row: DBRow) -> Self { - TxEdgeRow { - key: bincode::deserialize_little(&row.key).expect("failed to deserialize TxEdgeKey"), - } +impl TxEdgeValue { + fn from_bytes(bytes: &[u8]) -> Self { + bincode::deserialize_little(bytes).expect("invalid TxEdgeValue") } } diff --git a/src/rest.rs b/src/rest.rs index cefc49b7c..f991351bb 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -718,41 +718,28 @@ fn handle_request( } (&Method::GET, Some(&"block"), Some(hash), Some(&"txs"), start_index, None) => { let hash = BlockHash::from_str(hash)?; - let txids = query - .chain() - .get_block_txids(&hash) - .ok_or_else(|| HttpError::not_found("Block not found".to_string()))?; - let start_index = start_index .map_or(0u32, |el| el.parse().unwrap_or(0)) .max(0u32) as usize; - if start_index >= txids.len() { - bail!(HttpError::not_found("start index out of range".to_string())); - } else if start_index % CHAIN_TXS_PER_PAGE != 0 { - bail!(HttpError::from(format!( - "start index must be a multipication of {}", - CHAIN_TXS_PER_PAGE - ))); - } - // blockid_by_hash() only returns the BlockId for non-orphaned blocks, - // or None for orphaned - let confirmed_blockid = query.chain().blockid_by_hash(&hash); + ensure!( + start_index % CHAIN_TXS_PER_PAGE == 0, + "start index must be a multipication of {}", + CHAIN_TXS_PER_PAGE + ); + + // The BlockId would not be available for stale blocks + let blockid = query.chain().blockid_by_hash(&hash); - let txs = txids - .iter() - .skip(start_index) - .take(CHAIN_TXS_PER_PAGE) - .map(|txid| { - query - .lookup_txn(&txid) - .map(|tx| (tx, confirmed_blockid.clone())) - .ok_or_else(|| "missing tx".to_string()) - }) - .collect::)>, _>>()?; + let txs = query + .chain() + .get_block_txs(&hash, start_index, CHAIN_TXS_PER_PAGE)? 
+ .into_iter() + .map(|tx| (tx, blockid)) + .collect(); - // XXX orphraned blocks alway get TTL_SHORT - let ttl = ttl_by_depth(confirmed_blockid.map(|b| b.height), query); + // XXX stale blocks alway get TTL_SHORT + let ttl = ttl_by_depth(blockid.map(|b| b.height), query); json_response(prepare_txs(txs, query, config), ttl) } @@ -996,7 +983,7 @@ fn handle_request( .lookup_txn(&hash) .ok_or_else(|| HttpError::not_found("Transaction not found".to_string()))?; let spends: Vec = query - .lookup_tx_spends(tx) + .lookup_tx_spends(&tx) .into_iter() .map(|spend| spend.map_or_else(SpendingValue::default, SpendingValue::from)) .collect(); diff --git a/src/util/block.rs b/src/util/block.rs index 5dac63bcf..c5c2f7c5a 100644 --- a/src/util/block.rs +++ b/src/util/block.rs @@ -2,9 +2,9 @@ use crate::chain::{BlockHash, BlockHeader}; use crate::errors::*; use crate::new_index::BlockEntry; +use itertools::Itertools; use std::collections::HashMap; use std::fmt; -use std::iter::FromIterator; use std::slice; use time::format_description::well_known::Rfc3339; use time::OffsetDateTime as DateTime; @@ -128,59 +128,88 @@ impl HeaderList { ); let mut headers = HeaderList::empty(); - headers.apply(headers.order(headers_chain)); + headers.append(headers.preprocess(headers_chain, &tip_hash).0); headers } + /// Pre-process the given `BlockHeader`s to verify they connect to the chain and to + /// transform them into `HeaderEntry`s with heights and hashes - but without saving them. + /// If the headers trigger a reorg, the `reorged_since` height is returned too. + /// Actually applying the headers requires to first pop() the reorged blocks (if any), + /// then append() the new ones. #[trace] - pub fn order(&self, new_headers: Vec) -> Vec { + pub fn preprocess( + &self, + new_headers: Vec, + new_tip: &BlockHash, + ) -> (Vec, Option) { // header[i] -> header[i-1] (i.e. header.last() is the tip) - struct HashedHeader { - blockhash: BlockHash, - header: BlockHeader, - } - let hashed_headers = - Vec::::from_iter(new_headers.into_iter().map(|header| HashedHeader { - blockhash: header.block_hash(), - header, - })); - for i in 1..hashed_headers.len() { - assert_eq!( - hashed_headers[i].header.prev_blockhash, - hashed_headers[i - 1].blockhash - ); - } - let prev_blockhash = match hashed_headers.first() { - Some(h) => h.header.prev_blockhash, - None => return vec![], // hashed_headers is empty - }; - let new_height: usize = if prev_blockhash == *DEFAULT_BLOCKHASH { - 0 + let (new_height, header_entries) = if !new_headers.is_empty() { + let hashed_headers = new_headers + .into_iter() + .map(|h| (h.block_hash(), h)) + .collect::>(); + for ((curr_blockhash, _), (_, next_header)) in hashed_headers.iter().tuple_windows() { + assert_eq!(*curr_blockhash, next_header.prev_blockhash); + } + assert_eq!(hashed_headers.last().unwrap().0, *new_tip); + + let prev_blockhash = &hashed_headers.first().unwrap().1.prev_blockhash; + let new_height = if *prev_blockhash == *DEFAULT_BLOCKHASH { + 0 + } else { + self.header_by_blockhash(prev_blockhash) + .expect("headers do not connect") + .height() + + 1 + }; + let header_entries = (new_height..) 
+ .zip(hashed_headers) + .map(|(height, (hash, header))| HeaderEntry { + height, + hash, + header, + }) + .collect(); + (new_height, header_entries) } else { - self.header_by_blockhash(&prev_blockhash) - .unwrap_or_else(|| panic!("{} is not part of the blockchain", prev_blockhash)) + // No new headers, but the new tip could potentially shorten the chain (or be a no-op if it matches the existing tip) + // This should not normally happen, but might due to manual `invalidateblock` + let new_height = self + .header_by_blockhash(new_tip) + .expect("new tip not in chain") .height() - + 1 + + 1; + (new_height, vec![]) }; - (new_height..) - .zip(hashed_headers.into_iter()) - .map(|(height, hashed_header)| HeaderEntry { - height, - hash: hashed_header.blockhash, - header: hashed_header.header, - }) - .collect() + let reorged_since = (new_height < self.len()).then_some(new_height); + (header_entries, reorged_since) + } + + /// Pop off reorged blocks since (including) the given height and return them. + #[trace] + pub fn pop(&mut self, since_height: usize) -> Vec { + let reorged_headers = self.headers.split_off(since_height); + + for header in &reorged_headers { + self.heights.remove(header.hash()); + } + self.tip = self + .headers + .last() + .map(|h| *h.hash()) + .unwrap_or_else(|| *DEFAULT_BLOCKHASH); + + reorged_headers } + /// Append new headers. Expected to always extend the tip (stale blocks must be removed first) #[trace] - pub fn apply(&mut self, new_headers: Vec) { + pub fn append(&mut self, new_headers: Vec) { // new_headers[i] -> new_headers[i - 1] (i.e. new_headers.last() is the tip) - for i in 1..new_headers.len() { - assert_eq!(new_headers[i - 1].height() + 1, new_headers[i].height()); - assert_eq!( - *new_headers[i - 1].hash(), - new_headers[i].header().prev_blockhash - ); + for (curr_header, next_header) in new_headers.iter().tuple_windows() { + assert_eq!(curr_header.height() + 1, next_header.height()); + assert_eq!(*curr_header.hash(), next_header.header().prev_blockhash); } let new_height = match new_headers.first() { Some(entry) => { @@ -200,7 +229,7 @@ impl HeaderList { new_headers.len(), new_height ); - let _removed = self.headers.split_off(new_height); // keep [0..new_height) entries + assert_eq!(new_height, self.headers.len()); for new_header in new_headers { let height = new_header.height(); assert_eq!(height, self.headers.len()); @@ -214,11 +243,8 @@ impl HeaderList { pub fn header_by_blockhash(&self, blockhash: &BlockHash) -> Option<&HeaderEntry> { let height = self.heights.get(blockhash)?; let header = self.headers.get(*height)?; - if *blockhash == *header.hash() { - Some(header) - } else { - None - } + assert_eq!(header.hash(), blockhash); + Some(header) } #[trace] @@ -248,6 +274,13 @@ impl HeaderList { self.headers.len() } + /// Get the chain tip height. Panics if called on an empty HeaderList. 
+ pub fn best_height(&self) -> usize { + self.len() + .checked_sub(1) + .expect("best_height() on empty HeaderList") + } + pub fn is_empty(&self) -> bool { self.headers.is_empty() } @@ -262,7 +295,7 @@ impl HeaderList { // Matches bitcoind's behaviour: bitcoin-cli getblock `bitcoin-cli getblockhash 0` | jq '.time == .mediantime' if height == 0 { self.headers.get(0).unwrap().header.time - } else if height > self.len() - 1 { + } else if height > self.best_height() { 0 } else { let mut timestamps = (height.saturating_sub(MTP_SPAN - 1)..=height) diff --git a/tests/common.rs b/tests/common.rs index 5fb995d2d..3662920a1 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -144,7 +144,7 @@ impl TestRunner { &metrics, )?); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); + let store = Arc::new(Store::open(&config, &metrics, true)); let fetch_from = if !env::var("JSONRPC_IMPORT").is_ok() && !cfg!(feature = "liquid") { // run the initial indexing from the blk files then switch to using the jsonrpc, @@ -276,6 +276,18 @@ impl TestRunner { } } +// Make the RpcApi methods available directly on TestRunner, +// without having to go through the node_client() getter +impl bitcoincore_rpc::RpcApi for TestRunner { + fn call serde::de::Deserialize<'a>>( + &self, + cmd: &str, + args: &[serde_json::Value], + ) -> bitcoincore_rpc::Result { + self.node_client().call(cmd, args) + } +} + pub fn init_rest_tester() -> Result<(rest::Handle, net::SocketAddr, TestRunner)> { let tester = TestRunner::new()?; let rest_server = rest::start(Arc::clone(&tester.config), Arc::clone(&tester.query)); diff --git a/tests/rest.rs b/tests/rest.rs index 382ad16fd..8cb7a4583 100644 --- a/tests/rest.rs +++ b/tests/rest.rs @@ -1,7 +1,11 @@ +use bitcoin::hex::FromHex; use bitcoind::bitcoincore_rpc::RpcApi; use serde_json::Value; use std::collections::HashSet; +#[cfg(not(feature = "liquid"))] +use {bitcoin::Amount, serde_json::from_value}; + use electrs::chain::Txid; pub mod common; @@ -12,17 +16,9 @@ use common::Result; fn test_rest() -> Result<()> { let (rest_handle, rest_addr, mut tester) = common::init_rest_tester().unwrap(); - let get_json = |path: &str| -> Result { - Ok(ureq::get(&format!("http://{}{}", rest_addr, path)) - .call()? - .into_json::()?) - }; - - let get_plain = |path: &str| -> Result { - Ok(ureq::get(&format!("http://{}{}", rest_addr, path)) - .call()? - .into_string()?) - }; + let get = |path: &str| ureq::get(&format!("http://{}{}", rest_addr, path)).call(); + let get_json = |path: &str| -> Result { Ok(get(path)?.into_json::()?) }; + let get_plain = |path: &str| -> Result { Ok(get(path)?.into_string()?) 
}; // Send transaction and confirm it let addr1 = tester.newaddress()?; @@ -138,6 +134,14 @@ fn test_rest() -> Result<()> { ); assert_eq!(res["tx_count"].as_u64(), Some(2)); + // Test GET /block/:hash/raw + let mut res = get(&format!("/block/{}/raw", blockhash))?.into_reader(); + let mut rest_rawblock = Vec::new(); + res.read_to_end(&mut rest_rawblock).unwrap(); + let node_hexblock = // uses low-level call() to support Elements + tester.call::("getblock", &[blockhash.to_string().into(), 0.into()])?; + assert_eq!(rest_rawblock, Vec::from_hex(&node_hexblock).unwrap()); + // Test GET /block/:hash/txs let res = get_json(&format!("/block/{}/txs", blockhash))?; let block_txs = res.as_array().expect("list of txs"); @@ -207,6 +211,223 @@ fn test_rest() -> Result<()> { let status = empty_package_resp.status(); assert_eq!(status, 400); + // Reorg handling tests + #[cfg(not(feature = "liquid"))] + { + let get_conf_height = |txid| -> Result> { + Ok(get_json(&format!("/tx/{}/status", txid))?["block_height"].as_u64()) + }; + let get_chain_stats = |addr| -> Result { + Ok(get_json(&format!("/address/{}", addr))?["chain_stats"].take()) + }; + let get_chain_txs = |addr| -> Result> { + Ok(from_value(get_json(&format!( + "/address/{}/txs/chain", + addr + ))?)?) + }; + let get_outspend = |outpoint: &bitcoin::OutPoint| -> Result { + get_json(&format!("/tx/{}/outspend/{}", outpoint.txid, outpoint.vout)) + }; + + let init_height = tester.node_client().get_block_count()?; + + let address = tester.newaddress()?; + let miner_address = tester.newaddress()?; + + let txid_a = tester.send(&address, Amount::from_sat(100000))?; + let txid_b = tester.send(&address, Amount::from_sat(200000))?; + let txid_c = tester.send(&address, Amount::from_sat(500000))?; + + let tx_a = tester.get_raw_transaction(&txid_a, None)?; + let tx_b = tester.get_raw_transaction(&txid_b, None)?; + let tx_c = tester.get_raw_transaction(&txid_c, None)?; + + // Confirm tx_a, tx_b and tx_c + let blockhash_1 = tester.mine()?; + + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 1).to_string() + ); + assert_eq!(get_plain("/blocks/tip/hash")?, blockhash_1.to_string()); + assert_eq!(get_conf_height(&txid_a)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_b)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_c)?, Some(init_height + 1)); + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(800000) + ); + assert_eq!(get_chain_txs(&address)?.len(), 3); + + let c_outspend = get_outspend(&tx_c.input[0].previous_output)?; + assert_eq!( + c_outspend["txid"].as_str(), + Some(txid_c.to_string().as_str()) + ); + assert_eq!( + c_outspend["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + + // Reorg the last block, re-confirm tx_a at the same height + tester.invalidate_block(&blockhash_1)?; + tester.call::( + "generateblock", + &[ + miner_address.to_string().into(), + [txid_a.to_string()].into(), + ], + )?; + // Re-confirm tx_b at a different height + tester.call::( + "generateblock", + &[ + miner_address.to_string().into(), + [txid_b.to_string()].into(), + ], + )?; + // Don't re-confirm tx_c at all + + let blockhash_2 = tester.get_best_block_hash()?; + + tester.sync()?; + + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 2).to_string() + ); + assert_eq!(get_plain("/blocks/tip/hash")?, blockhash_2.to_string()); + + // Test address stats (GET /address/:address) + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(300000) + ); + + // Test address 
history (GET /address/:address/txs/chain) + let addr_txs = get_chain_txs(&address)?; + assert_eq!(addr_txs.len(), 2); + assert_eq!( + addr_txs[0]["txid"].as_str(), + Some(txid_b.to_string().as_str()) + ); + assert_eq!( + addr_txs[0]["status"]["block_height"].as_u64(), + Some(init_height + 2) + ); + assert_eq!( + addr_txs[1]["txid"].as_str(), + Some(txid_a.to_string().as_str()) + ); + assert_eq!( + addr_txs[1]["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + + // Test transaction status lookup (GET /tx/:txid/status) + assert_eq!(get_conf_height(&txid_a)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_b)?, Some(init_height + 2)); + assert_eq!(get_conf_height(&txid_c)?, None); + + // Test spend edge lookup (GET /tx/:txid/outspend/:vout) + let a_spends = get_outspend(&tx_a.input[0].previous_output)?; + assert_eq!(a_spends["txid"].as_str(), Some(txid_a.to_string().as_str())); + assert_eq!( + a_spends["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + let b_spends = get_outspend(&tx_b.input[0].previous_output)?; + assert_eq!(b_spends["txid"].as_str(), Some(txid_b.to_string().as_str())); + assert_eq!( + b_spends["status"]["block_height"].as_u64(), + Some(init_height + 2) + ); + let c_spends = get_outspend(&tx_c.input[0].previous_output)?; + assert_eq!(c_spends["status"]["confirmed"].as_bool(), Some(false)); + + // Test a deeper reorg, all the way back to exclude tx_b + tester.generate_to_address(15, &address)?; + tester.sync()?; + tester.invalidate_block(&blockhash_2)?; + + for _ in 0..20 { + // Mine some empty blocks, intentionally without tx_b + tester.call::( + "generateblock", + &[miner_address.to_string().into(), Vec::::new().into()], + )?; + } + tester.sync()?; + + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 21).to_string() + ); + assert_eq!( + get_plain("/blocks/tip/hash")?, + tester.get_best_block_hash()?.to_string() + ); + + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(100000) + ); + + let addr_txs = get_chain_txs(&address)?; + assert_eq!(addr_txs.len(), 1); + assert_eq!( + addr_txs[0]["txid"].as_str(), + Some(txid_a.to_string().as_str()) + ); + assert_eq!( + addr_txs[0]["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + + assert_eq!(get_conf_height(&txid_a)?, Some(init_height + 1)); + assert_eq!(get_conf_height(&txid_b)?, None); + assert_eq!(get_conf_height(&txid_c)?, None); + + let a_spends = get_outspend(&tx_a.input[0].previous_output)?; + assert_eq!( + a_spends["status"]["block_height"].as_u64(), + Some(init_height + 1) + ); + let b_spends = get_outspend(&tx_b.input[0].previous_output)?; + assert_eq!(b_spends["spent"].as_bool(), Some(false)); + let c_spends = get_outspend(&tx_b.input[0].previous_output)?; + assert_eq!(c_spends["spent"].as_bool(), Some(false)); + + // Invalidate the tip with no replacement, shortening the chain by one block + tester.invalidate_block(&tester.get_best_block_hash()?)?; + tester.sync()?; + assert_eq!( + get_plain("/blocks/tip/height")?, + (init_height + 20).to_string() + ); + + // Reorg everything back to genesis + tester.invalidate_block(&tester.get_block_hash(1)?)?; + tester.sync()?; + + assert_eq!(get_plain("/blocks/tip/height")?, 0.to_string()); + assert_eq!( + get_chain_stats(&address)?["funded_txo_sum"].as_u64(), + Some(0) + ); + assert_eq!(get_chain_txs(&address)?.len(), 0); + assert_eq!(get_conf_height(&txid_a)?, None); + assert_eq!(get_conf_height(&txid_b)?, None); + assert_eq!(get_conf_height(&txid_c)?, None); + let a_spends = 
get_outspend(&tx_a.input[0].previous_output)?; + assert_eq!(a_spends["spent"].as_bool(), Some(false)); + + // Mine some blocks so that the followup tests have some coins to play with + tester.generate_to_address(101, &miner_address)?; + tester.sync()?; + } + // bitcoin 28.0 only tests - submitpackage #[cfg(all(not(feature = "liquid"), feature = "bitcoind_28_0"))] {