Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 24 additions & 11 deletions engine/packages/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use rivet_util::Id;
#[tracing::instrument(skip_all)]
pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
// Subscribe before fetching actor data
let mut destroy_sub = ctx
.subscribe::<pegboard::workflows::actor::DestroyComplete>(("actor_id", path.actor_id))
.await?;
let (mut destroy_sub, mut destroy_sub2) = tokio::try_join!(
ctx.subscribe::<pegboard::workflows::actor::DestroyComplete>(("actor_id", path.actor_id)),
ctx.subscribe::<pegboard::workflows::actor2::DestroyComplete>(("actor_id", path.actor_id)),
)?;

let (actors_res, namespace_res) = tokio::try_join!(
// Get the actor to verify it exists
Expand Down Expand Up @@ -52,21 +53,33 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result
return Err(pegboard::errors::Actor::NotFound.build());
}

// TODO: Actor v2
// Try actor2 first, then fallback to actor
let res = ctx
.signal(pegboard::workflows::actor::Destroy {})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.signal(pegboard::workflows::actor2::Destroy {})
.to_workflow::<pegboard::workflows::actor2::Workflow>()
.tag("actor_id", path.actor_id)
.graceful_not_found()
.send()
.await?;
if res.is_none() {
tracing::warn!(
actor_id=?path.actor_id,
"actor workflow not found, likely already stopped"
);
let res = ctx
.signal(pegboard::workflows::actor::Destroy {})
.to_workflow::<pegboard::workflows::actor::Workflow>()
.tag("actor_id", path.actor_id)
.graceful_not_found()
.send()
.await?;

if res.is_none() {
tracing::warn!(
actor_id=?path.actor_id,
"actor workflow not found, likely already stopped"
);
} else {
destroy_sub.next().await?;
}
} else {
destroy_sub.next().await?;
destroy_sub2.next().await?;
}

