Post process settlements in parallel #3853
base: main
Conversation
```rust
const TEMP_ERROR_BACK_OFF: Duration = Duration::from_millis(100);
tokio::time::sleep(TEMP_ERROR_BACK_OFF).await;
```
Q: should it have some form of jitter to avoid retrying a bunch of things at the same time?
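One cheap way to do that, sketched without pulling in the `rand` crate (this is an illustration, not the PR's code; `backoff_with_jitter` is a hypothetical helper):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const TEMP_ERROR_BACK_OFF: Duration = Duration::from_millis(100);

/// Hypothetical helper: adds up to 50% jitter on top of the base
/// back-off so concurrent retries don't all fire at the same instant.
fn backoff_with_jitter(base: Duration) -> Duration {
    // Cheap entropy source that avoids a `rand` dependency: the
    // sub-second nanoseconds of the current wall clock.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos() as u64)
        .unwrap_or(0);
    let max_jitter_ms = (base.as_millis() as u64 / 2).max(1);
    base + Duration::from_millis(nanos % max_jitter_ms)
}
```

The sleep would then become `tokio::time::sleep(backoff_with_jitter(TEMP_ERROR_BACK_OFF)).await;`.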
```rust
        match Self::retry_with_sleep(|| self.post_process_settlement(settlement)).await {
            Some(_) => tracing::debug!(tx = ?settlement.transaction, "successfully post-processed settlement"),
            None => tracing::warn!(tx = ?settlement.transaction, "gave up on post-processing settlement"),
        }
    })
    .await;
}
```
```diff
 async fn post_process_settlement(&self, settlement: eth::SettlementEvent) -> Result<()> {
     let settlement_data = self
-        .fetch_auction_data_for_transaction(event.transaction)
+        .fetch_auction_data_for_transaction(settlement.transaction)
         .await?;
     self.persistence
-        .save_settlement(event, settlement_data.as_ref())
+        .save_settlement(settlement, settlement_data.as_ref())
         .await
         .context("failed to update settlement")?;

     match settlement_data {
         None => Ok(IndexSuccess::SkippedInvalidTransaction),
         Some(_) => Ok(IndexSuccess::IndexedSettlement),
     }
-    .context("failed to update settlement")
```
Given that we're running multiple ones in parallel, won't this apply extra write pressure on the DB?
Should we rather aggregate concurrent reads and write a single batch?
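The suggestion could look roughly like this (all names here are hypothetical stand-ins, sketched synchronously for brevity; the real code is async and talks to a real database):

```rust
/// Hypothetical stand-in for the data fetched per settlement.
struct SettlementData {
    transaction: String,
    auction_id: u64,
}

/// One batched write standing in for N individual `save_settlement`
/// calls, e.g. a single multi-row INSERT instead of one DB round trip
/// per settlement. Returns the number of rows written.
fn save_settlements_batch(batch: &[SettlementData]) -> usize {
    // A real implementation would issue one statement for the whole batch.
    batch.len()
}

/// Gather the per-settlement data first (the reads could still run
/// concurrently), then hit the DB exactly once.
fn post_process_batched(auction_ids: Vec<u64>) -> usize {
    let batch: Vec<SettlementData> = auction_ids
        .into_iter()
        .map(|auction_id| SettlementData {
            transaction: format!("0x{auction_id:x}"),
            auction_id,
        })
        .collect();
    save_settlements_batch(&batch)
}
```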
squadgazzz left a comment
The change makes sense to me, and I also don't see any obvious pitfalls.
```rust
    .unwrap_or_default();

// everything worked fine -> reset our attempts for the next settlement
attempts = 0;
if settlements.is_empty() {
    tracing::debug!("no unprocessed settlements found");
```
When retry_with_sleep returns None, that means it gave up on retrying with errors. But the log won't reflect it. Should we clarify that there is a problem with the fetching logic?
```rust
    }
}

async fn retry_with_sleep<F, OK, ERR>(future: impl Fn() -> F) -> Option<OK>
```
Maybe it would help in debugging if instead we would collect the encountered errors and return Result<OK, Vec<ERR>>, then log it on call site? This would allow us to understand the error.
Alternatively one could assume the passed in future does its own logging.
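The first suggestion could be sketched like this (synchronous for brevity and with assumed names; the real `retry_with_sleep` is async and sleeps between attempts):

```rust
/// Variant of the retry helper that collects every encountered error so
/// the call site can log the whole history instead of just "gave up".
fn retry_collecting_errors<OK, ERR>(
    attempts: usize,
    mut op: impl FnMut() -> Result<OK, ERR>,
) -> Result<OK, Vec<ERR>> {
    let mut errors = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        match op() {
            Ok(ok) => return Ok(ok),
            Err(err) => errors.push(err),
        }
        // back-off elided; the real code would sleep TEMP_ERROR_BACK_OFF here
    }
    Err(errors)
}
```

The caller can then log the full `Vec<ERR>` in the `Err` case, which answers "why did we give up" without requiring the passed-in future to do its own logging.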
This pull request has been marked as stale because it has been inactive for a while. Please update this pull request or it will be automatically closed.
Description
Currently we post-process settlements (i.e. associate the tx with a solution proposed by a solver) serially. IIRC it was done that way because it was simpler and that step simply ran in a background task at the time.
Since then this logic has been moved onto the hot path so all the time we spend there delays the creation of the next auction.
Additionally, with the introduction of combinatorial auctions it's now possible and relatively common to have multiple settlement transactions in the same block. Whenever that happens this PR should result in a significant performance uplift because we process those settlements concurrently.
Changes
Replaced the loop that post-processed auctions serially with logic that first fetches ALL unprocessed settlements and then works on up to 10 of them concurrently. This is fine for the post-processing logic (as opposed to the raw event indexing) because it's okay for post-processing to happen out of order. So if we successfully post-process settlement `n+1` but not settlement `n` (e.g. due to a crash), the DB query would still just return all unprocessed events instead of all NEW unprocessed events.

I also adjusted how retrying works in this code. Instead of returning some result enum (ok, invalid, nothing_to_do) to make the serial loop retry something, we now have a function `retry_with_sleep` that simply retries a passed-in future n times and returns an `Option` to indicate whether it was successful or not. This is used in 2 places:
With that the new logic should be as robust as the old one while being IMO easier to reason about.
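The fetch-all-then-fan-out shape described above can be sketched with plain std threads (the PR itself uses async tasks, and `process_concurrently`, the batching strategy, and the numbers here are illustrative, not the actual implementation):

```rust
use std::thread;

/// Process `items` with at most `limit` workers running at a time,
/// mirroring the "fetch ALL unprocessed settlements, then work on up to
/// 10 concurrently" shape. As in the PR, it's fine for individual items
/// to complete out of order relative to their original sequence.
fn process_concurrently<T, R>(mut items: Vec<T>, limit: usize, work: fn(T) -> R) -> Vec<R>
where
    T: Send + 'static,
    R: Send + 'static,
{
    let mut results = Vec::with_capacity(items.len());
    while !items.is_empty() {
        let take = limit.min(items.len());
        // Spawn one batch of up to `limit` workers, then join them all
        // before starting the next batch.
        let handles: Vec<_> = items
            .drain(..take)
            .map(|item| thread::spawn(move || work(item)))
            .collect();
        for handle in handles {
            results.push(handle.join().expect("worker panicked"));
        }
    }
    results
}
```

In the async version the same bounded fan-out is what combinators like `futures`' `buffer_unordered` provide, without the batch barrier between groups.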
How to test
Not sure how to test the improvement specifically. Since this is a performance optimization and I don't really want to test the internals of the implementation, a new test that makes sure a big number of settlements can be post-processed would be enough to verify correctness. For the performance aspect we'd have to deploy this on the cluster.
Regarding the performance I temporarily deployed it to staging and it produced the expected effect of reducing the spikiness that comes from multiple settlements needing to be post-processed in the same block.
