Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions litebox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ windows-sys = { version = "0.60.2", features = [
lock_tracing = ["dep:arrayvec", "spin/mutex"]
panic_on_unclosed_fd_drop = []
enforce_singleton_litebox_instance = []
trace_fs = []

[lints]
workspace = true
Expand Down
14 changes: 14 additions & 0 deletions litebox/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ pub trait IOPollable {
/// calls are what notify observers. This particular function itself however _may_ be used to
/// essentially get "the current status" of events for the system.
fn check_io_events(&self) -> Events;

/// Returns `true` if this pollable cannot deliver asynchronous observer
/// notifications (e.g. host-backed stdin). Callers should use periodic
/// polling instead of blocking indefinitely on observer wakeups.
fn needs_host_poll(&self) -> bool {
false
}

/// Returns `true` if reads on this pollable should block when no data is
/// available. Returns `false` for fds whose callers use epoll/poll and
/// expect EAGAIN to be returned immediately (e.g. PTY master).
fn should_block_read(&self) -> bool {
true
}
}

impl<T: IOPollable> IOPollable for alloc::sync::Arc<T> {
Expand Down
64 changes: 64 additions & 0 deletions litebox/src/event/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,21 @@ impl<E, F: EventsFilter<E>, Platform: RawSyncPrimitivesProvider> Subject<E, F, P
}
}

fn prune_dead_observers(&self, observers: &mut BTreeMap<ObserverKey<E>, F>) {
observers.retain(|observer, _| {
if observer.upgrade().is_some() {
true
} else {
self.nums.fetch_sub(1, Ordering::Relaxed);
false
}
});
}

/// Register an observer with the given filter.
pub fn register_observer(&self, observer: Weak<dyn Observer<E>>, filter: F) {
let mut observers = self.observers.lock();
self.prune_dead_observers(&mut observers);
if observers
.insert(ObserverKey::new(observer), filter)
.is_none()
Expand Down Expand Up @@ -132,3 +144,55 @@ impl<E, F: EventsFilter<E>, Platform: RawSyncPrimitivesProvider> Subject<E, F, P
});
}
}

#[cfg(test)]
mod tests {
use alloc::sync::Arc;
use core::sync::atomic::{AtomicUsize, Ordering};

use super::{Observer, Subject};
use crate::{event::Events, platform::mock::MockPlatform};

struct TestObserver {
notifications: AtomicUsize,
}

impl Observer<Events> for TestObserver {
fn on_events(&self, _events: &Events) {
self.notifications.fetch_add(1, Ordering::Relaxed);
}
}

#[test]
fn register_observer_prunes_dead_entries() {
let subject = Subject::<Events, Events, MockPlatform>::new();

let stale = Arc::new(TestObserver {
notifications: AtomicUsize::new(0),
});
subject.register_observer(Arc::downgrade(&stale) as _, Events::IN);
assert_eq!(subject.nums.load(Ordering::Relaxed), 1);
assert_eq!(subject.observers.lock().len(), 1);
drop(stale);

let fresh = Arc::new(TestObserver {
notifications: AtomicUsize::new(0),
});
subject.register_observer(Arc::downgrade(&fresh) as _, Events::OUT);
{
let observers = subject.observers.lock();
let registered = observers
.keys()
.next()
.and_then(super::ObserverKey::upgrade)
.expect("dead observer should be pruned during registration");
let fresh_observer: Arc<dyn Observer<Events>> = fresh.clone();
assert!(Arc::ptr_eq(&registered, &fresh_observer));
assert_eq!(subject.nums.load(Ordering::Relaxed), 1);
assert_eq!(observers.len(), 1);
}
subject.notify_observers(Events::OUT);

assert_eq!(fresh.notifications.load(Ordering::Relaxed), 1);
}
}
Loading
Loading