Skip to content

add an option to send only matches to receiving services #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The exported property contains an array of definitions, each linking a match to
- `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname).
- `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order!
- `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms.
- `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes.

### Modifying quads
#### Normalize datetime
Expand Down
32 changes: 31 additions & 1 deletion app.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,32 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
res.status(204).send();
} );


/**
* Filters the change sets based on a specified pattern.
*
* @param {Array<Object>} changeSets - An array of change set objects,
* each containing `insert` and `delete` properties.
* @param {Object} entry - An object containing the matching criteria.
* @param {Array} entry.match - The pattern used to filter the triples
* in the `insert` and `delete` arrays.
* @returns {Array<Object>} A new array of change set objects with
* filtered `insert` and `delete` properties.
*/
function filterChangesestOnPattern(changeSets, entry) {
const filteredChangesets = [];
for (const changeSet of changeSets) {
const { insert, delete: deleteSet } = changeSet;
const clonedChangeSet = {
...changeSet,
insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)),
delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match))
};
filteredChangesets.push(clonedChangeSet);
};
return filteredChangesets;
}

async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
services.map( async (entry, index) => {
entry.index = index;
Expand All @@ -52,7 +78,11 @@ async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){

const matchSpec = entry.match;

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
let maybePatternFilteredChangesets = changeSets;
if (entry.options?.sendMatchesOnly) {
maybePatternFilteredChangesets = filterChangesestOnPattern(changeSets, entry);
}
const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);

Expand Down