Skip to content

Commit 0529c6a

Browse files
committed
fixes
1 parent 10c6f14 commit 0529c6a

File tree

4 files changed

+29
-9
lines changed

4 files changed

+29
-9
lines changed

apps/hip-3-pusher/src/pusher/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
STALE_TIMEOUT_SECONDS = 5
77
USER_LIMIT_INTERVAL_SECONDS = 1800
8+
HYPERLIQUID_WS_PING_INTERVAL_SECONDS = 20
89

910

1011
class KMSConfig(BaseModel):
@@ -39,6 +40,7 @@ class HyperliquidConfig(BaseModel):
3940
publish_timeout: float
4041
enable_publish: bool
4142
user_limit_interval: int = USER_LIMIT_INTERVAL_SECONDS
43+
ws_ping_interval: int = HYPERLIQUID_WS_PING_INTERVAL_SECONDS
4244

4345
@model_validator(mode="after")
4446
def set_default_urls(self):

apps/hip-3-pusher/src/pusher/hyperliquid_listener.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
class HLChannel(StrEnum):
2020
CHANNEL_ACTIVE_ASSET_CTX = "activeAssetCtx"
2121
CHANNEL_ALL_MIDS = "allMids"
22+
CHANNEL_SUBSCRIPTION_RESPONSE = "subscriptionResponse"
23+
CHANNEL_PONG = "pong"
24+
CHANNEL_ERROR = "error"
25+
26+
DATA_CHANNELS = [HLChannel.CHANNEL_ACTIVE_ASSET_CTX, HLChannel.CHANNEL_ALL_MIDS]
2227

2328

2429
class HyperliquidListener:
@@ -33,6 +38,7 @@ def __init__(self, config: Config, hl_oracle_state: PriceSourceState, hl_mark_st
3338
self.hl_oracle_state = hl_oracle_state
3439
self.hl_mark_state = hl_mark_state
3540
self.hl_mid_state = hl_mid_state
41+
self.ws_ping_interval = config.hyperliquid.ws_ping_interval
3642

3743
def get_subscribe_request(self, asset):
3844
return {
@@ -66,7 +72,10 @@ async def subscribe_single_inner(self, url):
6672
await ws.send(json.dumps(subscribe_all_mids_request))
6773
logger.info("Sent subscribe request for allMids for dex: {} to {}", self.market_name, url)
6874

69-
channel_last_message_timestamp = {channel: time.time() for channel in HLChannel}
75+
now = time.time()
76+
channel_last_message_timestamp = {channel: now for channel in HLChannel}
77+
last_ping_timestamp = now
78+
7079
# listen for updates
7180
while True:
7281
try:
@@ -76,29 +85,37 @@ async def subscribe_single_inner(self, url):
7685
now = time.time()
7786
if not channel:
7887
logger.error("No channel in message: {}", data)
79-
elif channel == "subscriptionResponse":
80-
logger.debug("Received subscription response: {}", data)
81-
elif channel == "error":
88+
elif channel == HLChannel.CHANNEL_SUBSCRIPTION_RESPONSE:
89+
logger.info("Received subscription response: {}", data)
90+
elif channel == HLChannel.CHANNEL_ERROR:
8291
logger.error("Received Hyperliquid error response: {}", data)
8392
elif channel == HLChannel.CHANNEL_ACTIVE_ASSET_CTX:
8493
self.parse_hyperliquid_active_asset_ctx_update(data, now)
8594
channel_last_message_timestamp[channel] = now
8695
elif channel == HLChannel.CHANNEL_ALL_MIDS:
8796
self.parse_hyperliquid_all_mids_update(data, now)
8897
channel_last_message_timestamp[channel] = now
98+
elif channel == HLChannel.CHANNEL_PONG:
99+
logger.debug("Received pong")
89100
else:
90101
logger.error("Received unknown channel: {}", channel)
91102

92103
# check for stale channels
93-
for channel in HLChannel:
104+
for channel in DATA_CHANNELS:
94105
if now - channel_last_message_timestamp[channel] > STALE_TIMEOUT_SECONDS:
95106
logger.warning("HyperliquidLister: no messages in channel {} stale in {} seconds; reconnecting...", channel, STALE_TIMEOUT_SECONDS)
96107
raise StaleConnectionError(f"No messages in channel {channel} in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
108+
109+
# ping if we need to
110+
if now - last_ping_timestamp > self.ws_ping_interval:
111+
await ws.send(json.dumps({"method": "ping"}))
112+
last_ping_timestamp = now
97113
except asyncio.TimeoutError:
98114
logger.warning("HyperliquidListener: No messages overall in {} seconds, reconnecting...", STALE_TIMEOUT_SECONDS)
99115
raise StaleConnectionError(f"No messages overall in {STALE_TIMEOUT_SECONDS} seconds, reconnecting...")
100-
except websockets.ConnectionClosed:
101-
logger.warning("HyperliquidListener: Connection closed, reconnecting...")
116+
except websockets.ConnectionClosed as e:
117+
rc, rr = e.rcvd.code if e.rcvd else None, e.rcvd.reason if e.rcvd else None
118+
logger.warning("HyperliquidListener: Websocket connection closed (code={} reason={}); reconnecting...", rc, rr)
102119
raise
103120
except json.JSONDecodeError as e:
104121
logger.error("Failed to decode JSON message: {} error: {}", message, e)

apps/hip-3-pusher/src/pusher/metrics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def _init_metrics(self):
4848
name="hip_3_relayer_price_config",
4949
description="Price source config",
5050
)
51-
# labels: user
51+
# labels: dex, user
5252
self.user_request_balance = self.meter.create_gauge(
5353
name="hip_3_relayer_user_request_balance",
5454
description="Number of update requests left before rate limit",

apps/hip-3-pusher/src/pusher/user_limit_listener.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def __init__(self, config: Config, metrics: Metrics, address: str):
1515
self.address = address.lower()
1616
self.metrics = metrics
1717
self.interval = config.hyperliquid.user_limit_interval
18+
self.dex = config.hyperliquid.market_name
1819

1920
base_url = constants.TESTNET_API_URL if config.hyperliquid.use_testnet else constants.MAINNET_API_URL
2021
self.info = Info(base_url=base_url, skip_ws=True, meta=Meta(universe=[]), spot_meta=SpotMeta(universe=[], tokens=[]))
@@ -27,7 +28,7 @@ async def run(self):
2728
logger.debug("userRateLimit response: {}", response)
2829
balance = response["nRequestsSurplus"] - response["nRequestsCap"] - response["nRequestsUsed"]
2930
logger.debug("userRateLimit user: {} balance: {}", self.address, balance)
30-
self.metrics.user_request_balance.set(balance, {"user": self.address})
31+
self.metrics.user_request_balance.set(balance, {"dex": self.dex, "user": self.address})
3132
except Exception as e:
3233
logger.error("userRateLimit query failed: {}", e)
3334
await asyncio.sleep(self.interval)

0 commit comments

Comments
 (0)