diff --git a/Cargo.lock b/Cargo.lock index 1d70eaa9..ef03d12d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1038,15 +1038,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "file-id" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bc904b9bbefcadbd8e3a9fb0d464a9b979de6324c03b3c663e8994f46a5be36" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "filetime" version = "0.2.25" @@ -1121,15 +1112,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "fsevent-sys" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" -dependencies = [ - "libc", -] - [[package]] name = "futures" version = "0.3.31" @@ -1743,8 +1725,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" dependencies = [ "bitflags 2.9.1", + "futures-core", "inotify-sys", "libc", + "tokio", ] [[package]] @@ -1819,26 +1803,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kqueue" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" -dependencies = [ - "kqueue-sys", - "libc", -] - -[[package]] -name = "kqueue-sys" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" -dependencies = [ - "bitflags 1.3.2", - "libc", -] - [[package]] name = "language-tags" version = "0.3.2" @@ -2109,44 +2073,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "notify" -version = "8.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" -dependencies = [ - "bitflags 2.9.1", - "filetime", - "fsevent-sys", - "inotify", - "kqueue", - "libc", - "log", - "mio", - "notify-types", - "walkdir", - "windows-sys 0.59.0", -] - -[[package]] -name = "notify-debouncer-full" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d88b1a7538054351c8258338df7c931a590513fb3745e8c15eb9ff4199b8d1" -dependencies = [ - "file-id", - "log", - "notify", - "notify-types", - "walkdir", -] - -[[package]] -name = "notify-types" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" - [[package]] name = "ntapi" version = "0.4.1" @@ -2283,7 +2209,7 @@ dependencies = [ [[package]] name = "omnect-device-service" -version = "0.41.8" +version = "0.41.10" dependencies = [ "actix-server", "actix-web", @@ -2300,13 +2226,12 @@ dependencies = [ "futures-executor", "futures-util", "glob", + "inotify", "lazy_static", "log", "log-panics", "mockall", "modemmanager", - "notify", - "notify-debouncer-full", "omnect-device-service", "rand 0.9.1", "regex", @@ -2818,15 +2743,6 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.27" @@ -3544,16 +3460,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "walkdir" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.1" @@ -3690,15 +3596,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index d9f02ec0..bc40bdfa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" name = "omnect-device-service" readme = "README.md" repository = "https://github.com/omnect/omnect-device-service.git" -version = "0.41.8" +version = "0.41.10" [dependencies] actix-server = { version = "2.6", default-features = false } @@ -25,12 +25,11 @@ freedesktop_entry_parser = { version = "1.3", default-features = false } futures = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } glob = { version = "0.3", default-features = false } +inotify = { version = "0.11", default-features = false, features = ["stream"] } lazy_static = { version = "1.5", default-features = false } log = { version = "0.4", default-features = false } log-panics = { version = "2", default-features = false } modemmanager = { git = "https://github.com/omnect/modemmanager.git", tag = "0.3.4", default-features = false, optional = true } -notify = { version = "8.0", default-features = false } -notify-debouncer-full = { version = "0.5", default-features = false } regex-lite = { version = "0.1", default-features = true } reqwest = { version = "0.12", default-features = false, features = [ "default-tls", diff --git a/src/twin/consent.rs b/src/twin/consent.rs index 53c2463a..1a44882a 100644 --- a/src/twin/consent.rs +++ b/src/twin/consent.rs @@ -1,19 +1,19 @@ use crate::{ common::{from_json_file, to_json_file}, - twin::{Feature, feature::*}, + twin::feature::{self, *}, }; use anyhow::{Context, Result, bail, ensure}; use azure_iot_sdk::client::IotMessage; +use inotify::WatchMask; use log::{info, warn}; -use notify_debouncer_full::{Debouncer, NoCache, notify::*}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{collections::HashMap, env, path::Path}; +use std::{collections::HashMap, env, path::PathBuf}; use tokio::sync::mpsc::Sender; macro_rules! consent_path { () => { - Path::new(&env::var("CONSENT_DIR_PATH").unwrap_or("/etc/omnect/consent".to_string())) + PathBuf::from(&env::var("CONSENT_DIR_PATH").unwrap_or("/etc/omnect/consent".to_string())) }; } @@ -53,9 +53,8 @@ pub struct ConsentConfig { reset_consent_on_fail: bool, } -#[derive(Default)] pub struct DeviceUpdateConsent { - file_observer: Option>, + file_map: HashMap, tx_reported_properties: Option>, } @@ -87,20 +86,15 @@ impl Feature for DeviceUpdateConsent { .await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - let (file_observer, stream) = file_modified_stream::(vec![ - request_consent_path!().as_path(), - history_consent_path!().as_path(), - ]) - .context("command_request_stream: cannot create file_modified_stream")?; - self.file_observer = Some(file_observer); - Ok(Some(stream)) - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { - Command::FileModified(file) => { - self.report_consent(from_json_file(&file.path)?).await?; + Command::WatchPath(cmd) => { + self.report_consent(from_json_file( + self.file_map + .get(&cmd.event.wd.get_watch_descriptor_id()) + .context("cannot find file for descriptor")?, + )?) + .await?; } Command::DesiredGeneralConsent(cmd) => { self.update_general_consent(cmd).await?; @@ -119,6 +113,22 @@ impl DeviceUpdateConsent { const USER_CONSENT_VERSION: u8 = 1; const ID: &'static str = "device_update_consent"; + pub fn new() -> Result { + let mut file_map = HashMap::new(); + + for path in [request_consent_path!(), history_consent_path!()] { + file_map.insert( + feature::add_watch::(&path, WatchMask::MODIFY)?.get_watch_descriptor_id(), + path, + ); + } + + Ok(DeviceUpdateConsent { + tx_reported_properties: None, + file_map, + }) + } + fn user_consent(&self, cmd: &UserConsentCommand) -> CommandResult { info!("user consent requested: {cmd:?}"); diff --git a/src/twin/factory_reset.rs b/src/twin/factory_reset.rs index 311f44e2..40082ed1 100644 --- a/src/twin/factory_reset.rs +++ b/src/twin/factory_reset.rs @@ -2,13 +2,13 @@ use crate::{ bootloader_env, common::from_json_file, systemd, - twin::{Feature, feature::*}, + twin::feature::{self, *}, web_service, }; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; +use inotify::WatchMask; use log::{debug, info, warn}; -use notify_debouncer_full::{Debouncer, NoCache, notify::*}; use serde::{Deserialize, Serialize}; use serde_json::{from_reader, json}; use serde_repr::*; @@ -37,8 +37,10 @@ macro_rules! config_path { macro_rules! custom_config_dir_path { () => { - env::var("FACTORY_RESET_CUSTOM_CONFIG_DIR_PATH") - .unwrap_or("/etc/omnect/factory-reset.d".to_string()) + Path::new( + &env::var("FACTORY_RESET_CUSTOM_CONFIG_DIR_PATH") + .unwrap_or("/etc/omnect/factory-reset.d".to_string()), + ) }; } @@ -92,7 +94,6 @@ struct FactoryResetReport { pub struct FactoryReset { tx_reported_properties: Option>, report: FactoryResetReport, - dir_observer: Option>, } impl Feature for FactoryReset { @@ -136,17 +137,9 @@ impl Feature for FactoryReset { Ok(()) } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - let (dir_observer, stream) = - dir_modified_stream::(vec![&Path::new(&custom_config_dir_path!())]) - .context("command_request_stream: cannot create dir_modified_stream")?; - self.dir_observer = Some(dir_observer); - Ok(Some(stream)) - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { - Command::DirModified(_) => { + Command::WatchPath(_) => { let keys = FactoryReset::factory_reset_keys()?; if keys != self.report.keys { @@ -193,10 +186,14 @@ impl FactoryReset { result: FactoryReset::factory_reset_result()?, }; + feature::add_watch::( + custom_config_dir_path!(), + WatchMask::CREATE | WatchMask::DELETE, + )?; + Ok(FactoryReset { tx_reported_properties: None, report, - dir_observer: None, }) } diff --git a/src/twin/feature/mod.rs b/src/twin/feature/mod.rs index 3bb1a4c4..6a6d3c05 100644 --- a/src/twin/feature/mod.rs +++ b/src/twin/feature/mod.rs @@ -2,22 +2,24 @@ use crate::twin::{ TwinUpdate, TwinUpdateState, consent, factory_reset, firmware_update, network, reboot, ssh_tunnel, system_info, }; -use anyhow::{Result, bail, ensure}; +use anyhow::{Context, Result, anyhow, bail}; use azure_iot_sdk::client::{DirectMethod, IotMessage}; use futures::Stream; -use futures::StreamExt; +use futures_util::StreamExt; +use inotify::{Event, Inotify, WatchDescriptor, WatchMask, Watches}; use log::{debug, error, info, warn}; -use notify_debouncer_full::{DebounceEventResult, Debouncer, NoCache, new_debouncer, notify::*}; +use std::collections::HashMap; +use std::ffi::OsString; use std::{ any::TypeId, - path::{Path, PathBuf}, + path::Path, pin::Pin, + sync::{LazyLock, Mutex, OnceLock}, time::Duration, }; use tokio::{ sync::{mpsc, oneshot}, - task::JoinHandle, - time::{Instant, Interval}, + task::JoinSet, }; #[derive(Clone, Debug, PartialEq)] @@ -25,11 +27,8 @@ pub enum Command { CloseSshTunnel(ssh_tunnel::CloseSshTunnelCommand), DesiredGeneralConsent(consent::DesiredGeneralConsentCommand), DesiredUpdateDeviceSshCa(ssh_tunnel::UpdateDeviceSshCaCommand), - DirModified(PathCommand), FactoryReset(factory_reset::FactoryResetCommand), FleetId(system_info::FleetIdCommand), - FileCreated(PathCommand), - FileModified(PathCommand), GetSshPubKey(ssh_tunnel::GetSshPubKeyCommand), Interval(IntervalCommand), LoadFirmwareUpdate(firmware_update::LoadUpdateCommand), @@ -40,6 +39,7 @@ pub enum Command { SetWaitOnlineTimeout(reboot::SetWaitOnlineTimeoutCommand), ValidateUpdate(bool), UserConsent(consent::UserConsentCommand), + WatchPath(WatchPathCommand), } impl Command { @@ -50,10 +50,7 @@ impl Command { CloseSshTunnel(_) => TypeId::of::(), DesiredGeneralConsent(_) => TypeId::of::(), DesiredUpdateDeviceSshCa(_) => TypeId::of::(), - DirModified(cmd) => cmd.feature_id, FactoryReset(_) => TypeId::of::(), - FileCreated(cmd) => cmd.feature_id, - FileModified(cmd) => cmd.feature_id, FleetId(_) => TypeId::of::(), GetSshPubKey(_) => TypeId::of::(), Interval(cmd) => cmd.feature_id, @@ -65,6 +62,7 @@ impl Command { SetWaitOnlineTimeout(_) => TypeId::of::(), ValidateUpdate(_) => TypeId::of::(), UserConsent(_) => TypeId::of::(), + WatchPath(cmd) => cmd.feature_id, } } @@ -193,157 +191,160 @@ pub(crate) trait Feature { Ok(()) } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - Ok(None) - } - async fn command(&mut self, _cmd: &Command) -> CommandResult { unimplemented!(); } } #[derive(Clone, Debug, PartialEq)] -pub struct PathCommand { +pub struct IntervalCommand { pub feature_id: TypeId, - pub path: PathBuf, } -#[derive(Clone, Debug, PartialEq)] -pub struct IntervalCommand { +#[derive(Clone, Debug)] +pub struct WatchPathCommand { pub feature_id: TypeId, - pub instant: Instant, + pub event: Event, } -pub fn interval_stream(interval: Interval) -> CommandRequestStream -where - T: 'static, -{ - tokio_stream::wrappers::IntervalStream::new(interval) - .map(|i| CommandRequest { - command: Command::Interval(IntervalCommand { - feature_id: TypeId::of::(), - instant: i, - }), - reply: None, - }) - .boxed() +#[derive(Debug)] +struct FileWatcher { + watches: Watches, + feature_map: HashMap, } -pub fn file_created_stream(paths: Vec<&Path>) -> Result<(JoinHandle<()>, CommandRequestStream)> -where - T: 'static, -{ - let (tx, rx) = mpsc::channel(2); - let inner_paths: Vec = paths.into_iter().map(|p| p.to_path_buf()).collect(); - - let handle = tokio::task::spawn_blocking(move || { - loop { - for p in &inner_paths { - if matches!(p.try_exists(), Ok(true)) { - let _ = tx.blocking_send(CommandRequest { - command: Command::FileCreated(PathCommand { - feature_id: TypeId::of::(), - path: p.clone(), - }), - reply: None, - }); - return; - } - } - std::thread::sleep(Duration::from_millis(500)); - } - }); - - Ok(( - handle, - tokio_stream::wrappers::ReceiverStream::new(rx).boxed(), - )) +impl PartialEq for WatchPathCommand { + fn eq(&self, other: &Self) -> bool { + self.event.wd == other.event.wd + } } -pub fn file_modified_stream( - paths: Vec<&Path>, -) -> Result<(Debouncer, CommandRequestStream)> -where - T: 'static, -{ - let (tx, rx) = mpsc::channel(2); - let mut debouncer = new_debouncer( - Duration::from_secs(2), - None, - move |res: DebounceEventResult| match res { - Ok(debounced_events) => { - for de in debounced_events { - if let EventKind::Modify(_) = de.event.kind { - debug!("notify-event: {de:?}"); - for p in &de.paths { - let _ = tx.blocking_send(CommandRequest { - command: Command::FileModified(PathCommand { - feature_id: TypeId::of::(), - path: p.clone(), +impl Eq for WatchPathCommand {} + +static TASKS: LazyLock>> = LazyLock::new(|| Mutex::new(JoinSet::new())); +static FILE_WATCHER: OnceLock> = OnceLock::new(); +static TX_COMMAND_REQUEST: OnceLock> = OnceLock::new(); + +pub fn init(tx_command_request: mpsc::Sender) -> Result<()> { + TX_COMMAND_REQUEST + .set(tx_command_request.clone()) + .map_err(|e| anyhow!("init: set TX_COMMAND_REQUEST {e:#?}"))?; + + let inotify = Inotify::init().context("init: failed to initialize inotify")?; + + FILE_WATCHER + .set(Mutex::new(FileWatcher { + watches: inotify.watches(), + feature_map: HashMap::new(), + })) + .map_err(|e| anyhow!("init: set FILE_WATCHER {e:#?}"))?; + + TASKS + .lock() + .map_err(|e| anyhow!("init: TASKS {e:#?}"))? + .spawn(async move { + let mut buffer = [0; 1024]; + let mut stream = inotify.into_event_stream(&mut buffer).unwrap(); + while let Some(event_result) = stream.next().await { + match event_result { + Ok(ev) => { + debug!("inotify: {ev:?}"); + let feature_id = *FILE_WATCHER + .get() + .unwrap() + .lock() + .unwrap() + .feature_map + .get(&ev.wd) + .unwrap(); + + if let Err(e) = tx_command_request + .send(CommandRequest { + command: Command::WatchPath(WatchPathCommand { + feature_id, + event: ev, }), reply: None, - }); + }) + .await + { + error!("inotify: send {e:#?}") } } + Err(e) => error!("inotify: event {e:#?}"), } } - Err(errors) => errors.iter().for_each(|e| error!("notify-error: {e:?}")), - }, - )?; - - for p in paths { - ensure!(p.is_file(), "{p:?} is not a regular existing file"); - debug!("watch {p:?}"); - debouncer.watch(p, RecursiveMode::NonRecursive)?; - } + }); - Ok(( - debouncer, - tokio_stream::wrappers::ReceiverStream::new(rx).boxed(), - )) + Ok(()) } -pub fn dir_modified_stream( - paths: Vec<&Path>, -) -> Result<(Debouncer, CommandRequestStream)> +pub fn add_watch(path: &Path, mask: WatchMask) -> Result where T: 'static, { - let (tx, rx) = mpsc::channel(2); - let mut debouncer = new_debouncer( - Duration::from_secs(2), - None, - move |res: DebounceEventResult| match res { - Ok(debounced_events) => { - for de in debounced_events { - if matches!(de.event.kind, EventKind::Create(_) | EventKind::Remove(_)) { - debug!("notify-event: {de:?}"); - for p in &de.paths { - let _ = tx.blocking_send(CommandRequest { - command: Command::DirModified(PathCommand { - feature_id: TypeId::of::(), - path: p.clone(), - }), - reply: None, - }); - } - } + let mut watcher = FILE_WATCHER + .get() + .context("add_watch: FileWatcher missing")? + .lock() + .map_err(|e| anyhow!("add_watch: FILE_WATCHER {e:#?}"))?; + + let desc = watcher.watches.add(path, mask)?; + + let None = watcher.feature_map.insert(desc.clone(), TypeId::of::()) else { + bail!("add_watch: currently only one feature per watch supported") + }; + + Ok(desc) +} + +pub fn remove_watch(wd: WatchDescriptor) -> Result<()> { + let mut watcher = FILE_WATCHER + .get() + .context("remove_watch: FileWatcher missing")? + .lock() + .map_err(|e| anyhow!("remove_watch: FILE_WATCHER {e:#?}"))?; + + watcher.watches.remove(wd.clone())?; + + if watcher.feature_map.remove_entry(&wd).is_none() { + warn!("remove_watch: WatchDescriptor doesn't exist") + }; + + Ok(()) +} + +pub fn notify_interval(interval: Duration) -> Result<()> +where + T: 'static, +{ + let tx_command_request = TX_COMMAND_REQUEST + .get() + .context("notify_interval: failed to get TX_COMMAND_REQUEST")? + .clone(); + + TASKS + .lock() + .map_err(|e| anyhow!("notify_interval: TASKS {e:#?}"))? + .spawn(async move { + let mut interval = tokio::time::interval(interval); + loop { + interval.tick().await; + if let Err(e) = tx_command_request + .send(CommandRequest { + command: Command::Interval(IntervalCommand { + feature_id: TypeId::of::(), + }), + reply: None, + }) + .await + { + error!("notify_interval: send {e:#?}") } } - Err(errors) => errors.iter().for_each(|e| error!("notify-error: {e:?}")), - }, - )?; - - for p in paths { - ensure!(p.is_dir(), "{p:?} is not a regular existing directory"); - debug!("watch {p:?}"); - debouncer.watch(p, RecursiveMode::Recursive)?; - } + }); - Ok(( - debouncer, - tokio_stream::wrappers::ReceiverStream::new(rx).boxed(), - )) + Ok(()) } #[cfg(test)] @@ -443,10 +444,7 @@ mod tests { }) .unwrap(), Command::UserConsent(consent::UserConsentCommand { - user_consent: std::collections::HashMap::from([( - "foo".to_string(), - "bar".to_string() - )]), + user_consent: HashMap::from([("foo".to_string(), "bar".to_string())]), }) ); diff --git a/src/twin/firmware_update/mod.rs b/src/twin/firmware_update/mod.rs index e6fa6f34..088ea7c5 100644 --- a/src/twin/firmware_update/mod.rs +++ b/src/twin/firmware_update/mod.rs @@ -9,7 +9,6 @@ use crate::{ systemd, systemd::{unit::UnitAction, watchdog::WatchdogManager}, twin::{ - Feature, feature::*, firmware_update::{adu_types::*, common::*, os_version::*}, }, diff --git a/src/twin/mod.rs b/src/twin/mod.rs index 833f5d6b..fe48af9f 100644 --- a/src/twin/mod.rs +++ b/src/twin/mod.rs @@ -52,6 +52,7 @@ enum TwinState { pub struct Twin { client: Option, web_service: Option, + // ToDo: rm? tx_command_request: mpsc::Sender, tx_reported_properties: mpsc::Sender, tx_outgoing_message: mpsc::Sender, @@ -66,6 +67,7 @@ impl Twin { tx_reported_properties: mpsc::Sender, tx_outgoing_message: mpsc::Sender, ) -> Result { + feature::init(tx_command_request.clone())?; /* - init features first - start with SystemInfo in order to log useful infos asap @@ -77,7 +79,7 @@ impl Twin { ), ( TypeId::of::(), - DynFeature::boxed(consent::DeviceUpdateConsent::default()), + DynFeature::boxed(consent::DeviceUpdateConsent::new()?), ), ( TypeId::of::(), @@ -89,11 +91,11 @@ impl Twin { ), ( TypeId::of::(), - DynFeature::boxed(modem_info::ModemInfo::new()), + DynFeature::boxed(modem_info::ModemInfo::new()?), ), ( TypeId::of::(), - DynFeature::boxed(network::Network::default()), + DynFeature::boxed(network::Network::new()?), ), ( TypeId::of::(), @@ -331,19 +333,6 @@ impl Twin { } } - fn feature_command_request_streams(&mut self) -> Vec { - self.features - .values_mut() - .filter_map(|f| { - if f.is_enabled() { - f.command_request_stream().unwrap() - } else { - None - } - }) - .collect() - } - async fn request_validate_update(&mut self, authenticated: bool) -> Result<()> { self.tx_command_request .send(CommandRequest { @@ -384,10 +373,11 @@ impl Twin { systemd::sd_notify_ready(); - let mut command_requests = twin.feature_command_request_streams(); - command_requests.push(Self::direct_method_stream(rx_direct_method)); - command_requests.push(Self::desired_properties_stream(rx_twin_desired)); - command_requests.push(ReceiverStream::new(rx_command_request).boxed()); + let command_requests = vec![ + Self::direct_method_stream(rx_direct_method), + Self::desired_properties_stream(rx_twin_desired), + ReceiverStream::new(rx_command_request).boxed(), + ]; tokio::pin! { let client_created = Self::connect_iothub_client(&client_builder); diff --git a/src/twin/modem_info.rs b/src/twin/modem_info.rs index 76d82e24..bb447ee2 100644 --- a/src/twin/modem_info.rs +++ b/src/twin/modem_info.rs @@ -1,10 +1,10 @@ -use super::{Feature, feature::*}; +use crate::twin::feature::{self, *}; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; use lazy_static::lazy_static; use serde_json::json; use std::{env, time::Duration}; -use tokio::{sync::mpsc::Sender, time::interval}; +use tokio::sync::mpsc::Sender; #[cfg(feature = "modem_info")] mod inner { @@ -294,14 +294,21 @@ mod inner { use super::*; use log::warn; - #[derive(Default)] pub struct ModemInfo { pub(super) tx_reported_properties: Option>, } impl ModemInfo { - pub fn new() -> Self { - ModemInfo::default() + pub fn new() -> Result { + if 0 < *REFRESH_MODEM_INFO_INTERVAL_SECS { + feature::notify_interval::(Duration::from_secs( + *REFRESH_MODEM_INFO_INTERVAL_SECS, + ))?; + } + + Ok(ModemInfo { + tx_reported_properties: None, + }) } pub async fn report(&self, force: bool) -> Result<()> { @@ -365,16 +372,6 @@ impl Feature for ModemInfo { self.report(true).await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - if !self.is_enabled() || 0 == *REFRESH_MODEM_INFO_INTERVAL_SECS { - Ok(None) - } else { - Ok(Some(interval_stream::(interval( - Duration::from_secs(*REFRESH_MODEM_INFO_INTERVAL_SECS), - )))) - } - } - async fn command(&mut self, cmd: &Command) -> CommandResult { let Command::Interval(_) = cmd else { bail!("unexpected event: {cmd:?}") diff --git a/src/twin/network.rs b/src/twin/network.rs index 8d65d44a..2d2f48ae 100644 --- a/src/twin/network.rs +++ b/src/twin/network.rs @@ -1,6 +1,6 @@ use crate::{ systemd::{networkd, unit}, - twin::{Feature, feature::*}, + twin::feature::{self, *}, web_service, }; use anyhow::{Context, Result, bail}; @@ -10,7 +10,7 @@ use log::{debug, error, info, warn}; use serde::Serialize; use serde_json::json; use std::{env, time::Duration}; -use tokio::{sync::mpsc::Sender, time::interval}; +use tokio::sync::mpsc::Sender; lazy_static! { static ref REFRESH_NETWORK_STATUS_INTERVAL_SECS: u64 = { @@ -47,7 +47,6 @@ pub struct Interface { ipv4: IpConfig, } -#[derive(Default)] pub struct Network { tx_reported_properties: Option>, interfaces: Vec, @@ -76,16 +75,6 @@ impl Feature for Network { Ok(()) } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - if !self.is_enabled() || 0 == *REFRESH_NETWORK_STATUS_INTERVAL_SECS { - Ok(None) - } else { - Ok(Some(interval_stream::(interval( - Duration::from_secs(*REFRESH_NETWORK_STATUS_INTERVAL_SECS), - )))) - } - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { Command::Interval(_) => {} @@ -110,6 +99,19 @@ impl Network { const NETWORK_STATUS_VERSION: u8 = 3; const ID: &'static str = "network_status"; + pub fn new() -> Result { + if 0 < *REFRESH_NETWORK_STATUS_INTERVAL_SECS { + feature::notify_interval::(Duration::from_secs( + *REFRESH_NETWORK_STATUS_INTERVAL_SECS, + ))?; + } + + Ok(Network { + interfaces: vec![], + tx_reported_properties: None, + }) + } + async fn report(&mut self, force: bool) -> Result<()> { let interfaces = Self::parse_interfaces(&networkd::networkd_interfaces().await?)?; diff --git a/src/twin/provisioning_config.rs b/src/twin/provisioning_config.rs index 3ee00ea4..6f7995c9 100644 --- a/src/twin/provisioning_config.rs +++ b/src/twin/provisioning_config.rs @@ -1,4 +1,4 @@ -use crate::twin::{Feature, feature::*}; +use crate::twin::feature::{self, *}; use anyhow::{Context, Result, bail, ensure}; use azure_iot_sdk::client::IotMessage; use lazy_static::lazy_static; @@ -7,7 +7,7 @@ use serde::Serialize; use serde_json::json; use std::{env, path::Path, time::Duration}; use time::format_description::well_known::Rfc3339; -use tokio::{sync::mpsc::Sender, time::interval}; +use tokio::sync::mpsc::Sender; lazy_static! { static ref REFRESH_EST_EXPIRY_INTERVAL_SECS: u64 = { @@ -141,19 +141,6 @@ impl Feature for ProvisioningConfig { self.report().await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - if !self.is_enabled() || 0 == *REFRESH_EST_EXPIRY_INTERVAL_SECS { - Ok(None) - } else { - match &self.method { - Method::X509(cert) if cert.est => Ok(Some(interval_stream::( - interval(Duration::from_secs(*REFRESH_EST_EXPIRY_INTERVAL_SECS)), - ))), - _ => Ok(None), - } - } - } - async fn command(&mut self, cmd: &Command) -> CommandResult { let Command::Interval(_) = cmd else { bail!("unexpected event: {cmd:?}") @@ -232,6 +219,20 @@ impl ProvisioningConfig { _ => bail!("provisioning_config: invalid provisioning configuration found"), }; + if 0 < *REFRESH_EST_EXPIRY_INTERVAL_SECS + && matches!( + method, + Method::X509(X509 { + est: true, + expires: _ + }) + ) + { + feature::notify_interval::(Duration::from_secs( + *REFRESH_EST_EXPIRY_INTERVAL_SECS, + ))?; + } + let this = ProvisioningConfig { tx_reported_properties: None, source, diff --git a/src/twin/reboot.rs b/src/twin/reboot.rs index 5dac511e..d4079b4f 100644 --- a/src/twin/reboot.rs +++ b/src/twin/reboot.rs @@ -1,8 +1,4 @@ -use crate::{ - systemd, - twin::{Feature, feature::*}, - web_service, -}; +use crate::{systemd, twin::feature::*, web_service}; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; use log::{debug, info}; diff --git a/src/twin/ssh_tunnel.rs b/src/twin/ssh_tunnel.rs index 6f453d6b..da882568 100644 --- a/src/twin/ssh_tunnel.rs +++ b/src/twin/ssh_tunnel.rs @@ -1,11 +1,11 @@ use crate::twin::{ - feature::{Command as FeatureCommand, CommandResult}, Feature, + feature::{Command as FeatureCommand, CommandResult}, }; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{Context, Result, bail, ensure}; use azure_iot_sdk::client::IotMessage; use log::{debug, error, info, warn}; -use serde::{de::Error, Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, de::Error}; use serde_json::json; use std::{ env, @@ -18,7 +18,7 @@ use std::{ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::{Child, Command}, - sync::{mpsc::Sender, OwnedSemaphorePermit, Semaphore, TryAcquireError}, + sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError, mpsc::Sender}, }; use uuid::Uuid; @@ -448,14 +448,20 @@ impl SshTunnel { if msg == "established" { Ok(()) } else { - bail!("await_tunnel_creation: failed to establish ssh tunnel due to unexpected response from ssh server: {}", msg); + bail!( + "await_tunnel_creation: failed to establish ssh tunnel due to unexpected response from ssh server: {}", + msg + ); } } Ok(None) => { bail!("await_tunnel_creation: failed to establish ssh tunnel"); } Err(err) => { - bail!("await_tunnel_creation: failed to establish ssh tunnel since unable to read from ssh process: {}", err); + bail!( + "await_tunnel_creation: failed to establish ssh tunnel since unable to read from ssh process: {}", + err + ); } } } @@ -898,19 +904,21 @@ mod tests { } // the final should fail - assert!(ssh_tunnel - .command(&FeatureCommand::OpenSshTunnel(OpenSshTunnelCommand { - tunnel_id: "b7afb216-5f7a-4755-a300-9374f8a0e9ff".to_string(), - certificate: std::fs::read_to_string(cert_path.clone()).unwrap(), - bastion_config: BastionConfig { - host: "test-host".to_string(), - port: 2222, - user: "test-user".to_string(), - socket_path: PathBuf::from_str("/some/test/socket/path").unwrap(), - }, - })) - .await - .is_err()); + assert!( + ssh_tunnel + .command(&FeatureCommand::OpenSshTunnel(OpenSshTunnelCommand { + tunnel_id: "b7afb216-5f7a-4755-a300-9374f8a0e9ff".to_string(), + certificate: std::fs::read_to_string(cert_path.clone()).unwrap(), + bastion_config: BastionConfig { + host: "test-host".to_string(), + port: 2222, + user: "test-user".to_string(), + socket_path: PathBuf::from_str("/some/test/socket/path").unwrap(), + }, + })) + .await + .is_err() + ); // finally, close the pipes. By opening and closing for writing is // sufficient. diff --git a/src/twin/system_info.rs b/src/twin/system_info.rs index 591a6550..4891cb69 100644 --- a/src/twin/system_info.rs +++ b/src/twin/system_info.rs @@ -1,27 +1,29 @@ use crate::{ common::RootPartition, reboot_reason, - twin::{Feature, feature::*}, + twin::feature::{self, *}, web_service, }; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::{IotHubClient, IotMessage}; -use futures::StreamExt; +use inotify::WatchMask; use lazy_static::lazy_static; -use log::{debug, info, warn}; -use serde::{Deserialize, Serialize}; +use log::{info, warn}; +use serde::{Deserialize, Serialize, Serializer, ser::Error}; use serde_json::json; -use std::{env, path::Path}; +use std::{ + env, + path::Path, + sync::{LazyLock, RwLock}, + time::Duration, +}; use sysinfo; use time::format_description::well_known::Rfc3339; -use tokio::{ - sync::mpsc, - task::JoinHandle, - time::{Duration, interval}, -}; +use tokio::sync::mpsc; static BOOTLOADER_UPDATED_FILE: &str = "/run/omnect-device-service/omnect_bootloader_updated"; +// ToDo lazy_static! { static ref REFRESH_SYSTEM_INFO_INTERVAL_SECS: u64 = { const REFRESH_SYSTEM_INFO_INTERVAL_SECS_DEFAULT: &str = "60"; @@ -32,43 +34,103 @@ lazy_static! { }; } -#[derive(Serialize)] +#[derive(Default, Serialize)] struct Label { - device_id: String, - module_name: String, + #[serde(serialize_with = "device_id")] + device_id: (), + #[serde(serialize_with = "module_name")] + module_name: (), #[serde(skip_serializing_if = "Option::is_none")] sensor: Option, } +fn device_id(_: &(), s: S) -> Result +where + S: Serializer, +{ + let Some(hostname) = sysinfo::System::host_name() else { + return Err(Error::custom("failed to get hostname")); + }; + + s.serialize_str(&hostname) +} + +fn module_name(_: &(), s: S) -> Result +where + S: Serializer, +{ + s.serialize_str("omnect-device-service") +} + #[derive(Serialize)] -struct Metric { - time_generated_utc: String, - name: String, - value: f64, - labels: Label, +enum MetricValue { + CpuUsage(f64), + MemoryUsed(f64), + MemoryTotal(f64), + DiskUsed(f64), + DiskTotal(f64), + Temp(f64, String), } -impl Metric { - fn new( - time: String, - name: String, - value: f64, - device: String, - sensor: Option, - ) -> Metric { +impl MetricValue { + fn to_metric(self) -> Metric { + let (name, value, labels) = match self { + MetricValue::CpuUsage(value) => ("cpu_usage".to_owned(), value, Label::default()), + MetricValue::MemoryUsed(value) => ("memory_used".to_owned(), value, Label::default()), + MetricValue::MemoryTotal(value) => ("memory_total".to_owned(), value, Label::default()), + MetricValue::DiskUsed(value) => ("disk_used".to_owned(), value, Label::default()), + MetricValue::DiskTotal(value) => ("disk_total".to_owned(), value, Label::default()), + MetricValue::Temp(value, sensor) => ( + "temp".to_owned(), + value, + Label { + sensor: Some(sensor), + ..Default::default() + }, + ), + }; + Metric { - time_generated_utc: time, - name, - value, - labels: Label { - device_id: device, - module_name: "omnect-device-service".to_string(), - sensor, + metric: InnerMetric { + name, + value, + labels, }, + ..Default::default() } } } +#[derive(Default, Serialize)] +struct InnerMetric { + name: String, + value: f64, + labels: Label, +} + +#[derive(Default, Serialize)] +struct Metric { + #[serde(serialize_with = "time_stamp")] + time_generated_utc: (), + #[serde(flatten)] + metric: InnerMetric, +} + +static TIME_STAMP: LazyLock> = + LazyLock::new(|| RwLock::new(time::OffsetDateTime::now_utc().format(&Rfc3339).unwrap())); + +fn time_stamp(_: &(), s: S) -> Result +where + S: Serializer, +{ + let Ok(time_stamp) = TIME_STAMP.read() else { + return Err(Error::custom("failed to get timestamp")); + }; + + s.serialize_str(&time_stamp) +} + +// ToDo rm lazy_staticw lazy_static! { static ref TIMESYNC_FILE: &'static Path = if cfg!(feature = "mock") { Path::new("/tmp/synchronized") @@ -101,7 +163,6 @@ struct HardwareInfo { components: sysinfo::Components, disk: sysinfo::Disks, system: sysinfo::System, - hostname: String, } #[derive(Default, Serialize)] @@ -118,8 +179,6 @@ pub struct SystemInfo { reboot_reason: Option, #[serde(skip_serializing_if = "Option::is_none")] fleet_id: Option, - #[serde(skip_serializing)] - handle: Option>, } impl Feature for SystemInfo { fn name(&self) -> String { @@ -148,29 +207,12 @@ impl Feature for SystemInfo { self.report().await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - Ok(match *REFRESH_SYSTEM_INFO_INTERVAL_SECS { - 0 if self.software_info.boot_time.is_none() => Some(self.file_created_stream()?), - 0 if self.software_info.boot_time.is_some() => None, - _ => { - let interval_stream = interval_stream::(interval(Duration::from_secs( - *REFRESH_SYSTEM_INFO_INTERVAL_SECS, - ))); - Some(if self.software_info.boot_time.is_none() { - futures::stream::select(interval_stream, self.file_created_stream()?).boxed() - } else { - interval_stream - }) - } - }) - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { Command::Interval(_) => { self.metrics().await?; } - Command::FileCreated(_) => { + Command::WatchPath(_) => { self.software_info.boot_time = Some(Self::boot_time()?); self.report().await?; } @@ -211,16 +253,13 @@ impl SystemInfo { RootPartition::current()?.as_str() ); - let boot_time = if matches!(TIMESYNC_FILE.try_exists(), Ok(true)) { - Some(Self::boot_time()?) - } else { - debug!("new: start timesync watcher since not synced yet"); - None - }; + feature::add_watch::(&TIMESYNC_FILE, WatchMask::CREATE | WatchMask::ONESHOT)?; - let Some(hostname) = sysinfo::System::host_name() else { - bail!("metrics: hostname could not be read") - }; + if 0 < *REFRESH_SYSTEM_INFO_INTERVAL_SECS { + feature::notify_interval::(Duration::from_secs( + *REFRESH_SYSTEM_INFO_INTERVAL_SECS, + ))?; + } Ok(SystemInfo { tx_reported_properties: None, @@ -229,7 +268,7 @@ impl SystemInfo { os: Self::os_info()?, azure_sdk_version, omnect_device_service_version, - boot_time, + boot_time: None, }, hardware_info: HardwareInfo { components: sysinfo::Components::new_with_refreshed_list(), @@ -239,21 +278,12 @@ impl SystemInfo { .with_cpu(sysinfo::CpuRefreshKind::everything()) .with_memory(sysinfo::MemoryRefreshKind::everything()), ), - - hostname, }, reboot_reason: reboot_reason::current_reboot_reason(), fleet_id: None, - handle: None, }) } - fn file_created_stream(&mut self) -> Result { - let (handle, stream) = file_created_stream::(vec![&TIMESYNC_FILE])?; - self.handle = Some(handle); - Ok(stream) - } - async fn report(&self) -> Result<()> { let value = serde_json::to_value(self).context("report: cannot serialize")?; @@ -296,106 +326,50 @@ impl SystemInfo { .context("boot_time: format uptime") } - fn cpu_usage(&self, time: String) -> Metric { - Metric::new( - time, - "cpu_usage".to_string(), - self.hardware_info.system.global_cpu_usage() as f64, - self.hardware_info.hostname.clone(), - None, - ) - } - - fn memory_used(&self, time: String) -> Metric { - Metric::new( - time, - "memory_used".to_string(), - self.hardware_info.system.used_memory() as f64, - self.hardware_info.hostname.clone(), - None, - ) - } - - fn memory_total(&self, time: String) -> Metric { - Metric::new( - time, - "memory_total".to_string(), - self.hardware_info.system.total_memory() as f64, - self.hardware_info.hostname.clone(), - None, - ) - } - - fn disk_used(&self, time: String, value: f64) -> Metric { - Metric::new( - time, - "disk_used".to_string(), - value, - self.hardware_info.hostname.clone(), - None, - ) - } - - fn disk_total(&self, time: String, value: f64) -> Metric { - Metric::new( - time, - "disk_total".to_string(), - value, - self.hardware_info.hostname.clone(), - None, - ) - } - - fn temp(&self, time: String, value: f64, sensor: String) -> Metric { - Metric::new( - time, - "temp".to_string(), - value, - self.hardware_info.hostname.clone(), - Some(sensor), - ) - } - async fn metrics(&mut self) -> Result<()> { let Some(tx) = &self.tx_outgoing_message else { warn!("metrics: skip since tx_outgoing_message is None"); return Ok(()); }; - let Ok(time) = time::OffsetDateTime::now_utc().format(&Rfc3339) else { - bail!("metrics: timestamp could not be generated") + let Ok(mut time_stamp) = TIME_STAMP.write() else { + bail!("metrics: failed to lock TIME_STAMP") }; + *time_stamp = time::OffsetDateTime::now_utc() + .format(&Rfc3339) + .context("metrics: failed to get and format timestamp")?; self.hardware_info.components.refresh(true); self.hardware_info.system.refresh_cpu_usage(); self.hardware_info.system.refresh_memory(); self.hardware_info.disk.refresh(true); - let mut disk_total = 0; - let mut disk_used = 0; - for disk in self.hardware_info.disk.list() { - if disk.name().to_str() == Some("/dev/omnect/data") { - disk_total = disk.total_space(); - disk_used = disk.total_space() - disk.available_space(); - break; - } - } - - let mut metric_list = vec![ - self.cpu_usage(time.clone()), - self.memory_used(time.clone()), - self.memory_total(time.clone()), - self.disk_used(time.clone(), disk_used as f64), - self.disk_total(time.clone(), disk_total as f64), + let (disk_total, disk_used) = self + .hardware_info + .disk + .into_iter() + .filter_map(|disk| match (disk.name().to_str(), disk.total_space()) { + (Some("/dev/omnect/data"), space) => Some((space, space - disk.available_space())), + _ => None, + }) + .next() + .unwrap_or((0, 0)); + + let mut metrics = vec![ + MetricValue::CpuUsage(self.hardware_info.system.global_cpu_usage() as f64).to_metric(), + MetricValue::MemoryUsed(self.hardware_info.system.used_memory() as f64).to_metric(), + MetricValue::MemoryTotal(self.hardware_info.system.total_memory() as f64).to_metric(), + MetricValue::DiskUsed(disk_used as f64).to_metric(), + MetricValue::DiskTotal(disk_total as f64).to_metric(), ]; self.hardware_info.components.iter().for_each(|c| { if let Some(t) = c.temperature() { - metric_list.push(self.temp(time.clone(), t.into(), c.label().to_string())) + metrics.push(MetricValue::Temp(t.into(), c.label().to_string()).to_metric()) }; }); - let json = serde_json::to_vec(&metric_list) + let json = serde_json::to_vec(&metrics) .context("metrics list could not be converted to vector:")?; let msg = IotMessage::builder() diff --git a/src/web_service.rs b/src/web_service.rs index 86c82851..c2a5dd7f 100644 --- a/src/web_service.rs +++ b/src/web_service.rs @@ -112,6 +112,7 @@ impl WebService { info!("WebService is enabled"); if matches!(Path::new(&publish_endpoints_path!()).try_exists(), Ok(true)) { + debug!("restore publish endpoints"); *PUBLISH_ENDPOINTS.lock().await = from_json_file(publish_endpoints_path!())?; }