Skip to content

Commit fffeab1

Browse files
authored
Check for staleness in each Hyperliquid websocket channel (#3289)
1 parent 7427c56 commit fffeab1

File tree

13 files changed

+258
-84
lines changed

13 files changed

+258
-84
lines changed

apps/hip-3-pusher/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "hip-3-pusher"
3-
version = "0.2.3"
3+
version = "0.2.4"
44
description = "Hyperliquid HIP-3 market oracle pusher"
55
readme = "README.md"
66
requires-python = "==3.13.*"

apps/hip-3-pusher/src/pusher/config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,14 @@
33
from typing import Optional
44
from typing import Literal
55

6+
# Interval of time after which we'll cycle websocket connections
67
STALE_TIMEOUT_SECONDS = 5
8+
# This is the interval to call userRateLimit. Low-frequency as it's just for long-term metrics.
9+
USER_LIMIT_INTERVAL_SECONDS = 1800
10+
# HL has an application-level ping-pong that should be handled on the order of a minute.
11+
HYPERLIQUID_WS_PING_INTERVAL_SECONDS = 20
12+
# Number of websocket failures before we crash/restart the app.
13+
DEFAULT_STOP_AFTER_ATTEMPT = 20
714

815

916
class KMSConfig(BaseModel):
@@ -20,11 +27,13 @@ class LazerConfig(BaseModel):
2027
lazer_urls: list[str]
2128
lazer_api_key: str
2229
feed_ids: list[int]
30+
stop_after_attempt: int = DEFAULT_STOP_AFTER_ATTEMPT
2331

2432

2533
class HermesConfig(BaseModel):
2634
hermes_urls: list[str]
2735
feed_ids: list[str]
36+
stop_after_attempt: int = DEFAULT_STOP_AFTER_ATTEMPT
2837

2938

3039
class HyperliquidConfig(BaseModel):
@@ -37,6 +46,9 @@ class HyperliquidConfig(BaseModel):
3746
publish_interval: float
3847
publish_timeout: float
3948
enable_publish: bool
49+
user_limit_interval: int = USER_LIMIT_INTERVAL_SECONDS
50+
ws_ping_interval: int = HYPERLIQUID_WS_PING_INTERVAL_SECONDS
51+
stop_after_attempt: int = DEFAULT_STOP_AFTER_ATTEMPT
4052

4153
@model_validator(mode="after")
4254
def set_default_urls(self):

apps/hip-3-pusher/src/pusher/hermes_listener.py

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import asyncio
22
import json
33
from loguru import logger
4-
import time
54
import websockets
6-
from tenacity import retry, retry_if_exception_type, wait_exponential
5+
from tenacity import retry, retry_if_exception_type, wait_fixed, stop_after_attempt
76

87
from pusher.config import Config, STALE_TIMEOUT_SECONDS
98
from pusher.exception import StaleConnectionError
@@ -18,6 +17,7 @@ def __init__(self, config: Config, hermes_state: PriceSourceState):
1817
self.hermes_urls = config.hermes.hermes_urls
1918
self.feed_ids = config.hermes.feed_ids
2019
self.hermes_state = hermes_state
20+
self.stop_after_attempt = config.hermes.stop_after_attempt
2121

2222
def get_subscribe_request(self):
2323
return {
@@ -36,13 +36,19 @@ async def subscribe_all(self):
3636

3737
await asyncio.gather(*(self.subscribe_single(url) for url in self.hermes_urls))
3838

39-
@retry(
40-
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
41-
wait=wait_exponential(multiplier=1, min=1, max=10),
42-
reraise=True,
43-
)
4439
async def subscribe_single(self, url):
45-
return await self.subscribe_single_inner(url)
40+
logger.info("Starting Hermes listener loop: {}", url)
41+
42+
@retry(
43+
retry=retry_if_exception_type(Exception),
44+
wait=wait_fixed(1),
45+
stop=stop_after_attempt(self.stop_after_attempt),
46+
reraise=True,
47+
)
48+
async def _run():
49+
return await self.subscribe_single_inner(url)
50+
51+
return await _run()
4652

4753
async def subscribe_single_inner(self, url):
4854
async with websockets.connect(url) as ws:
@@ -58,13 +64,15 @@ async def subscribe_single_inner(self, url):
5864
data = json.loads(message)
5965
self.parse_hermes_message(data)
6066
except asyncio.TimeoutError:
67+
logger.warning("HermesListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
6168
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
6269
except websockets.ConnectionClosed:
70+
logger.warning("HermesListener: Connection closed, reconnecting...")
6371
raise
6472
except json.JSONDecodeError as e:
65-
logger.error("Failed to decode JSON message: {}", e)
73+
logger.exception("Failed to decode JSON message: {}", repr(e))
6674
except Exception as e:
67-
logger.error("Unexpected exception: {}", e)
75+
logger.exception("Unexpected exception: {}", repr(e))
6876

6977
def parse_hermes_message(self, data):
7078
"""
@@ -83,7 +91,6 @@ def parse_hermes_message(self, data):
8391
expo = price_object["expo"]
8492
publish_time = price_object["publish_time"]
8593
logger.debug("Hermes update: {} {} {} {}", id, price, expo, publish_time)
86-
now = time.time()
87-
self.hermes_state.put(id, PriceUpdate(price, now))
94+
self.hermes_state.put(id, PriceUpdate(price, publish_time))
8895
except Exception as e:
89-
logger.error("parse_hermes_message error: {}", e)
96+
logger.exception("parse_hermes_message error: {}", repr(e))

apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

Lines changed: 70 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import asyncio
22
import json
3+
from enum import StrEnum
4+
35
import websockets
46
from loguru import logger
5-
from tenacity import retry, retry_if_exception_type, wait_exponential
7+
from tenacity import retry, retry_if_exception_type, wait_fixed, stop_after_attempt
68
import time
79

810
from pusher.config import Config, STALE_TIMEOUT_SECONDS
@@ -14,6 +16,22 @@
1416
HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws"
1517
HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws"
1618

19+
class HLChannel(StrEnum):
20+
""" Hyperliquid websocket subscription channels. See https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/api/websocket/subscriptions """
21+
22+
# activeAssetCtx includes oracle and mark price for perps (either main HyperCore or HIP-3)
23+
CHANNEL_ACTIVE_ASSET_CTX = "activeAssetCtx"
24+
# HL market mid price
25+
CHANNEL_ALL_MIDS = "allMids"
26+
# either subscription ack or error
27+
CHANNEL_SUBSCRIPTION_RESPONSE = "subscriptionResponse"
28+
# application-level ping response
29+
CHANNEL_PONG = "pong"
30+
# error response
31+
CHANNEL_ERROR = "error"
32+
33+
DATA_CHANNELS = [HLChannel.CHANNEL_ACTIVE_ASSET_CTX, HLChannel.CHANNEL_ALL_MIDS]
34+
1735

1836
class HyperliquidListener:
1937
"""
@@ -27,6 +45,8 @@ def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_st
2745
self.hl_oracle_state = hl_oracle_state
2846
self.hl_mark_state = hl_mark_state
2947
self.hl_mid_state = hl_mid_state
48+
self.ws_ping_interval = config.hyperliquid.ws_ping_interval
49+
self.stop_after_attempt = config.hyperliquid.stop_after_attempt
3050

3151
def get_subscribe_request(self, asset):
3252
return {
@@ -37,13 +57,19 @@ def get_subscribe_request(self, asset):
3757
async def subscribe_all(self):
3858
await asyncio.gather(*(self.subscribe_single(hyperliquid_ws_url) for hyperliquid_ws_url in self.hyperliquid_ws_urls))
3959

40-
@retry(
41-
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
42-
wait=wait_exponential(multiplier=1, min=1, max=10),
43-
reraise=True,
44-
)
4560
async def subscribe_single(self, url):
46-
return await self.subscribe_single_inner(url)
61+
logger.info("Starting Hyperliquid listener loop: {}", url)
62+
63+
@retry(
64+
retry=retry_if_exception_type(Exception),
65+
wait=wait_fixed(1),
66+
stop=stop_after_attempt(self.stop_after_attempt),
67+
reraise=True,
68+
)
69+
async def _run():
70+
return await self.subscribe_single_inner(url)
71+
72+
return await _run()
4773

4874
async def subscribe_single_inner(self, url):
4975
async with websockets.connect(url) as ws:
@@ -59,48 +85,69 @@ async def subscribe_single_inner(self, url):
5985
await ws.send(json.dumps(subscribe_all_mids_request))
6086
logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url)
6187

88+
now = time.time()
89+
channel_last_message_timestamp = {channel: now for channel in HLChannel}
90+
last_ping_timestamp = now
91+
6292
# listen for updates
6393
while True:
6494
try:
6595
message = await asyncio.wait_for(ws.recv(), timeout=STALE_TIMEOUT_SECONDS)
6696
data = json.loads(message)
6797
channel = data.get("channel", None)
98+
now = time.time()
6899
if not channel:
69100
logger.error("No channel in message: {}", data)
70-
elif channel == "subscriptionResponse":
71-
logger.debug("Received subscription response: {}", data)
72-
elif channel == "error":
101+
elif channel == HLChannel.CHANNEL_SUBSCRIPTION_RESPONSE:
102+
logger.info("Received subscription response: {}", data)
103+
elif channel == HLChannel.CHANNEL_ERROR:
73104
logger.error("Received Hyperliquid error response: {}", data)
74-
elif channel == "activeAssetCtx":
75-
self.parse_hyperliquid_active_asset_ctx_update(data)
76-
elif channel == "allMids":
77-
self.parse_hyperliquid_all_mids_update(data)
105+
elif channel == HLChannel.CHANNEL_ACTIVE_ASSET_CTX:
106+
self.parse_hyperliquid_active_asset_ctx_update(data, now)
107+
channel_last_message_timestamp[channel] = now
108+
elif channel == HLChannel.CHANNEL_ALL_MIDS:
109+
self.parse_hyperliquid_all_mids_update(data, now)
110+
channel_last_message_timestamp[channel] = now
111+
elif channel == HLChannel.CHANNEL_PONG:
112+
logger.debug("Received pong")
78113
else:
79114
logger.error("Received unknown channel: {}", channel)
115+
116+
# check for stale channels
117+
for channel in DATA_CHANNELS:
118+
if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS:
119+
logger.warning("HyperliquidLister: no messages in channel {} stale in {} seconds; reconnecting...", channel, STALE_TIMEOUT_SECONDS)
120+
raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
121+
122+
# ping if we need to
123+
if now - last_ping_timestamp > self.ws_ping_interval:
124+
await ws.send(json.dumps({"method": "ping"}))
125+
last_ping_timestamp = now
80126
except asyncio.TimeoutError:
81-
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
82-
except websockets.ConnectionClosed:
127+
logger.warning("HyperliquidListener: No messages overall in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
128+
raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
129+
except websockets.ConnectionClosed as e:
130+
rc, rr = e.rcvd.code if e.rcvd else None, e.rcvd.reason if e.rcvd else None
131+
logger.warning("HyperliquidListener: Websocket connection closed (code={} reason={}); reconnecting...", rc, rr)
83132
raise
84133
except json.JSONDecodeError as e:
85-
logger.error("Failed to decode JSON message: {} error: {}", message, e)
134+
logger.exception("Failed to decode JSON message: {} error: {}", message, repr(e))
86135
except Exception as e:
87-
logger.error("Unexpected exception: {}", e)
136+
logger.exception("Unexpected exception: {}", repr(e))
88137

89-
def parse_hyperliquid_active_asset_ctx_update(self, message):
138+
def parse_hyperliquid_active_asset_ctx_update(self, message, now):
90139
try:
91140
ctx = message["data"]["ctx"]
92141
symbol = message["data"]["coin"]
93-
now = time.time()
94142
self.hl_oracle_state.put(symbol, PriceUpdate(ctx["oraclePx"], now))
95143
self.hl_mark_state.put(symbol, PriceUpdate(ctx["markPx"], now))
96144
logger.debug("activeAssetCtx symbol: {} oraclePx: {} markPx: {}", symbol, ctx["oraclePx"], ctx["markPx"])
97145
except Exception as e:
98-
logger.error("parse_hyperliquid_active_asset_ctx_update error: message: {} e: {}", message, e)
146+
logger.exception("parse_hyperliquid_active_asset_ctx_update error: message: {} e: {}", message, repr(e))
99147

100-
def parse_hyperliquid_all_mids_update(self, message):
148+
def parse_hyperliquid_all_mids_update(self, message, now):
101149
try:
102150
mids = message["data"]["mids"]
103-
now = time.time()
104151
for mid in mids:
105152
self.hl_mid_state.put(mid, PriceUpdate(mids[mid], now))
106153
logger.debug("allMids: {}", mids)

apps/hip-3-pusher/src/pusher/lazer_listener.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import asyncio
22
import json
33
from loguru import logger
4-
import time
54
import websockets
6-
from tenacity import retry, retry_if_exception_type, wait_exponential
5+
from tenacity import retry, retry_if_exception_type, wait_fixed, stop_after_attempt
76

87
from pusher.config import Config, STALE_TIMEOUT_SECONDS
98
from pusher.exception import StaleConnectionError
@@ -19,6 +18,7 @@ def __init__(self, config: Config, lazer_state: PriceSourceState):
1918
self.api_key = config.lazer.lazer_api_key
2019
self.feed_ids = config.lazer.feed_ids
2120
self.lazer_state = lazer_state
21+
self.stop_after_attempt = config.lazer.stop_after_attempt
2222

2323
def get_subscribe_request(self, subscription_id: int):
2424
return {
@@ -40,13 +40,19 @@ async def subscribe_all(self):
4040

4141
await asyncio.gather(*(self.subscribe_single(router_url) for router_url in self.lazer_urls))
4242

43-
@retry(
44-
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
45-
wait=wait_exponential(multiplier=1, min=1, max=10),
46-
reraise=True,
47-
)
4843
async def subscribe_single(self, router_url):
49-
return await self.subscribe_single_inner(router_url)
44+
logger.info("Starting Lazer listener loop: {}", router_url)
45+
46+
@retry(
47+
retry=retry_if_exception_type(Exception),
48+
wait=wait_fixed(1),
49+
stop=stop_after_attempt(self.stop_after_attempt),
50+
reraise=True,
51+
)
52+
async def _run():
53+
return await self.subscribe_single_inner(router_url)
54+
55+
return await _run()
5056

5157
async def subscribe_single_inner(self, router_url):
5258
headers = {
@@ -66,13 +72,15 @@ async def subscribe_single_inner(self, router_url):
6672
data = json.loads(message)
6773
self.parse_lazer_message(data)
6874
except asyncio.TimeoutError:
75+
logger.warning("LazerListener: No messages in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
6976
raise StaleConnectionError(f"No messages in {STALE_TIMEOUT_SECONDS} seconds, reconnecting")
7077
except websockets.ConnectionClosed:
78+
logger.warning("LazerListener: Connection closed, reconnecting...")
7179
raise
7280
except json.JSONDecodeError as e:
73-
logger.error("Failed to decode JSON message: {}", e)
81+
logger.exception("Failed to decode JSON message: {}", repr(e))
7482
except Exception as e:
75-
logger.error("Unexpected exception: {}", e)
83+
logger.exception("Unexpected exception: {}", repr(e))
7684

7785
def parse_lazer_message(self, data):
7886
"""
@@ -85,14 +93,15 @@ def parse_lazer_message(self, data):
8593
if data.get("type", "") != "streamUpdated":
8694
return
8795
price_feeds = data["parsed"]["priceFeeds"]
88-
logger.debug("price_feeds: {}", price_feeds)
89-
now = time.time()
96+
# timestampUs is in micros, this is scaled to unix seconds the same as time.time()
97+
timestamp_seconds = int(data["parsed"]["timestampUs"]) / 1_000_000.0
98+
logger.debug("price_feeds: {} timestamp: {}", price_feeds, timestamp_seconds)
9099
for feed_update in price_feeds:
91100
feed_id = feed_update.get("priceFeedId", None)
92101
price = feed_update.get("price", None)
93102
if feed_id is None or price is None:
94103
continue
95104
else:
96-
self.lazer_state.put(feed_id, PriceUpdate(price, now))
105+
self.lazer_state.put(feed_id, PriceUpdate(price, timestamp_seconds))
97106
except Exception as e:
98-
logger.error("parse_lazer_message error: {}", e)
107+
logger.exception("parse_lazer_message error: {}", repr(e))

apps/hip-3-pusher/src/pusher/main.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from pusher.price_state import PriceState
1414
from pusher.publisher import Publisher
1515
from pusher.metrics import Metrics
16+
from pusher.user_limit_listener import UserLimitListener
1617

1718

1819
def load_config():
@@ -52,13 +53,15 @@ async def main():
5253
lazer_listener = LazerListener(config, price_state.lazer_state)
5354
hermes_listener = HermesListener(config, price_state.hermes_state)
5455
seda_listener = SedaListener(config, price_state.seda_state)
56+
user_limit_listener = UserLimitListener(config, metrics, publisher.user_limit_address)
5557

5658
await asyncio.gather(
5759
publisher.run(),
5860
hyperliquid_listener.subscribe_all(),
5961
lazer_listener.subscribe_all(),
6062
hermes_listener.subscribe_all(),
6163
seda_listener.run(),
64+
user_limit_listener.run(),
6265
)
6366
logger.info("Exiting hip-3-pusher..")
6467

apps/hip-3-pusher/src/pusher/metrics.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ def _init_metrics(self):
4848
name="hip_3_relayer_price_config",
4949
description="Price source config",
5050
)
51+
# labels: dex, user
52+
self.user_request_balance = self.meter.create_gauge(
53+
name="hip_3_relayer_user_request_balance",
54+
description="Number of update requests left before rate limit",
55+
)
5156

5257
def set_price_configs(self, dex: str, price_config: PriceConfig):
5358
self._set_price_config_type(dex, price_config.oracle, "oracle")

0 commit comments

Comments
 (0)