
Commit 3cc798e

Initial sketch of streaming patches to the front end from edda
The idea behind streaming patches to the front end is that the front end can start using new MVs as soon as they are generated, before _all_ of the data is ready. A follow-up message carrying the MvIndex patch (not created or sent yet) will tell the front end that the stream of patches has finished and which objects it should have by the end, so it can fetch any object whose patch it missed. Right now these streaming patches are sent in addition to the existing patch batches; once the front end starts consuming the streamed patches, we will want to stop creating and sending the batched patches, since they will be entirely redundant with the streamed ones. We also need to decide whether the existing `IndexUpdate` message should be augmented to include both the from-checksum and the patch for the MvIndex, or whether we want an entirely new message to handle this.
1 parent a8fdca1 commit 3cc798e
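
As a rough illustration of the stream-completion message described above: nothing like it exists in this commit, and every type and field name below is an invented placeholder for discussion only.

// Hypothetical sketch only: the stream-completion message is not created or
// sent by this commit. Names and fields are placeholders, not a design.
use serde::Serialize;

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamingPatchesComplete {
    pub workspace_id: String,
    pub change_set_id: String,
    /// Checksum of the MvIndex before the stream of patches began.
    pub from_index_checksum: String,
    /// Checksum of the MvIndex once every streamed patch has been applied.
    pub to_index_checksum: String,
    /// Every (kind, id, checksum) the front end should hold at the end of the
    /// stream; anything missing can be fetched directly from the object store.
    pub expected_objects: Vec<ExpectedObject>,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ExpectedObject {
    pub kind: String,
    pub id: String,
    pub checksum: String,
}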

3 files changed: +110 −8 lines changed
lib/edda-server/src/change_set_processor_task/materialized_view.rs

Lines changed: 24 additions & 7 deletions
@@ -56,6 +56,7 @@ use si_frontend_mv_types::{
         IndexUpdate,
         ObjectPatch,
         PatchBatch,
+        StreamingPatch,
         UpdateMeta,
     },
 },
@@ -311,6 +312,7 @@ pub async fn build_all_mv_for_change_set(
         ctx,
         frigg,
         parallel_build_limit,
+        edda_updates,
         ctx.workspace_pk()?,
         ctx.change_set_id(),
         &changes,
@@ -427,6 +429,7 @@ pub async fn build_mv_for_changes_in_change_set(
         ctx,
         frigg,
         parallel_build_limit,
+        edda_updates,
         workspace_id,
         change_set_id,
         changes,
@@ -542,6 +545,7 @@ async fn build_mv_inner(
     ctx: &DalContext,
     frigg: &FriggStore,
     parallel_build_limit: usize,
+    edda_updates: &EddaUpdates,
     workspace_pk: si_id::WorkspacePk,
     change_set_id: ChangeSetId,
     changes: &[Change],
@@ -615,13 +619,30 @@ async fn build_mv_inner(
 
         match execution_result {
             Ok((maybe_patch, maybe_frontend_object)) => {
+                // We need to make sure the frontend object is inserted into the store first so that
+                // a client can directly fetch it without racing against the object's insertion if the
+                // client does not already have the base object to apply the streaming patch to.
+                if let Some(frontend_object) = maybe_frontend_object {
+                    frigg.insert_object(workspace_pk, &frontend_object).await?;
+                    frontend_objects.push(frontend_object);
+                }
                 if let Some(patch) = maybe_patch {
+                    let streaming_patch = StreamingPatch::new(
+                        workspace_pk,
+                        change_set_id,
+                        kind,
+                        mv_id,
+                        patch.from_checksum.clone(),
+                        patch.to_checksum.clone(),
+                        patch.patch.clone(),
+                    );
+                    edda_updates
+                        .publish_streaming_patch(streaming_patch)
+                        .await?;
+
                     debug!("Patch!: {:?}", patch);
                     patches.push(patch);
                 }
-                if let Some(frontend_object) = maybe_frontend_object {
-                    frontend_objects.push(frontend_object);
-                }
                 build_count += 1;
                 if build_duration > build_max_elapsed {
                     build_max_elapsed = build_duration;
@@ -636,10 +657,6 @@ async fn build_mv_inner(
         }
     }
 
-    frigg
-        .insert_objects(workspace_pk, frontend_objects.iter())
-        .await?;
-
     Ok((
         frontend_objects,
         patches,
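
The reordering above, where each frontend object is inserted into the store before its streaming patch is published, exists so that a consumer missing the base object can always fall back to fetching the full object. A minimal consumer-side sketch of that fallback follows; the cache type and the fetch_full_object helper are hypothetical stand-ins, not part of this commit.

// Sketch of the consumer-side fallback, not part of this commit. The cache
// and fetch function are hypothetical stand-ins for the front end's store.
use std::collections::HashMap;

use si_frontend_mv_types::object::patch::StreamingPatch;

struct CachedObject {
    checksum: String,
    data: serde_json::Value,
}

// Hypothetical stand-in for fetching the full serialized object (e.g. from
// the object store that frigg writes into).
async fn fetch_full_object(
    _kind: &str,
    _id: &str,
) -> Result<CachedObject, Box<dyn std::error::Error>> {
    todo!("front end / gateway specific")
}

async fn apply_streaming_patch(
    cache: &mut HashMap<(String, String), CachedObject>,
    sp: StreamingPatch,
) -> Result<(), Box<dyn std::error::Error>> {
    let key = (sp.kind.clone(), sp.id.clone());
    match cache.get_mut(&key) {
        // The cached version matches `from_checksum`, so the JSON Patch can be
        // applied in place.
        Some(obj) if obj.checksum == sp.from_checksum => {
            json_patch::patch(&mut obj.data, &sp.patch)?;
            obj.checksum = sp.to_checksum;
        }
        // Otherwise fetch the full object. Because edda inserts the object
        // into the store before publishing the patch, this fetch cannot race
        // ahead of the object's existence.
        _ => {
            let obj = fetch_full_object(&sp.kind, &sp.id).await?;
            cache.insert(key, obj);
        }
    }
    Ok(())
}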

lib/edda-server/src/updates.rs

Lines changed: 25 additions & 0 deletions
@@ -15,6 +15,7 @@ use si_data_nats::{
 use si_frontend_mv_types::object::patch::{
     IndexUpdate,
     PatchBatch,
+    StreamingPatch,
 };
 use si_id::WorkspacePk;
 use telemetry::prelude::*;
@@ -79,6 +80,30 @@ impl EddaUpdates {
             .await
     }
 
+    #[instrument(
+        name = "edda_updates.publish_streaming_patch",
+        level = "debug",
+        skip_all,
+        fields()
+    )]
+    pub(crate) async fn publish_streaming_patch(
+        &self,
+        streaming_patch: StreamingPatch,
+    ) -> Result<()> {
+        let mut id_buf = WorkspacePk::array_to_str_buf();
+
+        self.serialize_compress_publish(
+            subject::update_for(
+                self.subject_prefix.as_deref(),
+                streaming_patch.workspace_id.array_to_str(&mut id_buf),
+                streaming_patch.message_kind(),
+            ),
+            streaming_patch,
+            true,
+        )
+        .await
+    }
+
     #[instrument(
         name = "edda_updates.publish_index_update",
         level = "debug",
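
For orientation, the payload published here should roughly take the shape below, given the camelCase Serialize derive on StreamingPatch in lib/si-frontend-mv-types-rs/src/object/patch.rs (third file). Every concrete value in this sketch is invented for illustration; only the key names follow from the struct definition.

// Approximate wire shape of one streaming patch (before compression),
// assuming camelCase serialization; the values below are invented.
const EXAMPLE_STREAMING_PATCH_PAYLOAD: &str = r#"{
  "workspaceId": "01ABCXYZ...",
  "changeSetId": "01ABCXYZ...",
  "kind": "ExampleMv",
  "id": "01ABCXYZ...",
  "fromChecksum": "a1b2c3...",
  "toChecksum": "d4e5f6...",
  "patch": [
    { "op": "replace", "path": "/name", "value": "renamed" }
  ],
  "messageKind": "StreamingPatch"
}"#;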

lib/si-frontend-mv-types-rs/src/object/patch.rs

Lines changed: 61 additions & 1 deletion
@@ -4,8 +4,11 @@ use si_id::{
     WorkspacePk,
 };
 
-const PATCH_BATCH_KIND: &str = "PatchMessage";
+use crate::reference::ReferenceKind;
+
 const INDEX_UPDATE_KIND: &str = "IndexUpdate";
+const PATCH_BATCH_KIND: &str = "PatchMessage";
+const STREAMING_PATCH_MESSAGE_KIND: &str = "StreamingPatch";
 
 #[derive(Debug, Clone, Serialize)]
 #[serde(rename_all = "camelCase")]
@@ -21,6 +24,63 @@ pub struct UpdateMeta {
     pub from_index_checksum: String,
 }
 
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StreamingPatch {
+    /// The workspace this patch is targeting.
+    pub workspace_id: WorkspacePk,
+    /// The change set this patch is targeting.
+    pub change_set_id: ChangeSetId,
+    /// The MV kind this patch is targeting.
+    pub kind: String,
+    /// The ID of the object this patch is targeting.
+    pub id: String,
+    /// The original checksum of the object before this patch.
+    ///
+    /// A checksum of `"0"` means this is a new object that must be created.
+    pub from_checksum: String,
+    /// The new checksum of the object after this patch.
+    ///
+    /// A checksum of `"0"` means this is an existing object that must be removed.
+    pub to_checksum: String,
+    /// The JSON patch to apply to the object.
+    ///
+    /// If neither `from_checksum` nor `to_checksum` is `"0"`, this field
+    /// contains the JSON Patch document to apply to the version of the object
+    /// with a checksum matching `from_checksum`, resulting in a version with
+    /// the checksum matching `to_checksum`.
+    pub patch: json_patch::Patch,
+    /// The message kind for the front end.
+    message_kind: &'static str,
+}
+
+impl StreamingPatch {
+    pub fn new(
+        workspace_id: WorkspacePk,
+        change_set_id: ChangeSetId,
+        kind: ReferenceKind,
+        id: String,
+        from_checksum: String,
+        to_checksum: String,
+        patch: json_patch::Patch,
+    ) -> Self {
+        Self {
+            workspace_id,
+            change_set_id,
+            kind: kind.to_string(),
+            id,
+            from_checksum,
+            to_checksum,
+            patch,
+            message_kind: STREAMING_PATCH_MESSAGE_KIND,
+        }
+    }
+
+    pub fn message_kind(&self) -> &'static str {
+        self.message_kind
+    }
+}
+
 #[derive(Debug, Clone, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PatchBatch {
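
The checksum conventions documented on StreamingPatch imply a three-way dispatch on the consumer side. A minimal sketch of that dispatch follows; the Action enum and classify function are illustrative only, not part of this commit.

// Illustrative sketch of how a consumer might interpret the checksum
// conventions documented on StreamingPatch.
use si_frontend_mv_types::object::patch::StreamingPatch;

enum Action {
    // `from_checksum == "0"`: a new object that must be created.
    Create,
    // `to_checksum == "0"`: an existing object that must be removed.
    Remove,
    // Otherwise: apply the JSON Patch to the version matching `from_checksum`
    // to arrive at the version matching `to_checksum`.
    ApplyPatch,
}

fn classify(sp: &StreamingPatch) -> Action {
    if sp.from_checksum == "0" {
        Action::Create
    } else if sp.to_checksum == "0" {
        Action::Remove
    } else {
        Action::ApplyPatch
    }
}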
