Skip to content

Commit 1eec443

Browse files
authored
Merge pull request #6527 from systeminit/fnichol/bifrost-mjolnir-requests-v2
feat(sdf): accept object requests on web socket & stream responses
2 parents ba1fe4b + 176a0c2 commit 1eec443

File tree

14 files changed

+709
-175
lines changed

14 files changed

+709
-175
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/sdf-core/BUCK

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ rust_library(
1414
"//lib/si-data-spicedb:si-data-spicedb",
1515
"//lib/si-db:si-db",
1616
"//lib/si-events-rs:si-events",
17+
"//lib/si-frontend-mv-types-rs:si-frontend-mv-types",
1718
"//lib/si-frontend-types-rs:si-frontend-types",
1819
"//lib/si-id:si-id",
1920
"//lib/si-jwt-public-key:si-jwt-public-key",

lib/sdf-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ si-data-nats = { path = "../../lib/si-data-nats" }
2121
si-data-spicedb = { path = "../../lib/si-data-spicedb" }
2222
si-db = { path = "../../lib/si-db" }
2323
si-events = { path = "../../lib/si-events-rs" }
24+
si-frontend-mv-types = { path = "../../lib/si-frontend-mv-types-rs" }
2425
si-frontend-types = { path = "../../lib/si-frontend-types-rs" }
2526
si-id = { path = "../../lib/si-id" }
2627
si-jwt-public-key = { path = "../../lib/si-jwt-public-key" }

lib/sdf-core/src/index.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::time::Duration;
2+
3+
use axum::response::{
4+
IntoResponse,
5+
Response,
6+
};
7+
use dal::{
8+
ChangeSetId,
9+
WorkspacePk,
10+
};
11+
use hyper::StatusCode;
12+
use serde::{
13+
Deserialize,
14+
Serialize,
15+
};
16+
use si_frontend_mv_types::object::FrontendObject;
17+
use si_frontend_types::FrontEndObjectRequest;
18+
use thiserror::Error;
19+
20+
use crate::api_error::ApiError;
21+
22+
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
23+
#[serde(rename_all = "camelCase")]
24+
pub struct FrontEndObjectMeta {
25+
pub workspace_snapshot_address: String,
26+
pub index_checksum: String,
27+
pub front_end_object: FrontendObject,
28+
}
29+
30+
#[remain::sorted]
31+
#[derive(Debug, Error)]
32+
pub enum IndexError {
33+
#[error("change set error: {0}")]
34+
ChangeSet(#[from] dal::ChangeSetError),
35+
#[error("deserializing mv index data error: {0}")]
36+
DeserializingMvIndexData(#[source] serde_json::Error),
37+
#[error("edda client error: {0}")]
38+
EddaClient(#[from] edda_client::ClientError),
39+
#[error("frigg error: {0}")]
40+
Frigg(#[from] frigg::FriggError),
41+
#[error("index not found; workspace_id={0}, change_set_id={1}")]
42+
IndexNotFound(WorkspacePk, ChangeSetId),
43+
#[error("index not found after rebuild; workspace_id={0}, change_set_id={1}")]
44+
IndexNotFoundAfterFreshBuild(WorkspacePk, ChangeSetId),
45+
#[error("index not found after rebuild; workspace_id={0}, change_set_id={1}")]
46+
IndexNotFoundAfterRebuild(WorkspacePk, ChangeSetId),
47+
#[error("item with checksum not found; workspace_id={0}, change_set_id={1}, kind={2}")]
48+
ItemWithChecksumNotFound(WorkspacePk, ChangeSetId, String),
49+
#[error("latest item not found; workspace_id={0}, change_set_id={1}, kind={2}")]
50+
LatestItemNotFound(WorkspacePk, ChangeSetId, String),
51+
#[error("transactions error: {0}")]
52+
Transactions(#[from] dal::TransactionsError),
53+
#[error("timed out when watching index with duration: {0:?}")]
54+
WatchIndexTimeout(Duration),
55+
}
56+
57+
pub type IndexResult<T> = Result<T, IndexError>;
58+
59+
impl IntoResponse for IndexError {
60+
fn into_response(self) -> Response {
61+
let status_code = match &self {
62+
IndexError::IndexNotFound(_, _)
63+
| IndexError::IndexNotFoundAfterFreshBuild(_, _)
64+
| IndexError::IndexNotFoundAfterRebuild(_, _)
65+
| IndexError::ItemWithChecksumNotFound(_, _, _)
66+
| IndexError::LatestItemNotFound(_, _, _) => StatusCode::NOT_FOUND,
67+
_ => ApiError::DEFAULT_ERROR_STATUS_CODE,
68+
};
69+
70+
ApiError::new(status_code, self).into_response()
71+
}
72+
}
73+
74+
pub async fn front_end_object_meta(
75+
frigg: &frigg::FriggStore,
76+
workspace_id: WorkspacePk,
77+
change_set_id: ChangeSetId,
78+
request: &FrontEndObjectRequest,
79+
) -> IndexResult<FrontEndObjectMeta> {
80+
let (checksum, address) = match frigg
81+
.get_index_pointer_value(workspace_id, change_set_id)
82+
.await?
83+
{
84+
Some((index, _kv_revision)) => (index.index_checksum, index.snapshot_address),
85+
None => ("".to_string(), "".to_string()),
86+
};
87+
let obj;
88+
if let Some(checksum) = &request.checksum {
89+
obj = frigg
90+
.get_object(workspace_id, &request.kind, &request.id, checksum)
91+
.await?
92+
.ok_or_else(|| {
93+
IndexError::ItemWithChecksumNotFound(
94+
workspace_id,
95+
change_set_id,
96+
request.kind.clone(),
97+
)
98+
})?;
99+
} else {
100+
obj = frigg
101+
.get_current_object(workspace_id, change_set_id, &request.kind, &request.id)
102+
.await?
103+
.ok_or_else(|| {
104+
IndexError::LatestItemNotFound(workspace_id, change_set_id, request.kind.clone())
105+
})?;
106+
}
107+
108+
Ok(FrontEndObjectMeta {
109+
workspace_snapshot_address: address,
110+
index_checksum: checksum,
111+
front_end_object: obj,
112+
})
113+
}

lib/sdf-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub mod app_state;
1212
pub mod async_route;
1313
pub mod dal_wrapper;
1414
pub mod force_change_set_response;
15+
pub mod index;
1516
pub mod nats_multiplexer;
1617
pub mod tracking;
1718
pub mod workspace_permissions;

lib/sdf-core/src/nats_multiplexer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl EddaUpdatesMultiplexerClient {
4040
}
4141
}
4242

43-
pub async fn receiver_for_workspace(
43+
pub async fn messages_for_workspace(
4444
&self,
4545
prefix: Option<&str>,
4646
workspace_id: WorkspacePk,

lib/sdf-server/src/service/v2/change_set.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use sdf_core::{
2727
api_error::ApiError,
2828
app_state::AppState,
2929
dal_wrapper::DalWrapperError,
30+
index::IndexError,
3031
};
3132
use serde::Serialize;
3233
use si_data_spicedb::SpiceDbError;
3334
use telemetry::prelude::*;
3435
use thiserror::Error;
3536

36-
use super::index::IndexError;
3737
use crate::middleware::WorkspacePermissionLayer;
3838

3939
mod apply;

lib/sdf-server/src/service/v2/index.rs

Lines changed: 1 addition & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@ use std::time::Duration;
22

33
use axum::{
44
Router,
5-
response::{
6-
IntoResponse,
7-
Response,
8-
},
95
routing::{
106
get,
117
post,
@@ -16,15 +12,8 @@ use dal::{
1612
WorkspacePk,
1713
};
1814
use futures_lite::StreamExt;
19-
use hyper::StatusCode;
20-
use sdf_core::api_error::ApiError;
21-
use serde::{
22-
Deserialize,
23-
Serialize,
24-
};
25-
use si_frontend_mv_types::object::FrontendObject;
15+
use sdf_core::index::IndexResult;
2616
use telemetry::prelude::*;
27-
use thiserror::Error;
2817

2918
use super::AccessBuilder;
3019
use crate::AppState;
@@ -35,50 +24,6 @@ mod rebuild_change_set_index;
3524

3625
const WATCH_INDEX_TIMEOUT: Duration = Duration::from_secs(4);
3726

38-
#[remain::sorted]
39-
#[derive(Error, Debug)]
40-
pub enum IndexError {
41-
#[error("change set error: {0}")]
42-
ChangeSet(#[from] dal::ChangeSetError),
43-
#[error("deserializing mv index data error: {0}")]
44-
DeserializingMvIndexData(#[source] serde_json::Error),
45-
#[error("edda client error: {0}")]
46-
EddaClient(#[from] edda_client::ClientError),
47-
#[error("frigg error: {0}")]
48-
Frigg(#[from] frigg::FriggError),
49-
#[error("index not found; workspace_pk={0}, change_set_id={1}")]
50-
IndexNotFound(WorkspacePk, ChangeSetId),
51-
#[error("index not found after rebuild; workspace_pk={0}, change_set_id={1}")]
52-
IndexNotFoundAfterFreshBuild(WorkspacePk, ChangeSetId),
53-
#[error("index not found after rebuild; workspace_pk={0}, change_set_id={1}")]
54-
IndexNotFoundAfterRebuild(WorkspacePk, ChangeSetId),
55-
#[error("item with checksum not found; workspace_pk={0}, change_set_id={1}, kind={2}")]
56-
ItemWithChecksumNotFound(WorkspacePk, ChangeSetId, String),
57-
#[error("latest item not found; workspace_pk={0}, change_set_id={1}, kind={2}")]
58-
LatestItemNotFound(WorkspacePk, ChangeSetId, String),
59-
#[error("transactions error: {0}")]
60-
Transactions(#[from] dal::TransactionsError),
61-
#[error("timed out when watching index with duration: {0:?}")]
62-
WatchIndexTimeout(Duration),
63-
}
64-
65-
pub type IndexResult<T> = Result<T, IndexError>;
66-
67-
impl IntoResponse for IndexError {
68-
fn into_response(self) -> Response {
69-
let status_code = match &self {
70-
IndexError::IndexNotFound(_, _)
71-
| IndexError::IndexNotFoundAfterFreshBuild(_, _)
72-
| IndexError::IndexNotFoundAfterRebuild(_, _)
73-
| IndexError::ItemWithChecksumNotFound(_, _, _)
74-
| IndexError::LatestItemNotFound(_, _, _) => StatusCode::NOT_FOUND,
75-
_ => ApiError::DEFAULT_ERROR_STATUS_CODE,
76-
};
77-
78-
ApiError::new(status_code, self).into_response()
79-
}
80-
}
81-
8227
pub fn v2_change_set_routes() -> Router<AppState> {
8328
Router::new()
8429
.route("/", get(get_change_set_index::get_change_set_index))
@@ -93,14 +38,6 @@ pub fn v2_change_set_routes() -> Router<AppState> {
9338
)
9439
}
9540

96-
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
97-
#[serde(rename_all = "camelCase")]
98-
pub struct FrontEndObjectMeta {
99-
workspace_snapshot_address: String,
100-
index_checksum: String,
101-
front_end_object: FrontendObject,
102-
}
103-
10441
#[instrument(
10542
level = "info",
10643
name = "sdf.index.request_rebuild",

lib/sdf-server/src/service/v2/index/get_change_set_index.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ use dal::{
88
ChangeSetId,
99
WorkspacePk,
1010
};
11+
use sdf_core::index::{
12+
FrontEndObjectMeta,
13+
IndexError,
14+
};
1115
use telemetry::prelude::*;
1216

1317
use super::{
1418
AccessBuilder,
15-
FrontEndObjectMeta,
16-
IndexError,
1719
IndexResult,
1820
request_rebuild_and_watch,
1921
};

lib/sdf-server/src/service/v2/index/get_front_end_object.rs

Lines changed: 7 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,26 @@ use dal::{
99
ChangeSetId,
1010
WorkspacePk,
1111
};
12+
use sdf_core::index::{
13+
FrontEndObjectMeta,
14+
front_end_object_meta,
15+
};
1216
use serde::{
1317
Deserialize,
1418
Serialize,
1519
};
20+
use si_frontend_types::FrontEndObjectRequest;
1621
use telemetry::prelude::*;
1722

1823
use super::{
1924
AccessBuilder,
20-
FrontEndObjectMeta,
21-
IndexError,
2225
IndexResult,
2326
};
2427
use crate::extract::{
2528
FriggStore,
2629
HandlerContext,
2730
};
2831

29-
#[derive(Deserialize, Serialize, Debug, Clone)]
30-
#[serde(rename_all = "camelCase")]
31-
pub struct FrontEndObjectRequest {
32-
pub kind: String,
33-
pub id: String,
34-
pub checksum: Option<String>,
35-
}
36-
3732
pub async fn get_front_end_object(
3833
HandlerContext(builder): HandlerContext,
3934
AccessBuilder(access_builder): AccessBuilder,
@@ -46,7 +41,7 @@ pub async fn get_front_end_object(
4641
.await?;
4742

4843
Ok(Json(
49-
front_end_object_meta(&frigg, workspace_pk, change_set_id, request).await?,
44+
front_end_object_meta(&frigg, workspace_pk, change_set_id, &request).await?,
5045
))
5146
}
5247

@@ -77,9 +72,7 @@ pub async fn get_multiple_front_end_objects(
7772
let mut successful = Vec::new();
7873
let mut failed = Vec::new();
7974
for object_request in request.requests {
80-
match front_end_object_meta(&frigg, workspace_pk, change_set_id, object_request.clone())
81-
.await
82-
{
75+
match front_end_object_meta(&frigg, workspace_pk, change_set_id, &object_request).await {
8376
Ok(meta) => successful.push(meta),
8477
Err(error) => {
8578
error!(?error);
@@ -90,44 +83,3 @@ pub async fn get_multiple_front_end_objects(
9083

9184
Ok(Json(MultipleFrontEndObjectResponse { successful, failed }))
9285
}
93-
94-
async fn front_end_object_meta(
95-
frigg: &frigg::FriggStore,
96-
workspace_pk: WorkspacePk,
97-
change_set_id: ChangeSetId,
98-
request: FrontEndObjectRequest,
99-
) -> IndexResult<FrontEndObjectMeta> {
100-
let (checksum, address) = match frigg
101-
.get_index_pointer_value(workspace_pk, change_set_id)
102-
.await?
103-
{
104-
Some((index, _kv_revision)) => (index.index_checksum, index.snapshot_address),
105-
None => ("".to_string(), "".to_string()),
106-
};
107-
let obj;
108-
if let Some(checksum) = request.checksum {
109-
obj = frigg
110-
.get_object(workspace_pk, &request.kind, &request.id, &checksum)
111-
.await?
112-
.ok_or(IndexError::ItemWithChecksumNotFound(
113-
workspace_pk,
114-
change_set_id,
115-
request.kind,
116-
))?;
117-
} else {
118-
obj = frigg
119-
.get_current_object(workspace_pk, change_set_id, &request.kind, &request.id)
120-
.await?
121-
.ok_or(IndexError::LatestItemNotFound(
122-
workspace_pk,
123-
change_set_id,
124-
request.kind,
125-
))?;
126-
}
127-
128-
Ok(FrontEndObjectMeta {
129-
workspace_snapshot_address: address,
130-
index_checksum: checksum,
131-
front_end_object: obj,
132-
})
133-
}

0 commit comments

Comments
 (0)