Skip to content

feat: exclude nodes with zero balance from node_groups #573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/discovery/src/api/routes/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ mod tests {
last_updated: None,
created_at: None,
location: None,
latest_balance: None,
};

match app_state.node_store.update_node(validated).await {
Expand Down
183 changes: 100 additions & 83 deletions crates/discovery/src/chainsync/sync.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::store::node_store::NodeStore;
use alloy::primitives::Address;
use alloy::providers::Provider as _;
use alloy::providers::RootProvider;
use anyhow::Error;
use futures::stream::{self, StreamExt};
Expand All @@ -18,6 +19,7 @@ pub struct ChainSync {
pub node_store: Arc<NodeStore>,
cancel_token: CancellationToken,
chain_sync_interval: Duration,
provider: RootProvider,
contracts: Contracts<RootProvider>,
last_chain_sync: Arc<Mutex<Option<std::time::SystemTime>>>,
}
Expand All @@ -27,106 +29,27 @@ impl ChainSync {
node_store: Arc<NodeStore>,
cancellation_token: CancellationToken,
chain_sync_interval: Duration,
provider: RootProvider,
contracts: Contracts<RootProvider>,
last_chain_sync: Arc<Mutex<Option<std::time::SystemTime>>>,
) -> Self {
Self {
node_store,
cancel_token: cancellation_token,
chain_sync_interval,
provider,
contracts,
last_chain_sync,
}
}

async fn sync_single_node(
node_store: Arc<NodeStore>,
contracts: Contracts<RootProvider>,
node: DiscoveryNode,
) -> Result<(), Error> {
let mut n = node.clone();

// Safely parse provider_address and node_address
let provider_address = Address::from_str(&node.provider_address).map_err(|e| {
error!(
"Failed to parse provider address '{}': {}",
node.provider_address, e
);
anyhow::anyhow!("Invalid provider address")
})?;

let node_address = Address::from_str(&node.id).map_err(|e| {
error!("Failed to parse node address '{}': {}", node.id, e);
anyhow::anyhow!("Invalid node address")
})?;

let node_info = contracts
.compute_registry
.get_node(provider_address, node_address)
.await
.map_err(|e| {
error!(
"Error retrieving node info for provider {provider_address} and node {node_address}: {e}"
);
anyhow::anyhow!("Failed to retrieve node info")
})?;

let provider_info = contracts
.compute_registry
.get_provider(provider_address)
.await
.map_err(|e| {
error!("Error retrieving provider info for {provider_address}: {e}");
anyhow::anyhow!("Failed to retrieve provider info")
})?;

let (is_active, is_validated) = node_info;
n.is_active = is_active;
n.is_validated = is_validated;
n.is_provider_whitelisted = provider_info.is_whitelisted;

// Handle potential errors from async calls
let is_blacklisted = contracts
.compute_pool
.is_node_blacklisted(node.node.compute_pool_id, node_address)
.await
.map_err(|e| {
error!(
"Error checking if node {} is blacklisted in pool {}: {}",
node_address, node.node.compute_pool_id, e
);
anyhow::anyhow!("Failed to check blacklist status")
})?;
n.is_blacklisted = is_blacklisted;

// Only update if the node has changed
if n.is_active != node.is_active
|| n.is_validated != node.is_validated
|| n.is_provider_whitelisted != node.is_provider_whitelisted
|| n.is_blacklisted != node.is_blacklisted
{
match node_store.update_node(n).await {
Ok(_) => {
debug!("Successfully updated node {}", node.id);
Ok(())
}
Err(e) => {
error!("Error updating node {}: {}", node.id, e);
Err(anyhow::anyhow!("Failed to update node: {}", e))
}
}
} else {
debug!("Node {} unchanged, skipping update", node.id);
Ok(())
}
}

pub fn run(self) -> Result<(), Error> {
let ChainSync {
node_store,
cancel_token,
chain_sync_interval,
last_chain_sync,
provider,
contracts,
} = self;

Expand All @@ -153,9 +76,10 @@ impl ChainSync {
let results: Vec<Result<(), Error>> = stream::iter(nodes)
.map(|node| {
let node_store = node_store.clone();
let provider = provider.clone();
let contracts = contracts.clone();
async move {
ChainSync::sync_single_node(node_store, contracts, node).await
sync_single_node(node_store, provider, contracts, node).await
}
})
.buffer_unordered(MAX_CONCURRENT_SYNCS)
Expand Down Expand Up @@ -207,3 +131,96 @@ impl ChainSync {
Ok(())
}
}

async fn sync_single_node(
node_store: Arc<NodeStore>,
provider: RootProvider,
contracts: Contracts<RootProvider>,
node: DiscoveryNode,
) -> Result<(), Error> {
let mut n = node.clone();

// Safely parse provider_address and node_address
let provider_address = Address::from_str(&node.provider_address).map_err(|e| {
error!(
"Failed to parse provider address '{}': {}",
node.provider_address, e
);
anyhow::anyhow!("Invalid provider address")
})?;

let node_address = Address::from_str(&node.id).map_err(|e| {
error!("Failed to parse node address '{}': {}", node.id, e);
anyhow::anyhow!("Invalid node address")
})?;

let balance = provider.get_balance(node_address).await.map_err(|e| {
error!("Error retrieving balance for node {}: {}", node_address, e);
anyhow::anyhow!("Failed to retrieve node balance")
})?;
n.latest_balance = Some(balance);

let node_info = contracts
.compute_registry
.get_node(provider_address, node_address)
.await
.map_err(|e| {
error!(
"Error retrieving node info for provider {} and node {}: {}",
provider_address, node_address, e
);
anyhow::anyhow!("Failed to retrieve node info")
})?;

let provider_info = contracts
.compute_registry
.get_provider(provider_address)
.await
.map_err(|e| {
error!(
"Error retrieving provider info for {}: {}",
provider_address, e
);
anyhow::anyhow!("Failed to retrieve provider info")
})?;

let (is_active, is_validated) = node_info;
n.is_active = is_active;
n.is_validated = is_validated;
n.is_provider_whitelisted = provider_info.is_whitelisted;

// Handle potential errors from async calls
let is_blacklisted = contracts
.compute_pool
.is_node_blacklisted(node.node.compute_pool_id, node_address)
.await
.map_err(|e| {
error!(
"Error checking if node {} is blacklisted in pool {}: {}",
node_address, node.node.compute_pool_id, e
);
anyhow::anyhow!("Failed to check blacklist status")
})?;
n.is_blacklisted = is_blacklisted;

// Only update if the node has changed
if n.is_active != node.is_active
|| n.is_validated != node.is_validated
|| n.is_provider_whitelisted != node.is_provider_whitelisted
|| n.is_blacklisted != node.is_blacklisted
{
match node_store.update_node(n).await {
Ok(_) => {
debug!("Successfully updated node {}", node.id);
Ok(())
}
Err(e) => {
error!("Error updating node {}: {}", node.id, e);
Err(anyhow::anyhow!("Failed to update node: {}", e))
}
}
} else {
debug!("Node {} unchanged, skipping update", node.id);
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/discovery/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn main() -> Result<()> {
};

let provider = RootProvider::new_http(endpoint);
let contracts = ContractBuilder::new(provider)
let contracts = ContractBuilder::new(provider.clone())
.with_compute_registry()
.with_ai_token()
.with_prime_network()
Expand All @@ -116,6 +116,7 @@ async fn main() -> Result<()> {
node_store.clone(),
cancellation_token.clone(),
Duration::from_secs(10),
provider,
contracts.clone(),
last_chain_sync.clone(),
);
Expand Down
16 changes: 16 additions & 0 deletions crates/orchestrator/src/discovery/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::plugins::StatusUpdatePlugin;
use crate::store::core::StoreContext;
use crate::utils::loop_heartbeats::LoopHeartbeats;
use alloy::primitives::Address;
use alloy::primitives::U256;
use anyhow::Error;
use anyhow::Result;
use chrono::Utc;
Expand Down Expand Up @@ -381,6 +382,21 @@ impl DiscoveryMonitor {
}
}
}

if let Some(balance) = discovery_node.latest_balance {
if balance == U256::ZERO {
info!(
"Node {} has zero balance, marking as low balance",
node_address
);
if let Err(e) = self
.update_node_status(&node_address, NodeStatus::LowBalance)
.await
{
error!("Error updating node status: {}", e);
}
}
}
}
Ok(None) => {
// Don't add new node if there's already a healthy node with same IP and port
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/src/models/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub enum NodeStatus {
Dead,
Ejected,
Banned,
LowBalance,
}

impl Display for NodeStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl StatusUpdatePlugin for NodeGroupsPlugin {
);

match node.status {
NodeStatus::Dead => {
NodeStatus::Dead | NodeStatus::LowBalance => {
// Dissolve entire group if node becomes unhealthy
if let Some(group) = self.get_node_group(&node_addr).await? {
info!(
Expand Down
1 change: 0 additions & 1 deletion crates/orchestrator/src/store/domains/node_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl NodeStore {
.map_err(|e| anyhow::anyhow!("Failed to serialize location: {}", e))?;
fields.push(("location".to_string(), location_json));
}

Ok(fields)
}

Expand Down
16 changes: 15 additions & 1 deletion crates/shared/src/models/node.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use alloy::primitives::U256;
use anyhow::anyhow;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::Deref;
use std::str::FromStr;
use utoipa::ToSchema;
use utoipa::{openapi::Object, ToSchema};

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Default, ToSchema)]
pub struct Node {
pub id: String,
// the node's on-chain address.
pub provider_address: String,
pub ip_address: String,
pub port: u16,
Expand Down Expand Up @@ -563,6 +565,16 @@ pub struct DiscoveryNode {
pub created_at: Option<DateTime<Utc>>,
#[serde(default)]
pub location: Option<NodeLocation>,
#[schema(schema_with = u256_schema)]
pub latest_balance: Option<U256>,
}

fn u256_schema() -> Object {
utoipa::openapi::ObjectBuilder::new()
.schema_type(utoipa::openapi::schema::Type::String)
.description(Some("A U256 value represented as a decimal string"))
.examples(Some(serde_json::json!("1000000000000000000")))
.build()
}

impl DiscoveryNode {
Expand All @@ -576,6 +588,7 @@ impl DiscoveryNode {
last_updated: Some(Utc::now()),
created_at: self.created_at,
location: self.location.clone(),
latest_balance: self.latest_balance,
}
}
}
Expand All @@ -599,6 +612,7 @@ impl From<Node> for DiscoveryNode {
last_updated: None,
created_at: Some(Utc::now()),
location: None,
latest_balance: None,
}
}
}
Expand Down