server: fix client/protocol init, customize server config, consolidate health manager #13

Open: eliteprox wants to merge 12 commits into main from feat/add-server-config

eliteprox
Collaborator

@eliteprox eliteprox commented Aug 16, 2025

This pull request introduces several improvements and refactorings to the PyTrickle streaming library, focusing on stream state management, reliability, and documentation clarity. The most significant change is the replacement of the StreamHealthManager with a more generic StreamState, leading to updates across the stream management logic. Additional enhancements include improved publisher error recovery, better handling of protocol shutdown, and clearer documentation of stream management approaches.

Stream State Management Refactor:

  • Replaced StreamHealthManager with StreamState throughout the stream manager classes, updating all references and logic to use the new state management interface. This simplifies and standardizes how stream health and lifecycle are tracked and reported. (pytrickle/manager.py, pytrickle/__init__.py, pytrickle/server.py, pytrickle/health.py removed)

Publisher and Protocol Reliability Improvements:

  • Improved the publisher's next() method to allow recovery from transient errors and retry preconnects, enhancing robustness in the face of network or encoder issues. (pytrickle/publisher.py)
  • Ensured protocol queues are always initialized and added safe shutdown handling to prevent errors during protocol stop. (pytrickle/protocol.py)
  • Modified publisher session management to always create a fresh session on start, preventing stale keep-alives and improving connection reliability. (pytrickle/publisher.py)

Documentation and API Clarity:

  • Expanded the README.md to clarify the different stream management approaches available in PyTrickle, helping users select the right architecture for their needs.

Frame Utilities:

  • Added a FrameFactory class to provide helper methods for constructing video and audio frames with consistent defaults. (pytrickle/frames.py)

Server Extensibility:

  • Introduced a RouteConfig dataclass for configuring custom routes in the server, laying groundwork for more flexible HTTP API extensions. (pytrickle/server.py)

Let me know if you need a deeper dive into any of these changes!

@eliteprox eliteprox force-pushed the feat/add-server-config branch 2 times, most recently from cbf207d to bea4408 Compare August 16, 2025 20:46
add health manager to server, unify state
fix control handler missing
consolidate HealthManager with StreamState, extend server
@eliteprox eliteprox force-pushed the feat/add-server-config branch from fd6567f to 598f8ef Compare August 16, 2025 21:43
@eliteprox eliteprox changed the title from "server: customize server config, consolidate health manager" to "server: fix trickleclient/protocol init, customize server config, consolidate health manager" Aug 16, 2025
@eliteprox eliteprox changed the title from "server: fix trickleclient/protocol init, customize server config, consolidate health manager" to "server: fix client/protocol init, customize server config, consolidate health manager" Aug 16, 2025
@pschroedl pschroedl self-requested a review August 18, 2025 20:01
@@ -48,7 +47,6 @@
"AudioOutput",
"TricklePublisher",
"TrickleSubscriber",
"StreamHealthManager",
Collaborator

Random nit: mind fixing the tab vs spaces issue on line 42 referring to "StreamProcessor"?

Collaborator Author

Fixed in adf2494

@@ -102,8 +298,9 @@ async def _handle_start_stream(self, request: web.Request) -> web.Response:
height = params_dict.get("height", 512)
max_framerate = params_dict.get("max_framerate", None) # None will use default

# Create protocol and client (align with current Client/Protocol API)
protocol = TrickleProtocol(
#TODO: Consider adding lifecycle_lock here
Collaborator

I think it's worth wrapping this in an async with self._lifecycle_lock: block (or however you envisioned it) to prevent race conditions.

Collaborator Author

Added in aaea932, tested stop/start successfully
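
For readers following the thread, here is a minimal sketch of the locking pattern being discussed. It is not the code from aaea932: `DemoServer`, `start_stream`, `stop_stream`, and the `events` list are hypothetical names used only for illustration, and only `_lifecycle_lock` comes from the review comment. The idea is that one `asyncio.Lock` serializes every start/stop transition, so overlapping requests cannot interleave their setup and teardown.

```python
import asyncio


class DemoServer:
    """Hypothetical stand-in for the server; only the locking pattern matters."""

    def __init__(self) -> None:
        # A single lock guards every lifecycle transition.
        self._lifecycle_lock = asyncio.Lock()
        self.running = False
        self.events = []

    async def start_stream(self) -> None:
        async with self._lifecycle_lock:
            if self.running:
                self.events.append("start ignored")
                return
            await asyncio.sleep(0.01)  # simulate slow protocol/client setup
            self.running = True
            self.events.append("started")

    async def stop_stream(self) -> None:
        async with self._lifecycle_lock:
            if not self.running:
                self.events.append("stop ignored")
                return
            await asyncio.sleep(0.01)  # simulate slow teardown
            self.running = False
            self.events.append("stopped")


async def demo():
    server = DemoServer()
    # Two overlapping start requests, then a stop; the lock serializes them.
    await asyncio.gather(server.start_stream(), server.start_stream())
    await server.stop_stream()
    return server.events
```

Without the lock, both start requests could pass the `running` check before either finishes setup, which is the race the reviewer is pointing at.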

    if self.subscribe_queue is not None:
        self.subscribe_queue.put(None)
except Exception:
    pass
Collaborator

We should log the exceptions here and on line 211.

Collaborator Author

Added logging in adf2494
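
As an illustration of the requested change (not the actual diff in adf2494), the sketch below logs a failed shutdown signal instead of swallowing it. The function name `signal_shutdown`, the logger name, and the use of a bounded `queue.Queue` with `put_nowait` are assumptions made so the example is self-contained.

```python
import logging
import queue

logger = logging.getLogger("shutdown_demo")  # hypothetical logger name


def signal_shutdown(subscribe_queue) -> bool:
    """Push a None sentinel onto the queue; log any failure instead of hiding it.

    Returns True only if the sentinel was enqueued.
    """
    try:
        if subscribe_queue is not None:
            subscribe_queue.put_nowait(None)
            return True
    except Exception:
        # logger.exception records the message together with the traceback.
        logger.exception("Failed to enqueue shutdown sentinel")
    return False
```

Shutdown still proceeds when the queue is full or broken, but the failure now leaves a trace in the logs.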

pass
finally:
self.session = None
connector = aiohttp.TCPConnector(verify_ssl=False, limit=0, keepalive_timeout=5)
Collaborator

The keep-alive and client timeouts should be configurable, or at least defined as constants that serve as defaults for class parameters.

Collaborator Author

@eliteprox eliteprox Aug 18, 2025

I've now made the timeouts configurable all the way through TrickleServer and TrickleClient. See 685e12f

When initializing StreamServer, the user can provide subscriber_timeout and/or publisher_timeout to configure these. I removed the keep-alive mechanism, as it's not necessary for these short connections.

I also updated TrickleSubscriber to follow similar constant patterns.

Collaborator Author

On second thought, some publishers may need keep-alive packets: consider the case where no data is sent for 30+ seconds on a running stream. I'll add keep-alive pings back in for publishers.

Collaborator Author

Keep-alive added back in 0941d21. It is currently configurable only on the Publisher and defaults to a class constant.
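
The "class constant as default, constructor argument as override" pattern described in this thread could look roughly like the sketch below. `DemoPublisher`, the constant names, and the parameter names are hypothetical; the values 30 and 5 are borrowed from the snippets under review, not from the final code.

```python
from typing import Optional


class DemoPublisher:
    """Hypothetical publisher showing constants used as overridable defaults."""

    # Class-level defaults; names are illustrative assumptions.
    DEFAULT_TIMEOUT_SECONDS = 30.0
    DEFAULT_KEEPALIVE_SECONDS = 5.0

    def __init__(
        self,
        timeout: Optional[float] = None,
        keepalive: Optional[float] = None,
    ) -> None:
        # None means "use the class default"; any explicit value overrides it.
        self.timeout = self.DEFAULT_TIMEOUT_SECONDS if timeout is None else timeout
        self.keepalive = (
            self.DEFAULT_KEEPALIVE_SECONDS if keepalive is None else keepalive
        )
        if self.timeout <= 0 or self.keepalive <= 0:
            raise ValueError("timeout and keepalive must be positive")
```

Using `None` as the sentinel keeps the defaults in one place, so a subclass can change them without touching the constructor.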

# If preconnect failed due to connection reuse issues, retry once immediately
if self.next_writer is None and not self._should_stop():
    logger.info("Preconnect returned None, retrying once immediately")
    await asyncio.sleep(0.05)
Collaborator

This should also be a constant (RETRY_DELAY_SECONDS?).

Collaborator Author

Updated to use constants in 685e12f
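
A sketch of the single-retry logic with the delay pulled into a named constant, under stated assumptions: `PRECONNECT_RETRY_DELAY_SECONDS` and `preconnect_with_retry` are hypothetical names, the 0.05 value is taken from the snippet under review, and `preconnect` / `should_stop` are passed in as callables so the example runs on its own.

```python
import asyncio

# Hypothetical constant name; the value comes from the reviewed snippet.
PRECONNECT_RETRY_DELAY_SECONDS = 0.05


async def preconnect_with_retry(
    preconnect,
    should_stop,
    delay: float = PRECONNECT_RETRY_DELAY_SECONDS,
):
    """Call preconnect(); if it yields None, wait briefly and retry exactly once."""
    writer = await preconnect()
    if writer is None and not should_stop():
        await asyncio.sleep(delay)
        writer = await preconnect()
    return writer
```

The caller still has to handle a `None` result, since the second attempt can fail as well.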

try:
    await self.session.close()
except Exception:
    pass
Collaborator

Log the exception here as well.

Collaborator Author

Added logging in adf2494

connector = aiohttp.TCPConnector(verify_ssl=False)
timeout = aiohttp.ClientTimeout(total=30) # Reduced timeout for faster shutdown
self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)
# Always create a fresh session on start to avoid stale keep-alives across streams
Collaborator

Can you elaborate a bit more on this?

Collaborator Author

@eliteprox eliteprox Aug 18, 2025

This is related to a change that closes any open session when the publisher starts. It protects against memory issues if start() is called while the publisher is already running.

The previous start() would create a new session without checking for an existing one. It is also somewhat relevant to the discussion above on timeouts and keep-alive messages.
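
To make the explanation concrete, here is a minimal sketch of a `start()` that closes any existing session before creating a fresh one. It is an assumption-laden illustration, not the PR's code: `FakeSession` stands in for an HTTP client session, and `SessionOwner` and `session_factory` are hypothetical names.

```python
import asyncio
import logging

logger = logging.getLogger("session_demo")  # hypothetical logger name


class FakeSession:
    """Stand-in for an HTTP client session with an async close()."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class SessionOwner:
    """Hypothetical publisher-like object that owns one session at a time."""

    def __init__(self, session_factory=FakeSession) -> None:
        self._session_factory = session_factory
        self.session = None

    async def start(self) -> None:
        # Close a leftover session first so a repeated start() cannot leak it.
        if self.session is not None:
            try:
                await self.session.close()
            except Exception:
                logger.exception("Error closing previous session")
            finally:
                self.session = None
        self.session = self._session_factory()
```

Calling `start()` twice therefore leaves exactly one open session, and the first one is closed rather than orphaned.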

Example:
set_state(PipelineState.READY)
"""
if state == PipelineState.WARMING_PIPELINE:
Collaborator

I think this could be refactored for maintainability and readability. Consider creating a mapping of state transitions and a helper that refers to the map. Sending you a DM.

Collaborator Author

Consolidated state transition logic and removed unused code in f3f7459.
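
One possible reading of the "mapping of state transitions" suggestion is sketched below. It is not the refactor from f3f7459. Only `READY` and `WARMING_PIPELINE` appear in the reviewed snippet; the `IDLE` and `ERROR` members, the transition table itself, and the `StateTracker` class are hypothetical and exist only to show the shape of a table-driven `set_state`.

```python
from enum import Enum, auto


class PipelineState(Enum):
    IDLE = auto()              # hypothetical member
    WARMING_PIPELINE = auto()  # named in the reviewed snippet
    READY = auto()             # named in the reviewed snippet
    ERROR = auto()             # hypothetical member


# Hypothetical table: current state -> set of states it may move to.
ALLOWED_TRANSITIONS = {
    PipelineState.IDLE: {PipelineState.WARMING_PIPELINE, PipelineState.ERROR},
    PipelineState.WARMING_PIPELINE: {PipelineState.READY, PipelineState.ERROR},
    PipelineState.READY: {PipelineState.WARMING_PIPELINE, PipelineState.ERROR},
    PipelineState.ERROR: {PipelineState.IDLE},
}


class StateTracker:
    """Hypothetical holder that validates transitions against the table."""

    def __init__(self) -> None:
        self.state = PipelineState.IDLE

    def set_state(self, target: PipelineState) -> None:
        # A single lookup replaces a chain of if/elif branches per state.
        if target not in ALLOWED_TRANSITIONS[self.state]:
            raise ValueError(
                f"Illegal transition: {self.state.name} -> {target.name}"
            )
        self.state = target
```

Adding a state then means adding one enum member and one table row, rather than another branch in `set_state`.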

@eliteprox eliteprox requested a review from pschroedl August 18, 2025 22:16
@eliteprox eliteprox force-pushed the feat/add-server-config branch from 1191a82 to f1b72a0 Compare August 19, 2025 01:57