From dbf246d34488bdd53dd73d441e0107200b3a3196 Mon Sep 17 00:00:00 2001 From: Niels Vandekeybus Date: Thu, 23 Jan 2025 10:28:16 +0100 Subject: [PATCH] add an option to send only matches to receiving services this greatly reduces the size of delta messages in stacks with large data changes. --- README.md | 1 + app.js | 32 +++++++++++++++++++++++++++++++- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d6a8dbc..4530cf5 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ The exported property contains an array of definitions, each linking a match to - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). - `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order! - `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms. + - `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes. ### Modifying quads #### Normalize datetime diff --git a/app.js b/app.js index 2ba5c84..138185c 100644 --- a/app.js +++ b/app.js @@ -43,6 +43,32 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { res.status(204).send(); } ); + +/** + * Filters the change sets based on a specified pattern. + * + * @param {Array} changeSets - An array of change set objects, + * each containing `insert` and `delete` properties. + * @param {Object} entry - An object containing the matching criteria. + * @param {Array} entry.match - The pattern used to filter the triples + * in the `insert` and `delete` arrays. + * @returns {Array} A new array of change set objects with + * filtered `insert` and `delete` properties. + */ +function filterChangesestOnPattern(changeSets, entry) { + const filteredChangesets = []; + for (const changeSet of changeSets) { + const { insert, delete: deleteSet } = changeSet; + const clonedChangeSet = { + ...changeSet, + insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)), + delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)) + }; + filteredChangesets.push(clonedChangeSet); + }; + return filteredChangesets; +} + async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ services.map( async (entry, index) => { entry.index = index; @@ -52,7 +78,11 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ const matchSpec = entry.match; - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); + let maybePatternFilteredChangesets = changeSets; + if (entry.options?.sendMatchesOnly) { + maybePatternFilteredChangesets = filterChangesestOnPattern(changeSets, entry); + } + const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);