Skip to content

Implement stream listener #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
da0dacd
First attempt for stream_listener
ElFantasma Jun 30, 2025
c15cd54
add test to stream listener
juan518munoz Jul 1, 2025
cfca334
add util fn to convert receiver to stream
juan518munoz Jul 2, 2025
4e3db7f
add bounded channel
juan518munoz Jul 2, 2025
de32666
add broadcast listener
juan518munoz Jul 2, 2025
6bfb8a1
fix `spawn_broadcast_listener`
juan518munoz Jul 2, 2025
7a6f79a
unify spawn_listener & remove oneline functions
juan518munoz Jul 2, 2025
4d49882
doc update
juan518munoz Jul 2, 2025
5138e09
add impl of sync spawn listener
juan518munoz Jul 3, 2025
04222ae
rename spawn_listener to spawn_listener_from_iter, and port spawn_lis…
juan518munoz Jul 3, 2025
51ecbb2
add bound channel to threads concurrency
juan518munoz Jul 3, 2025
0a78b64
merge duplicated code
juan518munoz Jul 3, 2025
835bf08
add cancel token with 'flaky' test
juan518munoz Jul 3, 2025
0c24840
unflaky the test
juan518munoz Jul 3, 2025
541cc1e
add cancellation to task impl of spawn_listener
juan518munoz Jul 3, 2025
2950644
docs & clippy
juan518munoz Jul 3, 2025
376deda
Merge branch 'main' into stream_listener
juan518munoz Jul 4, 2025
6f7a305
use futures select inside spawn listener
juan518munoz Jul 4, 2025
702e4df
remove bounded channels from tasks impl
juan518munoz Jul 4, 2025
2bed000
remove sync channels from threads impl
juan518munoz Jul 4, 2025
ce77a36
deprecate spawn_listener for threads impl
juan518munoz Jul 4, 2025
2dc4cc1
fix imports
juan518munoz Jul 4, 2025
2638334
reword example for clarity
juan518munoz Jul 8, 2025
bbabca9
add comment for clarity
juan518munoz Jul 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rust 1.85.1
rust 1.88.0
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion concurrency/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ spawned-rt = { workspace = true }
tracing = { workspace = true }
futures = "0.3.1"

[dev-dependencies]
# This tokio imports are only used in tests, we should not use them in the library code.
tokio-stream = { version = "0.1.17" }
tokio = { version = "1", features = ["full"] }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One hidden goal we have is not to depend on tokio in more than one place. This deps should be in rt crate, and reexported so we can use them in concurrency crate
(that way, if we ever plan to replace the runtime, we do it in a single place, ideally)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This imports are only used for testing (inside of the streaming tests more specifically). They are meant to show intercompatibility with tokio (e.g we receive a broadcast channel and need to make use of it inside of spawned).


[lib]
path = "./src/lib.rs"
path = "./src/lib.rs"
4 changes: 4 additions & 0 deletions concurrency/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@

mod gen_server;
mod process;
mod stream;
mod time;

#[cfg(test)]
mod stream_tests;
#[cfg(test)]
mod timer_tests;

pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use process::{send, Process, ProcessInfo};
pub use stream::spawn_listener;
pub use time::{send_after, send_interval};
57 changes: 57 additions & 0 deletions concurrency/src/tasks/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::tasks::{GenServer, GenServerHandle};
use futures::{future::select, Stream, StreamExt};
use spawned_rt::tasks::{CancellationToken, JoinHandle};

