Skip to content

optimized filtering #21

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The exported property contains an array of definitions, each linking a match to
- `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname).
- `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order!
- `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms.
- `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes.

### Modifying quads
#### Normalize datetime
Expand Down
179 changes: 94 additions & 85 deletions app.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,48 @@
import { app } from 'mu';
import { app, errorHandler } from 'mu';
import services from './config/rules';
import normalizeQuad from './config/normalize-quad';
import bodyParser from 'body-parser';
import dns from 'dns';
import { foldChangeSets } from './folding';
import { sendRequest } from './send-request';
import { sendBundledRequest } from './bundle-requests';
import {
filterChangesetsOnPattern,
tripleMatchesSpec,
filterMatchesForOrigin,
hostnameForEntry
} from './matching';
import {
DEBUG_DELTA_MATCH,
DEBUG_DELTA_SEND,
DEBUG_TRIPLE_MATCHES_SPEC,
LOG_REQUESTS,
LOG_SERVER_CONFIGURATION,
} from './env';

// Log server config if requested
if( process.env["LOG_SERVER_CONFIGURATION"] )
if(LOG_SERVER_CONFIGURATION)
console.log(JSON.stringify( services ));

let index = 0;
const groupedServices = services.reduce((acc, service) => {
// Create a unique key for the match pattern
const matchKey = `${normalizeObject(service.match)}${service.options.sendMatchesOnly || false}`;
if (!acc[matchKey]) {
acc[matchKey] = [];
}
service.index = index++;
acc[matchKey].push(service);
return acc;
}, {});


app.get( '/', function( req, res ) {
res.status(200);
res.send("Hello, delta notification is running");
} );

app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
if( process.env["LOG_REQUESTS"] ) {
if( LOG_REQUESTS ) {
console.log("Logging request body");
console.log(req.body);
}
Expand All @@ -43,99 +68,83 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) {
res.status(204).send();
} );

async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
services.map( async (entry, index) => {
entry.index = index;
// for each entity
if( process.env["DEBUG_DELTA_MATCH"] )
console.log(`Checking if we want to send to ${entry.callback.url}`);
// log delta's ignored because of size
app.use((err, req, res, next) => {
if (err.type === 'entity.too.large') {
console.warn(`Payload too large for ${req.method} ${req.originalUrl}`);
return res.status(413).send('Payload too large');
}

const matchSpec = entry.match;
// Pass other errors to the default handler
next(err);
});

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);
app.use(errorHandler);

async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){
// Iterate over each unique match pattern
for (const matchKey in groupedServices) {
const firstEntry = groupedServices[matchKey][0];
// can use first entry since it's part of grouping
const sendMatchesOnly = firstEntry.options.sendMatchesOnly;
let maybePatternFilteredChangesets = changeSets;
if (sendMatchesOnly) {
maybePatternFilteredChangesets = filterChangesetsOnPattern(changeSets, firstEntry);
}

let allInserts = [];
let allDeletes = [];

originFilteredChangeSets.forEach( (change) => {
maybePatternFilteredChangesets.forEach( (change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );

const changedTriples = [...allInserts, ...allDeletes];

const someTripleMatchedSpec =
changedTriples
.some( (triple) => tripleMatchesSpec( triple, matchSpec ) );

if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);

if( someTripleMatchedSpec ) {
// inform matching entities
if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if( entry.options && entry.options.gracePeriod ) {
sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId);
} else {
const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets );
sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId );
changedTriples
.some( (triple) => tripleMatchesSpec( triple, firstEntry.match ) );
const matchingServices = groupedServices[matchKey];
matchingServices.forEach( async (entry) => {
if( DEBUG_TRIPLE_MATCHES_SPEC )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);

if( someTripleMatchedSpec ) {
// for each entity
if( DEBUG_DELTA_MATCH )
console.log(`Checking if we want to send to ${entry.callback.url}`);
const matchSpec = entry.match;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused code

Suggested change
const matchSpec = entry.match;

const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry );

if ( originFilteredChangeSets.length > 0 ) {
if( DEBUG_TRIPLE_MATCHES_SPEC && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);

// inform matching entities
if( DEBUG_DELTA_SEND )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if( entry.options?.gracePeriod ) {
sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId);
} else {
const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets );
sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId );
}
}
}
}
} );
}

