Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ impl Database for DatabaseKv {

anyhow::Ok(buffer)
}
.custom_instrument(tracing::debug_span!("read_wake_conditions"))
)?;

// Sort for consistency across all workers
Expand Down Expand Up @@ -1312,7 +1313,7 @@ impl Database for DatabaseKv {
.buffer_unordered(1024)
.try_filter_map(|x| std::future::ready(Ok(x)))
.try_collect::<Vec<_>>()
.instrument(tracing::trace_span!("map_to_leased_workflows"))
.custom_instrument(tracing::debug_span!("map_to_leased_workflows"))
.await?;

// Clear all wake conditions from workflows that we have leased
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ impl ProxyService {
}

/// Process an individual request.
#[tracing::instrument(name = "guard_request", skip_all, fields(ray_id, req_id))]
#[tracing::instrument(name = "guard_request", skip_all, fields(ray_id, req_id, uri=%req.uri()))]
pub async fn process(&self, mut req: Request<BodyIncoming>) -> Result<Response<ResponseBody>> {
let start_time = Instant::now();

Expand Down
48 changes: 32 additions & 16 deletions engine/packages/namespace/src/ops/get_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,40 @@ pub async fn namespace_get_local(ctx: &OperationCtx, input: &Input) -> Result<Ve
return Err(errors::Namespace::NotLeader.build());
}

let namespaces = ctx
.udb()?
.run(|tx| async move {
futures_util::stream::iter(input.namespace_ids.clone())
.map(|namespace_id| {
let tx = tx.clone();
ctx.cache()
.clone()
.request()
.fetch_all_json(
"namespace.get_local",
input.namespace_ids.clone(),
move |mut cache, namespace_ids| async move {
let namespace_ids = &namespace_ids;
let namespaces = ctx
.udb()?
.run(|tx| async move {
futures_util::stream::iter(namespace_ids.clone())
.map(|namespace_id| {
let tx = tx.clone();

async move { get_inner(namespace_id, &tx).await }
})
.buffer_unordered(1024)
.try_filter_map(|x| std::future::ready(Ok(x)))
.try_collect::<Vec<_>>()
.await
})
.custom_instrument(tracing::info_span!("namespace_get_local_tx"))
.await?;
async move { get_inner(namespace_id, &tx).await }
})
.buffer_unordered(1024)
.try_filter_map(|x| std::future::ready(Ok(x)))
.try_collect::<Vec<_>>()
.await
})
.custom_instrument(tracing::info_span!("namespace_get_local_tx"))
.await?;

Ok(namespaces)
for ns in namespaces {
let namespace_id = ns.namespace_id;
cache.resolve(&&namespace_id, ns);
}

Ok(cache)
},
)
.await
}

pub(crate) async fn get_inner(
Expand Down
58 changes: 39 additions & 19 deletions engine/packages/namespace/src/ops/resolve_for_name_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,44 @@ pub async fn namespace_resolve_for_name_local(
return Err(errors::Namespace::NotLeader.build());
}

ctx.udb()?
.run(|tx| {
let name = input.name.clone();
async move {
let tx = tx.with_subspace(keys::subspace());

let Some(namespace_id) = tx
.read_opt(&keys::ByNameKey::new(name.clone()), Serializable)
.await?
else {
// Namespace not found
return Ok(None);
};

get_inner(namespace_id, &tx).await
}
})
.custom_instrument(tracing::info_span!("namespace_resolve_for_name_local_tx"))
ctx.cache()
.clone()
.request()
.fetch_one_json(
"namespace.resolve_for_name_local",
input.name.clone(),
move |mut cache, key| {
async move {
let ns = ctx
.udb()?
.run(|tx| {
let name = input.name.clone();
async move {
let tx = tx.with_subspace(keys::subspace());

let Some(namespace_id) = tx
.read_opt(&keys::ByNameKey::new(name.clone()), Serializable)
.await?
else {
// Namespace not found
return Ok(None);
};

get_inner(namespace_id, &tx).await
}
})
.custom_instrument(tracing::info_span!(
"namespace_resolve_for_name_local_tx"
))
.await?;

if let Some(ns) = ns {
cache.resolve(&key, ns);
}

Ok(cache)
}
},
)
.await
.map_err(Into::into)
}
7 changes: 6 additions & 1 deletion engine/packages/pegboard-gateway2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ impl PegboardGateway2 {
}

Err(ServiceUnavailable.build())
};
}
.instrument(tracing::info_span!("wait_for_tunnel_response"));
let response_start_timeout = Duration::from_millis(
self.ctx
.config()
Expand Down Expand Up @@ -864,6 +865,7 @@ impl PegboardGateway2 {
}
}

