diff --git a/Cargo.lock b/Cargo.lock index 3480eb81e..cbc8d738a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4220,6 +4220,7 @@ name = "magicblock-rpc-client" version = "0.11.2" dependencies = [ "futures-util", + "serde_json", "solana-account 3.4.0", "solana-account-decoder-client-types", "solana-address-lookup-table-interface 3.1.0", diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index b815d39ca..7bb0181e1 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -1,7 +1,7 @@ use std::{ path::{Path, PathBuf}, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }, thread, @@ -211,11 +211,19 @@ impl MagicValidator { }; let accountsdb = Arc::new(accountsdb); let (mut dispatch, validator_channels) = link(); + let shared_chain_slot = + (!Self::replication_mode_uses_disabled_chainlink( + &config.validator.replication_mode, + )) + .then(Arc::::default); let step_start = Instant::now(); - let committor_service = - Self::init_committor_service(&config, ledger.latest_block()) - .await?; + let committor_service = Self::init_committor_service( + &config, + ledger.latest_block(), + shared_chain_slot.clone(), + ) + .await?; log_timing("startup", "committor_service_init", step_start); init_magic_sys(Arc::new(MagicSysAdapter::new( committor_service.clone(), @@ -228,6 +236,7 @@ impl MagicValidator { &dispatch.transaction_scheduler, &ledger.latest_block().clone(), &accountsdb, + shared_chain_slot.clone(), ) .await?, ); @@ -443,6 +452,7 @@ impl MagicValidator { async fn init_committor_service( config: &ValidatorParams, latest_block: &LatestBlock, + chain_slot: Option>, ) -> ApiResult>> { let committor_persist_path = config.storage.join("committor_service.sqlite"); @@ -464,6 +474,7 @@ impl MagicValidator { ), actions_timeout: DEFAULT_ACTIONS_TIMEOUT, }, + chain_slot, actions_callback_executor, )?)); @@ -477,6 +488,7 @@ impl MagicValidator { transaction_scheduler: &TransactionSchedulerHandle, latest_block: &LatestBlock, accountsdb: &Arc, + chain_slot: Option>, ) -> ApiResult { if Self::replication_mode_uses_disabled_chainlink( &config.validator.replication_mode, @@ -536,6 +548,7 @@ impl MagicValidator { chainlink_config, &config.chainlink, config.storage.as_path(), + chain_slot.unwrap_or_default(), ) .await?; diff --git a/magicblock-chainlink/src/chainlink/mod.rs b/magicblock-chainlink/src/chainlink/mod.rs index 8d8f12975..1000f170d 100644 --- a/magicblock-chainlink/src/chainlink/mod.rs +++ b/magicblock-chainlink/src/chainlink/mod.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet, path::Path, sync::Arc}; +use std::{ + collections::HashSet, + path::Path, + sync::{atomic::AtomicU64, Arc}, +}; use dlp_api::pda::ephemeral_balance_pda_from_payer; use errors::{ChainlinkError, ChainlinkResult}; @@ -320,6 +324,7 @@ impl config: ChainlinkConfig, chainlink_config: &ChainLinkConfig, ledger_path: &Path, + chain_slot: Arc, ) -> ChainlinkResult< InnerChainlink< ChainRpcClientImpl, @@ -337,6 +342,7 @@ impl commitment, tx, &config.remote_account_provider, + Some(chain_slot), ) .await?; let fetch_cloner = if let Some(provider) = account_provider { diff --git a/magicblock-chainlink/src/remote_account_provider/mod.rs b/magicblock-chainlink/src/remote_account_provider/mod.rs index 656317466..de7291bd5 100644 --- a/magicblock-chainlink/src/remote_account_provider/mod.rs +++ b/magicblock-chainlink/src/remote_account_provider/mod.rs @@ -494,6 +494,7 @@ impl commitment: CommitmentConfig, subscription_forwarder: mpsc::Sender, config: &RemoteAccountProviderConfig, + chain_slot: Option>, ) -> ChainlinkResult< Option< RemoteAccountProvider< @@ -513,6 +514,7 @@ impl commitment, subscription_forwarder, config, + chain_slot.unwrap_or_default(), ) .await?; Ok(Some(provider)) @@ -651,6 +653,7 @@ impl RemoteAccountProvider { commitment: CommitmentConfig, subscription_forwarder: mpsc::Sender, config: &RemoteAccountProviderConfig, + chain_slot: Arc, ) -> RemoteAccountProviderResult< RemoteAccountProvider< ChainRpcClientImpl, @@ -674,9 +677,6 @@ impl RemoteAccountProvider { let rpc_client = ChainRpcClientImpl::new_from_url(rpc_url.as_str(), commitment); - // Create chain_slot to be shared with all pubsub clients - let chain_slot = Arc::::default(); - // Build startup pubsub clients and wrap them into a SubMuxClient. // gRPC clients are cheap to create and backfill subscriptions, so // when present we let slower WebSocket clients attach after startup. diff --git a/magicblock-committor-service/src/committor_processor.rs b/magicblock-committor-service/src/committor_processor.rs index 5439037d8..d471fccef 100644 --- a/magicblock-committor-service/src/committor_processor.rs +++ b/magicblock-committor-service/src/committor_processor.rs @@ -1,7 +1,7 @@ use std::{ collections::{HashMap, HashSet}, path::Path, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, }; use magicblock_core::traits::ActionsCallbackScheduler; @@ -48,6 +48,7 @@ impl CommittorProcessor { authority: Keypair, persist_file: P, chain_config: ChainConfig, + chain_slot: Option>, actions_callback_executor: A, ) -> CommittorServiceResult where @@ -59,7 +60,11 @@ impl CommittorProcessor { chain_config.commitment, ); let rpc_client = Arc::new(rpc_client); - let magic_block_rpc_client = MagicblockRpcClient::new(rpc_client); + let magic_block_rpc_client = if let Some(chain_slot) = chain_slot { + MagicblockRpcClient::new_with_chain_slot(rpc_client, chain_slot) + } else { + MagicblockRpcClient::new(rpc_client) + }; // Create TableMania let gc_config = GarbageCollectorConfig::default(); diff --git a/magicblock-committor-service/src/intent_executor/intent_execution_client.rs b/magicblock-committor-service/src/intent_executor/intent_execution_client.rs index 69ace556c..34d771a1c 100644 --- a/magicblock-committor-service/src/intent_executor/intent_execution_client.rs +++ b/magicblock-committor-service/src/intent_executor/intent_execution_client.rs @@ -37,6 +37,10 @@ impl IntentExecutionClient { IntentExecutionClient { rpc_client } } + pub(in crate::intent_executor) async fn invalidate_cached_blockhash(&self) { + self.rpc_client.invalidate_cached_blockhash().await; + } + pub(in crate::intent_executor) async fn execute_message_with_retries( &self, authority: &Keypair, diff --git a/magicblock-committor-service/src/intent_executor/single_stage_executor.rs b/magicblock-committor-service/src/intent_executor/single_stage_executor.rs index e0ef15303..cace3011b 100644 --- a/magicblock-committor-service/src/intent_executor/single_stage_executor.rs +++ b/magicblock-committor-service/src/intent_executor/single_stage_executor.rs @@ -114,6 +114,7 @@ where break Err(execution_err); } }; + self.intent_client.invalidate_cached_blockhash().await; self.execution_report.dispose(cleanup); if self.current_attempt >= RECURSION_CEILING { diff --git a/magicblock-committor-service/src/intent_executor/two_stage_executor.rs b/magicblock-committor-service/src/intent_executor/two_stage_executor.rs index 7358e5ca1..875286f42 100644 --- a/magicblock-committor-service/src/intent_executor/two_stage_executor.rs +++ b/magicblock-committor-service/src/intent_executor/two_stage_executor.rs @@ -141,6 +141,7 @@ where break Err(execution_err); } }; + self.intent_client.invalidate_cached_blockhash().await; self.execution_report.dispose(cleanup); if self.state.current_attempt >= Self::RECURSION_CEILING { @@ -401,6 +402,7 @@ where break Err(execution_err); } }; + self.intent_client.invalidate_cached_blockhash().await; self.execution_report.dispose(cleanup); if self.state.current_attempt >= Self::RECURSION_CEILING { diff --git a/magicblock-committor-service/src/service.rs b/magicblock-committor-service/src/service.rs index bbec9d805..5a08f1d70 100644 --- a/magicblock-committor-service/src/service.rs +++ b/magicblock-committor-service/src/service.rs @@ -1,4 +1,9 @@ -use std::{collections::HashMap, path::Path, sync::Arc, time::Instant}; +use std::{ + collections::HashMap, + path::Path, + sync::{atomic::AtomicU64, Arc}, + time::Instant, +}; use magicblock_core::traits::ActionsCallbackScheduler; use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; @@ -113,6 +118,7 @@ impl CommittorActor { authority: Keypair, persist_file: P, chain_config: ChainConfig, + chain_slot: Option>, actions_callback_executor: A, ) -> CommittorServiceResult where @@ -123,6 +129,7 @@ impl CommittorActor { authority, persist_file, chain_config, + chain_slot, actions_callback_executor, )?); @@ -316,6 +323,7 @@ impl CommittorService { authority: Keypair, persist_file: P, chain_config: ChainConfig, + chain_slot: Option>, actions_callback_executor: A, ) -> CommittorServiceResult where @@ -332,6 +340,7 @@ impl CommittorService { authority, persist_file, chain_config, + chain_slot, actions_callback_executor, )?; tokio::spawn(async move { diff --git a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs index 484c3bd92..1ce9751a8 100644 --- a/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs +++ b/magicblock-committor-service/src/transaction_preparator/delivery_preparator.rs @@ -193,6 +193,7 @@ impl DeliveryPreparator { let cleanup_task = preparation_task.cleanup_task(); self.cleanup(authority, &[cleanup_task], &[]).await?; + self.rpc_client.invalidate_cached_blockhash().await; // Restore preparation stage for retry *stage = CommitBufferStage::Preparation(preparation_task); diff --git a/magicblock-rpc-client/Cargo.toml b/magicblock-rpc-client/Cargo.toml index 7adb1167e..1d418efc1 100644 --- a/magicblock-rpc-client/Cargo.toml +++ b/magicblock-rpc-client/Cargo.toml @@ -29,6 +29,7 @@ solana-address-lookup-table-interface = { workspace = true, features = [ "bytemuck", ] } futures-util = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } diff --git a/magicblock-rpc-client/src/lib.rs b/magicblock-rpc-client/src/lib.rs index ee8a27b56..5997e600a 100644 --- a/magicblock-rpc-client/src/lib.rs +++ b/magicblock-rpc-client/src/lib.rs @@ -3,11 +3,15 @@ pub mod utils; use std::{ - sync::Arc, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, time::{Duration, Instant}, }; use futures_util::future::try_join_all; +use serde_json::json; use solana_account::Account; use solana_account_decoder_client_types::UiAccountEncoding; use solana_address_lookup_table_interface::state::{ @@ -22,18 +26,19 @@ use solana_rpc_client::{ nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction, }; use solana_rpc_client_api::{ - client_error::ErrorKind as RpcClientErrorKind, + client_error::{Error as RpcClientError, ErrorKind as RpcClientErrorKind}, config::{ RpcAccountInfoConfig, RpcSendTransactionConfig, RpcTransactionConfig, }, - request::RpcError, + request::{RpcError, RpcRequest}, + response::{Response, RpcBlockhash}, }; use solana_signature::Signature; use solana_transaction_error::{TransactionError, TransactionResult}; use solana_transaction_status_client_types::{ EncodedConfirmedTransactionWithStatusMeta, UiTransactionEncoding, }; -use tokio::time::sleep; +use tokio::{sync::Mutex as TMutex, time::sleep}; use tracing::*; /// The encoding to use when sending transactions @@ -230,12 +235,37 @@ impl MagicBlockSendTransactionOutcome { // Derived from error from helius RPC: Failed to download accounts: Error { request: Some(GetMultipleAccounts), kind: RpcError(RpcResponseError { code: -32602, message: "Too many inputs provided; max 100", data: Empty }) } const MAX_MULTIPLE_ACCOUNTS: usize = 100; +// Keep this below the default 50ms * 150 slot blockhash lifetime. +const BLOCKHASH_CACHE_TTL: Duration = Duration::from_secs(5); +const SLOT_CACHE_TTL: Duration = Duration::from_millis(400); + +#[derive(Default)] +struct RpcClientCache { + blockhash: TMutex>, + slot: TMutex>, +} + +#[derive(Clone, Copy)] +struct CachedBlockhash { + blockhash: Hash, + context_slot: Slot, + last_valid_block_height: u64, + fetched_at: Instant, +} + +#[derive(Clone, Copy)] +struct CachedSlot { + slot: Slot, + fetched_at: Instant, +} /// Wraps a [RpcClient] to provide improved functionality, specifically /// for sending transactions. #[derive(Clone)] pub struct MagicblockRpcClient { client: Arc, + cache: Arc, + chain_slot: Option>, } impl From for MagicblockRpcClient { @@ -247,24 +277,159 @@ impl From for MagicblockRpcClient { impl MagicblockRpcClient { /// Create a new [MagicBlockRpcClient] from an existing [RpcClient]. pub fn new(client: Arc) -> Self { - Self { client } + Self { + client, + cache: Arc::new(RpcClientCache::default()), + chain_slot: None, + } + } + + pub fn new_with_chain_slot( + client: Arc, + chain_slot: Arc, + ) -> Self { + Self { + client, + cache: Arc::new(RpcClientCache::default()), + chain_slot: Some(chain_slot), + } } pub async fn get_latest_blockhash( &self, ) -> MagicBlockRpcClientResult { - self.client.get_latest_blockhash().await.map_err(|e| { - MagicBlockRpcClientError::GetLatestBlockhash(Box::new(e)) - }) + let mut cached = self.cache.blockhash.lock().await; + if let Some(blockhash) = Self::fresh_cached_blockhash(&cached) { + return Ok(blockhash); + } + + let (blockhash, context_slot, last_valid_block_height) = + self.fetch_latest_blockhash_with_context().await?; + *cached = Some(CachedBlockhash { + blockhash, + context_slot, + last_valid_block_height, + fetched_at: Instant::now(), + }); + drop(cached); + + self.record_observed_slot(context_slot).await; + + Ok(blockhash) + } + + async fn fetch_latest_blockhash_with_context( + &self, + ) -> MagicBlockRpcClientResult<(Hash, Slot, u64)> { + let resp: Response = self + .client + .send(RpcRequest::GetLatestBlockhash, json!([self.commitment()])) + .await + .map_err(|e| { + MagicBlockRpcClientError::GetLatestBlockhash(Box::new(e)) + })?; + let blockhash = resp.value.blockhash.parse().map_err(|_| { + MagicBlockRpcClientError::GetLatestBlockhash(Box::new( + RpcClientError::new_with_request( + RpcClientErrorKind::RpcError(RpcError::ParseError( + "Hash".to_string(), + )), + RpcRequest::GetLatestBlockhash, + ), + )) + })?; + + Ok(( + blockhash, + resp.context.slot, + resp.value.last_valid_block_height, + )) } pub async fn get_slot(&self) -> MagicBlockRpcClientResult { + let slot = self.fetch_slot().await?; + self.record_observed_slot(slot).await; + Ok(slot) + } + + async fn get_cached_slot(&self) -> MagicBlockRpcClientResult { + let mut cached = self.cache.slot.lock().await; + if let Some(slot) = Self::fresh_cached_slot(&cached) { + return Ok(slot); + } + + let slot = self.fetch_slot().await?; + Self::cache_slot(&mut cached, slot); + drop(cached); + + self.update_chain_slot(slot); + + Ok(slot) + } + + async fn fetch_slot(&self) -> MagicBlockRpcClientResult { self.client .get_slot() .await .map_err(|e| MagicBlockRpcClientError::GetSlot(Box::new(e))) } + pub async fn invalidate_cached_blockhash(&self) { + let mut cached = self.cache.blockhash.lock().await; + *cached = None; + } + + fn fresh_cached_blockhash( + cached: &Option, + ) -> Option { + cached + .as_ref() + .filter(|value| value.fetched_at.elapsed() < BLOCKHASH_CACHE_TTL) + .map(|value| { + trace!( + context_slot = value.context_slot, + last_valid_block_height = value.last_valid_block_height, + "Using cached latest blockhash" + ); + value.blockhash + }) + } + + fn fresh_cached_slot(cached: &Option) -> Option { + cached + .as_ref() + .filter(|value| value.fetched_at.elapsed() < SLOT_CACHE_TTL) + .map(|value| value.slot) + } + + async fn record_observed_slot(&self, slot: Slot) { + self.update_chain_slot(slot); + let mut cached = self.cache.slot.lock().await; + Self::cache_slot(&mut cached, slot); + } + + fn cache_slot(cached: &mut Option, slot: Slot) { + if cached.as_ref().is_none_or(|value| slot >= value.slot) { + *cached = Some(CachedSlot { + slot, + fetched_at: Instant::now(), + }); + } + } + + fn observed_chain_slot(&self) -> Option { + self.chain_slot + .as_ref() + .map(|slot| slot.load(Ordering::Relaxed)) + .filter(|slot| *slot > 0) + } + + fn update_chain_slot(&self, slot: Slot) { + if let Some(chain_slot) = &self.chain_slot { + chain_slot.fetch_max(slot, Ordering::Relaxed); + } + } + pub async fn get_account( &self, pubkey: &Pubkey, @@ -385,7 +550,11 @@ impl MagicblockRpcClient { } pub async fn wait_for_next_slot(&self) -> MagicBlockRpcClientResult { - let slot = self.get_slot().await?; + let slot = if let Some(slot) = self.observed_chain_slot() { + slot + } else { + self.get_cached_slot().await? + }; self.wait_for_higher_slot(slot).await } @@ -394,7 +563,12 @@ impl MagicblockRpcClient { slot: Slot, ) -> MagicBlockRpcClientResult { let higher_slot = loop { - let next_slot = self.get_slot().await?; + let next_slot = if let Some(next_slot) = self.observed_chain_slot() + { + next_slot + } else { + self.get_cached_slot().await? + }; if next_slot > slot { break next_slot; } diff --git a/magicblock-table-mania/src/manager.rs b/magicblock-table-mania/src/manager.rs index dbf7dee64..a0b60d52b 100644 --- a/magicblock-table-mania/src/manager.rs +++ b/magicblock-table-mania/src/manager.rs @@ -569,12 +569,11 @@ impl TableMania { // 2. Ensure that all matching keys are also present remotely and have been finalized let remote_tables = { - let mut last_slot = self.rpc_client.get_slot().await?; - let matching_table_keys = matching_tables.keys().cloned().collect::>(); let start = Instant::now(); + let mut last_wait_log = Instant::now(); let table_keys_str = matching_table_keys .iter() .map(|x| x.to_string()) @@ -648,12 +647,16 @@ impl TableMania { ); } - if let Ok(slot) = self.rpc_client.wait_for_next_slot().await { - if slot - last_slot > 20 { - tracing::Span::current().record("current_slot", slot); - debug!("Still waiting for remote tables"); - } - last_slot = slot; + if let Err(err) = self.rpc_client.wait_for_next_slot().await { + trace!( + error = ?err, + "Failed to wait for next slot; falling back to timed retry" + ); + sleep(Duration::from_millis(450)).await; + } + if last_wait_log.elapsed() > Duration::from_secs(8) { + debug!("Still waiting for remote tables"); + last_wait_log = Instant::now(); } } }; diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 2cfd96d55..ca45f7cc0 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -4777,6 +4777,7 @@ name = "magicblock-rpc-client" version = "0.11.2" dependencies = [ "futures-util", + "serde_json", "solana-account", "solana-account-decoder-client-types", "solana-address-lookup-table-interface", diff --git a/test-integration/test-chainlink/src/ixtest_context.rs b/test-integration/test-chainlink/src/ixtest_context.rs index 9a0e1dd4a..b48c76359 100644 --- a/test-integration/test-chainlink/src/ixtest_context.rs +++ b/test-integration/test-chainlink/src/ixtest_context.rs @@ -113,6 +113,7 @@ impl IxtestContext { commitment, tx, &config.remote_account_provider, + None, ) .await; diff --git a/test-integration/test-chainlink/tests/ix_remote_account_provider.rs b/test-integration/test-chainlink/tests/ix_remote_account_provider.rs index c1f6da65c..c61830f23 100644 --- a/test-integration/test-chainlink/tests/ix_remote_account_provider.rs +++ b/test-integration/test-chainlink/tests/ix_remote_account_provider.rs @@ -47,6 +47,7 @@ async fn init_remote_account_provider( &RemoteAccountProviderConfig::default_with_lifecycle_mode( LifecycleMode::Ephemeral, ), + None, ) .await .expect("Failed to create RemoteAccountProvider") diff --git a/test-integration/test-committor-service/tests/test_ix_commit_local.rs b/test-integration/test-committor-service/tests/test_ix_commit_local.rs index d18d0575f..d73be65ac 100644 --- a/test-integration/test-committor-service/tests/test_ix_commit_local.rs +++ b/test-integration/test-committor-service/tests/test_ix_commit_local.rs @@ -249,6 +249,7 @@ async fn commit_single_account( validator_auth.insecure_clone(), ":memory:", ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + None, common::MockActionsCallbackExecutor::default(), ) .unwrap(); @@ -331,6 +332,7 @@ async fn commit_book_order_account( validator_auth.insecure_clone(), ":memory:", ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + None, common::MockActionsCallbackExecutor::default(), ) .unwrap(); @@ -804,6 +806,7 @@ async fn commit_multiple_accounts( validator_auth.insecure_clone(), ":memory:", ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + None, common::MockActionsCallbackExecutor::default(), ) .unwrap(); @@ -873,6 +876,7 @@ async fn execute_intent_bundle( validator_auth.insecure_clone(), ":memory:", ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + None, common::MockActionsCallbackExecutor::default(), ) .unwrap();