diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 74be90393..81fbcbfb9 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -20,6 +20,7 @@ use std::time::Duration; use chrono::{DateTime, Utc}; use tonic::async_trait; +use tracing::trace; use ulid::Ulid; use crate::{ @@ -204,8 +205,48 @@ impl AlertTrait for ThresholdAlert { Box::new(self.clone()) } - fn set_state(&mut self, new_state: AlertState) { - self.state = new_state + async fn update_state( + &mut self, + is_manual: bool, + new_state: AlertState, + trigger_notif: Option, + ) -> Result<(), AlertError> { + let store = PARSEABLE.storage.get_object_store(); + match self.state { + AlertState::Triggered => { + if is_manual + && new_state != AlertState::Resolved + && new_state != AlertState::Silenced + { + let msg = format!("Not allowed to manually go from Triggered to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + AlertState::Silenced => { + if is_manual && new_state != AlertState::Resolved { + let msg = format!("Not allowed to manually go from Silenced to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + AlertState::Resolved => { + if is_manual { + let msg = format!("Not allowed to go manually from Resolved to {new_state}"); + return Err(AlertError::InvalidStateChange(msg)); + } + } + } + // update state in memory + self.state = new_state; + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + + if trigger_notif.is_some() { + trace!("trigger notif on-\n{}", self.state); + self.to_alert_config() + .trigger_notifications(trigger_notif.unwrap()) + .await?; + } + Ok(()) } } diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index f6eb0b1ce..be9676d37 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -137,7 +137,8 @@ async fn execute_remote_query(query: &str, time_range: &TimeRange) -> Result Result { // due to the previous validations, we can be sure that we get an array of objects with just one entry // [{"countField": Number(1120.251)}] - if let Some(array_val) = result_value.as_array().filter(|arr| !arr.is_empty()) + if let Some(array_val) = result_value.as_array() + && !array_val.is_empty() && let Some(object) = array_val[0].as_object() { let values = object.values().map(|v| v.as_f64().unwrap()).collect_vec(); diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index cb8e8c6a2..90aeb16f4 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -488,54 +488,6 @@ impl Display for AlertState { } } -impl AlertState { - pub async fn update_state( - &self, - new_state: AlertState, - alert_id: Ulid, - ) -> Result<(), AlertError> { - match self { - AlertState::Triggered => { - if new_state == AlertState::Triggered { - let msg = format!("Not allowed to manually go from Triggered to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } else { - // update state on disk and in memory - let guard = ALERTS.read().await; - let alerts = guard.as_ref().ok_or_else(|| { - AlertError::CustomError("Alert manager not initialized".into()) - })?; - alerts - .update_state(alert_id, new_state, Some("".into())) - .await?; - } - } - AlertState::Silenced => { - // from here, the user can only go to Resolved - if new_state == AlertState::Resolved { - // update state on disk and in memory - let guard = ALERTS.read().await; - let alerts = guard.as_ref().ok_or_else(|| { - AlertError::CustomError("Alert manager not initialized".into()) - })?; - alerts - .update_state(alert_id, new_state, Some("".into())) - .await?; - } else { - let msg = format!("Not allowed to manually go from Silenced to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - AlertState::Resolved => { - // user shouldn't logically be changing states if current state is Resolved - let msg = format!("Not allowed to go manually from Resolved to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - Ok(()) - } -} - #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)] #[serde(rename_all = "camelCase")] pub enum Severity { @@ -1530,11 +1482,11 @@ impl AlertManagerTrait for Alerts { Ok(alerts) } - /// Returns a sigle alert that the user has access to (based on query auth) - async fn get_alert_by_id(&self, id: Ulid) -> Result { + /// Returns a single alert that the user has access to (based on query auth) + async fn get_alert_by_id(&self, id: Ulid) -> Result, AlertError> { let read_access = self.alerts.read().await; if let Some(alert) = read_access.get(&id) { - Ok(alert.to_alert_config()) + Ok(alert.clone_box()) } else { Err(AlertError::CustomError(format!( "No alert found for the given ID- {id}" @@ -1557,31 +1509,33 @@ impl AlertManagerTrait for Alerts { new_state: AlertState, trigger_notif: Option, ) -> Result<(), AlertError> { - let store = PARSEABLE.storage.get_object_store(); + // let store = PARSEABLE.storage.get_object_store(); // read and modify alert - let mut alert = self.get_alert_by_id(alert_id).await?; - trace!("get alert state by id-\n{}", alert.state); - - alert.state = new_state; - - trace!("new state-\n{}", alert.state); - - // save to disk - store.put_alert(alert_id, &alert).await?; - - // modify in memory - let mut writer = self.alerts.write().await; - if let Some(alert) = writer.get_mut(&alert_id) { - trace!("in memory alert-\n{}", alert.get_state()); - alert.set_state(new_state); - trace!("in memory updated alert-\n{}", alert.get_state()); + let mut write_access = self.alerts.write().await; + let mut alert: Box = if let Some(alert) = write_access.get(&alert_id) { + match &alert.get_alert_type() { + AlertType::Threshold => { + Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box + } + AlertType::Anomaly => { + return Err(AlertError::NotPresentInOSS("anomaly".into())); + } + AlertType::Forecast => { + return Err(AlertError::NotPresentInOSS("forecast".into())); + } + } + } else { + return Err(AlertError::CustomError(format!( + "No alert found for the given ID- {alert_id}" + ))); }; - drop(writer); - if trigger_notif.is_some() { - trace!("trigger notif on-\n{}", alert.state); - alert.trigger_notifications(trigger_notif.unwrap()).await?; + let current_state = alert.get_state(); + + if current_state.ne(&new_state) { + alert.update_state(false, new_state, trigger_notif).await?; + write_access.insert(*alert.get_id(), alert.clone_box()); } Ok(()) diff --git a/src/alerts/traits.rs b/src/alerts/traits.rs index 81dcd2555..303f6d42f 100644 --- a/src/alerts/traits.rs +++ b/src/alerts/traits.rs @@ -30,6 +30,12 @@ use ulid::Ulid; pub trait AlertTrait: Debug + Send + Sync { async fn eval_alert(&self) -> Result<(bool, f64), AlertError>; async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>; + async fn update_state( + &mut self, + is_manual: bool, + new_state: AlertState, + trigger_notif: Option, + ) -> Result<(), AlertError>; fn get_id(&self) -> &Ulid; fn get_severity(&self) -> &Severity; fn get_title(&self) -> &str; @@ -46,7 +52,6 @@ pub trait AlertTrait: Debug + Send + Sync { fn get_datasets(&self) -> &Vec; fn to_alert_config(&self) -> AlertConfig; fn clone_box(&self) -> Box; - fn set_state(&mut self, new_state: AlertState); } #[async_trait] @@ -57,7 +62,7 @@ pub trait AlertManagerTrait: Send + Sync { session: SessionKey, tags: Vec, ) -> Result, AlertError>; - async fn get_alert_by_id(&self, id: Ulid) -> Result; + async fn get_alert_by_id(&self, id: Ulid) -> Result, AlertError>; async fn update(&self, alert: &dyn AlertTrait); async fn update_state( &self, diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index bcbeeb3af..4847cff3a 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -132,9 +132,9 @@ pub async fn get(req: HttpRequest, alert_id: Path) -> Result) -> Result Result {