function tripleMatchesSpec( triple, matchSpec ) {
// form of triple is {s, p, o}, same as matchSpec
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`);

for( let key in matchSpec ){
// key is one of s, p, o
const subMatchSpec = matchSpec[key];
const subMatchValue = triple[key];

if( subMatchSpec && !subMatchValue )
return false;

for( let subKey in subMatchSpec )
// we're now matching something like {type: "url", value: "http..."}
if( subMatchSpec[subKey] !== subMatchValue[subKey] )
return false;
}
return true; // no false matches found, let's send a response
}

async function filterMatchesForOrigin( changeSets, entry ) {
if( ! entry.options || !entry.options.ignoreFromSelf ) {
return changeSets;
} else {
try {
const originIpAddress = await getServiceIp( entry );
return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress );
} catch(e) {
console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`);
console.error(e);
return changeSets;
}
} );
}
}

function hostnameForEntry( entry ) {
return (new URL(entry.callback.url)).hostname;
/**
* Normalizes an object by sorting its keys and converting it to a string.
*
* @param {Object} obj - The object to normalize.
* @returns {string} A string representation of the normalized object.
*/
function normalizeObject(obj) {
return JSON.stringify(Object.keys(obj)
.sort()
.reduce((acc, key) => {
acc[key] = obj[key];
return acc;
}, {}));
}

async function getServiceIp(entry) {
const hostName = hostnameForEntry( entry );
return new Promise( (resolve, reject) => {
dns.lookup( hostName, { family: 4 }, ( err, address) => {
if( err )
reject( err );
else
resolve( address );
} );
} );
};
5 changes: 3 additions & 2 deletions bundle-requests.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { foldChangeSets } from './folding';
import { sendRequest } from "./send-request.js";
import { DEBUG_DELTA_SEND } from './env';

// map from bundle key to bundle object
const bundles = {};
Expand Down Expand Up @@ -58,13 +59,13 @@ export const sendBundledRequest = (
existingBundle.bundledCallIdTrails.push(muCallIdTrail);
// since an existing bundle exists, we don't need to send it after timeout,
// the existing bundle will send us too
if (process.env["DEBUG_DELTA_SEND"]) {
if (DEBUG_DELTA_SEND) {
console.log(
`Adding to bundle for key ${bundleKey}, now contains ${existingBundle.changeSets.length} change sets`
);
}
} else {
if (process.env["DEBUG_DELTA_SEND"]) {
if (DEBUG_DELTA_SEND) {
console.log(
`Creating bundle for key ${bundleKey}, sending in ${entry.options.gracePeriod}ms`
);
Expand Down
2 changes: 1 addition & 1 deletion config/normalize-quad.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const normalizeDate = process.env["NORMALIZE_DATETIME_IN_QUAD"];
import { NORMALIZE_DATETIME_IN_QUAD as normalizeDate} from '../env';

export default function(quad) {
if (normalizeDate && quad.object.datatype == 'http://www.w3.org/2001/XMLSchema#dateTime') {
Expand Down
8 changes: 8 additions & 0 deletions env.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export const DEBUG_DELTA_FOLD = process.env["DEBUG_DELTA_FOLD"];
export const DEBUG_DELTA_MATCH = process.env["DEBUG_DELTA_MATCH"];
export const DEBUG_DELTA_NOT_SENDING_EMPTY = process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"];
export const DEBUG_DELTA_SEND = process.env["DEBUG_DELTA_SEND"];
export const DEBUG_TRIPLE_MATCHES_SPEC = process.env["DEBUG_TRIPLE_MATCHES_SPEC"];
export const LOG_REQUESTS = process.env["LOG_REQUESTS"];
export const LOG_SERVER_CONFIGURATION = process.env["LOG_SERVER_CONFIGURATION"];
export const NORMALIZE_DATETIME_IN_QUAD = process.env["NORMALIZE_DATETIME_IN_QUAD"];
3 changes: 2 additions & 1 deletion folding.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { DEBUG_DELTA_FOLD } from './env';
/**
* Quads may be folded if
* - a quad is deleted after it has been inserted
Expand Down Expand Up @@ -44,7 +45,7 @@ export function foldChangeSets(entry, changeSets) {
if (foldedInsertQuads.length)
foldedChangeSets.push({ delete: [], insert: foldedInsertQuads });

if (process.env["DEBUG_DELTA_FOLD"])
if (DEBUG_DELTA_FOLD)
console.log(`Folded changeset from:\n ${JSON.stringify(changeSets)}\nto:\n ${JSON.stringify(foldedChangeSets)}`);

return foldedChangeSets;
Expand Down
81 changes: 81 additions & 0 deletions matching.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import dns from 'dns';
import { DEBUG_TRIPLE_MATCHES_SPEC } from './env';
/**
* Filters the change sets based on a specified pattern.
*
* @param {Array<Object>} changeSets - An array of change set objects,
* each containing `insert` and `delete` properties.
* @param {Object} entry - An object containing the matching criteria.
* @param {Array} entry.match - The pattern used to filter the triples
* in the `insert` and `delete` arrays.
* @returns {Array<Object>} A new array of change set objects with
* filtered `insert` and `delete` properties.
*/
export function filterChangesetsOnPattern(changeSets, entry) {
const filteredChangesets = [];
for (const changeSet of changeSets) {
const { insert, delete: deleteSet, effectiveInsert, effectiveDelete } = changeSet;
const clonedChangeSet = {
...changeSet,
insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)),
delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)),
effectiveInsert: effectiveInsert.filter((triple) => tripleMatchesSpec(triple, entry.match)),
effectiveDelete: effectiveDelete.filter((triple) => tripleMatchesSpec(triple, entry.match)),
};
filteredChangesets.push(clonedChangeSet);
};
return filteredChangesets;
}

export function tripleMatchesSpec( triple, matchSpec ) {
// form of triple is {s, p, o}, same as matchSpec
if(DEBUG_TRIPLE_MATCHES_SPEC)
console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`);

for( let key in matchSpec ){
// key is one of s, p, o
const subMatchSpec = matchSpec[key];
const subMatchValue = triple[key];

if( subMatchSpec && !subMatchValue )
return false;

for( let subKey in subMatchSpec )
// we're now matching something like {type: "url", value: "http..."}
if( subMatchSpec[subKey] !== subMatchValue[subKey] )
return false;
}
return true; // no false matches found, let's send a response
}


export async function filterMatchesForOrigin( changeSets, entry ) {
if( ! entry.options || !entry.options.ignoreFromSelf ) {
return changeSets;
} else {
try {
const originIpAddress = await getServiceIp( entry );
return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress );
} catch(e) {
console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`);
console.error(e);
return changeSets;
}
}
}

