Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/health_monitoring_lib/rust/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use crate::deadline::DeadlineEvaluationError;
use crate::heartbeat::HeartbeatEvaluationError;
use crate::log::ScoreDebug;
use crate::logic::LogicEvaluationError;
use crate::tag::MonitorTag;
use core::hash::Hash;
use core::time::Duration;
Expand Down Expand Up @@ -82,7 +83,7 @@ pub(crate) trait Monitor {
pub(crate) enum MonitorEvaluationError {
Deadline(DeadlineEvaluationError),
Heartbeat(HeartbeatEvaluationError),
Logic,
Logic(LogicEvaluationError),
}

impl From<DeadlineEvaluationError> for MonitorEvaluationError {
Expand All @@ -97,6 +98,12 @@ impl From<HeartbeatEvaluationError> for MonitorEvaluationError {
}
}

impl From<LogicEvaluationError> for MonitorEvaluationError {
fn from(value: LogicEvaluationError) -> Self {
MonitorEvaluationError::Logic(value)
}
}

/// Trait for evaluating monitors and reporting errors to be used by HealthMonitor.
pub(crate) trait MonitorEvaluator {
/// Run monitor evaluation.
Expand Down
128 changes: 125 additions & 3 deletions src/health_monitoring_lib/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ mod worker;

pub mod deadline;
pub mod heartbeat;
pub mod logic;

use crate::common::{Monitor, MonitorEvalHandle};
use crate::deadline::{DeadlineMonitor, DeadlineMonitorBuilder};
use crate::heartbeat::{HeartbeatMonitor, HeartbeatMonitorBuilder};
use crate::log::{error, ScoreDebug};
use crate::logic::{LogicMonitor, LogicMonitorBuilder};
pub use common::TimeRange;
use containers::fixed_capacity::FixedCapacityVec;
use core::time::Duration;
Expand All @@ -48,6 +50,7 @@ pub enum HealthMonitorError {
// Collects per-tag monitor builders and cycle settings until `build` is called.
pub struct HealthMonitorBuilder {
    /// Pending deadline monitor builders, keyed by their monitor tag.
    deadline_monitor_builders: HashMap<MonitorTag, DeadlineMonitorBuilder>,
    /// Pending heartbeat monitor builders, keyed by their monitor tag.
    heartbeat_monitor_builders: HashMap<MonitorTag, HeartbeatMonitorBuilder>,
    /// Pending logic monitor builders, keyed by their monitor tag.
    logic_monitor_builders: HashMap<MonitorTag, LogicMonitorBuilder>,
    /// Interval between supervisor API notifications (`new` sets 500 ms).
    supervisor_api_cycle: Duration,
    /// Cycle handed to the worker thread runner on `build` (`new` sets 100 ms).
    internal_processing_cycle: Duration,
}
Expand All @@ -58,6 +61,7 @@ impl HealthMonitorBuilder {
Self {
deadline_monitor_builders: HashMap::new(),
heartbeat_monitor_builders: HashMap::new(),
logic_monitor_builders: HashMap::new(),
supervisor_api_cycle: Duration::from_millis(500),
internal_processing_cycle: Duration::from_millis(100),
}
Expand Down Expand Up @@ -89,6 +93,19 @@ impl HealthMonitorBuilder {
self
}

/// Register a [`LogicMonitor`] builder under the given [`MonitorTag`].
///
/// - `monitor_tag` - unique tag identifying the [`LogicMonitor`].
/// - `monitor_builder` - builder that is finalized when the health monitor is built.
///
/// # Note
///
/// Registering a second logic monitor with an already used tag replaces the earlier one.
pub fn add_logic_monitor(
    mut self,
    monitor_tag: MonitorTag,
    monitor_builder: LogicMonitorBuilder,
) -> Self {
    // Delegate to the by-reference variant, then hand the builder back for chaining.
    self.add_logic_monitor_internal(monitor_tag, monitor_builder);
    self
}

/// Set the interval between supervisor API notifications.
/// This duration determines how often the health monitor notifies the supervisor about system liveness.
///
Expand Down Expand Up @@ -121,7 +138,9 @@ impl HealthMonitorBuilder {
}

// Check number of monitors.
let num_monitors = self.deadline_monitor_builders.len() + self.heartbeat_monitor_builders.len();
let num_monitors = self.deadline_monitor_builders.len()
+ self.heartbeat_monitor_builders.len()
+ self.logic_monitor_builders.len();
if num_monitors == 0 {
error!("No monitors have been added. HealthMonitor cannot be created.");
return Err(HealthMonitorError::WrongState);
Expand All @@ -144,9 +163,17 @@ impl HealthMonitorBuilder {
heartbeat_monitors.insert(tag, Some(MonitorState::Available(monitor)));
}

// Create logic monitors.
let mut logic_monitors = HashMap::new();
for (tag, builder) in self.logic_monitor_builders {
let monitor = builder.build(tag, &allocator)?;
logic_monitors.insert(tag, Some(MonitorState::Available(monitor)));
}

Ok(HealthMonitor {
deadline_monitors,
heartbeat_monitors,
logic_monitors,
worker: worker::UniqueThreadRunner::new(self.internal_processing_cycle),
supervisor_api_cycle: self.supervisor_api_cycle,
})
Expand All @@ -170,6 +197,10 @@ impl HealthMonitorBuilder {
self.heartbeat_monitor_builders.insert(monitor_tag, monitor_builder);
}

/// Store `monitor_builder` under `monitor_tag` without consuming the builder.
///
/// A builder already stored under the same tag is replaced and the old one is dropped.
pub(crate) fn add_logic_monitor_internal(
    &mut self,
    monitor_tag: MonitorTag,
    monitor_builder: LogicMonitorBuilder,
) {
    self.logic_monitor_builders
        .insert(monitor_tag, monitor_builder);
}

/// Overwrite the stored supervisor API cycle with `cycle_duration`.
///
/// No validation happens here; the value is only recorded on the builder.
pub(crate) fn with_supervisor_api_cycle_internal(&mut self, cycle_duration: Duration) {
    self.supervisor_api_cycle = cycle_duration;
}
Expand All @@ -196,6 +227,7 @@ type MonitorContainer<M> = Option<MonitorState<M>>;
// Holds the built monitors per kind together with the worker and cycle settings.
pub struct HealthMonitor {
    /// Deadline monitor slots, keyed by monitor tag.
    deadline_monitors: HashMap<MonitorTag, MonitorContainer<DeadlineMonitor>>,
    /// Heartbeat monitor slots, keyed by monitor tag.
    heartbeat_monitors: HashMap<MonitorTag, MonitorContainer<HeartbeatMonitor>>,
    /// Logic monitor slots, keyed by monitor tag.
    logic_monitors: HashMap<MonitorTag, MonitorContainer<LogicMonitor>>,
    /// Thread runner created from the builder's internal processing cycle.
    worker: worker::UniqueThreadRunner,
    /// Supervisor API cycle copied from the builder.
    supervisor_api_cycle: Duration,
}
Expand Down Expand Up @@ -241,6 +273,16 @@ impl HealthMonitor {
Self::get_monitor(&mut self.heartbeat_monitors, monitor_tag)
}

/// Hand out ownership of the [`LogicMonitor`] registered under the given [`MonitorTag`].
///
/// - `monitor_tag` - unique tag identifying the [`LogicMonitor`].
///
/// Returns [`Some`] with the [`LogicMonitor`] when it exists and has not been taken yet,
/// and [`None`] in every other case.
pub fn get_logic_monitor(&mut self, monitor_tag: MonitorTag) -> Option<LogicMonitor> {
    let logic_monitors = &mut self.logic_monitors;
    Self::get_monitor(logic_monitors, monitor_tag)
}

fn collect_given_monitors<M>(
monitors_to_collect: &mut HashMap<MonitorTag, MonitorContainer<M>>,
collected_monitors: &mut FixedCapacityVec<MonitorEvalHandle>,
Expand Down Expand Up @@ -287,10 +329,11 @@ impl HealthMonitor {
/// Health monitoring logic stops when the [`HealthMonitor`] is dropped.
pub fn start(&mut self) -> Result<(), HealthMonitorError> {
// Collect all monitors.
let num_monitors = self.deadline_monitors.len() + self.heartbeat_monitors.len();
let num_monitors = self.deadline_monitors.len() + self.heartbeat_monitors.len() + self.logic_monitors.len();
let mut collected_monitors = FixedCapacityVec::new(num_monitors);
Self::collect_given_monitors(&mut self.deadline_monitors, &mut collected_monitors)?;
Self::collect_given_monitors(&mut self.heartbeat_monitors, &mut collected_monitors)?;
Self::collect_given_monitors(&mut self.logic_monitors, &mut collected_monitors)?;

// Start monitoring logic.
let monitoring_logic = worker::MonitoringLogic::new(
Expand All @@ -315,7 +358,8 @@ mod tests {
use crate::common::TimeRange;
use crate::deadline::DeadlineMonitorBuilder;
use crate::heartbeat::HeartbeatMonitorBuilder;
use crate::tag::MonitorTag;
use crate::logic::LogicMonitorBuilder;
use crate::tag::{MonitorTag, StateTag};
use crate::{HealthMonitorBuilder, HealthMonitorError};
use core::time::Duration;

Expand All @@ -324,11 +368,20 @@ mod tests {
HeartbeatMonitorBuilder::new(range)
}

/// Default logic monitor builder for tests: starts in `state1` and allows
/// the transitions `state1 -> state2` and `state2 -> state1`.
fn def_logic_monitor_builder() -> LogicMonitorBuilder {
    let first = StateTag::from("state1");
    let second = StateTag::from("state2");

    LogicMonitorBuilder::new(first)
        .add_state(first, &[second])
        .add_state(second, &[first])
}

#[test]
fn health_monitor_builder_new_succeeds() {
let health_monitor_builder = HealthMonitorBuilder::new();
assert!(health_monitor_builder.deadline_monitor_builders.is_empty());
assert!(health_monitor_builder.heartbeat_monitor_builders.is_empty());
assert!(health_monitor_builder.logic_monitor_builders.is_empty());
assert_eq!(health_monitor_builder.supervisor_api_cycle, Duration::from_millis(500));
assert_eq!(
health_monitor_builder.internal_processing_cycle,
Expand All @@ -342,10 +395,13 @@ mod tests {
let deadline_monitor_builder = DeadlineMonitorBuilder::new();
let heartbeat_monitor_tag = MonitorTag::from("heartbeat_monitor");
let heartbeat_monitor_builder = def_heartbeat_monitor_builder();
let logic_monitor_tag = MonitorTag::from("logic_monitor");
let logic_monitor_builder = def_logic_monitor_builder();

let result = HealthMonitorBuilder::new()
.add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder)
.add_heartbeat_monitor(heartbeat_monitor_tag, heartbeat_monitor_builder)
.add_logic_monitor(logic_monitor_tag, logic_monitor_builder)
.build();
assert!(result.is_ok());
}
Expand Down Expand Up @@ -475,21 +531,80 @@ mod tests {
assert!(result.is_none());
}

// A freshly built health monitor hands out a registered logic monitor.
#[test]
fn health_monitor_get_logic_monitor_available() {
    let tag = MonitorTag::from("logic_monitor");
    let mut health_monitor = HealthMonitorBuilder::new()
        .add_logic_monitor(tag, def_logic_monitor_builder())
        .build()
        .unwrap();

    let monitor = health_monitor.get_logic_monitor(tag);
    assert!(monitor.is_some());
}

// Requesting the same logic monitor a second time yields `None`.
#[test]
fn health_monitor_get_logic_monitor_taken() {
    let tag = MonitorTag::from("logic_monitor");
    let mut health_monitor = HealthMonitorBuilder::new()
        .add_logic_monitor(tag, def_logic_monitor_builder())
        .build()
        .unwrap();

    // First request: the result is discarded right away, exactly as a caller ignoring it would.
    let _ = health_monitor.get_logic_monitor(tag);

    let second_request = health_monitor.get_logic_monitor(tag);
    assert!(second_request.is_none());
}

// Asking for a tag that was never registered yields `None`.
#[test]
fn health_monitor_get_logic_monitor_unknown() {
    let registered_tag = MonitorTag::from("logic_monitor");
    let mut health_monitor = HealthMonitorBuilder::new()
        .add_logic_monitor(registered_tag, def_logic_monitor_builder())
        .build()
        .unwrap();

    let unknown_tag = MonitorTag::from("undefined_monitor");
    assert!(health_monitor.get_logic_monitor(unknown_tag).is_none());
}

// A slot holding `None` (only reachable by direct injection) is reported as `None`.
#[test]
fn health_monitor_get_logic_monitor_invalid_state() {
    let tag = MonitorTag::from("logic_monitor");
    let mut health_monitor = HealthMonitorBuilder::new()
        .add_logic_monitor(tag, def_logic_monitor_builder())
        .build()
        .unwrap();

    // Inject broken state - unreachable otherwise.
    health_monitor.logic_monitors.insert(tag, None);

    let monitor = health_monitor.get_logic_monitor(tag);
    assert!(monitor.is_none());
}

#[test]
fn health_monitor_start_succeeds() {
let deadline_monitor_tag = MonitorTag::from("deadline_monitor");
let deadline_monitor_builder = DeadlineMonitorBuilder::new();
let heartbeat_monitor_tag = MonitorTag::from("heartbeat_monitor");
let heartbeat_monitor_builder = def_heartbeat_monitor_builder();
let logic_monitor_tag = MonitorTag::from("logic_monitor");
let logic_monitor_builder = def_logic_monitor_builder();

let mut health_monitor = HealthMonitorBuilder::new()
.add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder)
.add_heartbeat_monitor(heartbeat_monitor_tag, heartbeat_monitor_builder)
.add_logic_monitor(logic_monitor_tag, logic_monitor_builder)
.build()
.unwrap();

let _deadline_monitor = health_monitor.get_deadline_monitor(deadline_monitor_tag).unwrap();
let _heartbeat_monitor = health_monitor.get_heartbeat_monitor(heartbeat_monitor_tag).unwrap();
let _logic_monitor = health_monitor.get_logic_monitor(logic_monitor_tag).unwrap();

let result = health_monitor.start();
assert!(result.is_ok());
Expand All @@ -499,10 +614,12 @@ mod tests {
fn health_monitor_start_monitors_not_taken() {
let deadline_monitor_builder = DeadlineMonitorBuilder::new();
let heartbeat_monitor_builder = def_heartbeat_monitor_builder();
let logic_monitor_builder = def_logic_monitor_builder();

let mut health_monitor = HealthMonitorBuilder::new()
.add_deadline_monitor(MonitorTag::from("deadline_monitor"), deadline_monitor_builder)
.add_heartbeat_monitor(MonitorTag::from("heartbeat_monitor"), heartbeat_monitor_builder)
.add_logic_monitor(MonitorTag::from("logic_monitor"), logic_monitor_builder)
.build()
.unwrap();

Expand All @@ -516,10 +633,13 @@ mod tests {
let deadline_monitor_builder = DeadlineMonitorBuilder::new();
let heartbeat_monitor_tag = MonitorTag::from("heartbeat_monitor");
let heartbeat_monitor_builder = def_heartbeat_monitor_builder();
let logic_monitor_tag = MonitorTag::from("logic_monitor");
let logic_monitor_builder = def_logic_monitor_builder();

let mut health_monitor = HealthMonitorBuilder::new()
.add_deadline_monitor(deadline_monitor_tag, deadline_monitor_builder)
.add_heartbeat_monitor(heartbeat_monitor_tag, heartbeat_monitor_builder)
.add_logic_monitor(logic_monitor_tag, logic_monitor_builder)
.build()
.unwrap();

Expand All @@ -532,6 +652,8 @@ mod tests {
assert!(get_deadline_monitor_result.is_some());
let get_heartbeat_monitor_result = health_monitor.get_heartbeat_monitor(heartbeat_monitor_tag);
assert!(get_heartbeat_monitor_result.is_some());
let get_logic_monitor_result = health_monitor.get_logic_monitor(logic_monitor_tag);
assert!(get_logic_monitor_result.is_some());

// Try to start again, this time should be successful.
let start_result = health_monitor.start();
Expand Down
Loading
Loading