-
Notifications
You must be signed in to change notification settings - Fork 0
feat(daemon): power guard + network watchdog with session auto-resume #62
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,164 @@ | ||||||
| //! Background network connectivity monitor. | ||||||
| //! Detects drops/recoveries and logs which agents were affected. | ||||||
|
|
||||||
| use std::sync::atomic::{AtomicBool, Ordering}; | ||||||
| use std::time::Duration; | ||||||
|
|
||||||
| use r2d2::Pool; | ||||||
| use r2d2_sqlite::SqliteConnectionManager; | ||||||
|
|
||||||
/// Latest connectivity state published by the watchdog loop.
/// Starts optimistic (`true`) until the first probe completes.
static NETWORK_UP: AtomicBool = AtomicBool::new(true);

/// Returns current network status (non-blocking).
pub fn is_network_up() -> bool {
    // Relaxed ordering is sufficient: readers only need an
    // eventually-consistent boolean flag, not synchronization.
    NETWORK_UP.load(Ordering::Relaxed)
}
|
|
||||||
| /// Spawn as a tokio background task at daemon boot. | ||||||
| /// Checks connectivity every 30s and logs agent impact on drop/recovery. | ||||||
| pub async fn run_watchdog(pool: Pool<SqliteConnectionManager>) { | ||||||
| let mut was_up = true; | ||||||
| let mut lost_agents: Vec<String> = Vec::new(); | ||||||
|
|
||||||
| loop { | ||||||
| tokio::time::sleep(Duration::from_secs(30)).await; | ||||||
| let up = check_connectivity().await; | ||||||
| NETWORK_UP.store(up, Ordering::Relaxed); | ||||||
|
|
||||||
| if was_up && !up { | ||||||
| tracing::warn!("network_watchdog: connectivity lost"); | ||||||
| lost_agents = get_active_agents(&pool); | ||||||
| tracing::warn!( | ||||||
| "network_watchdog: {} agents were active: {:?}", | ||||||
| lost_agents.len(), | ||||||
| lost_agents | ||||||
| ); | ||||||
| } | ||||||
|
|
||||||
| if !was_up && up { | ||||||
| tracing::info!("network_watchdog: connectivity restored"); | ||||||
| log_recovery_impact(&pool, &mut lost_agents); | ||||||
| } | ||||||
|
|
||||||
| was_up = up; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| fn log_recovery_impact( | ||||||
| pool: &Pool<SqliteConnectionManager>, | ||||||
| lost_agents: &mut Vec<String>, | ||||||
| ) { | ||||||
| if lost_agents.is_empty() { | ||||||
| return; | ||||||
| } | ||||||
| let current = get_active_agents(pool); | ||||||
| let missing: Vec<_> = lost_agents | ||||||
| .iter() | ||||||
| .filter(|a| !current.contains(a)) | ||||||
| .cloned() | ||||||
| .collect(); | ||||||
| if !missing.is_empty() { | ||||||
| tracing::warn!( | ||||||
| "network_watchdog: {} agents lost during outage: {:?}", | ||||||
| missing.len(), | ||||||
| missing | ||||||
| ); | ||||||
| respawn_copilot_sessions(&missing); | ||||||
| } | ||||||
| lost_agents.clear(); | ||||||
| } | ||||||
|
|
||||||
| /// Attempt to resume dead Copilot CLI sessions. | ||||||
| /// Scans ~/.copilot/session-state/ for recent sessions and relaunches them. | ||||||
| fn respawn_copilot_sessions(missing_agents: &[String]) { | ||||||
| let copilot_agents: Vec<_> = missing_agents | ||||||
| .iter() | ||||||
| .filter(|a| a.contains("copilot")) | ||||||
| .collect(); | ||||||
| if copilot_agents.is_empty() { | ||||||
| return; | ||||||
| } | ||||||
|
|
||||||
| // Find recent session IDs from session-state directory | ||||||
| let session_dir = dirs::home_dir() | ||||||
| .map(|h| h.join(".copilot/session-state")) | ||||||
| .unwrap_or_default(); | ||||||
| if !session_dir.exists() { | ||||||
| tracing::debug!("network_watchdog: no session-state dir found"); | ||||||
| return; | ||||||
| } | ||||||
|
|
||||||
| // Get most recent sessions (by modification time) | ||||||
| let mut sessions: Vec<_> = match std::fs::read_dir(&session_dir) { | ||||||
| Ok(entries) => entries | ||||||
| .filter_map(Result::ok) | ||||||
| .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false)) | ||||||
| .filter_map(|e| { | ||||||
| let mtime = e.metadata().ok()?.modified().ok()?; | ||||||
| Some((e.file_name().to_string_lossy().to_string(), mtime)) | ||||||
| }) | ||||||
| .collect(), | ||||||
| Err(_) => return, | ||||||
| }; | ||||||
| sessions.sort_by(|a, b| b.1.cmp(&a.1)); | ||||||
|
|
||||||
| // Resume up to N most recent sessions (one per lost copilot agent) | ||||||
| let to_resume = sessions.iter().take(copilot_agents.len()); | ||||||
| for (session_id, _) in to_resume { | ||||||
| tracing::info!( | ||||||
| "network_watchdog: resuming copilot session {session_id}" | ||||||
| ); | ||||||
| let resume_arg = format!("--resume={session_id}"); | ||||||
| match std::process::Command::new("copilot") | ||||||
| .args([&resume_arg, "--allow-all-tools"]) | ||||||
| .stdout(std::process::Stdio::null()) | ||||||
| .stderr(std::process::Stdio::null()) | ||||||
| .spawn() | ||||||
| { | ||||||
| Ok(child) => tracing::info!( | ||||||
| "network_watchdog: copilot resumed pid={}", child.id() | ||||||
| ), | ||||||
| Err(e) => tracing::warn!( | ||||||
| "network_watchdog: copilot resume failed: {e}" | ||||||
| ), | ||||||
| } | ||||||
|
Roberdan marked this conversation as resolved.
|
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| async fn check_connectivity() -> bool { | ||||||
| let client = reqwest::Client::builder() | ||||||
| .timeout(Duration::from_secs(5)) | ||||||
| .build() | ||||||
| .unwrap_or_default(); | ||||||
| client | ||||||
| .get("https://api.github.com/zen") | ||||||
| .send() | ||||||
| .await | ||||||
| .map(|r| r.status().is_success()) | ||||||
| .unwrap_or(false) | ||||||
|
Roberdan marked this conversation as resolved.
|
||||||
| } | ||||||
|
|
||||||
| fn get_active_agents(pool: &Pool<SqliteConnectionManager>) -> Vec<String> { | ||||||
| let conn = match pool.get() { | ||||||
| Ok(c) => c, | ||||||
| Err(e) => { | ||||||
| tracing::warn!("network_watchdog: pool error: {e}"); | ||||||
| return Vec::new(); | ||||||
| } | ||||||
| }; | ||||||
| let mut stmt = match conn.prepare( | ||||||
| "SELECT name FROM ipc_agents \ | ||||||
| WHERE last_seen >= datetime('now', '-10 minutes')", | ||||||
|
||||||
| WHERE last_seen >= datetime('now', '-10 minutes')", | |
| WHERE last_seen >= strftime('%Y-%m-%dT%H:%M:%f','now','-10 minutes')", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NEEDS FIX — Confirmed. last_seen is stored as strftime('%Y-%m-%dT%H:%M:%f','now') (ISO 8601 with T separator) but the watchdog query uses datetime('now', '-10 minutes') which produces space-separated format (YYYY-MM-DD HH:MM:SS). Lexicographic comparison between these formats gives incorrect results — T > space in ASCII, so the T-format timestamps will always appear "greater", making all same-day agents appear active regardless of actual last_seen time.
Fix: Change the query to use strftime('%Y-%m-%dT%H:%M:%f','now','-10 minutes') to match the storage format.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| //! Platform-specific sleep prevention while agents are active. | ||
| //! Spawns an OS sleep inhibitor when the first agent registers, | ||
| //! kills it when the last deregisters. | ||
|
|
||
| use std::process::{Child, Command}; | ||
| use std::sync::{Mutex, OnceLock}; | ||
|
|
||
| use serde::Serialize; | ||
|
|
||
| static GUARD: OnceLock<Mutex<PowerGuardInner>> = OnceLock::new(); | ||
|
|
||
| fn guard() -> &'static Mutex<PowerGuardInner> { | ||
| GUARD.get_or_init(|| Mutex::new(PowerGuardInner::new())) | ||
| } | ||
|
|
||
/// State behind the global mutex: the running inhibitor process (if any)
/// and a refcount of currently registered agents.
struct PowerGuardInner {
    // Handle to the spawned OS sleep inhibitor (caffeinate / systemd-inhibit).
    process: Option<Child>,
    // Number of agents currently registered.
    agent_count: u32,
}

impl PowerGuardInner {
    /// Fresh state: no inhibitor running, no agents registered.
    fn new() -> Self {
        Self {
            process: None,
            agent_count: 0,
        }
    }
}
|
|
||
| impl Drop for PowerGuardInner { | ||
| fn drop(&mut self) { | ||
| if let Some(mut child) = self.process.take() { | ||
| let _ = child.kill(); | ||
| let _ = child.wait(); | ||
| tracing::info!("power_guard: inhibitor killed on drop"); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug, Clone, Serialize)] | ||
| pub struct PowerGuardStatus { | ||
| pub active: bool, | ||
| pub agent_count: u32, | ||
| pub platform: &'static str, | ||
| } | ||
|
|
||
| pub struct PowerGuard; | ||
|
|
||
| impl PowerGuard { | ||
| /// Call when an agent registers. | ||
| pub fn acquire() { | ||
| let mut g = guard().lock().expect("power_guard lock poisoned"); | ||
| g.agent_count += 1; | ||
| if g.agent_count == 1 && g.process.is_none() { | ||
| g.process = spawn_inhibitor(); | ||
| if g.process.is_some() { | ||
| tracing::info!( | ||
| "power_guard: sleep inhibitor started (agents active)" | ||
| ); | ||
| } | ||
| } | ||
|
Roberdan marked this conversation as resolved.
|
||
| } | ||
|
|
||
| /// Call when an agent deregisters. | ||
| pub fn release() { | ||
| let mut g = guard().lock().expect("power_guard lock poisoned"); | ||
| g.agent_count = g.agent_count.saturating_sub(1); | ||
| if g.agent_count == 0 { | ||
| if let Some(mut child) = g.process.take() { | ||
| let _ = child.kill(); | ||
| let _ = child.wait(); | ||
| tracing::info!( | ||
| "power_guard: sleep inhibitor stopped (no agents)" | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Current status for API/diagnostics. | ||
| pub fn status() -> PowerGuardStatus { | ||
| let g = guard().lock().expect("power_guard lock poisoned"); | ||
| PowerGuardStatus { | ||
| active: g.process.is_some(), | ||
| agent_count: g.agent_count, | ||
| platform: current_platform(), | ||
| } | ||
| } | ||
|
|
||
| /// Reset internal state for test isolation. | ||
| #[cfg(test)] | ||
| fn reset() { | ||
| let mut g = guard().lock().expect("power_guard lock poisoned"); | ||
| if let Some(mut child) = g.process.take() { | ||
| let _ = child.kill(); | ||
| let _ = child.wait(); | ||
| } | ||
| g.agent_count = 0; | ||
| } | ||
| } | ||
|
|
||
/// Compile-time platform tag used in status reporting.
fn current_platform() -> &'static str {
    if cfg!(target_os = "macos") {
        "macos"
    } else if cfg!(target_os = "linux") {
        "linux"
    } else {
        "unsupported"
    }
}
|
|
||
| #[cfg(target_os = "macos")] | ||
| fn spawn_inhibitor() -> Option<Child> { | ||
| Command::new("caffeinate") | ||
| .args(["-i", "-d"]) | ||
| .spawn() | ||
| .map_err(|e| tracing::warn!("power_guard: caffeinate failed: {e}")) | ||
| .ok() | ||
| } | ||
|
|
||
| #[cfg(target_os = "linux")] | ||
| fn spawn_inhibitor() -> Option<Child> { | ||
| Command::new("systemd-inhibit") | ||
| .args([ | ||
| "--what=idle:sleep", | ||
| "--who=convergio-daemon", | ||
| "--why=Active agents", | ||
| "--mode=block", | ||
| "sleep", | ||
| "infinity", | ||
| ]) | ||
| .spawn() | ||
| .map_err(|e| { | ||
| tracing::warn!("power_guard: systemd-inhibit failed: {e}") | ||
| }) | ||
| .ok() | ||
| } | ||
|
|
||
/// Other platforms: sleep prevention is not implemented, so acquire()
/// degrades to a pure refcount with no side effects.
#[cfg(not(any(target_os = "macos", target_os = "linux")))]
fn spawn_inhibitor() -> Option<Child> {
    tracing::debug!("power_guard: no inhibitor available on this platform");
    None
}
|
|
||
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, OnceLock};

    /// Both tests mutate the process-global GUARD state, and the default
    /// test harness runs tests on multiple threads — without serialization
    /// they race (one test's reset() lands mid-way through the other's
    /// assertions) and fail flakily. Hold this lock for the whole test.
    fn serial() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|e| e.into_inner())
    }

    #[test]
    fn acquire_release_counting() {
        let _serial = serial();
        PowerGuard::reset();
        let s = PowerGuard::status();
        assert_eq!(s.agent_count, 0);
        assert!(!s.active);

        PowerGuard::acquire();
        let s = PowerGuard::status();
        assert_eq!(s.agent_count, 1);
        // On macOS caffeinate should start (assumes the binary is present,
        // which it is on stock macOS).
        #[cfg(target_os = "macos")]
        assert!(s.active);

        PowerGuard::acquire();
        assert_eq!(PowerGuard::status().agent_count, 2);

        PowerGuard::release();
        assert_eq!(PowerGuard::status().agent_count, 1);

        PowerGuard::release();
        let s = PowerGuard::status();
        assert_eq!(s.agent_count, 0);
        assert!(!s.active);
    }

    #[test]
    fn release_saturates_at_zero() {
        let _serial = serial();
        PowerGuard::reset();
        PowerGuard::release();
        PowerGuard::release();
        assert_eq!(PowerGuard::status().agent_count, 0);
        assert!(!PowerGuard::status().active);
    }
}
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -66,6 +66,8 @@ pub async fn api_ipc_agents_register( | |||||||||
| tracing::debug!("ws agent_registered broadcast (no subscribers): {e}"); | ||||||||||
| } | ||||||||||
|
|
||||||||||
| crate::power_guard::PowerGuard::acquire(); | ||||||||||
|
|
||||||||||
| // Push live agent list + session state to brain viz | ||||||||||
| broadcast_brain_agent_update(&state); | ||||||||||
| broadcast_brain_session_update(&state); | ||||||||||
|
|
@@ -85,6 +87,8 @@ pub async fn api_ipc_agents_unregister( | |||||||||
| ) | ||||||||||
| .map_err(|e| ApiError::internal(format!("agent unregister failed: {e}")))?; | ||||||||||
|
|
||||||||||
| crate::power_guard::PowerGuard::release(); | ||||||||||
|
|
||||||||||
| if let Err(e) = state.ws_tx.send(json!({ | ||||||||||
| "type": "agent_unregistered", | ||||||||||
| "agent_id": body.agent_id, | ||||||||||
|
|
@@ -141,6 +145,8 @@ pub async fn api_ipc_agents_deregister( | |||||||||
| ) | ||||||||||
| .map_err(|e| ApiError::internal(format!("agent deregister failed: {e}")))?; | ||||||||||
|
|
||||||||||
| crate::power_guard::PowerGuard::release(); | ||||||||||
|
||||||||||
| crate::power_guard::PowerGuard::release(); | |
| for _ in 0..deleted { | |
| crate::power_guard::PowerGuard::release(); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NEEDS FIX — Confirmed. api_ipc_agents_deregister deletes all hosts for a name (could be >1 row) but calls PowerGuard::release() only once. The deleted count is already captured but not used for release.
Fix: Call release() in a loop for _ in 0..deleted as suggested, or (preferred) refactor PowerGuard to sync count from DB rather than increment/decrement.
Uh oh!
There was an error while loading. Please reload this page.