/// Spawns a listener that listens to a stream and sends messages to a GenServer.
///
/// Items sent through the stream are required to be wrapped in a Result type.
///
/// This function returns a handle to the spawned task and a cancellation token
/// to stop it.
pub fn spawn_listener<T, F, S, I, E>(
mut handle: GenServerHandle<T>,
message_builder: F,
mut stream: S,
) -> (JoinHandle<()>, CancellationToken)
where
T: GenServer + 'static,
F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
I: Send,
E: std::fmt::Debug + Send,
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
{
let cancelation_token = CancellationToken::new();
Copy link
Collaborator

@ElFantasma ElFantasma Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the idea of creating the CancellationToken in the GenServer on initialization. And every time a listener or a timer is spawned to interact with the GenServer it should request it for the CancellationToken to reuse the same one.
Then, it's the GenServer responsibility to cancel the token on termination. That way, graceful cancellation is transparent for the external process.
Anyway, we can try to implement that on this PR, or ticket it for a future improvement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it'd be a good idea to have the cancellation token inside of the gen server, considering this change would also requiere a change in the timer implementation, I believe we should make this change on a separate PR.

I'll branch from this one and start work on it ASAP.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, given GenServer is just a Trait, I guess we should put the token in the GenServerHandler, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've taken an approach of that kind, see the PR here

let cloned_token = cancelation_token.clone();
let join_handle = spawned_rt::tasks::spawn(async move {
let result = select(
Box::pin(cloned_token.cancelled()),
Box::pin(async {
loop {
match stream.next().await {
Some(Ok(i)) => match handle.cast(message_builder(i)).await {
Ok(_) => tracing::trace!("Message sent successfully"),
Err(e) => {
tracing::error!("Failed to send message: {e:?}");
break;
}
},
Some(Err(e)) => {
tracing::trace!("Received Error in msg {e:?}");
break;
}
None => {
tracing::trace!("Stream finished");
break;
}
}
}
}),
)
.await;
match result {
futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"),
futures::future::Either::Right(_) => (), // Stream finished or errored out
}
});
(join_handle, cancelation_token)
}
174 changes: 174 additions & 0 deletions concurrency/src/tasks/stream_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
use std::time::Duration;

use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};

use crate::tasks::{
stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
};

type SummatoryHandle = GenServerHandle<Summatory>;

struct Summatory;

type SummatoryState = u16;

#[derive(Clone)]
struct UpdateSumatory {
added_value: u16,
}

impl Summatory {
pub async fn get_value(server: &mut SummatoryHandle) -> Result<SummatoryState, ()> {
server.call(()).await.map_err(|_| ())
}
}

impl GenServer for Summatory {
type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type.
type CastMsg = UpdateSumatory;
type OutMsg = SummatoryState;
type State = SummatoryState;
type Error = ();

fn new() -> Self {
Self
}

async fn handle_cast(
&mut self,
message: Self::CastMsg,
_handle: &GenServerHandle<Self>,
state: Self::State,
) -> CastResponse<Self> {
let new_state = state + message.added_value;
CastResponse::NoReply(new_state)
}

async fn handle_call(
&mut self,
_message: Self::CallMsg,
_handle: &SummatoryHandle,
state: Self::State,
) -> CallResponse<Self> {
let current_value = state;
CallResponse::Reply(state, current_value)
}
}

// In this example, the stream sends u8 values, which are converted to the type
// supported by the GenServer (UpdateSumatory / u16).
fn message_builder(value: u8) -> UpdateSumatory {
UpdateSumatory {
added_value: value as u16,
}
}

#[test]
pub fn test_sum_numbers_from_stream() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut summatory_handle = Summatory::start(0);
let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::<u8, ()>));

spawn_listener(summatory_handle.clone(), message_builder, stream);

// Wait for 1 second so the whole stream is processed
rt::sleep(Duration::from_secs(1)).await;

let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
assert_eq!(val, 15);
})
}

#[test]
pub fn test_sum_numbers_from_channel() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut summatory_handle = Summatory::start(0);
let (tx, rx) = spawned_rt::tasks::mpsc::channel::<Result<u8, ()>>();

// Spawn a task to send numbers to the channel
spawned_rt::tasks::spawn(async move {
for i in 1..=5 {
tx.send(Ok(i)).unwrap();
}
});

