Skip to content

Add stop behaviour to GenServer #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 35 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
da0dacd
First attempt for stream_listener
ElFantasma Jun 30, 2025
c15cd54
add test to stream listener
juan518munoz Jul 1, 2025
cfca334
add util fn to convert receiver to stream
juan518munoz Jul 2, 2025
4e3db7f
add bounded channel
juan518munoz Jul 2, 2025
de32666
add broadcast listener
juan518munoz Jul 2, 2025
6bfb8a1
fix `spawn_broadcast_listener`
juan518munoz Jul 2, 2025
7a6f79a
unify spawn_listener & remove oneline functions
juan518munoz Jul 2, 2025
4d49882
doc update
juan518munoz Jul 2, 2025
5138e09
add impl of sync spawn listener
juan518munoz Jul 3, 2025
04222ae
rename spawn_listener to spawn_listener_from_iter, and port spawn_lis…
juan518munoz Jul 3, 2025
51ecbb2
add bound channel to threads concurrency
juan518munoz Jul 3, 2025
0a78b64
merge duplicated code
juan518munoz Jul 3, 2025
835bf08
add cancel token with 'flaky' test
juan518munoz Jul 3, 2025
0c24840
unflaky the test
juan518munoz Jul 3, 2025
541cc1e
add cancellation to task impl of spawn_listener
juan518munoz Jul 3, 2025
2950644
docs & clippy
juan518munoz Jul 3, 2025
376deda
Merge branch 'main' into stream_listener
juan518munoz Jul 4, 2025
6f7a305
use futures select inside spawn listener
juan518munoz Jul 4, 2025
7b0e33d
use genserver cancel token on stream
juan518munoz Jul 4, 2025
138890f
add cancelation token to timer
juan518munoz Jul 4, 2025
116241d
add cancellation token to gen server tasks impl
juan518munoz Jul 4, 2025
702e4df
remove bounded channels from tasks impl
juan518munoz Jul 4, 2025
2bed000
remove sync channels from threads impl
juan518munoz Jul 4, 2025
ce77a36
deprecate spawn_listener for threads impl
juan518munoz Jul 4, 2025
2dc4cc1
fix imports
juan518munoz Jul 4, 2025
53f5a3e
merge dst branch
juan518munoz Jul 4, 2025
ab9f69c
remove impl for threads due to reprecation
juan518munoz Jul 4, 2025
2d6e8db
revert more lines
juan518munoz Jul 4, 2025
b5ef429
rename `stop` to `teardown`
juan518munoz Jul 7, 2025
668b41f
refactor teardown logic
juan518munoz Jul 10, 2025
38d6848
merge main
juan518munoz Jul 10, 2025
412f291
remove commented code, reword tracing msg
juan518munoz Jul 10, 2025
f37178e
improve default teardown function
juan518munoz Jul 11, 2025
e652070
mandatory token cancel
juan518munoz Jul 11, 2025
b257707
remove unused variable
juan518munoz Jul 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
//! See examples/name_server for a usage example.
use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot};
use spawned_rt::tasks::{self as rt, mpsc, oneshot, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe};

use crate::error::GenServerError;

/// Cloneable handle to a running GenServer.
/// Messages (calls/casts) are delivered through `tx`; the embedded
/// cancellation token lets the server's shutdown be observed by
/// associated tasks (listeners, timers).
#[derive(Debug)]
pub struct GenServerHandle<G: GenServer + 'static> {
// Sender half of the server's mailbox channel.
pub tx: mpsc::Sender<GenServerInMsg<G>>,
/// Cancellation token to stop the GenServer
cancellation_token: CancellationToken,
}

impl<G: GenServer> Clone for GenServerHandle<G> {
    /// Produces an independent handle to the same GenServer: the mailbox
    /// sender and the cancellation token are both shared (cheap clones).
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        let cancellation_token = self.cancellation_token.clone();
        Self {
            tx,
            cancellation_token,
        }
    }
}

