diff --git a/async_rithmic/exceptions.py b/async_rithmic/exceptions.py index 510356d..12643d4 100644 --- a/async_rithmic/exceptions.py +++ b/async_rithmic/exceptions.py @@ -6,3 +6,6 @@ class RithmicErrorResponse(Exception): class InvalidRequestError(Exception): """Raised when a user-level API call is missing required arguments or is malformed.""" pass + +class HistoricalDataRequestInProgressError(RuntimeError): + pass diff --git a/async_rithmic/helpers/concurrency.py b/async_rithmic/helpers/concurrency.py index e117584..d5bad71 100644 --- a/async_rithmic/helpers/concurrency.py +++ b/async_rithmic/helpers/concurrency.py @@ -3,7 +3,7 @@ from contextlib import asynccontextmanager @asynccontextmanager -async def try_acquire_lock(plant, timeout: float = 5.0, context: str = ""): +async def try_acquire_lock(plant, timeout: float = 10.0, context: str = ""): """ Attempts to acquire an asyncio.Lock with timeout. Logs and raises on timeout to help detect deadlocks. diff --git a/async_rithmic/plants/base.py b/async_rithmic/plants/base.py index 4479750..2fb2a90 100644 --- a/async_rithmic/plants/base.py +++ b/async_rithmic/plants/base.py @@ -529,6 +529,12 @@ async def _process_response(self, response): # - pnl subscription responses return True + if response.template_id in [203, 207]: + # Let plant handle: + # - historical time bars + # - historical tick bars + return False + if response.template_id == 77: # Forced logout self.logger.warning("Received a ForcedLogout message from Rithmic - did you reach the maximum number of concurrent sessions ?") @@ -555,11 +561,16 @@ async def _process_response(self, response): raise RithmicErrorResponse(f"Rithmic returned an error={MessageToDict(response)} for the request={request}") else: - if response.template_id in [11, 15, 114, 301]: - # We expect a single response containing `rp_code` for these endpoints + # single-response endpoints that carries data along with the terminal sentinel + # 11: login response + # 15: reference data response + # 114: front month contract response + # 301: login info response + _terminal_carries_data = {11, 15, 114, 301} + + if response.template_id in _terminal_carries_data: self.request_manager.handle_response(response) - # Else: multiple response + a sentinel message with `rp_code` self.request_manager.mark_complete(request_id) else: self.request_manager.handle_response(response) diff --git a/async_rithmic/plants/history.py b/async_rithmic/plants/history.py index 859302e..925e96c 100644 --- a/async_rithmic/plants/history.py +++ b/async_rithmic/plants/history.py @@ -1,22 +1,61 @@ from datetime import datetime import asyncio -from collections import defaultdict +from dataclasses import dataclass from .base import BasePlant +from ..exceptions import HistoricalDataRequestInProgressError from ..enums import SysInfraType, TimeBarType from .. import protocol_buffers as pb + +@dataclass +class HistoricalDataRequest: + """ + Tracks one in-flight historical data request. + """ + + # Request time range + start_index: int + end_index: int + + # Request params + params: dict + + # Pagination + page_count: int + max_pages: int + + # State + done: asyncio.Event + data_received: asyncio.Event + data: list[dict] + + last_marker: int = 0 + + @property + def reached_max_pages(self) -> bool: + return self.page_count >= self.max_pages + + @property + def received_no_data(self) -> bool: + return self.last_marker == 0 + + @property + def reached_end(self) -> bool: + return self.last_marker >= self.end_index + + @property + def is_finished_downloading(self) -> bool: + return self.reached_max_pages or self.received_no_data or self.reached_end + class HistoryPlant(BasePlant): infra_type = SysInfraType.HISTORY_PLANT def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.historical_tick_data = defaultdict(list) - self.historical_time_bar_data = defaultdict(list) - - self.historical_tick_events: dict = {} - self.historical_time_bar_events: dict = {} + self.historical_time_bar_requests = {} + self.historical_tick_requests = {} self.client.on_historical_tick += self._on_historical_tick self.client.on_historical_time_bar += self._on_historical_time_bar @@ -32,12 +71,57 @@ def _datetime_to_index(self, dt: datetime): return int(dt.timestamp()) async def _on_historical_time_bar(self, data): - key = f"{data['symbol']}_{data['type']}" - self.historical_time_bar_data[key].append(data) + key = data["_key"] + if (request := self.historical_time_bar_requests.get(key)) is not None: + request.data.append(data) + request.data_received.set() async def _on_historical_tick(self, data): - key = f"{data['symbol']}" - self.historical_tick_data[key].append(data) + key = data["_key"] + if (request := self.historical_tick_requests.get(key)) is not None: + request.data.append(data) + request.data_received.set() + + async def _wait_for_historical_request_completion( + self, + request: HistoricalDataRequest, + idle_timeout: float, + ) -> list[dict]: + """ + Waits for historical data request to complete + Raises TimeoutError exception if no new data has been received in the last `idle_timeout` seconds + """ + while not request.done.is_set(): + request.data_received.clear() + + done_task = asyncio.create_task(request.done.wait()) + data_task = asyncio.create_task(request.data_received.wait()) + + try: + done, pending = await asyncio.wait( + {done_task, data_task}, + timeout=idle_timeout, + return_when=asyncio.FIRST_COMPLETED, + ) + + for task in pending: + task.cancel() + + if not done: + raise TimeoutError( + f"Historical data request stalled: no data or completion message " + f"received for {idle_timeout:.1f}s" + ) + + if done_task in done: + break + + finally: + for task in (done_task, data_task): + if not task.done(): + task.cancel() + + return request.data async def get_historical_tick_data( self, @@ -45,47 +129,88 @@ async def get_historical_tick_data( exchange: str, start_time: datetime, end_time: datetime, - wait: bool = True + wait: bool = True, + idle_timeout: float = 5.0, + max_pages: int = 1_000, ): """ - Creates and sends request for download of tick data for security/exchange over time period - - :param request_id: (str) generated request id used for processing as messages come in - :param symbol: (str) valid security code (e.g. ES) - :param exchange: (str) valid exchange code (e.g. CME) - :param start_time: (dt) start time as datetime in utc - :param end_time: (dt) end time as datetime in utc + Requests historical ticks for a symbol/exchange over a time range. + + :param symbol: Security code, e.g. "NQM6". + :param exchange: Exchange code, e.g. "CME". + :param start_time: Start time of the replay request. + :param end_time: End time of the replay request. + :param wait: If True, wait for the historical replay to complete and return + the collected ticks. If False, return immediately after sending the + request; ticks are still emitted through the ``on_historical_tick`` + callback. + :param idle_timeout: Maximum number of seconds to wait without receiving + either a historical bar or the replay completion message. This is an + idle/stall timeout, not a total request timeout; the timer resets every + time progress is observed. + :param max_pages: Maximum number of replay pages to request. Use `1` to send + a single replay request without pagination. Values greater than `1` allow + the client to issue additional replay requests until the returned bars cover + the requested `end_time`. This handles Rithmic replay truncation. + :return: A list of historical tick dictionaries when ``wait=True``; + otherwise ``None``. + :raises HistoricalDataRequestInProgressError: If another historical tick + request is already in progress. + :raises TimeoutError: If no bar or completion message is received for + ``idle_timeout`` seconds while waiting. """ - key = f"{symbol}" - if wait: - event = asyncio.Event() - self.historical_tick_events[key] = event - - await self._send_and_recv_immediate( - template_id=206, - user_msg=symbol, - symbol=symbol, - exchange=exchange, - bar_type=pb.request_tick_bar_replay_pb2.RequestTickBarReplay.BarType.TICK_BAR, - bar_type_specifier="1", - bar_sub_type=pb.request_tick_bar_replay_pb2.RequestTickBarReplay.BarSubType.REGULAR, - time_order=pb.request_tick_bar_replay_pb2.RequestTickBarReplay.TimeOrder.FORWARDS, - start_index=self._datetime_to_index(start_time), - finish_index=self._datetime_to_index(end_time), + key = f"{symbol}_{exchange}" + + if key in self.historical_tick_requests: + raise HistoricalDataRequestInProgressError( + "Cannot start another historical tick request with the same " + "symbol and exchange while one is already in progress." + ) + + start_index = self._datetime_to_index(start_time) + end_index = self._datetime_to_index(end_time) + + self.historical_tick_requests[key] = HistoricalDataRequest( + # Request range + start_index=start_index, + end_index=end_index, + + # Request param + params=dict( + symbol=symbol, + exchange=exchange, + bar_type=pb.request_tick_bar_replay_pb2.RequestTickBarReplay.BarType.TICK_BAR, + bar_type_specifier="1", + bar_sub_type=pb.request_tick_bar_replay_pb2.RequestTickBarReplay.BarSubType.REGULAR, + time_order=pb.request_tick_bar_replay_pb2.RequestTickBarReplay.TimeOrder.FORWARDS, + ), + + # Pagination + page_count=1, + max_pages=max_pages, + + # State + done=asyncio.Event(), + data_received=asyncio.Event(), + data=[], ) - # Wait until all the historical data has been fetched before returning it - if wait: - try: - await asyncio.wait_for(event.wait(), 5.0) - except asyncio.TimeoutError: - # No response within 5s — return whatever accumulated (may be empty) - pass - finally: - self.historical_tick_events.pop(key, None) + await self._request_historical_ticks(key) + + if not wait: + # Historical ticks will still be emitted through `on_historical_tick`, + # but this call returns immediately instead of collecting them. + return None - return self.historical_tick_data.pop(key, []) + # Wait until Rithmic sends the completion message, and return ticks. + try: + return await self._wait_for_historical_request_completion( + request=self.historical_tick_requests[key], + idle_timeout=idle_timeout, + ) + finally: + self.historical_tick_requests.pop(key, None) async def get_historical_time_bars( self, @@ -95,36 +220,116 @@ async def get_historical_time_bars( end_time: datetime, bar_type: TimeBarType, bar_type_periods: int, - wait: bool = True + wait: bool = True, + idle_timeout: float = 5.0, + max_pages: int = 1_000, ): - key = f"{symbol}_{bar_type}" + """ + Requests historical time bars for a symbol/exchange over a time range. + + :param symbol: Security code, e.g. "NQM6". + :param exchange: Exchange code, e.g. "CME". + :param start_time: Start time of the replay request. + :param end_time: End time of the replay request. + :param bar_type: Type of time bar to request. + :param bar_type_periods: Bar period value. For minute bars, this is the + number of minutes. For second bars, this is the number of seconds. + :param wait: If True, wait for the historical replay to complete and return + the collected bars. If False, return immediately after sending the + request; bars are still emitted through the ``on_historical_time_bar`` + callback. + :param idle_timeout: Maximum number of seconds to wait without receiving + either a historical bar or the replay completion message. This is an + idle/stall timeout, not a total request timeout; the timer resets every + time progress is observed. + :param max_pages: Maximum number of replay pages to request. Use `1` to send + a single replay request without pagination. Values greater than `1` allow + the client to issue additional replay requests until the returned bars cover + the requested `end_time`. This handles Rithmic replay truncation. + :return: A list of historical time bar dictionaries when ``wait=True``; + otherwise ``None``. + :raises HistoricalDataRequestInProgressError: If another historical time bar + request is already in progress. + :raises TimeoutError: If no bar or completion message is received for + ``idle_timeout`` seconds while waiting. + """ + + key = f"{symbol}_{exchange}_{bar_type}_{bar_type_periods}" + + if key in self.historical_time_bar_requests: + raise HistoricalDataRequestInProgressError( + "Cannot start another historical time bar request with the same " + "symbol, exchange, bar type, and period while one is already in progress." + ) + + start_index = self._datetime_to_index(start_time) + end_index = self._datetime_to_index(end_time) + + self.historical_time_bar_requests[key] = HistoricalDataRequest( + # Request range + start_index=start_index, + end_index=end_index, + + # Request param + params=dict( + symbol=symbol, + exchange=exchange, + bar_type=bar_type, + bar_type_period=bar_type_periods, + time_order=pb.request_time_bar_replay_pb2.RequestTimeBarReplay.TimeOrder.FORWARDS, + ), + + # Pagination + page_count=1, + max_pages=max_pages, + + # State + done=asyncio.Event(), + data_received=asyncio.Event(), + data=[], + ) + + await self._request_historical_time_bars(key) + + if not wait: + # Historical bars will still be emitted through `on_historical_time_bar`, + # but this call returns immediately instead of collecting them. + return None + + # Wait until Rithmic sends the completion message, and return bars. + try: + return await self._wait_for_historical_request_completion( + request=self.historical_time_bar_requests[key], + idle_timeout=idle_timeout, + ) + finally: + self.historical_time_bar_requests.pop(key, None) - if wait: - event = asyncio.Event() - self.historical_time_bar_events[key] = event + async def _request_historical_time_bars(self, key: str): + request: HistoricalDataRequest = self.historical_time_bar_requests[key] - await self._send_and_recv_immediate( + self.logger.debug(f"Requesting page {request.page_count} (start index = {request.start_index}) of historical time bars for {key}") + + await self._send_request( template_id=202, - symbol=symbol, - exchange=exchange, - bar_type=bar_type, - bar_type_period=bar_type_periods, - time_order=pb.request_time_bar_replay_pb2.RequestTimeBarReplay.TimeOrder.FORWARDS, - start_index=self._datetime_to_index(start_time), - finish_index=self._datetime_to_index(end_time), + user_msg=key, + start_index=request.start_index, + finish_index=request.end_index, + **request.params, ) - # Wait until all the historical data has been fetched before returning it - if wait: - try: - await asyncio.wait_for(event.wait(), 5.0) - except asyncio.TimeoutError: - # No response within 5s — return whatever accumulated (may be empty) - pass - finally: - self.historical_time_bar_events.pop(key, None) + async def _request_historical_ticks(self, key: str): + request: HistoricalDataRequest = self.historical_tick_requests[key] - return self.historical_time_bar_data.pop(key, []) + self.logger.debug(f"Requesting page {request.page_count} (start index = {request.start_index}) of historical ticks for {key}") + + await self._send_request( + template_id=206, + user_msg=key, + start_index=request.start_index, + finish_index=request.end_index, + **request.params, + ) async def subscribe_to_time_bar_data( self, @@ -175,31 +380,50 @@ async def _process_response(self, response): if response.template_id == 203: # Historical time bar is_last_bar = response.rp_code == ['0'] or response.rq_handler_rp_code == [] + key = response.user_msg[0] + if is_last_bar: - # Signal the specific per-request event (keyed by symbol+type). - # Falls back to no-op if no waiter registered (defensive). - key = f"{response.symbol}_{response.type}" - event = self.historical_time_bar_events.get(key) - if event is not None: - event.set() + if (request := self.historical_time_bar_requests.get(key)) is not None: + if request.is_finished_downloading: + self.logger.debug(f"Finished downloading historical time bars for {key}") + + self.historical_time_bar_requests[key].done.set() + self.historical_time_bar_requests.pop(key, None) + + else: + # Request the next page of results + next_start_index = request.last_marker + 1 + request.last_marker = 0 + request.page_count += 1 + request.start_index = next_start_index + + await asyncio.sleep(0.01) + await self._request_historical_time_bars(key) + return data = self._response_to_dict(response) + data["_key"] = key data["bar_end_datetime"] = datetime.fromtimestamp(data['marker']) + if (request := self.historical_time_bar_requests.get(key)) is not None: + request.last_marker = data['marker'] + await self.client.on_historical_time_bar.call_async(data) elif response.template_id == 207: # Historical tick bar is_last_bar = response.rp_code == ['0'] or response.rq_handler_rp_code == [] + key = response.user_msg[0] + if is_last_bar: - key = f"{response.symbol}" - event = self.historical_tick_events.get(key) - if event is not None: - event.set() + if self.historical_tick_requests.get(key) is not None: + self.historical_tick_requests[key].done.set() + self.historical_tick_requests.pop(key, None) return data = self._response_to_dict(response) + data["_key"] = key data["datetime"] = self._ssboe_usecs_to_datetime(response.data_bar_ssboe[0], response.data_bar_usecs[0]) await self.client.on_historical_tick.call_async(data) diff --git a/async_rithmic/plants/order.py b/async_rithmic/plants/order.py index 21a7d47..5d86fc1 100644 --- a/async_rithmic/plants/order.py +++ b/async_rithmic/plants/order.py @@ -219,7 +219,9 @@ async def submit_order( kwargs.setdefault("duration", OrderDuration.DAY) msg_kwargs = self._validate_price_fields(order_type, **kwargs) - msg_kwargs["account_id"] = kwargs.pop("account_id", None) + + if "account_id" in kwargs: + msg_kwargs["account_id"] = kwargs.pop("account_id") # Get trade route filtered = [r for r in self.trade_routes if r.exchange == exchange] diff --git a/docs/connection.rst b/docs/connection.rst index 4a7317c..039ea51 100644 --- a/docs/connection.rst +++ b/docs/connection.rst @@ -35,6 +35,7 @@ Conformance - First, `contact Rithmic `_. - You will be asked to connect to the order plant and leave the app running. - You can use the `conformance.py script `_ to accomplish this. + - See `this GitHub issue comment `_ for one user's notes on the conformance process. Custom Reconnection Settings ---------------------------- diff --git a/docs/historical_data.rst b/docs/historical_data.rst index c8e2393..70b85b2 100644 --- a/docs/historical_data.rst +++ b/docs/historical_data.rst @@ -51,7 +51,7 @@ The following example fetches historical tick data: Fetch Historical Time Bars -------------------------- -This example fetches historical aggregated time bars (6-second bars in this case): +Fetch historical time bars for a symbol over a time range. .. code-block:: python @@ -91,3 +91,47 @@ This example fetches historical aggregated time bars (6-second bars in this case await client.disconnect() asyncio.run(main()) + +By default, ``get_historical_time_bars()`` waits until the historical replay is +complete and returns the collected bars as a list. + +Rithmic may truncate historical replay responses. In practice, a single replay +request can return at most about 10,000 bars, even if the requested time range +contains many more bars. For example, requesting several months of 1-minute bars +may cover hundreds of thousands of bars, but Rithmic may only return the first +page of results. + +To handle this, ``async_rithmic`` automatically paginates historical time bar +requests. After each page, it uses the last received bar marker as the starting +point for the next request and continues until one of the following happens: + +- the requested ``end_time`` is reached; +- Rithmic returns no more data; +- ``max_pages`` is reached. + +The ``max_pages`` argument controls how many replay pages can be requested. + +The ``idle_timeout`` argument controls how long the client waits without seeing +progress while waiting for a historical replay to complete. + +.. code-block:: python + + bars = await client.get_historical_time_bars( + ..., + max_pages=100, + idle_timeout=10.0, + ) + +This is an idle timeout, not a total request timeout. The timer resets whenever a +bar or completion message is received. + +If ``wait=False`` is passed, the method sends the replay request and returns +immediately. Historical bars are still emitted through the +``on_historical_time_bar`` callback. + +.. code-block:: python + + async def callback(data): + print(f"Received data: {data}") + + client.on_historical_time_bar += callback \ No newline at end of file diff --git a/docs/orders.rst b/docs/orders.rst index fefb362..95454b0 100644 --- a/docs/orders.rst +++ b/docs/orders.rst @@ -147,6 +147,97 @@ This example places a limit order and cancels it shortly after: asyncio.run(main()) +Placing a Bracket Order +----------------------- + +A bracket order is a regular order with one or more attached exit orders bound to it. +In practice, you submit the entry order exactly like a normal market, limit, stop, or +stop-limit order, and then add: + +- ``stop_ticks`` for an attached stop-loss order. +- ``target_ticks`` for an attached take-profit order. +- both ``stop_ticks`` and ``target_ticks`` for a bracket with both stop-loss and take-profit. + +The stop-loss and take-profit distances are expressed in ticks from the entry price. +The attached order quantity is automatically set to the same quantity as the main order. + +When neither ``stop_ticks`` nor ``target_ticks`` is passed, ``submit_order()`` submits a +normal order. When either one is passed, ``submit_order()`` submits a bracket order. + +Stop-loss only +~~~~~~~~~~~~~~ + +.. code-block:: python + + await client.submit_order( + order_id, + security_code, + exchange, + qty=1, + order_type=OrderType.MARKET, + transaction_type=TransactionType.BUY, + stop_ticks=20, + ) + +Take-profit only +~~~~~~~~~~~~~~~~ + +.. code-block:: python + + await client.submit_order( + order_id, + security_code, + exchange, + qty=1, + order_type=OrderType.MARKET, + transaction_type=TransactionType.BUY, + target_ticks=40, + ) + +Stop-loss and take-profit +~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + await client.submit_order( + order_id, + security_code, + exchange, + qty=1, + order_type=OrderType.MARKET, + transaction_type=TransactionType.BUY, + stop_ticks=20, + target_ticks=40, + ) + + +Market-on-reject for stop orders +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For stop-loss brackets, you may also pass ``stop_market_on_reject=True``: + +.. code-block:: python + + await client.submit_order( + order_id, + security_code, + exchange, + qty=1, + order_type=OrderType.MARKET, + transaction_type=TransactionType.BUY, + stop_ticks=20, + target_ticks=40, + stop_market_on_reject=True, + ) + +This option tells Rithmic to convert a rejected stop order into a market order. It is +useful as a protective fallback: if the attached stop cannot be accepted as a stop +order, the platform can still attempt to flatten the position instead of leaving it +without stop protection. + +Use this option only when that behavior is desired. A market order can fill with +slippage, especially during fast markets or thin liquidity. + Cancelling an order ------------------- @@ -171,29 +262,55 @@ To cancel all open orders: Modifying an order ------------------ +Modify an existing active order with new parameters. -Modify an existing order with new parameters. - -This method allows you to update one or more attributes of an active order, such as quantity, order type, price, stop-loss, or take-profit levels. +This method allows you to update one or more attributes of an active order, such +as quantity, order type, price, stop-loss, or take-profit levels. **Supported attributes:** - ``qty``: New quantity for the order. -- ``order_type``: Order type (e.g., ``"MKT"``, ``"LMT"``, ``"STOP LMT"``, etc.). -- ``price``: Updated price (used for limit or stop-limit orders). -- ``trigger_price``: Updated trigger price (for stop orders). -- ``stop_ticks``: New stop-loss in ticks (modify stop-loss). -- ``target_ticks``: New take-profit in ticks (modify take-profit). +- ``order_type``: Order type, for example ``OrderType.MARKET``, ``OrderType.LIMIT``, or ``OrderType.STOP_LIMIT``. +- ``price``: Updated price, used for limit or stop-limit orders. +- ``trigger_price``: Updated trigger price, used for stop orders. +- ``stop_ticks``: New stop-loss in ticks. +- ``target_ticks``: New take-profit in ticks. + +.. note:: + + Rithmic does not allow the main order, stop-loss, and take-profit to be + modified concurrently. If multiple parts of a bracket order are modified, + ``async_rithmic`` submits the modifications sequentially. + +Basic example: .. code-block:: python await client.modify_order( order_id="abc123", qty=3, - target_ticks=50 - stop_ticks=25 + target_ticks=50, + stop_ticks=25, + ) + +For time-critical modifications, you can pass the existing order object directly +using ``order=``. This skips the internal ``get_order()`` lookup and avoids an +extra network round-trip before sending the modification request. + +Example: + +.. code-block:: python + + order = await client.get_order(order_id="abc123") + + await client.modify_order( + order=order, + target_ticks=50, ) +This is useful when you already have the order object and want to avoid adding +unnecessary latency to the modification path. + Exiting a position ------------------ diff --git a/tests/test_history_plant.py b/tests/test_history_plant.py new file mode 100644 index 0000000..02372e5 --- /dev/null +++ b/tests/test_history_plant.py @@ -0,0 +1,272 @@ +import asyncio +from unittest.mock import MagicMock, AsyncMock +from datetime import datetime, timezone +from pattern_kit import Event + +import pytest + +from async_rithmic.plants import HistoryPlant +from async_rithmic.enums import TimeBarType + + +@pytest.fixture +def history_plant_mock(): + client = MagicMock() + client.retry_settings = MagicMock(max_retries=1, timeout=3, jitter_range=None) + client.on_historical_time_bar = Event() + client.on_historical_tick = Event() + + plant = HistoryPlant(client) + plant.ws = AsyncMock() + # Stub the send to avoid real network + plant._send_request = AsyncMock(return_value=None) + + return plant + + +async def test_empty_response_returns_empty_list(history_plant_mock): + """ + Bug A: When Rithmic returns only the is_last_bar marker (no data bars), + the code used to raise KeyError on .pop(key). Now it returns []. + """ + plant = history_plant_mock + key = f"MNQM6_CME_{int(TimeBarType.MINUTE_BAR)}_1" + + # Simulate the is_last_bar marker arriving right after the request is sent + async def trigger_empty_response(): + await asyncio.sleep(0.01) + await plant._process_response( + MagicMock( + template_id=203, + rp_code=['0'], + user_msg=[key] + ) + ) + + asyncio.create_task(trigger_empty_response()) + + result = await plant.get_historical_time_bars( + symbol="MNQM6", + exchange="CME", + start_time=datetime(2026, 4, 13, 0, 0), + end_time=datetime(2026, 4, 13, 0, 1), + bar_type=TimeBarType.MINUTE_BAR, + bar_type_periods=1, + ) + assert result == [] + # Events dict is cleaned up + assert key not in plant.historical_time_bar_requests + + +async def test_empty_tick_response_returns_empty_list(history_plant_mock): + """Same as Bug A but for tick data.""" + plant = history_plant_mock + key = f"MNQM6_CME" + + # Simulate the is_last_bar marker arriving right after the request is sent + async def trigger_empty_response(): + await asyncio.sleep(0.01) + await plant._process_response( + MagicMock( + template_id=207, + rp_code=['0'], + user_msg=[key] + ) + ) + + asyncio.create_task(trigger_empty_response()) + + result = await plant.get_historical_tick_data( + symbol="MNQM6", exchange="CME", + start_time=datetime(2026, 4, 13, 0), end_time=datetime(2026, 4, 13, 0, 1), + ) + assert result == [] + # Events dict is cleaned up + assert key not in plant.historical_tick_requests + + + +async def test_concurrent_different_symbols(history_plant_mock): + """ + Bug B: Two concurrent requests used to share one event. The first response + would wake the second caller prematurely. Now each request has its own event. + """ + plant = history_plant_mock + + async def fire_responses(data_rows): + await asyncio.sleep(0.01) + + keys = set() + for symbol, data in data_rows: + key = f"{symbol}_CME_{int(TimeBarType.MINUTE_BAR)}_1" + keys.add(key) + + plant._response_to_dict = MagicMock(return_value=data) + + await plant._process_response( + MagicMock( + template_id=203, + user_msg=[key] + ) + ) + + await asyncio.sleep(0.01) + + for key in keys: + plant.historical_time_bar_requests[key].done.set() + + + # Fire interleaved responses + asyncio.create_task( + fire_responses([ + ["MNQM6", {"x": 1, "marker": 1777300260}], + ["MESM6", {"y": 1, "marker": 1777300260}], + ["MNQM6", {"x": 2, "marker": 1777300260}], + ]) + ) + + result_a, result_b = await asyncio.gather( + plant.get_historical_time_bars( + symbol="MNQM6", exchange="CME", + start_time=datetime(2026, 4, 13, 0), end_time=datetime(2026, 4, 13, 0, 1), + bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1, + ), + plant.get_historical_time_bars( + symbol="MESM6", exchange="CME", + start_time=datetime(2026, 4, 13, 0), end_time=datetime(2026, 4, 13, 0, 1), + bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1, + ), + ) + + assert len(result_a) == 2 + assert result_a[0]["x"] == 1 + assert result_a[1]["x"] == 2 + + assert len(result_b) == 1 + assert result_b[0]["y"] == 1 + +async def test_historical_time_bar_pagination(history_plant_mock): + """ + When Rithmic truncates a historical time bar replay, the client should + request additional pages until the returned bars cover the requested end_time. + + Rithmic seems to label time bars by end timestamp. So a request covering 10:00 to + 10:04 can return bars labeled 10:01, 10:02, 10:03, 10:04, 10:05. + """ + plant = history_plant_mock + + symbol = "MNQM6" + exchange = "CME" + bar_type = TimeBarType.MINUTE_BAR + bar_type_periods = 1 + + key = f"{symbol}_{exchange}_{bar_type}_{bar_type_periods}" + + # Marker values returned by Rithmic for the request: + # + # 1777644060 -> 10:01 + # 1777644120 -> 10:02 + # 1777644180 -> 10:03 + # 1777644240 -> 10:04 + # 1777644300 -> 10:05 + + start_dt = datetime(2026, 5, 1, 14, 0, tzinfo=timezone.utc) + end_dt = datetime(2026, 5, 1, 14, 4, tzinfo=timezone.utc) + + chunks = [ + # First request response: truncated before covering end_time. + [ + {"marker": 1777644060, "page": 1}, + {"marker": 1777644120, "page": 1}, + None, # End msg + ], + + # Second request response: now covers/passes end_time. + [ + {"marker": 1777644180, "page": 2}, + {"marker": 1777644240, "page": 2}, + {"marker": 1777644300, "page": 2}, + None, # End msg + ], + ] + + async def emit_chunk(chunk): + for row in chunk: + if row is None: + # Rithmic terminal/completion message for this page. + await plant._process_response( + MagicMock( + template_id=203, + rp_code=["0"], + rq_handler_rp_code=[], + user_msg=[key], + ) + ) + continue + + plant._response_to_dict = MagicMock(return_value=row) + + await plant._process_response( + MagicMock( + template_id=203, + rp_code=[], + rq_handler_rp_code=["data"], + user_msg=[key], + ) + ) + + send_count = 0 + + async def fake_send_request(**kwargs): + nonlocal send_count + + chunk = chunks[send_count] + send_count += 1 + + # Schedule responses asynchronously so get_historical_time_bars() + # can enter _wait_for_historical_request_completion(). + asyncio.create_task(emit_chunk(chunk)) + + plant._send_request = AsyncMock(side_effect=fake_send_request) + + result = await plant.get_historical_time_bars( + symbol=symbol, + exchange=exchange, + start_time=start_dt, + end_time=end_dt, + bar_type=bar_type, + bar_type_periods=bar_type_periods, + max_pages=5, + ) + + assert [row["marker"] for row in result] == [ + 1777644060, + 1777644120, + 1777644180, + 1777644240, + 1777644300, + ] + + assert [row["page"] for row in result] == [1, 1, 2, 2, 2] + + # One request for the first page, one request for the second page. + assert plant._send_request.await_count == 2 + assert send_count == 2 + + # Request state should be cleaned up after completion. + assert key not in plant.historical_time_bar_requests + + first_call = plant._send_request.await_args_list[0].kwargs + second_call = plant._send_request.await_args_list[1].kwargs + + assert first_call["template_id"] == 202 + assert first_call["user_msg"] == key + assert first_call["start_index"] == 1777644000 + assert first_call["finish_index"] == 1777644240 + + assert second_call["template_id"] == 202 + assert second_call["user_msg"] == key + + # next_start_index = request.last_marker + 1 + assert second_call["start_index"] == 1777644120 + 1 + assert second_call["finish_index"] == 1777644240 diff --git a/tests/test_history_races.py b/tests/test_history_races.py deleted file mode 100644 index 6e3ceb6..0000000 --- a/tests/test_history_races.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Regression tests for the historical data race conditions fixed via per-request events. - -Two scenarios that previously crashed: - A) Empty response (Rithmic returns only the is_last_bar marker → KeyError on pop). - B) Concurrent requests for different symbols (shared event got overwritten). -""" -import asyncio -from unittest.mock import MagicMock, AsyncMock -from datetime import datetime - -import pytest - -from async_rithmic.plants import HistoryPlant -from async_rithmic.enums import TimeBarType - - -@pytest.fixture -def history_plant_mock(): - plant = HistoryPlant(MagicMock()) - plant.ws = AsyncMock() - plant.client = MagicMock() - plant.client.retry_settings = MagicMock(max_retries=1, timeout=3, jitter_range=None) - # Stub the send to avoid real network - plant._send_and_recv_immediate = AsyncMock(return_value=None) - return plant - - -async def test_empty_response_returns_empty_list(history_plant_mock): - """ - Bug A: When Rithmic returns only the is_last_bar marker (no data bars), - the code used to raise KeyError on .pop(key). Now it returns []. - """ - plant = history_plant_mock - key = f"MNQM6_{int(TimeBarType.MINUTE_BAR)}" - - # Simulate the is_last_bar marker arriving right after the request is sent - async def trigger_empty_response(): - await asyncio.sleep(0.01) - event = plant.historical_time_bar_events.get(key) - if event: - event.set() - - asyncio.create_task(trigger_empty_response()) - - result = await plant.get_historical_time_bars( - symbol="MNQM6", - exchange="CME", - start_time=__import__("datetime").datetime(2026, 4, 13, 0, 0), - end_time=__import__("datetime").datetime(2026, 4, 13, 0, 1), - bar_type=TimeBarType.MINUTE_BAR, - bar_type_periods=1, - ) - assert result == [] - # Events dict is cleaned up - assert key not in plant.historical_time_bar_events - - -async def test_concurrent_different_symbols(history_plant_mock): - """ - Bug B: Two concurrent requests used to share one event. The first response - would wake the second caller prematurely. Now each request has its own event. - """ - plant = history_plant_mock - - async def fire_response_for(symbol, bar_type, delay, data_rows): - await asyncio.sleep(delay) - key = f"{symbol}_{int(bar_type)}" - for row in data_rows: - plant.historical_time_bar_data[key].append(row) - event = plant.historical_time_bar_events.get(key) - if event: - event.set() - - # Caller A requests MNQM6, caller B requests MESM6 a moment later - # A's response arrives first with 2 bars; B's response arrives after with 1 bar - asyncio.create_task(fire_response_for("MNQM6", TimeBarType.MINUTE_BAR, 0.01, [{"x": 1}, {"x": 2}])) - asyncio.create_task(fire_response_for("MESM6", TimeBarType.MINUTE_BAR, 0.02, [{"y": 1}])) - - result_a, result_b = await asyncio.gather( - plant.get_historical_time_bars( - symbol="MNQM6", exchange="CME", - start_time=datetime(2026, 4, 13, 0), end_time=datetime(2026, 4, 13, 0, 1), - bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1, - ), - plant.get_historical_time_bars( - symbol="MESM6", exchange="CME", - start_time=datetime(2026, 4, 13, 0), end_time=datetime(2026, 4, 13, 0, 1), - bar_type=TimeBarType.MINUTE_BAR, bar_type_periods=1, - ), - ) - - assert result_a == [{"x": 1}, {"x": 2}] - assert result_b == [{"y": 1}] - - -async def test_empty_tick_response_returns_empty_list(history_plant_mock): - """Same as Bug A but for tick data.""" - plant = history_plant_mock - key = "MNQM6" - - async def trigger_empty(): - await asyncio.sleep(0.01) - event = plant.historical_tick_events.get(key) - if event: - event.set() - - asyncio.create_task(trigger_empty()) - - result = await plant.get_historical_tick_data( - symbol="MNQM6", exchange="CME", - start_time=datetime(2026, 4, 13, 0), end_time=datetime(2026, 4, 13, 0, 1), - ) - assert result == [] - assert key not in plant.historical_tick_events diff --git a/tests/test_order_plant.py.bkp b/tests/test_order_plant.py.bkp new file mode 100644 index 0000000..40f0e27 --- /dev/null +++ b/tests/test_order_plant.py.bkp @@ -0,0 +1,90 @@ +import pytest +import asyncio +import uuid +import random +from collections import namedtuple +from unittest.mock import MagicMock, AsyncMock + +from async_rithmic.helpers.request_manager import RequestManager +from async_rithmic.plants.order import OrderPlant + +# Mimic a protobuf response closely enough for _process_response: +# template_id, rp_code (list of strings), user_msg (list of strings), plus a basket_id field. +FakeResponse = namedtuple("FakeResponse", ["template_id", "rp_code", "user_msg", "basket_id"]) + +class TestOrderPlant: + @pytest.fixture + def plant(self): + client = MagicMock() + plant = OrderPlant(client) + plant._send_request = AsyncMock() + + return plant + + async def _submit_and_deliver_terminal(self, plant, template_id=312, expected_template=313, + deliver_delay=0.005): + + """Simulate one submit_order call: send, wait for one terminal response with rp_code=0, + return the stored response list.""" + request_id = str(uuid.uuid4()) + + async def send(): + return await plant.request_manager.send_and_collect( + timeout=2.0, + user_msg=request_id, + template_id=template_id, + expected_response=dict(template_id=expected_template, user_msg=[request_id]), + ) + + async def deliver(): + await asyncio.sleep(deliver_delay) + response = FakeResponse( + template_id=expected_template, + rp_code=['0'], # success — terminal response + user_msg=[request_id], + basket_id=f"basket_{request_id[:8]}", + ) + await plant._process_response(response) + + # Run send and deliver concurrently; send awaits done_event, deliver fires it. + results, _ = await asyncio.gather(send(), deliver()) + return results, request_id + + async def test_single_submit_order_terminal_response_stored(self, plant): + responses, request_id = await self._submit_and_deliver_terminal(plant) + assert len(responses) == 1, f"Expected 1 stored response, got {len(responses)}" + assert responses[0].basket_id == f"basket_{request_id[:8]}" + assert responses[0].user_msg == [request_id] + + async def test_three_concurrent_submit_orders(self, plant): + """ + 3 concurrent submit_orders, each receives only a terminal response with rp_code=0. + All 3 must get their correct response (matched by user_msg).""" + results = await asyncio.gather( + self._submit_and_deliver_terminal(plant, deliver_delay=0.002), + self._submit_and_deliver_terminal(plant, deliver_delay=0.005), + self._submit_and_deliver_terminal(plant, deliver_delay=0.008), + ) + + for i, (responses, request_id) in enumerate(results): + assert len(responses) == 1, \ + f"[Request {i}] Expected 1 response, got {len(responses)} — terminal was dropped" + # Each response must be matched to its OWN request_id, not a sibling's. + assert responses[0].user_msg == [request_id], \ + f"[Request {i}] Response user_msg {responses[0].user_msg} does not match request {request_id}" + assert responses[0].basket_id == f"basket_{request_id[:8]}" + + async def test_six_concurrent_submit_orders_staggered(self, plant): + """ + Heavier fan-out mimicking 6 accounts firing orders over ~100ms window, terminal responses arrive + in arbitrary order. + """ + tasks = [ + self._submit_and_deliver_terminal(plant, deliver_delay=random.uniform(0.001, 0.05)) + for _ in range(6) + ] + results = await asyncio.gather(*tasks) + + for i, (responses, request_id) in enumerate(results): + assert len(responses) == 1, f"[Request {i}] lost terminal response" + assert responses[0].user_msg == [request_id] diff --git a/tests/test_request_manager.py b/tests/test_request_manager.py index 4c564dd..4a591bb 100644 --- a/tests/test_request_manager.py +++ b/tests/test_request_manager.py @@ -85,3 +85,14 @@ async def test_heavy_interleaved_request_streams(self, manager): f"[Request {i}] Response had wrong template_id: {r.template_id} ≠ {expected_template_id}" assert r.account_id == account_id, \ f"[Request {i}] Response had wrong account_id: {r.account_id} ≠ {account_id}" + + async def test_no_response_times_out(self, manager): + request_id = str(uuid.uuid4()) + with pytest.raises(asyncio.TimeoutError): + await manager.send_and_collect( + timeout=0.01, + user_msg=request_id, + template_id=312, + expected_response=dict(template_id=313, user_msg=[request_id]), + ) +