diff --git a/Cargo.lock b/Cargo.lock index 0e028f9fe..68548b009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8369,6 +8369,7 @@ dependencies = [ "solana-transaction", "solana-transaction-status-client-types", "tempfile", + "tokio", "tracing", "tracing-log", "tracing-subscriber", diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index 7be891bd7..6537e634c 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -35,7 +35,8 @@ use magicblock_committor_service::{ }; use magicblock_config::{ config::{ - ChainOperationConfig, LedgerConfig, LifecycleMode, LoadableProgram, + validator::ReplicationMode, ChainOperationConfig, LedgerConfig, + LifecycleMode, LoadableProgram, }, ValidatorParams, }; @@ -54,7 +55,10 @@ use magicblock_metrics::{metrics::TRANSACTION_COUNT, MetricsService}; use magicblock_processor::{ build_svm_env, loader::load_upgradeable_programs, - scheduler::{state::TransactionSchedulerState, TransactionScheduler}, + scheduler::{ + state::{SchedulerMode, TransactionSchedulerState}, + TransactionScheduler, + }, }; use magicblock_program::{ init_magic_sys, @@ -75,7 +79,10 @@ use solana_native_token::LAMPORTS_PER_SOL; use solana_pubkey::Pubkey; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_signer::Signer; -use tokio::runtime::Builder; +use tokio::{ + runtime::Builder, + sync::mpsc::{channel, Sender}, +}; use tokio_util::sync::CancellationToken; use tracing::*; @@ -125,6 +132,7 @@ pub struct MagicValidator { claim_fees_task: ClaimFeesTask, task_scheduler: Option, transaction_execution: thread::JoinHandle<()>, + mode_tx: Sender, } impl MagicValidator { @@ -252,6 +260,9 @@ impl MagicValidator { validator::init_validator_authority(identity_keypair); let base_fee = config.validator.basefee; + + // Mode switcher for transitioning from Replica to Primary mode after ledger replay + let (mode_tx, mode_rx) = channel(1); let txn_scheduler_state = TransactionSchedulerState { accountsdb: accountsdb.clone(), ledger: ledger.clone(), @@ -265,6 +276,7 @@ impl MagicValidator { .auto_airdrop_lamports > 0, shutdown: token.clone(), + mode_rx, }; TRANSACTION_COUNT.inc_by(ledger.count_transactions()? as u64); // Faucet keypair is only used for airdrops, which are not allowed in @@ -368,6 +380,7 @@ impl MagicValidator { block_udpate_tx: validator_channels.block_update, task_scheduler: Some(task_scheduler), transaction_execution, + mode_tx, }) } @@ -684,6 +697,17 @@ impl MagicValidator { // Ledger processing needs to happen before anything of the below let step_start = Instant::now(); self.maybe_process_ledger().await?; + + // Switch scheduler to Primary mode after ledger replay completes. + // Primary validators accept client transactions; Replica validators stay + // in Replica mode to receive transactions from the primary. + if let ReplicationMode::Standalone = + self.config.validator.replication_mode + { + // Ignore send errors: scheduler may have shut down. + let _ = self.mode_tx.send(SchedulerMode::Primary).await; + } + log_timing("startup", "maybe_process_ledger", step_start); // Ledger replay has completed, we can now clean non-delegated accounts diff --git a/magicblock-config/src/config/validator.rs b/magicblock-config/src/config/validator.rs index 9ffce0d53..26febbb6f 100644 --- a/magicblock-config/src/config/validator.rs +++ b/magicblock-config/src/config/validator.rs @@ -1,6 +1,7 @@ // src/config/validator.rs use serde::{Deserialize, Serialize}; use solana_keypair::Keypair; +use url::Url; use crate::{consts, types::SerdeKeypair}; @@ -13,6 +14,20 @@ pub struct ValidatorConfig { /// The validator's identity keypair, encoded in Base58. pub keypair: SerdeKeypair, + + /// Replication role: Primary accepts client transactions, Replica replays from Primary. + pub replication_mode: ReplicationMode, +} + +/// Defines the validator's role in a replication setup. +#[derive(Deserialize, Serialize, Debug, Clone)] +pub enum ReplicationMode { + // Validator which doesn't participate in replication + Standalone, + /// Validator which participates in replication: acting as either a primary or replicator + StandBy(Url), + /// Validator which participates in replication only as replicator (no takeover) + ReplicatOnly(Url), } impl Default for ValidatorConfig { @@ -22,6 +37,7 @@ impl Default for ValidatorConfig { Self { basefee: consts::DEFAULT_BASE_FEE, keypair: SerdeKeypair(keypair), + replication_mode: ReplicationMode::Standalone, } } } diff --git a/magicblock-core/src/link/transactions.rs b/magicblock-core/src/link/transactions.rs index e7cda92b8..899555e29 100644 --- a/magicblock-core/src/link/transactions.rs +++ b/magicblock-core/src/link/transactions.rs @@ -75,15 +75,37 @@ pub struct ProcessableTransaction { pub encoded: Option>, } +/// Specifies the position and persistence behavior for replaying a transaction. +/// +/// During replication, transactions must be replayed at the same slot and index +/// as they appeared on the primary to maintain ordering consistency. +#[derive(Clone, Copy)] +pub struct ReplayPosition { + /// The slot in which the transaction was originally included. + pub slot: Slot, + /// The transaction's index within that slot (0-based). + pub index: u32, + /// Whether to persist the replay to the ledger and broadcast status. + /// - `true`: Record to ledger + broadcast (for replay from primary/replicator) + /// - `false`: No recording, no broadcast (for local ledger replay during startup) + pub persist: bool, +} + /// An enum that specifies how a transaction should be processed by the scheduler. -/// Each variant also carries the one-shot sender to return the result to the original caller. +/// +/// Variants that require result notification carry a one-shot sender: +/// - `Simulation` and `Execution` return results to the caller +/// - `Replay` is fire-and-forget (no sender, just position/persistence info) pub enum TransactionProcessingMode { /// Process the transaction as a simulation. Simulation(TxnSimulationResultTx), /// Process the transaction for standard execution. Execution(TxnExecutionResultTx), - /// Replay the transaction against the current state without persistence to the ledger. - Replay(TxnReplayResultTx), + /// Replay the transaction at a specific slot/index position. + /// + /// The `ReplayPosition` specifies where to record the transaction in the ledger + /// and whether to persist/broadcast the result. + Replay(ReplayPosition), } /// The detailed outcome of a transaction simulation. @@ -250,14 +272,31 @@ impl TransactionSchedulerHandle { self.send(txn, mode).await } - /// Submits a transaction to be replayed against the - /// current accountsdb state and awaits the result. + /// Submits a transaction to be replayed against the current accountsdb state. + /// + /// Unlike `execute()`, this method is fire-and-forget: it returns success + /// once the transaction is queued, not after execution completes. + /// + /// # Arguments + /// * `position` - The slot/index at which to record the transaction, plus + /// whether to persist to ledger and broadcast status + /// * `txn` - The transaction to replay pub async fn replay( &self, + position: ReplayPosition, txn: impl SanitizeableTransaction, ) -> TransactionResult { - let mode = TransactionProcessingMode::Replay; - self.send(txn, mode).await? + let mode = TransactionProcessingMode::Replay(position); + let transaction = txn.sanitize(true)?; + let txn = ProcessableTransaction { + transaction, + mode, + encoded: None, + }; + self.0 + .send(txn) + .await + .map_err(|_| TransactionError::ClusterMaintenance) } /// A private helper that handles the common logic of sanitizing, sending a diff --git a/magicblock-ledger/Cargo.toml b/magicblock-ledger/Cargo.toml index 04e6b2aef..7f776df88 100644 --- a/magicblock-ledger/Cargo.toml +++ b/magicblock-ledger/Cargo.toml @@ -34,7 +34,7 @@ solana-signature = { workspace = true, features = ["rand"] } solana-signer = { workspace = true } solana-keypair = { workspace = true } solana-instruction = { workspace = true } -solana-transaction = { workspace = true } +solana-transaction = { workspace = true, features = ["blake3", "verify"] } solana-transaction-error = { workspace = true } solana-message = { workspace = true } solana-transaction-context = { workspace = true } diff --git a/magicblock-ledger/src/blockstore_processor/mod.rs b/magicblock-ledger/src/blockstore_processor/mod.rs index d1a48f720..7782c7133 100644 --- a/magicblock-ledger/src/blockstore_processor/mod.rs +++ b/magicblock-ledger/src/blockstore_processor/mod.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use magicblock_core::link::transactions::{ - SanitizeableTransaction, TransactionSchedulerHandle, + ReplayPosition, SanitizeableTransaction, TransactionSchedulerHandle, }; use num_format::{Locale, ToFormattedString}; use solana_clock::{Slot, UnixTimestamp}; @@ -64,11 +64,7 @@ async fn replay_blocks( if enabled!(Level::INFO) && slot.is_multiple_of(PROGRESS_REPORT_INTERVAL) { - info!( - slot = %slot.to_formatted_string(&Locale::en), - max_slot = %max_slot, - "Processing block" - ); + info!(slot, max_slot, "Processing block"); } let VersionedConfirmedBlock { @@ -131,10 +127,16 @@ async fn replay_blocks( let txn = txn.sanitize(false).map_err(|err| { LedgerError::BlockStoreProcessor(err.to_string()) })?; + let position = ReplayPosition { + slot: block.slot, + // TODO(bmuddha/thlorenz): retrieve the proper transaction index + index: 0, + persist: false, + }; let result = - transaction_scheduler.replay(txn).await.map_err(|err| { - LedgerError::BlockStoreProcessor(err.to_string()) - }); + transaction_scheduler.replay(position, txn).await.map_err( + |err| LedgerError::BlockStoreProcessor(err.to_string()), + ); if !enabled!(Level::TRACE) { debug!(signature = %signature, result = ?result, "Transaction replay result"); } diff --git a/magicblock-ledger/src/conversions/mod.rs b/magicblock-ledger/src/conversions/mod.rs deleted file mode 100644 index 6f522b9cc..000000000 --- a/magicblock-ledger/src/conversions/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -// NOTE: the opposite of ./solana/storage-proto/src/convert.rs -pub mod transaction; diff --git a/magicblock-ledger/src/conversions/transaction.rs b/magicblock-ledger/src/conversions/transaction.rs deleted file mode 100644 index 68bdd75df..000000000 --- a/magicblock-ledger/src/conversions/transaction.rs +++ /dev/null @@ -1,353 +0,0 @@ -use solana_account_decoder::parse_token::UiTokenAmount; -use solana_clock::{Slot, UnixTimestamp}; -use solana_hash::{Hash, HASH_BYTES}; -use solana_message::{ - compiled_instruction::CompiledInstruction, - v0::{self, LoadedAddresses}, - Message, MessageHeader, VersionedMessage, -}; -use solana_pubkey::Pubkey; -use solana_signature::Signature; -use solana_storage_proto::convert::generated; -use solana_transaction::{versioned::VersionedTransaction, Transaction}; -use solana_transaction_context::TransactionReturnData; -use solana_transaction_error::{TransactionError, TransactionResult}; -use solana_transaction_status::{ - ConfirmedTransactionWithStatusMeta, InnerInstruction, InnerInstructions, - Reward, RewardType, TransactionStatusMeta, TransactionTokenBalance, - TransactionWithStatusMeta, VersionedTransactionWithStatusMeta, -}; -use tracing::*; - -pub fn from_generated_confirmed_transaction( - slot: Slot, - tx: generated::ConfirmedTransaction, - block_time: Option, -) -> ConfirmedTransactionWithStatusMeta { - let tx_with_meta = tx_with_meta_from_generated(tx); - ConfirmedTransactionWithStatusMeta { - slot, - block_time, - tx_with_meta, - } -} -fn tx_with_meta_from_generated( - tx: generated::ConfirmedTransaction, -) -> TransactionWithStatusMeta { - let meta = tx.meta.map(tx_meta_from_generated); - - use TransactionWithStatusMeta::*; - match meta { - Some(meta) => { - let transaction = tx.transaction.map(versioned_tx_from_generated).expect( - "Never should store confirmed transaction without a transaction", - ); - Complete(VersionedTransactionWithStatusMeta { transaction, meta }) - } - None => { - let transaction = tx.transaction.map(tx_from_generated).expect( - "Never should store confirmed transaction without a transaction", - ); - MissingMetadata(transaction) - } - } -} - -// ----------------- -// Transaction Conversions -// ----------------- -fn tx_from_generated(tx: generated::Transaction) -> Transaction { - let message = tx.message.map(message_from_generated).unwrap_or_default(); - let signatures = signatures_from_slices(tx.signatures); - Transaction { - signatures, - message, - } -} -fn message_from_generated(msg: generated::Message) -> Message { - let account_keys = pubkeys_from_slices(msg.account_keys); - - let recent_blockhash = - <[u8; HASH_BYTES]>::try_from(msg.recent_blockhash.as_slice()) - .map(Hash::new_from_array) - .expect("failed to construct hash from slice"); - Message { - account_keys, - recent_blockhash, - header: msg.header.map(header_from_generated).unwrap_or_default(), - instructions: msg - .instructions - .into_iter() - .map(compiled_instruction_from_generated) - .collect(), - } -} - -fn versioned_tx_from_generated( - tx: generated::Transaction, -) -> VersionedTransaction { - let message = tx - .message - .map(versioned_message_from_generated) - .unwrap_or_default(); - let signatures = signatures_from_slices(tx.signatures); - VersionedTransaction { - signatures, - message, - } -} - -fn versioned_message_from_generated( - msg: generated::Message, -) -> VersionedMessage { - let account_keys = pubkeys_from_slices(msg.account_keys); - let recent_blockhash = - <[u8; HASH_BYTES]>::try_from(msg.recent_blockhash.as_slice()) - .map(Hash::new_from_array) - .expect("failed to construct hash from slice"); - let message = v0::Message { - header: msg.header.map(header_from_generated).unwrap_or_default(), - recent_blockhash, - account_keys, - instructions: msg - .instructions - .into_iter() - .map(compiled_instruction_from_generated) - .collect(), - address_table_lookups: msg - .address_table_lookups - .into_iter() - .flat_map(try_address_table_lookup_from_generated) - .collect(), - }; - VersionedMessage::V0(message) -} - -fn header_from_generated(header: generated::MessageHeader) -> MessageHeader { - MessageHeader { - num_required_signatures: header.num_required_signatures as u8, - num_readonly_signed_accounts: header.num_readonly_signed_accounts as u8, - num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts - as u8, - } -} - -fn compiled_instruction_from_generated( - instruction: generated::CompiledInstruction, -) -> CompiledInstruction { - let program_id_index = instruction.program_id_index as u8; - let accounts = instruction.accounts; - let data = instruction.data; - CompiledInstruction { - program_id_index, - accounts, - data, - } -} - -fn try_address_table_lookup_from_generated( - lookup: generated::MessageAddressTableLookup, -) -> Option { - let account_key = match Pubkey::try_from(lookup.account_key) { - Ok(pubkey) => pubkey, - Err(err) => { - warn!(error = ?err, "Invalid pubkey"); - return None; - } - }; - let writable_indexes = lookup.writable_indexes; - let readonly_indexes = lookup.readonly_indexes; - Some(v0::MessageAddressTableLookup { - account_key, - writable_indexes, - readonly_indexes, - }) -} - -fn signatures_from_slices(signatures: Vec>) -> Vec { - signatures - .into_iter() - .flat_map(|slice| { - Signature::try_from(slice.as_slice()) - .inspect_err(|e| { - warn!(error = ?e, "Invalid signature"); - }) - .ok() - }) - .collect() -} - -// ----------------- -// TransactionStatus Meta Conversions -// ----------------- -fn tx_meta_from_generated( - meta: generated::TransactionStatusMeta, -) -> solana_transaction_status::TransactionStatusMeta { - let inner_instructions = - inner_instructions_from_generated(meta.inner_instructions); - let rewards = rewards_from_generated(meta.rewards); - let pre_token_balances = - token_balances_from_generated(meta.pre_token_balances); - let post_token_balances = - token_balances_from_generated(meta.post_token_balances); - let status = status_from_generated(meta.err); - let return_data = return_data_from_generated(meta.return_data); - TransactionStatusMeta { - status, - compute_units_consumed: meta.compute_units_consumed, - loaded_addresses: LoadedAddresses { - writable: pubkeys_from_slices(meta.loaded_writable_addresses), - readonly: pubkeys_from_slices(meta.loaded_readonly_addresses), - }, - fee: meta.fee, - pre_balances: meta.pre_balances, - post_balances: meta.post_balances, - inner_instructions: Some(inner_instructions), - log_messages: Some(meta.log_messages), - pre_token_balances: Some(pre_token_balances), - post_token_balances: Some(post_token_balances), - return_data, - rewards: Some(rewards), - } -} - -fn status_from_generated( - err: Option, -) -> TransactionResult<()> { - match err { - None => Ok(()), - Some(err) => { - let e: Option = bincode::deserialize(&err.err) - .map_err(|e| { - warn!(error = ?e, "Invalid transaction error"); - e - }) - .ok(); - - match e { - Some(err) => TransactionResult::Err(err), - None => TransactionResult::Ok(()), - } - } - } -} - -fn inner_instructions_from_generated( - inner_instructions: Vec, -) -> Vec { - inner_instructions - .into_iter() - .map(|inner_instructions| InnerInstructions { - index: inner_instructions.index as u8, - instructions: inner_instructions - .instructions - .into_iter() - .map(|ix| { - let stack_height = ix.stack_height(); - InnerInstruction { - instruction: CompiledInstruction { - program_id_index: ix.program_id_index as u8, - accounts: ix.accounts, - data: ix.data, - }, - stack_height: Some(stack_height), - } - }) - .collect(), - }) - .collect() -} - -fn pubkeys_from_slices(pubkeys: Vec>) -> Vec { - pubkeys - .into_iter() - .flat_map(|slice| { - Pubkey::try_from(slice) - .map_err(|e| { - warn!(error = ?e, "Invalid pubkey"); - e - }) - .ok() - }) - .collect() -} - -fn token_balances_from_generated( - token_balances: Vec, -) -> Vec { - token_balances - .into_iter() - .map(|tb| { - let ui_token_amount = tb - .ui_token_amount - .map(token_amount_from_generated) - .unwrap_or(UiTokenAmount { - ui_amount: None, - decimals: Default::default(), - amount: Default::default(), - ui_amount_string: Default::default(), - }); - TransactionTokenBalance { - account_index: tb.account_index as u8, - mint: tb.mint, - ui_token_amount, - owner: tb.owner, - program_id: tb.program_id, - } - }) - .collect() -} - -fn token_amount_from_generated( - token_amount: generated::UiTokenAmount, -) -> UiTokenAmount { - UiTokenAmount { - ui_amount: Some(token_amount.ui_amount), - decimals: token_amount.decimals as u8, - amount: token_amount.amount, - ui_amount_string: token_amount.ui_amount_string, - } -} - -fn rewards_from_generated(rewards: Vec) -> Vec { - rewards - .into_iter() - .map(|r| Reward { - pubkey: r.pubkey, - lamports: r.lamports, - post_balance: r.post_balance, - reward_type: reward_type_from(r.reward_type), - // NOTE: we don't support votes nor staking - commission: None, - }) - .collect() -} - -fn reward_type_from(n: i32) -> Option { - use RewardType::*; - match n { - 0 => Some(Fee), - 1 => Some(Rent), - 2 => Some(Staking), - 3 => Some(Voting), - _ => None, - } -} - -fn return_data_from_generated( - data: Option, -) -> Option { - match data { - None => None, - Some(data) => match Pubkey::try_from(data.program_id) { - Err(e) => { - warn!(error = ?e, "Invalid pubkey"); - None - } - Ok(program_id) => Some(TransactionReturnData { - program_id, - data: data.data, - }), - }, - } -} diff --git a/magicblock-ledger/src/database/columns.rs b/magicblock-ledger/src/database/columns.rs index 4eae81b87..df32ce691 100644 --- a/magicblock-ledger/src/database/columns.rs +++ b/magicblock-ledger/src/database/columns.rs @@ -73,7 +73,7 @@ pub struct Blockhash; /// together from them /// /// * index type: `(`[`Signature`]`, `[`Slot`])` -/// * value type: [`generated::Transaction`] +/// * value type: `Vec` (bincode-serialized `VersionedTransaction`) pub struct Transaction; /// The transaction memos column @@ -508,8 +508,8 @@ impl ColumnName for Transaction { const NAME: &'static str = CONFIRMED_TRANSACTION_CF; } -impl ProtobufColumn for Transaction { - type Type = generated::Transaction; +impl TypedColumn for Transaction { + type Type = Vec; } // Even though it is deprecated it is needed to implement iter_current_index_filtered diff --git a/magicblock-ledger/src/lib.rs b/magicblock-ledger/src/lib.rs index abd9a2dca..b257fd952 100644 --- a/magicblock-ledger/src/lib.rs +++ b/magicblock-ledger/src/lib.rs @@ -85,7 +85,7 @@ impl LatestBlock { } pub mod blockstore_processor; -mod conversions; + mod database; pub mod errors; pub mod ledger_truncator; diff --git a/magicblock-ledger/src/store/api.rs b/magicblock-ledger/src/store/api.rs index 42d58f784..967a8959a 100644 --- a/magicblock-ledger/src/store/api.rs +++ b/magicblock-ledger/src/store/api.rs @@ -3,7 +3,7 @@ use std::{ fmt, fs, path::{Path, PathBuf}, sync::{ - atomic::{AtomicI64, AtomicU32, Ordering}, + atomic::{AtomicI64, Ordering}, Arc, RwLock, }, }; @@ -15,25 +15,22 @@ use magicblock_metrics::metrics::{ HistogramTimer, }; use rocksdb::{Direction as IteratorDirection, FlushOptions}; -use scc::HashCache; use solana_clock::{Slot, UnixTimestamp}; use solana_hash::{Hash, HASH_BYTES}; use solana_measure::measure::Measure; use solana_pubkey::Pubkey; use solana_signature::Signature; -use solana_storage_proto::convert::generated::{self, ConfirmedTransaction}; -use solana_transaction::{ - sanitized::SanitizedTransaction, versioned::VersionedTransaction, -}; +use solana_storage_proto::convert::generated; +use solana_transaction::versioned::VersionedTransaction; use solana_transaction_status::{ ConfirmedTransactionStatusWithSignature, ConfirmedTransactionWithStatusMeta, TransactionStatusMeta, - VersionedConfirmedBlock, VersionedTransactionWithStatusMeta, + TransactionWithStatusMeta, VersionedConfirmedBlock, + VersionedTransactionWithStatusMeta, }; use tracing::*; use crate::{ - conversions::transaction, database::{ columns::{self as cf, Column, ColumnName, DIRTY_COUNT}, db::Database, @@ -74,7 +71,6 @@ pub struct Ledger { lowest_cleanup_slot: RwLock, rpc_api_metrics: LedgerRpcApiMetrics, latest_block: LatestBlock, - block_txn_indexes: HashCache, } impl fmt::Display for Ledger { @@ -169,7 +165,6 @@ impl Ledger { lowest_cleanup_slot: RwLock::::default(), rpc_api_metrics: LedgerRpcApiMetrics::default(), latest_block, - block_txn_indexes: HashCache::default(), }; let (slot, blockhash) = ledger.get_max_blockhash()?; let time = ledger.get_block_time(slot)?.unwrap_or_default(); @@ -314,6 +309,56 @@ impl Ledger { Ok((slot, hash)) } + /// Returns the highest transaction index for a given slot. + /// + /// Uses a reverse iterator from `(slot, u32::MAX)` to find the first + /// (highest) index in O(1) time. + /// + /// Returns `None` if no transactions exist in the slot. + pub fn get_highest_transaction_index_for_slot( + &self, + slot: Slot, + ) -> LedgerResult> { + let mut iter = self.slot_signatures_cf.iter(IteratorMode::From( + (slot, u32::MAX), + IteratorDirection::Reverse, + ))?; + + match iter.next() { + Some(((tx_slot, tx_index), _)) if tx_slot == slot => { + Ok(Some(tx_index)) + } + _ => Ok(None), + } + } + + /// Returns the position (slot, index) of the most recent transaction. + /// + /// This is useful for resuming replication from the last known position. + /// Returns `None` if no transactions exist in the ledger. + pub fn get_latest_transaction_position( + &self, + ) -> LedgerResult> { + let (latest_slot, _) = self.get_max_blockhash()?; + + // Try to find the highest index in the latest slot + if let Some(index) = + self.get_highest_transaction_index_for_slot(latest_slot)? + { + return Ok(Some((latest_slot, index))); + } + + // If the latest slot has no transactions, check previous slots + // by iterating backwards through slot_signatures_cf + let mut iter = self.slot_signatures_cf.iter(IteratorMode::End)?; + + if let Some(((slot, index), _)) = iter.next() { + Ok(Some((slot, index))) + } else { + Ok(None) + } + } + // ----------------- // Block // ----------------- @@ -333,7 +378,6 @@ impl Ledger { self.blockhash_cf.put(slot, &blockhash)?; self.blockhash_cf.try_increase_entry_counter(1); self.latest_block.store(slot, blockhash, timestamp); - let _ = self.block_txn_indexes.put(slot, AtomicU32::new(0)); Ok(()) } @@ -372,9 +416,7 @@ impl Ledger { .into_iter() .map(|tx_signature| { let transaction = self - .transaction_cf - .get_protobuf((tx_signature, slot))? - .map(VersionedTransaction::from) + .read_transaction((tx_signature, slot))? .ok_or(LedgerError::TransactionNotFound)?; let meta = self .transaction_status_cf @@ -788,23 +830,56 @@ impl Ledger { match self .get_confirmed_transaction(signature, highest_confirmed_slot)? { - Some((slot, tx)) => { + Some((slot, transaction, meta)) => { let block_time = self.get_block_time(slot)?; - let tx = transaction::from_generated_confirmed_transaction( - slot, tx, block_time, - ); - Ok(Some(tx)) + let tx_with_meta = match (transaction, meta) { + (Some(transaction), Some(meta)) => { + TransactionWithStatusMeta::Complete( + VersionedTransactionWithStatusMeta { + transaction, + meta, + }, + ) + } + (Some(transaction), None) => { + let legacy_tx = transaction + .into_legacy_transaction() + .ok_or_else(|| { + LedgerError::TransactionConversionError( + "failed to convert versioned transaction to legacy: \ + transaction is v0 (requires metadata)" + .to_string(), + ) + })?; + TransactionWithStatusMeta::MissingMetadata(legacy_tx) + } + (None, Some(_)) | (None, None) => { + return Ok(None); + } + }; + Ok(Some(ConfirmedTransactionWithStatusMeta { + slot, + block_time, + tx_with_meta, + })) } None => Ok(None), } } /// Returns a confirmed transaction and the slot at which it was confirmed + #[allow(clippy::type_complexity)] fn get_confirmed_transaction( &self, signature: Signature, highest_confirmed_slot: Slot, - ) -> LedgerResult> { + ) -> LedgerResult< + Option<( + Slot, + Option, + Option, + )>, + > { self.rpc_api_metrics .num_get_complete_transaction .fetch_add(1, Ordering::Relaxed); @@ -832,15 +907,11 @@ impl Ledger { if slot <= highest_confirmed_slot && tx_signature == signature { - let slot_and_tx = self - .transaction_cf - .get_protobuf((tx_signature, slot))? - .map(|tx| (slot, tx)); - if let Some((slot, tx)) = slot_and_tx { - (slot, Some(tx), None) - } else { - // We have a slot, but couldn't resolve a proper transaction - return Ok(None); + let transaction = + self.read_transaction((tx_signature, slot))?; + match transaction { + Some(tx) => (slot, Some(tx), None), + None => return Ok(None), } } else { return Ok(None); @@ -854,59 +925,85 @@ impl Ledger { } }; - Ok(Some(( - slot, - ConfirmedTransaction { - transaction, - meta: meta.map(|x| x.into()), - }, - ))) + Ok(Some((slot, transaction, meta))) } /// Writes a confirmed transaction pieced together from the provided inputs /// * `signature` - Signature of the transaction /// * `slot` - Slot at which the transaction was confirmed - /// * `transaction` - Transaction to be written, we take a SanititizedTransaction here - /// since that is what we provide Geyser as well + /// * `writable_keys` - Writable account keys from the transaction + /// * `readonly_keys` - Readonly account keys from the transaction + /// * `encoded_transaction` - Bincode-serialized `VersionedTransaction` /// * `status` - status of the transaction + #[allow(clippy::too_many_arguments)] pub fn write_transaction( &self, signature: Signature, slot: Slot, - transaction: &SanitizedTransaction, + index: u32, + writable_keys: Vec<&Pubkey>, + readonly_keys: Vec<&Pubkey>, + encoded_transaction: &[u8], status: TransactionStatusMeta, - ) -> LedgerResult { - let tx_account_locks = transaction.get_account_locks_unchecked(); - + ) -> LedgerResult<()> { // 1. Write Transaction Status - let index = self.write_transaction_status( + self.write_transaction_status( slot, + index, signature, - tx_account_locks.writable, - tx_account_locks.readonly, + writable_keys, + readonly_keys, status, )?; - // 2. Write Transaction - let versioned = transaction.to_versioned_transaction(); - let transaction: generated::Transaction = versioned.into(); - + // 2. Write Transaction (raw bincode bytes) self.transaction_cf - .put_protobuf((signature, slot), &transaction)?; + .put_bytes((signature, slot), encoded_transaction)?; self.transaction_cf.try_increase_entry_counter(1); - Ok(index) + Ok(()) } pub fn read_transaction( &self, index: (Signature, Slot), - ) -> LedgerResult> { + ) -> LedgerResult> { let result = { let (_lock, _) = self.ensure_lowest_cleanup_slot(); - self.transaction_cf.get_protobuf(index) + self.transaction_cf.get_bytes(index) }?; - Ok(result) + match result { + Some(bytes) => { + let tx: VersionedTransaction = deserialize(&bytes)?; + Ok(Some(tx)) + } + None => Ok(None), + } + } + + /// Verifies the signature of a transaction stored in the ledger. + /// + /// Returns: + /// - `None` if no transaction with that signature exists + /// - `Some(true)` if the transaction exists and its signature is valid + /// - `Some(false)` if the transaction exists but its signature is + /// invalid + pub fn verify_transaction_signature( + &self, + signature: &Signature, + ) -> LedgerResult> { + let slot = match self.get_transaction_status(*signature, u64::MAX)? { + Some((slot, _meta)) => slot, + None => return Ok(None), + }; + + let transaction = match self.read_transaction((*signature, slot))? { + Some(tx) => tx, + None => return Ok(None), + }; + + let is_valid = transaction.verify_and_hash_message().is_ok(); + Ok(Some(is_valid)) } pub fn count_transactions(&self) -> LedgerResult { @@ -1002,35 +1099,28 @@ impl Ledger { fn write_transaction_status( &self, slot: Slot, + index: u32, signature: Signature, writable_keys: Vec<&Pubkey>, readonly_keys: Vec<&Pubkey>, status: TransactionStatusMeta, - ) -> LedgerResult { - let transaction_slot_index = self - .block_txn_indexes - .entry(slot) - .or_default() - .1 - .fetch_add(1, Ordering::Relaxed); - + ) -> LedgerResult<()> { for address in writable_keys { self.address_signatures_cf.put( - (*address, slot, transaction_slot_index, signature), + (*address, slot, index, signature), &AddressSignatureMeta { writeable: true }, )?; self.address_signatures_cf.try_increase_entry_counter(1); } for address in readonly_keys { self.address_signatures_cf.put( - (*address, slot, transaction_slot_index, signature), + (*address, slot, index, signature), &AddressSignatureMeta { writeable: false }, )?; self.address_signatures_cf.try_increase_entry_counter(1); } - self.slot_signatures_cf - .put((slot, transaction_slot_index), &signature)?; + self.slot_signatures_cf.put((slot, index), &signature)?; self.slot_signatures_cf.try_increase_entry_counter(1); let status = status.into(); @@ -1049,8 +1139,7 @@ impl Ledger { 1, ); } - - Ok(transaction_slot_index) + Ok(()) } /// Returns an iterator over all transaction statuses. @@ -1343,6 +1432,7 @@ mod tests { use solana_pubkey::Pubkey; use solana_signature::Signature; use solana_signer::Signer; + use solana_transaction::sanitized::SanitizedTransaction; use solana_transaction_context::TransactionReturnData; use solana_transaction_error::{TransactionError, TransactionResult}; use solana_transaction_status::{ @@ -1535,6 +1625,7 @@ mod tests { assert!(store .write_transaction_status( slot, + 0, signature, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1559,6 +1650,7 @@ mod tests { assert!(store .write_transaction_status( slot, + 0, signature, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1596,6 +1688,7 @@ mod tests { assert!(store .write_transaction_status( slot_uno, + 0, sig_uno, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1625,6 +1718,7 @@ mod tests { assert!(store .write_transaction_status( slot_dos, + 0, sig_dos, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1689,11 +1783,17 @@ mod tests { .is_none()); // 1. Write first transaction and block time for relevant slot + let versioned_uno = sanitized_uno.to_versioned_transaction(); + let encoded_uno = serialize(&versioned_uno).unwrap(); + let locks_uno = sanitized_uno.get_account_locks_unchecked(); assert!(store .write_transaction( sig_uno, slot_uno, - &sanitized_uno, + 0, + locks_uno.writable, + locks_uno.readonly, + &encoded_uno, tx_uno.tx_with_meta.get_status_meta().unwrap(), ) .is_ok()); @@ -1715,11 +1815,17 @@ mod tests { .is_none()); // 2. Write second transaction and block time for relevant slot + let versioned_dos = sanitized_dos.to_versioned_transaction(); + let encoded_dos = serialize(&versioned_dos).unwrap(); + let locks_dos = sanitized_dos.get_account_locks_unchecked(); assert!(store .write_transaction( sig_dos, slot_dos, - &sanitized_dos, + 0, + locks_dos.writable, + locks_dos.readonly, + &encoded_dos, tx_dos.tx_with_meta.get_status_meta().unwrap(), ) .is_ok()); @@ -1755,6 +1861,7 @@ mod tests { assert!(store .write_transaction_status( slot_uno, + 0, signature_uno, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1779,6 +1886,7 @@ mod tests { assert!(store .write_transaction_status( slot_dos, + 0, signature_dos, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1796,6 +1904,7 @@ mod tests { assert!(store .write_transaction_status( slot_dos, + 1, signature_dos_2, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1823,6 +1932,7 @@ mod tests { assert!(store .write_transaction_status( slot_tres, + 0, signature_tres, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1844,6 +1954,7 @@ mod tests { assert!(store .write_transaction_status( slot_cuatro, + 0, signature_cuatro, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1865,6 +1976,7 @@ mod tests { assert!(store .write_transaction_status( slot_cinco, + 1, signature_cinco, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -1888,6 +2000,7 @@ mod tests { assert!(store .write_transaction_status( slot_seis, + 0, signature_seis, keys_as_ref!(writable_keys), keys_as_ref!(readonly_keys), @@ -2122,6 +2235,7 @@ mod tests { let sig8 = Signature::new_unique(); let mut current_slot = 0; + let mut current_index = 0; let read_uno = { let (meta, writable_keys, readonly_keys) = create_transaction_status_meta(5); @@ -2141,16 +2255,19 @@ mod tests { ] { if *slot != current_slot { current_slot = *slot; + current_index = 0; } assert!(store .write_transaction_status( *slot, + current_index, *signature, keys_as_ref!(writable_keys.clone()), keys_as_ref!(readonly_keys.clone()), meta.clone(), ) .is_ok()); + current_index += 1; } read_uno @@ -2354,11 +2471,17 @@ mod tests { // 1. Write transactions and block time + memo for relevant slot { + let versioned_uno = sanitized_uno.to_versioned_transaction(); + let encoded_uno = serialize(&versioned_uno).unwrap(); + let locks_uno = sanitized_uno.get_account_locks_unchecked(); assert!(store .write_transaction( sig_uno, slot_uno, - &sanitized_uno, + 0, + locks_uno.writable, + locks_uno.readonly, + &encoded_uno, tx_uno.tx_with_meta.get_status_meta().unwrap(), ) .is_ok()); @@ -2377,11 +2500,17 @@ mod tests { } { + let versioned_dos = sanitized_dos.to_versioned_transaction(); + let encoded_dos = serialize(&versioned_dos).unwrap(); + let locks_dos = sanitized_dos.get_account_locks_unchecked(); assert!(store .write_transaction( sig_dos, slot_dos, - &sanitized_dos, + 0, + locks_dos.writable, + locks_dos.readonly, + &encoded_dos, tx_dos.tx_with_meta.get_status_meta().unwrap(), ) .is_ok()); @@ -2438,4 +2567,122 @@ mod tests { assert_eq!(sig_info_dos.memo, Some("Test Dos Memo".to_string())); } } + + #[test] + fn test_verify_transaction_signature() { + init_logger!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let store = Ledger::open(ledger_path.path()).unwrap(); + + // Create a properly signed transaction + let from_keypair = Keypair::new(); + let to = Pubkey::new_unique(); + let blockhash = Hash::new_unique(); + let tx = solana_system_transaction::transfer( + &from_keypair, + &to, + 42, + blockhash, + ); + let versioned_tx = VersionedTransaction::from(tx); + let signature = versioned_tx.signatures[0]; + let slot = 10u64; + + // Encode and write the transaction to the ledger + let encoded = serialize(&versioned_tx).unwrap(); + let (meta, _, _) = create_transaction_status_meta(5); + let writable_keys = versioned_tx.message.static_account_keys()[..1] + .iter() + .collect(); + let readonly_keys = versioned_tx.message.static_account_keys()[1..] + .iter() + .collect(); + store + .write_transaction( + signature, + slot, + 0, + writable_keys, + readonly_keys, + &encoded, + meta, + ) + .unwrap(); + store.write_block(slot, 100, Hash::new_unique()).unwrap(); + + // Verify a properly signed transaction returns Some(true) + let result = store.verify_transaction_signature(&signature).unwrap(); + assert_eq!(result, Some(true)); + + // Verify a non-existent signature returns None + let random_sig = Signature::new_unique(); + let result = store.verify_transaction_signature(&random_sig).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_verify_transaction_signature_not_found() { + init_logger!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let store = Ledger::open(ledger_path.path()).unwrap(); + + // Query an empty ledger — no transaction exists + let sig = Signature::new_unique(); + let result = store.verify_transaction_signature(&sig).unwrap(); + assert_eq!(result, None); + } + + #[test] + fn test_verify_transaction_signature_invalid() { + init_logger!(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let store = Ledger::open(ledger_path.path()).unwrap(); + + // Build a transaction with a bogus signature (not matching keypair) + let from_keypair = Keypair::new(); + let to = Pubkey::new_unique(); + let blockhash = Hash::new_unique(); + let tx = solana_system_transaction::transfer( + &from_keypair, + &to, + 42, + blockhash, + ); + let mut versioned_tx = VersionedTransaction::from(tx); + + // Corrupt the signature so verification will fail + let real_sig = versioned_tx.signatures[0]; + versioned_tx.signatures[0] = Signature::new_unique(); + let bad_sig = versioned_tx.signatures[0]; + + let encoded = serialize(&versioned_tx).unwrap(); + let (meta, _, _) = create_transaction_status_meta(5); + let writable_keys = versioned_tx.message.static_account_keys()[..1] + .iter() + .collect(); + let readonly_keys = versioned_tx.message.static_account_keys()[1..] + .iter() + .collect(); + let slot = 10u64; + store + .write_transaction( + bad_sig, + slot, + 0, + writable_keys, + readonly_keys, + &encoded, + meta, + ) + .unwrap(); + store.write_block(slot, 100, Hash::new_unique()).unwrap(); + + // The corrupted signature should fail verification + let result = store.verify_transaction_signature(&bad_sig).unwrap(); + assert_eq!(result, Some(false)); + + // The original valid signature is not in the ledger + let result = store.verify_transaction_signature(&real_sig).unwrap(); + assert_eq!(result, None); + } } diff --git a/magicblock-ledger/tests/common.rs b/magicblock-ledger/tests/common.rs index ed246eae1..6bcce67ba 100644 --- a/magicblock-ledger/tests/common.rs +++ b/magicblock-ledger/tests/common.rs @@ -19,6 +19,7 @@ pub fn setup() -> Ledger { pub fn write_dummy_transaction( ledger: &Ledger, slot: Slot, + index: u32, ) -> (Hash, Signature) { let from = Keypair::new(); let to = Pubkey::new_unique(); @@ -28,8 +29,19 @@ pub fn write_dummy_transaction( let transaction = SanitizedTransaction::from_transaction_for_tests(tx); let status = TransactionStatusMeta::default(); let message_hash = *transaction.message_hash(); + let versioned = transaction.to_versioned_transaction(); + let encoded = bincode::serialize(&versioned).unwrap(); + let locks = transaction.get_account_locks_unchecked(); ledger - .write_transaction(signature, slot, &transaction, status) + .write_transaction( + signature, + slot, + index, + locks.writable, + locks.readonly, + &encoded, + status, + ) .expect("failed to write dummy transaction"); (message_hash, signature) diff --git a/magicblock-ledger/tests/get_block.rs b/magicblock-ledger/tests/get_block.rs index d74a5829b..0bb91a50a 100644 --- a/magicblock-ledger/tests/get_block.rs +++ b/magicblock-ledger/tests/get_block.rs @@ -42,8 +42,8 @@ fn test_get_block_transactions() { init_logger!(); let ledger = setup(); - let (slot_41_tx1, _) = write_dummy_transaction(&ledger, 41); - let (slot_41_tx2, _) = write_dummy_transaction(&ledger, 41); + let (slot_41_tx1, _) = write_dummy_transaction(&ledger, 41, 0); + let (slot_41_tx2, _) = write_dummy_transaction(&ledger, 41, 1); let slot_41_block_time = 410; let slot_41_block_hash = Hash::new_unique(); @@ -51,8 +51,8 @@ fn test_get_block_transactions() { .write_block(41, slot_41_block_time, slot_41_block_hash) .unwrap(); - let (slot_42_tx1, _) = write_dummy_transaction(&ledger, 42); - let (slot_42_tx2, _) = write_dummy_transaction(&ledger, 42); + let (slot_42_tx1, _) = write_dummy_transaction(&ledger, 42, 0); + let (slot_42_tx2, _) = write_dummy_transaction(&ledger, 42, 1); let slot_42_block_time = 420; let slot_42_block_hash = Hash::new_unique(); diff --git a/magicblock-ledger/tests/test_ledger_truncator.rs b/magicblock-ledger/tests/test_ledger_truncator.rs index 6801ae086..4528843d6 100644 --- a/magicblock-ledger/tests/test_ledger_truncator.rs +++ b/magicblock-ledger/tests/test_ledger_truncator.rs @@ -54,7 +54,7 @@ async fn test_truncator_not_purged_size() { ); for i in 0..NUM_TRANSACTIONS { - write_dummy_transaction(&ledger, i); + write_dummy_transaction(&ledger, i, 0); ledger.write_block(i, 0, Hash::new_unique()).unwrap() } let signatures = (0..NUM_TRANSACTIONS) @@ -85,7 +85,7 @@ async fn test_truncator_non_empty_ledger() { let ledger = Arc::new(setup()); let signatures = (0..FINAL_SLOT + 20) .map(|i| { - let (_, signature) = write_dummy_transaction(&ledger, i); + let (_, signature) = write_dummy_transaction(&ledger, i, 0); ledger.write_block(i, 0, Hash::new_unique()).unwrap(); signature }) @@ -128,7 +128,7 @@ async fn transaction_spammer( for _ in 0..num_of_iterations { for _ in 0..tx_per_operation { let slot = signatures.len() as u64; - let (_, signature) = write_dummy_transaction(&ledger, slot); + let (_, signature) = write_dummy_transaction(&ledger, slot, 0); ledger.write_block(slot, 0, Hash::new_unique()).unwrap(); signatures.push(signature); } @@ -186,7 +186,7 @@ async fn test_with_1gb_db() { break; } - write_dummy_transaction(&ledger, slot); + write_dummy_transaction(&ledger, slot, 0); ledger.write_block(slot, 0, Hash::new_unique()).unwrap(); slot += 1 } diff --git a/magicblock-processor/src/executor/mod.rs b/magicblock-processor/src/executor/mod.rs index 5cabbea94..5640205b9 100644 --- a/magicblock-processor/src/executor/mod.rs +++ b/magicblock-processor/src/executor/mod.rs @@ -1,15 +1,19 @@ use std::{ cmp::Ordering, + ops::Deref, sync::{Arc, RwLock}, }; use magicblock_accounts_db::{AccountsDb, GlobalSyncLock}; -use magicblock_core::link::{ - accounts::AccountUpdateTx, - transactions::{ - ScheduledTasksTx, TransactionProcessingMode, TransactionStatusTx, - TransactionToProcessRx, +use magicblock_core::{ + link::{ + accounts::AccountUpdateTx, + transactions::{ + ProcessableTransaction, ScheduledTasksTx, + TransactionProcessingMode, TransactionStatusTx, + }, }, + Slot, }; use magicblock_ledger::{LatestBlock, LatestBlockInner, Ledger}; use parking_lot::RwLockReadGuard; @@ -21,7 +25,11 @@ use solana_svm::transaction_processor::{ ExecutionRecordingConfig, TransactionBatchProcessor, TransactionProcessingConfig, TransactionProcessingEnvironment, }; -use tokio::{runtime::Builder, sync::mpsc::Sender}; +use solana_transaction::sanitized::SanitizedTransaction; +use tokio::{ + runtime::Builder, + sync::mpsc::{Receiver, Sender}, +}; use tracing::{info, instrument}; use crate::{ @@ -29,6 +37,19 @@ use crate::{ scheduler::{locks::ExecutorId, state::TransactionSchedulerState}, }; +pub(crate) struct IndexedTransaction { + pub(crate) slot: Slot, + pub(crate) index: u32, + pub(crate) txn: ProcessableTransaction, +} + +impl Deref for IndexedTransaction { + type Target = SanitizedTransaction; + fn deref(&self) -> &Self::Target { + &self.txn.transaction + } +} + /// A dedicated, single-threaded worker responsible for processing transactions. pub(super) struct TransactionExecutor { id: ExecutorId, @@ -45,7 +66,7 @@ pub(super) struct TransactionExecutor { environment: TransactionProcessingEnvironment<'static>, // Channels - rx: TransactionToProcessRx, + rx: Receiver, transaction_tx: TransactionStatusTx, accounts_tx: AccountUpdateTx, tasks_tx: ScheduledTasksTx, @@ -59,7 +80,7 @@ impl TransactionExecutor { pub(super) fn new( id: ExecutorId, state: &TransactionSchedulerState, - rx: TransactionToProcessRx, + rx: Receiver, ready_tx: Sender, programs_cache: Arc>>, ) -> Self { @@ -140,16 +161,16 @@ impl TransactionExecutor { tokio::select! { biased; txn = self.rx.recv() => { - let Some(txn) = txn else { break }; - match txn.mode { - TransactionProcessingMode::Execution(tx) => { - self.execute([txn.transaction], tx, false); + let Some(transaction) = txn else { break }; + match transaction.txn.mode { + TransactionProcessingMode::Execution(_) => { + self.execute(transaction, None); } TransactionProcessingMode::Simulation(tx) => { - self.simulate([txn.transaction], tx); + self.simulate([transaction.txn.transaction], tx); } - TransactionProcessingMode::Replay(tx) => { - self.execute([txn.transaction], Some(tx), true); + TransactionProcessingMode::Replay(ctx) => { + self.execute(transaction, Some(ctx.persist)); } } let _ = self.ready_tx.try_send(self.id); diff --git a/magicblock-processor/src/executor/processing.rs b/magicblock-processor/src/executor/processing.rs index e8a753e71..5520a06db 100644 --- a/magicblock-processor/src/executor/processing.rs +++ b/magicblock-processor/src/executor/processing.rs @@ -3,8 +3,9 @@ use magicblock_core::{ link::{ accounts::{AccountWithSlot, LockedAccount}, transactions::{ + ProcessableTransaction, TransactionProcessingMode, TransactionSimulationResult, TransactionStatus, - TxnExecutionResultTx, TxnSimulationResultTx, + TxnSimulationResultTx, }, }, tls::ExecutionTlsStash, @@ -28,48 +29,74 @@ use solana_transaction_status::{ }; use tracing::*; +use crate::executor::IndexedTransaction; + impl super::TransactionExecutor { /// Executes a transaction and conditionally commits its results. + /// + /// # Arguments + /// * `transaction` - The transaction to execute + /// * `tx` - Channel to send the execution result (None for replay) + /// * `persist` - Controls persistence behavior: + /// - `None`: Execution mode - notify subscribers, record to ledger, process tasks + /// - `Some(true)`: Replay with persist - record to ledger, no notifications + /// - `Some(false)`: Replay without persist - no side effects pub(super) fn execute( &self, - transaction: [SanitizedTransaction; 1], - tx: TxnExecutionResultTx, - is_replay: bool, + mut transaction: IndexedTransaction, + persist: Option, ) { TRANSACTION_COUNT.inc(); - let (result, balances) = self.process(&transaction); - let [txn] = transaction; + let (result, balances) = { + let txn = [transaction.txn.transaction]; + let result = self.process(&txn); + let [txn] = txn; + transaction.txn.transaction = txn; + result + }; // 1. Handle Loading/Processing Failures let processed = match result { Ok(processed) => processed, Err(err) => { - return self.handle_failure(txn, err, None, tx); + return self.handle_failure(transaction, err, None); } }; // 2. Commit Account State (DB Update) // Note: Failed transactions still pay fees, so we attempt commit even on execution failure. - let fee_payer = *txn.fee_payer(); - if let Err(err) = self.commit_accounts(fee_payer, &processed, is_replay) - { + let fee_payer = *transaction.fee_payer(); + // Only send account updates for Execution mode (persist is None) + let notify = persist.is_none(); + if let Err(err) = self.commit_accounts(fee_payer, &processed, notify) { return self.handle_failure( - txn, + transaction, TransactionError::CommitCancelled, Some(vec![err.to_string()]), - tx, ); } let status = processed.status(); // 3. Post-Processing (Tasks & Ledger) - if status.is_ok() && !is_replay { + // Only process scheduled tasks for successful transactions in Execution mode + if status.is_ok() && persist.is_none() { self.process_scheduled_tasks(); } - - if !is_replay { - self.record_transaction(txn, processed, balances); + let tx = if let TransactionProcessingMode::Execution(ref mut tx) = + transaction.txn.mode + { + tx.take() + } else { + None + }; + // Record to ledger for Execution mode (persist is None) or Replay with persist=true + if persist.unwrap_or(true) { + if let Err(err) = + self.record_transaction(transaction, processed, balances) + { + error!(error = ?err, "Failed to record transaction to ledger"); + } } ExecutionTlsStash::clear(); @@ -151,20 +178,21 @@ impl super::TransactionExecutor { /// Common handler for transaction failures (load error or commit error). fn handle_failure( &self, - txn: SanitizedTransaction, + mut txn: IndexedTransaction, err: TransactionError, logs: Option>, - tx: TxnExecutionResultTx, ) { FAILED_TRANSACTIONS_COUNT.inc(); - self.record_failure(txn, Err(err.clone()), logs); // Even on failure, ensure stash is clear (though likely empty if load failed). ExecutionTlsStash::clear(); - if let Some(tx) = tx { - let _ = tx.send(Err(err)); + if let TransactionProcessingMode::Execution(ref mut tx) = txn.txn.mode { + if let Some(tx) = tx.take() { + let _ = tx.send(Err(err.clone())); + } } + self.record_failure(txn, Err(err), logs); } fn process_scheduled_tasks(&self) { @@ -178,10 +206,10 @@ impl super::TransactionExecutor { /// Writes a fully processed transaction to the Ledger. fn record_transaction( &self, - txn: SanitizedTransaction, + txn: IndexedTransaction, result: ProcessedTransaction, balances: AccountsBalances, - ) { + ) -> Result<(), Box> { let meta = match result { ProcessedTransaction::Executed(executed) => TransactionStatusMeta { fee: executed.loaded_transaction.fee_details.total_fee(), @@ -211,13 +239,13 @@ impl super::TransactionExecutor { }, }; - self.write_to_ledger(txn, meta); + self.write_to_ledger(txn, meta) } /// Writes a failed transaction (load or commit error) to the Ledger. fn record_failure( &self, - txn: SanitizedTransaction, + txn: IndexedTransaction, status: TransactionResult<()>, logs: Option>, ) { @@ -229,38 +257,62 @@ impl super::TransactionExecutor { log_messages: logs, ..Default::default() }; - self.write_to_ledger(txn, meta); + if let Err(err) = self.write_to_ledger(txn, meta) { + error!(error = ?err, "Failed to record failed transaction to ledger"); + } } fn write_to_ledger( &self, - txn: SanitizedTransaction, + txn: IndexedTransaction, meta: TransactionStatusMeta, - ) { + ) -> Result<(), Box> { let signature = *txn.signature(); - let index = match self.ledger.write_transaction( - signature, - self.processor.slot, - &txn, - // TODO(bmuddha): perf: remove clone with the new ledger - meta.clone(), - ) { - Ok(i) => i, - Err(error) => { - error!(error = ?error, "Failed to commit transaction to ledger"); - return; + let slot = txn.slot; + let index = txn.index; + + let ProcessableTransaction { + transaction, + encoded, + .. + } = txn.txn; + + // Use pre-encoded bytes or serialize on the spot + let encoded = match encoded { + Some(bytes) => bytes, + None => { + let versioned = transaction.to_versioned_transaction(); + bincode::serialize(&versioned) + .map_err(|e| Box::new(e) as Box)? } }; + let tx_account_locks = transaction.get_account_locks_unchecked(); + + let result = self.ledger.write_transaction( + signature, + slot, + index, + tx_account_locks.writable, + tx_account_locks.readonly, + &encoded, + meta.clone(), + ); + if let Err(error) = result { + error!(error = ?error, "Failed to commit transaction to ledger"); + return Err(error.into()); + } + let status = TransactionStatus { - slot: self.processor.slot, + slot, index, - txn, + txn: transaction, meta, }; // Notify listeners let _ = self.transaction_tx.send(status); + Ok(()) } /// Persists account changes to AccountsDb and notifies listeners. @@ -268,7 +320,7 @@ impl super::TransactionExecutor { &self, fee_payer: Pubkey, result: &ProcessedTransaction, - is_replay: bool, + notify: bool, ) -> AccountsDbResult<()> { let succeeded = result.status().is_ok(); let accounts = match result { @@ -293,10 +345,9 @@ impl super::TransactionExecutor { &fo.rollback_accounts { // Temporary slice construction to match expected type - // This is slightly inefficient but safe; the vector here is tiny (1 item) return self.insert_and_notify( &[(fee_payer, fee_payer_account.clone())], - is_replay, + notify, false, ); } @@ -309,13 +360,13 @@ impl super::TransactionExecutor { .map(|(_, acc)| acc.privileged()) .unwrap_or(false); - self.insert_and_notify(accounts, is_replay, privileged) + self.insert_and_notify(accounts, notify, privileged) } fn insert_and_notify( &self, accounts: &[(Pubkey, solana_account::AccountSharedData)], - is_replay: bool, + notify: bool, privileged: bool, ) -> AccountsDbResult<()> { // Filter: Persist only dirty or privileged accounts @@ -325,7 +376,7 @@ impl super::TransactionExecutor { self.accountsdb.insert_batch(to_commit)?; - if is_replay { + if !notify { return Ok(()); } diff --git a/magicblock-processor/src/scheduler/coordinator.rs b/magicblock-processor/src/scheduler/coordinator.rs index 57e04ca11..b41de38f3 100644 --- a/magicblock-processor/src/scheduler/coordinator.rs +++ b/magicblock-processor/src/scheduler/coordinator.rs @@ -6,6 +6,14 @@ //! - **Lock contention**: Failed transactions queued behind blocking executor //! - **FIFO ordering**: Transactions processed in ID order within each blocked queue //! +//! # Coordination Modes +//! +//! The scheduler operates in one of two modes: +//! - **Primary**: For validators accepting client transactions. Allows concurrent +//! execution of independent transactions, with a limit on blocked transactions. +//! - **Replica**: For validators replaying transactions from a primary. Enforces +//! strict ordering by allowing only one pending blocked transaction at a time. +//! //! # Key Types //! //! - `ExecutorId`: Unique identifier for each executor worker (0..N) @@ -14,13 +22,19 @@ use std::{cmp::Ordering, collections::BinaryHeap}; -use magicblock_core::link::transactions::ProcessableTransaction; +use magicblock_core::link::transactions::{ + ProcessableTransaction, TransactionProcessingMode, +}; use magicblock_metrics::metrics::MAX_LOCK_CONTENTION_QUEUE_SIZE; +use tracing::{error, warn}; use super::locks::{ next_transaction_id, ExecutorId, LocksCache, TransactionId, }; -use crate::scheduler::locks::RcLock; +use crate::scheduler::{locks::RcLock, state::SchedulerMode}; + +/// Maximum blocked transactions per executor before rejecting new ones (Primary mode). +const BLOCKED_TXN_MULTIPLIER: usize = 2; /// Coordinates transaction scheduling across multiple executor workers. /// @@ -42,9 +56,53 @@ pub(super) struct ExecutionCoordinator { acquired_locks: Vec>, /// Global account lock registry locks: LocksCache, + /// Current coordination mode (Primary or Replica) + mode: CoordinationMode, +} + +/// Coordination mode determining how transactions are scheduled. +pub(super) enum CoordinationMode { + /// Primary mode: accepts client transactions, allows concurrent execution. + Primary(PrimaryMode), + /// Replica mode: replays transactions, enforces strict ordering. + Replica(ReplicaMode), +} + +/// State for Primary mode scheduling. +pub(super) struct PrimaryMode { + /// Current number of blocked transactions. + blocked_txn_count: usize, + /// Maximum allowed blocked transactions before rejecting new ones. + max_blocked_txn: usize, +} + +/// State for Replica mode scheduling. +/// +/// In Replica mode, only one transaction can be pending (blocked) at a time. +/// This ensures strict ordering during replay: when a transaction is blocked, +/// new transactions wait in the channel until it completes. +#[derive(Default)] +pub(super) struct ReplicaMode { + /// ID of the currently pending blocked transaction, if any. + pending: Option, +} + +impl CoordinationMode { + /// Returns true if the scheduler is ready to accept new transactions. + fn is_ready(&self) -> bool { + match self { + // Primary: ready if we haven't hit the blocked transaction limit + Self::Primary(m) => m.blocked_txn_count < m.max_blocked_txn, + // Replica: ready only if no transaction is pending (strict ordering) + Self::Replica(m) => m.pending.is_none(), + } + } } impl ExecutionCoordinator { + /// Creates a new coordinator starting in Replica mode. + /// + /// Starts in Replica mode to allow ledger replay before switching to Primary. pub(super) fn new(count: usize) -> Self { Self { blocked_transactions: (0..count) @@ -53,12 +111,13 @@ impl ExecutionCoordinator { acquired_locks: (0..count).map(|_| Vec::new()).collect(), ready_executors: (0..count as u32).collect(), locks: LocksCache::default(), + mode: CoordinationMode::Replica(ReplicaMode::default()), } } #[inline] pub(super) fn is_ready(&self) -> bool { - !self.ready_executors.is_empty() + !self.ready_executors.is_empty() && self.mode.is_ready() } #[inline] @@ -123,6 +182,20 @@ impl ExecutionCoordinator { blocker: ExecutorId, txn: TransactionWithId, ) { + match &mut self.mode { + CoordinationMode::Replica(r) => { + // In Replica mode, track the pending transaction ID. + // The debug_assert ensures we don't queue when one is already pending + // (enforced by is_ready() returning false when pending.is_some()). + debug_assert!(r.pending.is_none()); + if r.pending.replace(txn.id).is_some() { + error!("Invariant violation: replaced pending transaction"); + } + } + CoordinationMode::Primary(p) => { + p.blocked_txn_count += 1; + } + } let heap = &mut self.blocked_transactions[blocker as usize]; heap.push(txn); MAX_LOCK_CONTENTION_QUEUE_SIZE @@ -133,7 +206,65 @@ impl ExecutionCoordinator { &mut self, executor: ExecutorId, ) -> Option { - self.blocked_transactions[executor as usize].pop() + let txn = self.blocked_transactions[executor as usize].pop(); + match &mut self.mode { + CoordinationMode::Replica(r) => { + // Clear pending if this was the pending transaction. + if r.pending == txn.as_ref().map(|txn| txn.id) { + r.pending.take(); + } + } + CoordinationMode::Primary(p) => { + p.blocked_txn_count = + p.blocked_txn_count.saturating_sub(txn.is_some() as usize); + } + } + txn + } + + /// Transitions to the specified execution mode. + /// + /// No-op if already in the target mode (logs a warning). + pub(super) fn transition_to(&mut self, mode: SchedulerMode) { + match mode { + SchedulerMode::Primary => { + if let CoordinationMode::Primary(_) = self.mode { + warn!("Already in primary mode"); + return; + } + let mode = PrimaryMode { + blocked_txn_count: 0, + max_blocked_txn: self.blocked_transactions.len() + * BLOCKED_TXN_MULTIPLIER, + }; + self.mode = CoordinationMode::Primary(mode); + } + SchedulerMode::Replica => { + if let CoordinationMode::Replica(_) = self.mode { + warn!("Already in replica mode"); + return; + } + self.mode = CoordinationMode::Replica(Default::default()); + } + } + } + + /// Checks if a transaction mode is compatible with the current coordination mode. + /// + /// - Primary mode: rejects Replay transactions (only client Execution allowed) + /// - Replica mode: rejects Execution transactions (only Replay allowed) + /// - Simulations are allowed in all modes + pub(super) fn is_transaction_allowed( + &self, + mode: &TransactionProcessingMode, + ) -> bool { + use CoordinationMode::*; + use TransactionProcessingMode::*; + let mode_mismatch = matches!( + (&self.mode, mode), + (Primary(_), Replay(_)) | (Replica(_), Execution(_)) + ); + !mode_mismatch } } diff --git a/magicblock-processor/src/scheduler/mod.rs b/magicblock-processor/src/scheduler/mod.rs index e10971f2c..90cdec4e9 100644 --- a/magicblock-processor/src/scheduler/mod.rs +++ b/magicblock-processor/src/scheduler/mod.rs @@ -15,8 +15,12 @@ use std::{ use coordinator::{ExecutionCoordinator, TransactionWithId}; use locks::{ExecutorId, MAX_SVM_EXECUTORS}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; -use magicblock_core::link::transactions::{ - ProcessableTransaction, TransactionToProcessRx, +use magicblock_core::{ + link::transactions::{ + ProcessableTransaction, TransactionProcessingMode, + TransactionToProcessRx, + }, + Slot, }; use magicblock_ledger::LatestBlock; use solana_account::{from_account, to_account}; @@ -31,7 +35,10 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, warn}; -use crate::executor::{SimpleForkGraph, TransactionExecutor}; +use crate::{ + executor::{IndexedTransaction, SimpleForkGraph, TransactionExecutor}, + scheduler::state::SchedulerMode, +}; // Capacity of 1 ensures executor processes one transaction at a time const EXECUTOR_QUEUE_CAPACITY: usize = 1; @@ -40,22 +47,28 @@ const EXECUTOR_QUEUE_CAPACITY: usize = 1; /// /// Runs in a dedicated thread with a single-threaded Tokio runtime. pub struct TransactionScheduler { - /// Manages executor pool and account locking + /// Manages executor pool, account locking, and coordination mode coordinator: ExecutionCoordinator, /// Incoming transaction queue from global processor transactions_rx: TransactionToProcessRx, /// Executor readiness notifications (workers signal when idle) ready_rx: Receiver, /// Sender channels to each executor worker - executors: Vec>, + executors: Vec>, /// Shared BPF program cache program_cache: Arc>>, /// Accounts database (for sysvar updates on slot transition) accountsdb: Arc, /// Latest block metadata (slot, clock, blockhash) latest_block: LatestBlock, - /// Global shutdown signal + /// Global shutdown signal. shutdown: CancellationToken, + /// Receiver for mode transition commands. + mode_rx: Receiver, + /// Current active slot (unfinalized block). + slot: Slot, + /// Transaction index within the current block being assembled. + index: u32, } impl TransactionScheduler { @@ -96,6 +109,9 @@ impl TransactionScheduler { program_cache, accountsdb: state.accountsdb, shutdown: state.shutdown, + mode_rx: state.mode_rx, + slot: state.ledger.latest_block().load().slot, + index: 0, } } @@ -113,8 +129,11 @@ impl TransactionScheduler { /// Main event loop: processes executor readiness, new transactions, and slot transitions. /// - /// Uses `biased` select to prioritize ready workers over incoming transactions, - /// ensuring the pipeline stays full. + /// Uses `biased` select to prioritize in this order: + /// 1. Slot transitions + /// 2. Executor readiness + /// 3. Mode switches (before transactions to avoid race condition) + /// 4. New transactions #[instrument(skip(self))] async fn run(mut self) { let mut block_produced = self.latest_block.subscribe(); @@ -123,6 +142,9 @@ impl TransactionScheduler { biased; Ok(()) = block_produced.recv() => self.transition_to_new_slot(), Some(executor) = self.ready_rx.recv() => self.handle_ready_executor(executor), + Some(mode) = self.mode_rx.recv() => { + self.coordinator.transition_to(mode); + } Some(txn) = self.transactions_rx.recv(), if self.coordinator.is_ready() => { self.handle_new_transaction(txn); } @@ -143,6 +165,10 @@ impl TransactionScheduler { } fn handle_new_transaction(&mut self, txn: ProcessableTransaction) { + if !self.coordinator.is_transaction_allowed(&txn.mode) { + warn!("Dropping transaction due to mode incompatibility"); + return; + } // SAFETY: // the caller ensured that executor was ready before invoking this // method so the get_ready_executor should always return Some here @@ -183,6 +209,16 @@ impl TransactionScheduler { Ok(txn) => txn, Err(blocker) => return Some(blocker), }; + let (slot, index) = + if let TransactionProcessingMode::Replay(ctx) = txn.mode { + (ctx.slot, ctx.index) + } else { + let index = self.index; + self.index += 1; + (self.slot, index) + }; + + let txn = IndexedTransaction { slot, index, txn }; let _ = self.executors[executor as usize].try_send(txn).inspect_err( |e| error!(executor, error = ?e, "Executor channel send failed"), @@ -190,9 +226,11 @@ impl TransactionScheduler { None } - fn transition_to_new_slot(&self) { + fn transition_to_new_slot(&mut self) { let block = self.latest_block.load(); let mut cache = self.program_cache.write().unwrap(); + self.slot = block.slot; + self.index = 0; // Prune stale programs and re-root to new slot cache.prune(block.slot, 0); diff --git a/magicblock-processor/src/scheduler/state.rs b/magicblock-processor/src/scheduler/state.rs index 95d640442..42a5392d1 100644 --- a/magicblock-processor/src/scheduler/state.rs +++ b/magicblock-processor/src/scheduler/state.rs @@ -31,6 +31,7 @@ use solana_program_runtime::{ }; use solana_pubkey::Pubkey; use solana_svm::transaction_processor::TransactionProcessingEnvironment; +use tokio::sync::mpsc::Receiver; use tokio_util::sync::CancellationToken; use crate::{executor::SimpleForkGraph, syscalls::SyscallMatmulI8}; @@ -54,6 +55,9 @@ pub struct TransactionSchedulerState { // === Configuration === pub is_auto_airdrop_lamports_enabled: bool, pub shutdown: CancellationToken, + /// Channel for switching scheduler execution mode at runtime. + /// Send `SchedulerMode::Primary` after ledger replay to begin processing client transactions. + pub mode_rx: Receiver, } impl TransactionSchedulerState { @@ -142,3 +146,15 @@ impl TransactionSchedulerState { } } } + +/// Scheduler execution mode command. +/// +/// Send via channel to transition the scheduler between modes. +/// See [`CoordinationMode`](super::coordinator::CoordinationMode) for internal state. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum SchedulerMode { + /// Accept client transactions with concurrent execution. + Primary, + /// Replay transactions with strict ordering. + Replica, +} diff --git a/magicblock-processor/src/scheduler/tests.rs b/magicblock-processor/src/scheduler/tests.rs index 9aa5c7cd3..656dd5782 100644 --- a/magicblock-processor/src/scheduler/tests.rs +++ b/magicblock-processor/src/scheduler/tests.rs @@ -4,10 +4,12 @@ //! - Lock semantics (write/write, write/read, read/write contention) //! - FIFO ordering within blocked queues //! - Executor pool management +//! - Coordination modes (Primary/Replica) //! - Edge cases (empty transactions, duplicate accounts) use magicblock_core::link::transactions::{ - ProcessableTransaction, SanitizeableTransaction, TransactionProcessingMode, + ProcessableTransaction, ReplayPosition, SanitizeableTransaction, + TransactionProcessingMode, }; use solana_keypair::Keypair; use solana_program::{ @@ -18,14 +20,16 @@ use solana_pubkey::Pubkey; use solana_signer::Signer; use solana_transaction::Transaction; -use super::coordinator::{ExecutionCoordinator, TransactionWithId}; +use super::{ + coordinator::{ExecutionCoordinator, TransactionWithId}, + state::SchedulerMode, +}; -/// Creates a mock transaction with the specified accounts. -/// -/// # Arguments -/// -/// * `accounts` - Slice of `(Pubkey, is_writable)` tuples -fn mock_txn(accounts: &[(Pubkey, bool)]) -> TransactionWithId { +/// Creates a mock transaction with the specified accounts and processing mode. +fn mock_txn_with_mode( + accounts: &[(Pubkey, bool)], + mode: TransactionProcessingMode, +) -> TransactionWithId { let payer = Keypair::new(); let instructions: Vec = accounts .iter() @@ -48,11 +52,24 @@ fn mock_txn(accounts: &[(Pubkey, bool)]) -> TransactionWithId { TransactionWithId::new(ProcessableTransaction { transaction: transaction.sanitize(false).unwrap(), - mode: TransactionProcessingMode::Execution(None), + mode, encoded: None, }) } +/// Creates a mock execution transaction with the specified accounts. +fn mock_txn(accounts: &[(Pubkey, bool)]) -> TransactionWithId { + mock_txn_with_mode(accounts, TransactionProcessingMode::Execution(None)) +} + +/// Creates a mock replay transaction with the specified accounts. +fn mock_replay_txn( + accounts: &[(Pubkey, bool)], + position: ReplayPosition, +) -> TransactionWithId { + mock_txn_with_mode(accounts, TransactionProcessingMode::Replay(position)) +} + // ============================================================================= // Lock Semantics // ============================================================================= @@ -143,6 +160,8 @@ fn partial_locks_released_on_failure() { #[test] fn blocked_transactions_dequeued_in_fifo_order() { let mut c = ExecutionCoordinator::new(4); + // Switch to primary mode for tests that need multiple blocked transactions + c.transition_to(SchedulerMode::Primary); let acc = Pubkey::new_unique(); let e0 = c.get_ready_executor().unwrap(); @@ -273,3 +292,68 @@ fn transaction_with_duplicate_accounts() { .try_schedule(e, mock_txn(&[(acc, true), (acc, true)])) .is_ok()); } + +// ============================================================================= +// Coordination Modes +// ============================================================================= + +#[test] +fn replica_mode_blocks_new_transactions_when_pending() { + let mut c = ExecutionCoordinator::new(2); + let acc = Pubkey::new_unique(); + let position = ReplayPosition { + slot: 0, + index: 0, + persist: false, + }; + + // First transaction holds the lock on acc + let e0 = c.get_ready_executor().unwrap(); + assert!(c + .try_schedule(e0, mock_replay_txn(&[(acc, true)], position)) + .is_ok()); + + // Second transaction is BLOCKED because it conflicts with e0's lock + // This sets pending in Replica mode + let e1 = c.get_ready_executor().unwrap(); + assert!(c + .try_schedule(e1, mock_replay_txn(&[(acc, true)], position)) + .is_err()); + + // is_ready should return false because pending is set + assert!(!c.is_ready()); +} + +#[test] +fn replica_mode_unblocks_when_pending_completes() { + let mut c = ExecutionCoordinator::new(2); + let acc = Pubkey::new_unique(); + let position = ReplayPosition { + slot: 0, + index: 0, + persist: false, + }; + + // First transaction holds the lock + let e0 = c.get_ready_executor().unwrap(); + assert!(c + .try_schedule(e0, mock_replay_txn(&[(acc, true)], position)) + .is_ok()); + + // Second transaction is blocked and sets pending + let e1 = c.get_ready_executor().unwrap(); + assert!(c + .try_schedule(e1, mock_replay_txn(&[(acc, true)], position)) + .is_err()); + assert!(!c.is_ready()); // pending is set + + // Complete e0's transaction (unlock accounts) + c.unlock_accounts(e0); + + // Get the blocked transaction from the queue + let blocked = c.next_blocked_transaction(e0); + assert!(blocked.is_some()); + + // Now is_ready should be true again (pending cleared) + assert!(c.is_ready()); +} diff --git a/magicblock-processor/tests/replay.rs b/magicblock-processor/tests/replay.rs index 3403f7cb8..81533e35a 100644 --- a/magicblock-processor/tests/replay.rs +++ b/magicblock-processor/tests/replay.rs @@ -11,17 +11,20 @@ use solana_program::{ use solana_pubkey::Pubkey; use solana_signer::Signer; use solana_transaction::sanitized::SanitizedTransaction; +use solana_transaction_status::TransactionStatusMeta; use test_kit::ExecutionTestEnv; const ACCOUNTS_COUNT: usize = 8; const TIMEOUT: Duration = Duration::from_millis(100); -/// Sets up a replay scenario: Transaction is in Ledger, but AccountsDb is reverted to pre-tx state. -async fn setup_replay_scenario( +/// Sets up a replay scenario in Replica mode: +/// - Transaction is written directly to Ledger (no execution) +/// - AccountsDb remains in pre-transaction state +fn setup_replay_scenario_replica( env: &ExecutionTestEnv, ix: GuineaInstruction, ) -> (SanitizedTransaction, Vec) { - // 1. Create Accounts & Build Instruction + // 1. Create Accounts let accounts: Vec<_> = (0..ACCOUNTS_COUNT) .map(|_| { env.create_account_with_config(LAMPORTS_PER_SOL, 128, guinea::ID) @@ -34,62 +37,67 @@ async fn setup_replay_scenario( .collect(); let pubkeys: Vec<_> = accounts.iter().map(|a| a.pubkey()).collect(); - // 2. Snapshot Pre-State - // Critical: ensure_owned() is required to detach the snapshot from the DB's internal ARC, - // ensuring we hold a distinct copy of the data before modification. - let pre_state: Vec<_> = pubkeys - .iter() - .map(|pk| { - let mut acc = env.accountsdb.get_account(pk).unwrap(); - acc.ensure_owned(); - (*pk, acc) - }) - .collect(); - - // 3. Execute Transaction (Updates Ledger & DB) + // 2. Build Transaction let ix = Instruction::new_with_bincode(guinea::ID, &ix, metas); let txn = env.build_transaction(&[ix]); - let sig = txn.signatures[0]; - - env.execute_transaction(txn).await.unwrap(); - - // 4. Revert DB to Pre-State (Simulating "Catch-up" needed) - for (pk, acc) in pre_state { - let _ = env.accountsdb.insert_account(&pk, &acc); + let sanitized = txn.sanitize(false).unwrap(); + let sig = *sanitized.signature(); + + // 3. Write transaction to ledger directly (without executing) + // This simulates a transaction that was recorded by the primary + let meta = TransactionStatusMeta { + fee: 5000, + pre_balances: pubkeys.iter().map(|_| LAMPORTS_PER_SOL).collect(), + post_balances: pubkeys.iter().map(|_| LAMPORTS_PER_SOL).collect(), + status: Ok(()), + ..Default::default() + }; + let versioned = sanitized.to_versioned_transaction(); + let encoded = bincode::serialize(&versioned).unwrap(); + let locks = sanitized.get_account_locks_unchecked(); + env.ledger + .write_transaction( + sig, + env.ledger.latest_block().load().slot, + 0, // index + locks.writable, + locks.readonly, + &encoded, + meta, + ) + .expect("Failed to write transaction to ledger"); + + // 4. Verify accounts are still in pre-transaction state + for pubkey in &pubkeys { + let account = env.accountsdb.get_account(pubkey).unwrap(); + assert_eq!( + account.data()[0], + 0, + "Account should be in pre-tx state before replay" + ); } - // 5. Fetch Confirmed Transaction from Ledger - let transaction = env - .ledger - .get_complete_transaction(sig, u64::MAX) - .unwrap() - .expect("Transaction should be in ledger") - .get_transaction() - .sanitize(false) - .unwrap(); - - // 6. Drain channels (Cleanup notifications from the setup execution) - while env.dispatch.transaction_status.try_recv().is_ok() {} - while env.dispatch.account_update.try_recv().is_ok() {} - - (transaction, pubkeys) + (sanitized, pubkeys) } #[tokio::test] pub async fn test_replay_state_transition() { - let env = ExecutionTestEnv::new(); - let (txn, pubkeys) = - setup_replay_scenario(&env, GuineaInstruction::WriteByteToData(42)) - .await; + // Run in Replica mode (scheduler starts in Replica, no mode switch) + let env = ExecutionTestEnv::new_replica_mode(1, false); + + let (txn, pubkeys) = setup_replay_scenario_replica( + &env, + GuineaInstruction::WriteByteToData(42), + ); - // 1. Verify Pre-Replay State (Reverted) + // 1. Verify Pre-Replay State for pubkey in &pubkeys { let account = env.accountsdb.get_account(pubkey).unwrap(); assert_eq!(account.data()[0], 0, "Account should be in pre-tx state"); } - // 2. Perform Replay - assert!(env.replay_transaction(txn).await.is_ok()); + // 2. Perform Replay (persist=false: no status notifications) + assert!(env.replay_transaction(false, txn).await.is_ok()); // 3. Verify No Side Effects (Notifications) assert!( diff --git a/magicblock-processor/tests/replica_ordering.rs b/magicblock-processor/tests/replica_ordering.rs new file mode 100644 index 000000000..d52f29dad --- /dev/null +++ b/magicblock-processor/tests/replica_ordering.rs @@ -0,0 +1,363 @@ +//! Integration tests for Replica mode transaction ordering enforcement. +//! +//! These tests verify that in Replica mode, when transactions conflict on accounts, +//! they are executed in strict submission order (FIFO), even under concurrent pressure. + +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; + +use guinea::GuineaInstruction; +use magicblock_core::link::transactions::ReplayPosition; +use solana_account::ReadableAccount; +use solana_program::{ + instruction::{AccountMeta, Instruction}, + native_token::LAMPORTS_PER_SOL, +}; +use solana_pubkey::Pubkey; +use solana_signature::Signature; +use solana_transaction::Transaction; +use test_kit::{ExecutionTestEnv, Signer}; +use tokio::time::timeout; + +const STRESS_TIMEOUT: Duration = Duration::from_secs(30); +const DEFAULT_BALANCE: u64 = LAMPORTS_PER_SOL * 10; + +// --- Helpers --- + +fn setup_replica_env(executors: u32) -> ExecutionTestEnv { + ExecutionTestEnv::new_replica_mode(executors, true) +} + +fn create_accounts(env: &mut ExecutionTestEnv, count: usize) -> Vec { + (0..count) + .map(|_| { + env.create_account_with_config(DEFAULT_BALANCE, 128, guinea::ID) + .pubkey() + }) + .collect() +} + +fn tx_write( + env: &mut ExecutionTestEnv, + account: Pubkey, + val: u8, +) -> Transaction { + let ix = Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::WriteByteToData(val), + vec![AccountMeta::new(account, false)], + ); + env.build_transaction(&[ix]) +} + +fn tx_transfer( + env: &mut ExecutionTestEnv, + from: Pubkey, + to: Pubkey, +) -> Transaction { + let ix = Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::Transfer(1000), + vec![AccountMeta::new(from, false), AccountMeta::new(to, false)], + ); + env.build_transaction(&[ix]) +} + +/// Collects execution statuses until all expected signatures are accounted for or timeout. +async fn collect_statuses( + env: &mut ExecutionTestEnv, + count: usize, + limit: Duration, + context: &str, +) -> Vec { + let start = Instant::now(); + let mut results = Vec::with_capacity(count); + + while results.len() < count { + if start.elapsed() > limit { + panic!( + "[{context}] Timeout waiting for transactions. Got {}/{}.", + results.len(), + count + ); + } + if let Ok(Ok(status)) = timeout( + Duration::from_millis(100), + env.dispatch.transaction_status.recv_async(), + ) + .await + { + assert!( + status.meta.status.is_ok(), + "[{context}] Transaction {} failed: {:?}", + status.txn.signatures()[0], + status.meta.status + ); + results.push(status.txn.signatures()[0]); + } + } + results +} + +/// Verifies that transactions were processed in the exact order they were submitted. +async fn verify_ordered( + env: &mut ExecutionTestEnv, + sigs: &[Signature], + limit: Duration, + context: &str, +) { + let received = collect_statuses(env, sigs.len(), limit, context).await; + assert_eq!( + received, + sigs.to_vec(), + "[{context}] Execution order mismatch.\nExpected: {sigs:?}\nGot: {received:?}" + ); +} + +/// Submits all replay transactions sequentially, then starts the scheduler. +/// Returns signatures in submission order. +async fn submit_all_and_start( + env: &mut ExecutionTestEnv, + txs: Vec, +) -> Vec { + let sigs: Vec = txs.iter().map(|tx| tx.signatures[0]).collect(); + + // Submit all transactions sequentially to preserve order + for (i, tx) in txs.into_iter().enumerate() { + let position = ReplayPosition { + slot: env.accountsdb.slot(), + index: i as u32, + persist: true, + }; + env.transaction_scheduler + .replay(position, tx) + .await + .expect("Failed to submit transaction"); + } + + // Now start the scheduler + env.run_scheduler(); + env.advance_slot(); + + sigs +} + +// --- Tests --- + +/// Stress test: 100 conflicting writes to a single account. +/// In Replica mode, these MUST execute in strict FIFO order. +#[tokio::test] +async fn test_replica_stress_single_account_writes() { + let ctx = "Replica Stress Single Account"; + let mut env = setup_replica_env(8); + let count = 100; + let acc = create_accounts(&mut env, 1)[0]; + + let mut txs = Vec::with_capacity(count); + for i in 0..count { + env.advance_slot(); + txs.push(tx_write(&mut env, acc, (i % 256) as u8)); + } + + let sigs = submit_all_and_start(&mut env, txs).await; + verify_ordered(&mut env, &sigs, STRESS_TIMEOUT, ctx).await; + + // Final state must match last write (count - 1) + let final_val = env.get_account(acc).data()[0]; + assert_eq!( + final_val, + ((count - 1) % 256) as u8, + "[{ctx}] Final data value mismatch" + ); +} + +/// Stress test: Multiple accounts with cross-conflicts. +/// Transactions form a chain where each conflicts with the next. +#[tokio::test] +async fn test_replica_stress_conflict_chain() { + let ctx = "Replica Stress Conflict Chain"; + let mut env = setup_replica_env(8); + let count = 100; + let accs = create_accounts(&mut env, count + 1); + + // Build a chain: T0 writes A0+A1, T1 writes A1+A2, etc. + // Each transaction conflicts with its neighbors + let mut txs = Vec::with_capacity(count); + for i in 0..count { + env.advance_slot(); + txs.push(tx_transfer(&mut env, accs[i], accs[i + 1])); + } + + let sigs = submit_all_and_start(&mut env, txs).await; + verify_ordered(&mut env, &sigs, STRESS_TIMEOUT, ctx).await; +} + +/// Stress test: Hotspot account with many writers. +/// 50 transactions all write to the same "hot" account plus a unique account each. +#[tokio::test] +async fn test_replica_stress_hotspot() { + let ctx = "Replica Stress Hotspot"; + let mut env = setup_replica_env(8); + let count = 50; + let accs = create_accounts(&mut env, count + 1); + let hotspot = accs[count]; // The last account is the hotspot + + // Each transaction writes to hotspot + its own account + let mut txs = Vec::with_capacity(count); + for (i, acc) in accs.iter().take(count).enumerate() { + env.advance_slot(); + let ix = Instruction::new_with_bincode( + guinea::ID, + &GuineaInstruction::WriteByteToData(i as u8), + vec![ + AccountMeta::new(*acc, false), + AccountMeta::new(hotspot, false), + ], + ); + txs.push(env.build_transaction(&[ix])); + } + + let sigs = submit_all_and_start(&mut env, txs).await; + + // All transactions conflict on hotspot, so must be ordered + verify_ordered(&mut env, &sigs, STRESS_TIMEOUT, ctx).await; +} + +/// Stress test: Mixed workload with independent and conflicting transactions. +/// Verifies that independent transactions can proceed while maintaining order +/// for conflicting ones. +#[tokio::test] +async fn test_replica_stress_mixed_workload() { + let ctx = "Replica Stress Mixed Workload"; + let mut env = setup_replica_env(8); + let count = 100; + + // Create 4 groups of accounts + let group_a = create_accounts(&mut env, 1); + let group_b = create_accounts(&mut env, 1); + let independent = create_accounts(&mut env, 50); + + let mut txs = Vec::with_capacity(count); + for i in 0..count { + env.advance_slot(); + match i % 4 { + 0 => txs.push(tx_write(&mut env, group_a[0], i as u8)), + 1 => txs.push(tx_write(&mut env, group_b[0], i as u8)), + _ => { + // Independent writes to unique accounts + let idx = (i / 4) % independent.len(); + txs.push(tx_write(&mut env, independent[idx], i as u8)); + } + } + } + + let sigs = submit_all_and_start(&mut env, txs).await; + let received = + collect_statuses(&mut env, sigs.len(), STRESS_TIMEOUT, ctx).await; + + // Extract positions of group_a and group_b transactions + let group_a_sigs: Vec<_> = sigs + .iter() + .enumerate() + .filter(|(i, _)| *i % 4 == 0) + .map(|(_, s)| *s) + .collect(); + let group_b_sigs: Vec<_> = sigs + .iter() + .enumerate() + .filter(|(i, _)| *i % 4 == 1) + .map(|(_, s)| *s) + .collect(); + + let group_a_positions: Vec<_> = group_a_sigs + .iter() + .map(|s| received.iter().position(|r| r == s).unwrap()) + .collect(); + let group_b_positions: Vec<_> = group_b_sigs + .iter() + .map(|s| received.iter().position(|r| r == s).unwrap()) + .collect(); + + // Group A transactions must be in order + let mut sorted_a = group_a_positions.clone(); + sorted_a.sort(); + assert_eq!(group_a_positions, sorted_a, "[{ctx}] Group A not in order"); + + // Group B transactions must be in order + let mut sorted_b = group_b_positions.clone(); + sorted_b.sort(); + assert_eq!(group_b_positions, sorted_b, "[{ctx}] Group B not in order"); +} + +/// Stress test: Many small transfers between pairs of accounts. +/// Each pair is independent, but transfers within a pair must be ordered. +#[tokio::test] +async fn test_replica_stress_transfer_pairs() { + let ctx = "Replica Stress Transfer Pairs"; + let mut env = setup_replica_env(8); + let pairs = 10; + let transfers_per_pair = 10; + let accs = create_accounts(&mut env, pairs * 2); + + let initial: HashMap<_, _> = accs + .iter() + .map(|k| (*k, env.get_account(*k).lamports())) + .collect(); + + let mut txs = Vec::with_capacity(pairs * transfers_per_pair); + // Submit transfers for each pair in round-robin to interleave them + for _ in 0..transfers_per_pair { + for p in 0..pairs { + env.advance_slot(); + let from = accs[p * 2]; + let to = accs[p * 2 + 1]; + txs.push(tx_transfer(&mut env, from, to)); + } + } + + let sigs = submit_all_and_start(&mut env, txs).await; + let received = + collect_statuses(&mut env, sigs.len(), STRESS_TIMEOUT, ctx).await; + + // For each pair, verify transfers executed in order + for p in 0..pairs { + // Get signatures for this pair's transfers (in submission order) + let pair_sigs: Vec<_> = (0..transfers_per_pair) + .map(|t| sigs[t * pairs + p]) + .collect(); + + // Get their execution positions + let positions: Vec<_> = pair_sigs + .iter() + .map(|s| received.iter().position(|r| r == s).unwrap()) + .collect(); + + // Must be in order + let mut sorted = positions.clone(); + sorted.sort(); + assert_eq!( + positions, sorted, + "[{ctx}] Pair {p} transfers not in order" + ); + } + + // Verify final balances + let transfer_amount = 1000u64; + for p in 0..pairs { + let from = accs[p * 2]; + let to = accs[p * 2 + 1]; + let expected_change = transfer_amount * transfers_per_pair as u64; + assert_eq!( + env.get_account(from).lamports(), + initial[&from] - expected_change, + "[{ctx}] Pair {p} sender balance wrong" + ); + assert_eq!( + env.get_account(to).lamports(), + initial[&to] + expected_change, + "[{ctx}] Pair {p} receiver balance wrong" + ); + } +} diff --git a/test-integration/test-tools/src/toml_to_args.rs b/test-integration/test-tools/src/toml_to_args.rs index 2d9025db8..1a7892f21 100644 --- a/test-integration/test-tools/src/toml_to_args.rs +++ b/test-integration/test-tools/src/toml_to_args.rs @@ -83,7 +83,6 @@ pub fn config_to_args( let resolved_full_config_path = config_dir.join(&program.path).canonicalize().unwrap(); args.push(resolved_full_config_path.to_str().unwrap().to_string()); - if program_loader == ProgramLoader::UpgradeableProgram { if let Some(auth) = program.auth { args.push(auth); diff --git a/test-kit/Cargo.toml b/test-kit/Cargo.toml index 808715a3b..925e3456a 100644 --- a/test-kit/Cargo.toml +++ b/test-kit/Cargo.toml @@ -25,6 +25,7 @@ solana-transaction = { workspace = true } solana-transaction-status-client-types = { workspace = true } tempfile = { workspace = true } +tokio = { workspace = true } tracing = { workspace = true } tracing-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/test-kit/src/lib.rs b/test-kit/src/lib.rs index aadf1520c..46a04b76e 100644 --- a/test-kit/src/lib.rs +++ b/test-kit/src/lib.rs @@ -14,7 +14,7 @@ use magicblock_core::{ blocks::{BlockMeta, BlockUpdate, BlockUpdateTx}, link, transactions::{ - SanitizeableTransaction, TransactionResult, + ReplayPosition, SanitizeableTransaction, TransactionResult, TransactionSchedulerHandle, TransactionSimulationResult, }, DispatchEndpoints, @@ -25,7 +25,10 @@ use magicblock_ledger::Ledger; use magicblock_processor::{ build_svm_env, loader::load_upgradeable_programs, - scheduler::{state::TransactionSchedulerState, TransactionScheduler}, + scheduler::{ + state::{SchedulerMode, TransactionSchedulerState}, + TransactionScheduler, + }, }; use solana_account::AccountSharedData; pub use solana_instruction::*; @@ -38,6 +41,7 @@ pub use solana_signer::Signer; use solana_transaction::Transaction; use solana_transaction_status_client_types::TransactionStatusMeta; use tempfile::TempDir; +use tokio::sync::mpsc::Sender; use tracing::{error, instrument}; /// A simulated validator backend for integration tests. @@ -65,6 +69,8 @@ pub struct ExecutionTestEnv { pub blocks_tx: BlockUpdateTx, /// Transaction execution scheduler/backend for deferred launch pub scheduler: Option, + /// Channel sender for switching scheduler mode. + mode_tx: Sender, } impl Default for ExecutionTestEnv { @@ -100,6 +106,24 @@ impl ExecutionTestEnv { fee: u64, executors: u32, defer_startup: bool, + ) -> Self { + Self::new_internal(fee, executors, defer_startup, true) + } + + /// Creates a test environment in Replica mode for replay ordering tests. + /// + /// The scheduler starts in Replica mode and only accepts replay transactions. + /// Use `switch_to_primary_mode()` to transition to Primary mode. + pub fn new_replica_mode(executors: u32, defer_startup: bool) -> Self { + Self::new_internal(Self::BASE_FEE, executors, defer_startup, false) + } + + /// Internal constructor with all parameters. + fn new_internal( + fee: u64, + executors: u32, + defer_startup: bool, + primary_mode: bool, ) -> Self { init_logger!(); let dir = @@ -115,6 +139,11 @@ impl ExecutionTestEnv { let environment = build_svm_env(&accountsdb, blockhash, fee); let payers = (0..executors).map(|_| Keypair::new()).collect(); + let (mode_tx, mode_rx) = tokio::sync::mpsc::channel(1); + if primary_mode { + let _ = mode_tx.try_send(SchedulerMode::Primary); + } + let mut this = Self { payer_index: AtomicUsize::new(0), payers, @@ -125,6 +154,7 @@ impl ExecutionTestEnv { dispatch, blocks_tx: validator_channels.block_update, scheduler: None, + mode_tx: mode_tx.clone(), }; this.advance_slot(); // Move to slot 1 to ensure a non-genesis state. @@ -145,6 +175,7 @@ impl ExecutionTestEnv { environment, is_auto_airdrop_lamports_enabled: false, shutdown: Default::default(), + mode_rx, }; // Start/Defer the transaction processing backend. @@ -167,6 +198,14 @@ impl ExecutionTestEnv { } } + /// Switches the scheduler to Primary mode. + /// + /// After this call, the scheduler will accept execution transactions + /// in addition to replay transactions. + pub fn switch_to_primary_mode(&self) { + let _ = self.mode_tx.try_send(SchedulerMode::Primary); + } + /// Creates a new account with the specified properties. /// Note: This helper automatically marks the account as `delegated`. pub fn create_account_with_config( @@ -320,14 +359,27 @@ impl ExecutionTestEnv { } /// Submits a transaction for replay and waits for its result. + /// + /// # Arguments + /// * `persist` - If true, record the transaction to the ledger and broadcast status + /// * `txn` - The transaction to replay #[instrument(skip(self, txn))] pub async fn replay_transaction( &self, + persist: bool, txn: impl SanitizeableTransaction, ) -> TransactionResult { - self.transaction_scheduler.replay(txn).await.inspect_err( - |err| error!(error = ?err, "Transaction replay failed"), - ) + let position = ReplayPosition { + slot: 0, + index: 0, + persist, + }; + self.transaction_scheduler + .replay(position, txn) + .await + .inspect_err( + |err| error!(error = ?err, "Transaction replay failed"), + ) } pub fn get_account(&self, pubkey: Pubkey) -> CommitableAccount<'_> {