From 3bf67ba9d11e221fe13c43402bf8bdc7ba8b5d11 Mon Sep 17 00:00:00 2001 From: gsegatti Date: Sat, 26 Apr 2025 23:11:40 +0100 Subject: [PATCH] Improvement: more orchestrator asynchronism - Much like #294, but also applied to discovery and node. --- crates/orchestrator/src/discovery/monitor.rs | 14 +- crates/orchestrator/src/status_update/mod.rs | 222 ++++++++++--------- 2 files changed, 124 insertions(+), 112 deletions(-) diff --git a/crates/orchestrator/src/discovery/monitor.rs b/crates/orchestrator/src/discovery/monitor.rs index 7b5dbba5..bda5aa67 100644 --- a/crates/orchestrator/src/discovery/monitor.rs +++ b/crates/orchestrator/src/discovery/monitor.rs @@ -5,6 +5,8 @@ use crate::utils::loop_heartbeats::LoopHeartbeats; use alloy::primitives::Address; use anyhow::Error; use anyhow::Result; +use futures::stream; +use futures::StreamExt; use log::{error, info}; use serde_json; use shared::models::api::ApiResponse; @@ -209,11 +211,13 @@ impl<'b> DiscoveryMonitor<'b> { async fn get_nodes(&self) -> Result, Error> { let discovery_nodes = self.fetch_nodes_from_discovery().await?; - for discovery_node in &discovery_nodes { - if let Err(e) = self.sync_single_node_with_discovery(discovery_node).await { - error!("Error syncing node with discovery: {}", e); - } - } + stream::iter(discovery_nodes.iter()) + .for_each_concurrent(10, |node| async move { + if let Err(e) = self.sync_single_node_with_discovery(node).await { + error!("Error syncing node with discovery: {}", e); + } + }) + .await; Ok(discovery_nodes .into_iter() diff --git a/crates/orchestrator/src/status_update/mod.rs b/crates/orchestrator/src/status_update/mod.rs index 09db18e1..a3109b3c 100644 --- a/crates/orchestrator/src/status_update/mod.rs +++ b/crates/orchestrator/src/status_update/mod.rs @@ -3,6 +3,7 @@ use crate::plugins::StatusUpdatePlugin; use crate::store::core::StoreContext; use crate::utils::loop_heartbeats::LoopHeartbeats; use anyhow::Ok; +use futures::{stream, StreamExt}; use log::{debug, error, info}; use shared::web3::contracts::core::builder::Contracts; use std::result::Result; @@ -115,8 +116,8 @@ impl NodeStatusUpdater { pub async fn sync_chain_with_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes(); - for node in nodes { - if node.status == NodeStatus::Dead { + stream::iter(nodes) + .for_each_concurrent(10, |node| async move { let node_in_pool = self.is_node_in_pool(&node).await; debug!("Node {:?} is in pool: {}", node.address, node_in_pool); if node_in_pool { @@ -131,134 +132,141 @@ impl NodeStatusUpdater { ); } } - } - } + }) + .await; + Ok(()) } pub async fn process_nodes(&self) -> Result<(), anyhow::Error> { let nodes = self.store_context.node_store.get_nodes(); - for node in nodes { - let node = node.clone(); - let old_status = node.status.clone(); - let heartbeat = self - .store_context - .heartbeat_store - .get_heartbeat(&node.address); - let unhealthy_counter: u32 = self - .store_context - .heartbeat_store - .get_unhealthy_counter(&node.address); - - let is_node_in_pool = self.is_node_in_pool(&node).await; - let mut status_changed = false; - let mut new_status = node.status.clone(); - - match heartbeat { - Some(beat) => { - // Update version if necessary - if let Some(version) = &beat.version { - if node.version.as_ref() != Some(version) { - let _: () = self - .store_context - .node_store - .update_node_version(&node.address, version); - } + stream::iter(nodes) + .for_each_concurrent(10, |node| async move { + if let Err(e) = self._process_node(&node).await { + error!("Error processing node {:?}: {}", node.address, e); + } + }) + .await; + Ok(()) + } + + async fn _process_node(&self, node: &OrchestratorNode) -> Result<(), anyhow::Error> { + let old_status = node.status.clone(); + let heartbeat = self + .store_context + .heartbeat_store + .get_heartbeat(&node.address); + let unhealthy_counter: u32 = self + .store_context + .heartbeat_store + .get_unhealthy_counter(&node.address); + + let is_node_in_pool = self.is_node_in_pool(node).await; + let mut status_changed = false; + let mut new_status = node.status.clone(); + + match heartbeat { + Some(beat) => { + // Update version if necessary + if let Some(version) = &beat.version { + if node.version.as_ref() != Some(version) { + let _: () = self + .store_context + .node_store + .update_node_version(&node.address, version); } + } - // Check if the node is in the pool (needed for status transitions) + // Check if the node is in the pool (needed for status transitions) - // If node is Unhealthy or WaitingForHeartbeat: - if node.status == NodeStatus::Unhealthy - || node.status == NodeStatus::WaitingForHeartbeat - { - if is_node_in_pool { - new_status = NodeStatus::Healthy; - } else { - // Reset to discovered to init re-invite to pool - new_status = NodeStatus::Discovered; - } + // If node is Unhealthy or WaitingForHeartbeat: + if node.status == NodeStatus::Unhealthy + || node.status == NodeStatus::WaitingForHeartbeat + { + if is_node_in_pool { + new_status = NodeStatus::Healthy; + } else { + // Reset to discovered to init re-invite to pool + new_status = NodeStatus::Discovered; + } + status_changed = true; + } + // If node is Discovered or Dead: + else if node.status == NodeStatus::Discovered || node.status == NodeStatus::Dead { + if is_node_in_pool { + new_status = NodeStatus::Healthy; + } else { + new_status = NodeStatus::Discovered; + } + status_changed = true; + } + + // Clear unhealthy counter on heartbeat receipt + let _: () = self + .store_context + .heartbeat_store + .clear_unhealthy_counter(&node.address); + } + None => { + // We don't have a heartbeat, increment unhealthy counter + self.store_context + .heartbeat_store + .increment_unhealthy_counter(&node.address); + + match node.status { + NodeStatus::Healthy => { + new_status = NodeStatus::Unhealthy; status_changed = true; } - // If node is Discovered or Dead: - else if node.status == NodeStatus::Discovered - || node.status == NodeStatus::Dead - { - if is_node_in_pool { - new_status = NodeStatus::Healthy; - } else { - new_status = NodeStatus::Discovered; + NodeStatus::Unhealthy => { + if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { + new_status = NodeStatus::Dead; + status_changed = true; } - status_changed = true; } - - // Clear unhealthy counter on heartbeat receipt - let _: () = self - .store_context - .heartbeat_store - .clear_unhealthy_counter(&node.address); - } - None => { - // We don't have a heartbeat, increment unhealthy counter - self.store_context - .heartbeat_store - .increment_unhealthy_counter(&node.address); - - match node.status { - NodeStatus::Healthy => { + NodeStatus::Discovered => { + if is_node_in_pool { + // We have caught a very interesting edge case here. + // The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator? + // Node invites fail now since the node cannot be in pool again. + // We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected. new_status = NodeStatus::Unhealthy; status_changed = true; - } - NodeStatus::Unhealthy => { - if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { + } else { + // if we've been trying to invite this node for a while, we eventually give up and mark it as dead + // The node will simply be in status discovered again when the discovery svc date > status change date. + if unhealthy_counter + 1 > 360 { new_status = NodeStatus::Dead; status_changed = true; } } - NodeStatus::Discovered => { - if is_node_in_pool { - // We have caught a very interesting edge case here. - // The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator? - // Node invites fail now since the node cannot be in pool again. - // We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected. - new_status = NodeStatus::Unhealthy; - status_changed = true; - } else { - // if we've been trying to invite this node for a while, we eventually give up and mark it as dead - // The node will simply be in status discovered again when the discovery svc date > status change date. - if unhealthy_counter + 1 > 360 { - new_status = NodeStatus::Dead; - status_changed = true; - } - } - } - NodeStatus::WaitingForHeartbeat => { - if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { - // Unhealthy counter is reset when node is invited - // usually it starts directly with heartbeat - new_status = NodeStatus::Unhealthy; - status_changed = true; - } + } + NodeStatus::WaitingForHeartbeat => { + if unhealthy_counter + 1 >= self.missing_heartbeat_threshold { + // Unhealthy counter is reset when node is invited + // usually it starts directly with heartbeat + new_status = NodeStatus::Unhealthy; + status_changed = true; } - _ => (), } + _ => (), } } + } - if status_changed { - let _: () = self - .store_context - .node_store - .update_node_status(&node.address, new_status); - - if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) { - for plugin in self.plugins.iter() { - if let Err(e) = plugin - .handle_status_change(&updated_node, &old_status) - .await - { - error!("Error handling status change: {}", e); - } + if status_changed { + let _: () = self + .store_context + .node_store + .update_node_status(&node.address, new_status); + + if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) { + for plugin in self.plugins.iter() { + if let Err(e) = plugin + .handle_status_change(&updated_node, &old_status) + .await + { + error!("Error handling status change: {}", e); } } }