Check for staleness in each Hyperliquid websocket channel #3289
Conversation
```python
retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type(Exception),
wait=wait_fixed(1),
```
Will it stop retrying after some max retries? Probably a good idea to reraise and crash the app when the retries are exhausted
(Same for the other decorators)
Currently, no. Should be able to tweak this appropriately.
Added a configurable stop_after_attempt for this purpose.
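For reference, a minimal sketch of what a capped, reraising tenacity decorator could look like. MAX_ATTEMPTS is a hypothetical config value, and StaleConnectionError here is a placeholder for the project's own exception shown in the diff above; this is not the final code.

```python
# Sketch only: cap the retries and re-raise so the process crashes once they are exhausted.
import websockets
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

MAX_ATTEMPTS = 10  # assumed to be made configurable


class StaleConnectionError(Exception):
    """Placeholder for the exception raised when a channel goes stale."""


@retry(
    retry=retry_if_exception_type((StaleConnectionError, websockets.ConnectionClosed)),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    stop=stop_after_attempt(MAX_ATTEMPTS),
    reraise=True,  # surface the last exception instead of swallowing it
)
async def subscribe(channel: str) -> None:
    ...  # connect and read from the websocket channel
```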
```python
price_feeds = data["parsed"]["priceFeeds"]
logger.debug("price_feeds: {}", price_feeds)
now = time.time()
timestamp = int(data["parsed"]["timestampUs"]) / 1_000_000.0
```
Can we name the variable something like timestamp_ms (or whatever the resolution is)?
Will do.
Done.
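For illustration, a sketch of the rename: the name timestamp_s and the example payload are assumptions; the field itself is in microseconds per the diff, so after dividing by 1e6 the value is in seconds.

```python
import time

data = {"parsed": {"timestampUs": "1718000000000000"}}  # example payload shape
# timestampUs is microseconds; the converted value is seconds, hence timestamp_s.
timestamp_s = int(data["parsed"]["timestampUs"]) / 1_000_000.0
staleness_s = time.time() - timestamp_s  # directly comparable with time.time()
```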
```diff
 return None
 quote_price = self.get_price_from_single_source(quote_source)
-if quote_price is None:
+if quote_price is None or float(quote_price) == 0:
```
What edge case are we covering here?
Looks like we're casting the string quote price to a float and comparing it to integer zero. Also, you may get caught out by float representation equality issues here.
Roman at AR called this out to prevent a divide by zero; I'd agree it's unlikely, and perhaps not that helpful. The quote price itself comes from an external source as a string, so we wouldn't expect a float rounding issue here.
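If the float-equality concern ever matters in practice, one hedged alternative is to compare the source string as a Decimal; the function name below is illustrative, not the PR's code.

```python
# Sketch: Decimal comparison against zero is exact, so it avoids float representation
# concerns while still protecting the later division.
from decimal import Decimal, InvalidOperation


def is_usable_quote(quote_price: str | None) -> bool:
    if quote_price is None:
        return False
    try:
        return Decimal(quote_price) != 0
    except InvalidOperation:  # malformed price string from the source
        return False
```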
```diff
 await asyncio.sleep(self.publish_interval)
 try:
-    self.publish()
+    await self.publish()
```
Why do we need to make it async? It looks like that's why we wrap the blocking sync code in a thread (asyncio.to_thread). Is it to avoid blocking the read tasks while publish is happening?
Exactly!
We needed to be more careful here to prevent any non-async library requests from blocking reads, which did appear to be happening.
Well, I don't know the rest of the code; this works only if you have separate tasks from the beginning, is that so?
In any case, why are you not creating new tasks to publish the data as a fire-and-forget operation?
@ali-behjati It's been part of a top-level task. The issue is that we added a mechanism (asyncio.to_thread) to make this push call (which uses the sync requests library) not block other read tasks (which are using async-friendly websocket libraries), and then we had to make all functions in the call tree async/await.
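A minimal sketch of the pattern being described, assuming the publish path wraps a blocking requests call; the class and method names are illustrative, not the actual code.

```python
# Offload the blocking, sync HTTP publish to a worker thread so the asyncio event
# loop (and therefore the websocket read tasks) keeps running in the meantime.
import asyncio

import requests


class Publisher:
    def __init__(self, url: str, publish_interval: float) -> None:
        self.url = url
        self.publish_interval = publish_interval

    def _publish_sync(self, payload: dict) -> None:
        # Blocking call via the sync requests library.
        requests.post(self.url, json=payload, timeout=5)

    async def publish(self) -> None:
        # asyncio.to_thread runs the blocking call in a thread and awaits its result.
        await asyncio.to_thread(self._publish_sync, {"prices": {}})

    async def run(self) -> None:
        while True:
            await asyncio.sleep(self.publish_interval)
            await self.publish()
```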
```python
from pusher.metrics import Metrics


class UserLimitListener:
```
Would be good to document what this class is for.
Okay, will do.
Docstring added.
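Purely as an illustration, the added docstring could read something like the following; the wording is an assumption based on the rate-limit discussion later in this review, not the text that was actually committed.

```python
class UserLimitListener:
    """Periodically query the Hyperliquid userRateLimit API and export the result as a
    metric, so that a shrinking allowance (e.g. due to a low wallet balance) is noticed
    within a few days."""
```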
```python
await asyncio.gather(
    publisher.run(),
    hyperliquid_listener.subscribe_all(),
    lazer_listener.subscribe_all(),
    hermes_listener.subscribe_all(),
    seda_listener.run(),
    user_limit_listener.run(),
```
What's the expected behavior if any of these tasks exit/raise? I guess the app crashes and we let k8s restart it?
Correct!
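A small self-contained sketch of that behavior: with gather's default return_exceptions=False, the first task exception propagates out of asyncio.run() and the process exits non-zero, leaving the restart to k8s.

```python
import asyncio


async def healthy() -> None:
    while True:
        await asyncio.sleep(1)


async def failing() -> None:
    await asyncio.sleep(0.1)
    raise RuntimeError("listener died")


async def main() -> None:
    # Mirrors the structure above: one failing task brings the whole gather down.
    await asyncio.gather(healthy(), failing())


if __name__ == "__main__":
    asyncio.run(main())  # exits with RuntimeError: listener died
```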
```python
    pxs[f"{self.market_name}:{symbol}"] = str(px)
    break
except Exception as e:
    logger.exception("get_price exception for symbol: {} source_config: {} error: {}", symbol, source_config, e)
```
For printing out exceptions, use repr(e) to maintain useful type context in the output.
Fixed everywhere.
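A quick illustration of the difference:

```python
# str(e) loses the exception type; repr(e) keeps it, which is what you want in logs.
try:
    int("not-a-number")
except Exception as e:
    print(str(e))   # invalid literal for int() with base 10: 'not-a-number'
    print(repr(e))  # ValueError("invalid literal for int() with base 10: 'not-a-number'")
```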
ali-behjati left a comment:
Overall looks good. I recommend splitting the changes into different PRs next time, plus much more documentation in the code so it's easy to follow for a new reader.
```python
from typing import Literal


STALE_TIMEOUT_SECONDS = 5
USER_LIMIT_INTERVAL_SECONDS = 1800
```
what does this mean?
Ah, it's the interval for calling the Hyperliquid user rate limit API. It can be low frequency because we basically just want to identify a low wallet balance within a few days.
```python
HYPERLIQUID_MAINNET_WS_URL = "wss://api.hyperliquid.xyz/ws"
HYPERLIQUID_TESTNET_WS_URL = "wss://api.hyperliquid-testnet.xyz/ws"


class HLChannel(StrEnum):
```
Add some comments explaining what it is.
```python
    logger.exception("userRateLimit query failed: {}", repr(e))


# want to update every 60s to keep metric populated in Grafana
await asyncio.sleep(60)
```
That's strange. Shouldn't the Prometheus client always return the latest number without you needing to set it every 60s? If it doesn't (which is strange), I recommend doing it every 30s, since you are making calls that might take time.
I'd think so; the issue in Grafana was that the metric only showed up around each update even when using the latest value, which was leading to "no data" in the stats panel in many cases. Followed ChatGPT's suggestion to update the metric more frequently.
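A sketch of the resulting pattern, assuming prometheus_client; the gauge name, fetch coroutine, and intervals are illustrative stand-ins for the actual code.

```python
# Query the rate-limit API only every USER_LIMIT_INTERVAL_SECONDS, but re-set the
# gauge every 60s so each Prometheus scrape (and the Grafana stats panel) sees data.
import asyncio

from prometheus_client import Gauge

user_rate_limit_gauge = Gauge(
    "hyperliquid_user_rate_limit_remaining",
    "Remaining Hyperliquid user rate limit (illustrative metric name)",
)


async def run_user_limit_loop(fetch_user_rate_limit, query_interval: int = 1800) -> None:
    remaining = None
    elapsed = query_interval  # force a query on the first pass
    while True:
        if elapsed >= query_interval:
            remaining = await fetch_user_rate_limit()  # slow API call, done infrequently
            elapsed = 0
        if remaining is not None:
            user_rate_limit_gauge.set(remaining)
        await asyncio.sleep(60)
        elapsed += 60
```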