Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions crates/autopilot/src/database/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,30 @@ impl Postgres {
})
}

pub async fn get_next_auction_id(&self) -> Result<dto::AuctionId> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["get_next_auction_id"])
.start_timer();

let mut ex = self.pool.acquire().await?;
let id = database::auction::get_next_auction_id(&mut ex).await?;
Ok(id)
}

pub async fn replace_current_auction(
&self,
id: dto::AuctionId,
auction: &dto::RawAuctionData,
) -> Result<dto::AuctionId> {
) -> Result<()> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["replace_current_auction"])
.with_label_values(&["insert_auction_with_id"])
.start_timer();

let data = serde_json::to_value(auction)?;
let mut ex = self.pool.acquire().await?;
let id = database::auction::replace_auction(&mut ex, &data).await?;
Ok(id)
database::auction::insert_auction_with_id(&mut ex, id, &data).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To supplement my point, I'd have to dive up to here to understand that idis the new auction ID

Ok(())
}
}
86 changes: 41 additions & 45 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,52 @@ impl Persistence {
LeaderLock::new(self.postgres.pool.clone(), key, Duration::from_millis(200))
}

/// There is always only one `current` auction.
///
/// This method replaces the current auction with the given one.
///
/// If the given auction is successfully saved, it is also archived.
pub async fn replace_current_auction(
&self,
auction: &domain::RawAuctionData,
) -> Result<domain::auction::Id, DatabaseError> {
let _timer = observe::metrics::metrics()
.on_auction_overhead_start("autopilot", "replace_auction_in_db");
let auction = dto::auction::from_domain(auction.clone());
/// Fetches the ID that should be used for the next auction.
pub async fn get_next_auction_id(&self) -> Result<domain::auction::Id, DatabaseError> {
let _timer = Metrics::get()
.database_queries
.with_label_values(&["get_next_auction_id"])
.start_timer();
self.postgres
.replace_current_auction(&auction)
.get_next_auction_id()
.await
.inspect(|&id| {
self.archive_auction(dto::auction::Auction { id, auction });
})
.map_err(DatabaseError)
}

/// Spawns a background task that replaces the current auction in the DB
/// with the new one.
pub fn replace_current_auction_in_db(
&self,
id: domain::auction::Id,
auction: &domain::RawAuctionData,
Comment on lines +82 to +83
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the variables could have better names, it's kind of hard to tell which is which by just looking at it. Is the ID the new one, or is it contained in AuctionData?

) {
let auction_dto = dto::auction::from_domain(auction.clone());
let db = self.postgres.clone();
tokio::task::spawn(async move {
let _ = db
.replace_current_auction(id, &auction_dto)
.await
.inspect_err(|err| tracing::warn!(?err, "failed to replace auction in DB"));
});
}

/// Spawns a background task that uploads the auction to S3.
pub fn upload_auction_to_s3(&self, id: domain::auction::Id, auction: &domain::RawAuctionData) {
if auction.orders.is_empty() {
return;
}
let Some(s3) = self.s3.clone() else {
return;
};
let auction_dto = dto::auction::from_domain(auction.clone());
tokio::task::spawn(async move {
match s3.upload(id.to_string(), &auction_dto).await {
Ok(key) => tracing::info!(?key, "uploaded auction to s3"),
Err(err) => tracing::warn!(?err, "failed to upload auction to s3"),
Comment on lines +106 to +107
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add the auction id to the logs too? I'm not sure the error has it

}
});
}

/// Finds solvable orders based on the order's min validity period.
pub async fn all_solvable_orders(
&self,
Expand All @@ -95,35 +120,6 @@ impl Persistence {
.context("failed to fetch all solvable orders")
}

/// Saves the given auction to storage for debugging purposes.
///
/// There is no intention to retrieve this data programmatically.
fn archive_auction(&self, instance: dto::auction::Auction) {
let Some(uploader) = self.s3.clone() else {
return;
};
if instance.auction.orders.is_empty() {
tracing::info!("skip upload of empty auction");
return;
}
tokio::spawn(
async move {
match uploader
.upload(instance.id.to_string(), &instance.auction)
.await
{
Ok(key) => {
tracing::info!(?key, "uploaded auction to s3");
}
Err(err) => {
tracing::warn!(?err, "failed to upload auction to s3");
}
}
}
.instrument(tracing::Span::current()),
);
}

/// Saves the competition data to the DB
pub async fn save_competition(
&self,
Expand Down
30 changes: 13 additions & 17 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,33 +262,29 @@ impl RunLoop {
}

async fn cut_auction(&self) -> Option<domain::Auction> {
let auction = match self.solvable_orders_cache.current_auction().await {
Some(auction) => auction,
None => {
tracing::debug!("no current auction");
return None;
}
let Some(auction) = self.solvable_orders_cache.current_auction().await else {
tracing::debug!("no current auction");
return None;
};
let auction = self.remove_in_flight_orders(auction).await;
let id = self
.persistence
.get_next_auction_id()
.await
.inspect_err(|err| tracing::error!(?err, "failed to get next auction id"))
.ok()?;
Metrics::auction(id);

let id = match self.persistence.replace_current_auction(&auction).await {
Ok(id) => {
Metrics::auction(id);
id
}
Err(err) => {
tracing::error!(?err, "failed to replace current auction");
return None;
}
};
// always update the auction because the tests use this as a readiness probe
self.persistence.replace_current_auction_in_db(id, &auction);
self.persistence.upload_auction_to_s3(id, &auction);

if auction.orders.is_empty() {
// Updating liveness probe to not report unhealthy due to this optimization
self.liveness.auction();
tracing::debug!("skipping empty auction");
return None;
}

Some(domain::Auction {
id,
block: auction.block,
Expand Down
28 changes: 19 additions & 9 deletions crates/database/src/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@ LIMIT 1
sqlx::query_as(QUERY).fetch_optional(ex).await
}

pub async fn replace_auction(
pub async fn get_next_auction_id(ex: &mut PgConnection) -> Result<AuctionId, sqlx::Error> {
const QUERY: &str =
r#"SELECT nextval(pg_get_serial_sequence('auctions', 'id'))::bigint as next_id;"#;

let (id,) = sqlx::query_as(QUERY).fetch_one(ex).await?;
Ok(id)
}

pub async fn insert_auction_with_id(
ex: &mut PgConnection,
id: AuctionId,
data: &JsonValue,
) -> Result<AuctionId, sqlx::Error> {
) -> Result<(), sqlx::Error> {
const QUERY: &str = r#"
WITH deleted AS (
DELETE FROM auctions
)
INSERT INTO auctions (json)
VALUES ($1)
RETURNING id;
INSERT INTO auctions (id, json)
VALUES ($1, $2);
"#;

let (id,) = sqlx::query_as(QUERY).bind(data).fetch_one(ex).await?;
Ok(id)
sqlx::query(QUERY).bind(id).bind(data).execute(ex).await?;
Ok(())
}

#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
Expand Down Expand Up @@ -96,14 +104,16 @@ mod tests {
crate::clear_DANGER_(&mut db).await.unwrap();

let value = JsonValue::Number(1.into());
let id = replace_auction(&mut db, &value).await.unwrap();
let id = get_next_auction_id(&mut db).await.unwrap();
insert_auction_with_id(&mut db, id, &value).await.unwrap();
let (id_, value_) = load_most_recent(&mut db).await.unwrap().unwrap();
assert_eq!(id, id_);
assert_eq!(value, value_);

let value = JsonValue::Number(2.into());
let id_ = replace_auction(&mut db, &value).await.unwrap();
let id_ = get_next_auction_id(&mut db).await.unwrap();
assert_eq!(id + 1, id_);
insert_auction_with_id(&mut db, id_, &value).await.unwrap();
let (id, value_) = load_most_recent(&mut db).await.unwrap().unwrap();
assert_eq!(value, value_);
assert_eq!(id_, id);
Expand Down
Loading