diff --git a/crates/discovery/src/api/routes/node.rs b/crates/discovery/src/api/routes/node.rs index 80c46c12..e6a76f36 100644 --- a/crates/discovery/src/api/routes/node.rs +++ b/crates/discovery/src/api/routes/node.rs @@ -361,6 +361,7 @@ mod tests { last_updated: None, created_at: None, location: None, + latest_balance: None, }; match app_state.node_store.update_node(validated).await { diff --git a/crates/discovery/src/chainsync/sync.rs b/crates/discovery/src/chainsync/sync.rs index 55812824..6101c87a 100644 --- a/crates/discovery/src/chainsync/sync.rs +++ b/crates/discovery/src/chainsync/sync.rs @@ -1,5 +1,6 @@ use crate::store::node_store::NodeStore; use alloy::primitives::Address; +use alloy::providers::Provider as _; use alloy::providers::RootProvider; use anyhow::Error; use futures::stream::{self, StreamExt}; @@ -18,6 +19,7 @@ pub struct ChainSync { pub node_store: Arc, cancel_token: CancellationToken, chain_sync_interval: Duration, + provider: RootProvider, contracts: Contracts, last_chain_sync: Arc>>, } @@ -27,6 +29,7 @@ impl ChainSync { node_store: Arc, cancellation_token: CancellationToken, chain_sync_interval: Duration, + provider: RootProvider, contracts: Contracts, last_chain_sync: Arc>>, ) -> Self { @@ -34,99 +37,19 @@ impl ChainSync { node_store, cancel_token: cancellation_token, chain_sync_interval, + provider, contracts, last_chain_sync, } } - async fn sync_single_node( - node_store: Arc, - contracts: Contracts, - node: DiscoveryNode, - ) -> Result<(), Error> { - let mut n = node.clone(); - - // Safely parse provider_address and node_address - let provider_address = Address::from_str(&node.provider_address).map_err(|e| { - error!( - "Failed to parse provider address '{}': {}", - node.provider_address, e - ); - anyhow::anyhow!("Invalid provider address") - })?; - - let node_address = Address::from_str(&node.id).map_err(|e| { - error!("Failed to parse node address '{}': {}", node.id, e); - anyhow::anyhow!("Invalid node address") - })?; - - let node_info = contracts - .compute_registry - .get_node(provider_address, node_address) - .await - .map_err(|e| { - error!( - "Error retrieving node info for provider {provider_address} and node {node_address}: {e}" - ); - anyhow::anyhow!("Failed to retrieve node info") - })?; - - let provider_info = contracts - .compute_registry - .get_provider(provider_address) - .await - .map_err(|e| { - error!("Error retrieving provider info for {provider_address}: {e}"); - anyhow::anyhow!("Failed to retrieve provider info") - })?; - - let (is_active, is_validated) = node_info; - n.is_active = is_active; - n.is_validated = is_validated; - n.is_provider_whitelisted = provider_info.is_whitelisted; - - // Handle potential errors from async calls - let is_blacklisted = contracts - .compute_pool - .is_node_blacklisted(node.node.compute_pool_id, node_address) - .await - .map_err(|e| { - error!( - "Error checking if node {} is blacklisted in pool {}: {}", - node_address, node.node.compute_pool_id, e - ); - anyhow::anyhow!("Failed to check blacklist status") - })?; - n.is_blacklisted = is_blacklisted; - - // Only update if the node has changed - if n.is_active != node.is_active - || n.is_validated != node.is_validated - || n.is_provider_whitelisted != node.is_provider_whitelisted - || n.is_blacklisted != node.is_blacklisted - { - match node_store.update_node(n).await { - Ok(_) => { - debug!("Successfully updated node {}", node.id); - Ok(()) - } - Err(e) => { - error!("Error updating node {}: {}", node.id, e); - Err(anyhow::anyhow!("Failed to update node: {}", e)) - } - } - } else { - debug!("Node {} unchanged, skipping update", node.id); - Ok(()) - } - } - pub fn run(self) -> Result<(), Error> { let ChainSync { node_store, cancel_token, chain_sync_interval, last_chain_sync, + provider, contracts, } = self; @@ -153,9 +76,10 @@ impl ChainSync { let results: Vec> = stream::iter(nodes) .map(|node| { let node_store = node_store.clone(); + let provider = provider.clone(); let contracts = contracts.clone(); async move { - ChainSync::sync_single_node(node_store, contracts, node).await + sync_single_node(node_store, provider, contracts, node).await } }) .buffer_unordered(MAX_CONCURRENT_SYNCS) @@ -207,3 +131,96 @@ impl ChainSync { Ok(()) } } + +async fn sync_single_node( + node_store: Arc, + provider: RootProvider, + contracts: Contracts, + node: DiscoveryNode, +) -> Result<(), Error> { + let mut n = node.clone(); + + // Safely parse provider_address and node_address + let provider_address = Address::from_str(&node.provider_address).map_err(|e| { + error!( + "Failed to parse provider address '{}': {}", + node.provider_address, e + ); + anyhow::anyhow!("Invalid provider address") + })?; + + let node_address = Address::from_str(&node.id).map_err(|e| { + error!("Failed to parse node address '{}': {}", node.id, e); + anyhow::anyhow!("Invalid node address") + })?; + + let balance = provider.get_balance(node_address).await.map_err(|e| { + error!("Error retrieving balance for node {}: {}", node_address, e); + anyhow::anyhow!("Failed to retrieve node balance") + })?; + n.latest_balance = Some(balance); + + let node_info = contracts + .compute_registry + .get_node(provider_address, node_address) + .await + .map_err(|e| { + error!( + "Error retrieving node info for provider {} and node {}: {}", + provider_address, node_address, e + ); + anyhow::anyhow!("Failed to retrieve node info") + })?; + + let provider_info = contracts + .compute_registry + .get_provider(provider_address) + .await + .map_err(|e| { + error!( + "Error retrieving provider info for {}: {}", + provider_address, e + ); + anyhow::anyhow!("Failed to retrieve provider info") + })?; + + let (is_active, is_validated) = node_info; + n.is_active = is_active; + n.is_validated = is_validated; + n.is_provider_whitelisted = provider_info.is_whitelisted; + + // Handle potential errors from async calls + let is_blacklisted = contracts + .compute_pool + .is_node_blacklisted(node.node.compute_pool_id, node_address) + .await + .map_err(|e| { + error!( + "Error checking if node {} is blacklisted in pool {}: {}", + node_address, node.node.compute_pool_id, e + ); + anyhow::anyhow!("Failed to check blacklist status") + })?; + n.is_blacklisted = is_blacklisted; + + // Only update if the node has changed + if n.is_active != node.is_active + || n.is_validated != node.is_validated + || n.is_provider_whitelisted != node.is_provider_whitelisted + || n.is_blacklisted != node.is_blacklisted + { + match node_store.update_node(n).await { + Ok(_) => { + debug!("Successfully updated node {}", node.id); + Ok(()) + } + Err(e) => { + error!("Error updating node {}: {}", node.id, e); + Err(anyhow::anyhow!("Failed to update node: {}", e)) + } + } + } else { + debug!("Node {} unchanged, skipping update", node.id); + Ok(()) + } +} diff --git a/crates/discovery/src/main.rs b/crates/discovery/src/main.rs index e0a0b100..00849395 100644 --- a/crates/discovery/src/main.rs +++ b/crates/discovery/src/main.rs @@ -96,7 +96,7 @@ async fn main() -> Result<()> { }; let provider = RootProvider::new_http(endpoint); - let contracts = ContractBuilder::new(provider) + let contracts = ContractBuilder::new(provider.clone()) .with_compute_registry() .with_ai_token() .with_prime_network() @@ -116,6 +116,7 @@ async fn main() -> Result<()> { node_store.clone(), cancellation_token.clone(), Duration::from_secs(10), + provider, contracts.clone(), last_chain_sync.clone(), ); diff --git a/crates/orchestrator/src/discovery/monitor.rs b/crates/orchestrator/src/discovery/monitor.rs index c29bdbf3..dd230b5a 100644 --- a/crates/orchestrator/src/discovery/monitor.rs +++ b/crates/orchestrator/src/discovery/monitor.rs @@ -4,6 +4,7 @@ use crate::plugins::StatusUpdatePlugin; use crate::store::core::StoreContext; use crate::utils::loop_heartbeats::LoopHeartbeats; use alloy::primitives::Address; +use alloy::primitives::U256; use anyhow::Error; use anyhow::Result; use chrono::Utc; @@ -381,6 +382,21 @@ impl DiscoveryMonitor { } } } + + if let Some(balance) = discovery_node.latest_balance { + if balance == U256::ZERO { + info!( + "Node {} has zero balance, marking as low balance", + node_address + ); + if let Err(e) = self + .update_node_status(&node_address, NodeStatus::LowBalance) + .await + { + error!("Error updating node status: {}", e); + } + } + } } Ok(None) => { // Don't add new node if there's already a healthy node with same IP and port diff --git a/crates/orchestrator/src/models/node.rs b/crates/orchestrator/src/models/node.rs index 8b7b5332..bf9b4b95 100644 --- a/crates/orchestrator/src/models/node.rs +++ b/crates/orchestrator/src/models/node.rs @@ -81,6 +81,7 @@ pub enum NodeStatus { Dead, Ejected, Banned, + LowBalance, } impl Display for NodeStatus { diff --git a/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs b/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs index 568b297f..79d3e1ca 100644 --- a/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs +++ b/crates/orchestrator/src/plugins/node_groups/status_update_impl.rs @@ -20,7 +20,7 @@ impl StatusUpdatePlugin for NodeGroupsPlugin { ); match node.status { - NodeStatus::Dead => { + NodeStatus::Dead | NodeStatus::LowBalance => { // Dissolve entire group if node becomes unhealthy if let Some(group) = self.get_node_group(&node_addr).await? { info!( diff --git a/crates/orchestrator/src/store/domains/node_store.rs b/crates/orchestrator/src/store/domains/node_store.rs index 2acfa97e..5e4f42d8 100644 --- a/crates/orchestrator/src/store/domains/node_store.rs +++ b/crates/orchestrator/src/store/domains/node_store.rs @@ -84,7 +84,6 @@ impl NodeStore { .map_err(|e| anyhow::anyhow!("Failed to serialize location: {}", e))?; fields.push(("location".to_string(), location_json)); } - Ok(fields) } diff --git a/crates/shared/src/models/node.rs b/crates/shared/src/models/node.rs index f0ba4e01..0466492c 100644 --- a/crates/shared/src/models/node.rs +++ b/crates/shared/src/models/node.rs @@ -1,14 +1,16 @@ +use alloy::primitives::U256; use anyhow::anyhow; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt; use std::ops::Deref; use std::str::FromStr; -use utoipa::ToSchema; +use utoipa::{openapi::Object, ToSchema}; #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default, ToSchema)] pub struct Node { pub id: String, + // the node's on-chain address. pub provider_address: String, pub ip_address: String, pub port: u16, @@ -563,6 +565,16 @@ pub struct DiscoveryNode { pub created_at: Option>, #[serde(default)] pub location: Option, + #[schema(schema_with = u256_schema)] + pub latest_balance: Option, +} + +fn u256_schema() -> Object { + utoipa::openapi::ObjectBuilder::new() + .schema_type(utoipa::openapi::schema::Type::String) + .description(Some("A U256 value represented as a decimal string")) + .examples(Some(serde_json::json!("1000000000000000000"))) + .build() } impl DiscoveryNode { @@ -576,6 +588,7 @@ impl DiscoveryNode { last_updated: Some(Utc::now()), created_at: self.created_at, location: self.location.clone(), + latest_balance: self.latest_balance, } } } @@ -599,6 +612,7 @@ impl From for DiscoveryNode { last_updated: None, created_at: Some(Utc::now()), location: None, + latest_balance: None, } } }