diff --git a/Cargo.lock b/Cargo.lock index dd5d2c029..e378bf739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3766,12 +3766,9 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "slab" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" [[package]] name = "smallvec" @@ -4496,7 +4493,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "static_assertions", ] @@ -5193,6 +5190,7 @@ dependencies = [ "rustc_version 0.4.1", "serde", "serde_json", + "slab", "socket2 0.5.7", "tokio", "tokio-util", diff --git a/Cargo.toml b/Cargo.toml index e7f5513d2..1ed09e632 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -163,6 +163,7 @@ serde = { version = "1.0.210", default-features = false, features = [ serde_json = "1.0.128" serde_with = "3.12.0" serde_yaml = "0.9.34" +slab = "0.4.10" static_init = "1.0.3" stabby = "36.1.1" sha3 = "0.10.8" diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index 96c43d3c1..a5a6828c2 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -23,7 +23,7 @@ use zenoh::{ ConsolidationMode, Parameters, Selector, TimeBound, TimeExpr, TimeRange, ZenohParameters, }, sample::{Locality, Sample, SampleKind, SourceSn}, - session::{EntityGlobalId, EntityId}, + session::{ClosingCallbackId, EntityGlobalId, EntityId}, Resolvable, Resolve, Session, Wait, KE_ADV_PREFIX, KE_EMPTY, KE_PUB, KE_STAR, KE_STARSTAR, KE_SUB, }; @@ -409,6 +409,15 @@ struct State { callback: Callback, miss_handlers: HashMap>, token: Option, + closing_callback_id: Option, +} + +impl Drop for State { + fn drop(&mut self) { + if let Some(id) = self.closing_callback_id.take() { + self.session.unregister_closing_callback(id); + } + } } #[zenoh_macros::unstable] @@ -683,8 +692,21 @@ impl AdvancedSubscriber { callback: callback.clone(), miss_handlers: HashMap::new(), token: None, + closing_callback_id: None, })); + let closing_callback_id = conf + .session + .register_closing_callback({ + let statesref = statesref.clone(); + move || { + let mut state = zlock!(statesref); + state.callback = Callback::new(Arc::new(|_| ())); + } + }) + .map_err(|_| "session closed")?; + zlock!(statesref).closing_callback_id = Some(closing_callback_id); + let sub_callback = { let statesref = statesref.clone(); let session = conf.session.clone(); diff --git a/zenoh/Cargo.toml b/zenoh/Cargo.toml index b9e3f595f..2bf4e75cb 100644 --- a/zenoh/Cargo.toml +++ b/zenoh/Cargo.toml @@ -101,6 +101,7 @@ rand = { workspace = true, features = ["default"] } ref-cast = { workspace = true } serde = { workspace = true, features = ["default"] } serde_json = { workspace = true } +slab = { workspace = true } socket2 = { workspace = true } uhlc = { workspace = true, features = ["default"] } vec_map = { workspace = true } diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index 834dfa02c..93ff5780d 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -156,6 +156,7 @@ pub(crate) struct SessionState { pub(crate) aggregated_subscribers: Vec, pub(crate) aggregated_publishers: Vec, pub(crate) publisher_qos_tree: KeBoxTree, + pub(crate) closing_callbacks: ClosingCallbackList, } impl SessionState { @@ -190,6 +191,7 @@ impl SessionState { aggregated_subscribers, aggregated_publishers, publisher_qos_tree, + closing_callbacks: Default::default(), } } } @@ -1249,6 +1251,63 @@ impl Session { source_info: SourceInfo::empty(), } } + + /// Registers a closing callback to the session. + /// + /// The callback will be called when the session will be closed. It returns an id to unregister + /// the callback with [`unregister_closing_callback`](Self::unregister_closing_callback). If + /// the session is already closed, the callback is returned in an error. + /// + /// Execution order of callbacks is unspecified. + /// + /// # Examples + /// + /// ```rust + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// if let Err(_) = session.register_closing_callback(|| println!("session closed")) { + /// println!("session already closed"); + /// } + /// # } + /// ``` + #[zenoh_macros::internal] + pub fn register_closing_callback( + &self, + callback: F, + ) -> Result { + let mut state = zwrite!(self.0.state); + if state.primitives().is_err() { + return Err(callback); + } + Ok(state.closing_callbacks.insert_callback(Box::new(callback))) + } + + /// Unregisters a closing callback. + /// + /// The callback must have been registered with + /// [`register_closing_callback`](Self::register_closing_callback). + /// It will no longer be called on session closing. + /// + /// # Examples + /// + /// ```rust + /// # #[tokio::main] + /// # async fn main() { + /// + /// let session = zenoh::open(zenoh::Config::default()).await.unwrap(); + /// let Ok(id) = session.register_closing_callback(|| println!("session closed")) else { + /// panic!("session already closed"); + /// }; + /// session.unregister_closing_callback(id); + /// # } + /// ``` + #[zenoh_macros::internal] + pub fn unregister_closing_callback(&self, callback_id: ClosingCallbackId) { + let mut state = zwrite!(self.0.state); + state.closing_callbacks.remove_callback(callback_id); + } } impl Session { @@ -3213,6 +3272,7 @@ impl Closee for Arc { // will be stabilized. let mut state = zwrite!(self.state); let _matching_listeners = std::mem::take(&mut state.matching_listeners); + let _closing_callbacks = std::mem::take(&mut state.closing_callbacks); drop(state); } } @@ -3225,3 +3285,49 @@ impl Closeable for Session { self.0.clone() } } + +#[derive(Default)] +pub(crate) struct ClosingCallbackList { + callbacks: slab::Slab, + generation: usize, +} + +impl ClosingCallbackList { + fn insert_callback(&mut self, callback: Box) -> ClosingCallbackId { + let generation = self.generation; + let index = self.callbacks.insert(ClosingCallback { + callback, + generation, + }); + self.generation = self.generation.wrapping_add(1); + ClosingCallbackId { index, generation } + } + + fn remove_callback(&mut self, id: ClosingCallbackId) { + if matches!( self.callbacks.get(id.index), Some(cb) if cb.generation == id.generation) { + self.callbacks.remove(id.index); + } + } +} + +impl Drop for ClosingCallbackList { + fn drop(&mut self) { + for cb in self.callbacks.drain() { + (cb.callback)(); + } + } +} + +struct ClosingCallback { + callback: Box, + generation: usize, +} + +/// The id of a registered session closing callback. +/// +/// See [`Session::register_closing_callback`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ClosingCallbackId { + index: usize, + generation: usize, +} diff --git a/zenoh/src/lib.rs b/zenoh/src/lib.rs index cb5e0d0f0..f152eb122 100644 --- a/zenoh/src/lib.rs +++ b/zenoh/src/lib.rs @@ -208,6 +208,8 @@ pub mod session { #[zenoh_macros::internal] pub use crate::api::builders::session::{init, InitBuilder}; + #[zenoh_macros::internal] + pub use crate::api::session::ClosingCallbackId; pub use crate::api::{ builders::{ close::CloseBuilder,