Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 17 additions & 4 deletions magicblock-api/src/magic_validator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
thread,
Expand Down Expand Up @@ -211,11 +211,19 @@ impl MagicValidator {
};
let accountsdb = Arc::new(accountsdb);
let (mut dispatch, validator_channels) = link();
let shared_chain_slot =
(!Self::replication_mode_uses_disabled_chainlink(
&config.validator.replication_mode,
))
.then(Arc::<AtomicU64>::default);

let step_start = Instant::now();
let committor_service =
Self::init_committor_service(&config, ledger.latest_block())
.await?;
let committor_service = Self::init_committor_service(
&config,
ledger.latest_block(),
shared_chain_slot.clone(),
)
.await?;
log_timing("startup", "committor_service_init", step_start);
init_magic_sys(Arc::new(MagicSysAdapter::new(
committor_service.clone(),
Expand All @@ -228,6 +236,7 @@ impl MagicValidator {
&dispatch.transaction_scheduler,
&ledger.latest_block().clone(),
&accountsdb,
shared_chain_slot.clone(),
)
.await?,
);
Expand Down Expand Up @@ -443,6 +452,7 @@ impl MagicValidator {
async fn init_committor_service(
config: &ValidatorParams,
latest_block: &LatestBlock,
chain_slot: Option<Arc<AtomicU64>>,
) -> ApiResult<Option<Arc<CommittorService>>> {
let committor_persist_path =
config.storage.join("committor_service.sqlite");
Expand All @@ -464,6 +474,7 @@ impl MagicValidator {
),
actions_timeout: DEFAULT_ACTIONS_TIMEOUT,
},
chain_slot,
actions_callback_executor,
)?));

Expand All @@ -477,6 +488,7 @@ impl MagicValidator {
transaction_scheduler: &TransactionSchedulerHandle,
latest_block: &LatestBlock,
accountsdb: &Arc<AccountsDb>,
chain_slot: Option<Arc<AtomicU64>>,
) -> ApiResult<ChainlinkImpl> {
if Self::replication_mode_uses_disabled_chainlink(
&config.validator.replication_mode,
Expand Down Expand Up @@ -536,6 +548,7 @@ impl MagicValidator {
chainlink_config,
&config.chainlink,
config.storage.as_path(),
chain_slot.unwrap_or_default(),
)
.await?;

Expand Down
8 changes: 7 additions & 1 deletion magicblock-chainlink/src/chainlink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::HashSet, path::Path, sync::Arc};
use std::{
collections::HashSet,
path::Path,
sync::{atomic::AtomicU64, Arc},
};

use dlp_api::pda::ephemeral_balance_pda_from_payer;
use errors::{ChainlinkError, ChainlinkResult};
Expand Down Expand Up @@ -320,6 +324,7 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
config: ChainlinkConfig,
chainlink_config: &ChainLinkConfig,
ledger_path: &Path,
chain_slot: Arc<AtomicU64>,
) -> ChainlinkResult<
InnerChainlink<
ChainRpcClientImpl,
Expand All @@ -337,6 +342,7 @@ impl<T: ChainRpcClient, U: ChainPubsubClient, V: AccountsBank, C: Cloner>
commitment,
tx,
&config.remote_account_provider,
Some(chain_slot),
)
.await?;
let fetch_cloner = if let Some(provider) = account_provider {
Expand Down
6 changes: 3 additions & 3 deletions magicblock-chainlink/src/remote_account_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ impl
commitment: CommitmentConfig,
subscription_forwarder: mpsc::Sender<ForwardedSubscriptionUpdate>,
config: &RemoteAccountProviderConfig,
chain_slot: Option<Arc<AtomicU64>>,
) -> ChainlinkResult<
Option<
RemoteAccountProvider<
Expand All @@ -513,6 +514,7 @@ impl
commitment,
subscription_forwarder,
config,
chain_slot.unwrap_or_default(),
)
.await?;
Ok(Some(provider))
Expand Down Expand Up @@ -651,6 +653,7 @@ impl<T: ChainRpcClient, U: ChainPubsubClient> RemoteAccountProvider<T, U> {
commitment: CommitmentConfig,
subscription_forwarder: mpsc::Sender<ForwardedSubscriptionUpdate>,
config: &RemoteAccountProviderConfig,
chain_slot: Arc<AtomicU64>,
) -> RemoteAccountProviderResult<
RemoteAccountProvider<
ChainRpcClientImpl,
Expand All @@ -674,9 +677,6 @@ impl<T: ChainRpcClient, U: ChainPubsubClient> RemoteAccountProvider<T, U> {
let rpc_client =
ChainRpcClientImpl::new_from_url(rpc_url.as_str(), commitment);

// Create chain_slot to be shared with all pubsub clients
let chain_slot = Arc::<AtomicU64>::default();

// Build startup pubsub clients and wrap them into a SubMuxClient.
// gRPC clients are cheap to create and backfill subscriptions, so
// when present we let slower WebSocket clients attach after startup.
Expand Down
9 changes: 7 additions & 2 deletions magicblock-committor-service/src/committor_processor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::{HashMap, HashSet},
path::Path,
sync::Arc,
sync::{atomic::AtomicU64, Arc},
};

