Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions engine/packages/api-peer/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::get_or_create::{
GetOrCreateQuery, GetOrCreateRequest, GetOrCreateResponse,
};
use rivet_api_types::actors::get_or_create::*;
use rivet_error::RivetError;

#[tracing::instrument(skip_all)]
Expand Down
24 changes: 1 addition & 23 deletions engine/packages/api-peer/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,8 @@ use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::kv_get::*;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetPath {
pub actor_id: Id,
pub key: String,
}

#[derive(Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetQuery {
pub namespace: String,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvGetResponse)]
pub struct KvGetResponse {
/// Value encoded in base 64.
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/api-peer/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ pub mod get_or_create;
pub mod kv_get;
pub mod list;
pub mod list_names;
pub mod reschedule;
pub mod sleep;
55 changes: 55 additions & 0 deletions engine/packages/api-peer/src/actors/reschedule.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::reschedule::*;

#[tracing::instrument(skip_all)]
pub async fn reschedule(
ctx: ApiCtx,
path: ReschedulePath,
query: RescheduleQuery,
_body: RescheduleRequest,
) -> Result<RescheduleResponse> {
// Get the actor first to verify it exists
let actors_res = ctx
.op(pegboard::ops::actor::get::Input {
actor_ids: vec![path.actor_id],
fetch_error: false,
})
.await?;

let actor = actors_res
.actors
.into_iter()
.next()
.ok_or_else(|| pegboard::errors::Actor::NotFound.build())?;

// Verify the actor belongs to the specified namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace,
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

if actor.namespace_id != namespace.namespace_id {
return Err(pegboard::errors::Actor::NotFound.build());
}

let res = ctx
.signal(pegboard::workflows::actor2::Reschedule {})
.to_workflow::<pegboard::workflows::actor2::Workflow>()
.tag("actor_id", path.actor_id)
.graceful_not_found()
.send()
.await?;

if res.is_none() {
tracing::warn!(
actor_id=?path.actor_id,
"actor workflow not found, likely already stopped"
);
}

Ok(RescheduleResponse {})
}
55 changes: 55 additions & 0 deletions engine/packages/api-peer/src/actors/sleep.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::sleep::*;

#[tracing::instrument(skip_all)]
pub async fn sleep(
ctx: ApiCtx,
path: SleepPath,
query: SleepQuery,
_body: SleepRequest,
) -> Result<SleepResponse> {
// Get the actor first to verify it exists
let actors_res = ctx
.op(pegboard::ops::actor::get::Input {
actor_ids: vec![path.actor_id],
fetch_error: false,
})
.await?;

let actor = actors_res
.actors
.into_iter()
.next()
.ok_or_else(|| pegboard::errors::Actor::NotFound.build())?;

// Verify the actor belongs to the specified namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace,
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

if actor.namespace_id != namespace.namespace_id {
return Err(pegboard::errors::Actor::NotFound.build());
}

let res = ctx
.signal(pegboard::workflows::actor2::Sleep {})
.to_workflow::<pegboard::workflows::actor2::Workflow>()
.tag("actor_id", path.actor_id)
.graceful_not_found()
.send()
.await?;

if res.is_none() {
tracing::warn!(
actor_id=?path.actor_id,
"actor workflow not found, likely already stopped"
);
}

Ok(SleepResponse {})
}
5 changes: 5 additions & 0 deletions engine/packages/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ pub async fn router(
"/actors/{actor_id}/kv/keys/{key}",
get(actors::kv_get::kv_get),
)
.route("/actors/{actor_id}/sleep", post(actors::sleep::sleep))
.route(
"/actors/{actor_id}/reschedule",
post(actors::reschedule::reschedule),
)
// MARK: Runners
.route("/runners", get(runners::list))
.route("/runners/names", get(runners::list_names))
Expand Down
4 changes: 1 addition & 3 deletions engine/packages/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
};
use rivet_api_types::actors::get_or_create::{
GetOrCreateQuery, GetOrCreateRequest, GetOrCreateResponse,
};
use rivet_api_types::actors::get_or_create::*;
use rivet_api_util::request_remote_datacenter;

use crate::ctx::ApiCtx;
Expand Down
33 changes: 2 additions & 31 deletions engine/packages/api-public/src/actors/kv_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,12 @@ use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Path, Query},
};
use rivet_api_types::actors::kv_get::*;
use rivet_api_util::request_remote_datacenter_raw;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

use crate::ctx::ApiCtx;

#[derive(Debug, Deserialize, Serialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct KvGetQuery {
pub namespace: String,
}

#[derive(Deserialize)]
#[serde(deny_unknown_fields)]
pub struct KvGetPath {
pub actor_id: Id,
pub key: String,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsKvGetResponse)]
pub struct KvGetResponse {
pub value: String,
pub update_ts: i64,
}

#[utoipa::path(
get,
operation_id = "actors_kv_get",
Expand Down Expand Up @@ -63,14 +41,7 @@ async fn kv_get_inner(ctx: ApiCtx, path: KvGetPath, query: KvGetQuery) -> Result
ctx.auth().await?;

if path.actor_id.label() == ctx.config().dc_label() {
let peer_path = rivet_api_peer::actors::kv_get::KvGetPath {
actor_id: path.actor_id,
key: path.key,
};
let peer_query = rivet_api_peer::actors::kv_get::KvGetQuery {
namespace: query.namespace,
};
let res = rivet_api_peer::actors::kv_get::kv_get(ctx.into(), peer_path, peer_query).await?;
let res = rivet_api_peer::actors::kv_get::kv_get(ctx.into(), path, query).await?;

Ok(Json(res).into_response())
} else {
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/api-public/src/actors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ pub mod get_or_create;
pub mod kv_get;
pub mod list;
pub mod list_names;
pub mod reschedule;
pub mod sleep;
pub mod utils;
Loading
Loading