impl<G: GenServer> GenServerHandle<G> {
pub(crate) fn new(initial_state: G::State) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let handle = GenServerHandle { tx };
let cancellation_token = CancellationToken::new();
let handle = GenServerHandle {
tx,
cancellation_token,
};
let mut gen_server: G = GenServer::new();
let handle_clone = handle.clone();
// Ignore the JoinHandle for now. Maybe we'll use it in the future
Expand All @@ -40,7 +46,11 @@ impl<G: GenServer> GenServerHandle<G> {

pub(crate) fn new_blocking(initial_state: G::State) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let handle = GenServerHandle { tx };
let cancellation_token = CancellationToken::new();
let handle = GenServerHandle {
tx,
cancellation_token,
};
let mut gen_server: G = GenServer::new();
let handle_clone = handle.clone();
// Ignore the JoinHandle for now. Maybe we'll use it in the future
Expand Down Expand Up @@ -79,6 +89,10 @@ impl<G: GenServer> GenServerHandle<G> {
.send(GenServerInMsg::Cast { message })
.map_err(|_error| GenServerError::Server)
}

pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
}

pub enum GenServerInMsg<G: GenServer> {
Expand Down Expand Up @@ -168,12 +182,16 @@ where
async {
loop {
let (new_state, cont) = self.receive(handle, rx, state).await?;
state = new_state;
if !cont {
break;
}
state = new_state;
}
tracing::trace!("Stopping GenServer");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here is the place to handle.cancellation_token().cancel() as we shouldn't rely on the implementers to remember cancelling it (and if possible, it shouldn't be public)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, considering the cancellation_token is part of every GenServer, it would be proper that by default all of them call the cancel function. I have applied these changes inside of teardown:
https://github.com/lambdaclass/spawned/pull/22/files#diff-932cb8f7a40a4c5e58805fa19b32b0d0fef1afee68560831b86a61d59e0553dcR300

Furthermore, thinking it a bit more, it may be better to have it outside of teardown to prevent the user from shooting himself in the foot, as I can't think of any case where it wouldn't want to call the cancel method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should be standard GenServer behavior: Stopping it will cancel it's cancellation token always. And we cannot put it in the default teardown() implementation as it can be overridden.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

handle.cancellation_token().cancel();
if let Err(err) = self.teardown(handle, state).await {
tracing::error!("Error during teardown: {err:?}");
}
Ok(())
}
}
Expand Down Expand Up @@ -269,6 +287,17 @@ where
) -> impl Future<Output = CastResponse<Self>> + Send {
async { CastResponse::Unused }
}

/// Teardown function. It's called after the stop message is received.
/// It can be overridden on implementations in case final steps are required,
/// like closing streams, stopping timers, etc.
fn teardown(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👏

&mut self,
_handle: &GenServerHandle<Self>,
_state: Self::State,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async { Ok(()) }
}
}

#[cfg(test)]
Expand Down
13 changes: 6 additions & 7 deletions concurrency/src/tasks/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::tasks::{GenServer, GenServerHandle};
use futures::{future::select, Stream, StreamExt};
use spawned_rt::tasks::{CancellationToken, JoinHandle};
use spawned_rt::tasks::JoinHandle;

