diff --git a/.gitignore b/.gitignore index 05ca10e95..6c2b3a026 100644 --- a/.gitignore +++ b/.gitignore @@ -31,5 +31,7 @@ magicblock-test-storage/ # AI related **/CLAUDE.md +CODEBASE_MAP.md config.json config.toml +AGENTS.md diff --git a/Cargo.lock b/Cargo.lock index ca708dff1..d62709001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3714,6 +3714,7 @@ name = "magicblock-processor" version = "0.8.6" dependencies = [ "bincode", + "blake3", "guinea", "magicblock-accounts-db", "magicblock-core", @@ -9210,6 +9211,7 @@ dependencies = [ "solana-transaction-status-client-types", "tempfile", "tokio", + "tokio-util", "tracing", "tracing-log", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index ee6353966..6a40474bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,6 +57,7 @@ async-nats = "0.46" async-trait = "0.1.77" base64 = "0.21.7" bincode = "1.3.3" +blake3 = "1.8" bytes = { version = "1.0", features = ["serde"] } borsh = { version = "1.5.1", features = ["derive", "unstable__schema"] } bs58 = "0.5.1" diff --git a/config.example.toml b/config.example.toml index 3d07bb8f5..3f4ff2b76 100644 --- a/config.example.toml +++ b/config.example.toml @@ -189,6 +189,13 @@ reset = false # Env: MBV_LEDGER__BLOCK_TIME block-time = "400ms" +# The number of slots included in a superblock. +# accountsdb checksum/snapshot is taken at superblock boundary +# +# Default: 72000 +# Env: MBV_LEDGER__SUPERBLOCK_SIZE +superblock-size = 72000 + # Upper hard threshold for the max size of the ledger (in bytes). # Ledger truncation logic kicks in when disk usage approaches this limit. # Default: 536_870_912 (512 MB) diff --git a/magicblock-accounts-db/src/snapshot.rs b/magicblock-accounts-db/src/snapshot.rs index 6876ef5cc..655548e44 100644 --- a/magicblock-accounts-db/src/snapshot.rs +++ b/magicblock-accounts-db/src/snapshot.rs @@ -140,6 +140,12 @@ impl SnapshotManager { tar.append_dir_all(".", snapshot_dir) .log_err(|| "Failed to append directory to tar")?; tar.finish().log_err(|| "Failed to finalize tar archive")?; + let enc = tar + .into_inner() + .log_err(|| "Failed to recover gzip encoder from tar builder")?; + let file = enc.finish().log_err(|| "Failed to finish gzip archive")?; + file.sync_all() + .log_err(|| "Failed to sync archive to disk")?; // Atomically rename to final path fs::rename(&tmp_path, &archive_path).log_err(|| { diff --git a/magicblock-aperture/src/geyser.rs b/magicblock-aperture/src/geyser.rs index ba2c47e71..ca96fb240 100644 --- a/magicblock-aperture/src/geyser.rs +++ b/magicblock-aperture/src/geyser.rs @@ -8,9 +8,9 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{ use json::{JsonValueTrait, Value}; use libloading::{Library, Symbol}; use magicblock_core::link::{ - accounts::AccountWithSlot, blocks::BlockUpdate, - transactions::TransactionStatus, + accounts::AccountWithSlot, transactions::TransactionStatus, }; +use magicblock_ledger::LatestBlockInner; use solana_account::ReadableAccount; use solana_transaction_status::RewardsAndNumPartitions; @@ -152,19 +152,19 @@ impl GeyserPluginManager { pub fn notify_block( &self, - block: &BlockUpdate, + block: &LatestBlockInner, ) -> Result<(), GeyserPluginError> { check_if_enabled!(self); let block = ReplicaBlockInfoV4 { - slot: block.meta.slot, - parent_slot: block.meta.slot.saturating_sub(1), - blockhash: &block.hash.to_string(), - block_height: Some(block.meta.slot), + slot: block.slot, + parent_slot: block.slot.saturating_sub(1), + blockhash: &block.blockhash.to_string(), + block_height: Some(block.slot), rewards: &RewardsAndNumPartitions { rewards: Vec::new(), num_partitions: None, }, - block_time: Some(block.meta.time), + block_time: Some(block.clock.unix_timestamp), // TODO(bmuddha): register proper values with the new ledger parent_blockhash: "11111111111111111111111111111111", executed_transaction_count: 0, diff --git a/magicblock-aperture/src/processor.rs b/magicblock-aperture/src/processor.rs index 4934afbd7..5005ad5c3 100644 --- a/magicblock-aperture/src/processor.rs +++ b/magicblock-aperture/src/processor.rs @@ -5,6 +5,7 @@ use magicblock_core::link::{ accounts::AccountUpdateRx, blocks::BlockUpdateRx, transactions::TransactionStatusRx, DispatchEndpoints, }; +use magicblock_ledger::LatestBlockInner; use tokio_util::sync::CancellationToken; use tracing::{info, instrument, warn}; @@ -48,8 +49,8 @@ pub(crate) struct EventProcessor { account_update_rx: AccountUpdateRx, /// A receiver for transaction status events, sourced from the `TransactionExecutor`. transaction_status_rx: TransactionStatusRx, - /// A receiver for new block events. - block_update_rx: BlockUpdateRx, + /// A receiver for block update events from the ledger. + block_update_rx: BlockUpdateRx, /// An entry point for communicating with loaded geyser plugins geyser: Arc, } @@ -61,13 +62,17 @@ impl EventProcessor { state: &SharedState, geyser: Arc, ) -> ApertureResult { + let latest_block = state.ledger.latest_block().clone(); + // Subscribe to block updates immediately to ensure we don't miss any + // notifications that might be sent before the `run()` method is polled. + let block_update_rx = latest_block.subscribe(); Ok(Self { subscriptions: state.subscriptions.clone(), transactions: state.transactions.clone(), blocks: state.blocks.clone(), account_update_rx: channels.account_update.clone(), transaction_status_rx: channels.transaction_status.clone(), - block_update_rx: channels.block_update.clone(), + block_update_rx, geyser, }) } @@ -107,16 +112,20 @@ impl EventProcessor { #[instrument(skip(self, cancel), fields(processor_id = id))] async fn run(self, id: usize, cancel: CancellationToken) { info!("Event processor started"); + let mut block_update_rx = self.block_update_rx; loop { tokio::select! { biased; - // Process a new block. - Ok(latest) = self.block_update_rx.recv_async() => { + // Process a new block. We use `recv()` which returns `Ok(())` on + // success or `Err(Lagged)` if we fell behind. In either case, we + // want to update with the latest block. Only `Err(Closed)` should + // stop us, but that's handled by the cancel token. + Ok(latest) = block_update_rx.recv() => { // Notify subscribers waiting on slot updates. - self.subscriptions.send_slot(latest.meta.slot); + self.subscriptions.send_slot(latest.slot); // Notify registered geyser plugins (if any) of the latest slot. - let _ = self.geyser.notify_slot(latest.meta.slot).inspect_err(|e| { + let _ = self.geyser.notify_slot(latest.slot).inspect_err(|e| { warn!(error = ?e, "Geyser slot update failed"); }); // Notify listening geyser plugins @@ -124,7 +133,7 @@ impl EventProcessor { warn!(error = ?e, "Geyser block update failed"); }); // Update the global blocks cache with the latest block. - self.blocks.set_latest(latest); + self.blocks.set_latest(&latest); } // Process a new account state update. diff --git a/magicblock-aperture/src/state/blocks.rs b/magicblock-aperture/src/state/blocks.rs index 263606ee7..88c1b20c9 100644 --- a/magicblock-aperture/src/state/blocks.rs +++ b/magicblock-aperture/src/state/blocks.rs @@ -1,10 +1,8 @@ use std::{ops::Deref, sync::Arc, time::Duration}; use arc_swap::ArcSwapAny; -use magicblock_core::{ - link::blocks::{BlockHash, BlockMeta, BlockUpdate}, - Slot, -}; +use magicblock_core::{link::blocks::BlockHash, Slot}; +use magicblock_ledger::LatestBlockInner; use solana_rpc_client_api::response::RpcBlockhash; use super::ExpiringCache; @@ -26,7 +24,7 @@ pub(crate) struct BlocksCache { /// Latest observed block (updated whenever the ledger transitions to new slot) latest: ArcSwapAny>, /// An underlying time-based cache for storing `BlockHash` to `BlockMeta` mappings. - cache: ExpiringCache, + cache: ExpiringCache, } /// Last produced block that has been put into cache. We need to keep this separately, @@ -38,7 +36,7 @@ pub(crate) struct LastCachedBlock { } impl Deref for BlocksCache { - type Target = ExpiringCache; + type Target = ExpiringCache; fn deref(&self) -> &Self::Target { &self.cache } @@ -61,6 +59,8 @@ impl BlocksCache { let blocktime_ratio = SOLANA_BLOCK_TIME / blocktime as f64; let block_validity = blocktime_ratio * MAX_VALID_BLOCKHASH_SLOTS; let cache = ExpiringCache::new(BLOCK_CACHE_TTL); + // Add the initial blockhash to the cache so it's recognized as valid + cache.push(latest.blockhash, latest.slot); Self { latest: ArcSwapAny::new(latest.into()), block_validity: block_validity as u64, @@ -69,14 +69,14 @@ impl BlocksCache { } /// Updates the latest block information in the cache. - pub(crate) fn set_latest(&self, latest: BlockUpdate) { + pub(crate) fn set_latest(&self, latest: &LatestBlockInner) { let last = LastCachedBlock { - blockhash: latest.hash, - slot: latest.meta.slot, + blockhash: latest.blockhash, + slot: latest.slot, }; // Register the block in the expiring cache - self.cache.push(latest.hash, latest.meta); + self.cache.push(latest.blockhash, latest.slot); // And mark it as latest observed self.latest.swap(last.into()); } diff --git a/magicblock-aperture/tests/accounts.rs b/magicblock-aperture/tests/accounts.rs index 0128bb72e..25f29e71a 100644 --- a/magicblock-aperture/tests/accounts.rs +++ b/magicblock-aperture/tests/accounts.rs @@ -31,7 +31,6 @@ async fn test_get_account_info() { .get_account_with_commitment(&Pubkey::new_unique(), Default::default()) .await .expect("rpc request for non-existent account failed"); - assert_eq!(nonexistent.context.slot, env.latest_slot()); assert_eq!(nonexistent.value, None, "account should not exist"); } diff --git a/magicblock-aperture/tests/blocks.rs b/magicblock-aperture/tests/blocks.rs index 5fd4db305..5b69f561f 100644 --- a/magicblock-aperture/tests/blocks.rs +++ b/magicblock-aperture/tests/blocks.rs @@ -5,23 +5,24 @@ use solana_transaction_status::UiTransactionEncoding; mod setup; -/// Verifies `get_slot` consistently returns the latest slot number. +/// Verifies `get_slot` returns a valid slot number that progresses over time. #[tokio::test] async fn test_get_slot() { let env = RpcTestEnv::new().await; - // Check repeatedly while advancing slots to ensure it stays in sync. - for _ in 0..8 { - let slot = env.rpc.get_slot().await.expect("get_slot request failed"); - assert_eq!( - slot, - env.latest_slot(), - "RPC slot should match the latest slot in the ledger" - ); - env.advance_slots(1); - } + let initial_slot = + env.rpc.get_slot().await.expect("get_slot request failed"); + + // Wait for at least 2 slots to progress, demonstrating auto-advancement + env.wait_for_slot_progress(2).await; + + let new_slot = env.rpc.get_slot().await.expect("get_slot request failed"); + assert!( + new_slot >= initial_slot + 2, + "slot should have progressed by at least 2: initial={initial_slot}, new={new_slot}" + ); } -/// Verifies `get_block_height` returns the latest slot number. +/// Verifies `get_block_height` returns a valid block height. #[tokio::test] async fn test_get_block_height() { let env = RpcTestEnv::new().await; @@ -30,46 +31,49 @@ async fn test_get_block_height() { .get_block_height() .await .expect("get_block_height request failed"); - assert_eq!( - block_height, - env.latest_slot(), - "RPC block height should match the latest slot" - ); + // Block height should be at least 1 (genesis slot moves to slot 1 during setup) + assert!(block_height >= 1, "block height should be at least 1"); } -/// Verifies `get_latest_blockhash` and its commitment-aware variant. +/// Verifies `get_latest_blockhash` returns a valid blockhash. #[tokio::test] async fn test_get_latest_blockhash() { let env = RpcTestEnv::new().await; - env.advance_slots(1); // Ensure a non-genesis blockhash exists. + // Wait for at least one slot to ensure a non-genesis blockhash exists + env.wait_for_slot_progress(1).await; + + // Test the method with commitment level (call this first to get both values together) + let (blockhash, last_valid_slot) = env + .rpc + .get_latest_blockhash_with_commitment(Default::default()) + .await + .expect("failed to request blockhash with commitment"); - // Test the basic method. + // Test the basic method - may return a different blockhash if slot advanced let rpc_blockhash = env .rpc .get_latest_blockhash() .await .expect("get_latest_blockhash request failed"); - let latest_block = env.block.load(); - assert_eq!( - rpc_blockhash, latest_block.blockhash, - "RPC blockhash should match the latest from the ledger" + // Both blockhashes should be valid (non-default) + assert_ne!( + blockhash, + BlockHash::default(), + "blockhash should not be default" ); - - // Test the method with commitment level, which also returns the last valid slot. - let (blockhash, last_valid_slot) = env - .rpc - .get_latest_blockhash_with_commitment(Default::default()) - .await - .expect("failed to request blockhash with commitment"); - - assert_eq!( - blockhash, latest_block.blockhash, - "RPC blockhash with commitment should also match" + assert_ne!( + rpc_blockhash, + BlockHash::default(), + "rpc_blockhash should not be default" ); + + // last_valid_slot should be greater than current slot + let current_slot = + env.rpc.get_slot().await.expect("get_slot request failed"); assert!( - last_valid_slot >= latest_block.slot + 150, - "last_valid_block_height is incorrect" + last_valid_slot > current_slot, + "last_valid_block_height ({last_valid_slot}) should be greater than current slot ({current_slot})" ); } @@ -77,18 +81,25 @@ async fn test_get_latest_blockhash() { #[tokio::test] async fn test_is_blockhash_valid() { let env = RpcTestEnv::new().await; - env.advance_slots(1); + // Wait for at least one slot to ensure a valid blockhash exists + env.wait_for_slot_progress(1).await; + + // Get a recent blockhash + let latest_blockhash = env + .rpc + .get_latest_blockhash() + .await + .expect("get_latest_blockhash request failed"); - // Test a recent, valid blockhash. - let latest_block = env.block.load(); + // Test a recent, valid blockhash let is_valid = env .rpc - .is_blockhash_valid(&latest_block.blockhash, Default::default()) + .is_blockhash_valid(&latest_blockhash, Default::default()) .await .expect("request for recent blockhash failed"); assert!(is_valid, "a recent blockhash should be considered valid"); - // Test an unknown (and therefore invalid) blockhash. + // Test an unknown (and therefore invalid) blockhash let invalid_blockhash = BlockHash::new_unique(); let is_valid = env .rpc @@ -101,42 +112,60 @@ async fn test_is_blockhash_valid() { ); } -/// Verifies `get_block` can fetch a full block and its contents. +/// Verifies `get_block` can fetch a block with transactions. #[tokio::test] async fn test_get_block() { let env = RpcTestEnv::new().await; + // Record the slot before executing the transaction + let slot_before = + env.rpc.get_slot().await.expect("get_slot request failed"); + let signature = env.execute_transaction().await; - let latest_slot = env.block.load().slot; - let latest_blockhash = env.block.load().blockhash; - // Test fetching an existing block with a specific config. - let block = env - .rpc - .get_block_with_config( - latest_slot, - RpcBlockConfig { - encoding: Some(UiTransactionEncoding::Base64), - ..Default::default() - }, - ) - .await - .expect("get_block request for an existing block failed"); + // Wait for at least one slot to progress + env.wait_for_slot_progress(1).await; + + // The transaction should be in a slot >= slot_before + let current_slot = + env.rpc.get_slot().await.expect("get_slot request failed"); - assert_eq!(block.block_height, Some(latest_slot)); - assert_eq!(block.blockhash, latest_blockhash.to_string()); + // Find the block containing our transaction by checking recent slots + let mut found_block = None; + for slot in slot_before..=current_slot { + if let Ok(block) = env + .rpc + .get_block_with_config( + slot, + RpcBlockConfig { + encoding: Some(UiTransactionEncoding::Base64), + ..Default::default() + }, + ) + .await + { + if let Some(transactions) = &block.transactions { + for txn in transactions { + if let Some(decoded) = txn.transaction.decode() { + if decoded.signatures[0] == signature { + found_block = Some((slot, block)); + break; + } + } + } + } + } + if found_block.is_some() { + break; + } + } - let first_transaction = block - .transactions - .expect("block should contain transactions") - .pop() - .expect("transaction list should not be empty"); + let (slot, block) = + found_block.expect("should find block containing transaction"); - let block_txn_signature = - first_transaction.transaction.decode().unwrap().signatures[0]; - assert_eq!(block_txn_signature, signature); + assert_eq!(block.block_height, Some(slot)); - // Test fetching a non-existent block, which should result in an error. - let nonexistent_block_result = env.rpc.get_block(latest_slot + 100).await; + // Test fetching a non-existent block, which should result in an error + let nonexistent_block_result = env.rpc.get_block(current_slot + 100).await; assert!( nonexistent_block_result.is_err(), "request for a non-existent block should fail" @@ -147,43 +176,80 @@ async fn test_get_block() { #[tokio::test] async fn test_get_blocks() { let env = RpcTestEnv::new().await; - env.advance_slots(5); + // Wait for at least 5 slots to exist + env.wait_for_slot_progress(5).await; + + let current_slot = + env.rpc.get_slot().await.expect("get_slot request failed"); + let start = current_slot.saturating_sub(3); + let end = current_slot; let blocks = env .rpc - .get_blocks(1, Some(4)) + .get_blocks(start, Some(end)) .await .expect("get_blocks request failed"); - assert_eq!( - blocks, - vec![1, 2, 3, 4], - "should return the correct range of slots" - ); + // Should return a contiguous range from start to end + assert!(!blocks.is_empty(), "should return at least one block"); + + // Verify each slot is within bounds [start, end] + for &slot in &blocks { + assert!(slot >= start, "slot {slot} should be >= start {start}"); + assert!(slot <= end, "slot {slot} should be <= end {end}"); + } + + // Verify slots are strictly increasing and contiguous + for i in 1..blocks.len() { + let prev = blocks[i - 1]; + let curr = blocks[i]; + assert!( + curr > prev, + "slots should be strictly increasing: {prev} -> {curr}" + ); + assert_eq!( + curr, + prev + 1, + "slots should be contiguous: expected {} after {}, got {}", + prev + 1, + prev, + curr + ); + } } -/// Verifies `get_block_time` returns the correct Unix timestamp for a slot. +/// Verifies `get_block_time` returns a valid Unix timestamp for a slot. #[tokio::test] async fn test_get_block_time() { let env = RpcTestEnv::new().await; - let latest_block = env.block.load(); + // Wait for at least one slot + env.wait_for_slot_progress(1).await; + + let current_slot = + env.rpc.get_slot().await.expect("get_slot request failed"); let time = env .rpc - .get_block_time(latest_block.slot) + .get_block_time(current_slot) .await .expect("get_block_time request failed"); - assert_eq!( - time, latest_block.clock.unix_timestamp, - "get_block_time should return the same timestamp stored in the ledger" + + // The timestamp should be a reasonable Unix timestamp (> 1 billion) + assert!( + time > 1_000_000_000, + "get_block_time should return a valid Unix timestamp, got {time}" ); } -/// Verifies `get_blocks_with_limit` can fetch a limited number of slots from a start point. +/// Verifies `get_blocks_with_limit` can fetch a limited number of slots. #[tokio::test] async fn test_get_blocks_with_limit() { let env = RpcTestEnv::new().await; - env.advance_slots(10); - let start_slot = 5; + // Wait for at least 10 slots to exist + env.wait_for_slot_progress(10).await; + + let current_slot = + env.rpc.get_slot().await.expect("get_slot request failed"); + let start_slot = current_slot.saturating_sub(7); let limit = 3; let blocks = env @@ -191,9 +257,31 @@ async fn test_get_blocks_with_limit() { .get_blocks_with_limit(start_slot, limit) .await .expect("get_blocks_with_limit request failed"); + + // Should return exactly `limit` blocks + assert_eq!(blocks.len(), limit, "should return exactly {limit} blocks"); + // First block should be start_slot assert_eq!( - blocks, - vec![5, 6, 7], - "should return the correct range of slots with a limit" + blocks.first().copied().unwrap_or(0), + start_slot, + "first block should be start_slot" ); + + // Verify slots are strictly increasing and contiguous + for i in 1..blocks.len() { + let prev = blocks[i - 1]; + let curr = blocks[i]; + assert!( + curr > prev, + "slots should be strictly increasing: {prev} -> {curr}" + ); + assert_eq!( + curr, + prev + 1, + "slots should be contiguous: expected {} after {}, got {}", + prev + 1, + prev, + curr + ); + } } diff --git a/magicblock-aperture/tests/mocked.rs b/magicblock-aperture/tests/mocked.rs index 4217ada9d..f96cfdf57 100644 --- a/magicblock-aperture/tests/mocked.rs +++ b/magicblock-aperture/tests/mocked.rs @@ -158,10 +158,15 @@ async fn test_get_epoch_info() { .expect("get_epoch_info request failed"); assert_eq!(epoch_info.epoch, 0, "epoch should be 0"); - assert_eq!( + // The absolute_slot should be at most 1 behind the current slot + // due to auto-advancement timing + let current_slot = env.latest_slot(); + assert!( + epoch_info.absolute_slot <= current_slot + && epoch_info.absolute_slot >= current_slot.saturating_sub(1), + "absolute_slot {} should be within 1 of current slot {}", epoch_info.absolute_slot, - env.latest_slot(), - "absolute_slot should be equal to env slot" + current_slot ); } diff --git a/magicblock-aperture/tests/setup.rs b/magicblock-aperture/tests/setup.rs index 9399745ad..39c91b9e2 100644 --- a/magicblock-aperture/tests/setup.rs +++ b/magicblock-aperture/tests/setup.rs @@ -85,9 +85,13 @@ impl RpcTestEnv { /// 3. Starts a live `JsonRpcServer` (HTTP and WebSocket) in a background task. /// 4. Connects an `RpcClient` and `PubsubClient` to the running server. pub async fn new() -> Self { + // Use a short block time so the scheduler auto-advances slots. + // Tests should use `wait_for_slot_progress()` to wait for slot progression. const BLOCK_TIME_MS: u64 = 50; let execution = ExecutionTestEnv::new(); + // Wait for the scheduler to be ready and in primary mode + execution.wait_for_scheduler_ready().await; let faucet = Keypair::new(); execution.fund_account(faucet.pubkey(), Self::INIT_ACCOUNT_BALANCE); @@ -137,8 +141,17 @@ impl RpcTestEnv { .await .expect("failed to create a pubsub client to RPC server"); - // Allow server threads to initialize. - thread::yield_now(); + // Allow async tasks (event processors) to initialize and start + // listening for notifications. + // We need to ensure the event processor's run() task has been polled + // at least once to start receiving notifications. + // Multiple yield_now calls give the runtime chances to schedule spawned tasks. + for _ in 0..10 { + tokio::task::yield_now().await; + } + // Small additional delay to ensure the select! loop in the event processor + // has started waiting for notifications. + tokio::time::sleep(std::time::Duration::from_millis(10)).await; Self { block: execution.ledger.latest_block().clone(), @@ -232,9 +245,35 @@ impl RpcTestEnv { } } + /// Waits for the RPC slot to advance by at least `count` slots from the current + /// RPC slot value. + /// + /// This is used when the scheduler auto-advances slots and tests need to wait + /// for the BlocksCache to catch up. + pub async fn wait_for_slot_progress(&self, count: u64) { + let initial_slot = + self.rpc.get_slot().await.expect("get_slot request failed"); + let target_slot = initial_slot + count; + + let start = std::time::Instant::now(); + loop { + let slot = + self.rpc.get_slot().await.expect("get_slot request failed"); + if slot >= target_slot { + break; + } + if start.elapsed() > std::time::Duration::from_secs(5) { + panic!( + "Timed out waiting for slot to advance: expected >= {target_slot}, got {slot}" + ); + } + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } + } + /// Returns the latest slot number from the ledger. pub fn latest_slot(&self) -> Slot { - self.block.load().slot + self.execution.ledger.latest_block().load().slot } /// Creates and executes a generic transaction that modifies a new account. diff --git a/magicblock-aperture/tests/transactions.rs b/magicblock-aperture/tests/transactions.rs index f6d23468d..183227154 100644 --- a/magicblock-aperture/tests/transactions.rs +++ b/magicblock-aperture/tests/transactions.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use magicblock_accounts_db::traits::AccountsBank; use magicblock_core::link::blocks::BlockHash; @@ -258,6 +258,8 @@ async fn test_simulate_transaction_returns_requested_accounts() { async fn test_simulate_transaction_with_config_options() { let env = RpcTestEnv::new().await; + // Replacing the blockhash invalidates the original signature, so this must + // run with signature verification disabled. // Test `replace_recent_blockhash: true` { let mut transfer_tx = env.build_transfer_txn(); @@ -265,7 +267,7 @@ async fn test_simulate_transaction_with_config_options() { transfer_tx.message.recent_blockhash = bogus_blockhash; let config = RpcSimulateTransactionConfig { - sig_verify: true, + sig_verify: false, replace_recent_blockhash: true, ..Default::default() }; @@ -392,14 +394,26 @@ async fn test_get_signature_statuses() { .await .unwrap(); let sig_nonexistent = Signature::new_unique(); - tokio::time::sleep(Duration::from_millis(10)).await; // Allow propagation - let statuses = env - .rpc - .get_signature_statuses(&[sig_success, sig_fail, sig_nonexistent]) - .await - .expect("get_signature_statuses request failed") - .value; + let start = Instant::now(); + let statuses = loop { + let statuses = env + .rpc + .get_signature_statuses(&[sig_success, sig_fail, sig_nonexistent]) + .await + .expect("get_signature_statuses request failed") + .value; + if statuses.first().and_then(Clone::clone).is_some() + && statuses.get(1).and_then(Clone::clone).is_some() + { + break statuses; + } + assert!( + start.elapsed() < Duration::from_secs(5), + "timed out waiting for signature statuses to propagate" + ); + tokio::time::sleep(Duration::from_millis(10)).await; + }; assert_eq!( statuses.len(), @@ -444,7 +458,6 @@ async fn test_get_signatures_for_address() { async fn test_get_signatures_for_address_pagination() { let env = RpcTestEnv::new().await; let mut signatures = Vec::new(); - env.advance_slots(1); for _ in 0..5 { signatures.push(env.execute_transaction().await); } @@ -488,13 +501,20 @@ async fn test_get_signatures_for_address_pagination() { async fn test_get_transaction() { // Test successful transaction let env = RpcTestEnv::new().await; + let initial_slot = env.latest_slot(); let success_sig = env.execute_transaction().await; let transaction = env .rpc .get_transaction(&success_sig, UiTransactionEncoding::Base64) .await .expect("getTransaction request failed"); - assert_eq!(transaction.slot, env.latest_slot()); + // Transaction should be in a slot >= initial_slot (scheduler may have advanced) + assert!( + transaction.slot >= initial_slot, + "transaction slot {} should be >= initial slot {}", + transaction.slot, + initial_slot + ); assert_eq!(transaction.transaction.meta.unwrap().err, None); // Test failed transaction @@ -505,7 +525,8 @@ async fn test_get_transaction() { .schedule(failing_tx) .await .unwrap(); - tokio::time::sleep(Duration::from_millis(10)).await; + // Wait longer for the transaction to be processed with auto-advancement + tokio::time::sleep(Duration::from_millis(100)).await; let transaction = env .rpc .get_transaction(&fail_sig, UiTransactionEncoding::Base64) diff --git a/magicblock-aperture/tests/websocket.rs b/magicblock-aperture/tests/websocket.rs index c6eac4e82..5689cb8d7 100644 --- a/magicblock-aperture/tests/websocket.rs +++ b/magicblock-aperture/tests/websocket.rs @@ -38,10 +38,27 @@ async fn test_account_subscribe() { notification.value.lamports, RpcTestEnv::INIT_ACCOUNT_BALANCE + amount ); - assert_eq!(notification.context.slot, env.latest_slot()); + // The notification slot should be valid. With auto-advancement, slots + // advance independently, so we just verify the notification has a reasonable slot. + // We check that the slot is not from the distant past or future. + let current_slot = env.latest_slot(); + assert!( + notification.context.slot <= current_slot + 1, + "notification slot {} should be reasonable compared to current slot {}", + notification.context.slot, + current_slot + ); // Unsubscribe and verify no more messages are received. + // With auto-advancement, there may be buffered notifications, so we + // drain any remaining messages with a timeout before checking for closure. unsub().await; + // Drain any buffered notifications that were sent before unsubscription completed + while let Ok(Some(_)) = + timeout(Duration::from_millis(10), stream.next()).await + { + // Drain buffered messages + } let closed = stream.next().await.is_none(); assert!( closed, @@ -206,18 +223,41 @@ async fn test_slot_subscribe() { .expect("failed to subscribe to slots"); let initial_slot = env.latest_slot(); - for i in 1..=3 { - env.advance_slots(1); - let notification = timeout(Duration::from_millis(200), stream.next()) - .await - .expect("timed out waiting for slot notification") - .expect("stream should not be closed"); - - assert_eq!(notification.slot, initial_slot + i); - assert_eq!(notification.parent, initial_slot + i - 1); + // Wait for at least 3 slot notifications from auto-advancement + // Initialize last_slot to allow the first notification to be >= initial_slot + let mut last_slot = initial_slot.saturating_sub(1); + let mut notifications_received = 0; + for _ in 0..10 { + let result = timeout(Duration::from_millis(200), stream.next()).await; + let Ok(Some(notification)) = result else { + // Timed out or stream closed - continue to try more + continue; + }; + + // Verify slot is advancing (not necessarily sequential due to auto-advancement) + assert!(notification.slot > last_slot, "slot should advance"); + assert_eq!(notification.parent, notification.slot - 1); + last_slot = notification.slot; + notifications_received += 1; + + if notifications_received >= 3 { + break; + } } + // Verify we received at least 3 notifications + assert!( + notifications_received >= 3, + "should have received at least 3 slot notifications, got {notifications_received}" + ); + unsub().await; + // Drain any buffered notifications that were sent before unsubscription completed + while let Ok(Some(_)) = + timeout(Duration::from_millis(10), stream.next()).await + { + // Drain buffered messages + } let closed = stream.next().await.is_none(); assert!( closed, diff --git a/magicblock-api/src/lib.rs b/magicblock-api/src/lib.rs index 853019317..6d0db7546 100644 --- a/magicblock-api/src/lib.rs +++ b/magicblock-api/src/lib.rs @@ -5,5 +5,4 @@ mod genesis_utils; pub mod ledger; mod magic_sys_adapter; pub mod magic_validator; -mod slot; mod tickers; diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 77cb0a2d2..6f693be87 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -42,7 +42,6 @@ use magicblock_config::{ }; use magicblock_core::{ link::{ - blocks::BlockUpdateTx, link, transactions::{SchedulerMode, TransactionSchedulerHandle}, }, @@ -100,7 +99,6 @@ use crate::{ write_validator_keypair_to_ledger, }, magic_sys_adapter::MagicSysAdapter, - slot::advance_slot_and_update_ledger, tickers::{init_slot_ticker, init_system_metrics_ticker}, }; @@ -129,7 +127,6 @@ pub struct MagicValidator { rpc_handle: thread::JoinHandle<()>, identity: Pubkey, transaction_scheduler: TransactionSchedulerHandle, - block_udpate_tx: BlockUpdateTx, _metrics: (MetricsService, tokio::task::JoinHandle<()>), claim_fees_task: ClaimFeesTask, task_scheduler: Option, @@ -349,6 +346,8 @@ impl MagicValidator { .chainlink .auto_airdrop_lamports > 0, + block_time: config.ledger.block_time, + superblock_size: config.ledger.superblock_size, shutdown: token.clone(), mode_rx, pause_permit: validator_channels.pause_permit, @@ -453,7 +452,6 @@ impl MagicValidator { rpc_handle, identity: validator_pubkey, transaction_scheduler: dispatch.transaction_scheduler, - block_udpate_tx: validator_channels.block_update, task_scheduler: Some(task_scheduler), transaction_execution, replication_handle: None, @@ -619,10 +617,10 @@ impl MagicValidator { } let accountsdb_slot = self.accountsdb.slot(); let ledger_slot = self.ledger.latest_block().load().slot; - // In case if we have a perfect match between accountsdb and ledger slot - // (note: that accountsdb is always 1 slot ahead of ledger), then there's - // no need to run any kind of ledger replay - if accountsdb_slot.saturating_sub(1) == ledger_slot { + + // If we have accountsdb state, which is at least as new as the last state state + // transition in the ledger then there's no need to run any kind of ledger replay + if accountsdb_slot >= ledger_slot { return Ok(()); } @@ -643,7 +641,6 @@ impl MagicValidator { let slot_to_continue_at = process_ledger_result?; log_timing("startup", "ledger_replay", step_start); - self.accountsdb.set_slot(slot_to_continue_at); // The transactions to schedule and accept account commits re-run when we // process the ledger, however we do not want to re-commit them. @@ -658,20 +655,6 @@ impl MagicValidator { "Cleared scheduled commits" ); - // We want the next transaction either due to hydrating of cloned accounts or - // user request to be processed in the next slot such that it doesn't become - // part of the last block found in the existing ledger which would be incorrect. - let step_start = Instant::now(); - let (update_ledger_result, _) = advance_slot_and_update_ledger( - &self.accountsdb, - &self.ledger, - &self.block_udpate_tx, - ); - if let Err(err) = update_ledger_result { - return Err(err.into()); - } - log_timing("startup", "advance_slot_after_replay", step_start); - tracing::Span::current().record("ledger_slot", slot_to_continue_at); info!("Ledger processing complete"); @@ -943,10 +926,9 @@ impl MagicValidator { self.slot_ticker = Some(init_slot_ticker( self.accountsdb.clone(), &self.scheduled_commits_processor, - self.ledger.clone(), + self.ledger.latest_block().clone(), self.config.ledger.block_time, self.transaction_scheduler.clone(), - self.block_udpate_tx.clone(), self.exit.clone(), )); log_timing("startup", "slot_ticker_start", step_start); diff --git a/magicblock-api/src/slot.rs b/magicblock-api/src/slot.rs deleted file mode 100644 index 32c65966a..000000000 --- a/magicblock-api/src/slot.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::{ - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; - -use magicblock_accounts_db::AccountsDb; -use magicblock_core::link::blocks::{BlockMeta, BlockUpdate, BlockUpdateTx}; -use magicblock_ledger::{errors::LedgerResult, Ledger}; -use solana_program::clock::Slot; -use solana_sha256_hasher::Hasher; - -pub fn advance_slot_and_update_ledger( - accountsdb: &Arc, - ledger: &Ledger, - block_update_tx: &BlockUpdateTx, -) -> (LedgerResult<()>, Slot) { - // This is the latest "confirmed" block, written to the ledger - let latest_block = ledger.latest_block().load(); - // And this is not yet "confirmed" slot, which doesn't have an associated "block" - // same as latest_block.slot + 1, accountsdb is always 1 slot ahead of the ledger; - let current_slot = accountsdb.slot(); - // Determine next blockhash - let blockhash = { - // In the Solana implementation there is a lot of logic going on to determine the next - // blockhash, however we don't really produce any blocks, so any new hash will do. - // Therefore we derive it from the previous hash and the current slot. - let mut hasher = Hasher::default(); - hasher.hash(latest_block.blockhash.as_ref()); - hasher.hash(¤t_slot.to_le_bytes()); - hasher.result() - }; - - // current slot is "finalized", and next slot becomes active - let next_slot = current_slot + 1; - - // Each time we advance the slot, we check if a snapshot should be taken. - // If the current slot is a multiple of the preconfigured snapshot frequency, - // the AccountsDB will enforce a global lock before taking the snapshot. This - // introduces a slight hiccup in transaction execution, which is an unavoidable - // consequence of the need to flush in-memory data to disk, while ensuring no - // writes occur during this operation. With small and CoW databases, this lock - // should not exceed a few milliseconds. - accountsdb.set_slot(next_slot); - - // NOTE: - // As we have a single node network, we have no option but to use the time from host machine - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - // NOTE: since we can tick very frequently, a lot of blocks might have identical timestamps - .as_secs() as i64; - // Update ledger with previous block's meta, this will also notify various - // listeners (like transaction executors) that block has been "produced" - let ledger_result = ledger.write_block(current_slot, timestamp, blockhash); - // also notify downstream subscribers (RPC/Geyser) that block has been produced - let update = BlockUpdate { - hash: blockhash, - meta: BlockMeta { - slot: current_slot, - time: timestamp, - }, - }; - - let _ = block_update_tx.send(update); - - (ledger_result, next_slot) -} diff --git a/magicblock-api/src/tickers.rs b/magicblock-api/src/tickers.rs index 7ea151f8b..30e7e8e9a 100644 --- a/magicblock-api/src/tickers.rs +++ b/magicblock-api/src/tickers.rs @@ -8,9 +8,8 @@ use std::{ use magicblock_accounts::ScheduledCommitsProcessor; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; -use magicblock_core::link::{ - blocks::BlockUpdateTx, - transactions::{with_encoded, TransactionSchedulerHandle}, +use magicblock_core::link::transactions::{ + with_encoded, TransactionSchedulerHandle, }; use magicblock_ledger::{LatestBlock, Ledger}; use magicblock_magic_program_api as magic_program; @@ -20,40 +19,20 @@ use solana_account::ReadableAccount; use tokio_util::sync::CancellationToken; use tracing::*; -use crate::slot::advance_slot_and_update_ledger; - pub fn init_slot_ticker( accountsdb: Arc, committor_processor: &Option>, - ledger: Arc, + latest_block: LatestBlock, tick_duration: Duration, transaction_scheduler: TransactionSchedulerHandle, - block_updates_tx: BlockUpdateTx, exit: Arc, ) -> tokio::task::JoinHandle<()> { let committor_processor = committor_processor.clone(); - let latest_block = ledger.latest_block().clone(); tokio::task::spawn(async move { - let log = tick_duration >= Duration::from_secs(5); while !exit.load(Ordering::Relaxed) { tokio::time::sleep(tick_duration).await; - let (update_ledger_result, next_slot) = - advance_slot_and_update_ledger( - &accountsdb, - &ledger, - &block_updates_tx, - ); - if let Err(err) = update_ledger_result { - error!(error = ?err, "Failed to write block"); - } - - if log { - debug!(slot = next_slot, "Advanced to slot"); - } - metrics::inc_slot(); - // Handle intents if such feature enabled let Some(committor_processor) = &committor_processor else { continue; @@ -71,11 +50,7 @@ pub fn init_slot_ticker( ) .await; } - if log { - debug!(slot = next_slot, "Advanced to slot"); - } } - metrics::inc_slot(); }) } diff --git a/magicblock-config/src/config/ledger.rs b/magicblock-config/src/config/ledger.rs index b81987f76..6022eaa49 100644 --- a/magicblock-config/src/config/ledger.rs +++ b/magicblock-config/src/config/ledger.rs @@ -13,6 +13,10 @@ pub struct LedgerConfig { #[serde(with = "humantime")] pub block_time: Duration, + /// The number of slots that must elapse before + /// the accountsdb snapshot/checksum is taken + pub superblock_size: u64, + /// If true, the existing ledger database will be wiped on startup. /// Useful for ephemeral or testing environments. pub reset: bool, @@ -39,6 +43,8 @@ impl Default for LedgerConfig { block_time: Duration::from_millis( consts::DEFAULT_LEDGER_BLOCK_TIME_MS, ), + + superblock_size: consts::DEFAULT_SUPERBLOCK_SIZE, reset: false, verify_keypair: true, size: consts::DEFAULT_LEDGER_SIZE, diff --git a/magicblock-config/src/consts.rs b/magicblock-config/src/consts.rs index ac8696719..705a60060 100644 --- a/magicblock-config/src/consts.rs +++ b/magicblock-config/src/consts.rs @@ -54,6 +54,9 @@ pub const DEFAULT_LEDGER_BLOCK_TIME_MS: u64 = 50; /// Default ledger size (100 GB) pub const DEFAULT_LEDGER_SIZE: u64 = 100 * 1024 * 1024 * 1024; +/// Default superblock size (72K ~ 1 hour with 50ms block time) +pub const DEFAULT_SUPERBLOCK_SIZE: u64 = 3600 * 20; + /// Metrics Defaults /// Default address for the metrics endpoint (Prometheus format) pub const DEFAULT_METRICS_ADDR: &str = "0.0.0.0:9000"; diff --git a/magicblock-core/src/link.rs b/magicblock-core/src/link.rs index e935ba23e..058b637f3 100644 --- a/magicblock-core/src/link.rs +++ b/magicblock-core/src/link.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use accounts::{AccountUpdateRx, AccountUpdateTx}; -use blocks::{BlockUpdateRx, BlockUpdateTx}; use tokio::sync::{ mpsc::{self, Receiver, Sender}, Semaphore, @@ -33,8 +32,6 @@ pub struct DispatchEndpoints { pub transaction_scheduler: TransactionSchedulerHandle, /// Receives notifications about account state changes from the executor. pub account_update: AccountUpdateRx, - /// Receives notifications when a new block is produced. - pub block_update: BlockUpdateRx, /// Receives scheduled (crank) tasks from transactions executor. pub tasks_service: Option, /// Receives replication events from the transaction scheduler. @@ -53,8 +50,6 @@ pub struct ValidatorChannelEndpoints { pub transaction_to_process: TransactionToProcessRx, /// Sends notifications about account state changes to the pool of EventProccessor workers. pub account_update: AccountUpdateTx, - /// Sends notifications when a new block is produced to the pool of EventProcessor workers. - pub block_update: BlockUpdateTx, /// Sends scheduled (crank) tasks to tasks service from transactions executor. pub tasks_service: ScheduledTasksTx, /// Sends replication events to the replication service. @@ -75,7 +70,6 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { // Unbounded channels for high-throughput multicast where backpressure is not desired. let (transaction_status_tx, transaction_status_rx) = flume::unbounded(); let (account_update_tx, account_update_rx) = flume::unbounded(); - let (block_update_tx, block_update_rx) = flume::unbounded(); let (tasks_tx, tasks_rx) = mpsc::unbounded_channel(); // Bounded channels for command queues where applying backpressure is important. @@ -94,7 +88,6 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { transaction_scheduler, transaction_status: transaction_status_rx, account_update: account_update_rx, - block_update: block_update_rx, tasks_service: Some(tasks_rx), replication_messages: Some(replication_rx), }; @@ -104,7 +97,6 @@ pub fn link() -> (DispatchEndpoints, ValidatorChannelEndpoints) { transaction_to_process: txn_to_process_rx, transaction_status: transaction_status_tx, account_update: account_update_tx, - block_update: block_update_tx, tasks_service: tasks_tx, replication_messages: replication_tx, pause_permit, diff --git a/magicblock-core/src/link/blocks.rs b/magicblock-core/src/link/blocks.rs index c847221da..559e16725 100644 --- a/magicblock-core/src/link/blocks.rs +++ b/magicblock-core/src/link/blocks.rs @@ -1,35 +1,10 @@ -use flume::{Receiver as MpmcReceiver, Sender as MpmcSender}; use solana_hash::Hash; - -use crate::Slot; +use tokio::sync::broadcast; /// A type alias for the cryptographic hash of a block. pub type BlockHash = Hash; -/// The receiving end of the channel for new block notifications. -pub type BlockUpdateRx = MpmcReceiver; -/// The sending end of the channel for new block notifications. -pub type BlockUpdateTx = MpmcSender; - -/// A type alias for a block's production timestamp, a Unix timestamp. -pub type BlockTime = i64; - -/// A message representing a new block produced by the validator. -/// -/// This is the primary message type sent over the block update channel to notify -/// listeners of new blocks. -#[derive(Default)] -pub struct BlockUpdate { - /// The metadata associated with the block. - pub meta: BlockMeta, - /// The unique hash of the block. - pub hash: BlockHash, -} -/// A collection of metadata associated with a block. -#[derive(Default, Clone, Copy)] -pub struct BlockMeta { - /// The slot number in which the block was produced. - pub slot: Slot, - /// The timestamp of the block's production. - pub time: BlockTime, -} +/// A receiver for block update notifications. +/// Typically instantiated as `BlockUpdateRx` where the payload +/// contains the latest block data (slot, blockhash, timestamp). +pub type BlockUpdateRx = broadcast::Receiver; diff --git a/magicblock-core/src/link/replication.rs b/magicblock-core/src/link/replication.rs index 031b29d3e..22afc579d 100644 --- a/magicblock-core/src/link/replication.rs +++ b/magicblock-core/src/link/replication.rs @@ -29,6 +29,13 @@ pub enum Message { } impl Message { + pub fn kind(&self) -> &'static str { + match self { + Self::Transaction(_) => "transaction", + Self::Block(_) => "block", + Self::SuperBlock(_) => "superblock", + } + } /// Returns the (slot, index) position of this message. /// Block and SuperBlock messages use sentinel index values. pub fn slot_and_index(&self) -> (Slot, TransactionIndex) { diff --git a/magicblock-ledger/src/blockstore_processor/mod.rs b/magicblock-ledger/src/blockstore_processor/mod.rs index 7782c7133..37c4b9ebd 100644 --- a/magicblock-ledger/src/blockstore_processor/mod.rs +++ b/magicblock-ledger/src/blockstore_processor/mod.rs @@ -8,7 +8,7 @@ use solana_clock::{Slot, UnixTimestamp}; use solana_hash::Hash; use solana_transaction::versioned::VersionedTransaction; use solana_transaction_status::VersionedConfirmedBlock; -use tracing::{instrument, Level, *}; +use tracing::{Level, *}; use crate::{ errors::{LedgerError, LedgerResult}, diff --git a/magicblock-ledger/src/lib.rs b/magicblock-ledger/src/lib.rs index 41e584baf..35c747234 100644 --- a/magicblock-ledger/src/lib.rs +++ b/magicblock-ledger/src/lib.rs @@ -33,7 +33,7 @@ pub struct LatestBlock { /// Notification mechanism to signal that the block has been modified, /// the actual state is not sent via channel, as it can be accessed any /// time with `load` method, only the fact of production is communicated - notifier: broadcast::Sender<()>, + notifier: broadcast::Sender, } impl LatestBlockInner { @@ -53,9 +53,7 @@ impl LatestBlockInner { impl Default for LatestBlock { fn default() -> Self { - // 1 is just enough number of notifications to keep around, in order to cover - // cases when a subscriber might not be listening when broadcast is triggered - let (notifier, _) = broadcast::channel(1); + let (notifier, _) = broadcast::channel(32); let inner = Default::default(); Self { inner, notifier } } @@ -72,15 +70,15 @@ impl LatestBlock { /// This is the "writer" method for the single-writer, multi-reader pattern. pub fn store(&self, slot: u64, blockhash: Hash, timestamp: i64) { let block = LatestBlockInner::new(slot, blockhash, timestamp); - self.inner.store(block.into()); + self.inner.store(block.clone().into()); // Broadcast the update. It's okay if there are no active listeners. - let _ = self.notifier.send(()); + let _ = self.notifier.send(block); } /// Creates a new receiver to listen for block updates. /// Each receiver created via this method will be notified when `store` is called. /// This allows multiple components to react to new blocks concurrently. - pub fn subscribe(&self) -> broadcast::Receiver<()> { + pub fn subscribe(&self) -> broadcast::Receiver { self.notifier.subscribe() } } diff --git a/magicblock-metrics/src/metrics/mod.rs b/magicblock-metrics/src/metrics/mod.rs index f776e1447..296dd3f6c 100644 --- a/magicblock-metrics/src/metrics/mod.rs +++ b/magicblock-metrics/src/metrics/mod.rs @@ -35,8 +35,8 @@ const SECONDS_1_9: [f64; 9] = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; lazy_static::lazy_static! { pub (crate) static ref REGISTRY: Registry = Registry::new_custom(Some("mbv".to_string()), None).unwrap(); - static ref SLOT_COUNT: IntCounter = IntCounter::new( - "slot_count", "Slot Count", + static ref SLOT_GAUGE: IntGauge = IntGauge::new( + "slot_gauge", "Validator slot" ).unwrap(); // Needs to be a gauge so we can set it directly @@ -561,7 +561,7 @@ pub(crate) fn register() { .expect("collector can't be registered"); }; } - register!(SLOT_COUNT); + register!(SLOT_GAUGE); register!(CHAIN_SLOT_GAUGE); register!(CACHED_CLONE_OUTPUTS_COUNT); register!(LEDGER_SIZE_GAUGE); @@ -636,8 +636,8 @@ pub(crate) fn register() { }); } -pub fn inc_slot() { - SLOT_COUNT.inc(); +pub fn set_slot(slot: u64) { + SLOT_GAUGE.set(slot as i64); } pub fn set_chain_slot(value: u64) { diff --git a/magicblock-processor/Cargo.toml b/magicblock-processor/Cargo.toml index 4bb50e9d6..b3a467b04 100644 --- a/magicblock-processor/Cargo.toml +++ b/magicblock-processor/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true [dependencies] bincode = { workspace = true } +blake3 = { workspace = true } tracing = { workspace = true } parking_lot = { workspace = true } serde = { workspace = true } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 8562f8f9d..6efc8181a 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -28,7 +28,10 @@ use solana_svm::transaction_processor::{ use solana_transaction::sanitized::SanitizedTransaction; use tokio::{ runtime::Builder, - sync::mpsc::{Receiver, Sender}, + sync::{ + mpsc::{Receiver, Sender}, + Semaphore, + }, }; use tracing::{info, instrument, warn}; @@ -61,6 +64,7 @@ pub(super) struct TransactionExecutor { ledger: Arc, block: LatestBlock, block_history: BTreeMap, + execution_permits: Arc, // SVM Components processor: TransactionBatchProcessor, @@ -84,6 +88,7 @@ impl TransactionExecutor { state: &TransactionSchedulerState, rx: Receiver, ready_tx: Sender, + execution_permits: Arc, programs_cache: Arc>>, ) -> Self { let slot = state.accountsdb.slot(); @@ -119,6 +124,7 @@ impl TransactionExecutor { config, block: block.clone(), environment: state.environment.clone(), + execution_permits, rx, ready_tx, accounts_tx: state.account_update_tx.clone(), @@ -173,6 +179,7 @@ impl TransactionExecutor { if transaction.slot != self.processor.slot { self.transition_to_slot(transaction.slot); } + let _permit = self.execution_permits.acquire().await; match transaction.txn.mode { TransactionProcessingMode::Execution(_) => { self.execute(transaction, None); @@ -186,8 +193,8 @@ impl TransactionExecutor { } let _ = self.ready_tx.try_send(self.id); } - _ = block_updated.recv() => { - self.register_new_block(); + Ok(latest) = block_updated.recv() => { + self.register_new_block(latest); } else => break, } @@ -195,8 +202,7 @@ impl TransactionExecutor { info!("Executor terminated"); } - fn register_new_block(&mut self) { - let block = LatestBlockInner::clone(&*self.block.load()); + fn register_new_block(&mut self, block: LatestBlockInner) { while self.block_history.len() >= BLOCK_HISTORY_SIZE { self.block_history.pop_first(); } @@ -204,13 +210,15 @@ impl TransactionExecutor { } fn transition_to_slot(&mut self, slot: Slot) { - let Some(block) = self.block_history.get(&slot) else { + // transactions execute in the latest finalized block + 1 + let prev_slot = slot.saturating_sub(1); + let Some(block) = self.block_history.get(&prev_slot) else { // should never happen in practice warn!(slot, "tried to transition to slot which wasn't registered"); return; }; self.environment.blockhash = block.blockhash; - self.processor.slot = block.slot; + self.processor.slot = slot; self.set_sysvars(block); } diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index c2b4a2437..5958db721 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -20,13 +20,14 @@ //! - `TransactionId`: Monotonic ID for FIFO queue ordering //! - `BinaryHeap`: Min-heap ordered by transaction ID -use std::{cmp::Ordering, collections::BinaryHeap}; +use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; use magicblock_core::{ coordination_mode, link::transactions::{ProcessableTransaction, TransactionProcessingMode}, }; use magicblock_metrics::metrics::MAX_LOCK_CONTENTION_QUEUE_SIZE; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::{error, warn}; use super::locks::{ @@ -59,6 +60,9 @@ pub(super) struct ExecutionCoordinator { locks: LocksCache, /// Current coordination mode (Primary or Replica) mode: CoordinationMode, + /// Semaphore for counting the number of busy executors + /// (i.e. ones executing transaction at any given moment) + execution_permits: Arc, } /// Coordination mode determining how transactions are scheduled. @@ -115,7 +119,7 @@ impl ExecutionCoordinator { /// /// Starts in StartingUp mode to allow ledger replay before switching to Primary /// or Replica. - pub(super) fn new(count: usize) -> Self { + pub(super) fn new(count: usize, execution_permits: Arc) -> Self { Self { blocked_transactions: (0..count) .map(|_| BinaryHeap::new()) @@ -124,6 +128,7 @@ impl ExecutionCoordinator { ready_executors: (0..count as u32).collect(), locks: LocksCache::default(), mode: CoordinationMode::StartingUp(ReplicaMode::default()), + execution_permits, } } @@ -316,7 +321,7 @@ impl ExecutionCoordinator { } /// Check whether the node is acting as an event source for replication - pub(super) fn should_replicate(&self) -> bool { + pub(super) fn is_primary(&self) -> bool { matches!(self.mode, CoordinationMode::Primary(_)) } @@ -328,6 +333,22 @@ impl ExecutionCoordinator { pub(super) fn is_idle(&self) -> bool { self.ready_executors.len() == self.blocked_transactions.len() } + + /// Waits until all executors are idle, then returns a permit that keeps them paused. + /// + /// This acquires all execution permits, blocking until every executor finishes + /// its current transaction. While holding the returned permit, no executors can + /// start new transactions. + /// + /// Use this for operations that require exclusive access to `AccountsDb`, + /// such as taking snapshots or computing checksums. + pub(super) async fn wait_for_idle(&self) -> OwnedSemaphorePermit { + self.execution_permits + .clone() + .acquire_many_owned(self.blocked_transactions.len() as u32) + .await + .expect("semaphore can never be closed") + } } /// Transaction wrapped with a monotonic ID for FIFO queue ordering. diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs index 1fd4a917b..e43131f17 100644 --- a/magicblock-processor/src/scheduler/mod.rs +++ b/magicblock-processor/src/scheduler/mod.rs @@ -6,18 +6,56 @@ //! - Spawns N `TransactionExecutor` workers (one per OS thread) //! - Dispatches transactions using account locking to prevent conflicts //! - Multiplexes between: new transactions, executor readiness, and slot transitions +//! +//! # Streaming Blockhash Algorithm +//! +//! Block hashes are computed incrementally as transactions are processed, rather than +//! waiting until the end of a slot. This enables: +//! +//! - **Early detection of hash divergence** between primary and replica nodes +//! - **Reduced latency** for block finalization +//! +//! The algorithm works as follows: +//! 1. At slot boundary, the hasher is reset and seeded with the previous blockhash +//! 2. Each transaction signature is hashed into the stream as it's scheduled +//! 3. At slot completion, the accumulated hash becomes the new blockhash +//! +//! # Primary vs Replica Responsibilities +//! +//! ## Primary Mode +//! +//! - Acts as the clock source, driving slot transitions via `slot_ticker` +//! - Computes blockhash and broadcasts it via replication channel +//! - Writes completed blocks to the ledger +//! +//! ## Replica Mode +//! +//! - Waits for block production notifications from replication service +//! - Verifies computed blockhash matches the received blockhash +//! - Logs divergence errors (does not halt execution) +//! +//! # Slot Transition Lifecycle +//! +//! 1. **Finalize current block** - compute blockhash, persist to ledger +//! 2. **Broadcast block** - send to replication channel (primary only) +//! 3. **Reset hasher** - seed with previous blockhash for next slot +//! 4. **Advance slot** - increment slot number, reset transaction index +//! 5. **Update program cache** - prune stale programs, re-root to new slot +//! 6. **Update sysvars** - Clock and SlotHashes accounts use std::{ sync::{Arc, RwLock}, thread::JoinHandle, + time::{SystemTime, UNIX_EPOCH}, }; +use blake3::Hasher; use coordinator::{ExecutionCoordinator, TransactionWithId}; use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::{ link::{ - replication::{self, Message}, + replication::{self, Message, SuperBlock}, transactions::{ ProcessableTransaction, SchedulerMode, TransactionProcessingMode, TransactionToProcessRx, @@ -25,9 +63,10 @@ use magicblock_core::{ }, Slot, }; -use magicblock_ledger::LatestBlock; +use magicblock_ledger::{LatestBlock, LatestBlockInner, Ledger}; +use magicblock_metrics::metrics; use solana_account::{from_account, to_account}; -use solana_program::slot_hashes::SlotHashes; +use solana_program::{clock::Clock, hash::Hash, slot_hashes::SlotHashes}; use solana_program_runtime::loaded_programs::ProgramCache; use solana_sdk_ids::sysvar::{clock, slot_hashes}; use state::TransactionSchedulerState; @@ -37,6 +76,7 @@ use tokio::{ mpsc::{channel, Receiver, Sender}, OwnedSemaphorePermit, Semaphore, }, + time::{interval, Interval}, }; use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, warn}; @@ -64,6 +104,8 @@ pub struct TransactionScheduler { program_cache: Arc>>, /// Accounts database (for sysvar updates on slot transition) accountsdb: Arc, + /// Global transactions ledger + ledger: Arc, /// Latest block metadata (slot, clock, blockhash) latest_block: LatestBlock, /// Global shutdown signal. @@ -75,6 +117,12 @@ pub struct TransactionScheduler { /// Semaphore for coordinating exclusive DB access with external callers. /// Scheduler acquires permit when scheduling, releases when idle. pause_permit: Arc, + /// Streaming blockhash state + hasher: Hasher, + /// Time interval between consecutive slots + slot_ticker: Interval, + /// Number of slots to elapse, before we perform snapshot/checksum operations + superblock_size: u64, /// Current Slot that scheduler is operating on (clock value) slot: Slot, /// Current transaction index included into the block under assembly @@ -95,6 +143,7 @@ impl TransactionScheduler { let program_cache = state.prepare_programs_cache(); state.prepare_sysvars(); + let execution_permits = Arc::new(Semaphore::new(count)); for id in 0..count { let (transactions_tx, transactions_rx) = channel(EXECUTOR_QUEUE_CAPACITY); @@ -103,6 +152,7 @@ impl TransactionScheduler { &state, transactions_rx, ready_tx.clone(), + execution_permits.clone(), program_cache.clone(), ); executor.populate_builtins(); @@ -110,19 +160,29 @@ impl TransactionScheduler { executors.push(transactions_tx); } + let mut hasher = Hasher::new(); + let slot_ticker = interval(state.block_time); + let latest_block = state.ledger.latest_block().clone(); + hasher.update(latest_block.load().blockhash.as_ref()); + let slot = state.accountsdb.slot() + 1; + Self { - coordinator: ExecutionCoordinator::new(count), + coordinator: ExecutionCoordinator::new(count, execution_permits), transactions_rx: state.txn_to_process_rx, ready_rx, executors, - latest_block: state.ledger.latest_block().clone(), + latest_block, + ledger: state.ledger, program_cache, accountsdb: state.accountsdb, shutdown: state.shutdown, mode_rx: state.mode_rx, replication_tx: state.replication_tx, pause_permit: state.pause_permit, - slot: state.ledger.latest_block().load().slot, + superblock_size: state.superblock_size, + hasher, + slot_ticker, + slot, index: 0, } } @@ -156,7 +216,17 @@ impl TransactionScheduler { loop { tokio::select! { biased; - Ok(()) = block_produced.recv() => self.transition_to_new_slot(), + Ok(latest) = block_produced.recv() => { + if !self.coordinator.is_primary() { + self.transition_to_new_slot(Some(latest)).await; + } + } + _ = self.slot_ticker.tick() => { + if self.coordinator.is_primary() { + let slot = self.transition_to_new_slot(None).await; + self.handle_superblock(slot).await; + } + } Some(executor) = self.ready_rx.recv() => { self.handle_ready_executor(executor).await; // Release permit when idle: no active work, safe for external access @@ -188,6 +258,41 @@ impl TransactionScheduler { info!("Scheduler terminated"); } + /// Sends a replication message, logging any errors. + async fn send_replication(&self, msg: Message) { + if self.replication_tx.is_closed() { + return; + } + let kind = msg.kind(); + if let Err(error) = self.replication_tx.send(msg).await { + error!( + %error, + kind, + slot = self.slot, + index = self.index, + "replication send failed" + ); + } + } + + async fn handle_superblock(&self, slot: Slot) { + if !slot.is_multiple_of(self.superblock_size) { + return; + } + + // Make sure all executors are idle (no state transitions are happening) + let _guard = self.coordinator.wait_for_idle().await; + // SAFETY: + // we have made sure that no state transitions are in progress via _guard + let Ok(checksum) = (unsafe { self.accountsdb.take_snapshot(slot) }) + else { + error!("failed to create accountsdb snapshot"); + return; + }; + let msg = Message::SuperBlock(SuperBlock { slot, checksum }); + self.send_replication(msg).await; + } + async fn handle_ready_executor(&mut self, executor: ExecutorId) { self.coordinator.unlock_accounts(executor); self.reschedule_blocked_transactions(executor).await; @@ -254,23 +359,27 @@ impl TransactionScheduler { Ok(txn) => txn, Err(blocker) => return Some(blocker), }; - let (slot, index) = - if let TransactionProcessingMode::Replay(ctx) = txn.mode { - (ctx.slot, ctx.index) - } else { + let mut is_execution = false; + let (slot, index) = match txn.mode { + TransactionProcessingMode::Replay(ctx) => (ctx.slot, ctx.index), + TransactionProcessingMode::Simulation(_) => (self.slot, 0), + TransactionProcessingMode::Execution(_) => { + is_execution = true; let index = self.index; + // we only advance the index if we are executing (primary mode) self.index += 1; (self.slot, index) - }; + } + }; + if is_execution { + self.hasher.update(txn.transaction.signature().as_ref()); + } let msg = txn .encoded .as_ref() .cloned() - .filter(|_| { - matches!(txn.mode, TransactionProcessingMode::Execution(_)) - && self.coordinator.should_replicate() - }) + .filter(|_| is_execution && self.coordinator.is_primary()) .map(|payload| { Message::Transaction(replication::Transaction { index, @@ -284,43 +393,154 @@ impl TransactionScheduler { |error| error!(executor, %error, "Executor channel send failed"), ).is_ok(); if let Some(msg) = msg.filter(|_| sent) { - let _ = self.replication_tx.send(msg).await.inspect_err( - |error| error!(executor, %error, "Replication channel send failed"), - ); + self.send_replication(msg).await; } None } - fn transition_to_new_slot(&mut self) { - let block = self.latest_block.load(); - let mut cache = self.program_cache.write().unwrap(); - self.slot = block.slot; + /// Transitions to the next slot, finalizing the current block. + /// + /// In primary mode, this drives the slot transition clock and broadcasts + /// the new block. In replica mode, this responds to block notifications. + async fn transition_to_new_slot( + &mut self, + block: Option, + ) -> u64 { + let block = self.prepare_block(block).await; + self.finalize_block(&block).await; + self.update_sysvars(&block); + metrics::set_slot(block.slot); + block.slot + } + + /// Prepares the block for the current slot. + /// + /// In primary mode: computes blockhash, creates block from local state. + /// In replica mode: uses the block from the replication stream, verifying hash. + async fn prepare_block( + &mut self, + block: Option, + ) -> LatestBlockInner { + if let Some(block) = block { + self.verify_block_as_replica(&block); + block + } else { + self.prepare_block_as_primary().await + } + } + + /// Prepares block as primary: computes blockhash and broadcasts to replicas. + async fn prepare_block_as_primary(&mut self) -> LatestBlockInner { + let blockhash: [u8; 32] = *self.hasher.finalize().as_bytes(); + // NOTE: + // As we have a single node network, we have no + // option but to use the time from host machine + let unix_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + // NOTE: since we can tick very frequently, a lot + // of blocks might have identical timestamps + .as_secs() as i64; + let clock = Clock { + slot: self.slot + 1, + unix_timestamp, + ..Default::default() + }; + let block = LatestBlockInner { + slot: self.slot, + blockhash: blockhash.into(), + clock, + }; + let msg = Message::Block(replication::Block { + slot: block.slot, + hash: block.blockhash, + timestamp: block.clock.unix_timestamp, + }); + self.send_replication(msg).await; + block + } + + /// Checks that the blockhash received from replication stream matches the local + fn verify_block_as_replica(&self, block: &LatestBlockInner) { + if block.blockhash.as_ref() != self.hasher.finalize().as_bytes() { + // TODO(bmuddha): + // this should never happen, and it's unclear how + // to recover from it, right now the log is used + // for debugging purposes only + // NOTE: + // This error will always be logged once + // when replica starts up with an empty ledger + error!( + slot = block.slot, + "replication blockhash has diverged from local" + ) + } + } + + /// Finalizes the block: persists to ledger and updates accountsdb slot. + async fn finalize_block(&self, block: &LatestBlockInner) { + if self.coordinator.is_primary() { + let _ = self + .ledger + .write_block( + block.slot, + block.clock.unix_timestamp, + block.blockhash, + ) + .inspect_err(|error| { + error!(%error, "failed to write block to the ledger") + }); + } + self.accountsdb.set_slot(block.slot); + } + + /// Updates sysvars and program cache for the new slot. + /// + /// This must be called after block finalization to prepare state for the next slot. + fn update_sysvars(&mut self, block: &LatestBlockInner) { + // Reset hasher and seed with previous blockhash for next slot + self.hasher.reset(); + self.hasher.update(block.blockhash.as_ref()); + + // Advance slot and reset transaction index + self.slot = block.clock.slot; self.index = 0; + self.update_program_cache(block.slot); + self.update_clock_sysvar(&block.clock); + self.update_slot_hashes_sysvar(block.slot, &block.blockhash); + } + + /// Updates the program cache for the new slot. + fn update_program_cache(&mut self, slot: Slot) { + let mut cache = self.program_cache.write().unwrap(); // Prune stale programs and re-root to new slot - cache.prune(block.slot, 0); - cache.latest_root_slot = block.slot; + cache.prune(slot, 0); + // Release lock before syscall lookup (prevents deadlock if sysvar is accessed) + drop(cache); + } - // Update Clock sysvar + /// Updates the Clock sysvar account. + fn update_clock_sysvar(&self, clock: &Clock) { if let Some(mut account) = self.accountsdb.get_account(&clock::ID) { - let _ = account.serialize_data(&block.clock); + let _ = account.serialize_data(clock); let _ = self.accountsdb.insert_account(&clock::ID, &account); } + } - // Update SlotHashes sysvar + /// Updates the SlotHashes sysvar account with the new slot/blockhash pair. + fn update_slot_hashes_sysvar(&self, slot: Slot, blockhash: &Hash) { if let Some(mut acc) = self.accountsdb.get_account(&slot_hashes::ID) { let Some(mut hashes) = from_account::(&acc) else { warn!("failed to read slot hashes from account"); return; }; - hashes.add(block.slot, block.blockhash); + hashes.add(slot, *blockhash); if to_account(&hashes, &mut acc).is_none() { warn!("failed to write slot hashes to account"); } let _ = self.accountsdb.insert_account(&slot_hashes::ID, &acc); } - // Release lock before syscall lookup (prevents deadlock if sysvar is accessed) - drop(cache); } } diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 1cbd4e675..1d71ebc04 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -4,7 +4,10 @@ //! `TransactionSchedulerState` is constructed externally and passed to //! `TransactionScheduler::new()` for initialization. -use std::sync::{Arc, OnceLock, RwLock}; +use std::{ + sync::{Arc, OnceLock, RwLock}, + time::Duration, +}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::link::{ @@ -63,6 +66,9 @@ pub struct TransactionSchedulerState { // === Configuration === pub is_auto_airdrop_lamports_enabled: bool, pub shutdown: CancellationToken, + pub block_time: Duration, + pub superblock_size: u64, + /// Receives mode transition commands (Primary or Replica) at runtime. pub mode_rx: Receiver, } diff --git a/magicblock-processor/src/scheduler/tests.rs b/magicblock-processor/src/scheduler/tests.rs index 6e508995c..aec709992 100644 --- a/magicblock-processor/src/scheduler/tests.rs +++ b/magicblock-processor/src/scheduler/tests.rs @@ -7,6 +7,8 @@ //! - Coordination modes (Primary/Replica) //! - Edge cases (empty transactions, duplicate accounts) +use std::sync::Arc; + use magicblock_core::link::transactions::{ ProcessableTransaction, ReplayPosition, SanitizeableTransaction, TransactionProcessingMode, @@ -19,9 +21,16 @@ use solana_program::{ use solana_pubkey::Pubkey; use solana_signer::Signer; use solana_transaction::Transaction; +use tokio::sync::Semaphore; use super::coordinator::{ExecutionCoordinator, TransactionWithId}; +/// Creates a coordinator for testing with the required execution permits semaphore. +fn new_coordinator(count: usize) -> ExecutionCoordinator { + let execution_permits = Arc::new(Semaphore::new(count)); + ExecutionCoordinator::new(count, execution_permits) +} + /// Creates a mock transaction with the specified accounts and processing mode. fn mock_txn_with_mode( accounts: &[(Pubkey, bool)], @@ -73,7 +82,7 @@ fn mock_replay_txn( #[test] fn write_blocks_write() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -85,7 +94,7 @@ fn write_blocks_write() { #[test] fn write_blocks_read() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -100,7 +109,7 @@ fn write_blocks_read() { #[test] fn read_blocks_write() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -112,7 +121,7 @@ fn read_blocks_write() { #[test] fn multiple_readers_allowed() { - let mut c = ExecutionCoordinator::new(3); + let mut c = new_coordinator(3); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -130,7 +139,7 @@ fn multiple_readers_allowed() { #[test] fn partial_locks_released_on_failure() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let (a, b) = (Pubkey::new_unique(), Pubkey::new_unique()); let e0 = c.get_ready_executor().unwrap(); @@ -156,7 +165,7 @@ fn partial_locks_released_on_failure() { #[test] fn blocked_transactions_dequeued_in_fifo_order() { - let mut c = ExecutionCoordinator::new(4); + let mut c = new_coordinator(4); // Switch to primary mode for tests that need multiple blocked transactions c.switch_to_primary_mode(); let acc = Pubkey::new_unique(); @@ -197,7 +206,7 @@ fn blocked_transactions_dequeued_in_fifo_order() { #[test] fn blocked_transaction_releases_executor() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -217,7 +226,7 @@ fn blocked_transaction_releases_executor() { #[test] fn unlock_allows_blocked_transaction_to_proceed() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -237,7 +246,7 @@ fn unlock_allows_blocked_transaction_to_proceed() { #[test] fn transaction_requeued_to_different_executor_keeps_id() { - let mut c = ExecutionCoordinator::new(3); + let mut c = new_coordinator(3); let (a, b) = (Pubkey::new_unique(), Pubkey::new_unique()); let e0 = c.get_ready_executor().unwrap(); @@ -272,7 +281,7 @@ fn transaction_requeued_to_different_executor_keeps_id() { #[test] fn empty_transaction_always_succeeds() { - let mut c = ExecutionCoordinator::new(1); + let mut c = new_coordinator(1); let e = c.get_ready_executor().unwrap(); assert!(c.try_schedule(e, mock_txn(&[])).is_ok()); } @@ -280,7 +289,7 @@ fn empty_transaction_always_succeeds() { #[test] fn transaction_with_duplicate_accounts() { // Real transactions shouldn't have duplicates, but verify we don't panic - let mut c = ExecutionCoordinator::new(1); + let mut c = new_coordinator(1); let acc = Pubkey::new_unique(); let e = c.get_ready_executor().unwrap(); @@ -296,7 +305,7 @@ fn transaction_with_duplicate_accounts() { #[test] fn replica_mode_blocks_new_transactions_when_pending() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let position = ReplayPosition { slot: 0, @@ -323,7 +332,7 @@ fn replica_mode_blocks_new_transactions_when_pending() { #[test] fn replica_mode_unblocks_when_pending_completes() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let position = ReplayPosition { slot: 0, @@ -357,7 +366,7 @@ fn replica_mode_unblocks_when_pending_completes() { #[test] fn starting_up_mode_rejects_execution_transactions() { - let c = ExecutionCoordinator::new(1); + let c = new_coordinator(1); // StartingUp mode should reject Execution transactions assert!( @@ -375,7 +384,7 @@ fn starting_up_mode_rejects_execution_transactions() { #[test] fn starting_up_mode_allows_replay_transactions() { - let c = ExecutionCoordinator::new(1); + let c = new_coordinator(1); // StartingUp mode should allow Replay transactions assert!(c.is_transaction_allowed(&TransactionProcessingMode::Replay( @@ -389,7 +398,7 @@ fn starting_up_mode_allows_replay_transactions() { #[test] fn starting_up_to_primary_mode_switch() { - let mut c = ExecutionCoordinator::new(1); + let mut c = new_coordinator(1); // Start in StartingUp, should reject Execution assert!( @@ -417,7 +426,7 @@ fn starting_up_to_primary_mode_switch() { #[test] fn starting_up_to_replica_mode_switch() { - let mut c = ExecutionCoordinator::new(1); + let mut c = new_coordinator(1); // Start in StartingUp, should allow Replay assert!(c.is_transaction_allowed(&TransactionProcessingMode::Replay( @@ -447,7 +456,7 @@ fn starting_up_to_replica_mode_switch() { #[test] fn starting_up_blocks_new_transactions_when_pending() { - let mut c = ExecutionCoordinator::new(2); + let mut c = new_coordinator(2); let acc = Pubkey::new_unique(); let position = ReplayPosition { slot: 0, diff --git a/magicblock-processor/tests/fees.rs b/magicblock-processor/tests/fees.rs index a3c4e068a..52f3204ed 100644 --- a/magicblock-processor/tests/fees.rs +++ b/magicblock-processor/tests/fees.rs @@ -157,6 +157,7 @@ async fn test_escrowed_payer_success() { #[tokio::test] async fn test_fee_charged_for_failed_transaction() { let env = ExecutionTestEnv::new(); + env.wait_for_scheduler_ready().await; let initial_bal = env.get_payer().lamports(); // Create invalid instruction (writing to empty data) @@ -187,6 +188,7 @@ async fn test_fee_charged_for_failed_transaction() { #[tokio::test] async fn test_escrow_charged_for_failed_transaction() { let env = ExecutionTestEnv::new(); + env.wait_for_scheduler_ready().await; let mut payer = env.get_payer(); payer.set_lamports(0); payer.set_delegated(false); diff --git a/magicblock-processor/tests/replay.rs b/magicblock-processor/tests/replay.rs index 81533e35a..23b52f23e 100644 --- a/magicblock-processor/tests/replay.rs +++ b/magicblock-processor/tests/replay.rs @@ -84,6 +84,7 @@ fn setup_replay_scenario_replica( pub async fn test_replay_state_transition() { // Run in Replica mode (scheduler starts in Replica, no mode switch) let env = ExecutionTestEnv::new_replica_mode(1, false); + env.yield_to_scheduler().await; let (txn, pubkeys) = setup_replay_scenario_replica( &env, diff --git a/magicblock-replicator/src/nats/consumer.rs b/magicblock-replicator/src/nats/consumer.rs index 02c05b1c0..264813137 100644 --- a/magicblock-replicator/src/nats/consumer.rs +++ b/magicblock-replicator/src/nats/consumer.rs @@ -1,8 +1,8 @@ //! Pull-based consumer for receiving replicated events. +pub use async_nats::jetstream::consumer::pull::Stream as MessageStream; use async_nats::jetstream::consumer::{ - pull::{Config as PullConfig, Stream as MessageStream}, - AckPolicy, DeliverPolicy, PullConsumer, + pull::Config as PullConfig, AckPolicy, DeliverPolicy, PullConsumer, }; use tokio_util::sync::CancellationToken; use tracing::warn; @@ -84,4 +84,25 @@ impl Consumer { } } } + + pub(crate) async fn pending( + &self, + cancel: &CancellationToken, + ) -> Option { + loop { + tokio::select! { + result = self.inner.get_info() => { + match result { + Ok(i) => break Some(i.num_pending), + Err(error) => { + warn!(%error, "failed to query consumer info") + } + } + } + _ = cancel.cancelled() => { + break None; + } + } + } + } } diff --git a/magicblock-replicator/src/nats/mod.rs b/magicblock-replicator/src/nats/mod.rs index 0c9386a76..5ec8a46de 100644 --- a/magicblock-replicator/src/nats/mod.rs +++ b/magicblock-replicator/src/nats/mod.rs @@ -16,7 +16,7 @@ mod snapshot; use async_nats::Subject; pub use broker::Broker; -pub use consumer::Consumer; +pub use consumer::{Consumer, MessageStream}; pub use lock_watcher::LockWatcher; use magicblock_core::link::replication::Message; pub use producer::Producer; diff --git a/magicblock-replicator/src/nats/producer.rs b/magicblock-replicator/src/nats/producer.rs index e2ab5137d..06b499684 100644 --- a/magicblock-replicator/src/nats/producer.rs +++ b/magicblock-replicator/src/nats/producer.rs @@ -68,4 +68,12 @@ impl Producer { } } } + + /// Release the leader lock + pub async fn release(&self) -> Result<()> { + self.lock + .delete_expect_revision(cfg::LOCK_KEY, Some(self.revision)) + .await + .map_err(Into::into) + } } diff --git a/magicblock-replicator/src/service/context.rs b/magicblock-replicator/src/service/context.rs index a9f253a57..5ecfab156 100644 --- a/magicblock-replicator/src/service/context.rs +++ b/magicblock-replicator/src/service/context.rs @@ -93,6 +93,8 @@ impl ReplicationContext { /// Writes block to ledger. pub async fn write_block(&self, block: &Block) -> Result<()> { + // wait for the scheduler to accept all of the previous block transactions + let _guard = self.scheduler.wait_for_idle().await; self.ledger .write_block(block.slot, block.timestamp, block.hash)?; Ok(()) diff --git a/magicblock-replicator/src/service/primary.rs b/magicblock-replicator/src/service/primary.rs index 5733fe9b5..7f812d44a 100644 --- a/magicblock-replicator/src/service/primary.rs +++ b/magicblock-replicator/src/service/primary.rs @@ -72,6 +72,9 @@ impl Primary { } } _ = self.ctx.cancel.cancelled() => { + if let Err(error) = self.producer.release().await { + warn!(%error, "failed to release producer lock"); + } info!("shutdown received, terminating primary mode"); return Ok(None); } diff --git a/magicblock-replicator/src/service/standby.rs b/magicblock-replicator/src/service/standby.rs index c38357efc..a47614a4c 100644 --- a/magicblock-replicator/src/service/standby.rs +++ b/magicblock-replicator/src/service/standby.rs @@ -61,14 +61,17 @@ impl Standby { tokio::select! { biased; _ = self.watcher.wait_for_expiry() => { - if self.can_promote { - info!("leader lock expired, attempting takeover"); - if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { - info!("acquired leadership, promoting"); - return self.ctx.into_primary(producer, self.messages).await.map(Some); - } - } else { + if self.has_pending().await { + continue; + } + if !self.can_promote { warn!("leader lock expired, but takeover disabled (ReplicaOnly mode)"); + continue + } + info!("leader lock expired, attempting takeover"); + if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { + info!("acquired leadership, promoting"); + return self.ctx.into_primary(producer, self.messages).await.map(Some); } } result = stream.next() => { @@ -89,13 +92,13 @@ impl Standby { } } _ = timeout_check.tick(), if self.last_activity.elapsed() > LEADER_TIMEOUT => { - if self.can_promote { - if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { - info!("acquired leadership via timeout, promoting"); - return self.ctx.into_primary(producer, self.messages).await.map(Some); - } - } else { + if !self.can_promote { warn!("leader timeout reached, but takeover disabled (ReplicaOnly mode)"); + continue; + } + if let Ok(Some(producer)) = self.ctx.try_acquire_producer().await { + info!("acquired leadership via timeout, promoting"); + return self.ctx.into_primary(producer, self.messages).await.map(Some); } } _ = self.ctx.cancel.cancelled() => { @@ -116,11 +119,15 @@ impl Standby { }; let (slot, index) = message.slot_and_index(); + let current_slot = self.ctx.slot; // Skip duplicates. - let obsolete = self.ctx.slot == slot && self.ctx.index >= index; - if self.ctx.slot > slot || obsolete { + let obsolete = current_slot == slot && self.ctx.index >= index; + if current_slot > slot || obsolete { return; } + if slot.saturating_sub(self.ctx.slot) > 1 { + error!(slot, current_slot, "slot sequence has been skipped"); + } let result = match message { Message::Transaction(txn) => { @@ -141,6 +148,15 @@ impl Standby { self.ctx.update_position(slot, index); } + /// Check whether consumer has any undelivered messages in the stream + async fn has_pending(&mut self) -> bool { + self.consumer + .pending(&self.ctx.cancel) + .await + .map(|pending| pending != 0) + .unwrap_or_default() + } + async fn replay_tx(&self, msg: Transaction) -> Result<()> { let pos = ReplayPosition { slot: msg.slot, diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index eefe6f0a7..8a70e5dfe 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -4057,6 +4057,7 @@ name = "magicblock-processor" version = "0.8.6" dependencies = [ "bincode", + "blake3", "magicblock-accounts-db", "magicblock-core", "magicblock-ledger", diff --git a/test-kit/Cargo.toml b/test-kit/Cargo.toml index fa00e9359..f71799b01 100644 --- a/test-kit/Cargo.toml +++ b/test-kit/Cargo.toml @@ -27,6 +27,7 @@ solana-transaction-status-client-types = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } tracing-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index efa899771..eb2fe9023 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -4,14 +4,14 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, }, - thread, + thread::{self, JoinHandle}, + time::Duration, }; pub use guinea; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_core::{ link::{ - blocks::{BlockMeta, BlockUpdate, BlockUpdateTx}, link, transactions::{ ReplayPosition, SanitizeableTransaction, SchedulerMode, @@ -40,8 +40,11 @@ use solana_transaction_error::TransactionResult; use solana_transaction_status_client_types::TransactionStatusMeta; use tempfile::TempDir; use tokio::sync::mpsc::Sender; +use tokio_util::sync::CancellationToken; use tracing::{error, instrument}; +pub const BLOCK_TIME: Duration = Duration::from_millis(50); +pub const SUPERBLOCK_SIZE: u64 = 72000; /// A simulated validator backend for integration tests. /// /// This struct encapsulates all the core components of a validator, including @@ -63,10 +66,12 @@ pub struct ExecutionTestEnv { pub dir: TempDir, /// The "client-side" channel endpoints for listening to validator events. pub dispatch: DispatchEndpoints, - /// The "server-side" channel endpoint for broadcasting new block updates. - pub blocks_tx: BlockUpdateTx, /// Transaction execution scheduler/backend for deferred launch pub scheduler: Option, + /// Join handle for the spawned scheduler thread. + scheduler_thread: Option>, + /// Shutdown token for the scheduler runtime. + shutdown: CancellationToken, /// Sender for transitioning from StartingUp to Primary/Replica mode mode_tx: Sender, } @@ -139,6 +144,7 @@ impl ExecutionTestEnv { let (mode_tx, mode_rx) = tokio::sync::mpsc::channel(1); + let shutdown = CancellationToken::new(); let mut this = Self { payer_index: AtomicUsize::new(0), payers, @@ -147,8 +153,9 @@ impl ExecutionTestEnv { transaction_scheduler: dispatch.transaction_scheduler.clone(), dir, dispatch, - blocks_tx: validator_channels.block_update, scheduler: None, + scheduler_thread: None, + shutdown: shutdown.clone(), mode_tx, }; this.advance_slot(); // Move to slot 1 to ensure a non-genesis state. @@ -170,9 +177,11 @@ impl ExecutionTestEnv { replication_tx: validator_channels.replication_messages, environment, is_auto_airdrop_lamports_enabled: false, - shutdown: Default::default(), + shutdown, mode_rx, pause_permit: validator_channels.pause_permit, + block_time: BLOCK_TIME, + superblock_size: SUPERBLOCK_SIZE, }; // Pre-send the target mode so the scheduler picks it up once running. @@ -187,7 +196,7 @@ impl ExecutionTestEnv { if defer_startup { this.scheduler.replace(scheduler); } else { - scheduler.spawn(); + this.scheduler_thread = Some(scheduler.spawn()); } for payer in this.payers.iter() { @@ -198,7 +207,7 @@ impl ExecutionTestEnv { pub fn run_scheduler(&mut self) { if let Some(scheduler) = self.scheduler.take() { - scheduler.spawn(); + self.scheduler_thread = Some(scheduler.spawn()); } } @@ -212,6 +221,40 @@ impl ExecutionTestEnv { .expect("failed to send target mode to mode_tx"); } + /// Waits for the scheduler to be ready and in primary mode. + /// + /// This is achieved by waiting for the slot to advance, which indicates + /// the scheduler has processed the mode switch and is running. + pub async fn wait_for_scheduler_ready(&self) { + let initial_slot = self.ledger.latest_block().load().slot; + let start = std::time::Instant::now(); + loop { + let current_slot = self.ledger.latest_block().load().slot; + if current_slot > initial_slot { + break; + } + if start.elapsed() > std::time::Duration::from_secs(5) { + panic!( + "Timed out waiting for scheduler to be ready: slot {current_slot}" + ); + } + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } + } + + /// Waits for the scheduler to start and be ready to process transactions. + /// + /// This is a simpler version that just yields to allow the scheduler time to start. + /// Use this for tests that don't need to wait for slot advancement (e.g., replay tests). + pub async fn yield_to_scheduler(&self) { + // Give the scheduler time to start and process any pending mode switches + for _ in 0..10 { + tokio::task::yield_now().await; + } + // Small additional delay to ensure the scheduler is in its select! loop + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + /// Creates a new account with the specified properties. /// Note: This helper automatically marks the account as `delegated`. pub fn create_account_with_config( @@ -282,12 +325,6 @@ impl ExecutionTestEnv { .expect("failed to write new block to the ledger"); self.accountsdb.set_slot(slot); - // Notify the system that a new block was produced. - let _ = self.blocks_tx.send(BlockUpdate { - hash, - meta: BlockMeta { slot, time }, - }); - // Yield to allow other tasks (like the executor) to process the slot change. thread::yield_now(); slot @@ -422,6 +459,15 @@ impl ExecutionTestEnv { } } +impl Drop for ExecutionTestEnv { + fn drop(&mut self) { + self.shutdown.cancel(); + if let Some(handle) = self.scheduler_thread.take() { + let _ = handle.join(); + } + } +} + pub struct CommitableAccount<'db> { pub pubkey: Pubkey, pub account: AccountSharedData,