diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 221d24d1..80573c47 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -16,6 +16,7 @@ from UnleashClient.api import register_client from UnleashClient.constants import ( + APPLICATION_HEADERS, DISABLED_VARIATION, ETAG, METRIC_LAST_SENT_TIME, @@ -35,6 +36,7 @@ aggregate_and_send_metrics, fetch_and_load_features, ) +from UnleashClient.streaming import StreamingConnector, StreamingEventProcessor from .cache import BaseCache, FileCache from .utils import LOGGER, InstanceAllowType, InstanceCounter @@ -111,7 +113,7 @@ class UnleashClient: :param event_callback: Function to call if impression events are enabled. WARNING: Depending on your event library, this may have performance implications! """ - def __init__( + def __init__( # noqa: PLR0915 self, url: str, app_name: str, @@ -136,6 +138,8 @@ def __init__( scheduler_executor: Optional[str] = None, multiple_instance_mode: InstanceAllowType = InstanceAllowType.WARN, event_callback: Optional[Callable[[BaseEvent], None]] = None, + experimental_mode: Optional[dict] = None, + sse_client_factory: Optional[Callable] = None, ) -> None: custom_headers = custom_headers or {} custom_options = custom_options or {} @@ -169,6 +173,9 @@ def __init__( self.unleash_verbose_log_level = verbose_log_level self.unleash_event_callback = event_callback self._ready_callback = build_ready_callback(event_callback) + self.experimental_mode = experimental_mode + self._streaming_connector: Optional[StreamingConnector] = None + self._sse_client_factory = sse_client_factory self._do_instance_check(multiple_instance_mode) @@ -267,8 +274,10 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: try: base_headers = { **self.unleash_custom_headers, + **APPLICATION_HEADERS, "unleash-connection-id": self.connection_id, "unleash-appname": self.unleash_app_name, + "unleash-instanceid": self.unleash_instance_id, "unleash-sdk": f"{SDK_NAME}:{SDK_VERSION}", } @@ -277,7 +286,6 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "unleash-interval": self.unleash_metrics_interval_str_millis, } - # Setup metrics_args = { "url": self.unleash_url, "app_name": self.unleash_app_name, @@ -289,6 +297,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "engine": self.engine, } + base_job_args = { + "cache": self.cache, + "engine": self.engine, + "ready_callback": self._ready_callback, + } + # Register app if not self.unleash_disable_registration: register_client( @@ -303,47 +317,68 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: self.unleash_request_timeout, ) - if fetch_toggles: - fetch_headers = { - **base_headers, - "unleash-interval": self.unleash_refresh_interval_str_millis, - } - - job_args = { - "url": self.unleash_url, - "app_name": self.unleash_app_name, - "instance_id": self.unleash_instance_id, - "headers": fetch_headers, - "custom_options": self.unleash_custom_options, - "cache": self.cache, - "engine": self.engine, - "request_timeout": self.unleash_request_timeout, - "request_retries": self.unleash_request_retries, - "project": self.unleash_project_name, - "event_callback": self.unleash_event_callback, - "ready_callback": self._ready_callback, - } - job_func: Callable = fetch_and_load_features - else: - job_args = { - "cache": self.cache, - "engine": self.engine, - "ready_callback": self._ready_callback, - } - job_func = load_features - - job_func(**job_args) # type: ignore # Start periodic jobs self.unleash_scheduler.start() 
- self.fl_job = self.unleash_scheduler.add_job( - job_func, - trigger=IntervalTrigger( - seconds=int(self.unleash_refresh_interval), - jitter=self.unleash_refresh_jitter, - ), - executor=self.unleash_executor_name, - kwargs=job_args, + + # Decide upstream connection mode + mode = ( + (self.experimental_mode or {}).get("type") + if self.experimental_mode + else None ) + + if mode != "streaming": + if fetch_toggles: + # MODE: polling + + job_args = { + **base_job_args, + "url": self.unleash_url, + "app_name": self.unleash_app_name, + "instance_id": self.unleash_instance_id, + "headers": { + **base_headers, + "unleash-interval": self.unleash_refresh_interval_str_millis, + }, + "custom_options": self.unleash_custom_options, + "request_timeout": self.unleash_request_timeout, + "request_retries": self.unleash_request_retries, + "event_callback": self.unleash_event_callback, + "project": self.unleash_project_name, + } + + job_func: Callable + job_func = fetch_and_load_features + else: + # MODE: offline + + job_args = base_job_args + job_func = load_features + + job_func(**job_args) # type: ignore + self.fl_job = self.unleash_scheduler.add_job( + job_func, + trigger=IntervalTrigger( + seconds=int(self.unleash_refresh_interval), + jitter=self.unleash_refresh_jitter, + ), + executor=self.unleash_executor_name, + kwargs=job_args, + ) + else: + # MODE: streaming + processor = StreamingEventProcessor(self.engine) + self._streaming_connector = StreamingConnector( + url=self.unleash_url, + headers=base_headers, + request_timeout=self.unleash_request_timeout, + event_processor=processor, + on_ready=self._ready_callback, + sse_client_factory=self._sse_client_factory, + custom_options=self.unleash_custom_options, + ) + self._streaming_connector.start() + if not self.unleash_disable_metrics: self.metric_job = self.unleash_scheduler.add_job( aggregate_and_send_metrics, @@ -396,7 +431,11 @@ def destroy(self) -> None: You shouldn't need this too much! 
""" - self.fl_job.remove() + try: + if self.fl_job: + self.fl_job.remove() + except Exception: # best-effort + pass if self.metric_job: self.metric_job.remove() @@ -411,7 +450,11 @@ def destroy(self) -> None: request_timeout=self.unleash_request_timeout, engine=self.engine, ) - + if self._streaming_connector: + try: + self._streaming_connector.stop() + except Exception: + pass self.unleash_scheduler.shutdown() self.cache.destroy() @@ -513,9 +556,9 @@ def get_variant(self, feature_name: str, context: Optional[dict] = None) -> dict event_type=UnleashEventType.VARIANT, event_id=uuid.uuid4(), context=context, - enabled=variant["enabled"], + enabled=bool(variant["enabled"]), feature_name=feature_name, - variant=variant["name"], + variant=str(variant["name"]), ) self.unleash_event_callback(event) diff --git a/UnleashClient/constants.py b/UnleashClient/constants.py index f6ba034e..c9c06e6d 100644 --- a/UnleashClient/constants.py +++ b/UnleashClient/constants.py @@ -6,7 +6,7 @@ REQUEST_TIMEOUT = 30 REQUEST_RETRIES = 3 METRIC_LAST_SENT_TIME = "mlst" -CLIENT_SPEC_VERSION = "5.1.9" +CLIENT_SPEC_VERSION = "5.2.2" # =Unleash= APPLICATION_HEADERS = { @@ -19,6 +19,7 @@ REGISTER_URL = "/client/register" FEATURES_URL = "/client/features" METRICS_URL = "/client/metrics" +STREAMING_URL = "/client/streaming" # Cache keys FAILED_STRATEGIES = "failed_strategies" diff --git a/UnleashClient/streaming/__init__.py b/UnleashClient/streaming/__init__.py new file mode 100644 index 00000000..ef464852 --- /dev/null +++ b/UnleashClient/streaming/__init__.py @@ -0,0 +1,5 @@ +# ruff: noqa: F401 +# Streaming package + +from .connector import StreamingConnector +from .event_processor import StreamingEventProcessor diff --git a/UnleashClient/streaming/connector.py b/UnleashClient/streaming/connector.py new file mode 100644 index 00000000..eb200de9 --- /dev/null +++ b/UnleashClient/streaming/connector.py @@ -0,0 +1,123 @@ +import threading +from typing import Callable, Optional + +from ld_eventsource import SSEClient +from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy + +from UnleashClient.constants import STREAMING_URL +from UnleashClient.utils import LOGGER + +from .event_processor import StreamingEventProcessor + + +class StreamingConnector: + """ + Manages the SSE connection lifecycle with reconnect/backoff and delegates + event handling to an injected StreamingEventProcessor. 
+    """
+
+    def __init__(
+        self,
+        url: str,
+        headers: dict,
+        request_timeout: int,
+        event_processor: StreamingEventProcessor,
+        on_ready: Optional[Callable[[], None]] = None,
+        sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None,
+        backoff_initial: float = 2.0,
+        backoff_max: float = 30.0,
+        backoff_multiplier: float = 2.0,
+        backoff_jitter: Optional[float] = 0.5,
+        custom_options: Optional[dict] = None,
+    ) -> None:
+        self._base_url = url.rstrip("/") + STREAMING_URL
+        self._headers = {**headers, "Accept": "text/event-stream"}
+        self._timeout = request_timeout
+        self._on_ready = on_ready
+        self._sse_factory = sse_client_factory
+        self._backoff_initial = backoff_initial
+        self._backoff_max = backoff_max
+        self._backoff_multiplier = backoff_multiplier
+        self._backoff_jitter = backoff_jitter
+        self._stop = threading.Event()
+        self._thread: Optional[threading.Thread] = None
+        self._lock = threading.Lock()
+        self._processor = event_processor
+        base_options = custom_options or {}
+        if self._timeout is not None and "timeout" not in base_options:
+            base_options = {"timeout": self._timeout, **base_options}
+        self._custom_options = base_options
+
+    def start(self):
+        if self._thread and self._thread.is_alive():
+            return
+        self._stop.clear()
+        self._thread = threading.Thread(
+            target=self._run, name="UnleashStreaming", daemon=True
+        )
+        self._thread.start()
+
+    def stop(self):
+        self._stop.set()
+        if self._thread:
+            self._thread.join(timeout=5)
+
+    def _run(self):  # noqa: PLR0912
+        """
+        Main streaming loop. Creates the SSEClient once and relies on it to
+        retry internally; a catastrophic failure ends the loop.
+        """
+        client: Optional[SSEClient] = None
+
+        try:
+            LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url)
+
+            if self._sse_factory:
+                client = self._sse_factory(self._base_url, self._headers, self._timeout)
+            else:
+                connect_strategy = ConnectStrategy.http(
+                    self._base_url,
+                    headers=self._headers,
+                    urllib3_request_options=self._custom_options,
+                )
+
+                retry_strategy = RetryDelayStrategy.default(
+                    max_delay=self._backoff_max,
+                    backoff_multiplier=self._backoff_multiplier,
+                    jitter_multiplier=self._backoff_jitter,
+                )
+
+                client = SSEClient(
+                    connect=connect_strategy,
+                    initial_retry_delay=self._backoff_initial,
+                    retry_delay_strategy=retry_strategy,
+                    retry_delay_reset_threshold=60.0,
+                    error_strategy=ErrorStrategy.always_continue(),
+                    logger=LOGGER,
+                )
+
+            for event in client.events:
+                if self._stop.is_set():
+                    break
+                if not event.event:
+                    continue
+
+                self._processor.process(event)
+                if event.event == "unleash-connected" and self._processor.hydrated:
+                    if self._on_ready:
+                        try:
+                            self._on_ready()
+                        except Exception as cb_exc:  # noqa: BLE001
+                            LOGGER.debug("Ready callback error: %s", cb_exc)
+
+            LOGGER.debug("SSE stream ended")
+
+        except Exception as exc:  # noqa: BLE001
+            LOGGER.warning("Streaming connection failed: %s", exc)
+
+        finally:
+            try:
+                if client is not None:
+                    client.close()
+            except Exception:  # noqa: BLE001
+                pass
diff --git a/UnleashClient/streaming/event_processor.py b/UnleashClient/streaming/event_processor.py
new file mode 100644
index 00000000..197d1435
--- /dev/null
+++ b/UnleashClient/streaming/event_processor.py
@@ -0,0 +1,62 @@
+from __future__ import annotations
+
+from threading import Lock
+from typing import Any
+
+from yggdrasil_engine.engine import UnleashEngine
+
+from UnleashClient.utils import LOGGER
+
+
+class StreamingEventProcessor:
+    """
+    Processes SSE events from the Unleash
streaming endpoint and applies + resulting deltas/state to the provided engine in a thread-safe manner. + + This class is deliberately unaware of connection/reconnect concerns; it + only deals with event semantics. + """ + + def __init__(self, engine: UnleashEngine) -> None: + self._engine = engine + self._lock = Lock() + self._hydrated = False + + @property + def hydrated(self) -> bool: + return self._hydrated + + def process(self, event: Any) -> None: + """ + Handle a single SSE event object. The object is expected to have + attributes `event` (type) and `data` (payload string or dict). + """ + try: + etype = getattr(event, "event", None) + if not etype: + return + + if etype == "unleash-connected": + self._handle_connected(event) + elif etype == "unleash-updated": + self._handle_updated(event) + else: + LOGGER.debug("Ignoring SSE event type: %s", etype) + except Exception as exc: # noqa: BLE001 + LOGGER.warning("Error processing SSE event: %s", exc) + + def _apply_delta(self, event_data: Any) -> None: + if not event_data: + return + with self._lock: + self._engine.take_state(event_data) + # TODO: backup file + + def _handle_connected(self, event: Any) -> None: + LOGGER.debug("Processing initial hydration data") + self._apply_delta(getattr(event, "data", None)) + if not self._hydrated: + self._hydrated = True + + def _handle_updated(self, event: Any) -> None: + self._apply_delta(getattr(event, "data", None)) diff --git a/pyproject.toml b/pyproject.toml index d09f1194..bfee8565 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,8 @@ dependencies=[ "importlib_metadata", "python-dateutil", "semver < 4.0.0", - "yggdrasil-engine", + "yggdrasil-engine >= 1.0.0", + "launchdarkly-eventsource", ] [project.urls] diff --git a/requirements.txt b/requirements.txt index b2bd7c15..75ec0b96 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ mmhash3 python-dateutil requests semver -yggdrasil-engine +yggdrasil-engine>=1.0.0 # Development packages # - Testing diff --git a/tests/integration_tests/integration_unleash.py b/tests/integration_tests/integration_unleash.py new file mode 100644 index 00000000..48d5da36 --- /dev/null +++ b/tests/integration_tests/integration_unleash.py @@ -0,0 +1,39 @@ +# --- +import logging +import os +import sys +import time + +from UnleashClient import UnleashClient + +root = logging.getLogger() +root.setLevel(logging.DEBUG) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +root.addHandler(handler) +# --- + +api_url = os.getenv("UNLEASH_API_URL", "https://app.unleash-hosted.com/demo/api") +api_token = os.getenv( + "UNLEASH_API_TOKEN", + "demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5", +) +flag = "example-flag" +use_streaming = os.getenv("USE_STREAMING", "true").lower() == "true" + +client = UnleashClient( + url=api_url, + app_name="integration-python", + custom_headers={"Authorization": api_token}, + experimental_mode={"type": "streaming"} if use_streaming else None, + metrics_interval=1, +) + +client.initialize_client() + +while True: + print(f"'{flag}' is enabled: {client.is_enabled(flag)}") + time.sleep(3) diff --git a/tests/integration_tests/integration_unleashheroku.py b/tests/integration_tests/integration_unleashheroku.py deleted file mode 100644 index 8c68827b..00000000 --- a/tests/integration_tests/integration_unleashheroku.py +++ /dev/null @@ -1,29 +0,0 @@ -# --- 
-import logging -import sys -import time - -from UnleashClient import UnleashClient - -root = logging.getLogger() -root.setLevel(logging.DEBUG) - -handler = logging.StreamHandler(sys.stdout) -handler.setLevel(logging.DEBUG) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -root.addHandler(handler) -# --- - -my_client = UnleashClient( - url="https://unleash.herokuapp.com/api", - environment="staging", - app_name="pyIvan", -) - -my_client.initialize_client() - -while True: - time.sleep(10) - context = {"userId": "1", "sound": "woof"} - print(f"ivantest: {my_client.is_enabled('ivantest', context)}") diff --git a/tests/unit_tests/streaming/test_client_streaming.py b/tests/unit_tests/streaming/test_client_streaming.py new file mode 100644 index 00000000..7b718cdc --- /dev/null +++ b/tests/unit_tests/streaming/test_client_streaming.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import json +import time + +import pytest + +from UnleashClient import INSTANCES, UnleashClient + + +@pytest.fixture(autouse=True) +def reset_instances(): + INSTANCES._reset() + + +class FakeEvent: + def __init__(self, event: str, data): + self.event = event + self.data = data + + +class FiniteSSEClient: + """SSE client that yields given events then stops.""" + + def __init__(self, events): + self._events = list(events) + self.closed = False + + @property + def events(self): + for e in self._events: + if self.closed: + break + yield e + + def close(self): + self.closed = True + + def interrupt(self): + self.close() + + +def _state_with_feature(name: str) -> str: + payload = { + "version": 1, + "features": [ + {"name": name, "enabled": True, "strategies": [{"name": "default"}]} + ], + "segments": [], + } + return json.dumps(payload) + + +def test_streaming_processes_unleash_connected_event(): + captured = {} + + def factory(url, headers, timeout): + captured["url"] = url + captured["headers"] = headers + return FiniteSSEClient( + [FakeEvent("unleash-connected", _state_with_feature("test-feature"))] + ) + + client = UnleashClient( + url="http://unleash.example", + app_name="my-test-app", + instance_id="rspec/test", + disable_metrics=True, + disable_registration=True, + custom_headers={"X-API-KEY": "123"}, + experimental_mode={"type": "streaming"}, + sse_client_factory=factory, + ) + + try: + client.initialize_client() + time.sleep(0.05) + + assert captured["url"].endswith("/client/streaming") + assert captured["headers"]["X-API-KEY"] == "123" + + assert client.is_enabled("test-feature") is True + finally: + client.destroy() + + +def test_streaming_processes_unleash_updated_event(): + captured = {} + + def factory(url, headers, timeout): + captured["url"] = url + captured["headers"] = headers + empty_state = json.dumps({"version": 1, "features": [], "segments": []}) + update_state = _state_with_feature("test-feature") + return FiniteSSEClient( + [ + FakeEvent("unleash-connected", empty_state), + FakeEvent("unleash-updated", update_state), + ] + ) + + client = UnleashClient( + url="http://unleash.example", + app_name="my-test-app", + instance_id="rspec/test", + disable_metrics=True, + disable_registration=True, + custom_headers={"X-API-KEY": "123"}, + experimental_mode={"type": "streaming"}, + sse_client_factory=factory, + ) + + try: + client.initialize_client() + time.sleep(0.05) + + assert captured["url"].endswith("/client/streaming") + assert captured["headers"]["X-API-KEY"] == "123" + + assert client.is_enabled("test-feature") is True + 
finally: + client.destroy() diff --git a/tests/unit_tests/streaming/test_event_processor.py b/tests/unit_tests/streaming/test_event_processor.py new file mode 100644 index 00000000..f0eca6ba --- /dev/null +++ b/tests/unit_tests/streaming/test_event_processor.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from UnleashClient.streaming.event_processor import StreamingEventProcessor + + +class FakeEngine: + def __init__(self): + self.states = [] + + def take_state(self, state): + self.states.append(state) + + +class FakeEvent: + def __init__(self, event: str, data): + self.event = event + self.data = data + + +def test_processor_hydrates_on_connected(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + assert processor.hydrated is False + + payload = {"version": 1, "features": [], "segments": []} + processor.process(FakeEvent("unleash-connected", payload)) + + assert processor.hydrated is True + assert engine.states == [payload] + + +def test_processor_applies_updates(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + connected_payload = {"version": 1, "features": ["f1"], "segments": []} + update_payload = {"version": 2, "features": ["f1", "f2"], "segments": []} + + processor.process(FakeEvent("unleash-connected", connected_payload)) + processor.process(FakeEvent("unleash-updated", update_payload)) + + assert processor.hydrated is True + assert engine.states == [connected_payload, update_payload] + + +def test_processor_ignores_unknown_event_types(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + processor.process(FakeEvent("heartbeat", {})) + processor.process(FakeEvent("message", {})) + + # No states should be applied + assert engine.states == []
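---

Usage note: streaming is opt-in via the new `experimental_mode` parameter; when
it is unset, the client keeps the existing polling behavior. A minimal sketch,
mirroring tests/integration_tests/integration_unleash.py (the URL, token, and
flag name here are illustrative placeholders, not SDK defaults):

    from UnleashClient import UnleashClient

    client = UnleashClient(
        url="https://app.unleash-hosted.com/demo/api",
        app_name="streaming-demo",
        custom_headers={"Authorization": "<client-api-token>"},
        experimental_mode={"type": "streaming"},
    )
    client.initialize_client()

    print(client.is_enabled("example-flag"))
    client.destroy()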
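Wire format note: the connector issues a GET against <url>/client/streaming
with an `Accept: text/event-stream` header and handles two event types:
`unleash-connected` carries the initial hydration state (once processed it
marks the processor hydrated and fires the ready callback), while
`unleash-updated` carries subsequent state; both payloads are handed to
`engine.take_state` under a lock. The payload shape below mirrors the fixtures
in tests/unit_tests/streaming/test_client_streaming.py; a real server may send
richer feature definitions:

    event: unleash-connected
    data: {"version": 1, "features": [{"name": "example-flag", "enabled": true, "strategies": [{"name": "default"}]}], "segments": []}

    event: unleash-updated
    data: {"version": 2, "features": [], "segments": []}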
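Testing note: `sse_client_factory` lets tests substitute the real
ld_eventsource client. As exercised by the connector and
tests/unit_tests/streaming/test_client_streaming.py, the factory receives
(url, headers, request_timeout) and must return an object exposing an `events`
iterable and a `close()` method. A minimal fake (names are illustrative):

    class FakeSSEClient:
        def __init__(self, events):
            self.events = events  # iterable of objects with .event and .data

        def close(self):
            pass

    client = UnleashClient(
        url="http://unleash.example",
        app_name="test-app",
        disable_metrics=True,
        disable_registration=True,
        experimental_mode={"type": "streaming"},
        sse_client_factory=lambda url, headers, timeout: FakeSSEClient([]),
    )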