Ok(DeleteResponse {})
Expand Down
8 changes: 8 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pub struct Pegboard {
///
/// Unit is in milliseconds.
pub actor_stop_threshold: Option<i64>,
/// How long to wait after starting to attempt to reallocate before setting actor to sleep.
///
/// Unit is in milliseconds.
pub actor_retry_duration_threshold: Option<i64>,
/// How long an actor goes without retries before its retry count is reset to 0, effectively resetting its
/// backoff to 0.
///
Expand Down Expand Up @@ -163,6 +167,10 @@ impl Pegboard {
self.actor_stop_threshold.unwrap_or(30_000)
}

/// Returns how long to keep attempting reallocation before the actor is set to sleep,
/// in milliseconds.
///
/// Falls back to 300,000 ms when `actor_retry_duration_threshold` is not configured.
pub fn actor_retry_duration_threshold(&self) -> i64 {
    // Default used when the config leaves the option unset (5 * 60 * 1000 == 300_000).
    const DEFAULT_MS: i64 = 5 * 60 * 1000;

    self.actor_retry_duration_threshold.unwrap_or(DEFAULT_MS)
}

/// Returns the configured `retry_reset_duration`, or 600,000 when it is unset.
///
/// NOTE(review): the default is written as `10 * 60 * 1000`, which looks like ten minutes
/// expressed in milliseconds — confirm the unit against the field documentation.
pub fn retry_reset_duration(&self) -> i64 {
    // Default used when the config leaves the option unset.
    const DEFAULT: i64 = 10 * 60 * 1000;

    self.retry_reset_duration.unwrap_or(DEFAULT)
}
Expand Down
40 changes: 35 additions & 5 deletions engine/packages/guard/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ async fn route_request_inner(
stopped_sub,
fail_sub,
destroy_sub,
migrate_sub,
ready_sub2,
stopped_sub2,
fail_sub2,
Expand All @@ -198,6 +199,7 @@ async fn route_request_inner(
ctx.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", actor_id)),
ctx.subscribe::<pegboard::workflows::actor::Failed>(("actor_id", actor_id)),
ctx.subscribe::<pegboard::workflows::actor::DestroyStarted>(("actor_id", actor_id)),
ctx.subscribe::<pegboard::workflows::actor::MigratedToV2>(("actor_id", actor_id)),
ctx.subscribe::<pegboard::workflows::actor2::Ready>(("actor_id", actor_id)),
ctx.subscribe::<pegboard::workflows::actor2::Stopped>(("actor_id", actor_id)),
ctx.subscribe::<pegboard::workflows::actor2::Failed>(("actor_id", actor_id)),
Expand All @@ -218,6 +220,12 @@ async fn route_request_inner(

match actor.version {
2 => {
drop(ready_sub);
drop(stopped_sub);
drop(fail_sub);
drop(destroy_sub);
drop(migrate_sub);

handle_actor_v2(
ctx,
shared_state,
Expand All @@ -242,6 +250,11 @@ async fn route_request_inner(
stopped_sub,
fail_sub,
destroy_sub,
migrate_sub,
ready_sub2,
stopped_sub2,
fail_sub2,
destroy_sub2,
)
.await
}
Expand Down Expand Up @@ -359,6 +372,11 @@ async fn handle_actor_v1(
mut stopped_sub: SubscriptionHandle<pegboard::workflows::actor::Stopped>,
mut fail_sub: SubscriptionHandle<pegboard::workflows::actor::Failed>,
mut destroy_sub: SubscriptionHandle<pegboard::workflows::actor::DestroyStarted>,
mut migrate_sub: SubscriptionHandle<pegboard::workflows::actor::MigratedToV2>,
ready_sub2: SubscriptionHandle<pegboard::workflows::actor2::Ready>,
stopped_sub2: SubscriptionHandle<pegboard::workflows::actor2::Stopped>,
fail_sub2: SubscriptionHandle<pegboard::workflows::actor2::Failed>,
destroy_sub2: SubscriptionHandle<pegboard::workflows::actor2::DestroyStarted>,
) -> Result<RoutingOutput> {
// Wake actor if sleeping
if actor.sleeping {
Expand All @@ -382,11 +400,9 @@ async fn handle_actor_v1(
let mut wake_retries = 0;

// Create pool error check future
let pool_error_check_fut = check_runner_pool_error_loop(
ctx,
actor.namespace_id,
actor.runner_name_selector.as_deref(),
);
let runner_name_selector = actor.runner_name_selector.clone();
let pool_error_check_fut =
check_runner_pool_error_loop(ctx, actor.namespace_id, runner_name_selector.as_deref());
tokio::pin!(pool_error_check_fut);

// Wait for ready, fail, or destroy
Expand Down Expand Up @@ -430,6 +446,20 @@ async fn handle_actor_v1(
res?;
return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build());
}
res = migrate_sub.next() => {
res?;
return handle_actor_v2(
ctx,
shared_state,
actor_id,
actor,
stripped_path,
ready_sub2,
stopped_sub2,
fail_sub2,
destroy_sub2,
).await;
}
res = &mut pool_error_check_fut => {
if res? {
return Err(errors::ActorRunnerFailed { actor_id }.build());
Expand Down
43 changes: 34 additions & 9 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
input: input.input.clone(),
from_v1: true,
})
.tag("actor_id", input.actor_id)
.dispatch()
.await?;
return Ok(());
Expand Down Expand Up @@ -845,15 +846,36 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
.send()
.await?;

ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
actor_id: input.actor_id,
name: input.name.clone(),
key: input.key.clone(),
generation: lifecycle_res.generation,
})
.output()
.await?;
if lifecycle_res.migrate_to_v2 {
ctx.workflow(crate::workflows::actor2::Input {
actor_id: input.actor_id,
name: input.name.clone(),
pool_name: input.runner_name_selector.clone(),
key: input.key.clone(),
namespace_id: input.namespace_id,
crash_policy: input.crash_policy,
input: input.input.clone(),
from_v1: true,
})
.tag("actor_id", input.actor_id)
.dispatch()
.await?;

ctx.msg(MigratedToV2 {})
.topic(("actor_id", input.actor_id))
.send()
.await?;
} else {
ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
actor_id: input.actor_id,
name: input.name.clone(),
key: input.key.clone(),
generation: lifecycle_res.generation,
})
.output()
.await?;
}

Ok(())
}
Expand Down Expand Up @@ -1382,6 +1404,9 @@ pub struct GoingAway {
pub reset_rescheduling: bool,
}

/// Message published on the `("actor_id", <id>)` topic by the v1 actor workflow after it
/// dispatches the replacement `actor2` workflow (the `migrate_to_v2` path).
///
/// Carries no payload; subscribers only need to know that the migration happened.
#[message("pegboard_actor_migrated_to_v2")]
pub struct MigratedToV2 {}

/// Payload-free signal sent to the v1 actor `Workflow` (addressed by its `actor_id` tag) to
/// request that the actor be destroyed.
#[signal("pegboard_actor_destroy")]
pub struct Destroy {}

Expand Down
Loading
Loading