Skip to content

Improvement: more orchestrator asynchronism #295

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions crates/orchestrator/src/discovery/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::utils::loop_heartbeats::LoopHeartbeats;
use alloy::primitives::Address;
use anyhow::Error;
use anyhow::Result;
use futures::stream;
use futures::StreamExt;
use log::{error, info};
use serde_json;
use shared::models::api::ApiResponse;
Expand Down Expand Up @@ -209,11 +211,13 @@ impl<'b> DiscoveryMonitor<'b> {
async fn get_nodes(&self) -> Result<Vec<OrchestratorNode>, Error> {
let discovery_nodes = self.fetch_nodes_from_discovery().await?;

for discovery_node in &discovery_nodes {
if let Err(e) = self.sync_single_node_with_discovery(discovery_node).await {
error!("Error syncing node with discovery: {}", e);
}
}
stream::iter(discovery_nodes.iter())
.for_each_concurrent(10, |node| async move {
if let Err(e) = self.sync_single_node_with_discovery(node).await {
error!("Error syncing node with discovery: {}", e);
}
})
.await;

Ok(discovery_nodes
.into_iter()
Expand Down
222 changes: 115 additions & 107 deletions crates/orchestrator/src/status_update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::plugins::StatusUpdatePlugin;
use crate::store::core::StoreContext;
use crate::utils::loop_heartbeats::LoopHeartbeats;
use anyhow::Ok;
use futures::{stream, StreamExt};
use log::{debug, error, info};
use shared::web3::contracts::core::builder::Contracts;
use std::result::Result;
Expand Down Expand Up @@ -115,8 +116,8 @@ impl NodeStatusUpdater {

pub async fn sync_chain_with_nodes(&self) -> Result<(), anyhow::Error> {
let nodes = self.store_context.node_store.get_nodes();
for node in nodes {
if node.status == NodeStatus::Dead {
stream::iter(nodes)
.for_each_concurrent(10, |node| async move {
let node_in_pool = self.is_node_in_pool(&node).await;
debug!("Node {:?} is in pool: {}", node.address, node_in_pool);
if node_in_pool {
Expand All @@ -131,134 +132,141 @@ impl NodeStatusUpdater {
);
}
}
}
}
})
.await;

Ok(())
}

pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {
let nodes = self.store_context.node_store.get_nodes();
for node in nodes {
let node = node.clone();
let old_status = node.status.clone();
let heartbeat = self
.store_context
.heartbeat_store
.get_heartbeat(&node.address);
let unhealthy_counter: u32 = self
.store_context
.heartbeat_store
.get_unhealthy_counter(&node.address);

let is_node_in_pool = self.is_node_in_pool(&node).await;
let mut status_changed = false;
let mut new_status = node.status.clone();

match heartbeat {
Some(beat) => {
// Update version if necessary
if let Some(version) = &beat.version {
if node.version.as_ref() != Some(version) {
let _: () = self
.store_context
.node_store
.update_node_version(&node.address, version);
}
stream::iter(nodes)
.for_each_concurrent(10, |node| async move {
if let Err(e) = self._process_node(&node).await {
error!("Error processing node {:?}: {}", node.address, e);
}
})
.await;
Ok(())
}

async fn _process_node(&self, node: &OrchestratorNode) -> Result<(), anyhow::Error> {
let old_status = node.status.clone();
let heartbeat = self
.store_context
.heartbeat_store
.get_heartbeat(&node.address);
let unhealthy_counter: u32 = self
.store_context
.heartbeat_store
.get_unhealthy_counter(&node.address);

let is_node_in_pool = self.is_node_in_pool(node).await;
let mut status_changed = false;
let mut new_status = node.status.clone();

match heartbeat {
Some(beat) => {
// Update version if necessary
if let Some(version) = &beat.version {
if node.version.as_ref() != Some(version) {
let _: () = self
.store_context
.node_store
.update_node_version(&node.address, version);
}
}

// Check if the node is in the pool (needed for status transitions)
// Check if the node is in the pool (needed for status transitions)

// If node is Unhealthy or WaitingForHeartbeat:
if node.status == NodeStatus::Unhealthy
|| node.status == NodeStatus::WaitingForHeartbeat
{
if is_node_in_pool {
new_status = NodeStatus::Healthy;
} else {
// Reset to discovered to init re-invite to pool
new_status = NodeStatus::Discovered;
}
// If node is Unhealthy or WaitingForHeartbeat:
if node.status == NodeStatus::Unhealthy
|| node.status == NodeStatus::WaitingForHeartbeat
{
if is_node_in_pool {
new_status = NodeStatus::Healthy;
} else {
// Reset to discovered to init re-invite to pool
new_status = NodeStatus::Discovered;
}
status_changed = true;
}
// If node is Discovered or Dead:
else if node.status == NodeStatus::Discovered || node.status == NodeStatus::Dead {
if is_node_in_pool {
new_status = NodeStatus::Healthy;
} else {
new_status = NodeStatus::Discovered;
}
status_changed = true;
}

// Clear unhealthy counter on heartbeat receipt
let _: () = self
.store_context
.heartbeat_store
.clear_unhealthy_counter(&node.address);
}
None => {
// We don't have a heartbeat, increment unhealthy counter
self.store_context
.heartbeat_store
.increment_unhealthy_counter(&node.address);

match node.status {
NodeStatus::Healthy => {
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
// If node is Discovered or Dead:
else if node.status == NodeStatus::Discovered
|| node.status == NodeStatus::Dead
{
if is_node_in_pool {
new_status = NodeStatus::Healthy;
} else {
new_status = NodeStatus::Discovered;
NodeStatus::Unhealthy => {
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
new_status = NodeStatus::Dead;
status_changed = true;
}
status_changed = true;
}

// Clear unhealthy counter on heartbeat receipt
let _: () = self
.store_context
.heartbeat_store
.clear_unhealthy_counter(&node.address);
}
None => {
// We don't have a heartbeat, increment unhealthy counter
self.store_context
.heartbeat_store
.increment_unhealthy_counter(&node.address);

match node.status {
NodeStatus::Healthy => {
NodeStatus::Discovered => {
if is_node_in_pool {
// We have caught a very interesting edge case here.
// The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator?
// Node invites fail now since the node cannot be in pool again.
// We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected.
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
NodeStatus::Unhealthy => {
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
} else {
// if we've been trying to invite this node for a while, we eventually give up and mark it as dead
// The node will simply be in status discovered again when the discovery svc date > status change date.
if unhealthy_counter + 1 > 360 {
new_status = NodeStatus::Dead;
status_changed = true;
}
}
NodeStatus::Discovered => {
if is_node_in_pool {
// We have caught a very interesting edge case here.
// The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator?
// Node invites fail now since the node cannot be in pool again.
// We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected.
new_status = NodeStatus::Unhealthy;
status_changed = true;
} else {
// if we've been trying to invite this node for a while, we eventually give up and mark it as dead
// The node will simply be in status discovered again when the discovery svc date > status change date.
if unhealthy_counter + 1 > 360 {
new_status = NodeStatus::Dead;
status_changed = true;
}
}
}
NodeStatus::WaitingForHeartbeat => {
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
// Unhealthy counter is reset when node is invited
// usually it starts directly with heartbeat
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
}
NodeStatus::WaitingForHeartbeat => {
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
// Unhealthy counter is reset when node is invited
// usually it starts directly with heartbeat
new_status = NodeStatus::Unhealthy;
status_changed = true;
}
_ => (),
}
_ => (),
}
}
}

if status_changed {
let _: () = self
.store_context
.node_store
.update_node_status(&node.address, new_status);

if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) {
for plugin in self.plugins.iter() {
if let Err(e) = plugin
.handle_status_change(&updated_node, &old_status)
.await
{
error!("Error handling status change: {}", e);
}
if status_changed {
let _: () = self
.store_context
.node_store
.update_node_status(&node.address, new_status);

if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) {
for plugin in self.plugins.iter() {
if let Err(e) = plugin
.handle_status_change(&updated_node, &old_status)
.await
{
error!("Error handling status change: {}", e);
}
}
}
Expand Down