Skip to content

Commit d61a9fa

Browse files
Implement stream listener (#20)
* First attempt for stream_listener * add test to stream listener * add util fn to convert receiver to stream * add bounded channel * add broadcast listener * fix `spawn_broadcast_listener` * unify spawn_listener & remove oneline functions * doc update * add impl of sync spawn listener * rename spawn_listener to spawn_listener_from_iter, and port spawn_listener * add bound channel to threads concurrency * merge duplicated code * add cancel token with 'flaky' test * unflaky the test * add cancellation to task impl of spawn_listener * docs & clippy * use futures select inside spawn listener * remove bounded channels from tasks impl * remove sync channels from threads impl * deprecate spawn_listener for threads impl * fix imports * reword example for clarity * add comment for clarity --------- Co-authored-by: Esteban Dimitroff Hodi <[email protected]>
1 parent a58b6c0 commit d61a9fa

File tree

11 files changed

+280
-3
lines changed

11 files changed

+280
-3
lines changed

.tool-versions

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
rust 1.85.1
1+
rust 1.88.0

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

concurrency/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,10 @@ spawned-rt = { workspace = true }
88
tracing = { workspace = true }
99
futures = "0.3.1"
1010

11+
[dev-dependencies]
12+
# This tokio imports are only used in tests, we should not use them in the library code.
13+
tokio-stream = { version = "0.1.17" }
14+
tokio = { version = "1", features = ["full"] }
15+
1116
[lib]
12-
path = "./src/lib.rs"
17+
path = "./src/lib.rs"

concurrency/src/tasks/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
44
mod gen_server;
55
mod process;
6+
mod stream;
67
mod time;
78

9+
#[cfg(test)]
10+
mod stream_tests;
811
#[cfg(test)]
912
mod timer_tests;
1013

1114
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1215
pub use process::{send, Process, ProcessInfo};
16+
pub use stream::spawn_listener;
1317
pub use time::{send_after, send_interval};

concurrency/src/tasks/stream.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
use crate::tasks::{GenServer, GenServerHandle};
2+
use futures::{future::select, Stream, StreamExt};
3+
use spawned_rt::tasks::{CancellationToken, JoinHandle};
4+
5+
/// Spawns a listener that listens to a stream and sends messages to a GenServer.
6+
///
7+
/// Items sent through the stream are required to be wrapped in a Result type.
8+
///
9+
/// This function returns a handle to the spawned task and a cancellation token
10+
/// to stop it.
11+
pub fn spawn_listener<T, F, S, I, E>(
12+
mut handle: GenServerHandle<T>,
13+
message_builder: F,
14+
mut stream: S,
15+
) -> (JoinHandle<()>, CancellationToken)
16+
where
17+
T: GenServer + 'static,
18+
F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
19+
I: Send,
20+
E: std::fmt::Debug + Send,
21+
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
22+
{
23+
let cancelation_token = CancellationToken::new();
24+
let cloned_token = cancelation_token.clone();
25+
let join_handle = spawned_rt::tasks::spawn(async move {
26+
let result = select(
27+
Box::pin(cloned_token.cancelled()),
28+
Box::pin(async {
29+
loop {
30+
match stream.next().await {
31+
Some(Ok(i)) => match handle.cast(message_builder(i)).await {
32+
Ok(_) => tracing::trace!("Message sent successfully"),
33+
Err(e) => {
34+
tracing::error!("Failed to send message: {e:?}");
35+
break;
36+
}
37+
},
38+
Some(Err(e)) => {
39+
tracing::trace!("Received Error in msg {e:?}");
40+
break;
41+
}
42+
None => {
43+
tracing::trace!("Stream finished");
44+
break;
45+
}
46+
}
47+
}
48+
}),
49+
)
50+
.await;
51+
match result {
52+
futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"),
53+
futures::future::Either::Right(_) => (), // Stream finished or errored out
54+
}
55+
});
56+
(join_handle, cancelation_token)
57+
}

concurrency/src/tasks/stream_tests.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
use std::time::Duration;
2+
3+
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};
4+
5+
use crate::tasks::{
6+
stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
7+
};
8+
9+
type SummatoryHandle = GenServerHandle<Summatory>;
10+
11+
struct Summatory;
12+
13+
type SummatoryState = u16;
14+
15+
#[derive(Clone)]
16+
struct UpdateSumatory {
17+
added_value: u16,
18+
}
19+
20+
impl Summatory {
21+
pub async fn get_value(server: &mut SummatoryHandle) -> Result<SummatoryState, ()> {
22+
server.call(()).await.map_err(|_| ())
23+
}
24+
}
25+
26+
impl GenServer for Summatory {
27+
type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type.
28+
type CastMsg = UpdateSumatory;
29+
type OutMsg = SummatoryState;
30+
type State = SummatoryState;
31+
type Error = ();
32+
33+
fn new() -> Self {
34+
Self
35+
}
36+
37+
async fn handle_cast(
38+
&mut self,
39+
message: Self::CastMsg,
40+
_handle: &GenServerHandle<Self>,
41+
state: Self::State,
42+
) -> CastResponse<Self> {
43+
let new_state = state + message.added_value;
44+
CastResponse::NoReply(new_state)
45+
}
46+
47+
async fn handle_call(
48+
&mut self,
49+
_message: Self::CallMsg,
50+
_handle: &SummatoryHandle,
51+
state: Self::State,
52+
) -> CallResponse<Self> {
53+
let current_value = state;
54+
CallResponse::Reply(state, current_value)
55+
}
56+
}
57+
58+
// In this example, the stream sends u8 values, which are converted to the type
59+
// supported by the GenServer (UpdateSumatory / u16).
60+
fn message_builder(value: u8) -> UpdateSumatory {
61+
UpdateSumatory {
62+
added_value: value as u16,
63+
}
64+
}
65+
66+
#[test]
67+
pub fn test_sum_numbers_from_stream() {
68+
let runtime = rt::Runtime::new().unwrap();
69+
runtime.block_on(async move {
70+
let mut summatory_handle = Summatory::start(0);
71+
let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::<u8, ()>));
72+
73+
spawn_listener(summatory_handle.clone(), message_builder, stream);
74+
75+
// Wait for 1 second so the whole stream is processed
76+
rt::sleep(Duration::from_secs(1)).await;
77+
78+
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
79+
assert_eq!(val, 15);
80+
})
81+
}
82+
83+
#[test]
84+
pub fn test_sum_numbers_from_channel() {
85+
let runtime = rt::Runtime::new().unwrap();
86+
runtime.block_on(async move {
87+
let mut summatory_handle = Summatory::start(0);
88+
let (tx, rx) = spawned_rt::tasks::mpsc::channel::<Result<u8, ()>>();
89+
90+
// Spawn a task to send numbers to the channel
91+
spawned_rt::tasks::spawn(async move {
92+
for i in 1..=5 {
93+
tx.send(Ok(i)).unwrap();
94+
}
95+
});
96+
97+
spawn_listener(
98+
summatory_handle.clone(),
99+
message_builder,
100+
ReceiverStream::new(rx),
101+
);
102+
103+
// Wait for 1 second so the whole stream is processed
104+
rt::sleep(Duration::from_secs(1)).await;
105+
106+
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
107+
assert_eq!(val, 15);
108+
})
109+
}
110+
111+
#[test]
112+
pub fn test_sum_numbers_from_broadcast_channel() {
113+
let runtime = rt::Runtime::new().unwrap();
114+
runtime.block_on(async move {
115+
let mut summatory_handle = Summatory::start(0);
116+
let (tx, rx) = tokio::sync::broadcast::channel::<u8>(5);
117+
118+
// Spawn a task to send numbers to the channel
119+
spawned_rt::tasks::spawn(async move {
120+
for i in 1u8..=5 {
121+
tx.send(i).unwrap();
122+
}
123+
});
124+
125+
spawn_listener(
126+
summatory_handle.clone(),
127+
message_builder,
128+
BroadcastStream::new(rx),
129+
);
130+
131+
// Wait for 1 second so the whole stream is processed
132+
rt::sleep(Duration::from_secs(1)).await;
133+
134+
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
135+
assert_eq!(val, 15);
136+
})
137+
}
138+
139+
#[test]
140+
pub fn test_stream_cancellation() {
141+
const RUNNING_TIME: u64 = 1000;
142+
143+
let runtime = rt::Runtime::new().unwrap();
144+
runtime.block_on(async move {
145+
let mut summatory_handle = Summatory::start(0);
146+
let (tx, rx) = spawned_rt::tasks::mpsc::channel::<Result<u8, ()>>();
147+
148+
// Spawn a task to send numbers to the channel
149+
spawned_rt::tasks::spawn(async move {
150+
for i in 1..=5 {
151+
tx.send(Ok(i)).unwrap();
152+
rt::sleep(Duration::from_millis(RUNNING_TIME / 4)).await;
153+
}
154+
});
155+
156+
let (_handle, cancellation_token) = spawn_listener(
157+
summatory_handle.clone(),
158+
message_builder,
159+
ReceiverStream::new(rx),
160+
);
161+
162+
// Wait for 1 second so the whole stream is processed
163+
rt::sleep(Duration::from_millis(RUNNING_TIME)).await;
164+
165+
cancellation_token.cancel();
166+
167+
// The reasoning for this assertion is that each message takes a quarter of the total time
168+
// to be processed, so having a stream of 5 messages, the last one won't be processed.
169+
// We could safely assume that it will get to process 4 messages, but in case of any extenal
170+
// slowdown, it could process less.
171+
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
172+
assert!(val > 0 && val < 15);
173+
})
174+
}

