Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions lib/sdf-core/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust_library(
"//lib/si-data-spicedb:si-data-spicedb",
"//lib/si-db:si-db",
"//lib/si-events-rs:si-events",
"//lib/si-frontend-mv-types-rs:si-frontend-mv-types",
"//lib/si-frontend-types-rs:si-frontend-types",
"//lib/si-id:si-id",
"//lib/si-jwt-public-key:si-jwt-public-key",
Expand Down
1 change: 1 addition & 0 deletions lib/sdf-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ si-data-nats = { path = "../../lib/si-data-nats" }
si-data-spicedb = { path = "../../lib/si-data-spicedb" }
si-db = { path = "../../lib/si-db" }
si-events = { path = "../../lib/si-events-rs" }
si-frontend-mv-types = { path = "../../lib/si-frontend-mv-types-rs" }
si-frontend-types = { path = "../../lib/si-frontend-types-rs" }
si-id = { path = "../../lib/si-id" }
si-jwt-public-key = { path = "../../lib/si-jwt-public-key" }
Expand Down
113 changes: 113 additions & 0 deletions lib/sdf-core/src/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::time::Duration;

use axum::response::{
IntoResponse,
Response,
};
use dal::{
ChangeSetId,
WorkspacePk,
};
use hyper::StatusCode;
use serde::{
Deserialize,
Serialize,
};
use si_frontend_mv_types::object::FrontendObject;
use si_frontend_types::FrontEndObjectRequest;
use thiserror::Error;

use crate::api_error::ApiError;

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct FrontEndObjectMeta {
pub workspace_snapshot_address: String,
pub index_checksum: String,
pub front_end_object: FrontendObject,
}

#[remain::sorted]
#[derive(Debug, Error)]
pub enum IndexError {
#[error("change set error: {0}")]
ChangeSet(#[from] dal::ChangeSetError),
#[error("deserializing mv index data error: {0}")]
DeserializingMvIndexData(#[source] serde_json::Error),
#[error("edda client error: {0}")]
EddaClient(#[from] edda_client::ClientError),
#[error("frigg error: {0}")]
Frigg(#[from] frigg::FriggError),
#[error("index not found; workspace_id={0}, change_set_id={1}")]
IndexNotFound(WorkspacePk, ChangeSetId),
#[error("index not found after rebuild; workspace_id={0}, change_set_id={1}")]
IndexNotFoundAfterFreshBuild(WorkspacePk, ChangeSetId),
#[error("index not found after rebuild; workspace_id={0}, change_set_id={1}")]
IndexNotFoundAfterRebuild(WorkspacePk, ChangeSetId),
#[error("item with checksum not found; workspace_id={0}, change_set_id={1}, kind={2}")]
ItemWithChecksumNotFound(WorkspacePk, ChangeSetId, String),
#[error("latest item not found; workspace_id={0}, change_set_id={1}, kind={2}")]
LatestItemNotFound(WorkspacePk, ChangeSetId, String),
#[error("transactions error: {0}")]
Transactions(#[from] dal::TransactionsError),
#[error("timed out when watching index with duration: {0:?}")]
WatchIndexTimeout(Duration),
}

pub type IndexResult<T> = Result<T, IndexError>;

impl IntoResponse for IndexError {
fn into_response(self) -> Response {
let status_code = match &self {
IndexError::IndexNotFound(_, _)
| IndexError::IndexNotFoundAfterFreshBuild(_, _)
| IndexError::IndexNotFoundAfterRebuild(_, _)
| IndexError::ItemWithChecksumNotFound(_, _, _)
| IndexError::LatestItemNotFound(_, _, _) => StatusCode::NOT_FOUND,
_ => ApiError::DEFAULT_ERROR_STATUS_CODE,
};

ApiError::new(status_code, self).into_response()
}
}

pub async fn front_end_object_meta(
frigg: &frigg::FriggStore,
workspace_id: WorkspacePk,
change_set_id: ChangeSetId,
request: &FrontEndObjectRequest,
) -> IndexResult<FrontEndObjectMeta> {
let (checksum, address) = match frigg
.get_index_pointer_value(workspace_id, change_set_id)
.await?
{
Some((index, _kv_revision)) => (index.index_checksum, index.snapshot_address),
None => ("".to_string(), "".to_string()),
};
let obj;
if let Some(checksum) = &request.checksum {
obj = frigg
.get_object(workspace_id, &request.kind, &request.id, checksum)
.await?
.ok_or_else(|| {
IndexError::ItemWithChecksumNotFound(
workspace_id,
change_set_id,
request.kind.clone(),
)
})?;
} else {
obj = frigg
.get_current_object(workspace_id, change_set_id, &request.kind, &request.id)
.await?
.ok_or_else(|| {
IndexError::LatestItemNotFound(workspace_id, change_set_id, request.kind.clone())
})?;
}

Ok(FrontEndObjectMeta {
workspace_snapshot_address: address,
index_checksum: checksum,
front_end_object: obj,
})
}
1 change: 1 addition & 0 deletions lib/sdf-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod app_state;
pub mod async_route;
pub mod dal_wrapper;
pub mod force_change_set_response;
pub mod index;
pub mod nats_multiplexer;
pub mod tracking;
pub mod workspace_permissions;
Expand Down
2 changes: 1 addition & 1 deletion lib/sdf-core/src/nats_multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl EddaUpdatesMultiplexerClient {
}
}

pub async fn receiver_for_workspace(
pub async fn messages_for_workspace(
&self,
prefix: Option<&str>,
workspace_id: WorkspacePk,
Expand Down
2 changes: 1 addition & 1 deletion lib/sdf-server/src/service/v2/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ use sdf_core::{
api_error::ApiError,
app_state::AppState,
dal_wrapper::DalWrapperError,
index::IndexError,
};
use serde::Serialize;
use si_data_spicedb::SpiceDbError;
use telemetry::prelude::*;
use thiserror::Error;

use super::index::IndexError;
use crate::middleware::WorkspacePermissionLayer;

mod apply;
Expand Down
65 changes: 1 addition & 64 deletions lib/sdf-server/src/service/v2/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ use std::time::Duration;

use axum::{
Router,
response::{
IntoResponse,
Response,
},
routing::{
get,
post,
Expand All @@ -16,15 +12,8 @@ use dal::{
WorkspacePk,
};
use futures_lite::StreamExt;
use hyper::StatusCode;
use sdf_core::api_error::ApiError;
use serde::{
Deserialize,
Serialize,
};
use si_frontend_mv_types::object::FrontendObject;
use sdf_core::index::IndexResult;
use telemetry::prelude::*;
use thiserror::Error;

use super::AccessBuilder;
use crate::AppState;
Expand All @@ -35,50 +24,6 @@ mod rebuild_change_set_index;

const WATCH_INDEX_TIMEOUT: Duration = Duration::from_secs(4);

#[remain::sorted]
#[derive(Error, Debug)]
pub enum IndexError {
#[error("change set error: {0}")]
ChangeSet(#[from] dal::ChangeSetError),
#[error("deserializing mv index data error: {0}")]
DeserializingMvIndexData(#[source] serde_json::Error),
#[error("edda client error: {0}")]
EddaClient(#[from] edda_client::ClientError),
#[error("frigg error: {0}")]
Frigg(#[from] frigg::FriggError),
#[error("index not found; workspace_pk={0}, change_set_id={1}")]
IndexNotFound(WorkspacePk, ChangeSetId),
#[error("index not found after rebuild; workspace_pk={0}, change_set_id={1}")]
IndexNotFoundAfterFreshBuild(WorkspacePk, ChangeSetId),
#[error("index not found after rebuild; workspace_pk={0}, change_set_id={1}")]
IndexNotFoundAfterRebuild(WorkspacePk, ChangeSetId),
#[error("item with checksum not found; workspace_pk={0}, change_set_id={1}, kind={2}")]
ItemWithChecksumNotFound(WorkspacePk, ChangeSetId, String),
#[error("latest item not found; workspace_pk={0}, change_set_id={1}, kind={2}")]
LatestItemNotFound(WorkspacePk, ChangeSetId, String),
#[error("transactions error: {0}")]
Transactions(#[from] dal::TransactionsError),
#[error("timed out when watching index with duration: {0:?}")]
WatchIndexTimeout(Duration),
}

pub type IndexResult<T> = Result<T, IndexError>;

impl IntoResponse for IndexError {
fn into_response(self) -> Response {
let status_code = match &self {
IndexError::IndexNotFound(_, _)
| IndexError::IndexNotFoundAfterFreshBuild(_, _)
| IndexError::IndexNotFoundAfterRebuild(_, _)
| IndexError::ItemWithChecksumNotFound(_, _, _)
| IndexError::LatestItemNotFound(_, _, _) => StatusCode::NOT_FOUND,
_ => ApiError::DEFAULT_ERROR_STATUS_CODE,
};

ApiError::new(status_code, self).into_response()
}
}

pub fn v2_change_set_routes() -> Router<AppState> {
Router::new()
.route("/", get(get_change_set_index::get_change_set_index))
Expand All @@ -93,14 +38,6 @@ pub fn v2_change_set_routes() -> Router<AppState> {
)
}

#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct FrontEndObjectMeta {
workspace_snapshot_address: String,
index_checksum: String,
front_end_object: FrontendObject,
}

#[instrument(
level = "info",
name = "sdf.index.request_rebuild",
Expand Down
6 changes: 4 additions & 2 deletions lib/sdf-server/src/service/v2/index/get_change_set_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use dal::{
ChangeSetId,
WorkspacePk,
};
use sdf_core::index::{
FrontEndObjectMeta,
IndexError,
};
use telemetry::prelude::*;

use super::{
AccessBuilder,
FrontEndObjectMeta,
IndexError,
IndexResult,
request_rebuild_and_watch,
};
Expand Down
62 changes: 7 additions & 55 deletions lib/sdf-server/src/service/v2/index/get_front_end_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,26 @@ use dal::{
ChangeSetId,
WorkspacePk,
};
use sdf_core::index::{
FrontEndObjectMeta,
front_end_object_meta,
};
use serde::{
Deserialize,
Serialize,
};
use si_frontend_types::FrontEndObjectRequest;
use telemetry::prelude::*;

use super::{
AccessBuilder,
FrontEndObjectMeta,
IndexError,
IndexResult,
};
use crate::extract::{
FriggStore,
HandlerContext,
};

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct FrontEndObjectRequest {
pub kind: String,
pub id: String,
pub checksum: Option<String>,
}

pub async fn get_front_end_object(
HandlerContext(builder): HandlerContext,
AccessBuilder(access_builder): AccessBuilder,
Expand All @@ -46,7 +41,7 @@ pub async fn get_front_end_object(
.await?;

Ok(Json(
front_end_object_meta(&frigg, workspace_pk, change_set_id, request).await?,
front_end_object_meta(&frigg, workspace_pk, change_set_id, &request).await?,
))
}

Expand Down Expand Up @@ -77,9 +72,7 @@ pub async fn get_multiple_front_end_objects(
let mut successful = Vec::new();
let mut failed = Vec::new();
for object_request in request.requests {
match front_end_object_meta(&frigg, workspace_pk, change_set_id, object_request.clone())
.await
{
match front_end_object_meta(&frigg, workspace_pk, change_set_id, &object_request).await {
Ok(meta) => successful.push(meta),
Err(error) => {
error!(?error);
Expand All @@ -90,44 +83,3 @@ pub async fn get_multiple_front_end_objects(

Ok(Json(MultipleFrontEndObjectResponse { successful, failed }))
}

async fn front_end_object_meta(
frigg: &frigg::FriggStore,
workspace_pk: WorkspacePk,
change_set_id: ChangeSetId,
request: FrontEndObjectRequest,
) -> IndexResult<FrontEndObjectMeta> {
let (checksum, address) = match frigg
.get_index_pointer_value(workspace_pk, change_set_id)
.await?
{
Some((index, _kv_revision)) => (index.index_checksum, index.snapshot_address),
None => ("".to_string(), "".to_string()),
};
let obj;
if let Some(checksum) = request.checksum {
obj = frigg
.get_object(workspace_pk, &request.kind, &request.id, &checksum)
.await?
.ok_or(IndexError::ItemWithChecksumNotFound(
workspace_pk,
change_set_id,
request.kind,
))?;
} else {
obj = frigg
.get_current_object(workspace_pk, change_set_id, &request.kind, &request.id)
.await?
.ok_or(IndexError::LatestItemNotFound(
workspace_pk,
change_set_id,
request.kind,
))?;
}

Ok(FrontEndObjectMeta {
workspace_snapshot_address: address,
index_checksum: checksum,
front_end_object: obj,
})
}
Loading