/// Spawns a listener that listens to a stream and sends messages to a GenServer.
///
Expand All @@ -12,19 +12,18 @@ pub fn spawn_listener<T, F, S, I, E>(
mut handle: GenServerHandle<T>,
message_builder: F,
mut stream: S,
) -> (JoinHandle<()>, CancellationToken)
) -> JoinHandle<()>
where
T: GenServer + 'static,
F: Fn(I) -> T::CastMsg + Send + 'static + std::marker::Sync,
I: Send,
E: std::fmt::Debug + Send,
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
{
let cancelation_token = CancellationToken::new();
let cloned_token = cancelation_token.clone();
let cancelation_token = handle.cancellation_token();
let join_handle = spawned_rt::tasks::spawn(async move {
let result = select(
Box::pin(cloned_token.cancelled()),
Box::pin(cancelation_token.cancelled()),
Box::pin(async {
loop {
match stream.next().await {
Expand All @@ -49,9 +48,9 @@ where
)
.await;
match result {
futures::future::Either::Left(_) => tracing::trace!("Listener cancelled"),
futures::future::Either::Left(_) => tracing::trace!("GenServer stopped"),
futures::future::Either::Right(_) => (), // Stream finished or errored out
}
});
(join_handle, cancelation_token)
join_handle
}
53 changes: 35 additions & 18 deletions concurrency/src/tasks/stream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ use std::time::Duration;
use spawned_rt::tasks::{self as rt, BroadcastStream, ReceiverStream};

use crate::tasks::{
stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
send_after, stream::spawn_listener, CallResponse, CastResponse, GenServer, GenServerHandle,
};

type SummatoryHandle = GenServerHandle<Summatory>;

struct Summatory;

type SummatoryState = u16;
type SummatoryOutMessage = SummatoryState;

#[derive(Clone)]
struct UpdateSumatory {
added_value: u16,
enum SummatoryCastMessage {
Add(u16),
Stop,
}

impl Summatory {
Expand All @@ -25,8 +27,8 @@ impl Summatory {

impl GenServer for Summatory {
type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type.
type CastMsg = UpdateSumatory;
type OutMsg = SummatoryState;
type CastMsg = SummatoryCastMessage;
type OutMsg = SummatoryOutMessage;
type State = SummatoryState;
type Error = ();

Expand All @@ -40,8 +42,13 @@ impl GenServer for Summatory {
_handle: &GenServerHandle<Self>,
state: Self::State,
) -> CastResponse<Self> {
let new_state = state + message.added_value;
CastResponse::NoReply(new_state)
match message {
SummatoryCastMessage::Add(val) => {
let new_state = state + val;
CastResponse::NoReply(new_state)
}
SummatoryCastMessage::Stop => CastResponse::Stop,
}
}

async fn handle_call(
Expand All @@ -56,11 +63,9 @@ impl GenServer for Summatory {
}

// In this example, the stream sends u8 values, which are converted to the type
// supported by the GenServer (UpdateSumatory / u16).
fn message_builder(value: u8) -> UpdateSumatory {
UpdateSumatory {
added_value: value as u16,
}
// supported by the GenServer (SummatoryCastMessage / u16).
fn message_builder(value: u8) -> SummatoryCastMessage {
SummatoryCastMessage::Add(value.into())
}

#[test]
Expand Down Expand Up @@ -153,22 +158,34 @@ pub fn test_stream_cancellation() {
}
});

let (_handle, cancellation_token) = spawn_listener(
let listener_handle = spawn_listener(
summatory_handle.clone(),
message_builder,
ReceiverStream::new(rx),
);

// Wait for 1 second so the whole stream is processed
rt::sleep(Duration::from_millis(RUNNING_TIME)).await;
// Start a timer to stop the stream after a certain time
let summatory_handle_clone = summatory_handle.clone();
let _ = send_after(
Duration::from_millis(RUNNING_TIME + 10),
summatory_handle_clone,
SummatoryCastMessage::Stop,
);

cancellation_token.cancel();
// Just before the stream is cancelled we retrieve the current value.
rt::sleep(Duration::from_millis(RUNNING_TIME)).await;
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();

// The reasoning for this assertion is that each message takes a quarter of the total time
// to be processed, so having a stream of 5 messages, the last one won't be processed.
// We could safely assume that it will get to process 4 messages, but in case of any external
// slowdown, it could process less.
let val = Summatory::get_value(&mut summatory_handle).await.unwrap();
assert!(val > 0 && val < 15);
assert!((1..=10).contains(&val));

assert!(listener_handle.await.is_ok());

// Finally, we check that the server is stopped, by getting an error when trying to call it.
rt::sleep(Duration::from_millis(10)).await;
assert!(Summatory::get_value(&mut summatory_handle).await.is_err());
})
}
18 changes: 16 additions & 2 deletions concurrency/src/tasks/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ where
{
let cancellation_token = CancellationToken::new();
let cloned_token = cancellation_token.clone();
let gen_server_cancellation_token = handle.cancellation_token();
let join_handle = rt::spawn(async move {
let _ = select(
// Timer action is ignored if it was either cancelled or the associated GenServer is no longer running.
let cancel_conditions = select(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nice! I like having both, the individual cancellation_token, and the gen_server associated one

Box::pin(cloned_token.cancelled()),
Box::pin(gen_server_cancellation_token.cancelled()),
);

let _ = select(
cancel_conditions,
Box::pin(async {
rt::sleep(period).await;
let _ = handle.cast(message.clone()).await;
Expand All @@ -49,10 +56,17 @@ where
{
let cancellation_token = CancellationToken::new();
let cloned_token = cancellation_token.clone();
let gen_server_cancellation_token = handle.cancellation_token();
let join_handle = rt::spawn(async move {
loop {
let result = select(
// Timer action is ignored if it was either cancelled or the associated GenServer is no longer running.
let cancel_conditions = select(
Box::pin(cloned_token.cancelled()),
Box::pin(gen_server_cancellation_token.cancelled()),
);

let result = select(
Box::pin(cancel_conditions),
Box::pin(async {
rt::sleep(period).await;
let _ = handle.cast(message.clone()).await;
Expand Down
57 changes: 54 additions & 3 deletions concurrency/src/tasks/timer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ enum DelayedCastMessage {
#[derive(Clone)]
enum DelayedCallMessage {
GetCount,
Stop,
}

#[derive(PartialEq, Debug)]
Expand All @@ -165,6 +166,10 @@ impl Delayed {
.await
.map_err(|_| ())
}

pub async fn stop(server: &mut DelayedHandle) -> Result<DelayedOutMessage, ()> {
server.call(DelayedCallMessage::Stop).await.map_err(|_| ())
}
}

impl GenServer for Delayed {
Expand All @@ -180,12 +185,17 @@ impl GenServer for Delayed {

async fn handle_call(
&mut self,
_message: Self::CallMsg,
message: Self::CallMsg,
_handle: &DelayedHandle,
state: Self::State,
) -> CallResponse<Self> {
let count = state.count;
CallResponse::Reply(state, DelayedOutMessage::Count(count))
match message {
DelayedCallMessage::GetCount => {
let count = state.count;
CallResponse::Reply(state, DelayedOutMessage::Count(count))
}
DelayedCallMessage::Stop => CallResponse::Stop(DelayedOutMessage::Count(state.count)),
}
}

async fn handle_cast(
Expand Down Expand Up @@ -246,3 +256,44 @@ pub fn test_send_after_and_cancellation() {
assert_eq!(DelayedOutMessage::Count(1), count2);
});
}

#[test]
pub fn test_send_after_gen_server_teardown() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
// Start a Delayed
let mut repeater = Delayed::start(DelayedState { count: 0 });

// Set a just once timed message
let _ = send_after(
Duration::from_millis(100),
repeater.clone(),
DelayedCastMessage::Inc,
);

// Wait for 200 milliseconds
rt::sleep(Duration::from_millis(200)).await;

// Check count
let count = Delayed::get_count(&mut repeater).await.unwrap();

// Only one message (no repetition)
assert_eq!(DelayedOutMessage::Count(1), count);

// New timer
let _ = send_after(
Duration::from_millis(100),
repeater.clone(),
DelayedCastMessage::Inc,
);

// Stop the GenServer before timeout
let count2 = Delayed::stop(&mut repeater).await.unwrap();

// Wait another 200 milliseconds
rt::sleep(Duration::from_millis(200)).await;

// As timer was cancelled, count should remain at 1
assert_eq!(DelayedOutMessage::Count(1), count2);
});
}