Skip to content

Implement stream listener #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open

Implement stream listener #20

wants to merge 23 commits into from

Conversation

juan518munoz
Copy link
Contributor

@juan518munoz juan518munoz commented Jul 2, 2025

Add spawn_listener function to crate, this function is used to enable communication between GenServers and Streams.

In this PR adds a new re-export:

Which is used to convert (wrap) their kind of receiving channel into a stream compatible with spawn_listener.

Threads implementation is no longer maintained, so there's nothing added to that concurrenc method.

Closes #17

@juan518munoz juan518munoz marked this pull request as ready for review July 3, 2025 20:38
E: std::fmt::Debug + Send,
S: Unpin + Send + Stream<Item = Result<I, E>> + 'static,
{
let cancelation_token = CancellationToken::new();
Copy link
Collaborator

@ElFantasma ElFantasma Jul 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the idea of creating the CancellationToken in the GenServer on initialization. And every time a listener or a timer is spawned to interact with the GenServer it should request it for the CancellationToken to reuse the same one.
Then, it's the GenServer responsibility to cancel the token on termination. That way, graceful cancellation is transparent for the external process.
Anyway, we can try to implement that on this PR, or ticket it for a future improvement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it'd be a good idea to have the cancellation token inside of the gen server, considering this change would also requiere a change in the timer implementation, I believe we should make this change on a separate PR.

I'll branch from this one and start work on it ASAP.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, given GenServer is just a Trait, I guess we should put the token in the GenServerHandler, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've taken an approach of that kind, see the PR here

let cloned_token = cancelation_token.clone();
let join_handle = spawned_rt::tasks::spawn(async move {
loop {
if cloned_token.is_cancelled() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the way an async CancellationToken is supposed to be used:
stream.next() is a future and execution can be held at that point, even when the token gets cancelled. We should use futures::future::select like we do on timer module (or tokio::select! if we decide to allow macros) to await on both futures at the same time. See graceful shutdown

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

@@ -8,5 +8,9 @@ spawned-rt = { workspace = true }
tracing = { workspace = true }
futures = "0.3.1"

[dev-dependencies]
tokio-stream = { version = "0.1.17" }
tokio = { version = "1", features = ["full"] }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One hidden goal we have is not to depend on tokio in more than one place. This deps should be in rt crate, and reexported so we can use them in concurrency crate
(that way, if we ever plan to replace the runtime, we do it in a single place, ideally)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This imports are only used for testing (inside of the streaming tests more specifically). They are meant to show intercompatibility with tokio (e.g we receive a broadcast channel and need to make use of it inside of spawned).

Copy link
Collaborator

@ElFantasma ElFantasma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM,
just a minor comment on using tokio in other place than rt

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement Stream listener mechanism
3 participants