Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 77 additions & 9 deletions litebox/src/event/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,28 @@ impl<E, F: EventsFilter<E>, Platform: RawSyncPrimitivesProvider> Subject<E, F, P
}
}

/// Remove entries whose weak references are no longer alive, decrementing
/// `nums` for each pruned entry.
///
/// Called under the lock during registration to eagerly reclaim stale
/// entries from observers that were dropped without being explicitly
/// unregistered. `notify_observers` also calls this to prune before
/// dispatching events.
fn prune_dead_observers(&self, observers: &mut BTreeMap<ObserverKey<E>, F>) {
observers.retain(|observer, _| {
if observer.upgrade().is_some() {
true
} else {
self.nums.fetch_sub(1, Ordering::Relaxed);
false
}
});
}

/// Register an observer with the given filter.
pub fn register_observer(&self, observer: Weak<dyn Observer<E>>, filter: F) {
let mut observers = self.observers.lock();
self.prune_dead_observers(&mut observers);
if observers
.insert(ObserverKey::new(observer), filter)
.is_none()
Expand All @@ -119,16 +138,65 @@ impl<E, F: EventsFilter<E>, Platform: RawSyncPrimitivesProvider> Subject<E, F, P
}

let mut observers = self.observers.lock();
observers.retain(|observer, filter| {
if let Some(observer) = observer.upgrade() {
if filter.filter(&events) {
observer.on_events(&events);
}
true
} else {
self.nums.fetch_sub(1, Ordering::Relaxed);
false
self.prune_dead_observers(&mut observers);
for (observer, filter) in observers.iter() {
if let Some(observer) = observer.upgrade()
&& filter.filter(&events)
{
observer.on_events(&events);
}
}
}
}

#[cfg(test)]
mod tests {
use alloc::sync::Arc;
use core::sync::atomic::{AtomicUsize, Ordering};

use super::{Observer, Subject};
use crate::{event::Events, platform::mock::MockPlatform};

struct TestObserver {
notifications: AtomicUsize,
}

impl Observer<Events> for TestObserver {
fn on_events(&self, _events: &Events) {
self.notifications.fetch_add(1, Ordering::Relaxed);
}
}

#[test]
fn register_observer_prunes_dead_entries() {
let subject = Subject::<Events, Events, MockPlatform>::new();

let stale = Arc::new(TestObserver {
notifications: AtomicUsize::new(0),
});
subject.register_observer(Arc::downgrade(&stale) as _, Events::IN);
assert_eq!(subject.nums.load(Ordering::Relaxed), 1);
assert_eq!(subject.observers.lock().len(), 1);
drop(stale);

let fresh = Arc::new(TestObserver {
notifications: AtomicUsize::new(0),
});
subject.register_observer(Arc::downgrade(&fresh) as _, Events::OUT);
{
let observers = subject.observers.lock();
let registered = observers
.keys()
.next()
.and_then(super::ObserverKey::upgrade)
.expect("dead observer should be pruned during registration");
let fresh_observer: Arc<dyn Observer<Events>> = fresh.clone();
assert!(Arc::ptr_eq(&registered, &fresh_observer));
assert_eq!(subject.nums.load(Ordering::Relaxed), 1);
assert_eq!(observers.len(), 1);
}
subject.notify_observers(Events::OUT);

assert_eq!(fresh.notifications.load(Ordering::Relaxed), 1);
}
}
Loading