diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index 345223bad4..594bd3cf2a 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -532,6 +532,14 @@ ], "format": "int64" }, + "actor_retry_duration_threshold": { + "description": "How long to wait after starting to attempt to reallocate before setting actor to sleep.\n\nUnit is in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "int64" + }, "actor_start_threshold": { "description": "How long to wait after creating and not receiving a starting state before setting actor as lost.\n\nUnit is in milliseconds.", "type": [ diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index 5f8800a9c2..773c1abd56 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -11,7 +11,7 @@ "name": "Apache-2.0", "identifier": "Apache-2.0" }, - "version": "2.2.0" + "version": "2.2.1" }, "paths": { "/actors": { diff --git a/engine/packages/api-peer/src/actors/delete.rs b/engine/packages/api-peer/src/actors/delete.rs index c9c285aa98..1339a4b4a9 100644 --- a/engine/packages/api-peer/src/actors/delete.rs +++ b/engine/packages/api-peer/src/actors/delete.rs @@ -19,9 +19,10 @@ use rivet_util::Id; #[tracing::instrument(skip_all)] pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result { // Subscribe before fetching actor data - let mut destroy_sub = ctx - .subscribe::(("actor_id", path.actor_id)) - .await?; + let (mut destroy_sub, mut destroy_sub2) = tokio::try_join!( + ctx.subscribe::(("actor_id", path.actor_id)), + ctx.subscribe::(("actor_id", path.actor_id)), + )?; let (actors_res, namespace_res) = tokio::try_join!( // Get the actor to verify it exists @@ -52,21 +53,33 @@ pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result return Err(pegboard::errors::Actor::NotFound.build()); } - // TODO: Actor v2 + // Try actor2 first, then fallback to 
actor let res = ctx - .signal(pegboard::workflows::actor::Destroy {}) - .to_workflow::() + .signal(pegboard::workflows::actor2::Destroy {}) + .to_workflow::() .tag("actor_id", path.actor_id) .graceful_not_found() .send() .await?; if res.is_none() { - tracing::warn!( - actor_id=?path.actor_id, - "actor workflow not found, likely already stopped" - ); + let res = ctx + .signal(pegboard::workflows::actor::Destroy {}) + .to_workflow::() + .tag("actor_id", path.actor_id) + .graceful_not_found() + .send() + .await?; + + if res.is_none() { + tracing::warn!( + actor_id=?path.actor_id, + "actor workflow not found, likely already stopped" + ); + } else { + destroy_sub.next().await?; + } } else { - destroy_sub.next().await?; + destroy_sub2.next().await?; } Ok(DeleteResponse {}) diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index 663af4779c..e5bae9c70d 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -20,6 +20,10 @@ pub struct Pegboard { /// /// Unit is in milliseconds. pub actor_stop_threshold: Option, + /// How long to wait after starting to attempt to reallocate before setting actor to sleep. + /// + /// Unit is in milliseconds. + pub actor_retry_duration_threshold: Option, /// How long an actor goes without retries before it's retry count is reset to 0, effectively resetting its /// backoff to 0. 
/// @@ -163,6 +167,10 @@ impl Pegboard { self.actor_stop_threshold.unwrap_or(30_000) } + pub fn actor_retry_duration_threshold(&self) -> i64 { + self.actor_retry_duration_threshold.unwrap_or(300_000) + } + pub fn retry_reset_duration(&self) -> i64 { self.retry_reset_duration.unwrap_or(10 * 60 * 1000) } diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index e06fdb70b0..deec250617 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -189,6 +189,7 @@ async fn route_request_inner( stopped_sub, fail_sub, destroy_sub, + migrate_sub, ready_sub2, stopped_sub2, fail_sub2, @@ -198,6 +199,7 @@ async fn route_request_inner( ctx.subscribe::(("actor_id", actor_id)), ctx.subscribe::(("actor_id", actor_id)), ctx.subscribe::(("actor_id", actor_id)), + ctx.subscribe::(("actor_id", actor_id)), ctx.subscribe::(("actor_id", actor_id)), ctx.subscribe::(("actor_id", actor_id)), ctx.subscribe::(("actor_id", actor_id)), @@ -218,6 +220,12 @@ async fn route_request_inner( match actor.version { 2 => { + drop(ready_sub); + drop(stopped_sub); + drop(fail_sub); + drop(destroy_sub); + drop(migrate_sub); + handle_actor_v2( ctx, shared_state, @@ -242,6 +250,11 @@ async fn route_request_inner( stopped_sub, fail_sub, destroy_sub, + migrate_sub, + ready_sub2, + stopped_sub2, + fail_sub2, + destroy_sub2, ) .await } @@ -359,6 +372,11 @@ async fn handle_actor_v1( mut stopped_sub: SubscriptionHandle, mut fail_sub: SubscriptionHandle, mut destroy_sub: SubscriptionHandle, + mut migrate_sub: SubscriptionHandle, + ready_sub2: SubscriptionHandle, + stopped_sub2: SubscriptionHandle, + fail_sub2: SubscriptionHandle, + destroy_sub2: SubscriptionHandle, ) -> Result { // Wake actor if sleeping if actor.sleeping { @@ -382,11 +400,9 @@ async fn handle_actor_v1( let mut wake_retries = 0; // Create pool error check future - let pool_error_check_fut = 
check_runner_pool_error_loop( - ctx, - actor.namespace_id, - actor.runner_name_selector.as_deref(), - ); + let runner_name_selector = actor.runner_name_selector.clone(); + let pool_error_check_fut = + check_runner_pool_error_loop(ctx, actor.namespace_id, runner_name_selector.as_deref()); tokio::pin!(pool_error_check_fut); // Wait for ready, fail, or destroy @@ -430,6 +446,20 @@ async fn handle_actor_v1( res?; return Err(pegboard::errors::Actor::DestroyedWhileWaitingForReady.build()); } + res = migrate_sub.next() => { + res?; + return handle_actor_v2( + ctx, + shared_state, + actor_id, + actor, + stripped_path, + ready_sub2, + stopped_sub2, + fail_sub2, + destroy_sub2, + ).await; + } res = &mut pool_error_check_fut => { if res? { return Err(errors::ActorRunnerFailed { actor_id }.build()); diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index d29a1f8b32..0dbf5d7750 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -235,6 +235,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> input: input.input.clone(), from_v1: true, }) + .tag("actor_id", input.actor_id) .dispatch() .await?; return Ok(()); @@ -845,15 +846,36 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .send() .await?; - ctx.workflow(destroy::Input { - namespace_id: input.namespace_id, - actor_id: input.actor_id, - name: input.name.clone(), - key: input.key.clone(), - generation: lifecycle_res.generation, - }) - .output() - .await?; + if lifecycle_res.migrate_to_v2 { + ctx.workflow(crate::workflows::actor2::Input { + actor_id: input.actor_id, + name: input.name.clone(), + pool_name: input.runner_name_selector.clone(), + key: input.key.clone(), + namespace_id: input.namespace_id, + crash_policy: input.crash_policy, + input: input.input.clone(), + from_v1: true, + }) + .tag("actor_id", input.actor_id) + 
.dispatch() + .await?; + + ctx.msg(MigratedToV2 {}) + .topic(("actor_id", input.actor_id)) + .send() + .await?; + } else { + ctx.workflow(destroy::Input { + namespace_id: input.namespace_id, + actor_id: input.actor_id, + name: input.name.clone(), + key: input.key.clone(), + generation: lifecycle_res.generation, + }) + .output() + .await?; + } Ok(()) } @@ -1382,6 +1404,9 @@ pub struct GoingAway { pub reset_rescheduling: bool, } +#[message("pegboard_actor_migrated_to_v2")] +pub struct MigratedToV2 {} + #[signal("pegboard_actor_destroy")] pub struct Destroy {} diff --git a/engine/packages/pegboard/src/workflows/actor2/mod.rs b/engine/packages/pegboard/src/workflows/actor2/mod.rs index 47d70c0705..5a3b2e8c51 100644 --- a/engine/packages/pegboard/src/workflows/actor2/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor2/mod.rs @@ -477,26 +477,8 @@ async fn listen_for_signals( signals } } - Transition::Sleeping { - attempting_reallocation, - } => { - if *attempting_reallocation { - let next_retry_ts = state.retry_backoff_state.get_next_retry_ts(ctx).await?; - - let signals = if let Some(next_retry_ts) = next_retry_ts { - // Listen for signals with timeout - ctx.listen_n_until::
(next_retry_ts, 256).await? - } else { - Vec::new() - }; - - // Attempt reallocation - if signals.is_empty() { - runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?; - } - - signals - } else if let Some(alarm_ts) = state.alarm_ts { + Transition::Sleeping => { + if let Some(alarm_ts) = state.alarm_ts { // Listen for signals with timeout. if a timeout happens, it means this actor should // wake up let signals = ctx.listen_n_until::
(alarm_ts, 256).await?; @@ -513,6 +495,32 @@ async fn listen_for_signals( ctx.listen_n::
(256).await? } } + Transition::Reallocating { since_ts } => { + let next_retry_ts = state.retry_backoff_state.get_next_retry_ts(ctx).await?; + + // If the actor has been retrying for too long, set it to sleep + if state.retry_backoff_state.last_retry_ts + > *since_ts + ctx.config().pegboard().actor_retry_duration_threshold() + { + state.transition = Transition::Sleeping; + + Vec::new() + } else { + let signals = if let Some(next_retry_ts) = next_retry_ts { + // Listen for signals with timeout + ctx.listen_n_until::
(next_retry_ts, 256).await? + } else { + Vec::new() + }; + + // Attempt reallocation + if signals.is_empty() { + runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?; + } + + signals + } + } }; Ok(signals) @@ -537,11 +545,12 @@ async fn process_signal( .. } = &state.transition { + let now = ctx.activity(runtime::GetTsInput {}).await?; + // Transition to starting state.transition = Transition::Starting { destroy_after_start: *destroy_after_start, - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_start_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_start_threshold(), }; } @@ -566,12 +575,16 @@ async fn process_signal( return Ok(Loop::Continue); } } - Transition::Allocating { .. } | Transition::Sleeping { .. } => { + Transition::Allocating { .. } + | Transition::Sleeping + | Transition::Reallocating { .. } => { tracing::warn!(?sig, "actor not allocated, ignoring events"); return Ok(Loop::Continue); } } + let now = ctx.activity(runtime::GetTsInput {}).await?; + // Fetch the last event index for the current envoy or default to -1 (if still starting) let last_event_idx = state .transition @@ -604,7 +617,7 @@ async fn process_signal( // Transition to sleep intent state.transition = Transition::SleepIntent { envoy: std::mem::take(envoy), - lost_timeout_ts: util::timestamp::now() + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), rewake_after_stop: false, }; @@ -628,7 +641,7 @@ async fn process_signal( // Transition to stop intent state.transition = Transition::StopIntent { envoy: std::mem::take(envoy), - lost_timeout_ts: util::timestamp::now() + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), }; @@ -662,7 +675,7 @@ async fn process_signal( // Transition to destroying state.transition = Transition::Destroying { envoy: runtime::EnvoyState::new(sig.envoy_key.clone()), - lost_timeout_ts: util::timestamp::now() + lost_timeout_ts: now + 
ctx.config().pegboard().actor_stop_threshold(), }; @@ -680,7 +693,7 @@ async fn process_signal( // Transition to starting state.transition = Transition::Running { envoy: runtime::EnvoyState::new(sig.envoy_key.clone()), - last_liveness_check_ts: util::timestamp::now(), + last_liveness_check_ts: now, }; ctx.activity(runtime::SetConnectableInput { @@ -779,7 +792,7 @@ async fn process_signal( Main::Wake(_) => { // Clear alarm if let Some(alarm_ts) = state.alarm_ts { - let now = ctx.activity(GetTsInput {}).await?; + let now = ctx.activity(runtime::GetTsInput {}).await?; if now >= alarm_ts { state.alarm_ts = None; @@ -787,7 +800,7 @@ async fn process_signal( } match &mut state.transition { - Transition::Sleeping { .. } => { + Transition::Sleeping => { runtime::reschedule_actor(ctx, &input, state, metrics_workflow_id).await?; } Transition::SleepIntent { @@ -836,12 +849,13 @@ async fn process_signal( match &mut state.transition { Transition::Running { envoy, .. } => { + let now = ctx.activity(runtime::GetTsInput {}).await?; let envoy_key = envoy.envoy_key.clone(); + // Transition to going away state.transition = Transition::GoingAway { envoy: std::mem::take(envoy), - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_stop_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), }; ctx.activity(runtime::InsertAndSendCommandsInput { @@ -856,17 +870,19 @@ async fn process_signal( .await?; } Transition::SleepIntent { envoy, .. } | Transition::StopIntent { envoy, .. } => { + let now = ctx.activity(runtime::GetTsInput {}).await?; + state.transition = Transition::GoingAway { envoy: std::mem::take(envoy), - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_stop_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), }; // Stop command was already sent } Transition::Allocating { .. } | Transition::Starting { .. } - | Transition::Sleeping { .. 
} => { + | Transition::Sleeping + | Transition::Reallocating { .. } => { tracing::warn!(transition=?state.transition, "should not be reachable"); } Transition::GoingAway { .. } | Transition::Destroying { .. } => {} @@ -875,13 +891,13 @@ async fn process_signal( Main::Destroy(_) => { match &mut state.transition { Transition::Running { envoy, .. } => { + let now = ctx.activity(runtime::GetTsInput {}).await?; let envoy_key = envoy.envoy_key.clone(); // Transition to destroying state.transition = Transition::Destroying { envoy: std::mem::take(envoy), - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_stop_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), }; ctx.activity(runtime::InsertAndSendCommandsInput { @@ -898,10 +914,11 @@ async fn process_signal( Transition::SleepIntent { envoy, .. } | Transition::StopIntent { envoy, .. } | Transition::GoingAway { envoy, .. } => { + let now = ctx.activity(runtime::GetTsInput {}).await?; + state.transition = Transition::Destroying { envoy: std::mem::take(envoy), - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_stop_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), }; // Stop command was already sent @@ -918,7 +935,7 @@ async fn process_signal( } => { *destroy_after_start = true; } - Transition::Sleeping { .. } => { + Transition::Sleeping | Transition::Reallocating { .. } => { return Ok(Loop::Break(())); } Transition::Destroying { .. 
} => {} @@ -962,14 +979,6 @@ impl ActorError { } } -#[derive(Debug, Serialize, Deserialize, Hash)] -struct GetTsInput {} - -#[activity(GetTs)] -async fn get_ts(ctx: &ActivityCtx, input: &GetTsInput) -> Result { - Ok(util::timestamp::now()) -} - async fn destroy(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { ctx.msg(DestroyStarted {}) .topic(("actor_id", input.actor_id)) diff --git a/engine/packages/pegboard/src/workflows/actor2/runtime.rs b/engine/packages/pegboard/src/workflows/actor2/runtime.rs index 76ee019907..a9ea626afa 100644 --- a/engine/packages/pegboard/src/workflows/actor2/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor2/runtime.rs @@ -11,7 +11,7 @@ use universaldb::prelude::*; use universalpubsub::PublishOpts; use vbare::OwnedVersionedData; -use super::{ActorError, GetTsInput, Input, LostReason, State, Stopped, metrics}; +use super::{ActorError, Input, LostReason, State, Stopped, metrics}; use crate::keys; #[derive(Deserialize, Serialize)] @@ -68,9 +68,9 @@ pub(crate) enum Transition { envoy: EnvoyState, lost_timeout_ts: i64, }, - Sleeping { - /// True if the workflow is currently trying to reallocate with backoff. - attempting_reallocation: bool, + Sleeping, + Reallocating { + since_ts: i64, }, Destroying { envoy: EnvoyState, @@ -88,7 +88,8 @@ impl Transition { | Transition::Destroying { envoy, .. } => Some(envoy), Transition::Allocating { .. } | Transition::Starting { .. } - | Transition::Sleeping { .. } => None, + | Transition::Sleeping + | Transition::Reallocating { .. 
} => None, } } } @@ -156,6 +157,7 @@ pub enum Allocation { #[derive(Debug, Serialize, Deserialize)] pub struct AllocateOutput { pub allocation: Option, + pub now: i64, } #[activity(Allocate)] @@ -299,7 +301,10 @@ pub async fn allocate(ctx: &ActivityCtx, input: &AllocateInput) -> Result { // Transition to allocating state.transition = Transition::Allocating { destroy_after_start: false, - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_allocation_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_allocation_threshold(), }; } Allocation::Serverful { .. } => { // Transition to starting state.transition = Transition::Starting { destroy_after_start: false, - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_start_threshold(), + lost_timeout_ts: now + ctx.config().pegboard().actor_start_threshold(), }; } } @@ -547,13 +552,13 @@ pub async fn handle_stopped( // Transition to allocating state.transition = Transition::Allocating { destroy_after_start: false, - lost_timeout_ts: util::timestamp::now() + lost_timeout_ts: allocate_res.now + ctx.config().pegboard().actor_allocation_threshold(), }; } else { // Transition to retry backoff - state.transition = Transition::Sleeping { - attempting_reallocation: true, + state.transition = Transition::Reallocating { + since_ts: allocate_res.now, }; } @@ -585,13 +590,13 @@ pub async fn handle_stopped( // Transition to allocating state.transition = Transition::Allocating { destroy_after_start: false, - lost_timeout_ts: util::timestamp::now() + lost_timeout_ts: allocate_res.now + ctx.config().pegboard().actor_allocation_threshold(), }; } else { // Transition to retry backoff - state.transition = Transition::Sleeping { - attempting_reallocation: true, + state.transition = Transition::Reallocating { + since_ts: allocate_res.now, }; } @@ -610,19 +615,17 @@ pub async fn handle_stopped( } // Transition to sleeping - state.transition = Transition::Sleeping { - 
attempting_reallocation: false, - }; + state.transition = Transition::Sleeping; StoppedResult::Continue } _ => { + let now = ctx.activity(GetTsInput {}).await?; + // Don't destroy on failed allocation, retry instead if let StoppedVariant::FailedAllocation { .. } = &variant { // Transition to retry backoff - state.transition = Transition::Sleeping { - attempting_reallocation: true, - }; + state.transition = Transition::Reallocating { since_ts: now }; StoppedResult::Continue } else { @@ -632,14 +635,12 @@ pub async fn handle_stopped( } } else { // Transition to sleeping - state.transition = Transition::Sleeping { - attempting_reallocation: false, - }; + state.transition = Transition::Sleeping; StoppedResult::Continue }; - if let Transition::Sleeping { .. } = state.transition { + if let Transition::Sleeping = state.transition { ctx.activity(SetSleepingInput {}).await?; } @@ -651,6 +652,14 @@ pub async fn handle_stopped( Ok(stopped_res) } +#[derive(Debug, Serialize, Deserialize, Hash)] +pub struct GetTsInput {} + +#[activity(GetTs)] +pub async fn get_ts(ctx: &ActivityCtx, input: &GetTsInput) -> Result { + Ok(util::timestamp::now()) +} + #[derive(Debug, Serialize, Deserialize, Hash)] pub struct SetErrorInput { pub error: ActorError, diff --git a/engine/packages/pegboard/src/workflows/runner_pool_metadata_poller.rs b/engine/packages/pegboard/src/workflows/runner_pool_metadata_poller.rs index 64de8d6551..32b73c4a3b 100644 --- a/engine/packages/pegboard/src/workflows/runner_pool_metadata_poller.rs +++ b/engine/packages/pegboard/src/workflows/runner_pool_metadata_poller.rs @@ -204,7 +204,7 @@ async fn poll_metadata(ctx: &ActivityCtx, input: &PollMetadataInput) -> Result

>; kvRequests: Map; nextKvRequestId: number; @@ -111,6 +112,7 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { const ctx: EnvoyContext = { shared, serverless: false, + shuttingDown: false, actors, kvRequests: new Map(), nextKvRequestId: 0, @@ -139,7 +141,7 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { if (msg.type === "conn-message") { await handleConnMessage(ctx, startTx, lostTimeout, msg.message); } else if (msg.type === "conn-close") { - await handleConnClose(ctx, lostTimeout); + handleConnClose(ctx, lostTimeout); if (msg.evict) break; } else if (msg.type === "send-events") { const stop = handleSendEvents(ctx, msg.events); @@ -155,42 +157,7 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { } else if (msg.type === "buffer-tunnel-msg") { ctx.bufferedMessages.push(msg.msg); } else if (msg.type === "shutdown") { - wsSend(ctx.shared, { - tag: "ToRivetStopping", - val: null, - }); - - // Start shutdown checker - spawn(async () => { - let i = 0; - - while (true) { - let total = 0; - - // Check for actors with open handles - for (const gens of ctx.actors.values()) { - const last = Array.from(gens.values())[gens.size - 1]; - - if (last && !last.handle.isClosed()) total++; - } - - // Wait until no actors remain - if (total === 0) { - ctx.shared.envoyTx.send({ type: "stop" }); - break; - } - - await sleep(1000); - - if (i % 10 === 0) { - log(ctx.shared)?.info({ - msg: "waiting on actors to stop before shutdown", - actors: total, - }); - } - i++; - } - }); + handleShutdown(ctx); } else if (msg.type === "stop") { break; } else { @@ -228,7 +195,7 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { return handle; } -async function handleConnMessage( +function handleConnMessage( ctx: EnvoyContext, startTx: WatchSender, lostTimeout: NodeJS.Timeout | undefined, @@ -248,7 +215,7 @@ async function handleConnMessage( startTx.send(); } else if (message.tag === "ToEnvoyCommands") { - await 
handleCommands(ctx, message.val); + handleCommands(ctx, message.val); } else if (message.tag === "ToEnvoyAckEvents") { handleAckEvents(ctx, message.val); } else if (message.tag === "ToEnvoyKvResponse") { @@ -260,7 +227,7 @@ async function handleConnMessage( } } -async function handleConnClose(ctx: EnvoyContext, lostTimeout: NodeJS.Timeout | undefined) { +function handleConnClose(ctx: EnvoyContext, lostTimeout: NodeJS.Timeout | undefined) { if (!lostTimeout) { let lostThreshold = ctx.shared.protocolMetadata ? Number(ctx.shared.protocolMetadata.envoyLostThreshold) : 10000; log(ctx.shared)?.debug({ @@ -299,6 +266,48 @@ async function handleConnClose(ctx: EnvoyContext, lostTimeout: NodeJS.Timeout | } } +function handleShutdown(ctx: EnvoyContext) { + if (ctx.shuttingDown) return; + ctx.shuttingDown = true; + + wsSend(ctx.shared, { + tag: "ToRivetStopping", + val: null, + }); + + // Start shutdown checker + spawn(async () => { + let i = 0; + + while (true) { + let total = 0; + + // Check for actors with open handles + for (const gens of ctx.actors.values()) { + const last = Array.from(gens.values())[gens.size - 1]; + + if (last && !last.handle.isClosed()) total++; + } + + // Wait until no actors remain + if (total === 0) { + ctx.shared.envoyTx.send({ type: "stop" }); + break; + } + + await sleep(1000); + + if (i % 10 === 0) { + log(ctx.shared)?.info({ + msg: "waiting on actors to stop before shutdown", + actors: total, + }); + } + i++; + } + }); +} + // MARK: Util export function log(ctx: SharedContext) { diff --git a/frontend/src/components/actors/actor-status-label.tsx b/frontend/src/components/actors/actor-status-label.tsx index 15f99e9d6c..effe358ac7 100644 --- a/frontend/src/components/actors/actor-status-label.tsx +++ b/frontend/src/components/actors/actor-status-label.tsx @@ -165,6 +165,9 @@ export function RunnerPoolError({ error }: { error: RivetActorError }) { .with("internal_error", () => (

Internal error occurred in runner pool

)) + .with("downgrade", () => ( +

Runner pool was downgraded to an unsupported version. Revert to the higher version.

+ )) .with("serverless_stream_ended_early", () => (

Connection terminated before runner stopped. Ensure that diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts index 75efd4c556..b4c490a2d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts @@ -59,7 +59,7 @@ export async function getOrCreateActor( return apiCall( config, "PUT", - `/actors2`, + `/actors`, request, ); } @@ -72,7 +72,7 @@ export async function createActor( return apiCall( config, "POST", - `/actors2`, + `/actors`, request, ); } diff --git a/scripts/tests/actor_e2e.ts b/scripts/tests/actor_e2e.ts index a97dbf7350..9e1518865d 100755 --- a/scripts/tests/actor_e2e.ts +++ b/scripts/tests/actor_e2e.ts @@ -22,7 +22,7 @@ async function main() { // Create an actor console.log("Creating actor..."); console.time("actor create"); - const actorResponse = await createActor(RIVET_NAMESPACE, "test-envoy"); + const actorResponse = await createActor(RIVET_NAMESPACE, "test"); console.timeEnd("actor create"); console.log("Actor created:", actorResponse.actor); diff --git a/scripts/tests/utils.ts b/scripts/tests/utils.ts index 7b6cd614ef..05999dd9fc 100644 --- a/scripts/tests/utils.ts +++ b/scripts/tests/utils.ts @@ -9,7 +9,7 @@ export async function createActor( withKey: boolean = true ): Promise { const response = await fetch( - `${RIVET_ENDPOINT}/actors2?namespace=${namespaceName}`, + `${RIVET_ENDPOINT}/actors?namespace=${namespaceName}`, { method: "POST", headers: { @@ -21,7 +21,7 @@ export async function createActor( key: withKey ? crypto.randomUUID() : undefined, input: btoa("hello"), runner_name_selector: runnerNameSelector, - crash_policy: "destroy", + crash_policy: "sleep", }), }, );