export function hostnameForEntry( entry ) {
return (new URL(entry.callback.url)).hostname;
}

async function getServiceIp(entry) {
const hostName = hostnameForEntry( entry );
return new Promise( (resolve, reject) => {
dns.lookup( hostName, { family: 4 }, ( err, address) => {
if( err )
reject( err );
else
resolve( address );
} );
} );
};
8 changes: 5 additions & 3 deletions send-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import http from "http";
import { uuid } from "mu";

const DEFAULT_RETRY_TIMEOUT = 250;
const DEBUG_DELTA_SEND = process.env["DEBUG_DELTA_SEND"];
const DEBUG_DELTA_NOT_SENDING_EMPTY = process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"];

function formatChangesetBody(changeSets, options) {
if (options.resourceFormat == "v0.0.1") {
Expand Down Expand Up @@ -115,7 +117,7 @@ export async function sendRequest(
// we should send contents
body = formatChangesetBody(changeSets, entry.options);
}
if (process.env["DEBUG_DELTA_SEND"])
if (DEBUG_DELTA_SEND)
console.log(`Executing send ${method} to ${url}`);
try {
const keepAliveAgent = new http.Agent({
Expand All @@ -137,10 +139,10 @@ export async function sendRequest(
retriesLeft
);
} catch (error) {
console.log(error);
console.error(`Error sending delta to ${url}`, error);
}
} else {
if (process.env["DEBUG_DELTA_SEND"] || process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"])
if (DEBUG_DELTA_SEND || DEBUG_DELTA_NOT_SENDING_EMPTY)
console.log(`Changeset empty. Not sending to ${entry.callback.method} ${entry.callback.url}`);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this, but I don't think this situation can occur.

}
}