
Stream patches to the front end from edda; try to reduce nats write load; priority queue for MV builds #6410

Open

jhelwig wants to merge 4 commits into main from jhelwig/streaming-patches-from-edda

Conversation

@jhelwig jhelwig commented Jun 13, 2025

Initial sketch of streaming patches to the front end from edda

The idea behind streaming patches to the front end is that it can start using the new MVs as soon as they are generated, before all of the data is ready. A message with the MvIndex patch (message not created/sent yet) will then let the front end know that the stream of patches has finished, and which objects it should have by the end, so it can fetch any whose patch it missed.

Right now, these streaming patches are sent in addition to the existing patch batches, but once the front end starts using the streamed patches, we'll want to stop creating & sending the batched patches, as they will be purely redundant with the streamed ones.

We will also want to figure out if the existing IndexUpdate message should be augmented to include both the from-checksum & patch for the MvIndex, or if we want an entirely new message to handle this.
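
For reference, here is a rough sketch of the per-MV message shape being streamed. The struct and field names below are illustrative only (they mirror the camelCase keys of the example payload later in this thread), not the actual edda types:

use serde::{Deserialize, Serialize};

// Illustrative only: the real edda types may be shaped differently.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct StreamingPatchMessage {
    workspace_id: String,
    change_set_id: String,
    // MV kind, e.g. "Component".
    kind: String,
    // ID of the atom/MV the patch applies to.
    id: String,
    from_checksum: String,
    to_checksum: String,
    // JSON Patch operations to apply to the front end's copy of the object.
    patch: serde_json::Value,
    // Discriminator ("StreamingPatch") distinguishing these from batch messages.
    message_kind: String,
}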

Try to reduce nats load by only inserting FrontEndObjects if they're not already in the KV store

This doesn't reduce the amount of information sent over the wire to nats on an insert, but nats should be able to short-circuit and avoid doing any write in the case where the object has already been generated for any reason.

With the interface provided by the nats KV store, our options for only inserting if the key doesn't already exist looked to be:

  • Send the (potentially) new thing over the wire and watch for a failure (create behaves this way).
  • Try to get first (and receive the full value).
  • Iterate over all of the keys to see if the key is already there.

Trying the insert using create and ignoring the "already exists" error kind seems to be the least bad of these options (see the sketch below).
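
A minimal sketch of that approach, assuming async-nats's KV Store::create and its "already exists" error kind; the actual helper (e.g. anything living in si-data-nats) may look different:

use async_nats::jetstream::kv::{CreateError, CreateErrorKind, Store};
use bytes::Bytes;

// Write the serialized frontend object only if the key is absent. An
// "already exists" result means the object was generated previously (or by
// another writer), so treat it as success and skip the redundant write.
async fn insert_frontend_object_if_absent(
    kv: &Store,
    key: &str,
    serialized: Bytes,
) -> Result<(), CreateError> {
    match kv.create(key, serialized).await {
        Ok(_revision) => Ok(()),
        Err(err) if matches!(err.kind(), CreateErrorKind::AlreadyExists) => Ok(()),
        Err(err) => Err(err),
    }
}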

Use a priority queue to determine order of MV builds in edda

When streaming patches out to the front end, it helps the front end's reactivity to send lists early, so it can display skeletons for individual items it hasn't yet received patch information for.

Right now, there are two "priorities":

  • Lists
  • Details (everything else)

By default an MV will use the "Details" priority. If the build_priority annotation is provided for the MV struct definition, then it will use whatever is specified in the annotation.

This also restructures how MV build tasks are spawned so that everything is queued before the priority queue starts handing out work, ensuring the higher-priority items are always built first. Rather than explicitly checking whether the Change's entity_kind matches the MV's trigger_entity, that check has been abstracted into the MaterializedViewInventoryItem and happens during the initial queue population. This means that whenever we go to spawn an MV build task, we already know the change is relevant for that specific MV kind.
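
A rough sketch of the two-level ordering using a plain BinaryHeap; the actual edda queue and task types are more involved, and the names below are hypothetical:

use std::collections::BinaryHeap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum BuildPriority {
    // Declared low-to-high so the derived Ord makes List the higher priority.
    Details,
    List,
}

#[derive(Debug, PartialEq, Eq)]
struct MvBuildTask {
    priority: BuildPriority,
    // Hypothetical stand-in for the MV kind to build.
    mv_kind: &'static str,
}

// Order by priority, tie-breaking on kind to keep Ord consistent with Eq,
// so the max-heap pops List builds before Details builds.
impl Ord for MvBuildTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.priority
            .cmp(&other.priority)
            .then_with(|| self.mv_kind.cmp(other.mv_kind))
    }
}

impl PartialOrd for MvBuildTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut queue = BinaryHeap::new();
    // Queue everything first...
    queue.push(MvBuildTask { priority: BuildPriority::Details, mv_kind: "Component" });
    queue.push(MvBuildTask { priority: BuildPriority::List, mv_kind: "ComponentList" });
    // ...then drain, building the List MV before the Details MV.
    while let Some(task) = queue.pop() {
        println!("building {} at {:?} priority", task.mv_kind, task.priority);
    }
}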

Make streamed patches from edda a config setting instead of always on

Rather than always sending the patches for the front end both as individual patches when each is built, and as a single batch after all have been built, there is now a SI_EDDA__STREAMING_PATCHES setting to toggle between the two behaviors. The default behavior is to send the patches as a single batch, making the newer streaming behavior opt-in for development & testing.
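
As a sketch of how the opt-in flag could look on the config side (the struct and field names below are hypothetical; the SI_EDDA__STREAMING_PATCHES environment variable is the real knob from this change):

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EddaConfig {
    // When false (the default), patches are only sent as a single batch after
    // all MVs are built. When true, each patch is also streamed as it is built.
    #[serde(default)]
    pub streaming_patches: bool,
}

Setting SI_EDDA__STREAMING_PATCHES=true would then opt a running edda into the streaming behavior for development & testing.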


github-actions bot commented Jun 13, 2025

Dependency Review

✅ No vulnerabilities or OpenSSF Scorecard issues found.

Scanned Files

None

jhelwig commented Jun 13, 2025

An example of one of the streamed patches on the websocket:

{"workspaceId":"01GWAJW3PQ0S1DVBN76XH9DBHV","changeSetId":"01JXN8KGNG2CK01ETFH33VBNJE","kind":"Component","id":"01JXN8MRMF6AZ50CTMXPKT741Y","fromChecksum":"616b523fb65078f0063ad308e6e7cf1b","toChecksum":"214531f044b7b66cc11976eb8fea6847","patch":[{"op":"replace","path":"/resourceDiff/current","value":"{\n  \"si\": {\n    \"name\": \"StackFleetAssociation\",\n    \"type\": \"component\",\n    \"color\": \"#FF9900\"\n  },\n  \"domain\": {\n    \"extra\": {\n      \"Region\": \"us-east-1\",\n      \"AwsResourceType\": \"AWS::AppStream::StackFleetAssociation\",\n      \"PropUsageMap\": \"{\\\"createOnly\\\":[],\\\"updatable\\\":[\\\"StackName\\\",\\\"FleetName\\\"],\\\"secrets\\\":[]}\"\n    }\n  }\n}"},{"op":"replace","path":"/resourceDiff/diff","value":"+{\n+  \"si\": {\n+    \"name\": \"StackFleetAssociation\",\n+    \"type\": \"component\",\n+    \"color\": \"#FF9900\"\n+  },\n+  \"domain\": {\n+    \"extra\": {\n+      \"Region\": \"us-east-1\",\n+      \"AwsResourceType\": \"AWS::AppStream::StackFleetAssociation\",\n+      \"PropUsageMap\": \"{\\\"createOnly\\\":[],\\\"updatable\\\":[\\\"StackName\\\",\\\"FleetName\\\"],\\\"secrets\\\":[]}\"\n+    }\n+  }\n+}"}],"messageKind":"StreamingPatch"}

Human readable-ish version:

{
  "workspaceId": "01GWAJW3PQ0S1DVBN76XH9DBHV",
  "changeSetId": "01JXN8KGNG2CK01ETFH33VBNJE",
  "kind": "Component",
  "id": "01JXN8MRMF6AZ50CTMXPKT741Y",
  "fromChecksum": "616b523fb65078f0063ad308e6e7cf1b",
  "toChecksum": "214531f044b7b66cc11976eb8fea6847",
  "patch": [
    {
      "op": "replace",
      "path": "/resourceDiff/current",
      "value": "{\n  \"si\": {\n    \"name\": \"StackFleetAssociation\",\n    \"type\": \"component\",\n    \"color\": \"#FF9900\"\n  },\n  \"domain\": {\n    \"extra\": {\n      \"Region\": \"us-east-1\",\n      \"AwsResourceType\": \"AWS::AppStream::StackFleetAssociation\",\n      \"PropUsageMap\": \"{\\\"createOnly\\\":[],\\\"updatable\\\":[\\\"StackName\\\",\\\"FleetName\\\"],\\\"secrets\\\":[]}\"\n    }\n  }\n}"
    },
    {
      "op": "replace",
      "path": "/resourceDiff/diff",
      "value": "+{\n+  \"si\": {\n+    \"name\": \"StackFleetAssociation\",\n+    \"type\": \"component\",\n+    \"color\": \"#FF9900\"\n+  },\n+  \"domain\": {\n+    \"extra\": {\n+      \"Region\": \"us-east-1\",\n+      \"AwsResourceType\": \"AWS::AppStream::StackFleetAssociation\",\n+      \"PropUsageMap\": \"{\\\"createOnly\\\":[],\\\"updatable\\\":[\\\"StackName\\\",\\\"FleetName\\\"],\\\"secrets\\\":[]}\"\n+    }\n+  }\n+}"
    }
  ],
  "messageKind": "StreamingPatch"
}

@jobelenus
Contributor

@jhelwig thank you for the example payload. Is that id field the "placeholder" (i.e., the value that will appear in the "end" message for the index update, alongside the resulting index checksum)?

jhelwig commented Jun 13, 2025

The id in the sample payload is the ID of the atom/MV. At the time the streaming patch is sent, we don't yet know what the index checksum will be.

jhelwig commented Jun 13, 2025

The "placeholder"/"staging" index checksum should probably be the change set id since we mostly treat the checksum as an opaque string, and it will never overlap with a "real" checksum.

@jhelwig jhelwig force-pushed the jhelwig/streaming-patches-from-edda branch from ef6faed to 943d8c5 on June 16, 2025 16:55
fnichol previously approved these changes Jun 16, 2025

@fnichol fnichol left a comment

I like your approach of sending both the incremental patches and the batch, as we can migrate/experiment with both approaches.

Looks good!

@@ -522,13 +532,30 @@ async fn build_mv_inner(

match execution_result {
Ok((maybe_patch, maybe_frontend_object)) => {
// We need to make sure the frontend object is inserted into the store first so that
// a client can directly fetch it without racing against the object's insertion if the
// client does not already have the base object to apply the streaming patch to.

👍🏽

@jhelwig jhelwig force-pushed the jhelwig/streaming-patches-from-edda branch from 943d8c5 to 0a977ff on June 23, 2025 17:02
@github-actions github-actions bot added the A-dal label Jun 23, 2025
@jhelwig jhelwig marked this pull request as draft June 24, 2025 20:31
@jhelwig jhelwig force-pushed the jhelwig/streaming-patches-from-edda branch from 0a977ff to 20714b9 on June 24, 2025 20:42
@github-actions github-actions bot added A-si-settings Area: Backend service settings management [Rust] A-sdf Area: Primary backend API service [Rust] A-veritech Area: Task execution backend service [Rust] A-cyclone Area: Function execution engine [Rust] A-bytes-lines-codec A-config-file A-si-test-macros A-telemetry-rs A-dal-test A-si-data-nats A-si-data-pg A-si-std A-telemetry-application-rs A-pinga labels Jun 24, 2025
@jhelwig jhelwig force-pushed the jhelwig/streaming-patches-from-edda branch 4 times, most recently from 40a27d9 to 8e52275 on June 26, 2025 18:55
jhelwig added 4 commits June 27, 2025 13:56
@jhelwig jhelwig force-pushed the jhelwig/streaming-patches-from-edda branch from 8e52275 to 80d9155 on June 27, 2025 14:11
@jhelwig jhelwig marked this pull request as ready for review June 27, 2025 14:13
@jhelwig jhelwig requested a review from fnichol June 27, 2025 14:32
@jhelwig jhelwig changed the title Stream patches to the front end from edda; try to reduce nats write load Stream patches to the front end from edda; try to reduce nats write load; priority queue for MV builds Jun 27, 2025