Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 38 additions & 26 deletions async_rithmic/plants/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,15 @@ def __init__(self, *args, **kwargs):
self.historical_tick_data = defaultdict(list)
self.historical_time_bar_data = defaultdict(list)

self.historical_tick_event = asyncio.Event()
self.historical_time_bar_event = asyncio.Event()
# Per-request events keyed by f"{symbol}" (tick) or f"{symbol}_{type}" (time bar).
# Fixes two races in the prior single-shared-event design:
# 1) Empty response: is_last_bar marker sets the event before any data
# callbacks fire, so pop(key) raised KeyError (fixed here with per-key
# events + .pop default).
# 2) Concurrent requests: a second caller would overwrite the shared event,
# causing the first response to wake the second caller prematurely.
self.historical_tick_events: dict = {}
self.historical_time_bar_events: dict = {}

self.client.on_historical_tick += self._on_historical_tick
self.client.on_historical_time_bar += self._on_historical_time_bar
Expand Down Expand Up @@ -56,9 +63,11 @@ async def get_historical_tick_data(
:param start_time: (dt) start time as datetime in utc
:param end_time: (dt) end time as datetime in utc
"""
key = f"{symbol}"

if wait:
self.historical_tick_event = asyncio.Event()
event = asyncio.Event()
self.historical_tick_events[key] = event

await self._send_and_recv_immediate(
template_id=206,
Expand All @@ -75,19 +84,15 @@ async def get_historical_tick_data(

# Wait until all the historical data has been fetched before returning it
if wait:
key = f"{symbol}"

try:
await asyncio.wait_for(self.historical_tick_event.wait(), 5.0)
await asyncio.wait_for(event.wait(), 5.0)
except asyncio.TimeoutError:
if len(self.historical_tick_data[key]) == 0:
# No data returned by Rithmic for the request
return []

await self.historical_tick_event.wait()
# No response within 5s — return whatever accumulated (may be empty)
pass
finally:
self.historical_tick_events.pop(key, None)

data = self.historical_tick_data.pop(key)
return data
return self.historical_tick_data.pop(key, [])

async def get_historical_time_bars(
self,
Expand All @@ -99,8 +104,11 @@ async def get_historical_time_bars(
bar_type_periods: int,
wait: bool = True
):
key = f"{symbol}_{bar_type}"

if wait:
self.historical_time_bar_event = asyncio.Event()
event = asyncio.Event()
self.historical_time_bar_events[key] = event

await self._send_and_recv_immediate(
template_id=202,
Expand All @@ -115,19 +123,15 @@ async def get_historical_time_bars(

# Wait until all the historical data has been fetched before returning it
if wait:
key = f"{symbol}_{bar_type}"

try:
await asyncio.wait_for(self.historical_time_bar_event.wait(), 5.0)
await asyncio.wait_for(event.wait(), 5.0)
except asyncio.TimeoutError:
if len(self.historical_time_bar_data[key]) == 0:
# No data returned by Rithmic for the request
return []

await self.historical_time_bar_event.wait()
# No response within 5s — return whatever accumulated (may be empty)
pass
finally:
self.historical_time_bar_events.pop(key, None)

data = self.historical_time_bar_data.pop(key)
return data
return self.historical_time_bar_data.pop(key, [])

async def subscribe_to_time_bar_data(
self,
Expand Down Expand Up @@ -179,7 +183,12 @@ async def _process_response(self, response):
# Historical time bar
is_last_bar = response.rp_code == ['0'] or response.rq_handler_rp_code == []
if is_last_bar:
self.historical_time_bar_event.set()
# Signal the specific per-request event (keyed by symbol+type).
# Falls back to no-op if no waiter registered (defensive).
key = f"{response.symbol}_{response.type}"
event = self.historical_time_bar_events.get(key)
if event is not None:
event.set()
return

data = self._response_to_dict(response)
Expand All @@ -191,7 +200,10 @@ async def _process_response(self, response):
# Historical tick bar
is_last_bar = response.rp_code == ['0'] or response.rq_handler_rp_code == []
if is_last_bar:
self.historical_tick_event.set()
key = f"{response.symbol}"
event = self.historical_tick_events.get(key)
if event is not None:
event.set()
return

data = self._response_to_dict(response)
Expand Down
116 changes: 116 additions & 0 deletions tests/test_history_races.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""
Regression tests for the historical data race conditions fixed via per-request events.

Two scenarios that previously crashed:
A) Empty response (Rithmic returns only the is_last_bar marker → KeyError on pop).
B) Concurrent requests for different symbols (shared event got overwritten).
"""
import asyncio
from unittest.mock import MagicMock, AsyncMock

import pytest

from async_rithmic.plants import HistoryPlant
from async_rithmic.enums import TimeBarType


@pytest.fixture
def history_plant_mock():
plant = HistoryPlant(MagicMock())
plant.ws = AsyncMock()
plant.client = MagicMock()
plant.client.retry_settings = MagicMock(max_retries=1, timeout=3, jitter_range=None)
# Stub the send to avoid real network
plant._send_and_recv_immediate = AsyncMock(return_value=None)
return plant


async def test_empty_response_returns_empty_list(history_plant_mock):
"""
Bug A: When Rithmic returns only the is_last_bar marker (no data bars),
the code used to raise KeyError on .pop(key). Now it returns [].
"""
plant = history_plant_mock
key = f"MNQM6_{int(TimeBarType.MINUTE_BAR)}"

# Simulate the is_last_bar marker arriving right after the request is sent
async def trigger_empty_response():
await asyncio.sleep(0.01)
event = plant.historical_time_bar_events.get(key)
if event:
event.set()

asyncio.create_task(trigger_empty_response())

result = await plant.get_historical_time_bars(
symbol="MNQM6",
exchange="CME",
start_time=__import__("datetime").datetime(2026, 4, 13, 0, 0),
end_time=__import__("datetime").datetime(2026, 4, 13, 0, 1),
bar_type=TimeBarType.MINUTE_BAR,
bar_type_periods=1,
)
assert result == []
# Events dict is cleaned up
assert key not in plant.historical_time_bar_events


async def test_concurrent_different_symbols(history_plant_mock):
"""
Bug B: Two concurrent requests used to share one event. The first response
would wake the second caller prematurely. Now each request has its own event.
"""
plant = history_plant_mock
import datetime as dt

async def fire_response_for(symbol, bar_type, delay, data_rows):
await asyncio.sleep(delay)
key = f"{symbol}_{int(bar_type)}"
for row in data_rows:
plant.historical_time_bar_data[key].append(row)
event = plant.historical_time_bar_events.get(key)
if event:
event.set()

# Caller A requests MNQM6, caller B requests MESM6 a moment later
# A's response arrives first with 2 bars; B's response arrives after with 1 bar
asyncio.create_task(fire_response_for("MNQM6", TimeBarType.MINUTE_BAR, 0.01, [{"x": 1}, {"x": 2}]))
asyncio.create_task(fire_response_for("MESM6", TimeBarType.MINUTE_BAR, 0.02, [{"y": 1}]))

result_a, result_b = await asyncio.gather(
plant.get_historical_time_bars(
symbol="MNQM6", exchange="CME",
start_time=dt.datetime(2026, 4, 13, 0), end_time=dt.datetime(2026, 4, 13, 0, 1),
bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1,
),
plant.get_historical_time_bars(
symbol="MESM6", exchange="CME",
start_time=dt.datetime(2026, 4, 13, 0), end_time=dt.datetime(2026, 4, 13, 0, 1),
bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1,
),
)

assert result_a == [{"x": 1}, {"x": 2}]
assert result_b == [{"y": 1}]


async def test_empty_tick_response_returns_empty_list(history_plant_mock):
"""Same as Bug A but for tick data."""
plant = history_plant_mock
import datetime as dt
key = "MNQM6"

async def trigger_empty():
await asyncio.sleep(0.01)
event = plant.historical_tick_events.get(key)
if event:
event.set()

asyncio.create_task(trigger_empty())

result = await plant.get_historical_tick_data(
symbol="MNQM6", exchange="CME",
start_time=dt.datetime(2026, 4, 13, 0), end_time=dt.datetime(2026, 4, 13, 0, 1),
)
assert result == []
assert key not in plant.historical_tick_events
Loading