diff --git a/async_rithmic/plants/history.py b/async_rithmic/plants/history.py index 3a18a7e..49530ac 100644 --- a/async_rithmic/plants/history.py +++ b/async_rithmic/plants/history.py @@ -15,8 +15,15 @@ def __init__(self, *args, **kwargs): self.historical_tick_data = defaultdict(list) self.historical_time_bar_data = defaultdict(list) - self.historical_tick_event = asyncio.Event() - self.historical_time_bar_event = asyncio.Event() + # Per-request events keyed by f"{symbol}" (tick) or f"{symbol}_{type}" (time bar). + # Fixes two races in the prior single-shared-event design: + # 1) Empty response: is_last_bar marker sets the event before any data + # callbacks fire, so pop(key) raised KeyError (fixed here with per-key + # events + .pop default). + # 2) Concurrent requests: a second caller would overwrite the shared event, + # causing the first response to wake the second caller prematurely. + self.historical_tick_events: dict = {} + self.historical_time_bar_events: dict = {} self.client.on_historical_tick += self._on_historical_tick self.client.on_historical_time_bar += self._on_historical_time_bar @@ -56,9 +63,11 @@ async def get_historical_tick_data( :param start_time: (dt) start time as datetime in utc :param end_time: (dt) end time as datetime in utc """ + key = f"{symbol}" if wait: - self.historical_tick_event = asyncio.Event() + event = asyncio.Event() + self.historical_tick_events[key] = event await self._send_and_recv_immediate( template_id=206, @@ -75,19 +84,15 @@ async def get_historical_tick_data( # Wait until all the historical data has been fetched before returning it if wait: - key = f"{symbol}" - try: - await asyncio.wait_for(self.historical_tick_event.wait(), 5.0) + await asyncio.wait_for(event.wait(), 5.0) except asyncio.TimeoutError: - if len(self.historical_tick_data[key]) == 0: - # No data returned by Rithmic for the request - return [] - - await self.historical_tick_event.wait() + # No response within 5s — return whatever accumulated (may be empty) + pass + finally: + self.historical_tick_events.pop(key, None) - data = self.historical_tick_data.pop(key) - return data + return self.historical_tick_data.pop(key, []) async def get_historical_time_bars( self, @@ -99,8 +104,11 @@ async def get_historical_time_bars( bar_type_periods: int, wait: bool = True ): + key = f"{symbol}_{bar_type}" + if wait: - self.historical_time_bar_event = asyncio.Event() + event = asyncio.Event() + self.historical_time_bar_events[key] = event await self._send_and_recv_immediate( template_id=202, @@ -115,19 +123,15 @@ async def get_historical_time_bars( # Wait until all the historical data has been fetched before returning it if wait: - key = f"{symbol}_{bar_type}" - try: - await asyncio.wait_for(self.historical_time_bar_event.wait(), 5.0) + await asyncio.wait_for(event.wait(), 5.0) except asyncio.TimeoutError: - if len(self.historical_time_bar_data[key]) == 0: - # No data returned by Rithmic for the request - return [] - - await self.historical_time_bar_event.wait() + # No response within 5s — return whatever accumulated (may be empty) + pass + finally: + self.historical_time_bar_events.pop(key, None) - data = self.historical_time_bar_data.pop(key) - return data + return self.historical_time_bar_data.pop(key, []) async def subscribe_to_time_bar_data( self, @@ -179,7 +183,12 @@ async def _process_response(self, response): # Historical time bar is_last_bar = response.rp_code == ['0'] or response.rq_handler_rp_code == [] if is_last_bar: - self.historical_time_bar_event.set() + # Signal the specific per-request event (keyed by symbol+type). + # Falls back to no-op if no waiter registered (defensive). + key = f"{response.symbol}_{response.type}" + event = self.historical_time_bar_events.get(key) + if event is not None: + event.set() return data = self._response_to_dict(response) @@ -191,7 +200,10 @@ async def _process_response(self, response): # Historical tick bar is_last_bar = response.rp_code == ['0'] or response.rq_handler_rp_code == [] if is_last_bar: - self.historical_tick_event.set() + key = f"{response.symbol}" + event = self.historical_tick_events.get(key) + if event is not None: + event.set() return data = self._response_to_dict(response) diff --git a/tests/test_history_races.py b/tests/test_history_races.py new file mode 100644 index 0000000..9394f14 --- /dev/null +++ b/tests/test_history_races.py @@ -0,0 +1,116 @@ +""" +Regression tests for the historical data race conditions fixed via per-request events. + +Two scenarios that previously crashed: + A) Empty response (Rithmic returns only the is_last_bar marker → KeyError on pop). + B) Concurrent requests for different symbols (shared event got overwritten). +""" +import asyncio +from unittest.mock import MagicMock, AsyncMock + +import pytest + +from async_rithmic.plants import HistoryPlant +from async_rithmic.enums import TimeBarType + + +@pytest.fixture +def history_plant_mock(): + plant = HistoryPlant(MagicMock()) + plant.ws = AsyncMock() + plant.client = MagicMock() + plant.client.retry_settings = MagicMock(max_retries=1, timeout=3, jitter_range=None) + # Stub the send to avoid real network + plant._send_and_recv_immediate = AsyncMock(return_value=None) + return plant + + +async def test_empty_response_returns_empty_list(history_plant_mock): + """ + Bug A: When Rithmic returns only the is_last_bar marker (no data bars), + the code used to raise KeyError on .pop(key). Now it returns []. + """ + plant = history_plant_mock + key = f"MNQM6_{int(TimeBarType.MINUTE_BAR)}" + + # Simulate the is_last_bar marker arriving right after the request is sent + async def trigger_empty_response(): + await asyncio.sleep(0.01) + event = plant.historical_time_bar_events.get(key) + if event: + event.set() + + asyncio.create_task(trigger_empty_response()) + + result = await plant.get_historical_time_bars( + symbol="MNQM6", + exchange="CME", + start_time=__import__("datetime").datetime(2026, 4, 13, 0, 0), + end_time=__import__("datetime").datetime(2026, 4, 13, 0, 1), + bar_type=TimeBarType.MINUTE_BAR, + bar_type_periods=1, + ) + assert result == [] + # Events dict is cleaned up + assert key not in plant.historical_time_bar_events + + +async def test_concurrent_different_symbols(history_plant_mock): + """ + Bug B: Two concurrent requests used to share one event. The first response + would wake the second caller prematurely. Now each request has its own event. + """ + plant = history_plant_mock + import datetime as dt + + async def fire_response_for(symbol, bar_type, delay, data_rows): + await asyncio.sleep(delay) + key = f"{symbol}_{int(bar_type)}" + for row in data_rows: + plant.historical_time_bar_data[key].append(row) + event = plant.historical_time_bar_events.get(key) + if event: + event.set() + + # Caller A requests MNQM6, caller B requests MESM6 a moment later + # A's response arrives first with 2 bars; B's response arrives after with 1 bar + asyncio.create_task(fire_response_for("MNQM6", TimeBarType.MINUTE_BAR, 0.01, [{"x": 1}, {"x": 2}])) + asyncio.create_task(fire_response_for("MESM6", TimeBarType.MINUTE_BAR, 0.02, [{"y": 1}])) + + result_a, result_b = await asyncio.gather( + plant.get_historical_time_bars( + symbol="MNQM6", exchange="CME", + start_time=dt.datetime(2026, 4, 13, 0), end_time=dt.datetime(2026, 4, 13, 0, 1), + bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1, + ), + plant.get_historical_time_bars( + symbol="MESM6", exchange="CME", + start_time=dt.datetime(2026, 4, 13, 0), end_time=dt.datetime(2026, 4, 13, 0, 1), + bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1, + ), + ) + + assert result_a == [{"x": 1}, {"x": 2}] + assert result_b == [{"y": 1}] + + +async def test_empty_tick_response_returns_empty_list(history_plant_mock): + """Same as Bug A but for tick data.""" + plant = history_plant_mock + import datetime as dt + key = "MNQM6" + + async def trigger_empty(): + await asyncio.sleep(0.01) + event = plant.historical_tick_events.get(key) + if event: + event.set() + + asyncio.create_task(trigger_empty()) + + result = await plant.get_historical_tick_data( + symbol="MNQM6", exchange="CME", + start_time=dt.datetime(2026, 4, 13, 0), end_time=dt.datetime(2026, 4, 13, 0, 1), + ) + assert result == [] + assert key not in plant.historical_tick_events