#[tracing::instrument(skip_all)]
async fn hibernate_ws(ws_rx: Arc<Mutex<WebSocketReceiver>>) -> Result<HibernationResult> {
let mut guard = ws_rx.lock().await;
let mut pinned = std::pin::Pin::new(&mut *guard);
Expand All @@ -888,6 +890,7 @@ async fn hibernate_ws(ws_rx: Arc<Mutex<WebSocketReceiver>>) -> Result<Hibernatio
}
}

#[derive(Debug)]
enum Metric {
HttpIngress(usize),
HttpEgress(usize),
Expand All @@ -899,6 +902,7 @@ enum Metric {
WebsocketStopHibernate,
}

#[tracing::instrument(skip_all, fields(?actor_id, ?metric))]
async fn record_req_metrics(
ctx: &StandaloneCtx,
actor_id: Id,
Expand All @@ -918,6 +922,7 @@ async fn record_req_metrics(

Ok(())
})
.instrument(tracing::info_span!("record_req_metrics_tx"))
.await?;

Ok(())
Expand Down
33 changes: 23 additions & 10 deletions engine/packages/pegboard-outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
res
}

#[tracing::instrument(skip_all)]
async fn inner(ctx: &StandaloneCtx, conns: &mut Vec<OutboundHandler>) -> Result<()> {
let mut sub = ctx
.ups()?
Expand Down Expand Up @@ -136,16 +137,20 @@ struct OutboundHandler {
impl OutboundHandler {
fn new(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Self {
let ctx = ctx.clone();
let handle = tokio::spawn(async move {
if let Err(err) = handle(&ctx, packet).await {
tracing::error!(?err, "outbound handler failed");
let handle = tokio::spawn(
async move {
if let Err(err) = handle(&ctx, packet).await {
tracing::error!(?err, "outbound handler failed");
}
}
});
.instrument(tracing::info_span!("outbound_handle_task")),
);

OutboundHandler { handle }
}
}

#[tracing::instrument(skip_all)]
async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> {
let (namespace_id, pool_name, checkpoint, actor_config) = match packet {
protocol::ToOutbound::ToOutboundActorStart(protocol::ToOutboundActorStart {
Expand Down Expand Up @@ -246,6 +251,7 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
res
}

#[tracing::instrument(skip_all)]
async fn serverless_outbound_req(
ctx: &StandaloneCtx,
namespace_id: Id,
Expand Down Expand Up @@ -295,18 +301,24 @@ async fn serverless_outbound_req(

let endpoint_url = format!("{}/start", url.trim_end_matches('/'));

tracing::debug!(%endpoint_url, "sending outbound req");

let client = rivet_pools::reqwest::client_no_timeout().await?;
let req = client.post(endpoint_url).body(payload).headers(headers);

let req = client
.post(endpoint_url.clone())
.body(payload)
.headers(headers);
let mut source = sse::EventSource::new(req).context("failed creating event source")?;

tracing::debug!(%endpoint_url, "sending outbound req");

let stream_handler = async {
tracing::debug!(%endpoint_url, "stream handler future");

while let Some(event) = source.next().await {
match event {
Ok(event) => match event {
sse::Event::Open => {}
sse::Event::Open => {
tracing::debug!(%endpoint_url, "sse stream opened");
}
sse::Event::Message(msg) => match msg.event.as_str() {
"ping" => {}
event => {
Expand Down Expand Up @@ -421,7 +433,7 @@ async fn serverless_outbound_req(
}

return res;
},
}
_ = tokio::time::sleep(sleep_until_drain) => {}
_ = term_signal.recv() => {}
}
Expand Down Expand Up @@ -453,6 +465,7 @@ async fn serverless_outbound_req(
}

/// Report an error to the error tracker workflow.
#[tracing::instrument(skip_all)]
async fn report_error(
ctx: &StandaloneCtx,
namespace_id: Id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,6 @@ pub async fn list_runner_config_epoxy_replica_ids(
ctx: &OperationCtx,
input: &Input,
) -> Result<Output> {
let start = std::time::Instant::now();
let replicas = list_runner_config_epoxy_replica_ids_inner(ctx, input).await?;
tracing::debug!(
duration_ms = %start.elapsed().as_millis(),
?replicas,
"list_runner_config_epoxy_replica_ids completed"
);

Ok(Output { replicas })
}

async fn list_runner_config_epoxy_replica_ids_inner(
ctx: &OperationCtx,
input: &Input,
) -> Result<Vec<ReplicaId>> {
let (enabled_dcs, cluster_config) = tokio::try_join!(
ctx.op(crate::ops::runner::list_runner_config_enabled_dcs::Input {
namespace_id: input.namespace_id,
Expand Down Expand Up @@ -62,5 +47,5 @@ async fn list_runner_config_epoxy_replica_ids_inner(
bail!("resolved runner config epoxy replica set is empty");
}

Ok(replicas)
Ok(Output { replicas })
}
Loading
Loading