spawn_listener(
summatory_handle.clone(),
message_builder,
ReceiverStream::new(rx),
);

// Wait for 1 second so the whole stream is processed
rt::sleep(Duration::from_secs(1)).await;

let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
assert_eq!(val, 15);
})
}

#[test]
pub fn test_sum_numbers_from_broadcast_channel() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut summatory_handle = Summatory::start(0);
let (tx, rx) = tokio::sync::broadcast::channel::<u8>(5);

// Spawn a task to send numbers to the channel
spawned_rt::tasks::spawn(async move {
for i in 1u8..=5 {
tx.send(i).unwrap();
}
});

spawn_listener(
summatory_handle.clone(),
message_builder,
BroadcastStream::new(rx),
);

// Wait for 1 second so the whole stream is processed
rt::sleep(Duration::from_secs(1)).await;

let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
assert_eq!(val, 15);
})
}

#[test]
pub fn test_stream_cancellation() {
const RUNNING_TIME: u64 = 1000;

let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut summatory_handle = Summatory::start(0);
let (tx, rx) = spawned_rt::tasks::mpsc::channel::<Result<u8, ()>>();

// Spawn a task to send numbers to the channel
spawned_rt::tasks::spawn(async move {
for i in 1..=5 {
tx.send(Ok(i)).unwrap();
rt::sleep(Duration::from_millis(RUNNING_TIME / 4)).await;
}
});

let (_handle, cancellation_token) = spawn_listener(
summatory_handle.clone(),
message_builder,
ReceiverStream::new(rx),
);

// Wait for 1 second so the whole stream is processed
rt::sleep(Duration::from_millis(RUNNING_TIME)).await;

cancellation_token.cancel();

// The reasoning for this assertion is that each message takes a quarter of the total time
// to be processed, so having a stream of 5 messages, the last one won't be processed.
// We could safely assume that it will get to process 4 messages, but in case of any extenal
// slowdown, it could process less.
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
assert!(val > 0 && val < 15);
})
}
2 changes: 2 additions & 0 deletions concurrency/src/threads/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

mod gen_server;
mod process;
mod stream;
mod time;

#[cfg(test)]
mod timer_tests;

pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use process::{send, Process, ProcessInfo};
pub use stream::spawn_listener;
pub use time::{send_after, send_interval};
17 changes: 17 additions & 0 deletions concurrency/src/threads/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use crate::threads::{GenServer, GenServerHandle};

use futures::Stream;

/// Spawns a listener that listens to a stream and sends messages to a GenServer.
///
/// Items sent through the stream are required to be wrapped in a Result type.
pub fn spawn_listener<T, F, S, I, E>(_handle: GenServerHandle<T>, _message_builder: F, _stream: S)
where
T: GenServer + 'static,
F: Fn(I) -> T::CastMsg + Send + 'static,
I: Send + 'static,
E: std::fmt::Debug + Send + 'static,
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
{
unimplemented!("Unsupported function in threads mode")
}
3 changes: 2 additions & 1 deletion rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7.15" }
tokio-stream = { version = "0.1.17", features = ["sync"] }
crossbeam = { version = "0.7.3" }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }

[lib]
path = "./src/lib.rs"
path = "./src/lib.rs"
1 change: 1 addition & 0 deletions rt/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use crate::tasks::tokio::oneshot;
pub use crate::tasks::tokio::sleep;
pub use crate::tasks::tokio::CancellationToken;
pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime};
pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream};
use std::future::Future;

pub fn run<F: Future>(future: F) -> F::Output {
Expand Down
1 change: 1 addition & 0 deletions rt/src/tasks/tokio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pub use tokio::{
task::{spawn, spawn_blocking, JoinHandle},
time::sleep,
};
pub use tokio_stream::wrappers::{BroadcastStream, UnboundedReceiverStream as ReceiverStream};
pub use tokio_util::sync::CancellationToken;
Loading