use magicblock_core::traits::ActionsCallbackScheduler;
Expand Down Expand Up @@ -48,6 +48,7 @@ impl CommittorProcessor {
authority: Keypair,
persist_file: P,
chain_config: ChainConfig,
chain_slot: Option<Arc<AtomicU64>>,
actions_callback_executor: A,
) -> CommittorServiceResult<Self>
where
Expand All @@ -59,7 +60,11 @@ impl CommittorProcessor {
chain_config.commitment,
);
let rpc_client = Arc::new(rpc_client);
let magic_block_rpc_client = MagicblockRpcClient::new(rpc_client);
let magic_block_rpc_client = if let Some(chain_slot) = chain_slot {
MagicblockRpcClient::new_with_chain_slot(rpc_client, chain_slot)
} else {
MagicblockRpcClient::new(rpc_client)
};

// Create TableMania
let gc_config = GarbageCollectorConfig::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ impl IntentExecutionClient {
IntentExecutionClient { rpc_client }
}

pub(in crate::intent_executor) async fn invalidate_cached_blockhash(&self) {
self.rpc_client.invalidate_cached_blockhash().await;
}

pub(in crate::intent_executor) async fn execute_message_with_retries(
&self,
authority: &Keypair,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ where
break Err(execution_err);
}
};
self.intent_client.invalidate_cached_blockhash().await;
self.execution_report.dispose(cleanup);

if self.current_attempt >= RECURSION_CEILING {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ where
break Err(execution_err);
}
};
self.intent_client.invalidate_cached_blockhash().await;
self.execution_report.dispose(cleanup);

if self.state.current_attempt >= Self::RECURSION_CEILING {
Expand Down Expand Up @@ -401,6 +402,7 @@ where
break Err(execution_err);
}
};
self.intent_client.invalidate_cached_blockhash().await;
self.execution_report.dispose(cleanup);

if self.state.current_attempt >= Self::RECURSION_CEILING {
Expand Down
11 changes: 10 additions & 1 deletion magicblock-committor-service/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{collections::HashMap, path::Path, sync::Arc, time::Instant};
use std::{
collections::HashMap,
path::Path,
sync::{atomic::AtomicU64, Arc},
time::Instant,
};

use magicblock_core::traits::ActionsCallbackScheduler;
use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle;
Expand Down Expand Up @@ -113,6 +118,7 @@ impl CommittorActor {
authority: Keypair,
persist_file: P,
chain_config: ChainConfig,
chain_slot: Option<Arc<AtomicU64>>,
actions_callback_executor: A,
) -> CommittorServiceResult<Self>
where
Expand All @@ -123,6 +129,7 @@ impl CommittorActor {
authority,
persist_file,
chain_config,
chain_slot,
actions_callback_executor,
)?);

Expand Down Expand Up @@ -316,6 +323,7 @@ impl CommittorService {
authority: Keypair,
persist_file: P,
chain_config: ChainConfig,
chain_slot: Option<Arc<AtomicU64>>,
actions_callback_executor: A,
) -> CommittorServiceResult<Self>
where
Expand All @@ -332,6 +340,7 @@ impl CommittorService {
authority,
persist_file,
chain_config,
chain_slot,
actions_callback_executor,
)?;
tokio::spawn(async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ impl DeliveryPreparator {
let cleanup_task = preparation_task.cleanup_task();

self.cleanup(authority, &[cleanup_task], &[]).await?;
self.rpc_client.invalidate_cached_blockhash().await;

// Restore preparation stage for retry
*stage = CommitBufferStage::Preparation(preparation_task);
Expand Down
1 change: 1 addition & 0 deletions magicblock-rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ solana-address-lookup-table-interface = { workspace = true, features = [
"bytemuck",
] }
futures-util = { workspace = true }
serde_json = { workspace = true }
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik this is only needed since we reimplemented code that's already in the solana rpc client crate.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this: #1259 (comment)

Agree on the ugliness, but afaik there is not way to cleanly do it and I want to avoid 2 RPC calls when the slot information is already in the response

thiserror = { workspace = true }
tokio = { workspace = true }

Expand Down
Loading
Loading