diff --git a/Cargo.lock b/Cargo.lock index c54c9566..523a91c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1043,15 +1043,6 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" -[[package]] -name = "file-id" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bc904b9bbefcadbd8e3a9fb0d464a9b979de6324c03b3c663e8994f46a5be36" -dependencies = [ - "windows-sys 0.52.0", -] - [[package]] name = "filetime" version = "0.2.25" @@ -1126,15 +1117,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "fsevent-sys" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" -dependencies = [ - "libc", -] - [[package]] name = "futures" version = "0.3.31" @@ -1748,8 +1730,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" dependencies = [ "bitflags 2.9.1", + "futures-core", "inotify-sys", "libc", + "tokio", ] [[package]] @@ -1835,26 +1819,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kqueue" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" -dependencies = [ - "kqueue-sys", - "libc", -] - -[[package]] -name = "kqueue-sys" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" -dependencies = [ - "bitflags 1.3.2", - "libc", -] - [[package]] name = "language-tags" version = "0.3.2" @@ -2125,43 +2089,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "notify" -version = "8.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" -dependencies = [ - "bitflags 2.9.1", - "fsevent-sys", - "inotify", - "kqueue", - "libc", - "log", - "mio", - "notify-types", - "walkdir", - "windows-sys 0.60.2", -] - -[[package]] -name = "notify-debouncer-full" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d88b1a7538054351c8258338df7c931a590513fb3745e8c15eb9ff4199b8d1" -dependencies = [ - "file-id", - "log", - "notify", - "notify-types", - "walkdir", -] - -[[package]] -name = "notify-types" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" - [[package]] name = "ntapi" version = "0.4.1" @@ -2315,13 +2242,11 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "lazy_static", + "inotify", "log", "log-panics", "mockall", "modemmanager", - "notify", - "notify-debouncer-full", "omnect-device-service", "rand 0.9.1", "regex", @@ -2853,15 +2778,6 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" -[[package]] -name = "same-file" -version = "1.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" -dependencies = [ - "winapi-util", -] - [[package]] name = "schannel" version = "0.1.27" @@ -3596,16 +3512,6 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" -[[package]] -name = "walkdir" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" -dependencies = [ - "same-file", - "winapi-util", -] - [[package]] name = "want" version = "0.3.1" @@ -3742,15 +3648,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" -dependencies = [ - "windows-sys 0.59.0", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 1704396d..453f0276 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,12 +25,10 @@ freedesktop_entry_parser = { version = "1.3", default-features = false } futures = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } glob = { version = "0.3", default-features = false } -lazy_static = { version = "1.5", default-features = false } +inotify = { version = "0.11", default-features = false, features = ["stream"] } log = { version = "0.4", default-features = false } log-panics = { version = "2", default-features = false } modemmanager = { git = "https://github.com/omnect/modemmanager.git", tag = "0.3.4", default-features = false, optional = true } -notify = { version = "8.2", default-features = false } -notify-debouncer-full = { version = "0.5", default-features = false } regex-lite = { version = "0.1", default-features = true } reqwest = { version = "0.12", default-features = false, features = [ "default-tls", diff --git a/src/twin/consent.rs b/src/twin/consent.rs index 53c2463a..01b423d2 100644 --- a/src/twin/consent.rs +++ b/src/twin/consent.rs @@ -1,19 +1,19 @@ use crate::{ common::{from_json_file, to_json_file}, - twin::{Feature, feature::*}, + twin::feature::*, }; use anyhow::{Context, Result, bail, ensure}; use azure_iot_sdk::client::IotMessage; +use inotify::WatchMask; use log::{info, warn}; -use notify_debouncer_full::{Debouncer, NoCache, notify::*}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{collections::HashMap, env, path::Path}; +use std::{collections::HashMap, env, ffi::c_int, path::PathBuf}; use tokio::sync::mpsc::Sender; macro_rules! consent_path { () => { - Path::new(&env::var("CONSENT_DIR_PATH").unwrap_or("/etc/omnect/consent".to_string())) + PathBuf::from(&env::var("CONSENT_DIR_PATH").unwrap_or("/etc/omnect/consent".to_string())) }; } @@ -53,9 +53,8 @@ pub struct ConsentConfig { reset_consent_on_fail: bool, } -#[derive(Default)] pub struct DeviceUpdateConsent { - file_observer: Option>, + file_observer: HashMap, tx_reported_properties: Option>, } @@ -87,20 +86,15 @@ impl Feature for DeviceUpdateConsent { .await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - let (file_observer, stream) = file_modified_stream::(vec![ - request_consent_path!().as_path(), - history_consent_path!().as_path(), - ]) - .context("command_request_stream: cannot create file_modified_stream")?; - self.file_observer = Some(file_observer); - Ok(Some(stream)) - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { - Command::FileModified(file) => { - self.report_consent(from_json_file(&file.path)?).await?; + Command::WatchPath(cmd) => { + self.report_consent(from_json_file( + self.file_observer + .get(&cmd.id) + .context("cannot find file for watch id")?, + )?) + .await?; } Command::DesiredGeneralConsent(cmd) => { self.update_general_consent(cmd).await?; @@ -108,7 +102,7 @@ impl Feature for DeviceUpdateConsent { Command::UserConsent(cmd) => { self.user_consent(cmd)?; } - _ => bail!("unexpected command"), + _ => bail!("unexpected command: {cmd:?}"), }; Ok(None) @@ -119,6 +113,24 @@ impl DeviceUpdateConsent { const USER_CONSENT_VERSION: u8 = 1; const ID: &'static str = "device_update_consent"; + pub async fn new() -> Result { + let mut file_observer = HashMap::new(); + + for path in [request_consent_path!(), history_consent_path!()] { + file_observer.insert( + add_fs_watch::(&path, WatchMask::MODIFY) + .await? + .get_watch_descriptor_id(), + path, + ); + } + + Ok(DeviceUpdateConsent { + tx_reported_properties: None, + file_observer, + }) + } + fn user_consent(&self, cmd: &UserConsentCommand) -> CommandResult { info!("user consent requested: {cmd:?}"); @@ -192,27 +204,29 @@ mod tests { async fn consent_files_changed_test() { let (tx_reported_properties, mut rx_reported_properties) = tokio::sync::mpsc::channel(100); let mut consent = DeviceUpdateConsent { - file_observer: None, + file_observer: HashMap::from([( + 1, + PathBuf::from("testfiles/positive/test_component/user_consent.json"), + )]), tx_reported_properties: Some(tx_reported_properties), }; assert!( consent - .command(&Command::FileModified(PathCommand { + .command(&Command::WatchPath(WatchPathCommand { feature_id: TypeId::of::(), - path: Path::new("my-path").to_path_buf(), + id: 0, })) .await .unwrap_err() - .chain() - .any(|e| e.to_string().starts_with("failed to open for read: ")) + .to_string() + .eq("cannot find file for watch id") ); consent - .command(&Command::FileModified(PathCommand { + .command(&Command::WatchPath(WatchPathCommand { feature_id: TypeId::of::(), - path: Path::new("testfiles/positive/test_component/user_consent.json") - .to_path_buf(), + id: 1, })) .await .unwrap(); @@ -227,7 +241,7 @@ mod tests { async fn desired_consent_test() { let (tx_reported_properties, mut rx_reported_properties) = tokio::sync::mpsc::channel(100); let mut consent = DeviceUpdateConsent { - file_observer: None, + file_observer: HashMap::new(), tx_reported_properties: Some(tx_reported_properties), }; @@ -318,7 +332,7 @@ mod tests { async fn user_consent_test() { let (tx_reported_properties, _rx_reported_properties) = tokio::sync::mpsc::channel(100); let mut consent = DeviceUpdateConsent { - file_observer: None, + file_observer: HashMap::new(), tx_reported_properties: Some(tx_reported_properties), }; diff --git a/src/twin/factory_reset.rs b/src/twin/factory_reset.rs index 311f44e2..4aa6771f 100644 --- a/src/twin/factory_reset.rs +++ b/src/twin/factory_reset.rs @@ -1,14 +1,8 @@ -use crate::{ - bootloader_env, - common::from_json_file, - systemd, - twin::{Feature, feature::*}, - web_service, -}; +use crate::{bootloader_env, common::from_json_file, systemd, twin::feature::*, web_service}; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; +use inotify::WatchMask; use log::{debug, info, warn}; -use notify_debouncer_full::{Debouncer, NoCache, notify::*}; use serde::{Deserialize, Serialize}; use serde_json::{from_reader, json}; use serde_repr::*; @@ -37,8 +31,10 @@ macro_rules! config_path { macro_rules! custom_config_dir_path { () => { - env::var("FACTORY_RESET_CUSTOM_CONFIG_DIR_PATH") - .unwrap_or("/etc/omnect/factory-reset.d".to_string()) + Path::new( + &env::var("FACTORY_RESET_CUSTOM_CONFIG_DIR_PATH") + .unwrap_or("/etc/omnect/factory-reset.d".to_string()), + ) }; } @@ -92,7 +88,6 @@ struct FactoryResetReport { pub struct FactoryReset { tx_reported_properties: Option>, report: FactoryResetReport, - dir_observer: Option>, } impl Feature for FactoryReset { @@ -136,17 +131,9 @@ impl Feature for FactoryReset { Ok(()) } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - let (dir_observer, stream) = - dir_modified_stream::(vec![&Path::new(&custom_config_dir_path!())]) - .context("command_request_stream: cannot create dir_modified_stream")?; - self.dir_observer = Some(dir_observer); - Ok(Some(stream)) - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { - Command::DirModified(_) => { + Command::WatchPath(_) => { let keys = FactoryReset::factory_reset_keys()?; if keys != self.report.keys { @@ -176,7 +163,7 @@ impl Feature for FactoryReset { Command::FactoryReset(cmd) => { self.reset_to_factory_settings(cmd).await?; } - _ => bail!("unexpected command"), + _ => bail!("unexpected command: {cmd:?}"), }; Ok(None) @@ -187,16 +174,21 @@ impl FactoryReset { const FACTORY_RESET_VERSION: u8 = 3; const ID: &'static str = "factory_reset"; - pub fn new() -> Result { + pub async fn new() -> Result { let report = FactoryResetReport { keys: FactoryReset::factory_reset_keys()?, result: FactoryReset::factory_reset_result()?, }; + add_fs_watch::( + custom_config_dir_path!(), + WatchMask::CREATE | WatchMask::DELETE, + ) + .await?; + Ok(FactoryReset { tx_reported_properties: None, report, - dir_observer: None, }) } @@ -292,7 +284,16 @@ mod tests { crate::common::set_env_var("FACTORY_RESET_CONFIG_FILE_PATH", config_file_path.clone()); crate::common::set_env_var("FACTORY_RESET_CUSTOM_CONFIG_DIR_PATH", custom_dir_path); - let mut factory_reset = FactoryReset::new().unwrap(); + let (tx_reported_properties, mut rx_reported_properties) = tokio::sync::mpsc::channel(100); + let (tx_outgoing_message, mut _rx_outgoing_message) = tokio::sync::mpsc::channel(100); + + let mut factory_reset = FactoryReset { + tx_reported_properties: Some(tx_reported_properties.clone()), + report: FactoryResetReport { + keys: FactoryReset::factory_reset_keys().unwrap(), + result: FactoryReset::factory_reset_result().unwrap(), + }, + }; assert!( factory_reset @@ -320,9 +321,6 @@ mod tests { .await .unwrap(); - let (tx_reported_properties, mut rx_reported_properties) = tokio::sync::mpsc::channel(100); - let (tx_outgoing_message, mut _rx_outgoing_message) = tokio::sync::mpsc::channel(100); - factory_reset .connect_twin(tx_reported_properties, tx_outgoing_message) .await diff --git a/src/twin/feature/mod.rs b/src/twin/feature/mod.rs index cc99d263..89769b59 100644 --- a/src/twin/feature/mod.rs +++ b/src/twin/feature/mod.rs @@ -2,22 +2,24 @@ use crate::twin::{ TwinUpdate, TwinUpdateState, consent, factory_reset, firmware_update, network, reboot, ssh_tunnel, system_info, }; -use anyhow::{Result, bail, ensure}; +use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::{DirectMethod, IotMessage}; use futures::Stream; -use futures::StreamExt; +use futures_util::StreamExt; +use inotify::{Inotify, WatchDescriptor, WatchMask, Watches}; use log::{debug, error, info, warn}; -use notify_debouncer_full::{DebounceEventResult, Debouncer, NoCache, new_debouncer, notify::*}; use std::{ - any::TypeId, - path::{Path, PathBuf}, + any::{TypeId, type_name}, + collections::HashMap, + ffi::c_int, + path::Path, pin::Pin, + sync::LazyLock, time::Duration, }; use tokio::{ - sync::{mpsc, oneshot}, - task::JoinHandle, - time::{Instant, Interval}, + sync::{Mutex, mpsc, oneshot}, + task::JoinSet, }; #[derive(Clone, Debug, PartialEq)] @@ -25,11 +27,8 @@ pub enum Command { CloseSshTunnel(ssh_tunnel::CloseSshTunnelCommand), DesiredGeneralConsent(consent::DesiredGeneralConsentCommand), DesiredUpdateDeviceSshCa(ssh_tunnel::UpdateDeviceSshCaCommand), - DirModified(PathCommand), FactoryReset(factory_reset::FactoryResetCommand), FleetId(system_info::FleetIdCommand), - FileCreated(PathCommand), - FileModified(PathCommand), GetSshPubKey(ssh_tunnel::GetSshPubKeyCommand), Interval(IntervalCommand), LoadFirmwareUpdate(firmware_update::LoadUpdateCommand), @@ -38,8 +37,9 @@ pub enum Command { ReloadNetwork, RunFirmwareUpdate(firmware_update::RunUpdateCommand), SetWaitOnlineTimeout(reboot::SetWaitOnlineTimeoutCommand), - ValidateUpdate(bool), UserConsent(consent::UserConsentCommand), + ValidateUpdate(bool), + WatchPath(WatchPathCommand), } impl Command { @@ -50,10 +50,7 @@ impl Command { CloseSshTunnel(_) => TypeId::of::(), DesiredGeneralConsent(_) => TypeId::of::(), DesiredUpdateDeviceSshCa(_) => TypeId::of::(), - DirModified(cmd) => cmd.feature_id, FactoryReset(_) => TypeId::of::(), - FileCreated(cmd) => cmd.feature_id, - FileModified(cmd) => cmd.feature_id, FleetId(_) => TypeId::of::(), GetSshPubKey(_) => TypeId::of::(), Interval(cmd) => cmd.feature_id, @@ -63,8 +60,9 @@ impl Command { ReloadNetwork => TypeId::of::(), RunFirmwareUpdate(_) => TypeId::of::(), SetWaitOnlineTimeout(_) => TypeId::of::(), - ValidateUpdate(_) => TypeId::of::(), UserConsent(_) => TypeId::of::(), + ValidateUpdate(_) => TypeId::of::(), + WatchPath(cmd) => cmd.feature_id, } } @@ -115,11 +113,7 @@ impl Command { } } _ => { - bail!( - "cannot parse direct method {} with payload {}", - direct_method.name, - direct_method.payload - ) + bail!("cannot parse direct method {direct_method:?} ") } } } @@ -193,157 +187,181 @@ pub(crate) trait Feature { Ok(()) } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - Ok(None) - } - async fn command(&mut self, _cmd: &Command) -> CommandResult { unimplemented!(); } } #[derive(Clone, Debug, PartialEq)] -pub struct PathCommand { +pub struct IntervalCommand { pub feature_id: TypeId, - pub path: PathBuf, } -#[derive(Clone, Debug, PartialEq)] -pub struct IntervalCommand { +#[derive(Clone, Debug)] +pub struct WatchPathCommand { pub feature_id: TypeId, - pub instant: Instant, + pub id: core::ffi::c_int, } -pub fn interval_stream(interval: Interval) -> CommandRequestStream -where - T: 'static, -{ - tokio_stream::wrappers::IntervalStream::new(interval) - .map(|i| CommandRequest { - command: Command::Interval(IntervalCommand { - feature_id: TypeId::of::(), - instant: i, - }), - reply: None, - }) - .boxed() +impl PartialEq for WatchPathCommand { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } } -pub fn file_created_stream(paths: Vec<&Path>) -> Result<(JoinHandle<()>, CommandRequestStream)> -where - T: 'static, -{ - let (tx, rx) = mpsc::channel(2); - let inner_paths: Vec = paths.into_iter().map(|p| p.to_path_buf()).collect(); - - let handle = tokio::task::spawn_blocking(move || { - loop { - for p in &inner_paths { - if matches!(p.try_exists(), Ok(true)) { - let _ = tx.blocking_send(CommandRequest { - command: Command::FileCreated(PathCommand { - feature_id: TypeId::of::(), - path: p.clone(), - }), - reply: None, - }); - return; +#[derive(Debug)] +struct FSWatcher { + watches: Watches, + features: HashMap, +} + +/* + We chose LazyLock>> over OnceLock> here only for testing purposes, + in order to allow re-initialization for each testcase. +*/ +static TASKS: LazyLock>>> = LazyLock::new(|| Mutex::new(None)); +static FS_WATCHER: LazyLock>> = LazyLock::new(|| Mutex::new(None)); +static TX_COMMAND_REQUEST: LazyLock>>> = + LazyLock::new(|| Mutex::new(None)); + +pub async fn init(tx_command_request: mpsc::Sender) -> Result<()> { + *TX_COMMAND_REQUEST.lock().await = Some(tx_command_request.clone()); + + let inotify = Inotify::init().context("init: failed to initialize inotify")?; + *FS_WATCHER.lock().await = Some(FSWatcher { + watches: inotify.watches(), + features: HashMap::new(), + }); + + let mut tasks: JoinSet<_> = JoinSet::new(); + tasks.spawn(async move { + let mut buffer = [0; 1024]; + let mut stream = inotify + .into_event_stream(&mut buffer) + .expect("init: failed to get inotify stream"); + while let Some(event_result) = stream.next().await { + match event_result { + Ok(event) => { + debug!("inotify: {event:?}"); + + let Some(feature_id) = FS_WATCHER + .lock() + .await + .as_ref() + .expect("init: failed to get FS_WATCHER") + .features + .get(&event.wd.get_watch_descriptor_id()) + .cloned() + else { + error!("inotify: unknown wd {:?}", event.wd); + continue; + }; + + if let Err(e) = tx_command_request + .send(CommandRequest { + command: Command::WatchPath(WatchPathCommand { + feature_id, + id: event.wd.get_watch_descriptor_id(), + }), + reply: None, + }) + .await + { + error!("inotify: send {e:#?}") + } } + Err(e) => error!("inotify: event {e:#?}"), } - std::thread::sleep(Duration::from_millis(500)); } }); - Ok(( - handle, - tokio_stream::wrappers::ReceiverStream::new(rx).boxed(), - )) + *TASKS.lock().await = Some(tasks); + + Ok(()) } -pub fn file_modified_stream( - paths: Vec<&Path>, -) -> Result<(Debouncer, CommandRequestStream)> +pub(crate) async fn add_fs_watch(path: &Path, mask: WatchMask) -> Result where - T: 'static, + T: Feature + 'static, { - let (tx, rx) = mpsc::channel(2); - let mut debouncer = new_debouncer( - Duration::from_secs(2), - None, - move |res: DebounceEventResult| match res { - Ok(debounced_events) => { - for de in debounced_events { - if let EventKind::Modify(_) = de.event.kind { - debug!("notify-event: {de:?}"); - for p in &de.paths { - let _ = tx.blocking_send(CommandRequest { - command: Command::FileModified(PathCommand { - feature_id: TypeId::of::(), - path: p.clone(), - }), - reply: None, - }); - } - } - } - } - Err(errors) => errors.iter().for_each(|e| error!("notify-error: {e:?}")), - }, - )?; - - for p in paths { - ensure!(p.is_file(), "{p:?} is not a regular existing file"); - debug!("watch {p:?}"); - debouncer.watch(p, RecursiveMode::NonRecursive)?; + debug!("add_fs_watch: {} {path:?} {mask:?}", type_name::()); + + let mut watcher = FS_WATCHER.lock().await; + let watcher = watcher + .as_mut() + .context("add_fs_watch: FS_WATCHER doesn't exist")?; + + let wd = watcher + .watches + .add(path, mask) + .context("add_fs_watch: failed to add")?; + + if watcher + .features + .insert(wd.get_watch_descriptor_id(), TypeId::of::()) + .is_some() + { + bail!("add_fs_watch: currently only one feature per watch supported") } - Ok(( - debouncer, - tokio_stream::wrappers::ReceiverStream::new(rx).boxed(), - )) + Ok(wd) } -pub fn dir_modified_stream( - paths: Vec<&Path>, -) -> Result<(Debouncer, CommandRequestStream)> +pub async fn remove_fs_watch(wd: WatchDescriptor) -> Result<()> { + debug!("remove_fs_watch: {wd:?}"); + + let mut watcher = FS_WATCHER.lock().await; + let watcher = watcher + .as_mut() + .context("remove_fs_watch: FS_WATCHER doesn't exist")?; + + watcher.watches.remove(wd.clone())?; + + if watcher + .features + .remove_entry(&wd.get_watch_descriptor_id()) + .is_none() + { + warn!("remove_fs_watch: WatchDescriptor doesn't exist") + }; + + Ok(()) +} + +pub async fn add_notify_interval(interval: Duration) -> Result<()> where T: 'static, { - let (tx, rx) = mpsc::channel(2); - let mut debouncer = new_debouncer( - Duration::from_secs(2), - None, - move |res: DebounceEventResult| match res { - Ok(debounced_events) => { - for de in debounced_events { - if matches!(de.event.kind, EventKind::Create(_) | EventKind::Remove(_)) { - debug!("notify-event: {de:?}"); - for p in &de.paths { - let _ = tx.blocking_send(CommandRequest { - command: Command::DirModified(PathCommand { - feature_id: TypeId::of::(), - path: p.clone(), - }), - reply: None, - }); - } - } + let feature_id = TypeId::of::(); + + debug!("add_notify_interval: {feature_id:?} {interval:?}"); + + let Some(tx_command_request) = TX_COMMAND_REQUEST.lock().await.clone() else { + bail!("add_notify_interval: sender doesn't exist") + }; + + TASKS + .lock() + .await + .as_mut() + .context("add_notify_interval: TASKS doesn't exist")? + .spawn(async move { + let mut interval = tokio::time::interval(interval); + loop { + interval.tick().await; + if let Err(e) = tx_command_request + .send(CommandRequest { + command: Command::Interval(IntervalCommand { feature_id }), + reply: None, + }) + .await + { + error!("add_notify_interval: send {e:#?}") } } - Err(errors) => errors.iter().for_each(|e| error!("notify-error: {e:?}")), - }, - )?; - - for p in paths { - ensure!(p.is_dir(), "{p:?} is not a regular existing directory"); - debug!("watch {p:?}"); - debouncer.watch(p, RecursiveMode::Recursive)?; - } + }); - Ok(( - debouncer, - tokio_stream::wrappers::ReceiverStream::new(rx).boxed(), - )) + Ok(()) } #[cfg(test)] @@ -352,7 +370,7 @@ mod tests { use crate::twin::factory_reset; use reboot::SetWaitOnlineTimeoutCommand; use serde_json::json; - use std::str::FromStr; + use std::path::PathBuf; use tokio::sync::oneshot; #[test] @@ -443,10 +461,7 @@ mod tests { }) .unwrap(), Command::UserConsent(consent::UserConsentCommand { - user_consent: std::collections::HashMap::from([( - "foo".to_string(), - "bar".to_string() - )]), + user_consent: HashMap::from([("foo".to_string(), "bar".to_string())]), }) ); @@ -518,7 +533,7 @@ mod tests { host: "my-host".to_string(), port: 22, user: "usr".to_string(), - socket_path: PathBuf::from_str("/socket").unwrap(), + socket_path: PathBuf::from("/socket"), } }) ); diff --git a/src/twin/firmware_update/mod.rs b/src/twin/firmware_update/mod.rs index e6fa6f34..b425d338 100644 --- a/src/twin/firmware_update/mod.rs +++ b/src/twin/firmware_update/mod.rs @@ -9,7 +9,6 @@ use crate::{ systemd, systemd::{unit::UnitAction, watchdog::WatchdogManager}, twin::{ - Feature, feature::*, firmware_update::{adu_types::*, common::*, os_version::*}, }, @@ -188,7 +187,7 @@ impl Feature for FirmwareUpdate { .await?; Ok(None) } - _ => bail!("unexpected command"), + _ => bail!("unexpected command: {cmd:?}"), } } } diff --git a/src/twin/mod.rs b/src/twin/mod.rs index 9ccafa07..1edcbc0b 100644 --- a/src/twin/mod.rs +++ b/src/twin/mod.rs @@ -39,7 +39,7 @@ use std::{ path::Path, time::{self, Duration}, }; -use tokio::{select, sync::mpsc}; +use tokio::{select, sync::mpsc, try_join}; use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; #[derive(PartialEq)] @@ -66,23 +66,28 @@ impl Twin { tx_reported_properties: mpsc::Sender, tx_outgoing_message: mpsc::Sender, ) -> Result { - let web_service = web_service::WebService::run(tx_command_request.clone()).await?; + let (_, web_service) = try_join!( + feature::init(tx_command_request.clone()), + web_service::WebService::run(tx_command_request.clone()) + )?; + /* - init features first - start with SystemInfo in order to log useful infos asap + - ToDo: concurrency by try_join_all */ let features = HashMap::from([ ( TypeId::of::(), - DynFeature::new_box(system_info::SystemInfo::new()?), + DynFeature::new_box(system_info::SystemInfo::new().await?), ), ( TypeId::of::(), - DynFeature::new_box(consent::DeviceUpdateConsent::default()), + DynFeature::new_box(consent::DeviceUpdateConsent::new().await?), ), ( TypeId::of::(), - DynFeature::new_box(factory_reset::FactoryReset::new()?), + DynFeature::new_box(factory_reset::FactoryReset::new().await?), ), ( TypeId::of::(), @@ -90,15 +95,15 @@ impl Twin { ), ( TypeId::of::(), - DynFeature::new_box(modem_info::ModemInfo::new()), + DynFeature::new_box(modem_info::ModemInfo::new().await?), ), ( TypeId::of::(), - DynFeature::new_box(network::Network::default()), + DynFeature::new_box(network::Network::new().await?), ), ( TypeId::of::(), - DynFeature::new_box(provisioning_config::ProvisioningConfig::new()?), + DynFeature::new_box(provisioning_config::ProvisioningConfig::new().await?), ), ( TypeId::of::(), @@ -332,19 +337,6 @@ impl Twin { } } - fn feature_command_request_streams(&mut self) -> Vec { - self.features - .values_mut() - .filter_map(|f| { - if f.is_enabled() { - f.command_request_stream().unwrap() - } else { - None - } - }) - .collect() - } - async fn request_validate_update(&mut self, authenticated: bool) -> Result<()> { self.tx_command_request .send(CommandRequest { @@ -385,10 +377,11 @@ impl Twin { systemd::sd_notify_ready(); - let mut command_requests = twin.feature_command_request_streams(); - command_requests.push(Self::direct_method_stream(rx_direct_method)); - command_requests.push(Self::desired_properties_stream(rx_twin_desired)); - command_requests.push(ReceiverStream::new(rx_command_request).boxed()); + let command_requests = vec![ + Self::direct_method_stream(rx_direct_method), + Self::desired_properties_stream(rx_twin_desired), + ReceiverStream::new(rx_command_request).boxed(), + ]; tokio::pin! { let client_created = Self::connect_iothub_client(&client_builder); diff --git a/src/twin/mod_test.rs b/src/twin/mod_test.rs index 385fbd32..39f2ac0f 100644 --- a/src/twin/mod_test.rs +++ b/src/twin/mod_test.rs @@ -6,9 +6,7 @@ pub mod mod_test { AuthenticationObserver, DirectMethod, DirectMethodObserver, TwinObserver, }; use cp_r::CopyOptions; - use env_logger::{Builder, Env}; use futures_executor::block_on; - use lazy_static::lazy_static; use mockall::{automock, predicate::*}; use rand::{ distr::Alphanumeric, @@ -18,14 +16,6 @@ pub mod mod_test { use std::fs::{copy, create_dir_all, remove_dir_all}; use std::{env, fs::OpenOptions, path::PathBuf, time::Duration}; - lazy_static! { - static ref LOG: () = if cfg!(debug_assertions) { - Builder::from_env(Env::default().default_filter_or("debug")).init() - } else { - Builder::from_env(Env::default().default_filter_or("info")).init() - }; - } - const TMPDIR_FORMAT_STR: &str = "/tmp/omnect-device-service-tests/"; const UTC_REGEX: &str = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[\+-]\d{2}:\d{2})"; @@ -83,8 +73,7 @@ pub mod mod_test { impl TestEnvironment { pub fn new(name: &str) -> TestEnvironment { - lazy_static::initialize(&LOG); - let dirpath = format!("{}{}", TMPDIR_FORMAT_STR, name); + let dirpath = format!("{TMPDIR_FORMAT_STR}{name}"); create_dir_all(&dirpath).unwrap(); TestEnvironment { dirpath } } @@ -92,7 +81,7 @@ pub mod mod_test { pub fn copy_directory(&self, dir: &str) -> PathBuf { let destdir = String::from(dir); let destdir = destdir.split('/').last().unwrap(); - let path = PathBuf::from(format!("{}/{}", self.dirpath, destdir)); + let path = PathBuf::from(format!("{}/{destdir}", self.dirpath)); CopyOptions::new().copy_tree(dir, &path).unwrap(); path } @@ -100,7 +89,7 @@ pub mod mod_test { pub fn mkdir(&self, dir: &str) -> PathBuf { let destdir = String::from(dir); let destdir = destdir.split('/').last().unwrap(); - let path = PathBuf::from(format!("{}/{}", self.dirpath, destdir)); + let path = PathBuf::from(format!("{}/{destdir}", self.dirpath)); std::fs::create_dir_all(&path).unwrap(); path } @@ -108,7 +97,7 @@ pub mod mod_test { pub fn copy_file(&self, file: &str) -> PathBuf { let destfile = String::from(file); let destfile = destfile.split('/').last().unwrap(); - let path = PathBuf::from(format!("{}/{}", self.dirpath, destfile)); + let path = PathBuf::from(format!("{}/{destfile}", self.dirpath)); copy(file, &path).unwrap(); path } @@ -132,6 +121,7 @@ pub mod mod_test { struct TestConfig { twin: Twin, + rx_command_request: mpsc::Receiver, dir: PathBuf, } @@ -236,10 +226,10 @@ pub mod mod_test { let (tx_direct_method, _rx_direct_method) = mpsc::channel(100); let (tx_reported_properties, mut rx_reported_properties) = mpsc::channel(100); let (tx_outgoing_message, _rx_outgoing_message) = mpsc::channel(100); - let (tx_web_service, _rx_web_service) = mpsc::channel(100); + let (tx_command_request, rx_command_request) = mpsc::channel(100); let mut twin = block_on(Twin::new( - tx_web_service, + tx_command_request, tx_reported_properties, tx_outgoing_message, )) @@ -258,6 +248,7 @@ pub mod mod_test { // create test config let mut config = TestConfig { twin, + rx_command_request, dir: PathBuf::from(test_env.dirpath()), }; @@ -651,15 +642,6 @@ pub mod mod_test { let test = |test_attr: &mut TestConfig| { assert!(block_on(async { test_attr.twin.connect_twin().await }).is_ok()); - let mut ev_stream = test_attr - .twin - .features - .get_mut(&TypeId::of::()) - .unwrap() - .command_request_stream() - .unwrap() - .unwrap(); - serde_json::to_writer_pretty( OpenOptions::new() .write(true) @@ -676,7 +658,20 @@ pub mod mod_test { ) .unwrap(); - let cmd = block_on(async { ev_stream.next().await }).unwrap(); + let cmd = block_on(async { + let mut buffer: Vec = Vec::with_capacity(10); + test_attr + .rx_command_request + .recv_many(&mut buffer, 10) + .await; + let i = buffer + .into_iter() + .find(|x| { + x.command.feature_id() == TypeId::of::() + }) + .unwrap(); + i + }); assert!( block_on(async { diff --git a/src/twin/modem_info.rs b/src/twin/modem_info.rs index 76d82e24..b5886dea 100644 --- a/src/twin/modem_info.rs +++ b/src/twin/modem_info.rs @@ -1,10 +1,9 @@ -use super::{Feature, feature::*}; +use crate::twin::feature::*; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; -use lazy_static::lazy_static; use serde_json::json; use std::{env, time::Duration}; -use tokio::{sync::mpsc::Sender, time::interval}; +use tokio::sync::mpsc::Sender; #[cfg(feature = "modem_info")] mod inner { @@ -300,8 +299,20 @@ mod inner { } impl ModemInfo { - pub fn new() -> Self { - ModemInfo::default() + pub async fn new() -> Result { + let modem_info = ModemInfo::default(); + if modem_info.is_enabled() { + let refresh_interval = env::var("REFRESH_MODEM_INFO_INTERVAL_SECS") + .unwrap_or("600".to_string()) + .parse::() + .context("cannot parse REFRESH_MODEM_INFO_INTERVAL_SECS env var")?; + + if 0 < refresh_interval { + add_notify_interval::(Duration::from_secs(refresh_interval)).await?; + } + } + + Ok(modem_info) } pub async fn report(&self, force: bool) -> Result<()> { @@ -330,16 +341,6 @@ pub use inner::ModemInfo; const MODEM_INFO_VERSION: u8 = 1; const ID: &str = "modem_info"; -lazy_static! { - static ref REFRESH_MODEM_INFO_INTERVAL_SECS: u64 = { - const REFRESH_MODEM_INFO_INTERVAL_SECS_DEFAULT: &str = "600"; - env::var("REFRESH_MODEM_INFO_INTERVAL_SECS") - .unwrap_or(REFRESH_MODEM_INFO_INTERVAL_SECS_DEFAULT.to_string()) - .parse::() - .expect("cannot parse REFRESH_MODEM_INFO_INTERVAL_SECS env var") - }; -} - impl Feature for ModemInfo { fn name(&self) -> String { ID.to_string() @@ -365,19 +366,9 @@ impl Feature for ModemInfo { self.report(true).await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - if !self.is_enabled() || 0 == *REFRESH_MODEM_INFO_INTERVAL_SECS { - Ok(None) - } else { - Ok(Some(interval_stream::(interval( - Duration::from_secs(*REFRESH_MODEM_INFO_INTERVAL_SECS), - )))) - } - } - async fn command(&mut self, cmd: &Command) -> CommandResult { let Command::Interval(_) = cmd else { - bail!("unexpected event: {cmd:?}") + bail!("unexpected command: {cmd:?}") }; self.report(false).await?; diff --git a/src/twin/network.rs b/src/twin/network.rs index 8d65d44a..6f40f494 100644 --- a/src/twin/network.rs +++ b/src/twin/network.rs @@ -1,26 +1,15 @@ use crate::{ systemd::{networkd, unit}, - twin::{Feature, feature::*}, + twin::feature::*, web_service, }; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; -use lazy_static::lazy_static; use log::{debug, error, info, warn}; use serde::Serialize; use serde_json::json; use std::{env, time::Duration}; -use tokio::{sync::mpsc::Sender, time::interval}; - -lazy_static! { - static ref REFRESH_NETWORK_STATUS_INTERVAL_SECS: u64 = { - const REFRESH_NETWORK_STATUS_INTERVAL_SECS_DEFAULT: &str = "60"; - env::var("REFRESH_NETWORK_STATUS_INTERVAL_SECS") - .unwrap_or(REFRESH_NETWORK_STATUS_INTERVAL_SECS_DEFAULT.to_string()) - .parse::() - .expect("cannot parse REFRESH_NETWORK_STATUS_INTERVAL_SECS env var") - }; -} +use tokio::sync::mpsc::Sender; static NETWORK_SERVICE: &str = "systemd-networkd.service"; @@ -47,7 +36,6 @@ pub struct Interface { ipv4: IpConfig, } -#[derive(Default)] pub struct Network { tx_reported_properties: Option>, interfaces: Vec, @@ -76,16 +64,6 @@ impl Feature for Network { Ok(()) } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - if !self.is_enabled() || 0 == *REFRESH_NETWORK_STATUS_INTERVAL_SECS { - Ok(None) - } else { - Ok(Some(interval_stream::(interval( - Duration::from_secs(*REFRESH_NETWORK_STATUS_INTERVAL_SECS), - )))) - } - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { Command::Interval(_) => {} @@ -97,7 +75,7 @@ impl Feature for Network { ) .await? } - _ => bail!("unexpected command"), + _ => bail!("unexpected command: {cmd:?}"), } self.report(false).await?; @@ -110,6 +88,22 @@ impl Network { const NETWORK_STATUS_VERSION: u8 = 3; const ID: &'static str = "network_status"; + pub async fn new() -> Result { + let refresh_interval = env::var("REFRESH_NETWORK_STATUS_INTERVAL_SECS") + .unwrap_or("60".to_string()) + .parse::() + .context("cannot parse REFRESH_NETWORK_STATUS_INTERVAL_SECS env var")?; + + if 0 < refresh_interval { + add_notify_interval::(Duration::from_secs(refresh_interval)).await?; + } + + Ok(Network { + interfaces: vec![], + tx_reported_properties: None, + }) + } + async fn report(&mut self, force: bool) -> Result<()> { let interfaces = Self::parse_interfaces(&networkd::networkd_interfaces().await?)?; diff --git a/src/twin/provisioning_config.rs b/src/twin/provisioning_config.rs index 3ee00ea4..2a6d0103 100644 --- a/src/twin/provisioning_config.rs +++ b/src/twin/provisioning_config.rs @@ -1,23 +1,12 @@ -use crate::twin::{Feature, feature::*}; +use crate::twin::feature::{self, *}; use anyhow::{Context, Result, bail, ensure}; use azure_iot_sdk::client::IotMessage; -use lazy_static::lazy_static; use log::{debug, info, warn}; use serde::Serialize; use serde_json::json; use std::{env, path::Path, time::Duration}; use time::format_description::well_known::Rfc3339; -use tokio::{sync::mpsc::Sender, time::interval}; - -lazy_static! { - static ref REFRESH_EST_EXPIRY_INTERVAL_SECS: u64 = { - const REFRESH_EST_EXPIRY_INTERVAL_SECS_DEFAULT: &str = "180"; - env::var("REFRESH_EST_EXPIRY_INTERVAL_SECS") - .unwrap_or(REFRESH_EST_EXPIRY_INTERVAL_SECS_DEFAULT.to_string()) - .parse::() - .expect("cannot parse REFRESH_EST_EXPIRY_INTERVAL_SECS env var") - }; -} +use tokio::sync::mpsc::Sender; #[derive(Debug, Serialize)] #[serde(rename_all = "snake_case")] @@ -141,22 +130,9 @@ impl Feature for ProvisioningConfig { self.report().await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - if !self.is_enabled() || 0 == *REFRESH_EST_EXPIRY_INTERVAL_SECS { - Ok(None) - } else { - match &self.method { - Method::X509(cert) if cert.est => Ok(Some(interval_stream::( - interval(Duration::from_secs(*REFRESH_EST_EXPIRY_INTERVAL_SECS)), - ))), - _ => Ok(None), - } - } - } - async fn command(&mut self, cmd: &Command) -> CommandResult { let Command::Interval(_) = cmd else { - bail!("unexpected event: {cmd:?}") + bail!("unexpected command: {cmd:?}") }; let expires = match &self.method { @@ -184,7 +160,7 @@ impl ProvisioningConfig { const ID: &'static str = "provisioning_config"; const IDENTITY_CONFIG_FILE_PATH_DEFAULT: &'static str = "/etc/aziot/config.toml"; - pub fn new() -> Result { + pub async fn new() -> Result { let path = env::var("IDENTITY_CONFIG_FILE_PATH") .unwrap_or(ProvisioningConfig::IDENTITY_CONFIG_FILE_PATH_DEFAULT.to_string()); let config: toml::map::Map = std::fs::read_to_string(&path) @@ -232,6 +208,23 @@ impl ProvisioningConfig { _ => bail!("provisioning_config: invalid provisioning configuration found"), }; + let refresh_interval = env::var("REFRESH_EST_EXPIRY_INTERVAL_SECS") + .unwrap_or("180".to_string()) + .parse::() + .context("cannot parse REFRESH_EST_EXPIRY_INTERVAL_SECS env var")?; + + if 0 < refresh_interval + && matches!( + method, + Method::X509(X509 { + est: true, + expires: _ + }) + ) + { + feature::add_notify_interval::(Duration::from_secs(refresh_interval)).await?; + } + let this = ProvisioningConfig { tx_reported_properties: None, source, @@ -260,16 +253,15 @@ impl ProvisioningConfig { #[cfg(test)] mod tests { - use std::any::TypeId; - use tokio::time::Instant; - use super::*; + use std::any::TypeId; - #[test] - fn provisioning_config_test() { + #[tokio::test(flavor = "multi_thread")] + async fn provisioning_config_test() { crate::common::set_env_var("IDENTITY_CONFIG_FILE_PATH", ""); assert!( ProvisioningConfig::new() + .await .unwrap_err() .to_string() .starts_with("provisioning_config: cannot read") @@ -281,7 +273,7 @@ mod tests { ); crate::common::set_env_var("EST_CERT_FILE_PATH", "testfiles/positive/deviceid1-*.cer"); assert_eq!( - serde_json::to_value(ProvisioningConfig::new().unwrap()).unwrap(), + serde_json::to_value(ProvisioningConfig::new().await.unwrap()).unwrap(), json!({ "source": "dps", "method": { @@ -298,7 +290,7 @@ mod tests { "testfiles/positive/config.toml.tpm", ); assert_eq!( - serde_json::to_value(ProvisioningConfig::new().unwrap()).unwrap(), + serde_json::to_value(ProvisioningConfig::new().await.unwrap()).unwrap(), json!({ "source": "dps", "method": "tpm" @@ -315,7 +307,7 @@ mod tests { crate::common::set_env_var("EST_CERT_FILE_PATH", "testfiles/positive/deviceid1-*.cer"); - let mut config = ProvisioningConfig::new().unwrap(); + let mut config = ProvisioningConfig::new().await.unwrap(); assert_eq!( serde_json::to_value(&config).unwrap(), @@ -339,7 +331,6 @@ mod tests { config .command(&Command::Interval(IntervalCommand { feature_id: TypeId::of::(), - instant: Instant::now(), })) .await .unwrap(); diff --git a/src/twin/reboot.rs b/src/twin/reboot.rs index 5dac511e..93b6cdf1 100644 --- a/src/twin/reboot.rs +++ b/src/twin/reboot.rs @@ -1,8 +1,4 @@ -use crate::{ - systemd, - twin::{Feature, feature::*}, - web_service, -}; +use crate::{systemd, twin::feature::*, web_service}; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::IotMessage; use log::{debug, info}; @@ -52,7 +48,7 @@ impl Feature for Reboot { match cmd { Command::Reboot => self.reboot().await, Command::SetWaitOnlineTimeout(cmd) => self.set_wait_online_timeout(cmd).await, - _ => bail!("unexpected command"), + _ => bail!("unexpected command: {cmd:?}"), } } } diff --git a/src/twin/ssh_tunnel.rs b/src/twin/ssh_tunnel.rs index 6f453d6b..135dac33 100644 --- a/src/twin/ssh_tunnel.rs +++ b/src/twin/ssh_tunnel.rs @@ -1,11 +1,11 @@ use crate::twin::{ - feature::{Command as FeatureCommand, CommandResult}, Feature, + feature::{Command as FeatureCommand, CommandResult}, }; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{Context, Result, bail, ensure}; use azure_iot_sdk::client::IotMessage; use log::{debug, error, info, warn}; -use serde::{de::Error, Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, de::Error}; use serde_json::json; use std::{ env, @@ -18,7 +18,7 @@ use std::{ use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, process::{Child, Command}, - sync::{mpsc::Sender, OwnedSemaphorePermit, Semaphore, TryAcquireError}, + sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError, mpsc::Sender}, }; use uuid::Uuid; @@ -409,8 +409,7 @@ impl SshTunnel { Ok(output) => output, Err(err) => { error!( - "await_tunnel_termination: could not retrieve output from ssh process: {}", - err + "await_tunnel_termination: could not retrieve output from ssh process: {err}" ); return; } @@ -429,7 +428,7 @@ impl SshTunnel { warn!("Failed to send tunnel update to cloud: {err}"); } - info!("Closed ssh tunnel: {}", tunnel_id); + info!("Closed ssh tunnel: {tunnel_id}"); } async fn await_tunnel_creation(ssh_process: &mut Child) -> Result<()> { @@ -448,14 +447,18 @@ impl SshTunnel { if msg == "established" { Ok(()) } else { - bail!("await_tunnel_creation: failed to establish ssh tunnel due to unexpected response from ssh server: {}", msg); + bail!( + "await_tunnel_creation: failed to establish ssh tunnel due to unexpected response from ssh server: {msg}", + ); } } Ok(None) => { bail!("await_tunnel_creation: failed to establish ssh tunnel"); } Err(err) => { - bail!("await_tunnel_creation: failed to establish ssh tunnel since unable to read from ssh process: {}", err); + bail!( + "await_tunnel_creation: failed to establish ssh tunnel since unable to read from ssh process: {err}", + ); } } } @@ -464,8 +467,8 @@ impl SshTunnel { let control_socket_path = control_socket_path!(&args.tunnel_id); info!( - "close_ssh_tunnel: \"{}\", socket path: \"{:?}\"", - args.tunnel_id, control_socket_path + "close_ssh_tunnel: \"{}\", socket path: \"{control_socket_path:?}\"", + args.tunnel_id ); Self::close_tunnel_command(&control_socket_path).await?; @@ -600,9 +603,8 @@ impl Drop for SshCredentialsGuard { .for_each(|file| { if let Err(err) = remove_file(file) { warn!( - "Failed to delete certificate \"{}\": {}", - file.to_string_lossy(), - err + "Failed to delete certificate \"{}\": {err}", + file.to_string_lossy() ); } }) @@ -667,8 +669,8 @@ mod tests { const KEY_NAME: &str = "key"; let priv_key_path = key_dir.path().join(KEY_NAME); - let pub_key_path = key_dir.path().join(format!("{}.pub", KEY_NAME)); - let cert_path = key_dir.path().join(format!("{}-cert.pub", KEY_NAME)); + let pub_key_path = key_dir.path().join(format!("{KEY_NAME}.pub")); + let cert_path = key_dir.path().join(format!("{KEY_NAME}-cert.pub")); let _priv_key_file = File::create(&priv_key_path).unwrap(); let _pub_key_file = File::create(&pub_key_path).unwrap(); @@ -869,7 +871,7 @@ mod tests { // test connection limit let pipe_names = (1..=5) .into_iter() - .map(|pipe_num| tmp_dir.path().join(&format!("named_pipe_{}", pipe_num))) + .map(|pipe_num| tmp_dir.path().join(&format!("named_pipe_{pipe_num}"))) .collect::>(); for pipe_name in &pipe_names { @@ -898,19 +900,21 @@ mod tests { } // the final should fail - assert!(ssh_tunnel - .command(&FeatureCommand::OpenSshTunnel(OpenSshTunnelCommand { - tunnel_id: "b7afb216-5f7a-4755-a300-9374f8a0e9ff".to_string(), - certificate: std::fs::read_to_string(cert_path.clone()).unwrap(), - bastion_config: BastionConfig { - host: "test-host".to_string(), - port: 2222, - user: "test-user".to_string(), - socket_path: PathBuf::from_str("/some/test/socket/path").unwrap(), - }, - })) - .await - .is_err()); + assert!( + ssh_tunnel + .command(&FeatureCommand::OpenSshTunnel(OpenSshTunnelCommand { + tunnel_id: "b7afb216-5f7a-4755-a300-9374f8a0e9ff".to_string(), + certificate: std::fs::read_to_string(cert_path.clone()).unwrap(), + bastion_config: BastionConfig { + host: "test-host".to_string(), + port: 2222, + user: "test-user".to_string(), + socket_path: PathBuf::from_str("/some/test/socket/path").unwrap(), + }, + })) + .await + .is_err() + ); // finally, close the pipes. By opening and closing for writing is // sufficient. diff --git a/src/twin/system_info.rs b/src/twin/system_info.rs index 591a6550..9a146cdc 100644 --- a/src/twin/system_info.rs +++ b/src/twin/system_info.rs @@ -1,36 +1,20 @@ -use crate::{ - common::RootPartition, - reboot_reason, - twin::{Feature, feature::*}, - web_service, -}; +use crate::{common::RootPartition, reboot_reason, twin::feature::*, web_service}; use anyhow::{Context, Result, bail}; use azure_iot_sdk::client::{IotHubClient, IotMessage}; -use futures::StreamExt; -use lazy_static::lazy_static; -use log::{debug, info, warn}; +use inotify::WatchMask; +use log::{info, warn}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::{env, path::Path}; +use std::{env, path::Path, time::Duration}; use sysinfo; use time::format_description::well_known::Rfc3339; -use tokio::{ - sync::mpsc, - task::JoinHandle, - time::{Duration, interval}, -}; +use tokio::sync::mpsc; static BOOTLOADER_UPDATED_FILE: &str = "/run/omnect-device-service/omnect_bootloader_updated"; - -lazy_static! { - static ref REFRESH_SYSTEM_INFO_INTERVAL_SECS: u64 = { - const REFRESH_SYSTEM_INFO_INTERVAL_SECS_DEFAULT: &str = "60"; - env::var("REFRESH_SYSTEM_INFO_INTERVAL_SECS") - .unwrap_or(REFRESH_SYSTEM_INFO_INTERVAL_SECS_DEFAULT.to_string()) - .parse::() - .expect("cannot parse REFRESH_SYSTEM_INFO_INTERVAL_SECS env var") - }; -} +#[cfg(not(feature = "mock"))] +static TIMESYNC_FILE: &str = "/run/systemd/timesync/synchronized"; +#[cfg(feature = "mock")] +static TIMESYNC_FILE: &str = "/tmp/synchronized"; #[derive(Serialize)] struct Label { @@ -69,14 +53,6 @@ impl Metric { } } -lazy_static! { - static ref TIMESYNC_FILE: &'static Path = if cfg!(feature = "mock") { - Path::new("/tmp/synchronized") - } else { - Path::new("/run/systemd/timesync/synchronized") - }; -} - #[derive(Clone, Debug, Deserialize, PartialEq)] pub struct FleetIdCommand { pub fleet_id: String, @@ -118,8 +94,6 @@ pub struct SystemInfo { reboot_reason: Option, #[serde(skip_serializing_if = "Option::is_none")] fleet_id: Option, - #[serde(skip_serializing)] - handle: Option>, } impl Feature for SystemInfo { fn name(&self) -> String { @@ -148,29 +122,12 @@ impl Feature for SystemInfo { self.report().await } - fn command_request_stream(&mut self) -> CommandRequestStreamResult { - Ok(match *REFRESH_SYSTEM_INFO_INTERVAL_SECS { - 0 if self.software_info.boot_time.is_none() => Some(self.file_created_stream()?), - 0 if self.software_info.boot_time.is_some() => None, - _ => { - let interval_stream = interval_stream::(interval(Duration::from_secs( - *REFRESH_SYSTEM_INFO_INTERVAL_SECS, - ))); - Some(if self.software_info.boot_time.is_none() { - futures::stream::select(interval_stream, self.file_created_stream()?).boxed() - } else { - interval_stream - }) - } - }) - } - async fn command(&mut self, cmd: &Command) -> CommandResult { match cmd { Command::Interval(_) => { self.metrics().await?; } - Command::FileCreated(_) => { + Command::WatchPath(_) => { self.software_info.boot_time = Some(Self::boot_time()?); self.report().await?; } @@ -182,7 +139,7 @@ impl Feature for SystemInfo { ) .await; } - _ => bail!("unexpected command"), + _ => bail!("unexpected command: {cmd:?}"), } Ok(None) @@ -193,7 +150,7 @@ impl SystemInfo { const SYSTEM_INFO_VERSION: u8 = 1; const ID: &'static str = "system_info"; - pub fn new() -> Result { + pub async fn new() -> Result { let azure_sdk_version = IotHubClient::sdk_version_string(); let omnect_device_service_version = env!("CARGO_PKG_VERSION").to_string(); @@ -211,13 +168,31 @@ impl SystemInfo { RootPartition::current()?.as_str() ); - let boot_time = if matches!(TIMESYNC_FILE.try_exists(), Ok(true)) { + let time_synced_file = Path::new(TIMESYNC_FILE); + let wd = add_fs_watch::( + time_synced_file + .parent() + .context("failed to get parent of TIMESYNC_FILE")?, + WatchMask::CREATE, + ) + .await?; + + let boot_time = if matches!(time_synced_file.try_exists(), Ok(true)) { + remove_fs_watch(wd).await?; Some(Self::boot_time()?) } else { - debug!("new: start timesync watcher since not synced yet"); None }; + let refresh_interval = env::var("REFRESH_SYSTEM_INFO_INTERVAL_SECS") + .unwrap_or("60".to_string()) + .parse::() + .context("cannot parse REFRESH_SYSTEM_INFO_INTERVAL_SECS env var")?; + + if 0 < refresh_interval { + add_notify_interval::(Duration::from_secs(refresh_interval)).await?; + } + let Some(hostname) = sysinfo::System::host_name() else { bail!("metrics: hostname could not be read") }; @@ -239,21 +214,13 @@ impl SystemInfo { .with_cpu(sysinfo::CpuRefreshKind::everything()) .with_memory(sysinfo::MemoryRefreshKind::everything()), ), - hostname, }, reboot_reason: reboot_reason::current_reboot_reason(), fleet_id: None, - handle: None, }) } - fn file_created_stream(&mut self) -> Result { - let (handle, stream) = file_created_stream::(vec![&TIMESYNC_FILE])?; - self.handle = Some(handle); - Ok(stream) - } - async fn report(&self) -> Result<()> { let value = serde_json::to_value(self).context("report: cannot serialize")?;