Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion engine/packages/api-public/src/actors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ pub async fn list(Extension(ctx): Extension<ApiCtx>, Query(query): Query<ListQue
}

async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> {
ctx.auth().await?;
// Reading is allowed, list requires auth
if query.actor_ids.is_none() && query.actor_id.is_empty() && query.key.is_none() {
ctx.auth().await?;
} else {
ctx.skip_auth();
}

// Parse query
let actor_ids = [
Expand Down
1 change: 1 addition & 0 deletions engine/packages/api-types/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct CreateRequest {
pub datacenter: Option<String>,
pub name: String,
pub key: Option<String>,
/// Arbitrary base64 encoded binary data.
pub input: Option<String>,
pub runner_name_selector: String,
pub crash_policy: rivet_types::actors::CrashPolicy,
Expand Down
43 changes: 25 additions & 18 deletions engine/packages/pegboard-runner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod ws_to_tunnel_task;
enum LifecycleResult {
Closed,
Aborted,
Evicted,
}

pub struct PegboardRunnerWsCustomServe {
Expand Down Expand Up @@ -222,34 +223,40 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe {
);

// Determine single result from all tasks
let lifecycle_res = match (tunnel_to_ws_res, ws_to_tunnel_res, ping_res) {
let mut lifecycle_res = match (tunnel_to_ws_res, ws_to_tunnel_res, ping_res) {
// Prefer error
(Err(err), _, _) => Err(err),
(_, Err(err), _) => Err(err),
(_, _, Err(err)) => Err(err),
// Prefer non aborted result if both succeed
// Prefer non aborted result
(Ok(res), Ok(LifecycleResult::Aborted), _) => Ok(res),
(Ok(LifecycleResult::Aborted), Ok(res), _) => Ok(res),
// Unlikely case
(res, _, _) => res,
};

// Make runner immediately ineligible when it disconnects
let update_alloc_res = self
.ctx
.op(pegboard::ops::runner::update_alloc_idx::Input {
runners: vec![pegboard::ops::runner::update_alloc_idx::Runner {
runner_id: conn.runner_id,
action: Action::ClearIdx,
}],
})
.await;
if let Err(err) = update_alloc_res {
tracing::error!(
runner_id=?conn.runner_id,
?err,
"critical: failed to evict runner from allocation index during disconnect"
);
if let Ok(LifecycleResult::Evicted) = &lifecycle_res {
lifecycle_res = Err(errors::WsError::Eviction.build());
}
// Clear alloc idx if not evicted
else {
// Make runner immediately ineligible when it disconnects
let update_alloc_res = self
.ctx
.op(pegboard::ops::runner::update_alloc_idx::Input {
runners: vec![pegboard::ops::runner::update_alloc_idx::Runner {
runner_id: conn.runner_id,
action: Action::ClearIdx,
}],
})
.await;
if let Err(err) = update_alloc_res {
tracing::error!(
runner_id=?conn.runner_id,
?err,
"failed to evict runner from allocation index during disconnect"
);
}
}

tracing::debug!(%topic, "runner websocket closed");
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn recv_msg(
])
.inc();

return Err(errors::WsError::Eviction.build());
return Ok(Err(LifecycleResult::Evicted));
}
_ = tunnel_to_ws_abort_rx.changed() => {
tracing::debug!("task aborted");
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ async fn recv_msg(
])
.inc();

return Err(errors::WsError::Eviction.build());
return Ok(Err(LifecycleResult::Evicted));
}
_ = ws_to_tunnel_abort_rx.changed() => {
tracing::debug!("task aborted");
Expand Down
13 changes: 8 additions & 5 deletions engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
let protocol_version_key =
keys::runner::ProtocolVersionKey::new(runner.runner_id);
let last_ping_ts_key = keys::runner::LastPingTsKey::new(runner.runner_id);
let drain_ts_key = keys::runner::DrainTsKey::new(runner.runner_id);
let expired_ts_key = keys::runner::ExpiredTsKey::new(runner.runner_id);

let (
Expand All @@ -80,7 +81,8 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
total_slots_entry,
protocol_version_entry,
last_ping_ts_entry,
expired_ts_entry,
draining,
expired,
) = tokio::try_join!(
tx.read_opt(&workflow_id_key, Serializable),
tx.read_opt(&namespace_id_key, Serializable),
Expand All @@ -90,7 +92,8 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
tx.read_opt(&total_slots_key, Serializable),
tx.read_opt(&protocol_version_key, Serializable),
tx.read_opt(&last_ping_ts_key, Serializable),
tx.read_opt(&expired_ts_key, Serializable),
tx.exists(&drain_ts_key, Serializable),
tx.exists(&expired_ts_key, Serializable),
)?;

let (
Expand Down Expand Up @@ -118,7 +121,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
let protocol_version = protocol_version_entry.unwrap_or(PROTOCOL_MK1_VERSION);

// Runner is expired, AddIdx is invalid and UpdatePing will do nothing
if expired_ts_entry.is_some() {
if expired {
match runner.action {
Action::ClearIdx => {}
Action::AddIdx | Action::UpdatePing { .. } => {
Expand Down Expand Up @@ -171,8 +174,8 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
let last_rtt_key = keys::runner::LastRttKey::new(runner.runner_id);
tx.write(&last_rtt_key, rtt)?;

// Only update allocation idx if it existed before
if tx.exists(&old_alloc_key, Serializable).await? {
// Only update allocation idx if not draining
if !draining {
// Clear old key
tx.delete(&old_alloc_key);

Expand Down
1 change: 0 additions & 1 deletion engine/packages/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,6 @@ async fn clear_db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> {
match input.update_state {
RunnerState::Draining => {
tx.write(&keys::runner::DrainTsKey::new(input.runner_id), now)?;
tx.write(&keys::runner::ExpiredTsKey::new(input.runner_id), now)?;
}
RunnerState::Stopped => {
tx.write(&keys::runner::StopTsKey::new(input.runner_id), now)?;
Expand Down
20 changes: 12 additions & 8 deletions engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,18 @@ struct MarkEligibleInput {
#[activity(MarkEligible)]
async fn mark_eligible(ctx: &ActivityCtx, input: &MarkEligibleInput) -> Result<()> {
// Mark eligible
ctx.op(crate::ops::runner::update_alloc_idx::Input {
runners: vec![crate::ops::runner::update_alloc_idx::Runner {
runner_id: input.runner_id,
action: crate::ops::runner::update_alloc_idx::Action::AddIdx,
}],
})
.await?;
let res = ctx
.op(crate::ops::runner::update_alloc_idx::Input {
runners: vec![crate::ops::runner::update_alloc_idx::Runner {
runner_id: input.runner_id,
action: crate::ops::runner::update_alloc_idx::Action::AddIdx,
}],
})
.await?;

if !res.notifications.is_empty() {
tracing::warn!(notifs=?res.notifications, runner_id=?input.runner_id, "non-empty update alloc idx response");
}

Ok(())
}
Expand Down Expand Up @@ -535,7 +540,6 @@ async fn clear_db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> {
match input.update_state {
RunnerState::Draining => {
tx.write(&keys::runner::DrainTsKey::new(input.runner_id), now)?;
tx.write(&keys::runner::ExpiredTsKey::new(input.runner_id), now)?;
}
RunnerState::Stopped => {
tx.write(&keys::runner::StopTsKey::new(input.runner_id), now)?;
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/universaldb/src/driver/rocksdb/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct RocksDbDatabaseDriver {

impl RocksDbDatabaseDriver {
pub async fn new(db_path: PathBuf) -> Result<Self> {
tracing::info!(?db_path, "starting file system driver");
tracing::info!(db_path=%db_path.display(), "starting file system driver");

// Create directory if it doesn't exist
std::fs::create_dir_all(&db_path).context("failed to create database directory")?;
Expand Down
Loading