Skip to content

fix: wait for pubsub listeners before reconnect#1253

Open
thlorenz wants to merge 11 commits into
masterfrom
thlorenz/fix-dropped-clients
Open

fix: wait for pubsub listeners before reconnect#1253
thlorenz wants to merge 11 commits into
masterfrom
thlorenz/fix-dropped-clients

Conversation

@thlorenz
Copy link
Copy Markdown
Collaborator

@thlorenz thlorenz commented May 27, 2026

Summary

Fix pubsub reconnect lifetime safety by ensuring reconnect waits for subscription listener tasks to stop and drop their streams before pooled pubsub clients are replaced.

Details

magicblock-chainlink

This changes the reconnect path from best-effort cancellation to a two-phase drain:

  • account and program subscriptions now track both cancellation and listener-completion signals
  • reconnect drains subscription maps, requests listener shutdown, and waits for listener completion before recreating pooled pubsub clients
  • listener tasks signal completion only after dropping their update stream and running best-effort unsubscribe cleanup
  • explicit account unsubscribe remains fast by only cancelling the listener and leaving the subscription entry visible until listener cleanup, so a near-simultaneous reconnect can still wait for completion

The pubsub pool reconnect docs now call out the required precondition that old listener streams must be finished before pooled clients are dropped.

Additional tests cover successful account/program listener drain, completion timeout behavior, and the fast explicit-unsubscribe path preserving the map entry for reconnect drain.

Summary by CodeRabbit

  • Bug Fixes

    • Improved subscription shutdown and reconnection so listener tasks are drained and awaited (with timeout) before pooled connections are reused or closed, preventing dropped listeners and leaked resources.
  • Tests

    • Added tests validating that shutdown/reconnect waits for subscription completion and that timeouts and cancellation behave as expected.
  • Documentation

    • Clarified reconnect preconditions to require listener completion before reusing connections.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 27, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 3138cd86-75fb-4b6b-86b9-d70e43540c83

📥 Commits

Reviewing files that changed from the base of the PR and between 8ebcd58 and ab0c326.

📒 Files selected for processing (1)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs

📝 Walkthrough

Walkthrough

The PR updates ChainPubsubActor to use per-subscription completion tokens for coordinating safe listener task shutdown. A new completion_token field is added to AccountSubscription. Helper functions introduce test-aware timeout selection and drain-and-wait logic that cancels listener tokens and awaits completion with timeout. Subscription creation and failure paths now manage completion tokens. Listener task cleanup explicitly drops streams and cancels completion tokens. The reconnect flow uses the new drain-and-wait helper, and existing unsubscribe/abort paths are updated to handle the new field. Tests validate the completion token coordination, and reconnect documentation clarifies caller preconditions about listener task completion.

Suggested reviewers

  • GabrielePicco
  • bmuddha
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch thlorenz/fix-dropped-clients

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@thlorenz
Copy link
Copy Markdown
Collaborator Author

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 28, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs`:
- Around line 224-233: The calls that use .expect(...) when locking
subscriptions and program_subs (the code that initializes account_subs and
program_subs by calling subscriptions.lock() and program_subs.lock()) must not
panic on PoisonError; replace those .expect(...) usages with recoverable
handling that maps the PoisonError into a RemoteAccountProviderError and then
propagate or send that error from the actor instead of panicking. Concretely,
change subscriptions.lock().expect(...) and program_subs.lock().expect(...) to
subscriptions.lock().map_err(|e|
RemoteAccountProviderError::MutexPoisoned(format!("{}", e)))? (or similar) and
return or send that RemoteAccountProviderError to the caller/actor mailbox; make
the same change for the other occurrences around the 418-423 region so all mutex
poison paths propagate RemoteAccountProviderError rather than calling expect.
- Around line 235-253: The current loops serially call and await
Self::cancel_and_wait_for_stream_drop for entries in account_subs and
program_subs, causing N×timeout reconnect latency; change the logic to first
invoke cancellation for all subs without awaiting (collecting the returned
futures) and then await them concurrently (e.g. via futures::future::join_all or
FuturesUnordered) while still capturing the first error into first_error;
operate on the same identifiers (account_subs, program_subs, client_id,
cancel_and_wait_for_stream_drop, first_error) so cancellations run in parallel
and overall wait is bounded by a single timeout rather than multiplied by N.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: e6267bb9-b27d-46e7-acf4-559ddef8ddc6

📥 Commits

Reviewing files that changed from the base of the PR and between 89372e6 and 8ebcd58.

📒 Files selected for processing (3)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
  • magicblock-chainlink/src/remote_account_provider/pubsub_common.rs
  • magicblock-chainlink/src/remote_account_provider/pubsub_connection_pool.rs

@thlorenz thlorenz marked this pull request as ready for review May 28, 2026 08:58
@thlorenz thlorenz requested a review from GabrielePicco May 28, 2026 09:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant