From da0dacddf61f45771555bfc4781caf6c9d5e2fb5 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Mon, 30 Jun 2025 17:51:24 -0300 Subject: [PATCH 01/34] First attempt for stream_listener --- .tool-versions | 2 +- Cargo.lock | 12 ++++++++++++ concurrency/src/tasks/mod.rs | 2 ++ concurrency/src/tasks/stream.rs | 32 ++++++++++++++++++++++++++++++++ rt/Cargo.toml | 1 + 5 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 concurrency/src/tasks/stream.rs diff --git a/.tool-versions b/.tool-versions index ee114d2..35b639f 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1 @@ -rust 1.85.1 +rust 1.88.0 diff --git a/Cargo.lock b/Cargo.lock index 9d6c50d..185fc76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1210,6 +1210,7 @@ version = "0.1.0" dependencies = [ "crossbeam", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-subscriber", @@ -1342,6 +1343,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.15" diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 54f35fa..446f67b 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -4,10 +4,12 @@ mod gen_server; mod process; mod time; +mod stream; #[cfg(test)] mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; +pub use stream::{spawn_listener}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs new file mode 100644 index 0000000..6fbbdd3 --- /dev/null +++ b/concurrency/src/tasks/stream.rs @@ -0,0 +1,32 @@ +use futures::{Stream, StreamExt}; + +use crate::tasks::{GenServer, GenServerHandle}; + +pub fn spawn_listener( + mut handle: GenServerHandle, + message_builder: F, + mut stream: S) +where + T: GenServer + 'static, + F: Fn(I) -> T::CastMsg + Send + 'static, + I: Send, + S: Unpin + Send + Stream> + 'static, +{ + spawned_rt::tasks::spawn(async move { + loop { + match stream.next().await { + Some(Ok(item)) => { + let _ = handle.cast(message_builder(item)).await; + } + Some(Err(e)) => { + tracing::trace!("Received Error in msg {e:?}"); + break; + } + None => { + tracing::trace!("Stream finnished"); + break; + }, + } + } + }); +} \ No newline at end of file diff --git a/rt/Cargo.toml b/rt/Cargo.toml index b4fbbcf..9527b0d 100644 --- a/rt/Cargo.toml +++ b/rt/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7.15" } +tokio-stream = { version = "0.1.17" } crossbeam = { version = "0.7.3" } tracing = { workspace = true } tracing-subscriber = { workspace = true } From c15cd54be3b5555c3cf0f4614beb9d6e7f47e436 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 1 Jul 2025 17:14:58 -0300 Subject: [PATCH 02/34] add test to stream listener --- Cargo.lock | 1 + concurrency/Cargo.toml | 5 +- concurrency/src/tasks/mod.rs | 6 ++- concurrency/src/tasks/stream_test.rs | 74 ++++++++++++++++++++++++++++ 4 files changed, 83 insertions(+), 3 deletions(-) create mode 100644 concurrency/src/tasks/stream_test.rs diff --git a/Cargo.lock b/Cargo.lock index 185fc76..5b07b5c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1201,6 +1201,7 @@ version = "0.1.0" dependencies = [ "futures", "spawned-rt", + "tokio-stream", "tracing", ] diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 57c225d..824a7ee 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -8,5 +8,8 @@ spawned-rt = { workspace = true } tracing = { workspace = true } futures = "0.3.1" +[dev-dependencies] +tokio-stream = { version = "0.1.17" } + [lib] -path = "./src/lib.rs" \ No newline at end of file +path = "./src/lib.rs" diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 446f67b..3448286 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -3,13 +3,15 @@ mod gen_server; mod process; -mod time; mod stream; +mod time; +#[cfg(test)] +mod stream_test; #[cfg(test)] mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::{spawn_listener}; +pub use stream::spawn_listener; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_test.rs new file mode 100644 index 0000000..0617ab6 --- /dev/null +++ b/concurrency/src/tasks/stream_test.rs @@ -0,0 +1,74 @@ +use std::time::Duration; + +use spawned_rt::tasks::{self as rt}; + +use crate::tasks::{spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle}; + +type SummatoryHandle = GenServerHandle; + +struct Summatory; + +type SummatoryState = u16; +type SummatoryCastMessage = SummatoryState; +type SummatoryOutMessage = SummatoryState; + +impl Summatory { + pub async fn get_value(server: &mut SummatoryHandle) -> Result { + server.call(()).await.map_err(|_| ()) + } +} + +impl GenServer for Summatory { + type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type. + type CastMsg = SummatoryCastMessage; + type OutMsg = SummatoryOutMessage; + type State = SummatoryState; + type Error = (); + + fn new() -> Self { + Self + } + + async fn handle_cast( + &mut self, + message: Self::CastMsg, + _handle: &GenServerHandle, + state: Self::State, + ) -> CastResponse { + let new_state = state + message; + CastResponse::NoReply(new_state) + } + + async fn handle_call( + &mut self, + _message: Self::CallMsg, + _handle: &SummatoryHandle, + state: Self::State, + ) -> CallResponse { + let current_value = state; + CallResponse::Reply(state, current_value) + } +} + +// In this example, the stream sends u8 values, which are converted to the type +// supported by the GenServer (SummatoryCastMessage / u16). +fn message_builder(value: u8) -> SummatoryCastMessage { + value.into() +} + +#[test] +pub fn test_sum_numbers_from_stream() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::start(0); + let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok)); + + spawn_listener(summatory_handle.clone(), message_builder, stream); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)).await; + + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert_eq!(val, 15); + }) +} From cfca33455905e13ff304d4a586c62478f5d839d7 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Jul 2025 10:53:49 -0300 Subject: [PATCH 03/34] add util fn to convert receiver to stream --- Cargo.lock | 1 + concurrency/src/tasks/mod.rs | 2 +- concurrency/src/tasks/stream.rs | 20 ++++++++++++----- concurrency/src/tasks/stream_test.rs | 33 +++++++++++++++++++++++++++- rt/Cargo.toml | 4 ++-- rt/src/tasks/mod.rs | 1 + rt/src/tasks/tokio/mod.rs | 1 + 7 files changed, 52 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5b07b5c..2821b79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1353,6 +1353,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 3448286..c1358d1 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -13,5 +13,5 @@ mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::spawn_listener; +pub use stream::{spawn_listener, unbounded_receiver_to_stream}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 6fbbdd3..ad12203 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,11 +1,19 @@ use futures::{Stream, StreamExt}; +use spawned_rt::tasks::{mpsc::Receiver, UnboundedReceiverStream}; use crate::tasks::{GenServer, GenServerHandle}; -pub fn spawn_listener( - mut handle: GenServerHandle, - message_builder: F, - mut stream: S) +/// Converts an unbounded receiver into a stream that can be used with async tasks +/// This function is useful for integrating with the `spawn_listener` function +pub fn unbounded_receiver_to_stream(receiver: Receiver) -> UnboundedReceiverStream +where + T: 'static + Clone + Send, +{ + UnboundedReceiverStream::new(receiver) +} + +/// Spawns a listener that listens to a stream and sends messages to a GenServer +pub fn spawn_listener(mut handle: GenServerHandle, message_builder: F, mut stream: S) where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, @@ -25,8 +33,8 @@ where None => { tracing::trace!("Stream finnished"); break; - }, + } } } }); -} \ No newline at end of file +} diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_test.rs index 0617ab6..3192382 100644 --- a/concurrency/src/tasks/stream_test.rs +++ b/concurrency/src/tasks/stream_test.rs @@ -2,7 +2,10 @@ use std::time::Duration; use spawned_rt::tasks::{self as rt}; -use crate::tasks::{spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle}; +use crate::tasks::{ + spawn_listener, unbounded_receiver_to_stream, CallResponse, CastResponse, GenServer, + GenServerHandle, +}; type SummatoryHandle = GenServerHandle; @@ -72,3 +75,31 @@ pub fn test_sum_numbers_from_stream() { assert_eq!(val, 15); }) } + +#[test] +pub fn test_sum_numbers_from_channel() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::start(0); + let (tx, rx) = spawned_rt::tasks::mpsc::channel(); + + // Spawn a task to send numbers to the channel + spawned_rt::tasks::spawn(async move { + for i in 1..=5 { + tx.send(Ok(i)).unwrap(); + } + }); + + spawn_listener( + summatory_handle.clone(), + message_builder, + unbounded_receiver_to_stream(rx), + ); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)).await; + + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert_eq!(val, 15); + }) +} diff --git a/rt/Cargo.toml b/rt/Cargo.toml index 9527b0d..b3317ac 100644 --- a/rt/Cargo.toml +++ b/rt/Cargo.toml @@ -6,10 +6,10 @@ edition = "2021" [dependencies] tokio = { version = "1", features = ["full"] } tokio-util = { version = "0.7.15" } -tokio-stream = { version = "0.1.17" } +tokio-stream = { version = "0.1.17", features = ["sync"] } crossbeam = { version = "0.7.3" } tracing = { workspace = true } tracing-subscriber = { workspace = true } [lib] -path = "./src/lib.rs" \ No newline at end of file +path = "./src/lib.rs" diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 7508d43..11c9d79 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -18,6 +18,7 @@ pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::CancellationToken; pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; +pub use crate::tasks::tokio::{BroadcastStream, UnboundedReceiverStream}; use std::future::Future; pub fn run(future: F) -> F::Output { diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index aaf679d..9ccd31b 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -7,4 +7,5 @@ pub use tokio::{ task::{spawn, spawn_blocking, JoinHandle}, time::sleep, }; +pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; pub use tokio_util::sync::CancellationToken; From 4e3db7fff69ab050e4cd1100e15e91d97d1b9631 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Jul 2025 11:59:45 -0300 Subject: [PATCH 04/34] add bounded channel --- concurrency/src/tasks/gen_server.rs | 14 ++++++------ concurrency/src/tasks/mod.rs | 2 +- concurrency/src/tasks/process.rs | 28 +++++++++++++---------- concurrency/src/tasks/stream.rs | 22 ++++++++++++++---- concurrency/src/tasks/stream_test.rs | 34 +++++++++++++++++++++++++--- examples/ping_pong/src/consumer.rs | 4 ++-- examples/ping_pong/src/messages.rs | 4 ++-- examples/ping_pong/src/producer.rs | 12 +++++----- rt/src/tasks/mod.rs | 2 +- rt/src/tasks/tokio/mod.rs | 2 +- rt/src/tasks/tokio/mpsc.rs | 4 ++-- 11 files changed, 87 insertions(+), 41 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 39ec0b5..9edf96c 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -8,7 +8,7 @@ use crate::error::GenServerError; #[derive(Debug)] pub struct GenServerHandle { - pub tx: mpsc::Sender>, + pub tx: mpsc::UnboundedSender>, } impl Clone for GenServerHandle { @@ -21,7 +21,7 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { - let (tx, mut rx) = mpsc::channel::>(); + let (tx, mut rx) = mpsc::unbounded_channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); @@ -39,7 +39,7 @@ impl GenServerHandle { } pub(crate) fn new_blocking(initial_state: G::State) -> Self { - let (tx, mut rx) = mpsc::channel::>(); + let (tx, mut rx) = mpsc::unbounded_channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); @@ -58,7 +58,7 @@ impl GenServerHandle { handle_clone } - pub fn sender(&self) -> mpsc::Sender> { + pub fn sender(&self) -> mpsc::UnboundedSender> { self.tx.clone() } @@ -131,7 +131,7 @@ where fn run( &mut self, handle: &GenServerHandle, - rx: &mut mpsc::Receiver>, + rx: &mut mpsc::UnboundedReceiver>, state: Self::State, ) -> impl Future> + Send { async { @@ -162,7 +162,7 @@ where fn main_loop( &mut self, handle: &GenServerHandle, - rx: &mut mpsc::Receiver>, + rx: &mut mpsc::UnboundedReceiver>, mut state: Self::State, ) -> impl Future> + Send { async { @@ -181,7 +181,7 @@ where fn receive( &mut self, handle: &GenServerHandle, - rx: &mut mpsc::Receiver>, + rx: &mut mpsc::UnboundedReceiver>, state: Self::State, ) -> impl Future> + Send { async move { diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index c1358d1..69f51d8 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -13,5 +13,5 @@ mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::{spawn_listener, unbounded_receiver_to_stream}; +pub use stream::{receiver_to_stream, spawn_listener, unbounded_receiver_to_stream}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/process.rs b/concurrency/src/tasks/process.rs index b623d2b..0f936c5 100644 --- a/concurrency/src/tasks/process.rs +++ b/concurrency/src/tasks/process.rs @@ -6,12 +6,12 @@ use std::future::Future; #[derive(Debug)] pub struct ProcessInfo { - pub tx: mpsc::Sender, + pub tx: mpsc::UnboundedSender, pub handle: JoinHandle<()>, } impl ProcessInfo { - pub fn sender(&self) -> mpsc::Sender { + pub fn sender(&self) -> mpsc::UnboundedSender { self.tx.clone() } @@ -26,7 +26,7 @@ where { fn spawn(mut self) -> impl Future> + Send { async { - let (tx, mut rx) = mpsc::channel::(); + let (tx, mut rx) = mpsc::unbounded_channel::(); let tx_clone = tx.clone(); let handle = rt::spawn(async move { self.run(&tx_clone, &mut rx).await; @@ -37,8 +37,8 @@ where fn run( &mut self, - tx: &mpsc::Sender, - rx: &mut mpsc::Receiver, + tx: &mpsc::UnboundedSender, + rx: &mut mpsc::UnboundedReceiver, ) -> impl Future + Send { async { self.init(tx).await; @@ -48,8 +48,8 @@ where fn main_loop( &mut self, - tx: &mpsc::Sender, - rx: &mut mpsc::Receiver, + tx: &mpsc::UnboundedSender, + rx: &mut mpsc::UnboundedReceiver, ) -> impl Future + Send { async { loop { @@ -66,14 +66,14 @@ where false } - fn init(&mut self, _tx: &mpsc::Sender) -> impl Future + Send { + fn init(&mut self, _tx: &mpsc::UnboundedSender) -> impl Future + Send { async {} } fn receive( &mut self, - tx: &mpsc::Sender, - rx: &mut mpsc::Receiver, + tx: &mpsc::UnboundedSender, + rx: &mut mpsc::UnboundedReceiver, ) -> impl std::future::Future + Send { async { match rx.recv().await { @@ -83,10 +83,14 @@ where } } - fn handle(&mut self, message: T, tx: &mpsc::Sender) -> impl Future + Send; + fn handle( + &mut self, + message: T, + tx: &mpsc::UnboundedSender, + ) -> impl Future + Send; } -pub fn send(tx: &mpsc::Sender, message: T) +pub fn send(tx: &mpsc::UnboundedSender, message: T) where T: Send, { diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index ad12203..08d8853 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,17 +1,31 @@ use futures::{Stream, StreamExt}; -use spawned_rt::tasks::{mpsc::Receiver, UnboundedReceiverStream}; +use spawned_rt::tasks::{ + mpsc::{Receiver, UnboundedReceiver}, + ReceiverStream, UnboundedReceiverStream, +}; use crate::tasks::{GenServer, GenServerHandle}; -/// Converts an unbounded receiver into a stream that can be used with async tasks -/// This function is useful for integrating with the `spawn_listener` function -pub fn unbounded_receiver_to_stream(receiver: Receiver) -> UnboundedReceiverStream +/// Converts an unbounded receiver into a stream that can be used with async tasks. +/// +/// This function is useful in conjunction with the [`spawn_listener`] function. +pub fn unbounded_receiver_to_stream(receiver: UnboundedReceiver) -> UnboundedReceiverStream where T: 'static + Clone + Send, { UnboundedReceiverStream::new(receiver) } +/// Converts a (bounded) receiver into a stream that can be used with async tasks. +/// +/// This function is useful in conjunction with the [`spawn_listener`] function. +pub fn receiver_to_stream(receiver: Receiver) -> ReceiverStream +where + T: 'static + Clone + Send, +{ + ReceiverStream::new(receiver) +} + /// Spawns a listener that listens to a stream and sends messages to a GenServer pub fn spawn_listener(mut handle: GenServerHandle, message_builder: F, mut stream: S) where diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_test.rs index 3192382..533564b 100644 --- a/concurrency/src/tasks/stream_test.rs +++ b/concurrency/src/tasks/stream_test.rs @@ -3,8 +3,8 @@ use std::time::Duration; use spawned_rt::tasks::{self as rt}; use crate::tasks::{ - spawn_listener, unbounded_receiver_to_stream, CallResponse, CastResponse, GenServer, - GenServerHandle, + receiver_to_stream, spawn_listener, unbounded_receiver_to_stream, CallResponse, CastResponse, + GenServer, GenServerHandle, }; type SummatoryHandle = GenServerHandle; @@ -81,7 +81,35 @@ pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::channel(); + let (tx, rx) = spawned_rt::tasks::mpsc::channel(5); + + // Spawn a task to send numbers to the channel + spawned_rt::tasks::spawn(async move { + for i in 1..=5 { + tx.send(Ok(i)).await.unwrap(); + } + }); + + spawn_listener( + summatory_handle.clone(), + message_builder, + receiver_to_stream(rx), + ); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)).await; + + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert_eq!(val, 15); + }) +} + +#[test] +pub fn test_sum_numbers_from_unbounded_channel() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::start(0); + let (tx, rx) = spawned_rt::tasks::mpsc::unbounded_channel(); // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { diff --git a/examples/ping_pong/src/consumer.rs b/examples/ping_pong/src/consumer.rs index 8ead269..5118184 100644 --- a/examples/ping_pong/src/consumer.rs +++ b/examples/ping_pong/src/consumer.rs @@ -1,5 +1,5 @@ use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::tasks::mpsc::Sender; +use spawned_rt::tasks::mpsc::UnboundedSender; use crate::messages::Message; @@ -12,7 +12,7 @@ impl Consumer { } impl Process for Consumer { - async fn handle(&mut self, message: Message, _tx: &Sender) -> Message { + async fn handle(&mut self, message: Message, _tx: &UnboundedSender) -> Message { tracing::info!("Consumer received {message:?}"); match message.clone() { Message::Ping { from } => { diff --git a/examples/ping_pong/src/messages.rs b/examples/ping_pong/src/messages.rs index a22ae6c..a5e797a 100644 --- a/examples/ping_pong/src/messages.rs +++ b/examples/ping_pong/src/messages.rs @@ -1,7 +1,7 @@ -use spawned_rt::tasks::mpsc::Sender; +use spawned_rt::tasks::mpsc::UnboundedSender; #[derive(Debug, Clone)] pub enum Message { - Ping { from: Sender }, + Ping { from: UnboundedSender }, Pong, } diff --git a/examples/ping_pong/src/producer.rs b/examples/ping_pong/src/producer.rs index 71829a1..b542496 100644 --- a/examples/ping_pong/src/producer.rs +++ b/examples/ping_pong/src/producer.rs @@ -1,18 +1,18 @@ use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::tasks::mpsc::Sender; +use spawned_rt::tasks::mpsc::UnboundedSender; use crate::messages::Message; pub struct Producer { - consumer: Sender, + consumer: UnboundedSender, } impl Producer { - pub async fn spawn_new(consumer: Sender) -> ProcessInfo { + pub async fn spawn_new(consumer: UnboundedSender) -> ProcessInfo { Self { consumer }.spawn().await } - fn send_ping(&self, tx: &Sender, consumer: &Sender) { + fn send_ping(&self, tx: &UnboundedSender, consumer: &UnboundedSender) { let message = Message::Ping { from: tx.clone() }; tracing::info!("Producer sent Ping"); concurrency::send(consumer, message); @@ -20,11 +20,11 @@ impl Producer { } impl Process for Producer { - async fn init(&mut self, tx: &Sender) { + async fn init(&mut self, tx: &UnboundedSender) { self.send_ping(tx, &self.consumer); } - async fn handle(&mut self, message: Message, tx: &Sender) -> Message { + async fn handle(&mut self, message: Message, tx: &UnboundedSender) -> Message { tracing::info!("Producer received {message:?}"); self.send_ping(tx, &self.consumer); message diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 11c9d79..7055005 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -18,7 +18,7 @@ pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::CancellationToken; pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; -pub use crate::tasks::tokio::{BroadcastStream, UnboundedReceiverStream}; +pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; use std::future::Future; pub fn run(future: F) -> F::Output { diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 9ccd31b..7727b26 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -7,5 +7,5 @@ pub use tokio::{ task::{spawn, spawn_blocking, JoinHandle}, time::sleep, }; -pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream}; +pub use tokio_stream::wrappers::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; pub use tokio_util::sync::CancellationToken; diff --git a/rt/src/tasks/tokio/mpsc.rs b/rt/src/tasks/tokio/mpsc.rs index ec520a6..fca1e6b 100644 --- a/rt/src/tasks/tokio/mpsc.rs +++ b/rt/src/tasks/tokio/mpsc.rs @@ -1,6 +1,6 @@ //! Tokio.rs reexports to prevent tokio dependencies within external code pub use tokio::sync::mpsc::{ - error::SendError, unbounded_channel as channel, UnboundedReceiver as Receiver, - UnboundedSender as Sender, + channel, error::SendError, unbounded_channel, Receiver, Sender, UnboundedReceiver, + UnboundedSender, }; From de32666e28a66042c4cc65028c278644aea4e748 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Jul 2025 14:45:35 -0300 Subject: [PATCH 05/34] add broadcast listener --- concurrency/src/tasks/mod.rs | 5 ++- concurrency/src/tasks/stream.rs | 51 ++++++++++++++++++++++++++-- concurrency/src/tasks/stream_test.rs | 28 ++++++++++++++- rt/src/tasks/mod.rs | 4 ++- rt/src/tasks/tokio/mod.rs | 4 ++- rt/src/tasks/tokio/mpsc.rs | 9 +++-- 6 files changed, 92 insertions(+), 9 deletions(-) diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 69f51d8..371abc8 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -13,5 +13,8 @@ mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::{receiver_to_stream, spawn_listener, unbounded_receiver_to_stream}; +pub use stream::{ + broadcast_receiver_to_stream, receiver_to_stream, spawn_broadcast_listener, spawn_listener, + unbounded_receiver_to_stream, +}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 08d8853..c182e00 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,7 +1,7 @@ use futures::{Stream, StreamExt}; use spawned_rt::tasks::{ - mpsc::{Receiver, UnboundedReceiver}, - ReceiverStream, UnboundedReceiverStream, + mpsc::{BroadcastReceiver, Receiver, UnboundedReceiver}, + BroadcastStream, BroadcastStreamRecvError, ReceiverStream, UnboundedReceiverStream, }; use crate::tasks::{GenServer, GenServerHandle}; @@ -52,3 +52,50 @@ where } }); } + +/// Converts a broadcast receiver into a stream that can be used with async tasks. +/// +/// This function is useful in conjunction with the [`spawn_broadcast_listener`] function. +pub fn broadcast_receiver_to_stream(receiver: BroadcastReceiver) -> BroadcastStream +where + T: 'static + Clone + Send, +{ + BroadcastStream::new(receiver) +} + +/// Spawns a listener that listens to a stream and sends messages to a GenServer +pub fn spawn_broadcast_listener( + mut handle: GenServerHandle, + message_builder: F, + mut stream: S, +) where + T: GenServer + 'static, + F: Fn(I) -> T::CastMsg + Send + 'static, + I: Send, + S: Unpin + + Send + + Stream, BroadcastStreamRecvError>> + + 'static, +{ + spawned_rt::tasks::spawn(async move { + loop { + match stream.next().await { + Some(Ok(Ok(item))) => { + let _ = handle.cast(message_builder(item)).await; + } + Some(Ok(Err(e))) => { + tracing::trace!("Received Error in msg {e:?}"); + break; + } + Some(Err(e)) => { + tracing::trace!("Received Error in msg {e:?}"); + break; + } + None => { + tracing::trace!("Stream finnished"); + break; + } + } + } + }); +} diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_test.rs index 533564b..4e63d6d 100644 --- a/concurrency/src/tasks/stream_test.rs +++ b/concurrency/src/tasks/stream_test.rs @@ -3,7 +3,8 @@ use std::time::Duration; use spawned_rt::tasks::{self as rt}; use crate::tasks::{ - receiver_to_stream, spawn_listener, unbounded_receiver_to_stream, CallResponse, CastResponse, + broadcast_receiver_to_stream, receiver_to_stream, spawn_listener, + stream::spawn_broadcast_listener, unbounded_receiver_to_stream, CallResponse, CastResponse, GenServer, GenServerHandle, }; @@ -131,3 +132,28 @@ pub fn test_sum_numbers_from_unbounded_channel() { assert_eq!(val, 15); }) } + +#[test] +pub fn test_sum_numbers_from_broadcast_channel() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::start(0); + let (tx, rx) = spawned_rt::tasks::mpsc::broadcast_channel(5); + + // Spawn a task to send numbers to the channel + spawned_rt::tasks::spawn(async move { + for i in 1u8..=5 { + tx.send(Ok(i)).unwrap(); + } + }); + + let stream = broadcast_receiver_to_stream(rx); + spawn_broadcast_listener(summatory_handle.clone(), message_builder, stream); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)).await; + + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert_eq!(val, 15); + }) +} diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 7055005..e4b5b3a 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -18,7 +18,9 @@ pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::CancellationToken; pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; -pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; +pub use crate::tasks::tokio::{ + BroadcastStream, BroadcastStreamRecvError, ReceiverStream, UnboundedReceiverStream, +}; use std::future::Future; pub fn run(future: F) -> F::Output { diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 7727b26..5cf31d4 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -7,5 +7,7 @@ pub use tokio::{ task::{spawn, spawn_blocking, JoinHandle}, time::sleep, }; -pub use tokio_stream::wrappers::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; +pub use tokio_stream::wrappers::{ + errors::BroadcastStreamRecvError, BroadcastStream, ReceiverStream, UnboundedReceiverStream, +}; pub use tokio_util::sync::CancellationToken; diff --git a/rt/src/tasks/tokio/mpsc.rs b/rt/src/tasks/tokio/mpsc.rs index fca1e6b..fba1606 100644 --- a/rt/src/tasks/tokio/mpsc.rs +++ b/rt/src/tasks/tokio/mpsc.rs @@ -1,6 +1,9 @@ //! Tokio.rs reexports to prevent tokio dependencies within external code -pub use tokio::sync::mpsc::{ - channel, error::SendError, unbounded_channel, Receiver, Sender, UnboundedReceiver, - UnboundedSender, +pub use tokio::sync::{ + broadcast::{channel as broadcast_channel, Receiver as BroadcastReceiver}, + mpsc::{ + channel, error::SendError, unbounded_channel, Receiver, Sender, UnboundedReceiver, + UnboundedSender, + }, }; From 6bfb8a1820ae19b7eb74be20aaae1b9e3760f072 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Jul 2025 15:33:28 -0300 Subject: [PATCH 06/34] fix `spawn_broadcast_listener` --- concurrency/src/tasks/stream.rs | 25 ++++++++++--------------- concurrency/src/tasks/stream_test.rs | 2 +- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index c182e00..2f8be83 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -72,25 +72,20 @@ pub fn spawn_broadcast_listener( T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, I: Send, - S: Unpin - + Send - + Stream, BroadcastStreamRecvError>> - + 'static, + S: Unpin + Send + Stream> + 'static, { spawned_rt::tasks::spawn(async move { loop { match stream.next().await { - Some(Ok(Ok(item))) => { - let _ = handle.cast(message_builder(item)).await; - } - Some(Ok(Err(e))) => { - tracing::trace!("Received Error in msg {e:?}"); - break; - } - Some(Err(e)) => { - tracing::trace!("Received Error in msg {e:?}"); - break; - } + Some(item) => match item { + Ok(i) => { + let _ = handle.cast(message_builder(i)).await; + } + Err(e) => { + tracing::trace!("Received Error in msg {e:?}"); + break; + } + }, None => { tracing::trace!("Stream finnished"); break; diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_test.rs index 4e63d6d..913b710 100644 --- a/concurrency/src/tasks/stream_test.rs +++ b/concurrency/src/tasks/stream_test.rs @@ -143,7 +143,7 @@ pub fn test_sum_numbers_from_broadcast_channel() { // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { for i in 1u8..=5 { - tx.send(Ok(i)).unwrap(); + tx.send(i).unwrap(); } }); From 7a6f79a3e041d3ba3a8f1f1c9acca6937acec715 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Jul 2025 16:46:43 -0300 Subject: [PATCH 07/34] unify spawn_listener & remove oneline functions --- concurrency/src/tasks/mod.rs | 5 +- concurrency/src/tasks/stream.rs | 74 +++------------------------- concurrency/src/tasks/stream_test.rs | 23 ++++----- rt/src/tasks/mod.rs | 4 +- rt/src/tasks/tokio/mod.rs | 4 +- 5 files changed, 22 insertions(+), 88 deletions(-) diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 371abc8..3448286 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -13,8 +13,5 @@ mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::{ - broadcast_receiver_to_stream, receiver_to_stream, spawn_broadcast_listener, spawn_listener, - unbounded_receiver_to_stream, -}; +pub use stream::spawn_listener; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 2f8be83..1d88964 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,70 +1,9 @@ -use futures::{Stream, StreamExt}; -use spawned_rt::tasks::{ - mpsc::{BroadcastReceiver, Receiver, UnboundedReceiver}, - BroadcastStream, BroadcastStreamRecvError, ReceiverStream, UnboundedReceiverStream, -}; - use crate::tasks::{GenServer, GenServerHandle}; - -/// Converts an unbounded receiver into a stream that can be used with async tasks. -/// -/// This function is useful in conjunction with the [`spawn_listener`] function. -pub fn unbounded_receiver_to_stream(receiver: UnboundedReceiver) -> UnboundedReceiverStream -where - T: 'static + Clone + Send, -{ - UnboundedReceiverStream::new(receiver) -} - -/// Converts a (bounded) receiver into a stream that can be used with async tasks. -/// -/// This function is useful in conjunction with the [`spawn_listener`] function. -pub fn receiver_to_stream(receiver: Receiver) -> ReceiverStream -where - T: 'static + Clone + Send, -{ - ReceiverStream::new(receiver) -} - -/// Spawns a listener that listens to a stream and sends messages to a GenServer -pub fn spawn_listener(mut handle: GenServerHandle, message_builder: F, mut stream: S) -where - T: GenServer + 'static, - F: Fn(I) -> T::CastMsg + Send + 'static, - I: Send, - S: Unpin + Send + Stream> + 'static, -{ - spawned_rt::tasks::spawn(async move { - loop { - match stream.next().await { - Some(Ok(item)) => { - let _ = handle.cast(message_builder(item)).await; - } - Some(Err(e)) => { - tracing::trace!("Received Error in msg {e:?}"); - break; - } - None => { - tracing::trace!("Stream finnished"); - break; - } - } - } - }); -} - -/// Converts a broadcast receiver into a stream that can be used with async tasks. -/// -/// This function is useful in conjunction with the [`spawn_broadcast_listener`] function. -pub fn broadcast_receiver_to_stream(receiver: BroadcastReceiver) -> BroadcastStream -where - T: 'static + Clone + Send, -{ - BroadcastStream::new(receiver) -} +use futures::{Stream, StreamExt}; /// Spawns a listener that listens to a stream and sends messages to a GenServer -pub fn spawn_broadcast_listener( +/// Items sent through the stream are required to be wrapped in a Result type. +pub fn spawn_listener( mut handle: GenServerHandle, message_builder: F, mut stream: S, @@ -72,12 +11,13 @@ pub fn spawn_broadcast_listener( T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, I: Send, - S: Unpin + Send + Stream> + 'static, + E: std::fmt::Debug + Send, + S: Unpin + Send + Stream> + 'static, { spawned_rt::tasks::spawn(async move { loop { match stream.next().await { - Some(item) => match item { + Some(res) => match res { Ok(i) => { let _ = handle.cast(message_builder(i)).await; } @@ -87,7 +27,7 @@ pub fn spawn_broadcast_listener( } }, None => { - tracing::trace!("Stream finnished"); + tracing::trace!("Stream finished"); break; } } diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_test.rs index 913b710..756af01 100644 --- a/concurrency/src/tasks/stream_test.rs +++ b/concurrency/src/tasks/stream_test.rs @@ -1,11 +1,9 @@ use std::time::Duration; -use spawned_rt::tasks::{self as rt}; +use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream, UnboundedReceiverStream}; use crate::tasks::{ - broadcast_receiver_to_stream, receiver_to_stream, spawn_listener, - stream::spawn_broadcast_listener, unbounded_receiver_to_stream, CallResponse, CastResponse, - GenServer, GenServerHandle, + stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, }; type SummatoryHandle = GenServerHandle; @@ -65,7 +63,7 @@ pub fn test_sum_numbers_from_stream() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok)); + let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); spawn_listener(summatory_handle.clone(), message_builder, stream); @@ -82,7 +80,7 @@ pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::channel(5); + let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(5); // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { @@ -94,7 +92,7 @@ pub fn test_sum_numbers_from_channel() { spawn_listener( summatory_handle.clone(), message_builder, - receiver_to_stream(rx), + ReceiverStream::new(rx), ); // Wait for 1 second so the whole stream is processed @@ -110,7 +108,7 @@ pub fn test_sum_numbers_from_unbounded_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::unbounded_channel(); + let (tx, rx) = spawned_rt::tasks::mpsc::unbounded_channel::>(); // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { @@ -122,7 +120,7 @@ pub fn test_sum_numbers_from_unbounded_channel() { spawn_listener( summatory_handle.clone(), message_builder, - unbounded_receiver_to_stream(rx), + UnboundedReceiverStream::new(rx), ); // Wait for 1 second so the whole stream is processed @@ -147,8 +145,11 @@ pub fn test_sum_numbers_from_broadcast_channel() { } }); - let stream = broadcast_receiver_to_stream(rx); - spawn_broadcast_listener(summatory_handle.clone(), message_builder, stream); + spawn_listener( + summatory_handle.clone(), + message_builder, + BroadcastStream::new(rx), + ); // Wait for 1 second so the whole stream is processed rt::sleep(Duration::from_secs(1)).await; diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index e4b5b3a..7055005 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -18,9 +18,7 @@ pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::CancellationToken; pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; -pub use crate::tasks::tokio::{ - BroadcastStream, BroadcastStreamRecvError, ReceiverStream, UnboundedReceiverStream, -}; +pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; use std::future::Future; pub fn run(future: F) -> F::Output { diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 5cf31d4..7727b26 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -7,7 +7,5 @@ pub use tokio::{ task::{spawn, spawn_blocking, JoinHandle}, time::sleep, }; -pub use tokio_stream::wrappers::{ - errors::BroadcastStreamRecvError, BroadcastStream, ReceiverStream, UnboundedReceiverStream, -}; +pub use tokio_stream::wrappers::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; pub use tokio_util::sync::CancellationToken; From 4d49882f0e822e30c2e8eb2e49e0724a9d778efc Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Jul 2025 16:53:33 -0300 Subject: [PATCH 08/34] doc update --- concurrency/src/tasks/stream.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 1d88964..dd03419 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,7 +1,8 @@ use crate::tasks::{GenServer, GenServerHandle}; use futures::{Stream, StreamExt}; -/// Spawns a listener that listens to a stream and sends messages to a GenServer +/// Spawns a listener that listens to a stream and sends messages to a GenServer. +/// /// Items sent through the stream are required to be wrapped in a Result type. pub fn spawn_listener( mut handle: GenServerHandle, From 5138e0964f216834440370ab477438671e33f8a8 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 09:49:24 -0300 Subject: [PATCH 09/34] add impl of sync spawn listener --- concurrency/src/tasks/mod.rs | 2 +- .../tasks/{stream_test.rs => stream_tests.rs} | 0 concurrency/src/threads/mod.rs | 4 + concurrency/src/threads/stream.rs | 45 ++++++++++ concurrency/src/threads/stream_tests.rs | 88 +++++++++++++++++++ 5 files changed, 138 insertions(+), 1 deletion(-) rename concurrency/src/tasks/{stream_test.rs => stream_tests.rs} (100%) create mode 100644 concurrency/src/threads/stream.rs create mode 100644 concurrency/src/threads/stream_tests.rs diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 3448286..201c7d2 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -7,7 +7,7 @@ mod stream; mod time; #[cfg(test)] -mod stream_test; +mod stream_tests; #[cfg(test)] mod timer_tests; diff --git a/concurrency/src/tasks/stream_test.rs b/concurrency/src/tasks/stream_tests.rs similarity index 100% rename from concurrency/src/tasks/stream_test.rs rename to concurrency/src/tasks/stream_tests.rs diff --git a/concurrency/src/threads/mod.rs b/concurrency/src/threads/mod.rs index 44e9dcd..a13f963 100644 --- a/concurrency/src/threads/mod.rs +++ b/concurrency/src/threads/mod.rs @@ -3,11 +3,15 @@ mod gen_server; mod process; +mod stream; mod time; +#[cfg(test)] +mod stream_tests; #[cfg(test)] mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; +pub use stream::{spawn_listener, stream_to_iter}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs new file mode 100644 index 0000000..71405ca --- /dev/null +++ b/concurrency/src/threads/stream.rs @@ -0,0 +1,45 @@ +use crate::threads::{GenServer, GenServerHandle}; +use std::thread; + +use futures::{Stream, StreamExt}; // StreamExt gives us `next` +use std::iter; + +/// Utility function to convert asynchronous streams into synchronous iterators. +/// +/// This function is useful when used in conjunction with `spawn_listener` +pub fn stream_to_iter(mut stream: S) -> impl Iterator +where + S: Stream + Unpin, +{ + iter::from_fn(move || futures::executor::block_on(stream.next())) +} + +/// Spawns a listener that iterates over a stream and sends messages to a GenServer. +/// +/// Items sent through the iterator are required to be wrapped in a Result type. +pub fn spawn_listener( + mut handle: GenServerHandle, + message_builder: F, + iterable: S, +) where + T: GenServer + 'static, + F: Fn(I) -> T::CastMsg + Send + 'static, + I: Send + 'static, + E: std::fmt::Debug + Send + 'static, + S: Iterator> + Send + 'static, +{ + thread::spawn(move || { + for res in iterable { + match res { + Ok(i) => { + let _ = handle.cast(message_builder(i)); + } + Err(err) => { + tracing::error!("Error in stream: {:?}", err); + break; + } + } + } + tracing::trace!("Stream finished"); + }); +} diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs new file mode 100644 index 0000000..347fed7 --- /dev/null +++ b/concurrency/src/threads/stream_tests.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use crate::threads::{ + stream::spawn_listener, stream_to_iter, CallResponse, CastResponse, GenServer, GenServerHandle, +}; + +use spawned_rt::threads::{self as rt}; + +type SummatoryHandle = GenServerHandle; + +struct Summatory; + +type SummatoryState = u16; +type SummatoryCastMessage = SummatoryState; +type SummatoryOutMessage = SummatoryState; + +impl Summatory { + pub fn get_value(server: &mut SummatoryHandle) -> Result { + server.call(()).map_err(|_| ()) + } +} + +impl GenServer for Summatory { + type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type. + type CastMsg = SummatoryCastMessage; + type OutMsg = SummatoryOutMessage; + type State = SummatoryState; + type Error = (); + + fn new() -> Self { + Self + } + + fn handle_cast( + &mut self, + message: Self::CastMsg, + _handle: &GenServerHandle, + state: Self::State, + ) -> CastResponse { + let new_state = state + message; + CastResponse::NoReply(new_state) + } + + fn handle_call( + &mut self, + _message: Self::CallMsg, + _handle: &GenServerHandle, + state: Self::State, + ) -> CallResponse { + let current_value = state; + CallResponse::Reply(state, current_value) + } +} + +// In this example, the stream sends u8 values, which are converted to the type +// supported by the GenServer (SummatoryCastMessage / u16). +fn message_builder(value: u8) -> SummatoryCastMessage { + value.into() +} + +#[test] +pub fn test_sum_numbers_from_iter() { + let mut summatory_handle = Summatory::start(0); + let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); + + spawn_listener(summatory_handle.clone(), message_builder, stream); + + // Wait for the stream to finish processing + rt::sleep(Duration::from_secs(1)); + + let val = Summatory::get_value(&mut summatory_handle).unwrap(); + assert_eq!(val, 15); +} + +#[test] +pub fn test_sum_numbers_from_async_stream() { + let mut summatory_handle = Summatory::start(0); + let async_stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); + let sync_stream = stream_to_iter(async_stream); + + spawn_listener(summatory_handle.clone(), message_builder, sync_stream); + + // Wait for the stream to finish processing + rt::sleep(Duration::from_secs(1)); + + let val = Summatory::get_value(&mut summatory_handle).unwrap(); + assert_eq!(val, 15); +} From 04222ae6bdf8b123f48e619f30ead01d0e36401b Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 10:26:06 -0300 Subject: [PATCH 10/34] rename spawn_listener to spawn_listener_from_iter, and port spawn_listener --- concurrency/src/threads/mod.rs | 2 +- concurrency/src/threads/stream.rs | 44 ++++++++++++++++++------- concurrency/src/threads/stream_tests.rs | 34 +++++++++++++++---- 3 files changed, 62 insertions(+), 18 deletions(-) diff --git a/concurrency/src/threads/mod.rs b/concurrency/src/threads/mod.rs index a13f963..0138c05 100644 --- a/concurrency/src/threads/mod.rs +++ b/concurrency/src/threads/mod.rs @@ -13,5 +13,5 @@ mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::{spawn_listener, stream_to_iter}; +pub use stream::{spawn_listener, spawn_listener_from_iter}; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index 71405ca..3018e7c 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -1,33 +1,55 @@ use crate::threads::{GenServer, GenServerHandle}; use std::thread; -use futures::{Stream, StreamExt}; // StreamExt gives us `next` +use futures::{Stream, StreamExt}; use std::iter; -/// Utility function to convert asynchronous streams into synchronous iterators. +/// Spawns a listener that iterates over an iterable and sends messages to a GenServer. /// -/// This function is useful when used in conjunction with `spawn_listener` -pub fn stream_to_iter(mut stream: S) -> impl Iterator -where - S: Stream + Unpin, +/// Items sent through the iterator are required to be wrapped in a Result type. +pub fn spawn_listener_from_iter( + mut handle: GenServerHandle, + message_builder: F, + iterable: S, +) where + T: GenServer + 'static, + F: Fn(I) -> T::CastMsg + Send + 'static, + I: Send + 'static, + E: std::fmt::Debug + Send + 'static, + S: Iterator> + Send + 'static, { - iter::from_fn(move || futures::executor::block_on(stream.next())) + thread::spawn(move || { + for res in iterable { + match res { + Ok(i) => { + let _ = handle.cast(message_builder(i)); + } + Err(err) => { + tracing::error!("Error in stream: {:?}", err); + break; + } + } + } + tracing::trace!("Stream finished"); + }); } -/// Spawns a listener that iterates over a stream and sends messages to a GenServer. +/// Spawns a listener that listens to a stream and sends messages to a GenServer. /// -/// Items sent through the iterator are required to be wrapped in a Result type. +/// Items sent through the stream are required to be wrapped in a Result type. pub fn spawn_listener( mut handle: GenServerHandle, message_builder: F, - iterable: S, + mut stream: S, ) where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, I: Send + 'static, E: std::fmt::Debug + Send + 'static, - S: Iterator> + Send + 'static, + S: Unpin + Send + Stream> + 'static, { + // Convert the stream into an iterator that blocks on each item. + let iterable = iter::from_fn(move || futures::executor::block_on(stream.next())); thread::spawn(move || { for res in iterable { match res { diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 347fed7..4f81ec4 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -1,7 +1,8 @@ use std::time::Duration; use crate::threads::{ - stream::spawn_listener, stream_to_iter, CallResponse, CastResponse, GenServer, GenServerHandle, + stream::{spawn_listener, spawn_listener_from_iter}, + CallResponse, CastResponse, GenServer, GenServerHandle, }; use spawned_rt::threads::{self as rt}; @@ -63,7 +64,7 @@ pub fn test_sum_numbers_from_iter() { let mut summatory_handle = Summatory::start(0); let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); - spawn_listener(summatory_handle.clone(), message_builder, stream); + spawn_listener_from_iter(summatory_handle.clone(), message_builder, stream); // Wait for the stream to finish processing rt::sleep(Duration::from_secs(1)); @@ -74,13 +75,34 @@ pub fn test_sum_numbers_from_iter() { #[test] pub fn test_sum_numbers_from_async_stream() { + // In this example we are converting an async stream into a synchronous one + // Does this make sense in a real-world scenario? let mut summatory_handle = Summatory::start(0); - let async_stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); - let sync_stream = stream_to_iter(async_stream); + let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); - spawn_listener(summatory_handle.clone(), message_builder, sync_stream); + spawn_listener(summatory_handle.clone(), message_builder, stream); - // Wait for the stream to finish processing + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_secs(1)); + + let val = Summatory::get_value(&mut summatory_handle).unwrap(); + assert_eq!(val, 15); +} + +#[test] +pub fn test_sum_numbers_from_channel() { + let mut summatory_handle = Summatory::start(0); + let (tx, rx) = rt::mpsc::channel::>(); + + rt::spawn(move || { + for i in 1..=5 { + tx.send(Ok(i)).unwrap(); + } + }); + + spawn_listener_from_iter(summatory_handle.clone(), message_builder, rx.into_iter()); + + // Wait for 1 second so the whole stream is processed rt::sleep(Duration::from_secs(1)); let val = Summatory::get_value(&mut summatory_handle).unwrap(); From 51ecbb256aa40dc80fab9d8cfabae36801ca582a Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 10:50:23 -0300 Subject: [PATCH 11/34] add bound channel to threads concurrency --- concurrency/src/threads/gen_server.rs | 2 +- concurrency/src/threads/process.rs | 2 +- concurrency/src/threads/stream_tests.rs | 42 ++++++++++++++++++------- rt/src/threads/mpsc.rs | 4 ++- 4 files changed, 36 insertions(+), 14 deletions(-) diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 753e849..69dd94c 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -23,7 +23,7 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { - let (tx, mut rx) = mpsc::channel::>(); + let (tx, mut rx) = mpsc::unbounded_channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); diff --git a/concurrency/src/threads/process.rs b/concurrency/src/threads/process.rs index 3dfd87d..88eec8e 100644 --- a/concurrency/src/threads/process.rs +++ b/concurrency/src/threads/process.rs @@ -24,7 +24,7 @@ where Self: Send + Sync + Sized + 'static, { fn spawn(mut self) -> ProcessInfo { - let (tx, mut rx) = mpsc::channel::(); + let (tx, mut rx) = mpsc::unbounded_channel::(); let tx_clone = tx.clone(); let handle = rt::spawn(move || self.run(&tx_clone, &mut rx)); ProcessInfo { tx, handle } diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 4f81ec4..6616f77 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -60,13 +60,15 @@ fn message_builder(value: u8) -> SummatoryCastMessage { } #[test] -pub fn test_sum_numbers_from_iter() { +pub fn test_sum_numbers_from_stream() { + // In this example we are converting an async stream into a synchronous one + // Does this make sense in a real-world scenario? let mut summatory_handle = Summatory::start(0); - let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); + let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); - spawn_listener_from_iter(summatory_handle.clone(), message_builder, stream); + spawn_listener(summatory_handle.clone(), message_builder, stream); - // Wait for the stream to finish processing + // Wait for 1 second so the whole stream is processed rt::sleep(Duration::from_secs(1)); let val = Summatory::get_value(&mut summatory_handle).unwrap(); @@ -74,13 +76,17 @@ pub fn test_sum_numbers_from_iter() { } #[test] -pub fn test_sum_numbers_from_async_stream() { - // In this example we are converting an async stream into a synchronous one - // Does this make sense in a real-world scenario? +pub fn test_sum_numbers_from_channel() { let mut summatory_handle = Summatory::start(0); - let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); + let (tx, rx) = rt::mpsc::channel::>(5); - spawn_listener(summatory_handle.clone(), message_builder, stream); + rt::spawn(move || { + for i in 1..=5 { + tx.send(Ok(i)).unwrap(); + } + }); + + spawn_listener_from_iter(summatory_handle.clone(), message_builder, rx.into_iter()); // Wait for 1 second so the whole stream is processed rt::sleep(Duration::from_secs(1)); @@ -90,9 +96,9 @@ pub fn test_sum_numbers_from_async_stream() { } #[test] -pub fn test_sum_numbers_from_channel() { +pub fn test_sum_numbers_from_unbounded_channel() { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = rt::mpsc::channel::>(); + let (tx, rx) = rt::mpsc::unbounded_channel::>(); rt::spawn(move || { for i in 1..=5 { @@ -108,3 +114,17 @@ pub fn test_sum_numbers_from_channel() { let val = Summatory::get_value(&mut summatory_handle).unwrap(); assert_eq!(val, 15); } + +#[test] +pub fn test_sum_numbers_from_iter() { + let mut summatory_handle = Summatory::start(0); + let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); + + spawn_listener_from_iter(summatory_handle.clone(), message_builder, stream); + + // Wait for the stream to finish processing + rt::sleep(Duration::from_secs(1)); + + let val = Summatory::get_value(&mut summatory_handle).unwrap(); + assert_eq!(val, 15); +} diff --git a/rt/src/threads/mpsc.rs b/rt/src/threads/mpsc.rs index c5b140a..350a7aa 100644 --- a/rt/src/threads/mpsc.rs +++ b/rt/src/threads/mpsc.rs @@ -1,3 +1,5 @@ //! non-async replacement for mpsc channels -pub use std::sync::mpsc::{channel, Receiver, SendError, Sender}; +pub use std::sync::mpsc::{ + channel as unbounded_channel, sync_channel as channel, Receiver, SendError, Sender, +}; From 0a78b6416c13592ec39f25cbaf63f5500d504359 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 12:38:13 -0300 Subject: [PATCH 12/34] merge duplicated code --- concurrency/src/threads/stream.rs | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index 3018e7c..e7add15 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -37,11 +37,8 @@ pub fn spawn_listener_from_iter( /// Spawns a listener that listens to a stream and sends messages to a GenServer. /// /// Items sent through the stream are required to be wrapped in a Result type. -pub fn spawn_listener( - mut handle: GenServerHandle, - message_builder: F, - mut stream: S, -) where +pub fn spawn_listener(handle: GenServerHandle, message_builder: F, mut stream: S) +where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, I: Send + 'static, @@ -50,18 +47,5 @@ pub fn spawn_listener( { // Convert the stream into an iterator that blocks on each item. let iterable = iter::from_fn(move || futures::executor::block_on(stream.next())); - thread::spawn(move || { - for res in iterable { - match res { - Ok(i) => { - let _ = handle.cast(message_builder(i)); - } - Err(err) => { - tracing::error!("Error in stream: {:?}", err); - break; - } - } - } - tracing::trace!("Stream finished"); - }); + spawn_listener_from_iter(handle, message_builder, iterable); } From 835bf083a15f307299142f6b71c004f0e83e9edb Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 15:08:26 -0300 Subject: [PATCH 13/34] add cancel token with 'flaky' test --- concurrency/src/threads/stream.rs | 16 +++++- concurrency/src/threads/stream_tests.rs | 76 ++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 4 deletions(-) diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index e7add15..93a0d5c 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -1,7 +1,8 @@ use crate::threads::{GenServer, GenServerHandle}; -use std::thread; +use std::thread::{self, JoinHandle}; use futures::{Stream, StreamExt}; +use spawned_rt::threads::CancellationToken; use std::iter; /// Spawns a listener that iterates over an iterable and sends messages to a GenServer. @@ -11,15 +12,23 @@ pub fn spawn_listener_from_iter( mut handle: GenServerHandle, message_builder: F, iterable: S, -) where +) -> (JoinHandle<()>, CancellationToken) +where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, I: Send + 'static, E: std::fmt::Debug + Send + 'static, S: Iterator> + Send + 'static, { - thread::spawn(move || { + let cancelation_token = CancellationToken::new(); + let mut cloned_token = cancelation_token.clone(); + let join_handle = thread::spawn(move || { for res in iterable { + if cloned_token.is_cancelled() { + tracing::trace!("Received signal to stop listener, stopping"); + break; + } + match res { Ok(i) => { let _ = handle.cast(message_builder(i)); @@ -32,6 +41,7 @@ pub fn spawn_listener_from_iter( } tracing::trace!("Stream finished"); }); + (join_handle, cancelation_token) } /// Spawns a listener that listens to a stream and sends messages to a GenServer. diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 6616f77..075aec8 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{time::Duration, u8}; use crate::threads::{ stream::{spawn_listener, spawn_listener_from_iter}, @@ -128,3 +128,77 @@ pub fn test_sum_numbers_from_iter() { let val = Summatory::get_value(&mut summatory_handle).unwrap(); assert_eq!(val, 15); } + +type BigSummatoryHandle = GenServerHandle; + +struct BigSummatory; + +type BigSummatoryState = u128; +type BigSummatoryCastMessage = BigSummatoryState; +type BigSummatoryOutMessage = BigSummatoryState; + +impl BigSummatory { + pub fn get_value(server: &mut BigSummatoryHandle) -> Result { + server.call(()).map_err(|_| ()) + } +} + +impl GenServer for BigSummatory { + type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type. + type CastMsg = BigSummatoryCastMessage; + type OutMsg = BigSummatoryOutMessage; + type State = BigSummatoryState; + type Error = (); + + fn new() -> Self { + Self + } + + fn handle_cast( + &mut self, + message: Self::CastMsg, + _handle: &GenServerHandle, + state: Self::State, + ) -> CastResponse { + let new_state = state + message; + CastResponse::NoReply(new_state) + } + + fn handle_call( + &mut self, + _message: Self::CallMsg, + _handle: &GenServerHandle, + state: Self::State, + ) -> CallResponse { + let current_value = state; + CallResponse::Reply(state, current_value) + } +} + +#[test] +pub fn test_stream_cancellation() { + // TODO: This test is in some way flaky, it relies on the processing machine + // not being fast enough, look for a more reliable way to test this. + let mut summatory_handle = BigSummatory::start(0); + + let big_range = 1u32..=u16::MAX as u32; + let big_stream = big_range.clone().into_iter().map(Ok::); + let expected_result: u128 = big_range.sum::() as u128; + + let (_handle, mut cancel_token) = + spawn_listener_from_iter(summatory_handle.clone(), |x| x as u128, big_stream); + + // Wait for a moment to allow some processing + rt::sleep(Duration::from_millis(1)); + + // Cancel the stream processing + cancel_token.cancel(); + + let val = BigSummatory::get_value(&mut summatory_handle).unwrap(); + + // BigSummatory should not have had enough time to process all items + assert_ne!(val, expected_result); + + // Yet still should have processed some items + assert!(val > 0); +} From 0c24840e5a571f31f3abfb6045798a622b80c5b1 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 15:36:16 -0300 Subject: [PATCH 14/34] unflaky the test --- concurrency/src/threads/stream_tests.rs | 82 +++++-------------------- 1 file changed, 17 insertions(+), 65 deletions(-) diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 075aec8..5daa915 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -129,76 +129,28 @@ pub fn test_sum_numbers_from_iter() { assert_eq!(val, 15); } -type BigSummatoryHandle = GenServerHandle; - -struct BigSummatory; - -type BigSummatoryState = u128; -type BigSummatoryCastMessage = BigSummatoryState; -type BigSummatoryOutMessage = BigSummatoryState; - -impl BigSummatory { - pub fn get_value(server: &mut BigSummatoryHandle) -> Result { - server.call(()).map_err(|_| ()) - } -} - -impl GenServer for BigSummatory { - type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type. - type CastMsg = BigSummatoryCastMessage; - type OutMsg = BigSummatoryOutMessage; - type State = BigSummatoryState; - type Error = (); - - fn new() -> Self { - Self - } - - fn handle_cast( - &mut self, - message: Self::CastMsg, - _handle: &GenServerHandle, - state: Self::State, - ) -> CastResponse { - let new_state = state + message; - CastResponse::NoReply(new_state) - } - - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &GenServerHandle, - state: Self::State, - ) -> CallResponse { - let current_value = state; - CallResponse::Reply(state, current_value) - } -} - #[test] pub fn test_stream_cancellation() { - // TODO: This test is in some way flaky, it relies on the processing machine - // not being fast enough, look for a more reliable way to test this. - let mut summatory_handle = BigSummatory::start(0); - - let big_range = 1u32..=u16::MAX as u32; - let big_stream = big_range.clone().into_iter().map(Ok::); - let expected_result: u128 = big_range.sum::() as u128; - - let (_handle, mut cancel_token) = - spawn_listener_from_iter(summatory_handle.clone(), |x| x as u128, big_stream); + let mut summatory_handle = Summatory::start(0); + let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); - // Wait for a moment to allow some processing - rt::sleep(Duration::from_millis(1)); + const RUNNING_TIME: u64 = 1000; - // Cancel the stream processing - cancel_token.cancel(); + // Add 'processing time' to each message + let message_builder = |x: u8| { + rt::sleep(Duration::from_millis(RUNNING_TIME / 4)); + x as u16 + }; - let val = BigSummatory::get_value(&mut summatory_handle).unwrap(); + spawn_listener_from_iter(summatory_handle.clone(), message_builder, stream); - // BigSummatory should not have had enough time to process all items - assert_ne!(val, expected_result); + // Wait for the stream to finish processing + rt::sleep(Duration::from_millis(RUNNING_TIME)); - // Yet still should have processed some items - assert!(val > 0); + // The reasoning for this assertion is that each message takes a quarter of the total time + // to be processed, so having a stream of 5 messages, some of them will never be processed. + // At first glance we would expect val == 10 considering it has time to process four messages, + // but in reality it will only get to process three messages due to other unacounted (minimal) overheads. + let val = Summatory::get_value(&mut summatory_handle).unwrap(); + assert_eq!(val, 6); } From 541cc1eb76482668383f2fb1c415f1bb14e9770e Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 16:47:27 -0300 Subject: [PATCH 15/34] add cancellation to task impl of spawn_listener --- concurrency/src/tasks/stream.rs | 14 ++++++++-- concurrency/src/tasks/stream_tests.rs | 37 +++++++++++++++++++++++++ concurrency/src/threads/stream_tests.rs | 5 ++-- 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index dd03419..d47579d 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,5 +1,6 @@ use crate::tasks::{GenServer, GenServerHandle}; use futures::{Stream, StreamExt}; +use spawned_rt::tasks::{CancellationToken, JoinHandle}; /// Spawns a listener that listens to a stream and sends messages to a GenServer. /// @@ -8,15 +9,23 @@ pub fn spawn_listener( mut handle: GenServerHandle, message_builder: F, mut stream: S, -) where +) -> (JoinHandle<()>, CancellationToken) +where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, I: Send, E: std::fmt::Debug + Send, S: Unpin + Send + Stream> + 'static, { - spawned_rt::tasks::spawn(async move { + let cancelation_token = CancellationToken::new(); + let cloned_token = cancelation_token.clone(); + let join_handle = spawned_rt::tasks::spawn(async move { loop { + if cloned_token.is_cancelled() { + tracing::trace!("Received signal to stop listener, stopping"); + break; + } + match stream.next().await { Some(res) => match res { Ok(i) => { @@ -34,4 +43,5 @@ pub fn spawn_listener( } } }); + (join_handle, cancelation_token) } diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 756af01..df73f9f 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -158,3 +158,40 @@ pub fn test_sum_numbers_from_broadcast_channel() { assert_eq!(val, 15); }) } + +#[test] +pub fn test_stream_cancellation() { + const RUNNING_TIME: u64 = 1000; + + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let mut summatory_handle = Summatory::start(0); + let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(5); + + // Spawn a task to send numbers to the channel + spawned_rt::tasks::spawn(async move { + for i in 1..=5 { + tx.send(Ok(i)).await.unwrap(); + rt::sleep(Duration::from_millis(RUNNING_TIME / 4)).await; + } + }); + + let (_handle, cancellation_token) = spawn_listener( + summatory_handle.clone(), + message_builder, + ReceiverStream::new(rx), + ); + + // Wait for 1 second so the whole stream is processed + rt::sleep(Duration::from_millis(RUNNING_TIME)).await; + + cancellation_token.cancel(); + + // The reasoning for this assertion is that each message takes a quarter of the total time + // to be processed, so having a stream of 5 messages, the last one won't be processed. + // We could safely assume that it will get to process 4 messages, but in case of any extenal + // slowdown, it could process less. + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + assert!(val > 0 && val < 15); + }) +} diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 5daa915..0a17abd 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -150,7 +150,8 @@ pub fn test_stream_cancellation() { // The reasoning for this assertion is that each message takes a quarter of the total time // to be processed, so having a stream of 5 messages, some of them will never be processed. // At first glance we would expect val == 10 considering it has time to process four messages, - // but in reality it will only get to process three messages due to other unacounted (minimal) overheads. + // but in reality it will most likely get to process only three messages due to the overhead + // of creating a new thread for each message. let val = Summatory::get_value(&mut summatory_handle).unwrap(); - assert_eq!(val, 6); + assert!(val > 0 && val < 15); } From 2950644a337f20353d450cfd4212a95e9b76f7d9 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 3 Jul 2025 16:54:41 -0300 Subject: [PATCH 16/34] docs & clippy --- Cargo.toml | 3 +-- concurrency/src/tasks/stream.rs | 3 +++ concurrency/src/threads/stream.rs | 3 +++ concurrency/src/threads/stream_tests.rs | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 33c635a..c76db38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] +resolver = "3" members = [ "rt", "examples/bank", @@ -17,5 +18,3 @@ spawned-rt = { version = "0.1.0", path = "rt" } spawned-concurrency = { version = "0.1.0", path = "concurrency" } tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } - - diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index d47579d..f520a42 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -5,6 +5,9 @@ use spawned_rt::tasks::{CancellationToken, JoinHandle}; /// Spawns a listener that listens to a stream and sends messages to a GenServer. /// /// Items sent through the stream are required to be wrapped in a Result type. +/// +/// This function returns a handle to the spawned task and a cancellation token +/// to stop it. pub fn spawn_listener( mut handle: GenServerHandle, message_builder: F, diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index 93a0d5c..faecb5d 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -8,6 +8,9 @@ use std::iter; /// Spawns a listener that iterates over an iterable and sends messages to a GenServer. /// /// Items sent through the iterator are required to be wrapped in a Result type. +/// +/// This function returns a handle to the spawned task and a cancellation token +/// to stop it. pub fn spawn_listener_from_iter( mut handle: GenServerHandle, message_builder: F, diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 0a17abd..0d41b4b 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -1,4 +1,4 @@ -use std::{time::Duration, u8}; +use std::time::Duration; use crate::threads::{ stream::{spawn_listener, spawn_listener_from_iter}, From 6f7a305825f56499d55dd942304732691b805fe9 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 11:33:57 -0300 Subject: [PATCH 17/34] use futures select inside spawn listener --- concurrency/src/tasks/stream.rs | 49 +++++++++++++++++++-------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index f520a42..bb3df3b 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,5 +1,5 @@ use crate::tasks::{GenServer, GenServerHandle}; -use futures::{Stream, StreamExt}; +use futures::{future::select, Stream, StreamExt}; use spawned_rt::tasks::{CancellationToken, JoinHandle}; /// Spawns a listener that listens to a stream and sends messages to a GenServer. @@ -15,7 +15,7 @@ pub fn spawn_listener( ) -> (JoinHandle<()>, CancellationToken) where T: GenServer + 'static, - F: Fn(I) -> T::CastMsg + Send + 'static, + F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync, I: Send, E: std::fmt::Debug + Send, S: Unpin + Send + Stream> + 'static, @@ -23,27 +23,34 @@ where let cancelation_token = CancellationToken::new(); let cloned_token = cancelation_token.clone(); let join_handle = spawned_rt::tasks::spawn(async move { - loop { - if cloned_token.is_cancelled() { - tracing::trace!("Received signal to stop listener, stopping"); - break; - } - - match stream.next().await { - Some(res) => match res { - Ok(i) => { - let _ = handle.cast(message_builder(i)).await; - } - Err(e) => { - tracing::trace!("Received Error in msg {e:?}"); - break; + let result = select( + Box::pin(cloned_token.cancelled()), + Box::pin(async { + loop { + match stream.next().await { + Some(Ok(i)) => match handle.cast(message_builder(i)).await { + Ok(_) => tracing::trace!("Message sent successfully"), + Err(e) => { + tracing::error!("Failed to send message: {e:?}"); + break; + } + }, + Some(Err(e)) => { + tracing::trace!("Received Error in msg {e:?}"); + break; + } + None => { + tracing::trace!("Stream finished"); + break; + } } - }, - None => { - tracing::trace!("Stream finished"); - break; } - } + }), + ) + .await; + match result { + futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"), + futures::future::Either::Right(_) => (), // Stream finished or errored out } }); (join_handle, cancelation_token) From 7b0e33df8bb641c2f75dde52f42b25fb25a90191 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 12:30:41 -0300 Subject: [PATCH 18/34] use genserver cancel token on stream --- concurrency/src/threads/gen_server.rs | 20 +++++++++++++++++--- concurrency/src/threads/stream.rs | 9 +++------ concurrency/src/threads/stream_tests.rs | 2 ++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 69dd94c..44da3a9 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -1,6 +1,6 @@ //! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. -use spawned_rt::threads::{self as rt, mpsc, oneshot}; +use spawned_rt::threads::{self as rt, mpsc, oneshot, CancellationToken}; use std::{ fmt::Debug, panic::{catch_unwind, AssertUnwindSafe}, @@ -8,15 +8,17 @@ use std::{ use crate::error::GenServerError; -#[derive(Debug)] pub struct GenServerHandle { pub tx: mpsc::Sender>, + /// Cancellation token to stop the GenServer + cancellation_token: CancellationToken, } impl Clone for GenServerHandle { fn clone(&self) -> Self { Self { tx: self.tx.clone(), + cancellation_token: self.cancellation_token.clone(), } } } @@ -24,7 +26,11 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::unbounded_channel::>(); - let handle = GenServerHandle { tx }; + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future @@ -57,6 +63,14 @@ impl GenServerHandle { .send(GenServerInMsg::Cast { message }) .map_err(|_error| GenServerError::Server) } + + pub fn cancellation_token(&self) -> CancellationToken { + self.cancellation_token.clone() + } + + pub fn stop(&mut self) { + self.cancellation_token.cancel(); + } } pub enum GenServerInMsg { diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index faecb5d..e01d88d 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -2,7 +2,6 @@ use crate::threads::{GenServer, GenServerHandle}; use std::thread::{self, JoinHandle}; use futures::{Stream, StreamExt}; -use spawned_rt::threads::CancellationToken; use std::iter; /// Spawns a listener that iterates over an iterable and sends messages to a GenServer. @@ -15,7 +14,7 @@ pub fn spawn_listener_from_iter( mut handle: GenServerHandle, message_builder: F, iterable: S, -) -> (JoinHandle<()>, CancellationToken) +) -> JoinHandle<()> where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, @@ -23,11 +22,9 @@ where E: std::fmt::Debug + Send + 'static, S: Iterator> + Send + 'static, { - let cancelation_token = CancellationToken::new(); - let mut cloned_token = cancelation_token.clone(); let join_handle = thread::spawn(move || { for res in iterable { - if cloned_token.is_cancelled() { + if handle.cancellation_token().is_cancelled() { tracing::trace!("Received signal to stop listener, stopping"); break; } @@ -44,7 +41,7 @@ where } tracing::trace!("Stream finished"); }); - (join_handle, cancelation_token) + join_handle } /// Spawns a listener that listens to a stream and sends messages to a GenServer. diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 0d41b4b..836bbb0 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -147,6 +147,8 @@ pub fn test_stream_cancellation() { // Wait for the stream to finish processing rt::sleep(Duration::from_millis(RUNNING_TIME)); + summatory_handle.stop(); + // The reasoning for this assertion is that each message takes a quarter of the total time // to be processed, so having a stream of 5 messages, some of them will never be processed. // At first glance we would expect val == 10 considering it has time to process four messages, From 138890fae6adf7c6bb69d2b656ce4a886f3e853d Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 14:54:23 -0300 Subject: [PATCH 19/34] add cancelation token to timer --- concurrency/src/threads/time.rs | 6 ++-- concurrency/src/threads/timer_tests.rs | 41 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/concurrency/src/threads/time.rs b/concurrency/src/threads/time.rs index 3d47c05..28a1323 100644 --- a/concurrency/src/threads/time.rs +++ b/concurrency/src/threads/time.rs @@ -23,7 +23,8 @@ where let mut cloned_token = cancellation_token.clone(); let join_handle = rt::spawn(move || { rt::sleep(period); - if !cloned_token.is_cancelled() { + // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. + if !cloned_token.is_cancelled() && !handle.cancellation_token().is_cancelled() { let _ = handle.cast(message); }; }); @@ -46,7 +47,8 @@ where let mut cloned_token = cancellation_token.clone(); let join_handle = rt::spawn(move || loop { rt::sleep(period); - if cloned_token.is_cancelled() { + // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. + if cloned_token.is_cancelled() || handle.cancellation_token().is_cancelled() { break; } else { let _ = handle.cast(message.clone()); diff --git a/concurrency/src/threads/timer_tests.rs b/concurrency/src/threads/timer_tests.rs index 6b3b8a4..5c557cd 100644 --- a/concurrency/src/threads/timer_tests.rs +++ b/concurrency/src/threads/timer_tests.rs @@ -231,3 +231,44 @@ pub fn test_send_after_and_cancellation() { // As timer was cancelled, count should remain at 1 assert_eq!(DelayedOutMessage::Count(1), count2); } + +#[test] +pub fn test_send_after_and_gen_server_stop() { + // Start a Delayed + let mut repeater = Delayed::start(DelayedState { count: 0 }); + + // Set a just once timed message + let _ = send_after( + Duration::from_millis(100), + repeater.clone(), + DelayedCastMessage::Inc, + ); + + // Wait for 200 milliseconds + rt::sleep(Duration::from_millis(200)); + + // Check count + let count = Delayed::get_count(&mut repeater).unwrap(); + + // Only one message (no repetition) + assert_eq!(DelayedOutMessage::Count(1), count); + + // New timer + let _ = send_after( + Duration::from_millis(100), + repeater.clone(), + DelayedCastMessage::Inc, + ); + + // Stop the GenServer before timeout + repeater.stop(); + + // Wait another 200 milliseconds + rt::sleep(Duration::from_millis(200)); + + // Check count again + let count2 = Delayed::get_count(&mut repeater).unwrap(); + + // As timer was cancelled, count should remain at 1 + assert_eq!(DelayedOutMessage::Count(1), count2); +} From 116241d9391bd6957e4f4c1f615d92da90a6cbcc Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 15:30:33 -0300 Subject: [PATCH 20/34] add cancellation token to gen server tasks impl --- concurrency/src/tasks/gen_server.rs | 26 +++++++++++++--- concurrency/src/tasks/time.rs | 18 ++++++++++-- concurrency/src/tasks/timer_tests.rs | 44 ++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 6 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 9edf96c..86b49f5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -1,20 +1,22 @@ //! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. use futures::future::FutureExt as _; -use spawned_rt::tasks::{self as rt, mpsc, oneshot}; +use spawned_rt::tasks::{self as rt, mpsc, oneshot, CancellationToken}; use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe}; use crate::error::GenServerError; -#[derive(Debug)] pub struct GenServerHandle { pub tx: mpsc::UnboundedSender>, + /// Cancellation token to stop the GenServer + cancellation_token: CancellationToken, } impl Clone for GenServerHandle { fn clone(&self) -> Self { Self { tx: self.tx.clone(), + cancellation_token: self.cancellation_token.clone(), } } } @@ -22,7 +24,11 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::unbounded_channel::>(); - let handle = GenServerHandle { tx }; + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future @@ -40,7 +46,11 @@ impl GenServerHandle { pub(crate) fn new_blocking(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::unbounded_channel::>(); - let handle = GenServerHandle { tx }; + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future @@ -79,6 +89,14 @@ impl GenServerHandle { .send(GenServerInMsg::Cast { message }) .map_err(|_error| GenServerError::Server) } + + pub fn cancellation_token(&self) -> CancellationToken { + self.cancellation_token.clone() + } + + pub fn stop(&mut self) { + self.cancellation_token.cancel(); + } } pub enum GenServerInMsg { diff --git a/concurrency/src/tasks/time.rs b/concurrency/src/tasks/time.rs index f26118b..619e553 100644 --- a/concurrency/src/tasks/time.rs +++ b/concurrency/src/tasks/time.rs @@ -22,9 +22,16 @@ where { let cancellation_token = CancellationToken::new(); let cloned_token = cancellation_token.clone(); + let gen_server_cancellation_token = handle.cancellation_token(); let join_handle = rt::spawn(async move { - let _ = select( + // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. + let cancel_conditions = select( Box::pin(cloned_token.cancelled()), + Box::pin(gen_server_cancellation_token.cancelled()), + ); + + let _ = select( + cancel_conditions, Box::pin(async { rt::sleep(period).await; let _ = handle.cast(message.clone()).await; @@ -49,10 +56,17 @@ where { let cancellation_token = CancellationToken::new(); let cloned_token = cancellation_token.clone(); + let gen_server_cancellation_token = handle.cancellation_token(); let join_handle = rt::spawn(async move { loop { - let result = select( + // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. + let cancel_conditions = select( Box::pin(cloned_token.cancelled()), + Box::pin(gen_server_cancellation_token.cancelled()), + ); + + let result = select( + Box::pin(cancel_conditions), Box::pin(async { rt::sleep(period).await; let _ = handle.cast(message.clone()).await; diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index d805c82..6d4b83b 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -246,3 +246,47 @@ pub fn test_send_after_and_cancellation() { assert_eq!(DelayedOutMessage::Count(1), count2); }); } + +#[test] +pub fn test_send_after_gen_server_stop() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // Start a Delayed + let mut repeater = Delayed::start(DelayedState { count: 0 }); + + // Set a just once timed message + let _ = send_after( + Duration::from_millis(100), + repeater.clone(), + DelayedCastMessage::Inc, + ); + + // Wait for 200 milliseconds + rt::sleep(Duration::from_millis(200)).await; + + // Check count + let count = Delayed::get_count(&mut repeater).await.unwrap(); + + // Only one message (no repetition) + assert_eq!(DelayedOutMessage::Count(1), count); + + // New timer + let _ = send_after( + Duration::from_millis(100), + repeater.clone(), + DelayedCastMessage::Inc, + ); + + // Cancel the new timer before timeout + repeater.stop(); + + // Wait another 200 milliseconds + rt::sleep(Duration::from_millis(200)).await; + + // Check count again + let count2 = Delayed::get_count(&mut repeater).await.unwrap(); + + // As timer was cancelled, count should remain at 1 + assert_eq!(DelayedOutMessage::Count(1), count2); + }); +} From 702e4df0c472527df296246528171ef4ec9b8c3e Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 16:44:34 -0300 Subject: [PATCH 21/34] remove bounded channels from tasks impl --- Cargo.lock | 1 + concurrency/Cargo.toml | 1 + concurrency/src/tasks/gen_server.rs | 14 +++++----- concurrency/src/tasks/process.rs | 28 ++++++++----------- concurrency/src/tasks/stream_tests.rs | 40 ++++----------------------- examples/ping_pong/src/consumer.rs | 4 +-- examples/ping_pong/src/messages.rs | 4 +-- examples/ping_pong/src/producer.rs | 12 ++++---- rt/src/tasks/mod.rs | 2 +- rt/src/tasks/tokio/mod.rs | 2 +- rt/src/tasks/tokio/mpsc.rs | 9 ++---- 11 files changed, 42 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2821b79..a598332 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1201,6 +1201,7 @@ version = "0.1.0" dependencies = [ "futures", "spawned-rt", + "tokio", "tokio-stream", "tracing", ] diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 824a7ee..cda0105 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -10,6 +10,7 @@ futures = "0.3.1" [dev-dependencies] tokio-stream = { version = "0.1.17" } +tokio = { version = "1", features = ["full"] } [lib] path = "./src/lib.rs" diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 9edf96c..39ec0b5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -8,7 +8,7 @@ use crate::error::GenServerError; #[derive(Debug)] pub struct GenServerHandle { - pub tx: mpsc::UnboundedSender>, + pub tx: mpsc::Sender>, } impl Clone for GenServerHandle { @@ -21,7 +21,7 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { - let (tx, mut rx) = mpsc::unbounded_channel::>(); + let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); @@ -39,7 +39,7 @@ impl GenServerHandle { } pub(crate) fn new_blocking(initial_state: G::State) -> Self { - let (tx, mut rx) = mpsc::unbounded_channel::>(); + let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); @@ -58,7 +58,7 @@ impl GenServerHandle { handle_clone } - pub fn sender(&self) -> mpsc::UnboundedSender> { + pub fn sender(&self) -> mpsc::Sender> { self.tx.clone() } @@ -131,7 +131,7 @@ where fn run( &mut self, handle: &GenServerHandle, - rx: &mut mpsc::UnboundedReceiver>, + rx: &mut mpsc::Receiver>, state: Self::State, ) -> impl Future> + Send { async { @@ -162,7 +162,7 @@ where fn main_loop( &mut self, handle: &GenServerHandle, - rx: &mut mpsc::UnboundedReceiver>, + rx: &mut mpsc::Receiver>, mut state: Self::State, ) -> impl Future> + Send { async { @@ -181,7 +181,7 @@ where fn receive( &mut self, handle: &GenServerHandle, - rx: &mut mpsc::UnboundedReceiver>, + rx: &mut mpsc::Receiver>, state: Self::State, ) -> impl Future> + Send { async move { diff --git a/concurrency/src/tasks/process.rs b/concurrency/src/tasks/process.rs index 0f936c5..b623d2b 100644 --- a/concurrency/src/tasks/process.rs +++ b/concurrency/src/tasks/process.rs @@ -6,12 +6,12 @@ use std::future::Future; #[derive(Debug)] pub struct ProcessInfo { - pub tx: mpsc::UnboundedSender, + pub tx: mpsc::Sender, pub handle: JoinHandle<()>, } impl ProcessInfo { - pub fn sender(&self) -> mpsc::UnboundedSender { + pub fn sender(&self) -> mpsc::Sender { self.tx.clone() } @@ -26,7 +26,7 @@ where { fn spawn(mut self) -> impl Future> + Send { async { - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, mut rx) = mpsc::channel::(); let tx_clone = tx.clone(); let handle = rt::spawn(async move { self.run(&tx_clone, &mut rx).await; @@ -37,8 +37,8 @@ where fn run( &mut self, - tx: &mpsc::UnboundedSender, - rx: &mut mpsc::UnboundedReceiver, + tx: &mpsc::Sender, + rx: &mut mpsc::Receiver, ) -> impl Future + Send { async { self.init(tx).await; @@ -48,8 +48,8 @@ where fn main_loop( &mut self, - tx: &mpsc::UnboundedSender, - rx: &mut mpsc::UnboundedReceiver, + tx: &mpsc::Sender, + rx: &mut mpsc::Receiver, ) -> impl Future + Send { async { loop { @@ -66,14 +66,14 @@ where false } - fn init(&mut self, _tx: &mpsc::UnboundedSender) -> impl Future + Send { + fn init(&mut self, _tx: &mpsc::Sender) -> impl Future + Send { async {} } fn receive( &mut self, - tx: &mpsc::UnboundedSender, - rx: &mut mpsc::UnboundedReceiver, + tx: &mpsc::Sender, + rx: &mut mpsc::Receiver, ) -> impl std::future::Future + Send { async { match rx.recv().await { @@ -83,14 +83,10 @@ where } } - fn handle( - &mut self, - message: T, - tx: &mpsc::UnboundedSender, - ) -> impl Future + Send; + fn handle(&mut self, message: T, tx: &mpsc::Sender) -> impl Future + Send; } -pub fn send(tx: &mpsc::UnboundedSender, message: T) +pub fn send(tx: &mpsc::Sender, message: T) where T: Send, { diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index df73f9f..c2d6fde 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream, UnboundedReceiverStream}; +use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; use crate::tasks::{ stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, @@ -80,35 +80,7 @@ pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(5); - - // Spawn a task to send numbers to the channel - spawned_rt::tasks::spawn(async move { - for i in 1..=5 { - tx.send(Ok(i)).await.unwrap(); - } - }); - - spawn_listener( - summatory_handle.clone(), - message_builder, - ReceiverStream::new(rx), - ); - - // Wait for 1 second so the whole stream is processed - rt::sleep(Duration::from_secs(1)).await; - - let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); - assert_eq!(val, 15); - }) -} - -#[test] -pub fn test_sum_numbers_from_unbounded_channel() { - let runtime = rt::Runtime::new().unwrap(); - runtime.block_on(async move { - let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::unbounded_channel::>(); + let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { @@ -120,7 +92,7 @@ pub fn test_sum_numbers_from_unbounded_channel() { spawn_listener( summatory_handle.clone(), message_builder, - UnboundedReceiverStream::new(rx), + ReceiverStream::new(rx), ); // Wait for 1 second so the whole stream is processed @@ -136,7 +108,7 @@ pub fn test_sum_numbers_from_broadcast_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::broadcast_channel(5); + let (tx, rx) = tokio::sync::broadcast::channel::(5); // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { @@ -166,12 +138,12 @@ pub fn test_stream_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(5); + let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel spawned_rt::tasks::spawn(async move { for i in 1..=5 { - tx.send(Ok(i)).await.unwrap(); + tx.send(Ok(i)).unwrap(); rt::sleep(Duration::from_millis(RUNNING_TIME / 4)).await; } }); diff --git a/examples/ping_pong/src/consumer.rs b/examples/ping_pong/src/consumer.rs index 5118184..8ead269 100644 --- a/examples/ping_pong/src/consumer.rs +++ b/examples/ping_pong/src/consumer.rs @@ -1,5 +1,5 @@ use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::tasks::mpsc::UnboundedSender; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::Message; @@ -12,7 +12,7 @@ impl Consumer { } impl Process for Consumer { - async fn handle(&mut self, message: Message, _tx: &UnboundedSender) -> Message { + async fn handle(&mut self, message: Message, _tx: &Sender) -> Message { tracing::info!("Consumer received {message:?}"); match message.clone() { Message::Ping { from } => { diff --git a/examples/ping_pong/src/messages.rs b/examples/ping_pong/src/messages.rs index a5e797a..a22ae6c 100644 --- a/examples/ping_pong/src/messages.rs +++ b/examples/ping_pong/src/messages.rs @@ -1,7 +1,7 @@ -use spawned_rt::tasks::mpsc::UnboundedSender; +use spawned_rt::tasks::mpsc::Sender; #[derive(Debug, Clone)] pub enum Message { - Ping { from: UnboundedSender }, + Ping { from: Sender }, Pong, } diff --git a/examples/ping_pong/src/producer.rs b/examples/ping_pong/src/producer.rs index b542496..71829a1 100644 --- a/examples/ping_pong/src/producer.rs +++ b/examples/ping_pong/src/producer.rs @@ -1,18 +1,18 @@ use spawned_concurrency::tasks::{self as concurrency, Process, ProcessInfo}; -use spawned_rt::tasks::mpsc::UnboundedSender; +use spawned_rt::tasks::mpsc::Sender; use crate::messages::Message; pub struct Producer { - consumer: UnboundedSender, + consumer: Sender, } impl Producer { - pub async fn spawn_new(consumer: UnboundedSender) -> ProcessInfo { + pub async fn spawn_new(consumer: Sender) -> ProcessInfo { Self { consumer }.spawn().await } - fn send_ping(&self, tx: &UnboundedSender, consumer: &UnboundedSender) { + fn send_ping(&self, tx: &Sender, consumer: &Sender) { let message = Message::Ping { from: tx.clone() }; tracing::info!("Producer sent Ping"); concurrency::send(consumer, message); @@ -20,11 +20,11 @@ impl Producer { } impl Process for Producer { - async fn init(&mut self, tx: &UnboundedSender) { + async fn init(&mut self, tx: &Sender) { self.send_ping(tx, &self.consumer); } - async fn handle(&mut self, message: Message, tx: &UnboundedSender) -> Message { + async fn handle(&mut self, message: Message, tx: &Sender) -> Message { tracing::info!("Producer received {message:?}"); self.send_ping(tx, &self.consumer); message diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 7055005..10de5fd 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -18,7 +18,7 @@ pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; pub use crate::tasks::tokio::CancellationToken; pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; -pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; +pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream}; use std::future::Future; pub fn run(future: F) -> F::Output { diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 7727b26..6abf60d 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -7,5 +7,5 @@ pub use tokio::{ task::{spawn, spawn_blocking, JoinHandle}, time::sleep, }; -pub use tokio_stream::wrappers::{BroadcastStream, ReceiverStream, UnboundedReceiverStream}; +pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream}; pub use tokio_util::sync::CancellationToken; diff --git a/rt/src/tasks/tokio/mpsc.rs b/rt/src/tasks/tokio/mpsc.rs index fba1606..ec520a6 100644 --- a/rt/src/tasks/tokio/mpsc.rs +++ b/rt/src/tasks/tokio/mpsc.rs @@ -1,9 +1,6 @@ //! Tokio.rs reexports to prevent tokio dependencies within external code -pub use tokio::sync::{ - broadcast::{channel as broadcast_channel, Receiver as BroadcastReceiver}, - mpsc::{ - channel, error::SendError, unbounded_channel, Receiver, Sender, UnboundedReceiver, - UnboundedSender, - }, +pub use tokio::sync::mpsc::{ + error::SendError, unbounded_channel as channel, UnboundedReceiver as Receiver, + UnboundedSender as Sender, }; From 2bed000e04ee1962ab4294f7849618f3008400df Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 16:48:11 -0300 Subject: [PATCH 22/34] remove sync channels from threads impl --- concurrency/src/threads/gen_server.rs | 2 +- concurrency/src/threads/process.rs | 2 +- concurrency/src/threads/stream_tests.rs | 22 +--------------------- rt/src/threads/mpsc.rs | 4 +--- 4 files changed, 4 insertions(+), 26 deletions(-) diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 69dd94c..753e849 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -23,7 +23,7 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { - let (tx, mut rx) = mpsc::unbounded_channel::>(); + let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); diff --git a/concurrency/src/threads/process.rs b/concurrency/src/threads/process.rs index 88eec8e..3dfd87d 100644 --- a/concurrency/src/threads/process.rs +++ b/concurrency/src/threads/process.rs @@ -24,7 +24,7 @@ where Self: Send + Sync + Sized + 'static, { fn spawn(mut self) -> ProcessInfo { - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, mut rx) = mpsc::channel::(); let tx_clone = tx.clone(); let handle = rt::spawn(move || self.run(&tx_clone, &mut rx)); ProcessInfo { tx, handle } diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs index 0d41b4b..05011aa 100644 --- a/concurrency/src/threads/stream_tests.rs +++ b/concurrency/src/threads/stream_tests.rs @@ -78,27 +78,7 @@ pub fn test_sum_numbers_from_stream() { #[test] pub fn test_sum_numbers_from_channel() { let mut summatory_handle = Summatory::start(0); - let (tx, rx) = rt::mpsc::channel::>(5); - - rt::spawn(move || { - for i in 1..=5 { - tx.send(Ok(i)).unwrap(); - } - }); - - spawn_listener_from_iter(summatory_handle.clone(), message_builder, rx.into_iter()); - - // Wait for 1 second so the whole stream is processed - rt::sleep(Duration::from_secs(1)); - - let val = Summatory::get_value(&mut summatory_handle).unwrap(); - assert_eq!(val, 15); -} - -#[test] -pub fn test_sum_numbers_from_unbounded_channel() { - let mut summatory_handle = Summatory::start(0); - let (tx, rx) = rt::mpsc::unbounded_channel::>(); + let (tx, rx) = rt::mpsc::channel::>(); rt::spawn(move || { for i in 1..=5 { diff --git a/rt/src/threads/mpsc.rs b/rt/src/threads/mpsc.rs index 350a7aa..c5b140a 100644 --- a/rt/src/threads/mpsc.rs +++ b/rt/src/threads/mpsc.rs @@ -1,5 +1,3 @@ //! non-async replacement for mpsc channels -pub use std::sync::mpsc::{ - channel as unbounded_channel, sync_channel as channel, Receiver, SendError, Sender, -}; +pub use std::sync::mpsc::{channel, Receiver, SendError, Sender}; From ce77a368e45bb544fce1cefb64be12a3ceee0fe8 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 16:56:05 -0300 Subject: [PATCH 23/34] deprecate spawn_listener for threads impl --- concurrency/src/threads/mod.rs | 2 - concurrency/src/threads/stream.rs | 53 +-------- concurrency/src/threads/stream_tests.rs | 137 ------------------------ 3 files changed, 3 insertions(+), 189 deletions(-) delete mode 100644 concurrency/src/threads/stream_tests.rs diff --git a/concurrency/src/threads/mod.rs b/concurrency/src/threads/mod.rs index 0138c05..bee6813 100644 --- a/concurrency/src/threads/mod.rs +++ b/concurrency/src/threads/mod.rs @@ -6,8 +6,6 @@ mod process; mod stream; mod time; -#[cfg(test)] -mod stream_tests; #[cfg(test)] mod timer_tests; diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index faecb5d..a4fd749 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -1,56 +1,11 @@ use crate::threads::{GenServer, GenServerHandle}; -use std::thread::{self, JoinHandle}; -use futures::{Stream, StreamExt}; -use spawned_rt::threads::CancellationToken; -use std::iter; - -/// Spawns a listener that iterates over an iterable and sends messages to a GenServer. -/// -/// Items sent through the iterator are required to be wrapped in a Result type. -/// -/// This function returns a handle to the spawned task and a cancellation token -/// to stop it. -pub fn spawn_listener_from_iter( - mut handle: GenServerHandle, - message_builder: F, - iterable: S, -) -> (JoinHandle<()>, CancellationToken) -where - T: GenServer + 'static, - F: Fn(I) -> T::CastMsg + Send + 'static, - I: Send + 'static, - E: std::fmt::Debug + Send + 'static, - S: Iterator> + Send + 'static, -{ - let cancelation_token = CancellationToken::new(); - let mut cloned_token = cancelation_token.clone(); - let join_handle = thread::spawn(move || { - for res in iterable { - if cloned_token.is_cancelled() { - tracing::trace!("Received signal to stop listener, stopping"); - break; - } - - match res { - Ok(i) => { - let _ = handle.cast(message_builder(i)); - } - Err(err) => { - tracing::error!("Error in stream: {:?}", err); - break; - } - } - } - tracing::trace!("Stream finished"); - }); - (join_handle, cancelation_token) -} +use futures::Stream; /// Spawns a listener that listens to a stream and sends messages to a GenServer. /// /// Items sent through the stream are required to be wrapped in a Result type. -pub fn spawn_listener(handle: GenServerHandle, message_builder: F, mut stream: S) +pub fn spawn_listener(_handle: GenServerHandle, _message_builder: F, _stream: S) where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static, @@ -58,7 +13,5 @@ where E: std::fmt::Debug + Send + 'static, S: Unpin + Send + Stream> + 'static, { - // Convert the stream into an iterator that blocks on each item. - let iterable = iter::from_fn(move || futures::executor::block_on(stream.next())); - spawn_listener_from_iter(handle, message_builder, iterable); + unimplemented!("Unsupported function in threads mode") } diff --git a/concurrency/src/threads/stream_tests.rs b/concurrency/src/threads/stream_tests.rs deleted file mode 100644 index 05011aa..0000000 --- a/concurrency/src/threads/stream_tests.rs +++ /dev/null @@ -1,137 +0,0 @@ -use std::time::Duration; - -use crate::threads::{ - stream::{spawn_listener, spawn_listener_from_iter}, - CallResponse, CastResponse, GenServer, GenServerHandle, -}; - -use spawned_rt::threads::{self as rt}; - -type SummatoryHandle = GenServerHandle; - -struct Summatory; - -type SummatoryState = u16; -type SummatoryCastMessage = SummatoryState; -type SummatoryOutMessage = SummatoryState; - -impl Summatory { - pub fn get_value(server: &mut SummatoryHandle) -> Result { - server.call(()).map_err(|_| ()) - } -} - -impl GenServer for Summatory { - type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type. - type CastMsg = SummatoryCastMessage; - type OutMsg = SummatoryOutMessage; - type State = SummatoryState; - type Error = (); - - fn new() -> Self { - Self - } - - fn handle_cast( - &mut self, - message: Self::CastMsg, - _handle: &GenServerHandle, - state: Self::State, - ) -> CastResponse { - let new_state = state + message; - CastResponse::NoReply(new_state) - } - - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &GenServerHandle, - state: Self::State, - ) -> CallResponse { - let current_value = state; - CallResponse::Reply(state, current_value) - } -} - -// In this example, the stream sends u8 values, which are converted to the type -// supported by the GenServer (SummatoryCastMessage / u16). -fn message_builder(value: u8) -> SummatoryCastMessage { - value.into() -} - -#[test] -pub fn test_sum_numbers_from_stream() { - // In this example we are converting an async stream into a synchronous one - // Does this make sense in a real-world scenario? - let mut summatory_handle = Summatory::start(0); - let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); - - spawn_listener(summatory_handle.clone(), message_builder, stream); - - // Wait for 1 second so the whole stream is processed - rt::sleep(Duration::from_secs(1)); - - let val = Summatory::get_value(&mut summatory_handle).unwrap(); - assert_eq!(val, 15); -} - -#[test] -pub fn test_sum_numbers_from_channel() { - let mut summatory_handle = Summatory::start(0); - let (tx, rx) = rt::mpsc::channel::>(); - - rt::spawn(move || { - for i in 1..=5 { - tx.send(Ok(i)).unwrap(); - } - }); - - spawn_listener_from_iter(summatory_handle.clone(), message_builder, rx.into_iter()); - - // Wait for 1 second so the whole stream is processed - rt::sleep(Duration::from_secs(1)); - - let val = Summatory::get_value(&mut summatory_handle).unwrap(); - assert_eq!(val, 15); -} - -#[test] -pub fn test_sum_numbers_from_iter() { - let mut summatory_handle = Summatory::start(0); - let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); - - spawn_listener_from_iter(summatory_handle.clone(), message_builder, stream); - - // Wait for the stream to finish processing - rt::sleep(Duration::from_secs(1)); - - let val = Summatory::get_value(&mut summatory_handle).unwrap(); - assert_eq!(val, 15); -} - -#[test] -pub fn test_stream_cancellation() { - let mut summatory_handle = Summatory::start(0); - let stream = vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::); - - const RUNNING_TIME: u64 = 1000; - - // Add 'processing time' to each message - let message_builder = |x: u8| { - rt::sleep(Duration::from_millis(RUNNING_TIME / 4)); - x as u16 - }; - - spawn_listener_from_iter(summatory_handle.clone(), message_builder, stream); - - // Wait for the stream to finish processing - rt::sleep(Duration::from_millis(RUNNING_TIME)); - - // The reasoning for this assertion is that each message takes a quarter of the total time - // to be processed, so having a stream of 5 messages, some of them will never be processed. - // At first glance we would expect val == 10 considering it has time to process four messages, - // but in reality it will most likely get to process only three messages due to the overhead - // of creating a new thread for each message. - let val = Summatory::get_value(&mut summatory_handle).unwrap(); - assert!(val > 0 && val < 15); -} From 2dc4cc1340331bcde68b68a2d1dfed45867bac6e Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 16:56:27 -0300 Subject: [PATCH 24/34] fix imports --- concurrency/src/threads/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency/src/threads/mod.rs b/concurrency/src/threads/mod.rs index bee6813..193af89 100644 --- a/concurrency/src/threads/mod.rs +++ b/concurrency/src/threads/mod.rs @@ -11,5 +11,5 @@ mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; pub use process::{send, Process, ProcessInfo}; -pub use stream::{spawn_listener, spawn_listener_from_iter}; +pub use stream::spawn_listener; pub use time::{send_after, send_interval}; From ab9f69cd455ffae5ada2091f5fc71387dff162bd Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 17:35:00 -0300 Subject: [PATCH 25/34] remove impl for threads due to reprecation --- concurrency/src/threads/gen_server.rs | 19 ++---------- concurrency/src/threads/stream.rs | 1 + concurrency/src/threads/time.rs | 5 ++-- concurrency/src/threads/timer_tests.rs | 41 -------------------------- 4 files changed, 5 insertions(+), 61 deletions(-) diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index d82a2e2..076e750 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -1,6 +1,6 @@ //! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. -use spawned_rt::threads::{self as rt, mpsc, oneshot, CancellationToken}; +use spawned_rt::threads::{self as rt, mpsc, oneshot}; use std::{ fmt::Debug, panic::{catch_unwind, AssertUnwindSafe}, @@ -10,15 +10,12 @@ use crate::error::GenServerError; pub struct GenServerHandle { pub tx: mpsc::Sender>, - /// Cancellation token to stop the GenServer - cancellation_token: CancellationToken, } impl Clone for GenServerHandle { fn clone(&self) -> Self { Self { tx: self.tx.clone(), - cancellation_token: self.cancellation_token.clone(), } } } @@ -26,11 +23,7 @@ impl Clone for GenServerHandle { impl GenServerHandle { pub(crate) fn new(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::channel::>(); - let cancellation_token = CancellationToken::new(); - let handle = GenServerHandle { - tx, - cancellation_token, - }; + let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future @@ -63,14 +56,6 @@ impl GenServerHandle { .send(GenServerInMsg::Cast { message }) .map_err(|_error| GenServerError::Server) } - - pub fn cancellation_token(&self) -> CancellationToken { - self.cancellation_token.clone() - } - - pub fn stop(&mut self) { - self.cancellation_token.cancel(); - } } pub enum GenServerInMsg { diff --git a/concurrency/src/threads/stream.rs b/concurrency/src/threads/stream.rs index 37989e3..a4fd749 100644 --- a/concurrency/src/threads/stream.rs +++ b/concurrency/src/threads/stream.rs @@ -1,4 +1,5 @@ use crate::threads::{GenServer, GenServerHandle}; + use futures::Stream; /// Spawns a listener that listens to a stream and sends messages to a GenServer. diff --git a/concurrency/src/threads/time.rs b/concurrency/src/threads/time.rs index 28a1323..98ecb2c 100644 --- a/concurrency/src/threads/time.rs +++ b/concurrency/src/threads/time.rs @@ -23,8 +23,7 @@ where let mut cloned_token = cancellation_token.clone(); let join_handle = rt::spawn(move || { rt::sleep(period); - // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. - if !cloned_token.is_cancelled() && !handle.cancellation_token().is_cancelled() { + if !cloned_token.is_cancelled() { let _ = handle.cast(message); }; }); @@ -48,7 +47,7 @@ where let join_handle = rt::spawn(move || loop { rt::sleep(period); // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. - if cloned_token.is_cancelled() || handle.cancellation_token().is_cancelled() { + if cloned_token.is_cancelled() { break; } else { let _ = handle.cast(message.clone()); diff --git a/concurrency/src/threads/timer_tests.rs b/concurrency/src/threads/timer_tests.rs index 5c557cd..6b3b8a4 100644 --- a/concurrency/src/threads/timer_tests.rs +++ b/concurrency/src/threads/timer_tests.rs @@ -231,44 +231,3 @@ pub fn test_send_after_and_cancellation() { // As timer was cancelled, count should remain at 1 assert_eq!(DelayedOutMessage::Count(1), count2); } - -#[test] -pub fn test_send_after_and_gen_server_stop() { - // Start a Delayed - let mut repeater = Delayed::start(DelayedState { count: 0 }); - - // Set a just once timed message - let _ = send_after( - Duration::from_millis(100), - repeater.clone(), - DelayedCastMessage::Inc, - ); - - // Wait for 200 milliseconds - rt::sleep(Duration::from_millis(200)); - - // Check count - let count = Delayed::get_count(&mut repeater).unwrap(); - - // Only one message (no repetition) - assert_eq!(DelayedOutMessage::Count(1), count); - - // New timer - let _ = send_after( - Duration::from_millis(100), - repeater.clone(), - DelayedCastMessage::Inc, - ); - - // Stop the GenServer before timeout - repeater.stop(); - - // Wait another 200 milliseconds - rt::sleep(Duration::from_millis(200)); - - // Check count again - let count2 = Delayed::get_count(&mut repeater).unwrap(); - - // As timer was cancelled, count should remain at 1 - assert_eq!(DelayedOutMessage::Count(1), count2); -} From 2d6e8dbfcaf1c78b478e972b768771ae26f8de28 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 4 Jul 2025 17:37:26 -0300 Subject: [PATCH 26/34] revert more lines --- concurrency/src/threads/gen_server.rs | 1 + concurrency/src/threads/time.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 076e750..753e849 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -8,6 +8,7 @@ use std::{ use crate::error::GenServerError; +#[derive(Debug)] pub struct GenServerHandle { pub tx: mpsc::Sender>, } diff --git a/concurrency/src/threads/time.rs b/concurrency/src/threads/time.rs index 98ecb2c..3d47c05 100644 --- a/concurrency/src/threads/time.rs +++ b/concurrency/src/threads/time.rs @@ -46,7 +46,6 @@ where let mut cloned_token = cancellation_token.clone(); let join_handle = rt::spawn(move || loop { rt::sleep(period); - // Timer action is ignored if it was either cancelled or the associated GenServer is no longer running. if cloned_token.is_cancelled() { break; } else { From b5ef429cb1238cb99a21981e1ef1fe0b6ced5bdf Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Mon, 7 Jul 2025 09:56:01 -0300 Subject: [PATCH 27/34] rename `stop` to `teardown` --- concurrency/src/tasks/gen_server.rs | 2 +- concurrency/src/tasks/timer_tests.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index aab1246..e3711f5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -94,7 +94,7 @@ impl GenServerHandle { self.cancellation_token.clone() } - pub fn stop(&mut self) { + pub fn teardown(&mut self) { self.cancellation_token.cancel(); } } diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index 6d4b83b..86c1b05 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -248,7 +248,7 @@ pub fn test_send_after_and_cancellation() { } #[test] -pub fn test_send_after_gen_server_stop() { +pub fn test_send_after_gen_server_teardown() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed @@ -278,7 +278,7 @@ pub fn test_send_after_gen_server_stop() { ); // Cancel the new timer before timeout - repeater.stop(); + repeater.teardown(); // Wait another 200 milliseconds rt::sleep(Duration::from_millis(200)).await; From 668b41f04d9992724b3349b0b87480d7b158e73a Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 10 Jul 2025 13:51:31 -0300 Subject: [PATCH 28/34] refactor teardown logic --- concurrency/src/tasks/gen_server.rs | 20 ++++++++--- concurrency/src/tasks/stream.rs | 11 +++--- concurrency/src/tasks/stream_tests.rs | 51 ++++++++++++++++++++------- concurrency/src/tasks/timer_tests.rs | 23 +++++++----- 4 files changed, 74 insertions(+), 31 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index e3711f5..5f8edd5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -93,10 +93,6 @@ impl GenServerHandle { pub fn cancellation_token(&self) -> CancellationToken { self.cancellation_token.clone() } - - pub fn teardown(&mut self) { - self.cancellation_token.cancel(); - } } pub enum GenServerInMsg { @@ -186,12 +182,15 @@ where async { loop { let (new_state, cont) = self.receive(handle, rx, state).await?; + state = new_state; if !cont { break; } - state = new_state; } tracing::trace!("Stopping GenServer"); + if let Err(err) = self.teardown(handle, state).await { + tracing::error!("Error during teardown: {err:?}"); + } Ok(()) } } @@ -287,6 +286,17 @@ where ) -> impl Future> + Send { async { CastResponse::Unused } } + + /// Teardown function. It's called after the stop message is received. + /// It can be overrided on implementations in case final steps are required, + /// like closing streams, stopping timers, etc. + fn teardown( + &mut self, + _handle: &GenServerHandle, + _state: Self::State, + ) -> impl Future> + Send { + async { Ok(()) } + } } #[cfg(test)] diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index bb3df3b..07ab108 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -1,6 +1,6 @@ use crate::tasks::{GenServer, GenServerHandle}; use futures::{future::select, Stream, StreamExt}; -use spawned_rt::tasks::{CancellationToken, JoinHandle}; +use spawned_rt::tasks::JoinHandle; /// Spawns a listener that listens to a stream and sends messages to a GenServer. /// @@ -12,7 +12,7 @@ pub fn spawn_listener( mut handle: GenServerHandle, message_builder: F, mut stream: S, -) -> (JoinHandle<()>, CancellationToken) +) -> JoinHandle<()> where T: GenServer + 'static, F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync, @@ -20,11 +20,10 @@ where E: std::fmt::Debug + Send, S: Unpin + Send + Stream> + 'static, { - let cancelation_token = CancellationToken::new(); - let cloned_token = cancelation_token.clone(); + let cancelation_token = handle.cancellation_token(); let join_handle = spawned_rt::tasks::spawn(async move { let result = select( - Box::pin(cloned_token.cancelled()), + Box::pin(cancelation_token.cancelled()), Box::pin(async { loop { match stream.next().await { @@ -53,5 +52,5 @@ where futures::future::Either::Right(_) => (), // Stream finished or errored out } }); - (join_handle, cancelation_token) + join_handle } diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index c2d6fde..a4565cb 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -3,7 +3,7 @@ use std::time::Duration; use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream}; use crate::tasks::{ - stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, + send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle, }; type SummatoryHandle = GenServerHandle; @@ -11,9 +11,15 @@ type SummatoryHandle = GenServerHandle; struct Summatory; type SummatoryState = u16; -type SummatoryCastMessage = SummatoryState; +// type SummatoryCastMessage = SummatoryState; type SummatoryOutMessage = SummatoryState; +#[derive(Clone)] +enum SummatoryCastMessage { + Add(u16), + Stop, +} + impl Summatory { pub async fn get_value(server: &mut SummatoryHandle) -> Result { server.call(()).await.map_err(|_| ()) @@ -37,8 +43,13 @@ impl GenServer for Summatory { _handle: &GenServerHandle, state: Self::State, ) -> CastResponse { - let new_state = state + message; - CastResponse::NoReply(new_state) + match message { + SummatoryCastMessage::Add(val) => { + let new_state = state + val; + CastResponse::NoReply(new_state) + } + SummatoryCastMessage::Stop => CastResponse::Stop, + } } async fn handle_call( @@ -50,12 +61,21 @@ impl GenServer for Summatory { let current_value = state; CallResponse::Reply(state, current_value) } + + async fn teardown( + &mut self, + handle: &GenServerHandle, + _state: Self::State, + ) -> Result<(), Self::Error> { + handle.cancellation_token().cancel(); + Ok(()) + } } // In this example, the stream sends u8 values, which are converted to the type // supported by the GenServer (SummatoryCastMessage / u16). fn message_builder(value: u8) -> SummatoryCastMessage { - value.into() + SummatoryCastMessage::Add(value.into()) } #[test] @@ -148,22 +168,29 @@ pub fn test_stream_cancellation() { } }); - let (_handle, cancellation_token) = spawn_listener( + let listener_handle = spawn_listener( summatory_handle.clone(), message_builder, ReceiverStream::new(rx), ); - // Wait for 1 second so the whole stream is processed - rt::sleep(Duration::from_millis(RUNNING_TIME)).await; - - cancellation_token.cancel(); + // Start a timer to stop the stream after a certain time + let summatory_handle_clone = summatory_handle.clone(); + let _ = send_after( + Duration::from_millis(RUNNING_TIME + 10), + summatory_handle_clone, + SummatoryCastMessage::Stop, + ); + // Just before the stream is cancelled we retrieve the current value. + rt::sleep(Duration::from_millis(RUNNING_TIME)).await; + let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); // The reasoning for this assertion is that each message takes a quarter of the total time // to be processed, so having a stream of 5 messages, the last one won't be processed. // We could safely assume that it will get to process 4 messages, but in case of any extenal // slowdown, it could process less. - let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); - assert!(val > 0 && val < 15); + assert!((1..=10).contains(&val)); + + assert!(listener_handle.await.is_ok()); }) } diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index 86c1b05..297a45c 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -149,6 +149,7 @@ enum DelayedCastMessage { #[derive(Clone)] enum DelayedCallMessage { GetCount, + Stop, } #[derive(PartialEq, Debug)] @@ -165,6 +166,10 @@ impl Delayed { .await .map_err(|_| ()) } + + pub async fn stop(server: &mut DelayedHandle) -> Result { + server.call(DelayedCallMessage::Stop).await.map_err(|_| ()) + } } impl GenServer for Delayed { @@ -180,12 +185,17 @@ impl GenServer for Delayed { async fn handle_call( &mut self, - _message: Self::CallMsg, + message: Self::CallMsg, _handle: &DelayedHandle, state: Self::State, ) -> CallResponse { - let count = state.count; - CallResponse::Reply(state, DelayedOutMessage::Count(count)) + match message { + DelayedCallMessage::GetCount => { + let count = state.count; + CallResponse::Reply(state, DelayedOutMessage::Count(count)) + } + DelayedCallMessage::Stop => CallResponse::Stop(DelayedOutMessage::Count(state.count)), + } } async fn handle_cast( @@ -277,15 +287,12 @@ pub fn test_send_after_gen_server_teardown() { DelayedCastMessage::Inc, ); - // Cancel the new timer before timeout - repeater.teardown(); + // Stop the GenServer before timeout + let count2 = Delayed::stop(&mut repeater).await.unwrap(); // Wait another 200 milliseconds rt::sleep(Duration::from_millis(200)).await; - // Check count again - let count2 = Delayed::get_count(&mut repeater).await.unwrap(); - // As timer was cancelled, count should remain at 1 assert_eq!(DelayedOutMessage::Count(1), count2); }); From 412f291cf6c4861cfd37e9db3c6a58974dd9bcae Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 10 Jul 2025 14:11:08 -0300 Subject: [PATCH 29/34] remove commented code, reword tracing msg --- concurrency/src/tasks/stream.rs | 2 +- concurrency/src/tasks/stream_tests.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/concurrency/src/tasks/stream.rs b/concurrency/src/tasks/stream.rs index 07ab108..4c4e844 100644 --- a/concurrency/src/tasks/stream.rs +++ b/concurrency/src/tasks/stream.rs @@ -48,7 +48,7 @@ where ) .await; match result { - futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"), + futures::future::Either::Left(_) => tracing::trace!("GenServer stopped"), futures::future::Either::Right(_) => (), // Stream finished or errored out } }); diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index a4565cb..6e18e58 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -11,7 +11,6 @@ type SummatoryHandle = GenServerHandle; struct Summatory; type SummatoryState = u16; -// type SummatoryCastMessage = SummatoryState; type SummatoryOutMessage = SummatoryState; #[derive(Clone)] From f37178e9313662a40f677c20baaa9f2b33d4a1ba Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 11 Jul 2025 12:53:54 -0300 Subject: [PATCH 30/34] improve default teardown function --- concurrency/src/tasks/gen_server.rs | 8 ++++++-- concurrency/src/tasks/stream_tests.rs | 9 --------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 5f8edd5..ad0fda6 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -292,10 +292,14 @@ where /// like closing streams, stopping timers, etc. fn teardown( &mut self, - _handle: &GenServerHandle, + handle: &GenServerHandle, _state: Self::State, ) -> impl Future> + Send { - async { Ok(()) } + async { + // By default, we just cancel the cancellation token available on all GenServerHandles. + handle.cancellation_token().cancel(); + Ok(()) + } } } diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 6e18e58..9546003 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -60,15 +60,6 @@ impl GenServer for Summatory { let current_value = state; CallResponse::Reply(state, current_value) } - - async fn teardown( - &mut self, - handle: &GenServerHandle, - _state: Self::State, - ) -> Result<(), Self::Error> { - handle.cancellation_token().cancel(); - Ok(()) - } } // In this example, the stream sends u8 values, which are converted to the type From e6520709c550609ffcf8e05d3ee6c35a3a570596 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 11 Jul 2025 15:44:56 -0300 Subject: [PATCH 31/34] mandatory token cancel --- concurrency/src/tasks/gen_server.rs | 9 +++------ concurrency/src/tasks/stream_tests.rs | 6 ++++++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index ad0fda6..6c15ce0 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -188,6 +188,7 @@ where } } tracing::trace!("Stopping GenServer"); + handle.cancellation_token().cancel(); if let Err(err) = self.teardown(handle, state).await { tracing::error!("Error during teardown: {err:?}"); } @@ -292,14 +293,10 @@ where /// like closing streams, stopping timers, etc. fn teardown( &mut self, - handle: &GenServerHandle, + _handle: &GenServerHandle, _state: Self::State, ) -> impl Future> + Send { - async { - // By default, we just cancel the cancellation token available on all GenServerHandles. - handle.cancellation_token().cancel(); - Ok(()) - } + async { Ok(()) } } } diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 9546003..cab97bb 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -175,6 +175,7 @@ pub fn test_stream_cancellation() { // Just before the stream is cancelled we retrieve the current value. rt::sleep(Duration::from_millis(RUNNING_TIME)).await; let val = Summatory::get_value(&mut summatory_handle).await.unwrap(); + // The reasoning for this assertion is that each message takes a quarter of the total time // to be processed, so having a stream of 5 messages, the last one won't be processed. // We could safely assume that it will get to process 4 messages, but in case of any extenal @@ -182,5 +183,10 @@ pub fn test_stream_cancellation() { assert!((1..=10).contains(&val)); assert!(listener_handle.await.is_ok()); + + // Finnally, we check that the server is stopped, by getting an error when trying to call it. + rt::sleep(Duration::from_millis(10)).await; + let foo = Summatory::get_value(&mut summatory_handle).await; + assert!(Summatory::get_value(&mut summatory_handle).await.is_err()); }) } From b25770723b39163a570c7427aea7df0ad96e4789 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Fri, 11 Jul 2025 16:35:29 -0300 Subject: [PATCH 32/34] remove unused variable --- concurrency/src/tasks/stream_tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index cab97bb..e96c7e1 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -186,7 +186,6 @@ pub fn test_stream_cancellation() { // Finnally, we check that the server is stopped, by getting an error when trying to call it. rt::sleep(Duration::from_millis(10)).await; - let foo = Summatory::get_value(&mut summatory_handle).await; assert!(Summatory::get_value(&mut summatory_handle).await.is_err()); }) } From 633f1975a10c9a8052dd1a0617a3a32624efe27d Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 16 Jul 2025 11:34:16 -0300 Subject: [PATCH 33/34] bump crate version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 2e9c9e6..bbdbf71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,6 @@ tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } [workspace.package] -version = "0.1.3" +version = "0.1.4" license = "MIT" edition = "2021" From 729dc1b469b9ad7c2ca02af285cad4572ea29784 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 16 Jul 2025 11:34:44 -0300 Subject: [PATCH 34/34] update lock --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 035015c..d49427a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.1.3" +version = "0.1.4" dependencies = [ "futures", "spawned-rt", @@ -1208,7 +1208,7 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.1.3" +version = "0.1.4" dependencies = [ "crossbeam", "tokio",