From b45303772f6d8797b5229a057491a45ea58095e9 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 3 Apr 2026 20:58:18 -0700 Subject: [PATCH] fix: optimize e2e actor path --- engine/packages/gasoline/src/db/kv/mod.rs | 3 +- .../packages/guard-core/src/proxy_service.rs | 2 +- .../packages/namespace/src/ops/get_local.rs | 48 +++++--- .../src/ops/resolve_for_name_local.rs | 58 ++++++--- engine/packages/pegboard-gateway2/src/lib.rs | 7 +- engine/packages/pegboard-outbound/src/lib.rs | 33 ++++-- .../list_runner_config_epoxy_replica_ids.rs | 17 +-- .../pegboard/src/workflows/actor2/mod.rs | 91 +++++++++++++++ .../pegboard/src/workflows/actor2/runtime.rs | 8 +- engine/packages/pools/src/pools.rs | 4 + .../envoy-client/src/tasks/envoy/index.ts | 13 ++- .../sdks/typescript/test-envoy/package.json | 2 + .../sdks/typescript/test-envoy/src/index.ts | 49 ++++++-- engine/sdks/typescript/test-envoy/src/log.ts | 19 ++- .../sdks/typescript/test-envoy/src/logfmt.ts | 110 ++++++++++++++++++ pnpm-lock.yaml | 52 ++++----- scripts/run/docker/engine-rocksdb-release.sh | 1 - scripts/tests/actor_e2e.ts | 2 + 18 files changed, 397 insertions(+), 122 deletions(-) create mode 100644 engine/sdks/typescript/test-envoy/src/logfmt.ts diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 61e07a9461..9767bafa18 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -1166,6 +1166,7 @@ impl Database for DatabaseKv { anyhow::Ok(buffer) } + .custom_instrument(tracing::debug_span!("read_wake_conditions")) )?; // Sort for consistency across all workers @@ -1312,7 +1313,7 @@ impl Database for DatabaseKv { .buffer_unordered(1024) .try_filter_map(|x| std::future::ready(Ok(x))) .try_collect::>() - .instrument(tracing::trace_span!("map_to_leased_workflows")) + .custom_instrument(tracing::debug_span!("map_to_leased_workflows")) .await?; // Clear all wake conditions from workflows that we have leased diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 2783edc568..9494415ec2 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -331,7 +331,7 @@ impl ProxyService { } /// Process an individual request. - #[tracing::instrument(name = "guard_request", skip_all, fields(ray_id, req_id))] + #[tracing::instrument(name = "guard_request", skip_all, fields(ray_id, req_id, uri=%req.uri()))] pub async fn process(&self, mut req: Request) -> Result> { let start_time = Instant::now(); diff --git a/engine/packages/namespace/src/ops/get_local.rs b/engine/packages/namespace/src/ops/get_local.rs index e72a25c5da..e7a5e01779 100644 --- a/engine/packages/namespace/src/ops/get_local.rs +++ b/engine/packages/namespace/src/ops/get_local.rs @@ -16,24 +16,40 @@ pub async fn namespace_get_local(ctx: &OperationCtx, input: &Input) -> Result>() - .await - }) - .custom_instrument(tracing::info_span!("namespace_get_local_tx")) - .await?; + async move { get_inner(namespace_id, &tx).await } + }) + .buffer_unordered(1024) + .try_filter_map(|x| std::future::ready(Ok(x))) + .try_collect::>() + .await + }) + .custom_instrument(tracing::info_span!("namespace_get_local_tx")) + .await?; - Ok(namespaces) + for ns in namespaces { + let namespace_id = ns.namespace_id; + cache.resolve(&&namespace_id, ns); + } + + Ok(cache) + }, + ) + .await } pub(crate) async fn get_inner( diff --git a/engine/packages/namespace/src/ops/resolve_for_name_local.rs b/engine/packages/namespace/src/ops/resolve_for_name_local.rs index 5aaff209cf..6adfabfc81 100644 --- a/engine/packages/namespace/src/ops/resolve_for_name_local.rs +++ b/engine/packages/namespace/src/ops/resolve_for_name_local.rs @@ -18,24 +18,44 @@ pub async fn namespace_resolve_for_name_local( return Err(errors::Namespace::NotLeader.build()); } - ctx.udb()? - .run(|tx| { - let name = input.name.clone(); - async move { - let tx = tx.with_subspace(keys::subspace()); - - let Some(namespace_id) = tx - .read_opt(&keys::ByNameKey::new(name.clone()), Serializable) - .await? - else { - // Namespace not found - return Ok(None); - }; - - get_inner(namespace_id, &tx).await - } - }) - .custom_instrument(tracing::info_span!("namespace_resolve_for_name_local_tx")) + ctx.cache() + .clone() + .request() + .fetch_one_json( + "namespace.resolve_for_name_local", + input.name.clone(), + move |mut cache, key| { + async move { + let ns = ctx + .udb()? + .run(|tx| { + let name = input.name.clone(); + async move { + let tx = tx.with_subspace(keys::subspace()); + + let Some(namespace_id) = tx + .read_opt(&keys::ByNameKey::new(name.clone()), Serializable) + .await? + else { + // Namespace not found + return Ok(None); + }; + + get_inner(namespace_id, &tx).await + } + }) + .custom_instrument(tracing::info_span!( + "namespace_resolve_for_name_local_tx" + )) + .await?; + + if let Some(ns) = ns { + cache.resolve(&key, ns); + } + + Ok(cache) + } + }, + ) .await - .map_err(Into::into) } diff --git a/engine/packages/pegboard-gateway2/src/lib.rs b/engine/packages/pegboard-gateway2/src/lib.rs index 1bae75d6ba..b70d4c961a 100644 --- a/engine/packages/pegboard-gateway2/src/lib.rs +++ b/engine/packages/pegboard-gateway2/src/lib.rs @@ -244,7 +244,8 @@ impl PegboardGateway2 { } Err(ServiceUnavailable.build()) - }; + } + .instrument(tracing::info_span!("wait_for_tunnel_response")); let response_start_timeout = Duration::from_millis( self.ctx .config() @@ -864,6 +865,7 @@ impl PegboardGateway2 { } } +#[tracing::instrument(skip_all)] async fn hibernate_ws(ws_rx: Arc>) -> Result { let mut guard = ws_rx.lock().await; let mut pinned = std::pin::Pin::new(&mut *guard); @@ -888,6 +890,7 @@ async fn hibernate_ws(ws_rx: Arc>) -> Result R res } +#[tracing::instrument(skip_all)] async fn inner(ctx: &StandaloneCtx, conns: &mut Vec) -> Result<()> { let mut sub = ctx .ups()? @@ -136,16 +137,20 @@ struct OutboundHandler { impl OutboundHandler { fn new(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Self { let ctx = ctx.clone(); - let handle = tokio::spawn(async move { - if let Err(err) = handle(&ctx, packet).await { - tracing::error!(?err, "outbound handler failed"); + let handle = tokio::spawn( + async move { + if let Err(err) = handle(&ctx, packet).await { + tracing::error!(?err, "outbound handler failed"); + } } - }); + .instrument(tracing::info_span!("outbound_handle_task")), + ); OutboundHandler { handle } } } +#[tracing::instrument(skip_all)] async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> { let (namespace_id, pool_name, checkpoint, actor_config) = match packet { protocol::ToOutbound::ToOutboundActorStart(protocol::ToOutboundActorStart { @@ -246,6 +251,7 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> res } +#[tracing::instrument(skip_all)] async fn serverless_outbound_req( ctx: &StandaloneCtx, namespace_id: Id, @@ -295,18 +301,24 @@ async fn serverless_outbound_req( let endpoint_url = format!("{}/start", url.trim_end_matches('/')); - tracing::debug!(%endpoint_url, "sending outbound req"); - let client = rivet_pools::reqwest::client_no_timeout().await?; - let req = client.post(endpoint_url).body(payload).headers(headers); - + let req = client + .post(endpoint_url.clone()) + .body(payload) + .headers(headers); let mut source = sse::EventSource::new(req).context("failed creating event source")?; + tracing::debug!(%endpoint_url, "sending outbound req"); + let stream_handler = async { + tracing::debug!(%endpoint_url, "stream handler future"); + while let Some(event) = source.next().await { match event { Ok(event) => match event { - sse::Event::Open => {} + sse::Event::Open => { + tracing::debug!(%endpoint_url, "sse stream opened"); + } sse::Event::Message(msg) => match msg.event.as_str() { "ping" => {} event => { @@ -421,7 +433,7 @@ async fn serverless_outbound_req( } return res; - }, + } _ = tokio::time::sleep(sleep_until_drain) => {} _ = term_signal.recv() => {} } @@ -453,6 +465,7 @@ async fn serverless_outbound_req( } /// Report an error to the error tracker workflow. +#[tracing::instrument(skip_all)] async fn report_error( ctx: &StandaloneCtx, namespace_id: Id, diff --git a/engine/packages/pegboard/src/ops/runner/list_runner_config_epoxy_replica_ids.rs b/engine/packages/pegboard/src/ops/runner/list_runner_config_epoxy_replica_ids.rs index 7e3027cb1d..24c12059a5 100644 --- a/engine/packages/pegboard/src/ops/runner/list_runner_config_epoxy_replica_ids.rs +++ b/engine/packages/pegboard/src/ops/runner/list_runner_config_epoxy_replica_ids.rs @@ -20,21 +20,6 @@ pub async fn list_runner_config_epoxy_replica_ids( ctx: &OperationCtx, input: &Input, ) -> Result { - let start = std::time::Instant::now(); - let replicas = list_runner_config_epoxy_replica_ids_inner(ctx, input).await?; - tracing::debug!( - duration_ms = %start.elapsed().as_millis(), - ?replicas, - "list_runner_config_epoxy_replica_ids completed" - ); - - Ok(Output { replicas }) -} - -async fn list_runner_config_epoxy_replica_ids_inner( - ctx: &OperationCtx, - input: &Input, -) -> Result> { let (enabled_dcs, cluster_config) = tokio::try_join!( ctx.op(crate::ops::runner::list_runner_config_enabled_dcs::Input { namespace_id: input.namespace_id, @@ -62,5 +47,5 @@ async fn list_runner_config_epoxy_replica_ids_inner( bail!("resolved runner config epoxy replica set is empty"); } - Ok(replicas) + Ok(Output { replicas }) } diff --git a/engine/packages/pegboard/src/workflows/actor2/mod.rs b/engine/packages/pegboard/src/workflows/actor2/mod.rs index 5a3b2e8c51..9c03614030 100644 --- a/engine/packages/pegboard/src/workflows/actor2/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor2/mod.rs @@ -823,6 +823,89 @@ async fn process_signal( } } } + Main::Sleep(_) => { + match &mut state.transition { + Transition::Allocating { .. } + | Transition::Starting { .. } + | Transition::GoingAway { .. } => { + // TODO: Set to sleep after allocation is complete + } + Transition::Running { envoy, .. } => { + let envoy_key = envoy.envoy_key.clone(); + let now = ctx.activity(runtime::GetTsInput {}).await?; + + // Transition to sleep intent + state.transition = Transition::SleepIntent { + envoy: std::mem::take(envoy), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), + rewake_after_stop: false, + }; + + ctx.activity(runtime::SetSleepingInput {}).await?; + + ctx.activity(runtime::InsertAndSendCommandsInput { + generation: state.generation, + envoy_key, + commands: vec![protocol::Command::CommandStopActor( + protocol::CommandStopActor { + reason: protocol::StopActorReason::SleepIntent, + }, + )], + }) + .await?; + } + Transition::Reallocating { .. } => { + // Stop reallocating + state.transition = Transition::Sleeping; + } + Transition::SleepIntent { .. } + | Transition::StopIntent { .. } + | Transition::Sleeping + | Transition::Destroying { .. } => {} + } + } + Main::Reallocate(_) => { + match &mut state.transition { + Transition::Running { envoy, .. } => { + let now = ctx.activity(runtime::GetTsInput {}).await?; + let envoy_key = envoy.envoy_key.clone(); + + // Transition to going away + state.transition = Transition::GoingAway { + envoy: std::mem::take(envoy), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), + }; + + ctx.activity(runtime::InsertAndSendCommandsInput { + generation: state.generation, + envoy_key, + commands: vec![protocol::Command::CommandStopActor( + protocol::CommandStopActor { + reason: protocol::StopActorReason::GoingAway, + }, + )], + }) + .await?; + } + Transition::SleepIntent { envoy, .. } | Transition::StopIntent { envoy, .. } => { + let now = ctx.activity(runtime::GetTsInput {}).await?; + + state.transition = Transition::GoingAway { + envoy: std::mem::take(envoy), + lost_timeout_ts: now + ctx.config().pegboard().actor_stop_threshold(), + }; + + // Stop command was already sent + } + Transition::Allocating { .. } + | Transition::Starting { .. } + | Transition::Sleeping + | Transition::Reallocating { .. } => { + // Do nothing, already mid allocation + } + Transition::GoingAway { .. } | Transition::Destroying { .. } => {} + } + } Main::Lost(sig) => { // Ignore signals for previous generations if sig.generation != state.generation { @@ -1118,6 +1201,12 @@ pub struct Events { #[signal("pegboard_actor2_wake")] pub struct Wake {} +#[signal("pegboard_actor2_sleep")] +pub struct Sleep {} + +#[signal("pegboard_actor2_reallocate")] +pub struct Reallocate {} + /// Ack response from outbound req handler service. #[signal("pegboard_actor2_allocated")] pub struct Allocated { @@ -1161,6 +1250,8 @@ join_signal!(Main { Allocated, Events, Wake, + Sleep, + Reallocate, Lost, GoingAway, Destroy, diff --git a/engine/packages/pegboard/src/workflows/actor2/runtime.rs b/engine/packages/pegboard/src/workflows/actor2/runtime.rs index a9ea626afa..5e08ecd023 100644 --- a/engine/packages/pegboard/src/workflows/actor2/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor2/runtime.rs @@ -396,21 +396,21 @@ pub async fn reschedule_actor( if let Some(allocation) = allocate_res.allocation { state.generation += 1; - let now = ctx.activity(GetTsInput {}).await?; - match &allocation { Allocation::Serverless => { // Transition to allocating state.transition = Transition::Allocating { destroy_after_start: false, - lost_timeout_ts: now + ctx.config().pegboard().actor_allocation_threshold(), + lost_timeout_ts: allocate_res.now + + ctx.config().pegboard().actor_allocation_threshold(), }; } Allocation::Serverful { .. } => { // Transition to starting state.transition = Transition::Starting { destroy_after_start: false, - lost_timeout_ts: now + ctx.config().pegboard().actor_start_threshold(), + lost_timeout_ts: allocate_res.now + + ctx.config().pegboard().actor_start_threshold(), }; } } diff --git a/engine/packages/pools/src/pools.rs b/engine/packages/pools/src/pools.rs index ecd49512e2..41611cdbdf 100644 --- a/engine/packages/pools/src/pools.rs +++ b/engine/packages/pools/src/pools.rs @@ -37,6 +37,10 @@ impl Pools { udb, })); + // Initialize here to avoid cold starts elsewhere + crate::reqwest::client().await?; + crate::reqwest::client_no_timeout().await?; + Ok(pool) } diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts index c716a4b398..489d9f3018 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts @@ -165,10 +165,6 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { } } - log(ctx.shared)?.info({ - msg: "stopping envoy", - }); - // Cleanup ctx.shared.wsTx?.send({ type: "close", code: 1000, reason: "envoy.shutdown" }); clearInterval(ackInterval); @@ -185,6 +181,12 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { } } ctx.actors.clear(); + + log(ctx.shared)?.info({ + msg: "envoy stopped", + }); + + ctx.shared.config.onShutdown(); }); // Queue start actor @@ -270,6 +272,8 @@ function handleShutdown(ctx: EnvoyContext) { if (ctx.shuttingDown) return; ctx.shuttingDown = true; + log(ctx.shared)?.debug({ msg: "envoy received shutdown" }); + wsSend(ctx.shared, { tag: "ToRivetStopping", val: null, @@ -341,7 +345,6 @@ function createHandle( return { shutdown(immediate: boolean) { ctx.shared.envoyTx.send({ type: "shutdown" }); - ctx.shared.config.onShutdown(); }, getProtocolMetadata(): protocol.ProtocolMetadata | undefined { diff --git a/engine/sdks/typescript/test-envoy/package.json b/engine/sdks/typescript/test-envoy/package.json index 45739e673e..ad55c8228f 100644 --- a/engine/sdks/typescript/test-envoy/package.json +++ b/engine/sdks/typescript/test-envoy/package.json @@ -4,6 +4,7 @@ "type": "module", "scripts": { "start": "tsx src/index.ts", + "start:bun": "bun src/index.ts", "build": "tsup src/index.ts", "check-types": "tsc --noEmit" }, @@ -16,6 +17,7 @@ "ws": "^8.18.3" }, "devDependencies": { + "@types/bun": "^1.3.11", "@types/node": "^22.18.1", "@types/ws": "^8.18.1", "tinybench": "^5.0.1", diff --git a/engine/sdks/typescript/test-envoy/src/index.ts b/engine/sdks/typescript/test-envoy/src/index.ts index d4f56d8ebc..ccc835db69 100644 --- a/engine/sdks/typescript/test-envoy/src/index.ts +++ b/engine/sdks/typescript/test-envoy/src/index.ts @@ -1,4 +1,3 @@ -import { serve } from "@hono/node-server"; import * as protocol from "@rivetkit/engine-envoy-protocol"; import { EnvoyHandle, startEnvoy, startEnvoySync } from "@rivetkit/engine-envoy-client"; import { Hono, type Context as HonoContext, type Next } from "hono"; @@ -176,6 +175,10 @@ app.get("/shutdown", async (c) => { }); app.post("/api/rivet/start", async (c) => { + getLogger().info({ + msg: `Received SSE request`, + }); + let payload = await c.req.arrayBuffer(); return streamSSE(c, async (stream) => { @@ -197,6 +200,38 @@ app.post("/api/rivet/start", async (c) => { }); }); +app.post("/foo", async (c) => { + let payload = await c.req.arrayBuffer(); + getLogger().info({ + msg: `Received SSE request`, + payload, + }); + + return streamSSE(c, async (stream) => { + const stopped = Promise.withResolvers(); + + // Use setTimeout to immediately defer back to SSE + setTimeout(() => { + const envoy = startEnvoySync({ + ...config, + serverlessStartPayload: payload, + onShutdown() { + stopped.resolve(); + } + }); + + c.req.raw.signal.addEventListener("abort", () => { + getLogger().debug("SSE aborted, shutting down runner"); + envoy!.shutdown(true); + }); + }, 0); + + await stream.writeSSE({ event: "ping", data: "" }); + + await stopped.promise; + }); +}); + app.get("/api/rivet/metadata", async (c) => { return c.json({ // Not actually rivetkit @@ -207,10 +242,12 @@ app.get("/api/rivet/metadata", async (c) => { }); if (AUTOSTART_SERVER) { - serve({ - fetch: app.fetch, - port: INTERNAL_SERVER_PORT, - }); + if (process.versions.bun) { + Bun.serve({ fetch: app.fetch, port: INTERNAL_SERVER_PORT, idleTimeout: 0 }); + } else { + const { serve } = await import("@hono/node-server"); + serve({ fetch: app.fetch, port: INTERNAL_SERVER_PORT }); + } getLogger().info( `Internal HTTP server listening on port ${INTERNAL_SERVER_PORT}`, ); @@ -270,5 +307,3 @@ async function autoConfigureServerless() { ); } } - -export default app; diff --git a/engine/sdks/typescript/test-envoy/src/log.ts b/engine/sdks/typescript/test-envoy/src/log.ts index bf1eb53a03..a05ece3819 100644 --- a/engine/sdks/typescript/test-envoy/src/log.ts +++ b/engine/sdks/typescript/test-envoy/src/log.ts @@ -1,11 +1,10 @@ -import { inspect } from "node:util"; import { - type Level, type LevelWithSilent, type Logger, pino, stdTimeFunctions, } from "pino"; +import { castToLogValue, formatTimestamp, stringify } from "./logfmt"; export type { Logger } from "pino"; @@ -38,9 +37,9 @@ function customWrite(level: string, o: any) { const entries: any = {}; // Add timestamp if enabled - if (process.env["LOG_TIMESTAMP"] === "1" && o.time) { + if (process.env["LOG_TIMESTAMP"] !== "0" && o.time) { const date = typeof o.time === "number" ? new Date(o.time) : new Date(); - entries.ts = date; + entries.ts = formatTimestamp(date); } // Add level @@ -66,15 +65,11 @@ function customWrite(level: string, o: any) { key !== "pid" && key !== "hostname" ) { - entries[key] = value; + entries[key] = castToLogValue(value); } } - const output = inspect(entries, { - compact: true, - breakLength: Infinity, - colors: true, - }); + const output = stringify(entries); console.log(output); } @@ -94,7 +89,7 @@ export async function configureDefaultLogger(): Promise { }, }, timestamp: - process.env["LOG_TIMESTAMP"] === "1" + process.env["LOG_TIMESTAMP"] !== "0" ? stdTimeFunctions.epochTime : false, browser: { @@ -121,7 +116,7 @@ export async function configureDefaultLogger(): Promise { }; const levelName = levelMap[level] || "info"; const time = - process.env["LOG_TIMESTAMP"] === "1" + process.env["LOG_TIMESTAMP"] !== "0" ? Date.now() : undefined; // TODO: This can be simplified in logfmt.ts diff --git a/engine/sdks/typescript/test-envoy/src/logfmt.ts b/engine/sdks/typescript/test-envoy/src/logfmt.ts new file mode 100644 index 0000000000..53666bba03 --- /dev/null +++ b/engine/sdks/typescript/test-envoy/src/logfmt.ts @@ -0,0 +1,110 @@ +import { inspect } from "node:util"; + +type LogLevel = "TRACE" | "DEBUG" | "INFO" | "WARN" | "ERROR" | "FATAL"; + +interface LoggerConfig { + enableInspect: boolean; +} + +export const LOGGER_CONFIG: LoggerConfig = { + enableInspect: true, +}; + +const LOG_LEVEL_COLORS: Record = { + FATAL: "\x1b[31m", + ERROR: "\x1b[31m", + WARN: "\x1b[33m", + INFO: "\x1b[32m", + DEBUG: "\x1b[36m", + TRACE: "\x1b[36m", +}; + +const RESET_COLOR = "\x1b[0m"; + +export function stringify(data: Record): string { + let line = ""; + const entries = Object.entries(data); + + for (let i = 0; i < entries.length; i++) { + const [key, valueRaw] = entries[i]; + + let isNull = false; + let valueString: string; + if (valueRaw == null) { + isNull = true; + valueString = ""; + } else { + valueString = String(valueRaw); + } + + // Clip value unless specifically the error message + if (valueString.length > 512 && key !== "msg" && key !== "error") { + valueString = `${valueString.slice(0, 512)}...`; + } + + const needsQuoting = + valueString.indexOf(" ") > -1 || valueString.indexOf("=") > -1; + const needsEscaping = + valueString.indexOf('"') > -1 || valueString.indexOf("\\") > -1; + + valueString = valueString.replace(/\n/g, "\\n"); + if (needsEscaping) valueString = valueString.replace(/["\\]/g, "\\$&"); + if (needsQuoting || needsEscaping) valueString = `"${valueString}"`; + if (valueString === "" && !isNull) valueString = '""'; + + // Special message colors + let color = "\x1b[2m"; + if (key === "level") { + const levelColor = LOG_LEVEL_COLORS[valueString as LogLevel]; + if (levelColor) { + color = levelColor; + } + } else if (key === "msg") { + color = "\x1b[32m"; + } + + line += `\x1b[0m\x1b[1m${key}\x1b[0m\x1b[2m=\x1b[0m${color}${valueString}${RESET_COLOR}`; + + if (i !== entries.length - 1) { + line += " "; + } + } + + return line; +} + +export function formatTimestamp(date: Date): string { + const year = date.getUTCFullYear(); + const month = String(date.getUTCMonth() + 1).padStart(2, "0"); + const day = String(date.getUTCDate()).padStart(2, "0"); + const hours = String(date.getUTCHours()).padStart(2, "0"); + const minutes = String(date.getUTCMinutes()).padStart(2, "0"); + const seconds = String(date.getUTCSeconds()).padStart(2, "0"); + const milliseconds = String(date.getUTCMilliseconds()).padStart(3, "0"); + + return `${year}-${month}-${day}T${hours}:${minutes}:${seconds}.${milliseconds}Z`; +} + +export function castToLogValue(v: unknown): unknown { + if ( + typeof v === "string" || + typeof v === "number" || + typeof v === "bigint" || + typeof v === "boolean" || + v === null || + v === undefined + ) { + return v; + } + if (LOGGER_CONFIG.enableInspect) { + return inspect(v, { compact: true, breakLength: Infinity, colors: false }); + } + if (v instanceof Error) { + return String(v); + } + try { + return JSON.stringify(v); + } catch { + return "[cannot stringify]"; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 19f88ac16b..e410a86a49 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -272,6 +272,9 @@ importers: specifier: ^8.18.3 version: 8.19.0 devDependencies: + '@types/bun': + specifier: latest + version: 1.3.11 '@types/node': specifier: ^22.18.1 version: 22.19.15 @@ -1509,9 +1512,6 @@ importers: examples/hono: dependencies: - '@hono/node-server': - specifier: ^1.19.12 - version: 1.19.12(hono@4.11.9) hono: specifier: ^4.7.0 version: 4.11.9 @@ -2614,7 +2614,7 @@ importers: version: 4.3.19(react@19.1.0)(zod@3.25.76) drizzle-orm: specifier: ^0.44.2 - version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) + version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) fdb-tuple: specifier: ^1.0.0 version: 1.0.0 @@ -2785,7 +2785,7 @@ importers: version: 4.3.19(react@19.1.0)(zod@3.25.76) drizzle-orm: specifier: ^0.44.2 - version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) + version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) fdb-tuple: specifier: ^1.0.0 version: 1.0.0 @@ -2938,7 +2938,7 @@ importers: version: 0.31.5 drizzle-orm: specifier: ^0.44.2 - version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) + version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) devDependencies: '@types/node': specifier: ^22.13.9 @@ -4270,7 +4270,7 @@ importers: version: 4.0.9 drizzle-orm: specifier: ^0.44.2 - version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) + version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0) eventsource: specifier: ^4.0.0 version: 4.0.0 @@ -4787,7 +4787,7 @@ importers: version: 19.2.13 drizzle-orm: specifier: ^0.38.0 - version: 0.38.4(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/react@19.2.13)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(react@19.1.0)(sql.js@1.13.0) + version: 0.38.4(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/react@19.2.13)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(react@19.1.0)(sql.js@1.13.0) typescript: specifier: ^5.7.3 version: 5.9.3 @@ -5970,6 +5970,7 @@ packages: '@copilotkit/llmock@1.6.0': resolution: {integrity: sha512-wq4J7ampjoEiOi6v2d7GMK5lTZcTnuhMduSPCIwmyxBTCPA3lekXyNKGJ4t3xM5OgoJReMQ5KmlfrMBVTRNGsA==} engines: {node: '>=20.15.0'} + deprecated: This package has moved to @copilotkit/aimock hasBin: true '@cspotcode/source-map-support@0.8.1': @@ -7267,12 +7268,6 @@ packages: peerDependencies: hono: ^4 - '@hono/node-server@1.19.12': - resolution: {integrity: sha512-txsUW4SQ1iilgE0l9/e9VQWmELXifEFvmdA1j6WFh/aFPj99hIntrSsq/if0UWyGVkmrRPKA1wCeP+UCr1B9Uw==} - engines: {node: '>=18.14.1'} - peerDependencies: - hono: ^4 - '@hono/node-server@1.19.9': resolution: {integrity: sha512-vHL6w3ecZsky+8P5MD+eFfaGTyCeOHUIFYMGpQGbrBTSmNNoxv0if69rEZ5giu36weC5saFuznL411gRX7bJDw==} engines: {node: '>=18.14.1'} @@ -10617,6 +10612,9 @@ packages: '@types/better-sqlite3@7.6.13': resolution: {integrity: sha512-NMv9ASNARoKksWtsq/SHakpYAYnhBrQgGD8zkLYk/jaK8jUGn08CfEdTRgYhMypUQAfzSP8W6gNLe0q19/t4VA==} + '@types/bun@1.3.11': + resolution: {integrity: sha512-5vPne5QvtpjGpsGYXiFyycfpDF2ECyPcTSsFBMa0fraoxiQyMJ3SmuQIGhzPg2WJuWxVBoxWJ2kClYTcw/4fAg==} + '@types/canvas-confetti@1.9.0': resolution: {integrity: sha512-aBGj/dULrimR1XDZLtG9JwxX1b4HPRF6CX9Yfwh3NvstZEm1ZL7RBnel4keCPSqs1ANRu1u2Aoz9R+VmtjYuTg==} @@ -11919,10 +11917,8 @@ packages: builtin-status-codes@3.0.0: resolution: {integrity: sha512-HpGFw18DgFWlncDfjTa2rcQ4W88O1mC8e8yZ2AvQY5KDaktSTwo+KRf6nHK6FRI5FyRyb/5T6+TSxfP7QyGsmQ==} - bun-types@1.3.0: - resolution: {integrity: sha512-u8X0thhx+yJ0KmkxuEo9HAtdfgCBaM/aI9K90VQcQioAmkVp3SG3FkwWGibUFz3WdXAdcsqOcbU40lK7tbHdkQ==} - peerDependencies: - '@types/react': ^19 + bun-types@1.3.11: + resolution: {integrity: sha512-1KGPpoxQWl9f6wcZh57LvrPIInQMn2TQ7jsgxqpRzg+l0QPOFvJVH7HmvHo/AiPgwXy+/Thf6Ov3EdVn1vOabg==} bundle-name@4.1.0: resolution: {integrity: sha512-tjwM5exMg6BGRI+kNmTntNsvdZS1X8BFYS6tnJ2hdH0kVxM6/eVZ2xy+FqStSWvYmtfFMDLIxurorHwDKfDz5Q==} @@ -22221,10 +22217,6 @@ snapshots: dependencies: hono: 4.11.9 - '@hono/node-server@1.19.12(hono@4.11.9)': - dependencies: - hono: 4.11.9 - '@hono/node-server@1.19.9(hono@4.11.9)': dependencies: hono: 4.11.9 @@ -26616,6 +26608,10 @@ snapshots: '@types/node': 22.19.15 optional: true + '@types/bun@1.3.11': + dependencies: + bun-types: 1.3.11 + '@types/canvas-confetti@1.9.0': {} '@types/chai@5.2.3': @@ -28459,11 +28455,9 @@ snapshots: builtin-status-codes@3.0.0: {} - bun-types@1.3.0(@types/react@19.2.13): + bun-types@1.3.11: dependencies: '@types/node': 22.19.15 - '@types/react': 19.2.13 - optional: true bundle-name@4.1.0: dependencies: @@ -29515,7 +29509,7 @@ snapshots: transitivePeerDependencies: - supports-color - drizzle-orm@0.38.4(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/react@19.2.13)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(react@19.1.0)(sql.js@1.13.0): + drizzle-orm@0.38.4(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/react@19.2.13)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(react@19.1.0)(sql.js@1.13.0): optionalDependencies: '@cloudflare/workers-types': 4.20251014.0 '@opentelemetry/api': 1.9.0 @@ -29524,13 +29518,13 @@ snapshots: '@types/react': 19.2.13 '@types/sql.js': 1.4.9 better-sqlite3: 12.8.0 - bun-types: 1.3.0(@types/react@19.2.13) + bun-types: 1.3.11 kysely: 0.28.8 pg: 8.17.2 react: 19.1.0 sql.js: 1.13.0 - drizzle-orm@0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.0(@types/react@19.2.13))(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0): + drizzle-orm@0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(@types/sql.js@1.4.9)(better-sqlite3@12.8.0)(bun-types@1.3.11)(kysely@0.28.8)(pg@8.17.2)(sql.js@1.13.0): optionalDependencies: '@cloudflare/workers-types': 4.20251014.0 '@opentelemetry/api': 1.9.0 @@ -29538,7 +29532,7 @@ snapshots: '@types/pg': 8.16.0 '@types/sql.js': 1.4.9 better-sqlite3: 12.8.0 - bun-types: 1.3.0(@types/react@19.2.13) + bun-types: 1.3.11 kysely: 0.28.8 pg: 8.17.2 sql.js: 1.13.0 diff --git a/scripts/run/docker/engine-rocksdb-release.sh b/scripts/run/docker/engine-rocksdb-release.sh index 1b0a86768f..08a8381c59 100755 --- a/scripts/run/docker/engine-rocksdb-release.sh +++ b/scripts/run/docker/engine-rocksdb-release.sh @@ -6,5 +6,4 @@ REPO_ROOT="$(cd "${SCRIPT_DIR}/../../.." && pwd)" cd "${REPO_ROOT}" -RUST_LOG=warn \ cargo run --release --bin rivet-engine -- start 2>&1 | tee -i /tmp/rivet-engine.log diff --git a/scripts/tests/actor_e2e.ts b/scripts/tests/actor_e2e.ts index 9e1518865d..75395a4aae 100755 --- a/scripts/tests/actor_e2e.ts +++ b/scripts/tests/actor_e2e.ts @@ -30,6 +30,7 @@ async function main() { // Make a request to the actor console.log("Making request to actor..."); + console.log(new Date().toISOString()); console.time("ping 1"); const actorPingResponse = await fetch(`${RIVET_ENDPOINT}/ping`, { method: "GET", @@ -40,6 +41,7 @@ async function main() { }, }); console.timeEnd("ping 1"); + console.log(new Date().toISOString()); const pingResult = await actorPingResponse.text();