Streaming support #365 (base: main)
Changes from all commits
```diff
@@ -16,6 +16,7 @@
 from UnleashClient.api import register_client
 from UnleashClient.constants import (
     APPLICATION_HEADERS,
     DISABLED_VARIATION,
     ETAG,
     METRIC_LAST_SENT_TIME,
```
```diff
@@ -35,6 +36,7 @@
     aggregate_and_send_metrics,
     fetch_and_load_features,
 )
+from UnleashClient.streaming import StreamingConnector, StreamingEventProcessor

 from .cache import BaseCache, FileCache
 from .utils import LOGGER, InstanceAllowType, InstanceCounter
```
```diff
@@ -111,7 +113,7 @@ class UnleashClient:
     :param event_callback: Function to call if impression events are enabled. WARNING: Depending on your event library, this may have performance implications!
     """

-    def __init__(
+    def __init__(  # noqa: PLR0915
         self,
         url: str,
         app_name: str,
```
```diff
@@ -136,6 +138,8 @@ def __init__(
         scheduler_executor: Optional[str] = None,
```

> **Reviewer:** In Ruby we managed to reuse `fetcher_scheduled_executor`, which runs either the polling toggle fetcher or streaming. Check the usage in https://github.com/Unleash/unleash-ruby-sdk/blob/main/lib/unleash/client.rb#L14 for details. @sighphyre started a great discussion about it here: https://github.com/Unleash/unleash-ruby-sdk/pull/248/files#r2262867667. Since Python is similar to Ruby, it should be doable here too. In Java it was too difficult. In Node I will try to migrate towards this approach in a subsequent PR.
>
> **Reviewer:** Agreed! This SDK is a bit more complex than Ruby, but the abstractions are more or less sane, so I don't think this should be massively challenging.

```diff
         multiple_instance_mode: InstanceAllowType = InstanceAllowType.WARN,
         event_callback: Optional[Callable[[BaseEvent], None]] = None,
+        experimental_mode: Optional[dict] = None,
```

> **Reviewer:** It's really unclear to me how to use this. I think it wants a proper type on it so that end users can leverage their type checker.
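One way to address the typing concern is a `TypedDict` with a `Literal` field. This is only a sketch of what a typed `experimental_mode` could look like; the names `ExperimentalMode` and `resolve_mode` are hypothetical, not part of the PR.

```python
from typing import Literal, Optional, TypedDict


class ExperimentalMode(TypedDict, total=False):
    """Hypothetical shape for the experimental_mode option."""

    type: Literal["polling", "streaming"]


def resolve_mode(experimental_mode: Optional[ExperimentalMode]) -> str:
    # Default to polling when the option is absent or has no "type" key.
    if not experimental_mode:
        return "polling"
    return experimental_mode.get("type", "polling")
```

With this in place, `experimental_mode={"type": "streamin"}` would be flagged by a type checker instead of silently falling through to polling at runtime.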
```diff
+        sse_client_factory: Optional[Callable] = None,
     ) -> None:
         custom_headers = custom_headers or {}
         custom_options = custom_options or {}
```
```diff
@@ -169,6 +173,9 @@ def __init__(
         self.unleash_verbose_log_level = verbose_log_level
         self.unleash_event_callback = event_callback
         self._ready_callback = build_ready_callback(event_callback)
+        self.experimental_mode = experimental_mode
```

> **Reviewer:** I don't like the fact that we have an explosion of fields that work for either polling or streaming. In clean OO design we'd have a swappable mechanism for one fetching strategy at a time. I know in Java and .NET we also mix, but I plan to fix this in Node. Maybe worth investigating this option in Python?
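The swappable mechanism the reviewer describes could be expressed with a `typing.Protocol` so the client holds exactly one strategy. This is a toy sketch of the idea, not code from the PR; `FetchStrategy`, `PollingStrategy`, and `run_with` are invented names.

```python
from typing import Protocol


class FetchStrategy(Protocol):
    """A single swappable upstream connection strategy (polling or streaming)."""

    def start(self) -> None: ...
    def stop(self) -> None: ...


class PollingStrategy:
    """Toy polling strategy; a real one would schedule fetch_and_load_features."""

    def __init__(self) -> None:
        self.running = False

    def start(self) -> None:
        self.running = True

    def stop(self) -> None:
        self.running = False


def run_with(strategy: FetchStrategy) -> None:
    # The client would hold exactly one strategy and delegate lifecycle calls,
    # instead of keeping separate polling and streaming fields side by side.
    strategy.start()
```

A `StreamingStrategy` wrapping the connector would satisfy the same protocol, so `__init__` would store one `FetchStrategy` instead of both `self.fl_job` and `self._streaming_connector`.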
```diff
+        self._streaming_connector: Optional[StreamingConnector] = None
+        self._sse_client_factory = sse_client_factory

         self._do_instance_check(multiple_instance_mode)
```
```diff
@@ -267,8 +274,10 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
         try:
             base_headers = {
                 **self.unleash_custom_headers,
                 **APPLICATION_HEADERS,
                 "unleash-connection-id": self.connection_id,
                 "unleash-appname": self.unleash_app_name,
                 "unleash-instanceid": self.unleash_instance_id,
                 "unleash-sdk": f"{SDK_NAME}:{SDK_VERSION}",
             }
```

> **Reviewer:** Why these changes here? What does this fix? What test breaks without it, and what test ensures that it now works correctly?
```diff
@@ -277,7 +286,6 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
             "unleash-interval": self.unleash_metrics_interval_str_millis,
         }

         # Setup
         metrics_args = {
             "url": self.unleash_url,
             "app_name": self.unleash_app_name,
```
```diff
@@ -289,6 +297,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
             "engine": self.engine,
         }

+        base_job_args = {
+            "cache": self.cache,
+            "engine": self.engine,
+            "ready_callback": self._ready_callback,
+        }

         # Register app
         if not self.unleash_disable_registration:
             register_client(
```
```diff
@@ -303,47 +317,68 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
                 self.unleash_request_timeout,
             )

-        if fetch_toggles:
-            fetch_headers = {
-                **base_headers,
-                "unleash-interval": self.unleash_refresh_interval_str_millis,
-            }
-
-            job_args = {
-                "url": self.unleash_url,
-                "app_name": self.unleash_app_name,
-                "instance_id": self.unleash_instance_id,
-                "headers": fetch_headers,
-                "custom_options": self.unleash_custom_options,
-                "cache": self.cache,
-                "engine": self.engine,
-                "request_timeout": self.unleash_request_timeout,
-                "request_retries": self.unleash_request_retries,
-                "project": self.unleash_project_name,
-                "event_callback": self.unleash_event_callback,
-                "ready_callback": self._ready_callback,
-            }
-            job_func: Callable = fetch_and_load_features
-        else:
-            job_args = {
-                "cache": self.cache,
-                "engine": self.engine,
-                "ready_callback": self._ready_callback,
-            }
-            job_func = load_features
-
-        job_func(**job_args)  # type: ignore
-        # Start periodic jobs
-        self.unleash_scheduler.start()
-        self.fl_job = self.unleash_scheduler.add_job(
-            job_func,
-            trigger=IntervalTrigger(
-                seconds=int(self.unleash_refresh_interval),
-                jitter=self.unleash_refresh_jitter,
-            ),
-            executor=self.unleash_executor_name,
-            kwargs=job_args,
-        )
+        # Decide upstream connection mode
+        mode = (
+            (self.experimental_mode or {}).get("type")
+            if self.experimental_mode
+            else None
+        )
```

> **Reviewer:** It's not a major issue, but I think this belongs somewhere else. It would be cool to see a function that returns an enum of POLLING | OFFLINE | STREAMING, or something like that.
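The enum-returning function the reviewer suggests could look something like the sketch below. The names `ConnectionMode` and `decide_mode` are hypothetical; the logic mirrors the branching in this diff (streaming when `experimental_mode` asks for it, otherwise polling or offline depending on `fetch_toggles`).

```python
from enum import Enum
from typing import Optional


class ConnectionMode(Enum):
    POLLING = "polling"
    STREAMING = "streaming"
    OFFLINE = "offline"


def decide_mode(experimental_mode: Optional[dict], fetch_toggles: bool) -> ConnectionMode:
    # Streaming is opt-in via experimental_mode; otherwise fall back to the
    # existing fetch_toggles switch between polling and offline.
    if (experimental_mode or {}).get("type") == "streaming":
        return ConnectionMode.STREAMING
    return ConnectionMode.POLLING if fetch_toggles else ConnectionMode.OFFLINE
```

`initialize_client` could then dispatch on the enum instead of comparing raw strings, which also gives type checkers an exhaustiveness hook.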
```diff
+        if mode != "streaming":
+            if fetch_toggles:
+                # MODE: polling
+                job_args = {
+                    **base_job_args,
+                    "url": self.unleash_url,
+                    "app_name": self.unleash_app_name,
+                    "instance_id": self.unleash_instance_id,
+                    "headers": {
+                        **base_headers,
+                        "unleash-interval": self.unleash_refresh_interval_str_millis,
+                    },
+                    "custom_options": self.unleash_custom_options,
+                    "request_timeout": self.unleash_request_timeout,
+                    "request_retries": self.unleash_request_retries,
+                    "event_callback": self.unleash_event_callback,
+                    "project": self.unleash_project_name,
+                }
+                job_func: Callable
+                job_func = fetch_and_load_features
```

> **Reviewer (on lines +334 to +351):** My goal here was to avoid a larger refactor. I'd like to extract 'mode' (fetching strategy / connector) into a separate layer, but in this SDK fetching runs on the same scheduler as metrics. Decoupling this should be done in another PR, in my opinion.
>
> **Reviewer:** I agree it belongs in a different PR, but I think that PR should come before this one. This function is now 150 lines; in the Python world that's a monster of a function. I think we're starting to see that show itself in the form of comments like "MODE: polling" which are trying to add order to the chaos here. IMO this is now too complex and needs to be split up. I want to say the Poller vs Streamer abstraction is probably what we want here.

```diff
+            else:
+                # MODE: offline
+                job_args = base_job_args
+                job_func = load_features
```

> **Reviewer (on lines +352 to +356):** The Python SDK sets up a periodic update on bootstrapped toggles. I don't think this is consistent across SDKs.
>
> **Reviewer:** Yep, it's a Python-specific thing. It works around the incompatibility between the threading model used in this SDK and server tech like gunicorn, which uses a forked process model.

```diff
+            job_func(**job_args)  # type: ignore
+            self.fl_job = self.unleash_scheduler.add_job(
+                job_func,
+                trigger=IntervalTrigger(
+                    seconds=int(self.unleash_refresh_interval),
+                    jitter=self.unleash_refresh_jitter,
+                ),
+                executor=self.unleash_executor_name,
+                kwargs=job_args,
+            )
+        else:
+            # MODE: streaming
+            processor = StreamingEventProcessor(self.engine)
+            self._streaming_connector = StreamingConnector(
+                url=self.unleash_url,
+                headers=base_headers,
+                request_timeout=self.unleash_request_timeout,
+                event_processor=processor,
+                on_ready=self._ready_callback,
+                sse_client_factory=self._sse_client_factory,
+                custom_options=self.unleash_custom_options,
+            )
+            self._streaming_connector.start()

         if not self.unleash_disable_metrics:
             self.metric_job = self.unleash_scheduler.add_job(
                 aggregate_and_send_metrics,
```
```diff
@@ -396,7 +431,11 @@ def destroy(self) -> None:

         You shouldn't need this too much!
         """
-        self.fl_job.remove()
+        try:
+            if self.fl_job:
+                self.fl_job.remove()
+        except Exception:  # best-effort
+            pass
         if self.metric_job:
             self.metric_job.remove()
```

> **Reviewer:** Why the new try/except? It seems to only be there for polling, and if `fl_job` was never added, streaming won't raise an exception here.
```diff
@@ -411,7 +450,11 @@ def destroy(self) -> None:
             request_timeout=self.unleash_request_timeout,
             engine=self.engine,
         )

+        if self._streaming_connector:
+            try:
+                self._streaming_connector.stop()
+            except Exception:
+                pass
         self.unleash_scheduler.shutdown()
         self.cache.destroy()
```
```diff
@@ -513,9 +556,9 @@ def get_variant(self, feature_name: str, context: Optional[dict] = None) -> dict
                     event_type=UnleashEventType.VARIANT,
                     event_id=uuid.uuid4(),
                     context=context,
-                    enabled=variant["enabled"],
+                    enabled=bool(variant["enabled"]),
                     feature_name=feature_name,
-                    variant=variant["name"],
+                    variant=str(variant["name"]),
                 )

                 self.unleash_event_callback(event)
```

> **Reviewer:** Why this change?
New file `@@ -0,0 +1,5 @@`:

```python
# ruff: noqa: F401
# Streaming package

from .connector import StreamingConnector
from .event_processor import StreamingEventProcessor
```
New file `@@ -0,0 +1,123 @@`:

```python
import threading
from typing import Callable, Optional

from ld_eventsource import SSEClient
from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy

from UnleashClient.constants import STREAMING_URL
from UnleashClient.utils import LOGGER

from .event_processor import StreamingEventProcessor
```
```python
class StreamingConnector:
    """
    Manages the SSE connection lifecycle with reconnect/backoff and delegates
    event handling to an injected StreamingEventProcessor.
    """

    def __init__(
        self,
        url: str,
        headers: dict,
        request_timeout: int,
        event_processor: StreamingEventProcessor,
        on_ready: Optional[Callable[[], None]] = None,
        sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None,
        backoff_initial: float = 2.0,
        backoff_max: float = 30.0,
        backoff_multiplier: float = 2.0,
        backoff_jitter: Optional[float] = 0.5,
        custom_options: Optional[dict] = None,
    ) -> None:
        self._base_url = url.rstrip("/") + STREAMING_URL
        self._headers = {**headers, "Accept": "text/event-stream"}
        self._timeout = request_timeout
        self._on_ready = on_ready
        self._sse_factory = sse_client_factory
```

> **Reviewer:** Creating a testability/swapping option is only valuable if you are going to use it. From my review it looks like we exposed this option (increased complexity) but never exercised it. I think in the production implementation of connector.py we only need to wrap the LD client. More detail on why too much dependency inversion can be harmful: https://dannorth.net/blog/cupid-the-back-story/#dependency-inversion-principle
>
> **Reviewer:** +1 for this. Too much injection is how you end up with a need for IoC containers, shudder. No one wants that.
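For context on the reviewer's point: the only way `sse_client_factory` earns its keep is if tests actually inject a double through it. A minimal sketch of such a double, exposing only the two members this connector touches (`.events` and `.close()`); `FakeSSEClient` and `fake_sse_factory` are invented test names, not part of the PR.

```python
from types import SimpleNamespace


class FakeSSEClient:
    """Test double exposing only what the connector uses: .events and .close()."""

    def __init__(self, events):
        self.events = events  # any iterable of event-like objects
        self.closed = False

    def close(self):
        self.closed = True


def fake_sse_factory(url, headers, timeout):
    # Matches the (url, headers, timeout) signature the connector calls the
    # factory with, and yields a single hydration event.
    connected = SimpleNamespace(event="unleash-connected", data="{}")
    return FakeSSEClient([connected])
```

A test would pass `sse_client_factory=fake_sse_factory` to `StreamingConnector` and assert the processor saw the event; if no test does that, dropping the parameter is the simpler design.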
```python
        self._backoff_initial = backoff_initial
        self._backoff_max = backoff_max
        self._backoff_multiplier = backoff_multiplier
        self._backoff_jitter = backoff_jitter
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None
```

> **Reviewer:** I don't understand the need for a thread to be passed here. The code checks that it's both present and alive, or else it creates its own. I don't see a way of passing a running thread to this that's helpful.

```python
        self._lock = threading.Lock()
        self._processor = event_processor
        base_options = custom_options or {}
        if self._timeout is not None and "timeout" not in base_options:
            base_options = {"timeout": self._timeout, **base_options}
        self._custom_options = base_options
```
```python
    def start(self):
        if self._thread and self._thread.is_alive():
            return
        self._stop.clear()
        self._thread = threading.Thread(
            target=self._run, name="UnleashStreaming", daemon=True
        )
        self._thread.start()

    def stop(self):
        self._stop.set()
        if self._thread:
            self._thread.join(timeout=5)

    def _run(self):  # noqa: PLR0912
```

> **Reviewer:** A few thoughts on this function.
""" | ||
Main streaming loop. Creates SSEClient once and lets it handle retries internally. | ||
Only recreates client if there's a catastrophic failure. | ||
""" | ||
client: Optional[SSEClient] = None | ||
|
||
try: | ||
LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url) | ||
|
||
if self._sse_factory: | ||
client = self._sse_factory(self._base_url, self._headers, self._timeout) | ||
else: | ||
connect_strategy = ConnectStrategy.http( | ||
self._base_url, | ||
headers=self._headers, | ||
urllib3_request_options=self._custom_options, | ||
) | ||
|
||
retry_strategy = RetryDelayStrategy.default( | ||
max_delay=self._backoff_max, | ||
backoff_multiplier=self._backoff_multiplier, | ||
jitter_multiplier=self._backoff_jitter, | ||
) | ||
|
||
client = SSEClient( | ||
connect=connect_strategy, | ||
initial_retry_delay=self._backoff_initial, | ||
retry_delay_strategy=retry_strategy, | ||
retry_delay_reset_threshold=60.0, | ||
error_strategy=ErrorStrategy.always_continue(), | ||
logger=LOGGER, | ||
) | ||
|
||
for event in client.events: | ||
if self._stop.is_set(): | ||
break | ||
if not event.event: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels like it's sharing a responsibility with the event_processor. Small detail but an event should be handled in one place imo. If that's ignoring it or handling it, it's all part of the same logic. I also think this is a pretty big problem in the library if we're getting events with no event property. Glancing at the source for the lib that's never intended to happen so if we want to be defensive about it, we should probably log it |
||
continue | ||
|
||
self._processor.process(event) | ||
if event.event == "unleash-connected" and self._processor.hydrated: | ||
if self._on_ready: | ||
try: | ||
self._on_ready() | ||
except Exception as cb_exc: # noqa: BLE001 | ||
LOGGER.debug("Ready callback error: %s", cb_exc) | ||
|
||
LOGGER.debug("SSE stream ended") | ||
|
||
except Exception as exc: # noqa: BLE001 | ||
LOGGER.warning("Streaming connection failed: %s", exc) | ||
|
||
finally: | ||
try: | ||
if client is not None: | ||
client.close() | ||
except Exception: # noqa: BLE001 | ||
pass |
> **Reviewer:** Why was it added?
>
> **Reviewer:** It's a "too many statements" linting exception.
>
> **Reviewer:** 26 is... well... a LOT of parameters. I'm open to being told this isn't the right time, but gosh, this feels like we should fix it at this point.
>
> **Reviewer:** As the person most responsible for this, I would tend to agree, but any change would be (kind of by definition) backwards incompatible. I would be on board with updating that, but I think it would be a major version bump. (Unless you want a v2 client object... but that way lies maintenance headaches.)
>
> If I did this nowadays (with the benefit of about 10 years more experience), I would probably split between required and pseudo-required arguments (url and headers respectively) and put some of the less important/optional configuration (jitter) in an options dataclass or similar.
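The options-dataclass split described above could be sketched as follows. This is illustrative only: `UnleashClientOptions` and `build_client_kwargs` are hypothetical names, and the fields shown are a small subset of the real constructor's parameters.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class UnleashClientOptions:
    """Hypothetical bundle of the less-essential constructor arguments."""

    refresh_interval: int = 15
    refresh_jitter: Optional[int] = None
    metrics_interval: int = 60
    disable_metrics: bool = False
    custom_headers: dict = field(default_factory=dict)


def build_client_kwargs(
    url: str, app_name: str, options: Optional[UnleashClientOptions] = None
) -> dict:
    # Required arguments stay explicit; everything else collapses into one object,
    # shrinking a 26-parameter signature to three.
    opts = options or UnleashClientOptions()
    return {"url": url, "app_name": app_name, **vars(opts)}
```

Because the dataclass carries defaults, callers who never touched the optional parameters would be unaffected; only keyword-argument callers of the long form would need migrating, which is why the reviewer frames it as a major version bump.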