Skip to content

Commit b89c1ba

Browse files
committed
Feature: Add basic webhook support for orchestrator status changes (#276)
* add basic webhook support to notify when a node changes status on orchestrator * update helm chart
1 parent df539c6 commit b89c1ba

File tree

5 files changed

+121
-28
lines changed

5 files changed

+121
-28
lines changed

deployment/k8s/orchestrator-chart/templates/orchestrator-deployment.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ spec:
7373
value: "{{ .Values.env.LOG_LEVEL }}"
7474
- name: HOURLY_S3_UPLOAD_LIMIT
7575
value: "{{ .Values.env.HOURLY_S3_UPLOAD_LIMIT }}"
76+
{{- if .Values.env.WEBHOOK_URLS }}
77+
- name: WEBHOOK_URLS
78+
value: "{{ .Values.env.WEBHOOK_URLS }}"
79+
{{- end }}

orchestrator/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ ENV S3_CREDENTIALS=""
2121
ENV BUCKET_NAME=""
2222
ENV LOG_LEVEL=""
2323
ENV HOURLY_S3_UPLOAD_LIMIT="2"
24+
ENV WEBHOOK_URLS=""
2425

2526
RUN echo '#!/bin/sh\n\
2627
exec /usr/local/bin/orchestrator \
@@ -40,6 +41,7 @@ $([ ! -z "$S3_CREDENTIALS" ] && echo "--s3-credentials $S3_CREDENTIALS") \
4041
$([ ! -z "$BUCKET_NAME" ] && echo "--bucket-name $BUCKET_NAME") \
4142
$([ ! -z "$LOG_LEVEL" ] && echo "--log-level $LOG_LEVEL") \
4243
$([ ! -z "$HOURLY_S3_UPLOAD_LIMIT" ] && echo "--hourly-s3-upload-limit $HOURLY_S3_UPLOAD_LIMIT") \
44+
$([ ! -z "$WEBHOOK_URLS" ] && echo "--webhook-urls $WEBHOOK_URLS") \
4345
"$@"' > /entrypoint.sh && \
4446
chmod +x /entrypoint.sh
4547

orchestrator/src/main.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ struct Args {
8686
/// Log level
8787
#[arg(short = 'l', long, default_value = "info")]
8888
log_level: String,
89+
90+
/// Webhook urls (comma-separated string)
91+
#[arg(long, default_value = "")]
92+
webhook_urls: Option<String>,
8993
}
9094

9195
#[tokio::main]
@@ -174,6 +178,13 @@ async fn main() -> Result<()> {
174178
let status_update_store_context = store_context.clone();
175179
let status_update_heartbeats = heartbeats.clone();
176180
let status_update_contracts = contracts.clone();
181+
let webhook_urls = args
182+
.webhook_urls
183+
.clone()
184+
.unwrap_or_default()
185+
.split(',')
186+
.map(|s| s.to_string())
187+
.collect();
177188
tasks.spawn(async move {
178189
let status_updater = NodeStatusUpdater::new(
179190
status_update_store_context.clone(),
@@ -183,6 +194,7 @@ async fn main() -> Result<()> {
183194
compute_pool_id,
184195
args.disable_ejection,
185196
status_update_heartbeats.clone(),
197+
webhook_urls,
186198
);
187199
status_updater.run().await
188200
});

orchestrator/src/models/node.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
33
use serde::{Deserialize, Serialize};
44
use shared::models::node::DiscoveryNode;
55
use shared::models::task::TaskState;
6-
use std::fmt;
6+
use std::fmt::{self, Display};
77

88
#[derive(Debug, Clone, Serialize, Deserialize)]
99
pub struct OrchestratorNode {
@@ -57,7 +57,6 @@ impl fmt::Display for OrchestratorNode {
5757
write!(f, "{}", serde_json::to_string(self).unwrap())
5858
}
5959
}
60-
6160
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6261
pub enum NodeStatus {
6362
Discovered,
@@ -68,3 +67,9 @@ pub enum NodeStatus {
6867
Ejected,
6968
Banned,
7069
}
70+
71+
impl Display for NodeStatus {
72+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73+
write!(f, "{:?}", self)
74+
}
75+
}

orchestrator/src/node/status_update.rs

Lines changed: 96 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use crate::store::core::StoreContext;
33
use crate::utils::loop_heartbeats::LoopHeartbeats;
44
use anyhow::Ok;
55
use log::{debug, error, info};
6+
use reqwest::Client;
7+
use serde_json::json;
68
use shared::web3::contracts::core::builder::Contracts;
79
use std::result::Result;
810
use std::sync::Arc;
@@ -17,9 +19,12 @@ pub struct NodeStatusUpdater {
1719
pool_id: u32,
1820
disable_ejection: bool,
1921
heartbeats: Arc<LoopHeartbeats>,
22+
webhooks: Vec<String>,
23+
http_client: Client,
2024
}
2125

2226
impl NodeStatusUpdater {
27+
#[allow(clippy::too_many_arguments)]
2328
pub fn new(
2429
store_context: Arc<StoreContext>,
2530
update_interval: u64,
@@ -28,6 +33,7 @@ impl NodeStatusUpdater {
2833
pool_id: u32,
2934
disable_ejection: bool,
3035
heartbeats: Arc<LoopHeartbeats>,
36+
webhooks: Vec<String>,
3137
) -> Self {
3238
Self {
3339
store_context,
@@ -37,6 +43,8 @@ impl NodeStatusUpdater {
3743
pool_id,
3844
disable_ejection,
3945
heartbeats,
46+
webhooks,
47+
http_client: Client::new(),
4048
}
4149
}
4250

@@ -108,6 +116,55 @@ impl NodeStatusUpdater {
108116
Ok(())
109117
}
110118

119+
async fn trigger_webhooks(
120+
&self,
121+
node: &OrchestratorNode,
122+
old_status: NodeStatus,
123+
) -> Result<(), anyhow::Error> {
124+
if old_status == node.status
125+
|| node.status == NodeStatus::Unhealthy
126+
|| node.status == NodeStatus::Discovered
127+
{
128+
return Ok(());
129+
}
130+
131+
// If no webhooks configured, return early
132+
if self.webhooks.is_empty() {
133+
return Ok(());
134+
}
135+
136+
let payload = json!({
137+
"node_address": node.address.to_string(),
138+
"ip_address": node.ip_address,
139+
"port": node.port,
140+
"old_status": old_status.to_string(),
141+
"new_status": node.status.to_string(),
142+
"timestamp": chrono::Utc::now().to_rfc3339(),
143+
});
144+
145+
let webhooks = self.webhooks.clone();
146+
let client = self.http_client.clone();
147+
tokio::spawn(async move {
148+
for webhook_url in webhooks {
149+
if let Err(e) = client
150+
.post(&webhook_url)
151+
.json(&payload)
152+
.timeout(Duration::from_secs(5)) // Add timeout to prevent hanging
153+
.send()
154+
.await
155+
{
156+
error!("Failed to send webhook to {}: {}", webhook_url, e);
157+
} else {
158+
debug!("Webhook to {} triggered successfully", webhook_url);
159+
}
160+
}
161+
});
162+
163+
tokio::time::sleep(Duration::from_millis(50)).await;
164+
165+
Ok(())
166+
}
167+
111168
pub async fn sync_chain_with_nodes(&self) -> Result<(), anyhow::Error> {
112169
let nodes = self.store_context.node_store.get_nodes();
113170
for node in nodes {
@@ -134,7 +191,8 @@ impl NodeStatusUpdater {
134191
pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {
135192
let nodes = self.store_context.node_store.get_nodes();
136193
for node in nodes {
137-
let mut node = node.clone();
194+
let node = node.clone();
195+
let old_status = node.status.clone();
138196
let heartbeat = self
139197
.store_context
140198
.heartbeat_store
@@ -145,6 +203,9 @@ impl NodeStatusUpdater {
145203
.get_unhealthy_counter(&node.address);
146204

147205
let is_node_in_pool = self.is_node_in_pool(&node).await;
206+
let mut status_changed = false;
207+
let mut new_status = node.status.clone();
208+
148209
match heartbeat {
149210
Some(beat) => {
150211
// Update version if necessary
@@ -164,29 +225,23 @@ impl NodeStatusUpdater {
164225
|| node.status == NodeStatus::WaitingForHeartbeat
165226
{
166227
if is_node_in_pool {
167-
node.status = NodeStatus::Healthy;
228+
new_status = NodeStatus::Healthy;
168229
} else {
169230
// Reset to discovered to init re-invite to pool
170-
node.status = NodeStatus::Discovered;
231+
new_status = NodeStatus::Discovered;
171232
}
172-
let _: () = self
173-
.store_context
174-
.node_store
175-
.update_node_status(&node.address, node.status);
233+
status_changed = true;
176234
}
177235
// If node is Discovered or Dead:
178236
else if node.status == NodeStatus::Discovered
179237
|| node.status == NodeStatus::Dead
180238
{
181239
if is_node_in_pool {
182-
node.status = NodeStatus::Healthy;
240+
new_status = NodeStatus::Healthy;
183241
} else {
184-
node.status = NodeStatus::Discovered;
242+
new_status = NodeStatus::Discovered;
185243
}
186-
let _: () = self
187-
.store_context
188-
.node_store
189-
.update_node_status(&node.address, node.status);
244+
status_changed = true;
190245
}
191246

192247
// Clear unhealthy counter on heartbeat receipt
@@ -203,15 +258,13 @@ impl NodeStatusUpdater {
203258

204259
match node.status {
205260
NodeStatus::Healthy => {
206-
self.store_context
207-
.node_store
208-
.update_node_status(&node.address, NodeStatus::Unhealthy);
261+
new_status = NodeStatus::Unhealthy;
262+
status_changed = true;
209263
}
210264
NodeStatus::Unhealthy => {
211265
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
212-
self.store_context
213-
.node_store
214-
.update_node_status(&node.address, NodeStatus::Dead);
266+
new_status = NodeStatus::Dead;
267+
status_changed = true;
215268
}
216269
}
217270
NodeStatus::Discovered => {
@@ -220,24 +273,33 @@ impl NodeStatusUpdater {
220273
// The node is in pool but does not send heartbeats - maybe due to a downtime of the orchestrator?
221274
// Node invites fail now since the node cannot be in pool again.
222275
// We have to eject and re-invite - we can simply do this by setting the status to unhealthy. The node will eventually be ejected.
223-
self.store_context
224-
.node_store
225-
.update_node_status(&node.address, NodeStatus::Unhealthy);
276+
new_status = NodeStatus::Unhealthy;
277+
status_changed = true;
226278
}
227279
}
228280
NodeStatus::WaitingForHeartbeat => {
229281
if unhealthy_counter + 1 >= self.missing_heartbeat_threshold {
230282
// Unhealthy counter is reset when node is invited
231283
// usually it starts directly with heartbeat
232-
self.store_context
233-
.node_store
234-
.update_node_status(&node.address, NodeStatus::Unhealthy);
284+
new_status = NodeStatus::Unhealthy;
285+
status_changed = true;
235286
}
236287
}
237288
_ => (),
238289
}
239290
}
240291
}
292+
293+
if status_changed {
294+
let _: () = self
295+
.store_context
296+
.node_store
297+
.update_node_status(&node.address, new_status);
298+
299+
if let Some(updated_node) = self.store_context.node_store.get_node(&node.address) {
300+
let _ = self.trigger_webhooks(&updated_node, old_status).await;
301+
}
302+
}
241303
}
242304
Ok(())
243305
}
@@ -269,6 +331,7 @@ mod tests {
269331
0,
270332
false,
271333
Arc::new(LoopHeartbeats::new()),
334+
vec![],
272335
);
273336
let node = OrchestratorNode {
274337
address: Address::from_str("0x0000000000000000000000000000000000000000").unwrap(),
@@ -342,6 +405,7 @@ mod tests {
342405
0,
343406
false,
344407
Arc::new(LoopHeartbeats::new()),
408+
vec![],
345409
);
346410
tokio::spawn(async move {
347411
updater
@@ -386,6 +450,7 @@ mod tests {
386450
0,
387451
false,
388452
Arc::new(LoopHeartbeats::new()),
453+
vec![],
389454
);
390455
tokio::spawn(async move {
391456
updater
@@ -440,6 +505,7 @@ mod tests {
440505
0,
441506
false,
442507
Arc::new(LoopHeartbeats::new()),
508+
vec![],
443509
);
444510
tokio::spawn(async move {
445511
updater
@@ -498,6 +564,7 @@ mod tests {
498564
0,
499565
false,
500566
Arc::new(LoopHeartbeats::new()),
567+
vec![],
501568
);
502569
tokio::spawn(async move {
503570
updater
@@ -564,6 +631,7 @@ mod tests {
564631
0,
565632
false,
566633
Arc::new(LoopHeartbeats::new()),
634+
vec![],
567635
);
568636
tokio::spawn(async move {
569637
updater
@@ -629,6 +697,7 @@ mod tests {
629697
0,
630698
false,
631699
Arc::new(LoopHeartbeats::new()),
700+
vec![],
632701
);
633702
tokio::spawn(async move {
634703
updater
@@ -681,7 +750,6 @@ mod tests {
681750
version: None,
682751
last_status_change: None,
683752
};
684-
println!("Node: {:?}", node);
685753

686754
let _: () = app_state.store_context.node_store.add_node(node.clone());
687755
let updater = NodeStatusUpdater::new(
@@ -692,6 +760,7 @@ mod tests {
692760
0,
693761
false,
694762
Arc::new(LoopHeartbeats::new()),
763+
vec![],
695764
);
696765
tokio::spawn(async move {
697766
updater
@@ -748,6 +817,7 @@ mod tests {
748817
0,
749818
false,
750819
Arc::new(LoopHeartbeats::new()),
820+
vec![],
751821
);
752822
tokio::spawn(async move {
753823
updater

0 commit comments

Comments
 (0)