concurrency/src/threads/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
44
mod gen_server;
55
mod process;
6+
mod stream;
67
mod time;
78

89
#[cfg(test)]
910
mod timer_tests;
1011

1112
pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
1213
pub use process::{send, Process, ProcessInfo};
14+
pub use stream::spawn_listener;
1315
pub use time::{send_after, send_interval};

concurrency/src/threads/stream.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use crate::threads::{GenServer, GenServerHandle};
2+
3+
use futures::Stream;
4+
5+
/// Spawns a listener that listens to a stream and sends messages to a GenServer.
6+
///
7+
/// Items sent through the stream are required to be wrapped in a Result type.
8+
pub fn spawn_listener<T, F, S, I, E>(_handle: GenServerHandle<T>, _message_builder: F, _stream: S)
9+
where
10+
T: GenServer + 'static,
11+
F: Fn(I) -> T::CastMsg + Send + 'static,
12+
I: Send + 'static,
13+
E: std::fmt::Debug + Send + 'static,
14+
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
15+
{
16+
unimplemented!("Unsupported function in threads mode")
17+
}

rt/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,10 @@ edition = "2021"
66
[dependencies]
77
tokio = { version = "1", features = ["full"] }
88
tokio-util = { version = "0.7.15" }
9+
tokio-stream = { version = "0.1.17", features = ["sync"] }
910
crossbeam = { version = "0.7.3" }
1011
tracing = { workspace = true }
1112
tracing-subscriber = { workspace = true }
1213

1314
[lib]
14-
path = "./src/lib.rs"
15+
path = "./src/lib.rs"

rt/src/tasks/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pub use crate::tasks::tokio::oneshot;
1818
pub use crate::tasks::tokio::sleep;
1919
pub use crate::tasks::tokio::CancellationToken;
2020
pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime};
21+
pub use crate::tasks::tokio::{BroadcastStream, ReceiverStream};
2122
use std::future::Future;
2223

2324
pub fn run<F: Future>(future: F) -> F::Output {

0 commit comments

Comments
 (0)