diff --git a/Cargo.lock b/Cargo.lock
index d50ffbd969..bc0b502011 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7403,6 +7403,7 @@ dependencies = [
  "si-data-spicedb",
  "si-db",
  "si-events",
+ "si-frontend-mv-types",
  "si-frontend-types",
  "si-id",
  "si-jwt-public-key",
diff --git a/lib/sdf-core/BUCK b/lib/sdf-core/BUCK
index d5626276c7..8f92bf5ab7 100644
--- a/lib/sdf-core/BUCK
+++ b/lib/sdf-core/BUCK
@@ -14,6 +14,7 @@ rust_library(
         "//lib/si-data-spicedb:si-data-spicedb",
         "//lib/si-db:si-db",
         "//lib/si-events-rs:si-events",
+        "//lib/si-frontend-mv-types-rs:si-frontend-mv-types",
         "//lib/si-frontend-types-rs:si-frontend-types",
         "//lib/si-id:si-id",
         "//lib/si-jwt-public-key:si-jwt-public-key",
diff --git a/lib/sdf-core/Cargo.toml b/lib/sdf-core/Cargo.toml
index 1abefe5d6f..1ed011f5c9 100644
--- a/lib/sdf-core/Cargo.toml
+++ b/lib/sdf-core/Cargo.toml
@@ -21,6 +21,7 @@ si-data-nats = { path = "../../lib/si-data-nats" }
 si-data-spicedb = { path = "../../lib/si-data-spicedb" }
 si-db = { path = "../../lib/si-db" }
 si-events = { path = "../../lib/si-events-rs" }
+si-frontend-mv-types = { path = "../../lib/si-frontend-mv-types-rs" }
 si-frontend-types = { path = "../../lib/si-frontend-types-rs" }
 si-id = { path = "../../lib/si-id" }
 si-jwt-public-key = { path = "../../lib/si-jwt-public-key" }
diff --git a/lib/sdf-core/src/index.rs b/lib/sdf-core/src/index.rs
new file mode 100644
index 0000000000..d821463571
--- /dev/null
+++ b/lib/sdf-core/src/index.rs
@@ -0,0 +1,113 @@
+use std::time::Duration;
+
+use axum::response::{
+    IntoResponse,
+    Response,
+};
+use dal::{
+    ChangeSetId,
+    WorkspacePk,
+};
+use hyper::StatusCode;
+use serde::{
+    Deserialize,
+    Serialize,
+};
+use si_frontend_mv_types::object::FrontendObject;
+use si_frontend_types::FrontEndObjectRequest;
+use thiserror::Error;
+
+use crate::api_error::ApiError;
+
+#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
+#[serde(rename_all = "camelCase")]
+pub struct FrontEndObjectMeta {
+    pub workspace_snapshot_address: String,
+    pub index_checksum: String,
+    pub front_end_object: FrontendObject,
+}
+
+#[remain::sorted]
+#[derive(Debug, Error)]
+pub enum IndexError {
+    #[error("change set error: {0}")]
+    ChangeSet(#[from] dal::ChangeSetError),
+    #[error("deserializing mv index data error: {0}")]
+    DeserializingMvIndexData(#[source] serde_json::Error),
+    #[error("edda client error: {0}")]
+    EddaClient(#[from] edda_client::ClientError),
+    #[error("frigg error: {0}")]
+    Frigg(#[from] frigg::FriggError),
+    #[error("index not found; workspace_id={0}, change_set_id={1}")]
+    IndexNotFound(WorkspacePk, ChangeSetId),
+    #[error("index not found after fresh build; workspace_id={0}, change_set_id={1}")]
+    IndexNotFoundAfterFreshBuild(WorkspacePk, ChangeSetId),
+    #[error("index not found after rebuild; workspace_id={0}, change_set_id={1}")]
+    IndexNotFoundAfterRebuild(WorkspacePk, ChangeSetId),
+    #[error("item with checksum not found; workspace_id={0}, change_set_id={1}, kind={2}")]
+    ItemWithChecksumNotFound(WorkspacePk, ChangeSetId, String),
+    #[error("latest item not found; workspace_id={0}, change_set_id={1}, kind={2}")]
+    LatestItemNotFound(WorkspacePk, ChangeSetId, String),
+    #[error("transactions error: {0}")]
+    Transactions(#[from] dal::TransactionsError),
+    #[error("timed out when watching index with duration: {0:?}")]
+    WatchIndexTimeout(Duration),
+}
+
+pub type IndexResult<T> = Result<T, IndexError>;
+
+impl IntoResponse for IndexError {
+    fn into_response(self) -> Response {
+        let status_code = match &self {
+            IndexError::IndexNotFound(_, _)
+            | IndexError::IndexNotFoundAfterFreshBuild(_, _)
+            | IndexError::IndexNotFoundAfterRebuild(_, _)
+            | IndexError::ItemWithChecksumNotFound(_, _, _)
+            | IndexError::LatestItemNotFound(_, _, _) => StatusCode::NOT_FOUND,
+            _ => ApiError::DEFAULT_ERROR_STATUS_CODE,
+        };
+
+        ApiError::new(status_code, self).into_response()
+    }
+}
+
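+/// Fetches a single [`FrontendObject`] from the frigg store, pinned to an exact
+/// checksum when the request names one and to the change set's current version
+/// otherwise, pairing it with the change set's index pointer metadata. The
+/// snapshot address and index checksum fall back to empty strings when no index
+/// pointer exists yet.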
+pub async fn front_end_object_meta(
+    frigg: &frigg::FriggStore,
+    workspace_id: WorkspacePk,
+    change_set_id: ChangeSetId,
+    request: &FrontEndObjectRequest,
+) -> IndexResult<FrontEndObjectMeta> {
+    let (checksum, address) = match frigg
+        .get_index_pointer_value(workspace_id, change_set_id)
+        .await?
+    {
+        Some((index, _kv_revision)) => (index.index_checksum, index.snapshot_address),
+        None => ("".to_string(), "".to_string()),
+    };
+    let obj = if let Some(checksum) = &request.checksum {
+        frigg
+            .get_object(workspace_id, &request.kind, &request.id, checksum)
+            .await?
+            .ok_or_else(|| {
+                IndexError::ItemWithChecksumNotFound(
+                    workspace_id,
+                    change_set_id,
+                    request.kind.clone(),
+                )
+            })?
+    } else {
+        frigg
+            .get_current_object(workspace_id, change_set_id, &request.kind, &request.id)
+            .await?
+            .ok_or_else(|| {
+                IndexError::LatestItemNotFound(workspace_id, change_set_id, request.kind.clone())
+            })?
+    };
+
+    Ok(FrontEndObjectMeta {
+        workspace_snapshot_address: address,
+        index_checksum: checksum,
+        front_end_object: obj,
+    })
+}
diff --git a/lib/sdf-core/src/lib.rs b/lib/sdf-core/src/lib.rs
index 0d0a3e0a67..d1fc8f6cd3 100644
--- a/lib/sdf-core/src/lib.rs
+++ b/lib/sdf-core/src/lib.rs
@@ -12,6 +12,7 @@ pub mod app_state;
 pub mod async_route;
 pub mod dal_wrapper;
 pub mod force_change_set_response;
+pub mod index;
 pub mod nats_multiplexer;
 pub mod tracking;
 pub mod workspace_permissions;
diff --git a/lib/sdf-core/src/nats_multiplexer.rs b/lib/sdf-core/src/nats_multiplexer.rs
index 017ba5132c..f0b9e954b0 100644
--- a/lib/sdf-core/src/nats_multiplexer.rs
+++ b/lib/sdf-core/src/nats_multiplexer.rs
@@ -40,7 +40,7 @@ impl EddaUpdatesMultiplexerClient {
     }
 
-    pub async fn receiver_for_workspace(
+    pub async fn messages_for_workspace(
         &self,
         prefix: Option<&str>,
         workspace_id: WorkspacePk,
diff --git a/lib/sdf-server/src/service/v2/change_set.rs b/lib/sdf-server/src/service/v2/change_set.rs
index 6da60ce063..55ae721181 100644
--- a/lib/sdf-server/src/service/v2/change_set.rs
+++ b/lib/sdf-server/src/service/v2/change_set.rs
@@ -27,13 +27,13 @@ use sdf_core::{
     api_error::ApiError,
     app_state::AppState,
     dal_wrapper::DalWrapperError,
+    index::IndexError,
 };
 use serde::Serialize;
 use si_data_spicedb::SpiceDbError;
 use telemetry::prelude::*;
 use thiserror::Error;
 
-use super::index::IndexError;
 use crate::middleware::WorkspacePermissionLayer;
 
 mod apply;
diff --git a/lib/sdf-server/src/service/v2/index.rs b/lib/sdf-server/src/service/v2/index.rs
index 61bcaa7b4f..c9dc6d77fa 100644
--- a/lib/sdf-server/src/service/v2/index.rs
+++ b/lib/sdf-server/src/service/v2/index.rs
@@ -2,10 +2,6 @@ use std::time::Duration;
 
 use axum::{
     Router,
-    response::{
-        IntoResponse,
-        Response,
-    },
     routing::{
         get,
         post,
@@ -16,15 +12,8 @@ use dal::{
     WorkspacePk,
 };
 use futures_lite::StreamExt;
-use hyper::StatusCode;
-use sdf_core::api_error::ApiError;
-use serde::{
-    Deserialize,
-    Serialize,
-};
-use si_frontend_mv_types::object::FrontendObject;
+use sdf_core::index::IndexResult;
 use telemetry::prelude::*;
-use thiserror::Error;
 
 use super::AccessBuilder;
 use crate::AppState;
@@ -35,50 +24,6 @@ mod rebuild_change_set_index;
 
 const WATCH_INDEX_TIMEOUT: Duration = Duration::from_secs(4);
 
-#[remain::sorted]
-#[derive(Error, Debug)]
-pub enum IndexError {
-    #[error("change set error: {0}")]
-    ChangeSet(#[from] dal::ChangeSetError),
-    #[error("deserializing mv index data error: {0}")]
-    DeserializingMvIndexData(#[source] serde_json::Error),
-    #[error("edda client error: {0}")]
-    EddaClient(#[from] edda_client::ClientError),
-    #[error("frigg error: {0}")]
-    Frigg(#[from] frigg::FriggError),
-    #[error("index not found; workspace_pk={0}, change_set_id={1}")]
-    IndexNotFound(WorkspacePk, ChangeSetId),
-    #[error("index not found after rebuild; workspace_pk={0}, change_set_id={1}")]
-    IndexNotFoundAfterFreshBuild(WorkspacePk, ChangeSetId),
-    #[error("index not found after rebuild; workspace_pk={0}, change_set_id={1}")]
-    IndexNotFoundAfterRebuild(WorkspacePk, ChangeSetId),
-    #[error("item with checksum not found; workspace_pk={0}, change_set_id={1}, kind={2}")]
-    ItemWithChecksumNotFound(WorkspacePk, ChangeSetId, String),
-    #[error("latest item not found; workspace_pk={0}, change_set_id={1}, kind={2}")]
-    LatestItemNotFound(WorkspacePk, ChangeSetId, String),
-    #[error("transactions error: {0}")]
-    Transactions(#[from] dal::TransactionsError),
-    #[error("timed out when watching index with duration: {0:?}")]
-    WatchIndexTimeout(Duration),
-}
-
-pub type IndexResult<T> = Result<T, IndexError>;
-
-impl IntoResponse for IndexError {
-    fn into_response(self) -> Response {
-        let status_code = match &self {
-            IndexError::IndexNotFound(_, _)
-            | IndexError::IndexNotFoundAfterFreshBuild(_, _)
-            | IndexError::IndexNotFoundAfterRebuild(_, _)
-            | IndexError::ItemWithChecksumNotFound(_, _, _)
-            | IndexError::LatestItemNotFound(_, _, _) => StatusCode::NOT_FOUND,
-            _ => ApiError::DEFAULT_ERROR_STATUS_CODE,
-        };
-
-        ApiError::new(status_code, self).into_response()
-    }
-}
-
 pub fn v2_change_set_routes() -> Router<AppState> {
     Router::new()
         .route("/", get(get_change_set_index::get_change_set_index))
@@ -93,14 +38,6 @@ pub fn v2_change_set_routes() -> Router<AppState> {
     )
 }
 
-#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
-#[serde(rename_all = "camelCase")]
-pub struct FrontEndObjectMeta {
-    workspace_snapshot_address: String,
-    index_checksum: String,
-    front_end_object: FrontendObject,
-}
-
 #[instrument(
     level = "info",
     name = "sdf.index.request_rebuild",
diff --git a/lib/sdf-server/src/service/v2/index/get_change_set_index.rs b/lib/sdf-server/src/service/v2/index/get_change_set_index.rs
index 82d3eba029..f4c556be24 100644
--- a/lib/sdf-server/src/service/v2/index/get_change_set_index.rs
+++ b/lib/sdf-server/src/service/v2/index/get_change_set_index.rs
@@ -8,12 +8,14 @@ use dal::{
     ChangeSetId,
     WorkspacePk,
 };
+use sdf_core::index::{
+    FrontEndObjectMeta,
+    IndexError,
+};
 use telemetry::prelude::*;
 
 use super::{
     AccessBuilder,
-    FrontEndObjectMeta,
-    IndexError,
     IndexResult,
     request_rebuild_and_watch,
 };
diff --git a/lib/sdf-server/src/service/v2/index/get_front_end_object.rs b/lib/sdf-server/src/service/v2/index/get_front_end_object.rs
index 0e343dae42..d8942158fe 100644
--- a/lib/sdf-server/src/service/v2/index/get_front_end_object.rs
+++ b/lib/sdf-server/src/service/v2/index/get_front_end_object.rs
@@ -9,16 +9,19 @@ use dal::{
     ChangeSetId,
     WorkspacePk,
 };
+use sdf_core::index::{
+    FrontEndObjectMeta,
+    front_end_object_meta,
+};
 use serde::{
     Deserialize,
     Serialize,
 };
+use si_frontend_types::FrontEndObjectRequest;
 use telemetry::prelude::*;
 
 use super::{
     AccessBuilder,
-    FrontEndObjectMeta,
-    IndexError,
     IndexResult,
 };
 use crate::extract::{
@@ -26,14 +29,6 @@ use crate::extract::{
     HandlerContext,
 };
 
-#[derive(Deserialize, Serialize, Debug, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct FrontEndObjectRequest {
-    pub kind: String,
-    pub id: String,
-    pub checksum: Option<String>,
-}
-
 pub async fn get_front_end_object(
     HandlerContext(builder): HandlerContext,
     AccessBuilder(access_builder): AccessBuilder,
@@ -46,7 +41,7 @@ pub async fn get_front_end_object(
         .await?;
 
     Ok(Json(
-        front_end_object_meta(&frigg, workspace_pk, change_set_id, request).await?,
+        front_end_object_meta(&frigg, workspace_pk, change_set_id, &request).await?,
     ))
 }
 
@@ -77,9 +72,7 @@ pub async fn get_multiple_front_end_objects(
     let mut successful = Vec::new();
     let mut failed = Vec::new();
     for object_request in request.requests {
-        match front_end_object_meta(&frigg, workspace_pk, change_set_id, object_request.clone())
-            .await
-        {
+        match front_end_object_meta(&frigg, workspace_pk, change_set_id, &object_request).await {
             Ok(meta) => successful.push(meta),
             Err(error) => {
                 error!(?error);
@@ -90,44 +83,3 @@ pub async fn get_multiple_front_end_objects(
 
     Ok(Json(MultipleFrontEndObjectResponse { successful, failed }))
 }
-
-async fn front_end_object_meta(
-    frigg: &frigg::FriggStore,
-    workspace_pk: WorkspacePk,
-    change_set_id: ChangeSetId,
-    request: FrontEndObjectRequest,
-) -> IndexResult<FrontEndObjectMeta> {
-    let (checksum, address) = match frigg
-        .get_index_pointer_value(workspace_pk, change_set_id)
-        .await?
-    {
-        Some((index, _kv_revision)) => (index.index_checksum, index.snapshot_address),
-        None => ("".to_string(), "".to_string()),
-    };
-    let obj;
-    if let Some(checksum) = request.checksum {
-        obj = frigg
-            .get_object(workspace_pk, &request.kind, &request.id, &checksum)
-            .await?
-            .ok_or(IndexError::ItemWithChecksumNotFound(
-                workspace_pk,
-                change_set_id,
-                request.kind,
-            ))?;
-    } else {
-        obj = frigg
-            .get_current_object(workspace_pk, change_set_id, &request.kind, &request.id)
-            .await?
-            .ok_or(IndexError::LatestItemNotFound(
-                workspace_pk,
-                change_set_id,
-                request.kind,
-            ))?;
-    }
-
-    Ok(FrontEndObjectMeta {
-        workspace_snapshot_address: address,
-        index_checksum: checksum,
-        front_end_object: obj,
-    })
-}
diff --git a/lib/sdf-v1-routes-ws/src/bifrost/mod.rs b/lib/sdf-v1-routes-ws/src/bifrost/mod.rs
index c5cfe52a6a..9b0d1ad401 100644
--- a/lib/sdf-v1-routes-ws/src/bifrost/mod.rs
+++ b/lib/sdf-v1-routes-ws/src/bifrost/mod.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use axum::{
     extract::{
         State,
@@ -10,20 +12,21 @@ use dal::{
     DedicatedExecutor,
     WorkspacePk,
 };
+use frigg::FriggStore;
 use sdf_core::nats_multiplexer::{
     EddaUpdatesMultiplexerClient,
     NatsMultiplexerClients,
 };
 use sdf_extract::{
     ComputeExecutor,
+    Nats,
     request::TokenFromQueryParam,
-    services::Nats,
     workspace::{
         TargetWorkspaceIdFromToken,
         WorkspaceAuthorization,
     },
 };
-use si_data_nats::NatsClient;
+use si_data_nats::ConnectionMetadata;
 use telemetry::prelude::*;
 use tokio_util::sync::CancellationToken;
 
@@ -34,10 +37,11 @@ pub mod proto;
 #[allow(clippy::too_many_arguments)]
 pub async fn bifrost_handler(
     wsu: WebSocketUpgrade,
-    Nats(nats): Nats,
     _: TokenFromQueryParam,
     _: TargetWorkspaceIdFromToken,
     auth: WorkspaceAuthorization,
+    Nats(nats): Nats,
+    State(frigg): State<FriggStore>,
     ComputeExecutor(compute_executor): ComputeExecutor,
     State(shutdown_token): State<CancellationToken>,
     State(channel_multiplexer_clients): State<NatsMultiplexerClients>,
 ) {
     Ok(wsu.on_upgrade(move |socket| {
         run_bifrost_proto(
             socket,
-            nats,
+            nats.metadata_clone(),
+            frigg,
             auth.workspace_id,
             channel_multiplexer_clients.edda_updates,
             compute_executor,
@@ -56,15 +61,22 @@
 
 async fn run_bifrost_proto(
     mut socket: WebSocket,
-    nats: NatsClient,
+    metadata: Arc<ConnectionMetadata>,
+    frigg: FriggStore,
     workspace_pk: WorkspacePk,
     bifrost_multiplexer_client: EddaUpdatesMultiplexerClient,
     compute_executor: DedicatedExecutor,
     shutdown_token: CancellationToken,
 ) {
-    let proto = match proto::run(nats, compute_executor, workspace_pk, shutdown_token)
-        .start(bifrost_multiplexer_client)
-        .await
+    let proto = match proto::run(
+        metadata,
+        frigg,
+        compute_executor,
+        workspace_pk,
+        shutdown_token,
+    )
+    .start(bifrost_multiplexer_client)
+    .await
     {
         Ok(started) => started,
         Err(err) => {
diff --git a/lib/sdf-v1-routes-ws/src/bifrost/proto.rs b/lib/sdf-v1-routes-ws/src/bifrost/proto.rs
index 3ad302fdd1..a27e118f57 100644
--- a/lib/sdf-v1-routes-ws/src/bifrost/proto.rs
+++ b/lib/sdf-v1-routes-ws/src/bifrost/proto.rs
@@ -4,6 +4,7 @@ use std::{
         Error as _,
     },
     string::FromUtf8Error,
+    sync::Arc,
 };
 
 use axum::extract::ws::{
@@ -11,20 +12,38 @@ use axum::extract::ws::{
     WebSocket,
 };
 use dal::{
+    ChangeSetId,
     DedicatedExecutor,
     DedicatedExecutorError,
     WorkspacePk,
 };
+use frigg::FriggStore;
 use miniz_oxide::inflate;
+use nats_multiplexer_client::MultiplexerClientError;
 use nats_std::header::{
     self,
     value::ContentEncoding,
 };
-use sdf_core::nats_multiplexer::EddaUpdatesMultiplexerClient;
-use si_data_nats::NatsClient;
+use sdf_core::{
+    index::FrontEndObjectMeta,
+    nats_multiplexer::EddaUpdatesMultiplexerClient,
+};
+use serde::{
+    Deserialize,
+    Serialize,
+};
+use si_data_nats::ConnectionMetadata;
+use si_frontend_types::FrontEndObjectRequest;
+use task::{
+    BifrostFriggReadsTask,
+    BifrostFriggReadsTaskHandle,
+};
 use telemetry::prelude::*;
 use thiserror::Error;
-use tokio::sync::broadcast;
+use tokio::sync::{
+    broadcast,
+    mpsc,
+};
 use tokio_tungstenite::tungstenite;
 use tokio_util::sync::CancellationToken;
 
@@ -41,6 +60,14 @@ pub enum BifrostError {
     Decompress(String),
     #[error("edda updates multiplexer client error: {0}")]
     EddaUpdatesMultiplexerClient(#[source] Box),
+    #[error("frigg reads task recv error: channel is empty and closed")]
+    FriggReadsTaskRecv,
+    #[error("error serializing frigg reads response: {0}")]
+    FriggReadsTaskResponseSerialize(#[source] serde_json::Error),
+    #[error("frigg reads task send error: channel is closed or rx dropped")]
+    FriggReadsTaskSend,
+    #[error("multiplexer client error: {0}")]
+    MultiplexerClient(#[from] MultiplexerClientError),
     #[error("Nats error: {0}")]
     Nats(#[from] si_data_nats::Error),
     #[error("error parsing payload as utf8 string: {0}")]
@@ -57,25 +84,58 @@
 
 type Result<T> = std::result::Result<T, Error>;
 type Error = BifrostError;
 
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct WsFrontEndObjectRequest {
+    pub ws_request_id: Option<String>,
+    pub workspace_id: WorkspacePk,
+    pub change_set_id: ChangeSetId,
+    #[serde(flatten)]
+    pub request: FrontEndObjectRequest,
+}
+
+#[derive(Clone, Debug, Serialize)]
+#[serde(rename_all = "camelCase", tag = "result")]
+pub enum WsFrontEndObjectResponse {
+    Ok {
+        ws_request_id: Option<String>,
+        workspace_id: WorkspacePk,
+        change_set_id: ChangeSetId,
+        #[serde(flatten)]
+        response: FrontEndObjectMeta,
+    },
+    Err {
+        ws_request_id: Option<String>,
+        workspace_id: WorkspacePk,
+        change_set_id: ChangeSetId,
+        error: String,
+        #[serde(flatten)]
+        request: FrontEndObjectRequest,
+    },
+}
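+
+// The wire shapes the serde attributes above produce, with values borrowed from the
+// serialization tests at the bottom of this module (camelCase fields, the inner
+// request flattened into the envelope, and an adjacent "result" tag on responses):
+//
+//   request:
+//     {"wsRequestId":"123","workspaceId":"01JWW640R16P28HXPTZV1EAVDX",
+//      "changeSetId":"01JWW6522C1XEG62RC01JMGBTV","kind":"DooferDoodle",
+//      "id":"1111","checksum":"1111_chk"}
+//   ok response ("err" is analogous, carrying "error" plus the echoed request):
+//     {"result":"ok","wsRequestId":"123","workspaceId":"...","changeSetId":"...",
+//      "workspaceSnapshotAddress":"...","indexChecksum":"...","frontEndObject":{...}}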
 
 pub fn run(
-    nats: NatsClient,
+    metadata: Arc<ConnectionMetadata>,
+    frigg: FriggStore,
     compute_executor: DedicatedExecutor,
-    workspace_pk: WorkspacePk,
+    workspace_id: WorkspacePk,
     token: CancellationToken,
 ) -> Bifrost {
     Bifrost {
-        nats,
+        metadata,
+        frigg,
         compute_executor,
-        workspace_pk,
+        workspace_id,
         token,
     }
 }
 
 #[derive(Debug)]
 pub struct Bifrost {
-    nats: NatsClient,
+    metadata: Arc<ConnectionMetadata>,
+    frigg: FriggStore,
     compute_executor: DedicatedExecutor,
-    workspace_pk: WorkspacePk,
+    workspace_id: WorkspacePk,
     token: CancellationToken,
 }
 
@@ -84,16 +144,38 @@ impl Bifrost {
         self,
         bifrost_multiplexer_client: EddaUpdatesMultiplexerClient,
     ) -> Result<BifrostStarted> {
-        let receiver = bifrost_multiplexer_client
-            .receiver_for_workspace(self.nats.metadata().subject_prefix(), self.workspace_pk)
+        let nats_messages = bifrost_multiplexer_client
+            .messages_for_workspace(self.metadata.subject_prefix(), self.workspace_id)
             .await
             .map_err(Error::EddaUpdatesMultiplexerClient)?;
 
+        let (requests_tx, requests_rx) = mpsc::channel(256);
+        let (responses_tx, responses_rx) = mpsc::channel(128);
+
+        let handle = {
+            let task_token = self.token.child_token();
+
+            // We will await shutdown of this task via its [`JoinHandle`], hence no need for a
+            // [`TaskTracker`].
+            let join_handle = tokio::spawn(
+                BifrostFriggReadsTask::create(
+                    self.frigg,
+                    requests_rx,
+                    responses_tx,
+                    task_token.clone(),
+                )
+                .run(),
+            );
+
+            BifrostFriggReadsTaskHandle::new(join_handle, task_token)
+        };
+
         Ok(BifrostStarted {
-            _workspace_pk: self.workspace_pk,
-            _nats: self.nats.clone(),
             compute_executor: self.compute_executor,
-            receiver,
+            nats_messages,
+            requests_tx,
+            responses_rx,
+            handle,
             token: self.token,
         })
     }
@@ -101,17 +183,19 @@
 
 #[derive(Debug)]
 pub struct BifrostStarted {
-    _workspace_pk: WorkspacePk,
-    _nats: NatsClient,
     compute_executor: DedicatedExecutor,
-    receiver: broadcast::Receiver<si_data_nats::Message>,
+    nats_messages: broadcast::Receiver<si_data_nats::Message>,
+    requests_tx: mpsc::Sender<WsFrontEndObjectRequest>,
+    responses_rx: mpsc::Receiver<WsFrontEndObjectResponse>,
+    handle: BifrostFriggReadsTaskHandle,
     token: CancellationToken,
 }
 
 impl BifrostStarted {
-    pub async fn process(mut self, ws: &mut WebSocket) -> Result<BifrostClosing> {
+    pub async fn process(mut self, ws_client: &mut WebSocket) -> Result<BifrostClosing> {
         loop {
             tokio::select! {
+                // Cancellation token has fired, time to shut down
                 _ = self.token.cancelled() => {
                     trace!("web socket has received cancellation");
                     let close_frame = ws::CloseFrame {
@@ -125,7 +209,7 @@
                     };
                     // Close connection with specific close frame that indicates the server
                     // is going away
-                    if let Err(err) = ws.send(ws::Message::Close(Some(close_frame))).await {
+                    if let Err(err) = ws_client.send(ws::Message::Close(Some(close_frame))).await {
                         // Not much we can or want to do here--we are in the process of
                         // shutting down
                         warn!(
@@ -133,26 +217,128 @@
                             "error while closing websocket connection during graceful shutdown",
                         );
                     }
-                    return Ok(BifrostClosing { ws_is_closed: true });
+                    return Ok(BifrostClosing {
+                        ws_is_closed: true,
+                        handle: self.handle,
+                    });
                 }
-                msg = ws.recv() => {
-                    match msg {
-                        Some(Ok(_)) => {
-                            // We don't support any incoming commands over this websocket yet, but
-                            // when we do, this is where we'd handle dispatch for them.
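+                // Beyond cancellation, this select loop multiplexes three sources: inbound
+                // requests from the web socket client, responses coming back from the frigg
+                // reads task, and NATS messages fanned out by the edda updates multiplexer.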
+                // Maybe a message from web socket client
+                maybe_ws_client_message = ws_client.recv() => {
+                    match maybe_ws_client_message {
+                        // Received web socket text message
+                        Some(Ok(ws::Message::Text(payload))) => {
+                            let request: WsFrontEndObjectRequest =
+                                match serde_json::from_str(&payload) {
+                                    // Deserialize successful
+                                    Ok(r) => r,
+                                    // Error while deserializing
+                                    Err(err) => {
+                                        warn!(
+                                            si.error.message = ?err,
+                                            "client request failed to deserialize; skipping",
+                                        );
+                                        continue;
+                                    }
+                                };
+
+                            self.requests_tx
+                                .send(request)
+                                .await
+                                .map_err(|_| BifrostError::FriggReadsTaskSend)?;
+                        }
+                        // Received a ping (library automatically deals with replies)
+                        Some(Ok(ws::Message::Ping(payload))) => {
+                            trace!(
+                                ws.client.ping.message = String::from_utf8_lossy(&payload).as_ref(),
+                                "read web socket client ping message; skipping",
+                            );
+                            continue;
+                        }
+                        // Received a pong (library automatically deals with replies)
+                        Some(Ok(ws::Message::Pong(payload))) => {
+                            trace!(
+                                ws.client.pong.message = String::from_utf8_lossy(&payload).as_ref(),
+                                "read web socket client pong message; skipping",
+                            );
+                            continue;
+                        }
+                        // Received a close message from the client
+                        Some(Ok(ws::Message::Close(maybe_close_frame))) => {
+                            debug!(
+                                ws.client.close.frame = ?maybe_close_frame,
+                                "read web socket client close message; shutting down bifrost",
+                            );
+                            return Ok(BifrostClosing {
+                                ws_is_closed: true,
+                                handle: self.handle,
+                            });
+                        }
+                        // Received unexpected web socket message type
+                        Some(Ok(unexpected_message)) => {
+                            warn!(
+                                message = ?unexpected_message,
+                                "received unexpected message type; skipping",
+                            );
                             continue;
                         }
+                        // Next message was a web socket error
                         Some(Err(err)) => return Err(err.into()),
-                        None => return Ok(BifrostClosing { ws_is_closed: true }),
+                        // Web socket stream has closed
+                        None => {
+                            debug!(
+                                "web socket client stream has closed; shutting down bifrost",
+                            );
+
+                            return Ok(BifrostClosing {
+                                ws_is_closed: true,
+                                handle: self.handle,
+                            });
+                        }
                     }
                 }
+                // Maybe a response for the web socket client
+                maybe_response = self.responses_rx.recv() => {
+                    match maybe_response {
+                        // Received a response
+                        Some(response) => {
+                            let payload = serde_json::to_string(&response)
+                                .map_err(BifrostError::FriggReadsTaskResponseSerialize)?;
+                            let msg = ws::Message::Text(payload);
+
+                            match Self::send_ws_client_message(ws_client, msg).await {
+                                // Web socket closed, tear down
+                                Some(Ok(_)) => {
+                                    debug!(
+                                        "before sending response, web socket client has closed; {}",
+                                        "shutting down bifrost",
+                                    );
+
+                                    return Ok(BifrostClosing {
+                                        ws_is_closed: true,
+                                        handle: self.handle,
+                                    });
+                                }
+                                // Error sending message to client
+                                Some(Err(err)) => return Err(err),
+                                // Successfully sent web socket message to client
+                                None => {}
+                            }
+                        }
+                        // Channel is empty and closed
+                        None => {
+                            // Task has terminated prematurely which is an error
+                            return Err(BifrostError::FriggReadsTaskRecv);
+                        }
+                    }
+                }
-                nats_msg_result = self.receiver.recv() => {
-                    match nats_msg_result {
+                // NATS message from subscription
+                nats_message_result = self.nats_messages.recv() => {
+                    match nats_message_result {
                         // We have a message
-                        Ok(nats_msg) => {
-                            let ws_msg = match self.build_ws_message(nats_msg).await {
-                                Ok(ws_msg) => ws_msg,
+                        Ok(nats_message) => {
+                            let ws_message = match self.build_ws_message(nats_message).await {
+                                Ok(ws_message) => ws_message,
                                 Err(err) => {
                                     warn!(
                                         si.error.message = ?err,
@@ -162,20 +348,23 @@
                                 }
                             };
 
-                            if let Err(err) = ws.send(ws_msg).await {
-                                match err
-                                    .source()
-                                    .and_then(|err| err.downcast_ref::<tungstenite::Error>())
-                                {
-                                    // If the websocket has cleanly closed, we should cleanly finish as
-                                    // well--this is not an error condition
-                                    Some(tungstenite::Error::ConnectionClosed)
-                                    | Some(tungstenite::Error::AlreadyClosed) => {
-                                        trace!("websocket has cleanly closed, ending");
-                                        return Ok(BifrostClosing { ws_is_closed: true });
-                                    },
-                                    _ => return Err(BifrostError::WsSendIo(err)),
+                            match Self::send_ws_client_message(ws_client, ws_message).await {
+                                // Web socket closed, tear down
+                                Some(Ok(_)) => {
+                                    debug!(
+                                        "before sending response, web socket client has closed; {}",
+                                        "shutting down bifrost",
+                                    );
+
+                                    return Ok(BifrostClosing {
+                                        ws_is_closed: true,
+                                        handle: self.handle,
+                                    });
                                 }
+                                // Error sending message to client
+                                Some(Err(err)) => return Err(err),
+                                // Successfully sent web socket message to client
+                                None => {}
                             }
                         }
                         // We have a `RecvError`
@@ -193,6 +382,7 @@
 
         Ok(BifrostClosing {
             ws_is_closed: false,
+            handle: self.handle,
         })
     }
 
@@ -235,15 +425,42 @@
         Ok(ws::Message::Text(payload_str))
     }
+
+    async fn send_ws_client_message(
+        ws_client: &mut ws::WebSocket,
+        ws_message: ws::Message,
+    ) -> Option<Result<()>> {
+        if let Err(err) = ws_client.send(ws_message).await {
+            match err
+                .source()
+                .and_then(|err| err.downcast_ref::<tungstenite::Error>())
+            {
+                // If the websocket has cleanly closed, we should cleanly finish as
+                // well--this is not an error condition
+                Some(tungstenite::Error::ConnectionClosed)
+                | Some(tungstenite::Error::AlreadyClosed) => {
+                    trace!("websocket has cleanly closed, ending");
+                    return Some(Ok(()));
+                }
+                _ => return Some(Err(BifrostError::WsSendIo(err))),
+            }
+        }
+
+        None
+    }
 }
 
 #[derive(Debug)]
 pub struct BifrostClosing {
     ws_is_closed: bool,
+    handle: BifrostFriggReadsTaskHandle,
 }
 
 impl BifrostClosing {
     pub async fn finish(self, ws: WebSocket) -> Result<()> {
+        // Cancel and await shutdown of task
+        self.handle.await;
+
         if !self.ws_is_closed {
             ws.close().await.map_err(BifrostError::WsClose)?;
         }
@@ -251,3 +468,286 @@
 
         Ok(())
     }
 }
+
+mod task {
+    use std::{
+        pin::Pin,
+        result,
+        task::{
+            Context,
+            Poll,
+        },
+    };
+
+    use frigg::FriggStore;
+    use futures::FutureExt;
+    use sdf_core::index::front_end_object_meta;
+    use telemetry::prelude::*;
+    use thiserror::Error;
+    use tokio::{
+        sync::mpsc,
+        task::JoinHandle,
+    };
+    use tokio_util::sync::CancellationToken;
+
+    use super::{
+        WsFrontEndObjectRequest,
+        WsFrontEndObjectResponse,
+    };
+
+    #[derive(Debug, Error)]
+    pub(super) enum BifrostFriggReadsTaskError {}
+
+    type Result<T> = result::Result<T, BifrostFriggReadsTaskError>;
+
+    #[derive(Debug)]
+    pub(super) struct BifrostFriggReadsTaskHandle {
+        join_handle: JoinHandle<()>,
+        task_token: CancellationToken,
+        internally_cancelled: bool,
+    }
+
+    impl BifrostFriggReadsTaskHandle {
+        pub(super) fn new(join_handle: JoinHandle<()>, task_token: CancellationToken) -> Self {
+            Self {
+                join_handle,
+                task_token,
+                internally_cancelled: false,
+            }
+        }
+    }
+
+    impl Drop for BifrostFriggReadsTaskHandle {
+        fn drop(&mut self) {
+            self.task_token.cancel();
+        }
+    }
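+
+    // Awaiting the handle cancels the task's token and then waits for the spawned
+    // task to join, so `handle.await` acts as a graceful-shutdown barrier (this is
+    // what `BifrostClosing::finish` relies on); merely dropping the handle cancels
+    // without waiting.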
+    impl Future for BifrostFriggReadsTaskHandle {
+        type Output = ();
+
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+            if !self.internally_cancelled {
+                self.task_token.cancel();
+                self.internally_cancelled = true;
+            }
+
+            match futures::ready!(self.join_handle.poll_unpin(cx)) {
+                // Task finished successfully
+                Ok(_) => Poll::Ready(()),
+                // Task did not finish successfully
+                Err(join_err) => {
+                    if join_err.is_panic() {
+                        warn!("{} panic reported on join", BifrostFriggReadsTask::NAME);
+                    } else if join_err.is_cancelled() {
+                        debug!("{} cancellation reported on join", BifrostFriggReadsTask::NAME);
+                    } else {
+                        warn!(
+                            "{} errored for an unknown reason on join",
+                            BifrostFriggReadsTask::NAME
+                        );
+                    }
+
+                    Poll::Ready(())
+                }
+            }
+        }
+    }
+
+    #[derive(Debug)]
+    pub(super) struct BifrostFriggReadsTask {
+        frigg: FriggStore,
+        requests_rx: mpsc::Receiver<WsFrontEndObjectRequest>,
+        responses_tx: mpsc::Sender<WsFrontEndObjectResponse>,
+        token: CancellationToken,
+    }
+
+    impl BifrostFriggReadsTask {
+        const NAME: &'static str = "sdf_v1_routes::bifrost::proto::bifrost_frigg_reads_task";
+
+        pub(super) fn create(
+            frigg: FriggStore,
+            requests_rx: mpsc::Receiver<WsFrontEndObjectRequest>,
+            responses_tx: mpsc::Sender<WsFrontEndObjectResponse>,
+            token: CancellationToken,
+        ) -> Self {
+            Self {
+                frigg,
+                requests_rx,
+                responses_tx,
+                token,
+            }
+        }
+
+        pub(super) async fn run(mut self) {
+            if let Err(err) = self.try_run().await {
+                error!(
+                    task = Self::NAME,
+                    si.error.message = ?err,
+                    "error while running {}",
+                    Self::NAME,
+                );
+            }
+        }
+
+        async fn try_run(&mut self) -> Result<()> {
+            loop {
+                tokio::select! {
+                    // Cancellation token has fired, time to shut down
+                    _ = self.token.cancelled() => {
+                        debug!(task = Self::NAME, "received cancellation");
+                        // Close the requests channel to ensure no further values can be
+                        // received, and continue to process remaining values until the
+                        // channel is fully drained
+                        self.requests_rx.close();
+                    }
+                    // Maybe next request
+                    maybe_request = self.requests_rx.recv() => {
+                        match maybe_request {
+                            // Next request
+                            Some(request) => {
+                                if let Err(err) = self.process_request(request).await {
+                                    error!(
+                                        task = Self::NAME,
+                                        si.error.message = ?err,
+                                        "error while processing bifrost frigg read request",
+                                    );
+                                }
+                            }
+                            // Channel is empty and closed
+                            None => {
+                                trace!(
+                                    task = Self::NAME,
+                                    "requests_rx is empty and/or closed; ending task",
+                                );
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+
+            debug!(task = Self::NAME, "shutdown complete");
+            Ok(())
+        }
+
+        async fn process_request(&self, ws_request: WsFrontEndObjectRequest) -> Result<()> {
+            let ws_response = match front_end_object_meta(
+                &self.frigg,
+                ws_request.workspace_id,
+                ws_request.change_set_id,
+                &ws_request.request,
+            )
+            .await
+            {
+                Ok(response) => WsFrontEndObjectResponse::Ok {
+                    ws_request_id: ws_request.ws_request_id,
+                    workspace_id: ws_request.workspace_id,
+                    change_set_id: ws_request.change_set_id,
+                    response,
+                },
+                Err(err) => WsFrontEndObjectResponse::Err {
+                    ws_request_id: ws_request.ws_request_id,
+                    workspace_id: ws_request.workspace_id,
+                    change_set_id: ws_request.change_set_id,
+                    error: err.to_string(),
+                    request: ws_request.request,
+                },
+            };
+
+            if let Err(err) = self.responses_tx.send(ws_response).await {
+                error!(
+                    task = Self::NAME,
+                    si.error.message = ?err,
+                    "error sending bifrost frigg read response; cancelling task",
+                );
+                self.token.cancel();
+            };
+
+            Ok(())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+    use si_frontend_mv_types::object::FrontendObject;
+
+    use super::*;
+
+    // The following tests are here to help show what the request/response messages should
+    // look like.
+    //
+    // To see them, uncomment the `panic!()` lines in whichever tests you like and run the
+    // unit tests. They will fail and print out the JSON representation using a pretty
+    // output format.
+    mod serialize {
+        use super::*;
+
+        fn ws_request() -> WsFrontEndObjectRequest {
+            WsFrontEndObjectRequest {
+                ws_request_id: Some("123".to_string()),
+                workspace_id: "01JWW640R16P28HXPTZV1EAVDX".parse().unwrap(),
+                change_set_id: "01JWW6522C1XEG62RC01JMGBTV".parse().unwrap(),
+                request: FrontEndObjectRequest {
+                    kind: "DooferDoodle".to_string(),
+                    id: "1111".to_string(),
+                    checksum: Some("1111_chk".to_string()),
+                },
+            }
+        }
+
+        #[test]
+        fn ws_front_end_object_request() {
+            let serialized =
+                serde_json::to_string_pretty(&ws_request()).expect("failed to serialize");
+
+            println!("{serialized}");
+
+            // panic!("let's see the serialized!");
+        }
+
+        #[test]
+        fn ws_front_end_object_response_ok() {
+            let response = WsFrontEndObjectResponse::Ok {
+                ws_request_id: Some("123".to_string()),
+                workspace_id: "01JWW640R16P28HXPTZV1EAVDX".parse().unwrap(),
+                change_set_id: "01JWW6522C1XEG62RC01JMGBTV".parse().unwrap(),
+                response: FrontEndObjectMeta {
+                    workspace_snapshot_address: "wk_snap_addr".to_string(),
+                    index_checksum: "idx_chk".to_string(),
+                    front_end_object: FrontendObject {
+                        kind: "DooferDoodle".to_string(),
+                        id: "1111".to_string(),
+                        checksum: "1111_chk".to_string(),
+                        data: json!({
+                            "one": "two",
+                        }),
+                    },
+                },
+            };
+
+            let serialized = serde_json::to_string_pretty(&response).expect("failed to serialize");
+
+            println!("{serialized}");
+
+            // panic!("let's see the serialized!");
+        }
+
+        #[test]
+        fn ws_front_end_object_response_err() {
+            let response = WsFrontEndObjectResponse::Err {
+                ws_request_id: Some("123".to_string()),
+                workspace_id: "01JWW640R16P28HXPTZV1EAVDX".parse().unwrap(),
+                change_set_id: "01JWW6522C1XEG62RC01JMGBTV".parse().unwrap(),
+                error: "you made a boo boo".to_string(),
+                request: ws_request().request,
+            };
+
+            let serialized = serde_json::to_string_pretty(&response).expect("failed to serialize");
+
+            println!("{serialized}");
+
+            // panic!("let's see the serialized!");
+        }
+    }
+}
diff --git a/lib/si-frontend-types-rs/src/index.rs b/lib/si-frontend-types-rs/src/index.rs
new file mode 100644
index 0000000000..1a8677cdec
--- /dev/null
+++ b/lib/si-frontend-types-rs/src/index.rs
@@ -0,0 +1,12 @@
+use serde::{
+    Deserialize,
+    Serialize,
+};
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FrontEndObjectRequest {
+    pub kind: String,
+    pub id: String,
+    pub checksum: Option<String>,
+}
diff --git a/lib/si-frontend-types-rs/src/lib.rs b/lib/si-frontend-types-rs/src/lib.rs
index 2fa0f46e23..b491418fc5 100644
--- a/lib/si-frontend-types-rs/src/lib.rs
+++ b/lib/si-frontend-types-rs/src/lib.rs
@@ -5,6 +5,7 @@ mod component;
 mod conflict;
 pub mod fs;
 mod func;
+mod index;
 mod module;
 pub mod schema_variant;
 mod workspace;
@@ -48,6 +49,7 @@ pub use crate::{
         FuncSummary,
         LeafInputLocation,
     },
+    index::FrontEndObjectRequest,
     module::{
         BuiltinModules,
         LatestModule,