From 112aba59b0e2b7d8414c7e462a8dba741ab37c3b Mon Sep 17 00:00:00 2001
From: Jacob Helwig
Date: Fri, 13 Jun 2025 18:19:00 +0000
Subject: [PATCH 1/4] Try to reduce nats load by only inserting FrontEndObjects if they're not already in the KV store

This doesn't reduce the amount of information sent over the wire to
nats on an insert, but nats should be able to short-circuit and avoid
doing any write in the case where the object has already been generated
for any reason.

With the interface provided by the nats KV store, our options for only
inserting if the key doesn't already exist looked to be:

* Send the (potentially) new thing over the wire and watch for a
  failure (`create` behaves this way).
* Try to get first (and receive the full value).
* Iterate over all of the keys to see if the key is already there.

Trying the insert using `create` and ignoring the "already exists"
error kind seems to be the least-bad of the options.
---
 lib/frigg/src/lib.rs | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/lib/frigg/src/lib.rs b/lib/frigg/src/lib.rs
index f1ae4c28c9..9b9c7c5058 100644
--- a/lib/frigg/src/lib.rs
+++ b/lib/frigg/src/lib.rs
@@ -24,6 +24,7 @@ use si_data_nats::{
     },
     kv::{
         self,
+        CreateErrorKind,
         Watch,
     },
     stream::ConsumerError,
@@ -142,7 +143,11 @@ impl FriggStore {
             &object.checksum,
         );
         let value = serde_json::to_vec(&object).map_err(Error::Serialize)?;
-        self.store.put(key.as_str(), value.into()).await?;
+        if let Err(err) = self.store.create(key.as_str(), value.into()).await {
+            if !matches!(err.kind(), CreateErrorKind::AlreadyExists) {
+                return Err(Error::Create(err));
+            }
+        };
 
         Ok(key)
     }

From a4f83dc878324b7fc8bbd1cb080afbfb88c4bdfd Mon Sep 17 00:00:00 2001
From: Jacob Helwig
Date: Fri, 13 Jun 2025 18:26:50 +0000
Subject: [PATCH 2/4] Initial sketch of streaming patches to the front end from edda

The idea with streaming patches to the front end is that it will be
able to start using the new MVs as soon as they are generated, before
_all_ of the data is ready, and there will be a message with the
MvIndex patch (message not created/sent yet) that will let the front
end know that the stream of patches has finished, and which objects it
should have by the end (fetching any it missed the patch for).

Right now, these streaming patches are sent in addition to the existing
patch batches, but once we start using the streamed patches in the
front end, we'll want to stop creating & sending the batched patches as
it will be purely redundant with the streamed patches.

We will also want to figure out if the existing `IndexUpdate` message
should be augmented to include both the from-checksum & patch for the
MvIndex, or if we want an entirely new message to handle this.
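As a rough sketch of how a client might consume one of these streaming patches (purely illustrative: `CachedObject`, `ApplyOutcome`, and `apply_streaming_patch` are hypothetical names, and only the `json_patch` crate usage and the "0" from/to-checksum conventions mirror the `StreamingPatch` type added below):

    use std::collections::HashMap;

    use serde_json::Value;

    struct CachedObject {
        checksum: String,
        data: Value,
    }

    enum ApplyOutcome {
        Applied,
        Removed,
        /// The cached checksum didn't match `from_checksum`; the client
        /// would fall back to fetching the full object instead of patching.
        NeedsFullFetch,
    }

    fn apply_streaming_patch(
        cache: &mut HashMap<String, CachedObject>,
        id: &str,
        from_checksum: &str,
        to_checksum: &str,
        patch: &json_patch::Patch,
    ) -> Result<ApplyOutcome, json_patch::PatchError> {
        // A to-checksum of "0" signals that an existing object must be removed.
        if to_checksum == "0" {
            cache.remove(id);
            return Ok(ApplyOutcome::Removed);
        }

        // A from-checksum of "0" signals a brand-new object: start from `null`
        // and let the patch document build it up.
        let entry = cache.entry(id.to_string()).or_insert_with(|| CachedObject {
            checksum: "0".to_string(),
            data: Value::Null,
        });
        if entry.checksum != from_checksum {
            return Ok(ApplyOutcome::NeedsFullFetch);
        }

        json_patch::patch(&mut entry.data, patch)?;
        entry.checksum = to_checksum.to_string();
        Ok(ApplyOutcome::Applied)
    }

A real client would key its cache off the `kind` and `id` fields of each message, and would rely on the eventual MvIndex patch message to detect (and fetch) any objects whose patches it missed.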
---
 .../materialized_view.rs | 31 +++++++---
 lib/edda-server/src/updates.rs | 25 ++++++++
 .../src/object/patch.rs | 62 ++++++++++++++++++-
 3 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/lib/edda-server/src/change_set_processor_task/materialized_view.rs b/lib/edda-server/src/change_set_processor_task/materialized_view.rs
index 51795cdc2c..387ea98c07 100644
--- a/lib/edda-server/src/change_set_processor_task/materialized_view.rs
+++ b/lib/edda-server/src/change_set_processor_task/materialized_view.rs
@@ -56,6 +56,7 @@ use si_frontend_mv_types::{
             IndexUpdate,
             ObjectPatch,
             PatchBatch,
+            StreamingPatch,
             UpdateMeta,
         },
     },
@@ -311,6 +312,7 @@ pub async fn build_all_mv_for_change_set(
         ctx,
         frigg,
         parallel_build_limit,
+        edda_updates,
         ctx.workspace_pk()?,
         ctx.change_set_id(),
         &changes,
@@ -427,6 +429,7 @@ pub async fn build_mv_for_changes_in_change_set(
         ctx,
         frigg,
         parallel_build_limit,
+        edda_updates,
         workspace_id,
         change_set_id,
         changes,
@@ -542,6 +545,7 @@ async fn build_mv_inner(
     ctx: &DalContext,
     frigg: &FriggStore,
     parallel_build_limit: usize,
+    edda_updates: &EddaUpdates,
    workspace_pk: si_id::WorkspacePk,
    change_set_id: ChangeSetId,
    changes: &[Change],
@@ -615,13 +619,30 @@ async fn build_mv_inner(
         match execution_result {
             Ok((maybe_patch, maybe_frontend_object)) => {
+                // We need to make sure the frontend object is inserted into the store first so that
+                // a client can directly fetch it without racing against the object's insertion if the
+                // client does not already have the base object to apply the streaming patch to.
+                if let Some(frontend_object) = maybe_frontend_object {
+                    frigg.insert_object(workspace_pk, &frontend_object).await?;
+                    frontend_objects.push(frontend_object);
+                }
                 if let Some(patch) = maybe_patch {
+                    let streaming_patch = StreamingPatch::new(
+                        workspace_pk,
+                        change_set_id,
+                        kind,
+                        mv_id,
+                        patch.from_checksum.clone(),
+                        patch.to_checksum.clone(),
+                        patch.patch.clone(),
+                    );
+                    edda_updates
+                        .publish_streaming_patch(streaming_patch)
+                        .await?;
+
                     debug!("Patch!: {:?}", patch);
                     patches.push(patch);
                 }
-                if let Some(frontend_object) = maybe_frontend_object {
-                    frontend_objects.push(frontend_object);
-                }
                 build_count += 1;
                 if build_duration > build_max_elapsed {
                     build_max_elapsed = build_duration;
@@ -636,10 +657,6 @@ async fn build_mv_inner(
         }
     }
 
-    frigg
-        .insert_objects(workspace_pk, frontend_objects.iter())
-        .await?;
-
     Ok((
         frontend_objects,
         patches,

diff --git a/lib/edda-server/src/updates.rs b/lib/edda-server/src/updates.rs
index 71c75472c0..f43b39686d 100644
--- a/lib/edda-server/src/updates.rs
+++ b/lib/edda-server/src/updates.rs
@@ -15,6 +15,7 @@ use si_data_nats::{
 use si_frontend_mv_types::object::patch::{
     IndexUpdate,
     PatchBatch,
+    StreamingPatch,
 };
 use si_id::WorkspacePk;
 use telemetry::prelude::*;
@@ -79,6 +80,30 @@ impl EddaUpdates {
         .await
     }
 
+    #[instrument(
+        name = "edda_updates.publish_streaming_patch",
+        level = "debug",
+        skip_all,
+        fields()
+    )]
+    pub(crate) async fn publish_streaming_patch(
+        &self,
+        streaming_patch: StreamingPatch,
+    ) -> Result<()> {
+        let mut id_buf = WorkspacePk::array_to_str_buf();
+
+        self.serialize_compress_publish(
+            subject::update_for(
+                self.subject_prefix.as_deref(),
+                streaming_patch.workspace_id.array_to_str(&mut id_buf),
+                streaming_patch.message_kind(),
+            ),
+            streaming_patch,
+            true,
+        )
+        .await
+    }
+
     #[instrument(
         name = "edda_updates.publish_index_update",
         level = "debug",

diff --git a/lib/si-frontend-mv-types-rs/src/object/patch.rs b/lib/si-frontend-mv-types-rs/src/object/patch.rs
index efa8963ba8..671276bf1e 100644
--- a/lib/si-frontend-mv-types-rs/src/object/patch.rs
+++ b/lib/si-frontend-mv-types-rs/src/object/patch.rs
@@ -4,8 +4,11 @@ use si_id::{
     WorkspacePk,
 };
 
-const PATCH_BATCH_KIND: &str = "PatchMessage";
+use crate::reference::ReferenceKind;
+
 const INDEX_UPDATE_KIND: &str = "IndexUpdate";
+const PATCH_BATCH_KIND: &str = "PatchMessage";
+const STREAMING_PATCH_MESSAGE_KIND: &str = "StreamingPatch";
 
 #[derive(Debug, Clone, Serialize)]
 #[serde(rename_all = "camelCase")]
@@ -21,6 +24,63 @@ pub struct UpdateMeta {
     pub from_index_checksum: String,
 }
 
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StreamingPatch {
+    /// The workspace this patch is targeting.
+    pub workspace_id: WorkspacePk,
+    /// The change set this patch is targeting.
+    pub change_set_id: ChangeSetId,
+    /// The MV kind this patch is targeting.
+    pub kind: String,
+    /// The ID of the object this patch is targeting.
+    pub id: String,
+    /// The original checksum of the object before this patch.
+    ///
+    /// Checksum of `"0"` means this is a new object that must be created.
+    pub from_checksum: String,
+    /// The new checksum of the object after this patch.
+    ///
+    /// Checksum of `"0"` means this is an existing object that must be removed.
+    pub to_checksum: String,
+    /// The JSON patch to apply to the object.
+    ///
+    /// If neither `from_checksum` nor `to_checksum` is `"0"`, this field
+    /// contains the JSON Patch document to apply to the version of the object with
+    /// a checksum matching `from_checksum` that will result in a version with the
+    /// checksum matching `to_checksum`.
+    pub patch: json_patch::Patch,
+    /// The message kind for the front end.
+    message_kind: &'static str,
+}
+
+impl StreamingPatch {
+    pub fn new(
+        workspace_id: WorkspacePk,
+        change_set_id: ChangeSetId,
+        kind: ReferenceKind,
+        id: String,
+        from_checksum: String,
+        to_checksum: String,
+        patch: json_patch::Patch,
+    ) -> Self {
+        Self {
+            workspace_id,
+            change_set_id,
+            kind: kind.to_string(),
+            id,
+            from_checksum,
+            to_checksum,
+            patch,
+            message_kind: STREAMING_PATCH_MESSAGE_KIND,
+        }
+    }
+
+    pub fn message_kind(&self) -> &'static str {
+        self.message_kind
+    }
+}
+
 #[derive(Debug, Clone, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PatchBatch {

From 808709c699ef19d8a9c03d5a769110200493e436 Mon Sep 17 00:00:00 2001
From: Jacob Helwig
Date: Tue, 24 Jun 2025 16:07:33 +0000
Subject: [PATCH 3/4] Use a priority queue to determine order of MV builds in edda

When streaming patches out to the front end, it helps the front end's
reactivity to send lists early, so it can display skeletons for
individual items it hasn't yet received patch information for.

Right now, there are two "priorities":

* Lists
* Details (everything else)

By default an MV will use the "Details" priority. If the
`build_priority` annotation is provided for the MV struct definition,
then it will use whatever is specified in the annotation.

This also re-structures how MV build tasks are spawned so that we can
get everything queued before letting the priority queue handle things
from there, to ensure the higher-priority items are always built first.

Rather than having to explicitly check if the Change's `entity_kind` is
the same as the MV's `trigger_entity`, this check has been abstracted
away into the `MaterializedViewInventoryItem` and is checked during the
initial queue population.

This means that whenever we go to spawn an MV build task, we already
know that the change is relevant for that specific MV kind.
---
 Cargo.lock | 1 +
 .../materialized_view.rs | 509 +++++++-----------
 lib/si-events-rs/src/lib.rs | 1 +
 lib/si-events-rs/src/materialized_view.rs | 14 +
 lib/si-events-rs/src/workspace_snapshot.rs | 2 +-
 lib/si-frontend-mv-types-macros/BUCK | 1 +
 lib/si-frontend-mv-types-macros/Cargo.toml | 2 +
 .../src/materialized_view.rs | 26 +
 lib/si-frontend-mv-types-rs/src/action.rs | 1 +
 lib/si-frontend-mv-types-rs/src/component.rs | 1 +
 .../src/incoming_connections.rs | 5 +-
 .../src/materialized_view.rs | 25 +-
 .../src/schema_variant.rs | 1 +
 lib/si-frontend-mv-types-rs/src/view.rs | 2 +
 14 files changed, 273 insertions(+), 318 deletions(-)
 create mode 100644 lib/si-events-rs/src/materialized_view.rs

diff --git a/Cargo.lock b/Cargo.lock
index d50ffbd969..d17ddc47cc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8407,6 +8407,7 @@ dependencies = [
  "manyhow",
  "proc-macro2",
  "quote",
+ "si-events",
  "syn 2.0.103",
 ]

diff --git a/lib/edda-server/src/change_set_processor_task/materialized_view.rs b/lib/edda-server/src/change_set_processor_task/materialized_view.rs
index 387ea98c07..644e85fe41 100644
--- a/lib/edda-server/src/change_set_processor_task/materialized_view.rs
+++ b/lib/edda-server/src/change_set_processor_task/materialized_view.rs
@@ -1,7 +1,7 @@
 use std::{
     collections::{
+        BinaryHeap,
         HashSet,
-        VecDeque,
     },
     time::Duration,
 };
@@ -28,10 +28,10 @@ use frigg::{
 };
 use si_events::{
     WorkspaceSnapshotAddress,
+    materialized_view::BuildPriority,
     workspace_snapshot::Change,
 };
 use si_frontend_mv_types::{
-    MaterializedView,
     action::{
         ActionPrototypeViewList as ActionPrototypeViewListMv,
         ActionViewList as ActionViewListMv,
@@ -49,7 +49,10 @@ use si_frontend_mv_types::{
         IncomingConnectionsList as IncomingConnectionsListMv,
     },
     index::MvIndex,
-    materialized_view::materialized_view_definitions_checksum,
+    materialized_view::{
+        MaterializedViewInventoryItem,
+        materialized_view_definitions_checksum,
+    },
     object::{
         FrontendObject,
         patch::{
@@ -79,7 +82,6 @@ use si_id::{
     WorkspacePk,
 };
 use si_layer_cache::LayerDbError;
-use strum::IntoEnumIterator;
 use telemetry::prelude::*;
 use telemetry_utils::metric;
 use thiserror::Error;
@@ -553,24 +555,17 @@ async fn build_mv_inner(
     let mut frontend_objects = Vec::new();
     let mut patches = Vec::new();
     let mut build_tasks = JoinSet::new();
-    let mut queued_mv_builds = VecDeque::new();
+    let mut queued_mv_builds = BinaryHeap::new();
 
-    // We'll spawn up to the first `parallel_build_limit` build tasks, and queue the rest to be spawned
-    // as other build tasks are completed.
+    // Queue everything so we can let the priority queue determine the order in which everything is built.
     for &change in changes {
-        for mv_kind in ReferenceKind::iter() {
-            if let Some(queued_build) = spawn_build_mv_task_for_change_and_mv_kind(
-                &mut build_tasks,
-                ctx,
-                frigg,
-                parallel_build_limit,
-                change,
-                mv_kind,
-                workspace_pk,
-            )
-            .await?
-            {
-                queued_mv_builds.push_back(queued_build);
+        for mv_inventory_item in ::inventory::iter::<MaterializedViewInventoryItem>() {
+            if mv_inventory_item.should_build_for_change(change) {
+                queued_mv_builds.push(QueuedBuildMvTask {
+                    change,
+                    mv_kind: mv_inventory_item.kind(),
+                    priority: mv_inventory_item.build_priority(),
+                });
             }
         }
     }
@@ -591,26 +586,23 @@ async fn build_mv_inner(
         // Spawn as many of the queued build tasks as we can, up to the concurrency limit.
         while !queued_mv_builds.is_empty() && build_tasks.len() < parallel_build_limit {
-            let Some(QueuedBuildMvTask { change, mv_kind }) = queued_mv_builds.pop_front() else {
+            let Some(QueuedBuildMvTask {
+                change, mv_kind, ..
+            }) = queued_mv_builds.pop()
+            else {
                 // This _really_ shouldn't ever return `None` as we just checked that
                 // `queued_mv_builds` is not empty.
                 break;
             };
 
-            // This _really_ shouldn't ever return `Some`, but better to be paranoid than to
-            // forget about pending work.
-            if let Some(queued_build) = spawn_build_mv_task_for_change_and_mv_kind(
+            spawn_build_mv_task_for_change_and_mv_kind(
                 &mut build_tasks,
                 ctx,
                 frigg,
-                parallel_build_limit,
                 change,
                 mv_kind,
                 workspace_pk,
             )
             .await?
-            {
-                queued_mv_builds.push_back(queued_build);
-            }
         }

         if let Some(join_result) = build_tasks.join_next().await {
@@ -775,356 +767,251 @@ where
 /// The [`Change`] of the trigger entity that would have spawned a build task for the
 /// `mv_kind`, if we hadn't already reached the concurrency limit for running build
 /// tasks.
-#[derive(Debug)]
+#[derive(Debug, Eq, PartialEq)]
 struct QueuedBuildMvTask {
     pub change: Change,
     pub mv_kind: ReferenceKind,
+    pub priority: BuildPriority,
+}
+
+impl Ord for QueuedBuildMvTask {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        match self.priority.cmp(&other.priority) {
+            std::cmp::Ordering::Equal => {
+                // Within the same priority, items will be ordered by the triggering
+                // entity ID. Doing `self.cmp(other)` means that the newer IDs are
+                // considered "larger", and items with the same priority will be
+                // processed newest to oldest according to when their ID was created.
+                self.change.entity_id.cmp(&other.change.entity_id)
+            }
+            ord => ord,
+        }
+    }
+}
+
+impl PartialOrd for QueuedBuildMvTask {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
 }

 async fn spawn_build_mv_task_for_change_and_mv_kind(
     build_tasks: &mut JoinSet,
     ctx: &DalContext,
     frigg: &FriggStore,
-    parallel_build_limit: usize,
     change: Change,
     mv_kind: ReferenceKind,
     workspace_pk: si_id::WorkspacePk,
-) -> Result<Option<QueuedBuildMvTask>, MaterializedViewError> {
+) -> Result<(), MaterializedViewError> {
     match mv_kind {
         ReferenceKind::ActionPrototypeViewList => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <ActionPrototypeViewListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    ActionPrototypeViewListMv,
-                    dal_materialized_views::action_prototype_view_list::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into()
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                ActionPrototypeViewListMv,
+                dal_materialized_views::action_prototype_view_list::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into()
+                ),
+            );
         }
         ReferenceKind::ActionViewList => {
             let workspace_mv_id = workspace_pk.to_string();
-            let trigger_entity = <ActionViewListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    workspace_mv_id,
-                    ActionViewListMv,
-                    dal_materialized_views::action_view_list::assemble(ctx.clone()),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                workspace_mv_id,
+                ActionViewListMv,
+                dal_materialized_views::action_view_list::assemble(ctx.clone()),
+            );
         }
         ReferenceKind::AttributeTree => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <AttributeTreeMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    AttributeTreeMv,
-                    dal_materialized_views::component::attribute_tree::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into(),
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                AttributeTreeMv,
+                dal_materialized_views::component::attribute_tree::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into(),
+                ),
+            );
         }
         ReferenceKind::ComponentInList => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <ComponentInListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    ComponentInListMv,
-                    dal_materialized_views::component::assemble_in_list(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into(),
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                ComponentInListMv,
+                dal_materialized_views::component::assemble_in_list(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into(),
+                ),
+            );
         }
         ReferenceKind::Component => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <ComponentMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    ComponentMv,
-                    dal_materialized_views::component::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into(),
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                ComponentMv,
+                dal_materialized_views::component::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into(),
+                ),
+            );
         }
         ReferenceKind::ComponentList => {
             let workspace_mv_id = workspace_pk.to_string();
-            let trigger_entity = <ComponentListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    workspace_mv_id,
-                    ComponentListMv,
-                    dal_materialized_views::component_list::assemble(ctx.clone(),),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                workspace_mv_id,
+                ComponentListMv,
+                dal_materialized_views::component_list::assemble(ctx.clone(),),
+            );
         }
         ReferenceKind::IncomingConnections => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <IncomingConnectionsMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    IncomingConnectionsMv,
-                    dal_materialized_views::incoming_connections::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into(),
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                IncomingConnectionsMv,
+                dal_materialized_views::incoming_connections::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into(),
+                ),
+            );
         }
         ReferenceKind::IncomingConnectionsList => {
             let workspace_mv_id = workspace_pk.to_string();
-            let trigger_entity = <IncomingConnectionsListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    workspace_mv_id,
-                    IncomingConnectionsListMv,
-                    dal_materialized_views::incoming_connections_list::assemble(ctx.clone()),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                workspace_mv_id,
+                IncomingConnectionsListMv,
+                dal_materialized_views::incoming_connections_list::assemble(ctx.clone()),
+            );
         }
         ReferenceKind::SchemaMembers => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <SchemaMembers as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    SchemaMembers,
-                    dal_materialized_views::component::assemble_schema_members(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into()
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                SchemaMembers,
+                dal_materialized_views::component::assemble_schema_members(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into()
+                ),
+            );
         }
         ReferenceKind::SchemaVariant => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <SchemaVariantMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    SchemaVariantMv,
-                    dal_materialized_views::schema_variant::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into()
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                SchemaVariantMv,
+                dal_materialized_views::schema_variant::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into()
+                ),
+            );
         }
         ReferenceKind::SchemaVariantCategories => {
             let workspace_mv_id = workspace_pk.to_string();
-            let trigger_entity = <SchemaVariantCategoriesMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    workspace_mv_id,
-                    SchemaVariantCategoriesMv,
-                    dal_materialized_views::schema_variant_categories::assemble(ctx.clone()),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                workspace_mv_id,
+                SchemaVariantCategoriesMv,
+                dal_materialized_views::schema_variant_categories::assemble(ctx.clone()),
+            );
         }
         ReferenceKind::View => {
            let entity_mv_id = change.entity_id.to_string();
-            let 
trigger_entity = <ViewMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    ViewMv,
-                    dal_materialized_views::view::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into()
-                    )
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                ViewMv,
+                dal_materialized_views::view::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into()
+                )
+            );
         }
         ReferenceKind::ViewList => {
             let workspace_mv_id = workspace_pk.to_string();
-            let trigger_entity = <ViewListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    workspace_mv_id,
-                    ViewListMv,
-                    dal_materialized_views::view_list::assemble(ctx.clone()),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                workspace_mv_id,
+                ViewListMv,
+                dal_materialized_views::view_list::assemble(ctx.clone()),
+            );
         }
         ReferenceKind::ViewComponentList => {
             let entity_mv_id = change.entity_id.to_string();
-            let trigger_entity = <ViewComponentListMv as MaterializedView>::trigger_entity();
-            if change.entity_kind != trigger_entity {
-                return Ok(None);
-            }
-
-            if build_tasks.len() < parallel_build_limit {
-                spawn_build_mv_task!(
-                    build_tasks,
-                    ctx,
-                    frigg,
-                    change,
-                    entity_mv_id,
-                    ViewComponentListMv,
-                    dal_materialized_views::view_component_list::assemble(
-                        ctx.clone(),
-                        si_events::ulid::Ulid::from(change.entity_id).into(),
-                    ),
-                );
-            } else {
-                return Ok(Some(QueuedBuildMvTask { change, mv_kind }));
-            }
+            spawn_build_mv_task!(
+                build_tasks,
+                ctx,
+                frigg,
+                change,
+                entity_mv_id,
+                ViewComponentListMv,
+                dal_materialized_views::view_component_list::assemble(
+                    ctx.clone(),
+                    si_events::ulid::Ulid::from(change.entity_id).into(),
+                ),
+            );
         }

         // Building the `MvIndex` itself is handled separately as the logic depends
@@ -1136,5 +1023,5 @@ async fn spawn_build_mv_task_for_change_and_mv_kind(
         ReferenceKind::ChangeSetList | ReferenceKind::ChangeSetRecord => {}
     }
 
-    Ok(None)
+    Ok(())
 }

diff --git a/lib/si-events-rs/src/lib.rs b/lib/si-events-rs/src/lib.rs
index f477691e21..4d81fbe1ac 100644
--- a/lib/si-events-rs/src/lib.rs
+++ b/lib/si-events-rs/src/lib.rs
@@ -28,6 +28,7 @@ mod func;
 mod func_execution;
 mod func_run;
 mod func_run_log;
+pub mod materialized_view;
 mod resource_metadata;
 mod schema;
 mod schema_variant;

diff --git a/lib/si-events-rs/src/materialized_view.rs b/lib/si-events-rs/src/materialized_view.rs
new file mode 100644
index 0000000000..254f68ea05
--- /dev/null
+++ b/lib/si-events-rs/src/materialized_view.rs
@@ -0,0 +1,14 @@
+#[remain::sorted]
+#[derive(
+    Debug, Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Default, strum::Display, strum::EnumString,
+)]
+/// The build priority for MVs is:
+/// * List
+/// * Detail
+///
+/// The order of the priorities is determined by the discriminant, largest to smallest.
+pub enum BuildPriority {
+    #[default]
+    Detail = 5,
+    List = 10,
+}

diff --git a/lib/si-events-rs/src/workspace_snapshot.rs b/lib/si-events-rs/src/workspace_snapshot.rs
index 321d1bbac1..270c421e7d 100644
--- a/lib/si-events-rs/src/workspace_snapshot.rs
+++ b/lib/si-events-rs/src/workspace_snapshot.rs
@@ -12,7 +12,7 @@ use crate::{
 
 create_xxhash_type!(Checksum);
 
-#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
+#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
 pub struct Change {
     pub entity_id: EntityId,
     pub entity_kind: EntityKind,

diff --git a/lib/si-frontend-mv-types-macros/BUCK b/lib/si-frontend-mv-types-macros/BUCK
index 43cdadd6f8..ecf87dd1cd 100644
--- a/lib/si-frontend-mv-types-macros/BUCK
+++ b/lib/si-frontend-mv-types-macros/BUCK
@@ -10,6 +10,7 @@ cargo_test(
 rust_library(
     name = "si-frontend-mv-types-macros",
     deps = [
+        "//lib/si-events-rs:si-events",
         "//third-party/rust:darling",
         "//third-party/rust:manyhow",
         "//third-party/rust:proc-macro2",

diff --git a/lib/si-frontend-mv-types-macros/Cargo.toml b/lib/si-frontend-mv-types-macros/Cargo.toml
index 1efa31205f..36dab25bcd 100644
--- a/lib/si-frontend-mv-types-macros/Cargo.toml
+++ b/lib/si-frontend-mv-types-macros/Cargo.toml
@@ -12,6 +12,8 @@ publish.workspace = true
 proc-macro = true
 
 [dependencies]
+si-events = { path = "../../lib/si-events-rs" }
+
 darling = { workspace = true }
 manyhow = { workspace = true }
 proc-macro2 = { workspace = true }

diff --git a/lib/si-frontend-mv-types-macros/src/materialized_view.rs b/lib/si-frontend-mv-types-macros/src/materialized_view.rs
index dbe899bbba..5db7799607 100644
--- a/lib/si-frontend-mv-types-macros/src/materialized_view.rs
+++ b/lib/si-frontend-mv-types-macros/src/materialized_view.rs
@@ -1,3 +1,5 @@
+use std::str::FromStr;
+
 use darling::FromAttributes;
 use manyhow::{
     bail,
@@ -8,6 +10,7 @@ use quote::{
     format_ident,
     quote,
 };
+use si_events::materialized_view::BuildPriority;
 use syn::{
     Data,
     DeriveInput,
@@ -21,6 +24,7 @@ use crate::ty_to_string;
 struct MaterializedViewOptions {
     trigger_entity: Option<syn::Path>,
     reference_kind: Option<syn::Path>,
+    build_priority: Option<String>,
 }
 
 pub fn derive_materialized_view(
@@ -66,6 +70,23 @@ pub fn derive_materialized_view(
     let Some(self_reference_kind) = struct_options.reference_kind else {
         bail!(input, "MV must have a reference_kind attribute");
     };
+    let build_priority = {
+        let priority = match struct_options.build_priority {
+            Some(priority_string) => {
+                if let Ok(priority) = BuildPriority::from_str(&priority_string) {
+                    priority
+                } else {
+                    bail!(
+                        input,
+                        "Invalid build_priority; must be one of the ::si_events::materialized_view::BuildPriority variants."
+                    );
+                }
+            }
+            None => BuildPriority::default(),
+        }
+        .to_string();
+        format_ident!("{}", priority)
+    };
 
     let definition_checksum = {
         let mut hash_updates = TokenStream::new();
@@ -98,6 +119,10 @@ pub fn derive_materialized_view(
             fn definition_checksum() -> ::si_events::workspace_snapshot::Checksum {
                 *#checksum_static_ident
             }
+
+            fn build_priority() -> ::si_events::materialized_view::BuildPriority {
+                ::si_events::materialized_view::BuildPriority::#build_priority
+            }
         }
 
         static #checksum_static_ident: ::std::sync::LazyLock<::si_events::workspace_snapshot::Checksum> =
@@ -109,6 +134,7 @@ pub fn derive_materialized_view(
             crate::materialized_view::MaterializedViewInventoryItem::new(
                 #self_reference_kind,
                 #trigger_entity,
+                ::si_events::materialized_view::BuildPriority::#build_priority,
                 &#checksum_static_ident,
             )
         };

diff --git a/lib/si-frontend-mv-types-rs/src/action.rs b/lib/si-frontend-mv-types-rs/src/action.rs
index e099c4dd41..029c98575b 100644
--- a/lib/si-frontend-mv-types-rs/src/action.rs
+++ b/lib/si-frontend-mv-types-rs/src/action.rs
@@ -71,6 +71,7 @@ pub struct ActionView {
 #[mv(
     trigger_entity = EntityKind::CategoryAction,
     reference_kind = ReferenceKind::ActionViewList,
+    build_priority = "List",
 )]
 pub struct ActionViewList {
     pub id: WorkspacePk,

diff --git a/lib/si-frontend-mv-types-rs/src/component.rs b/lib/si-frontend-mv-types-rs/src/component.rs
index ba92094384..24e89f6223 100644
--- a/lib/si-frontend-mv-types-rs/src/component.rs
+++ b/lib/si-frontend-mv-types-rs/src/component.rs
@@ -160,6 +160,7 @@ pub struct SchemaMembers {
 #[mv(
     trigger_entity = EntityKind::CategoryComponent,
     reference_kind = ReferenceKind::ComponentList,
+    build_priority = "List",
 )]
 pub struct ComponentList {
     pub id: WorkspacePk,

diff --git a/lib/si-frontend-mv-types-rs/src/incoming_connections.rs b/lib/si-frontend-mv-types-rs/src/incoming_connections.rs
index 34b37fcccc..4d1a9d5bca 100644
--- a/lib/si-frontend-mv-types-rs/src/incoming_connections.rs
+++ b/lib/si-frontend-mv-types-rs/src/incoming_connections.rs
@@ -66,8 +66,9 @@ pub struct IncomingConnections {
 )]
 #[serde(rename_all = "camelCase")]
 #[mv(
-    trigger_entity = EntityKind::CategoryComponent,
-    reference_kind = ReferenceKind::IncomingConnectionsList,
+    trigger_entity = EntityKind::CategoryComponent,
+    reference_kind = ReferenceKind::IncomingConnectionsList,
+    build_priority = "List",
 )]
 pub struct IncomingConnectionsList {
     pub id: WorkspacePk,

diff --git a/lib/si-frontend-mv-types-rs/src/materialized_view.rs b/lib/si-frontend-mv-types-rs/src/materialized_view.rs
index b19d58bd15..1ba28c067a 100644
--- a/lib/si-frontend-mv-types-rs/src/materialized_view.rs
+++ b/lib/si-frontend-mv-types-rs/src/materialized_view.rs
@@ -1,7 +1,11 @@
-use si_events::workspace_snapshot::{
-    Checksum,
-    ChecksumHasher,
-    EntityKind,
+use si_events::{
+    materialized_view::BuildPriority,
+    workspace_snapshot::{
+        Change,
+        Checksum,
+        ChecksumHasher,
+        EntityKind,
+    },
 };
 
 use crate::{
@@ -13,12 +17,14 @@ pub trait MaterializedView {
     fn kind() -> ReferenceKind;
     fn trigger_entity() -> EntityKind;
     fn definition_checksum() -> Checksum;
+    fn build_priority() -> BuildPriority;
 }
 
 #[derive(Debug, Clone)]
 pub struct MaterializedViewInventoryItem {
     kind: ReferenceKind,
     trigger_entity: EntityKind,
+    build_priority: BuildPriority,
     definition_checksum: &'static ::std::sync::LazyLock<Checksum>,
 }
 
@@ -26,11 +32,13 @@ impl MaterializedViewInventoryItem {
     pub const fn new(
         kind: ReferenceKind,
         trigger_entity: EntityKind,
+        build_priority: BuildPriority,
        definition_checksum: &'static ::std::sync::LazyLock<Checksum>,
     ) -> Self {
         MaterializedViewInventoryItem {
             kind,
             trigger_entity,
+            build_priority,
             definition_checksum,
         }
     }
@@ -42,9 +50,18 @@ impl MaterializedViewInventoryItem {
     pub fn trigger_entity(&self) -> EntityKind {
         self.trigger_entity
     }
+
+    pub fn build_priority(&self) -> BuildPriority {
+        self.build_priority
+    }
 
     pub fn definition_checksum(&self) -> Checksum {
         **self.definition_checksum
     }
+
+    pub fn should_build_for_change(&self, change: Change) -> bool {
+        change.entity_kind == self.trigger_entity
+    }
 }
 
 static MATERIALIZED_VIEW_DEFINITIONS_CHECKSUM: ::std::sync::LazyLock<Checksum> =

diff --git a/lib/si-frontend-mv-types-rs/src/schema_variant.rs b/lib/si-frontend-mv-types-rs/src/schema_variant.rs
index f9d7d01e1c..652ebb8780 100644
--- a/lib/si-frontend-mv-types-rs/src/schema_variant.rs
+++ b/lib/si-frontend-mv-types-rs/src/schema_variant.rs
@@ -179,6 +179,7 @@ pub struct SchemaVariantsByCategory {
 #[mv(
     trigger_entity = EntityKind::CategorySchema,
     reference_kind = ReferenceKind::SchemaVariantCategories,
+    build_priority = "List",
 )]
 pub struct SchemaVariantCategories {
     pub id: WorkspacePk,

diff --git a/lib/si-frontend-mv-types-rs/src/view.rs b/lib/si-frontend-mv-types-rs/src/view.rs
index bafea9acc9..c26d46e916 100644
--- a/lib/si-frontend-mv-types-rs/src/view.rs
+++ b/lib/si-frontend-mv-types-rs/src/view.rs
@@ -45,6 +45,7 @@ pub struct View {
 #[mv(
     trigger_entity = EntityKind::CategoryView,
     reference_kind = ReferenceKind::ViewList,
+    build_priority = "List",
 )]
 pub struct ViewList {
     pub id: WorkspacePk,
@@ -65,6 +66,7 @@ pub struct ViewList {
 #[mv(
     trigger_entity = EntityKind::View,
     reference_kind = ReferenceKind::ViewComponentList,
+    build_priority = "List",
 )]
 pub struct ViewComponentList {
     pub id: ViewId,

From 80d9155ef8b20551bb15cb33775e058d62f1a5d5 Mon Sep 17 00:00:00 2001
From: Jacob Helwig
Date: Fri, 27 Jun 2025 14:07:42 +0000
Subject: [PATCH 4/4] Make streamed patches from edda a config setting instead of always on

Rather than always sending the patches for the front end both as
individual patches when each is built, and as a single batch after all
have been built, there is now a `SI_EDDA__STREAMING_PATCHES` setting to
toggle between the two behaviors. The default behavior is to send the
patches as a single batch, making the newer streaming behavior opt-in
for development & testing.
---
 lib/dal-test/src/lib.rs | 1 +
 lib/edda-server/src/config.rs | 16 ++++++++++++++++
 lib/edda-server/src/server.rs | 9 +++++++--
 lib/edda-server/src/updates.rs | 16 +++++++++++++++-
 4 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/lib/dal-test/src/lib.rs b/lib/dal-test/src/lib.rs
index b29ff91304..553f2cf03c 100644
--- a/lib/dal-test/src/lib.rs
+++ b/lib/dal-test/src/lib.rs
@@ -787,6 +787,7 @@ pub async fn edda_server(
         config.instance_id(),
         config.concurrency_limit(),
         config.parallel_build_limit(),
+        config.streaming_patches(),
         services_context,
         config.quiescent_period(),
         shutdown_token,

diff --git a/lib/edda-server/src/config.rs b/lib/edda-server/src/config.rs
index b7b0c56375..b450f99e98 100644
--- a/lib/edda-server/src/config.rs
+++ b/lib/edda-server/src/config.rs
@@ -67,6 +67,9 @@ pub struct Config {
     #[builder(default = "default_parallel_build_limit()")]
     parallel_build_limit: usize,
 
+    #[builder(default = "default_streaming_patches()")]
+    streaming_patches: bool,
+
     #[builder(default = "PgPoolConfig::default()")]
     pg_pool: PgPoolConfig,
 
@@ -101,6 +104,11 @@ impl Config {
         self.parallel_build_limit
     }
 
+    /// Gets whether edda should stream patches, or send them as a single batch.
+    pub fn streaming_patches(&self) -> bool {
+        self.streaming_patches
+    }
+
     /// Gets the config's instance ID.
     pub fn instance_id(&self) -> &str {
         self.instance_id.as_ref()
     }
@@ -157,6 +165,8 @@ pub struct ConfigFile {
     edda_concurrency_limit: Option<usize>,
     #[serde(default = "default_parallel_build_limit")]
     edda_parallel_build_limit: usize,
+    #[serde(default = "default_streaming_patches")]
+    streaming_patches: bool,
     #[serde(default)]
     pg: PgPoolConfig,
     #[serde(default)]
@@ -177,6 +187,7 @@ impl Default for ConfigFile {
             instance_id: random_instance_id(),
             edda_concurrency_limit: default_concurrency_limit(),
             edda_parallel_build_limit: default_parallel_build_limit(),
+            streaming_patches: default_streaming_patches(),
             pg: Default::default(),
             nats: Default::default(),
             crypto: Default::default(),
@@ -204,6 +215,7 @@ impl TryFrom<ConfigFile> for Config {
         config.symmetric_crypto_service(value.symmetric_crypto_service.try_into()?);
         config.layer_db_config(value.layer_db_config);
         config.concurrency_limit(value.edda_concurrency_limit);
+        config.streaming_patches(value.streaming_patches);
         config.parallel_build_limit(value.edda_parallel_build_limit);
         config.instance_id(value.instance_id);
         config.quiescent_period(Duration::from_secs(value.quiescent_period_secs));
@@ -223,6 +235,10 @@ fn default_parallel_build_limit() -> usize {
     DEFAULT_PARALLEL_BUILD_LIMIT
 }
 
+fn default_streaming_patches() -> bool {
+    false
+}
+
 fn default_symmetric_crypto_config() -> SymmetricCryptoServiceConfigFile {
     SymmetricCryptoServiceConfigFile {
         active_key: None,

diff --git a/lib/edda-server/src/server.rs b/lib/edda-server/src/server.rs
index b74439eaaf..f224cec906 100644
--- a/lib/edda-server/src/server.rs
+++ b/lib/edda-server/src/server.rs
@@ -161,6 +161,7 @@ impl Server {
             config.instance_id().to_string(),
             config.concurrency_limit(),
             config.parallel_build_limit(),
+            config.streaming_patches(),
             services_context,
             config.quiescent_period(),
             shutdown_token,
@@ -174,6 +175,7 @@ impl Server {
         instance_id: impl Into<String>,
         concurrency_limit: Option<usize>,
         parallel_build_limit: usize,
+        streaming_patches: bool,
         services_context: ServicesContext,
         quiescent_period: Duration,
         shutdown_token: CancellationToken,
@@ -200,8 +202,11 @@ impl Server {
         let frigg = FriggStore::new(nats.clone(), frigg_kv(&context, prefix.as_deref()).await?);
 
-        let edda_updates =
-            EddaUpdates::new(nats.clone(), services_context.compute_executor().clone());
+        let edda_updates = EddaUpdates::new(
+            nats.clone(),
+            services_context.compute_executor().clone(),
+            streaming_patches,
+        );
 
         let ctx_builder = DalContext::builder(services_context, false);

diff --git a/lib/edda-server/src/updates.rs b/lib/edda-server/src/updates.rs
index f43b39686d..b7c77e12be 100644
--- a/lib/edda-server/src/updates.rs
+++ b/lib/edda-server/src/updates.rs
@@ -42,10 +42,15 @@ pub(crate) struct EddaUpdates {
     compute_executor: DedicatedExecutor,
     subject_prefix: Option<Box<str>>,
     max_payload: usize,
+    streaming_patches: bool,
 }
 
 impl EddaUpdates {
-    pub(crate) fn new(nats: NatsClient, compute_executor: DedicatedExecutor) -> Self {
+    pub(crate) fn new(
+        nats: NatsClient,
+        compute_executor: DedicatedExecutor,
+        streaming_patches: bool,
+    ) -> Self {
         let subject_prefix = nats
             .metadata()
             .subject_prefix()
@@ -56,6 +61,7 @@ impl EddaUpdates {
             compute_executor,
             subject_prefix,
             max_payload,
+            streaming_patches,
         }
     }
 
@@ -66,6 +72,10 @@ impl EddaUpdates {
         fields()
     )]
     pub(crate) async fn publish_patch_batch(&self, patch_batch: PatchBatch) -> Result<()> {
+        if self.streaming_patches {
+            return Ok(());
+        }
+
        let mut id_buf = WorkspacePk::array_to_str_buf();
 
         self.serialize_compress_publish(
@@ -90,6 +100,10 @@ impl EddaUpdates {
         &self,
         streaming_patch: StreamingPatch,
     ) -> Result<()> {
+        if !self.streaming_patches {
+            return Ok(());
+        }
+
         let mut id_buf = WorkspacePk::array_to_str_buf();
 
         self.serialize_compress_publish(