From 40bff3f6634602037158819b08e448f624766fd9 Mon Sep 17 00:00:00 2001 From: gsegatti Date: Sat, 24 May 2025 20:07:45 +0100 Subject: [PATCH] Improvement: chainsync asynchronism - Like #294 and #295 --- Cargo.lock | 1 + crates/discovery/Cargo.toml | 1 + crates/discovery/src/chainsync/sync.rs | 80 ++++++++++--------- .../implementations/compute_pool_contract.rs | 10 ++- .../compute_registry_contract.rs | 2 +- 5 files changed, 52 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 414e7a3a..ef82e874 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2726,6 +2726,7 @@ dependencies = [ "anyhow", "clap 4.5.37", "env_logger", + "futures", "log", "redis", "redis-test", diff --git a/crates/discovery/Cargo.toml b/crates/discovery/Cargo.toml index 775d9f1c..cba332b8 100644 --- a/crates/discovery/Cargo.toml +++ b/crates/discovery/Cargo.toml @@ -9,6 +9,7 @@ alloy = { workspace = true } anyhow = { workspace = true } clap = { workspace = true } env_logger = { workspace = true } +futures = { workspace = true } log = { workspace = true } redis = { workspace = true } redis-test = { workspace = true } diff --git a/crates/discovery/src/chainsync/sync.rs b/crates/discovery/src/chainsync/sync.rs index 002a1c96..9a7042e1 100644 --- a/crates/discovery/src/chainsync/sync.rs +++ b/crates/discovery/src/chainsync/sync.rs @@ -1,6 +1,7 @@ use crate::store::node_store::NodeStore; use alloy::primitives::Address; use anyhow::Error; +use futures::StreamExt; use log::error; use shared::models::node::DiscoveryNode; use shared::web3::contracts::core::builder::Contracts; @@ -37,10 +38,8 @@ impl ChainSync { async fn sync_single_node( node_store: Arc, contracts: Arc, - node: DiscoveryNode, + mut node: DiscoveryNode, ) -> Result<(), Error> { - let mut n = node.clone(); - // Safely parse provider_address and node_address let provider_address = Address::from_str(&node.provider_address).map_err(|e| { eprintln!("Failed to parse provider address: {}", e); @@ -52,40 +51,37 @@ impl ChainSync { anyhow::anyhow!("Invalid node address") })?; - let node_info = contracts - .compute_registry - .get_node(provider_address, node_address) - .await - .map_err(|e| { - eprintln!("Error retrieving node info: {}", e); - anyhow::anyhow!("Failed to retrieve node info") - })?; + let (node_info_result, provider_info_result, is_blacklisted_result) = tokio::join!( + contracts + .compute_registry + .get_node(provider_address, node_address), + contracts.compute_registry.get_provider(provider_address), + contracts + .compute_pool + .is_node_blacklisted(node.node.compute_pool_id, node_address), + ); - let provider_info = contracts - .compute_registry - .get_provider(provider_address) - .await - .map_err(|e| { - eprintln!("Error retrieving provider info: {}", e); - anyhow::anyhow!("Failed to retrieve provider info") - })?; + let node_info = node_info_result.map_err(|e| { + eprintln!("Error retrieving node info: {}", e); + anyhow::anyhow!("Failed to retrieve node info") + })?; - let (is_active, is_validated) = node_info; - n.is_active = is_active; - n.is_validated = is_validated; - n.is_provider_whitelisted = provider_info.is_whitelisted; + let provider_info = provider_info_result.map_err(|e| { + eprintln!("Error retrieving provider info: {}", e); + anyhow::anyhow!("Failed to retrieve provider info") + })?; - // Handle potential errors from async calls - let is_blacklisted = contracts - .compute_pool - .is_node_blacklisted(node.node.compute_pool_id, node_address) - .await - .map_err(|e| { - eprintln!("Error checking if node is blacklisted: {}", e); - anyhow::anyhow!("Failed to check blacklist status") - })?; - n.is_blacklisted = is_blacklisted; - match node_store.update_node(n) { + let is_blacklisted = is_blacklisted_result.map_err(|e| { + eprintln!("Error checking if node is blacklisted: {}", e); + anyhow::anyhow!("Failed to check blacklist status") + })?; + + let (is_active, is_validated) = node_info; + node.is_active = is_active; + node.is_validated = is_validated; + node.is_provider_whitelisted = provider_info.is_whitelisted; + node.is_blacklisted = is_blacklisted; + match node_store.update_node(node) { Ok(_) => (), Err(e) => { error!("Error updating node: {}", e); @@ -110,11 +106,17 @@ impl ChainSync { let nodes = node_store_clone.get_nodes(); match nodes { Ok(nodes) => { - for node in nodes { - if let Err(e) = ChainSync::sync_single_node(node_store_clone.clone(), contracts_clone.clone(), node).await { - error!("Error syncing node: {}", e); - } - } + futures::stream::iter(nodes) + .for_each_concurrent(10, |node| { + let node_store = node_store_clone.clone(); + let contracts = contracts_clone.clone(); + async move { + if let Err(e) = ChainSync::sync_single_node(node_store, contracts, node).await { + error!("Error syncing node: {}", e); + } + } + }) + .await; // Update the last chain sync time let mut last_sync = last_chain_sync.lock().await; *last_sync = Some(SystemTime::now()); diff --git a/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs b/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs index 803abd91..4b2dedc1 100644 --- a/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs +++ b/crates/shared/src/web3/contracts/implementations/compute_pool_contract.rs @@ -173,7 +173,7 @@ impl ComputePool { &self, pool_id: u32, node: Address, - ) -> Result> { + ) -> Result> { let arg_pool_id: U256 = U256::from(pool_id); let result = self .instance @@ -184,7 +184,13 @@ impl ComputePool { )? .call() .await?; - Ok(result.first().unwrap().as_bool().unwrap()) + let first_value = result + .first() + .ok_or("Empty response when checking if node is blacklisted")?; + let is_blacklisted = first_value + .as_bool() + .ok_or("Expected a boolean value in response for node blacklisted check")?; + Ok(is_blacklisted) } pub async fn get_blacklisted_nodes( diff --git a/crates/shared/src/web3/contracts/implementations/compute_registry_contract.rs b/crates/shared/src/web3/contracts/implementations/compute_registry_contract.rs index da07f0ad..4bc01df5 100644 --- a/crates/shared/src/web3/contracts/implementations/compute_registry_contract.rs +++ b/crates/shared/src/web3/contracts/implementations/compute_registry_contract.rs @@ -22,7 +22,7 @@ impl ComputeRegistryContract { pub async fn get_provider( &self, address: Address, - ) -> Result> { + ) -> Result> { let provider_response = self .instance .instance()