Skip to content

bugfixes #1395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Duration;

use chrono::{DateTime, Utc};
use tonic::async_trait;
use tracing::trace;
use ulid::Ulid;

use crate::{
Expand Down Expand Up @@ -204,8 +205,48 @@ impl AlertTrait for ThresholdAlert {
Box::new(self.clone())
}

fn set_state(&mut self, new_state: AlertState) {
self.state = new_state
async fn update_state(
&mut self,
is_manual: bool,
new_state: AlertState,
trigger_notif: Option<String>,
) -> Result<(), AlertError> {
let store = PARSEABLE.storage.get_object_store();
match self.state {
AlertState::Triggered => {
if is_manual
&& new_state != AlertState::Resolved
&& new_state != AlertState::Silenced
{
let msg = format!("Not allowed to manually go from Triggered to {new_state}");
return Err(AlertError::InvalidStateChange(msg));
}
}
AlertState::Silenced => {
if is_manual && new_state != AlertState::Resolved {
let msg = format!("Not allowed to manually go from Silenced to {new_state}");
return Err(AlertError::InvalidStateChange(msg));
}
}
AlertState::Resolved => {
if is_manual {
let msg = format!("Not allowed to go manually from Resolved to {new_state}");
return Err(AlertError::InvalidStateChange(msg));
}
}
}
// update state in memory
self.state = new_state;
// update on disk
store.put_alert(self.id, &self.to_alert_config()).await?;

if trigger_notif.is_some() {
trace!("trigger notif on-\n{}", self.state);
self.to_alert_config()
.trigger_notifications(trigger_notif.unwrap())
.await?;
}
Ok(())
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ async fn execute_remote_query(query: &str, time_range: &TimeRange) -> Result<f64
fn convert_result_to_f64(result_value: serde_json::Value) -> Result<f64, AlertError> {
// due to the previous validations, we can be sure that we get an array of objects with just one entry
// [{"countField": Number(1120.251)}]
if let Some(array_val) = result_value.as_array().filter(|arr| !arr.is_empty())
if let Some(array_val) = result_value.as_array()
&& !array_val.is_empty()
&& let Some(object) = array_val[0].as_object()
{
let values = object.values().map(|v| v.as_f64().unwrap()).collect_vec();
Expand Down
98 changes: 26 additions & 72 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,54 +488,6 @@ impl Display for AlertState {
}
}

impl AlertState {
pub async fn update_state(
&self,
new_state: AlertState,
alert_id: Ulid,
) -> Result<(), AlertError> {
match self {
AlertState::Triggered => {
if new_state == AlertState::Triggered {
let msg = format!("Not allowed to manually go from Triggered to {new_state}");
return Err(AlertError::InvalidStateChange(msg));
} else {
// update state on disk and in memory
let guard = ALERTS.read().await;
let alerts = guard.as_ref().ok_or_else(|| {
AlertError::CustomError("Alert manager not initialized".into())
})?;
alerts
.update_state(alert_id, new_state, Some("".into()))
.await?;
}
}
AlertState::Silenced => {
// from here, the user can only go to Resolved
if new_state == AlertState::Resolved {
// update state on disk and in memory
let guard = ALERTS.read().await;
let alerts = guard.as_ref().ok_or_else(|| {
AlertError::CustomError("Alert manager not initialized".into())
})?;
alerts
.update_state(alert_id, new_state, Some("".into()))
.await?;
} else {
let msg = format!("Not allowed to manually go from Silenced to {new_state}");
return Err(AlertError::InvalidStateChange(msg));
}
}
AlertState::Resolved => {
// user shouldn't logically be changing states if current state is Resolved
let msg = format!("Not allowed to go manually from Resolved to {new_state}");
return Err(AlertError::InvalidStateChange(msg));
}
}
Ok(())
}
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)]
#[serde(rename_all = "camelCase")]
pub enum Severity {
Expand Down Expand Up @@ -1530,11 +1482,11 @@ impl AlertManagerTrait for Alerts {
Ok(alerts)
}

/// Returns a sigle alert that the user has access to (based on query auth)
async fn get_alert_by_id(&self, id: Ulid) -> Result<AlertConfig, AlertError> {
/// Returns a single alert that the user has access to (based on query auth)
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError> {
let read_access = self.alerts.read().await;
if let Some(alert) = read_access.get(&id) {
Ok(alert.to_alert_config())
Ok(alert.clone_box())
} else {
Err(AlertError::CustomError(format!(
"No alert found for the given ID- {id}"
Expand All @@ -1557,31 +1509,33 @@ impl AlertManagerTrait for Alerts {
new_state: AlertState,
trigger_notif: Option<String>,
) -> Result<(), AlertError> {
let store = PARSEABLE.storage.get_object_store();
// let store = PARSEABLE.storage.get_object_store();

// read and modify alert
let mut alert = self.get_alert_by_id(alert_id).await?;
trace!("get alert state by id-\n{}", alert.state);

alert.state = new_state;

trace!("new state-\n{}", alert.state);

// save to disk
store.put_alert(alert_id, &alert).await?;

// modify in memory
let mut writer = self.alerts.write().await;
if let Some(alert) = writer.get_mut(&alert_id) {
trace!("in memory alert-\n{}", alert.get_state());
alert.set_state(new_state);
trace!("in memory updated alert-\n{}", alert.get_state());
let mut write_access = self.alerts.write().await;
let mut alert: Box<dyn AlertTrait> = if let Some(alert) = write_access.get(&alert_id) {
match &alert.get_alert_type() {
AlertType::Threshold => {
Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box<dyn AlertTrait>
}
AlertType::Anomaly => {
return Err(AlertError::NotPresentInOSS("anomaly".into()));
}
AlertType::Forecast => {
return Err(AlertError::NotPresentInOSS("forecast".into()));
}
}
} else {
return Err(AlertError::CustomError(format!(
"No alert found for the given ID- {alert_id}"
)));
};
drop(writer);

if trigger_notif.is_some() {
trace!("trigger notif on-\n{}", alert.state);
alert.trigger_notifications(trigger_notif.unwrap()).await?;
let current_state = alert.get_state();

if current_state.ne(&new_state) {
alert.update_state(false, new_state, trigger_notif).await?;
write_access.insert(*alert.get_id(), alert.clone_box());
}

Ok(())
Expand Down
9 changes: 7 additions & 2 deletions src/alerts/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ use ulid::Ulid;
pub trait AlertTrait: Debug + Send + Sync {
async fn eval_alert(&self) -> Result<(bool, f64), AlertError>;
async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>;
async fn update_state(
&mut self,
is_manual: bool,
new_state: AlertState,
trigger_notif: Option<String>,
) -> Result<(), AlertError>;
fn get_id(&self) -> &Ulid;
fn get_severity(&self) -> &Severity;
fn get_title(&self) -> &str;
Expand All @@ -46,7 +52,6 @@ pub trait AlertTrait: Debug + Send + Sync {
fn get_datasets(&self) -> &Vec<String>;
fn to_alert_config(&self) -> AlertConfig;
fn clone_box(&self) -> Box<dyn AlertTrait>;
fn set_state(&mut self, new_state: AlertState);
}

#[async_trait]
Expand All @@ -57,7 +62,7 @@ pub trait AlertManagerTrait: Send + Sync {
session: SessionKey,
tags: Vec<String>,
) -> Result<Vec<AlertConfig>, AlertError>;
async fn get_alert_by_id(&self, id: Ulid) -> Result<AlertConfig, AlertError>;
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn update(&self, alert: &dyn AlertTrait);
async fn update_state(
&self,
Expand Down
20 changes: 9 additions & 11 deletions src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ pub async fn get(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Responde

let alert = alerts.get_alert_by_id(alert_id).await?;
// validate that the user has access to the tables mentioned in the query
user_auth_for_query(&session_key, &alert.query).await?;
user_auth_for_query(&session_key, alert.get_query()).await?;

Ok(web::Json(alert))
Ok(web::Json(alert.to_alert_config()))
}

// DELETE /alerts/{alert_id}
Expand All @@ -153,7 +153,7 @@ pub async fn delete(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Respo
let alert = alerts.get_alert_by_id(alert_id).await?;

// validate that the user has access to the tables mentioned in the query
user_auth_for_query(&session_key, &alert.query).await?;
user_auth_for_query(&session_key, alert.get_query()).await?;

let store = PARSEABLE.storage.get_object_store();
let alert_path = alert_json_path(alert_id);
Expand Down Expand Up @@ -191,9 +191,9 @@ pub async fn update_state(
};

// check if alert id exists in map
let alert = alerts.get_alert_by_id(alert_id).await?;
let mut alert = alerts.get_alert_by_id(alert_id).await?;
// validate that the user has access to the tables mentioned in the query
user_auth_for_query(&session_key, &alert.query).await?;
user_auth_for_query(&session_key, alert.get_query()).await?;

let query_string = req.query_string();

Expand All @@ -212,14 +212,12 @@ pub async fn update_state(
));
}

// get current state
let current_state = alerts.get_state(alert_id).await?;

let new_state = AlertState::from_str(state_value)?;
alert.update_state(true, new_state, Some("".into())).await?;

current_state.update_state(new_state, alert_id).await?;
let alert = alerts.get_alert_by_id(alert_id).await?;
Ok(web::Json(alert))
alerts.update(&*alert).await;

Ok(web::Json(alert.to_alert_config()))
}

pub async fn list_tags() -> Result<impl Responder, AlertError> {
Expand Down
Loading