diff --git a/README.md b/README.md index d6a8dbc..4530cf5 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,7 @@ The exported property contains an array of definitions, each linking a match to - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). - `options.retry`: (experimental) How many times the request is sent again on failure. Defaults to 0. Warning: in case of retries, deltas may be received out of order! - `options.retryTimeout`: (experimental) How much time is left in between retries (in ms). Currently defaults to 250ms. + - `options.sendMatchesOnly`: Only send triples that match, removing the other triples from the changes. ### Modifying quads #### Normalize datetime diff --git a/app.js b/app.js index 2ba5c84..7665650 100644 --- a/app.js +++ b/app.js @@ -1,23 +1,48 @@ -import { app } from 'mu'; +import { app, errorHandler } from 'mu'; import services from './config/rules'; import normalizeQuad from './config/normalize-quad'; import bodyParser from 'body-parser'; -import dns from 'dns'; import { foldChangeSets } from './folding'; import { sendRequest } from './send-request'; import { sendBundledRequest } from './bundle-requests'; +import { + filterChangesetsOnPattern, + tripleMatchesSpec, + filterMatchesForOrigin, + hostnameForEntry +} from './matching'; +import { + DEBUG_DELTA_MATCH, + DEBUG_DELTA_SEND, + DEBUG_TRIPLE_MATCHES_SPEC, + LOG_REQUESTS, + LOG_SERVER_CONFIGURATION, +} from './env'; // Log server config if requested -if( process.env["LOG_SERVER_CONFIGURATION"] ) +if(LOG_SERVER_CONFIGURATION) console.log(JSON.stringify( services )); +let index = 0; +const groupedServices = services.reduce((acc, service) => { + // Create a unique key for the match pattern + const matchKey = `${normalizeObject(service.match)}${service.options.sendMatchesOnly || false}`; + if (!acc[matchKey]) { + acc[matchKey] = []; + } + service.index = index++; + acc[matchKey].push(service); + return acc; +}, {}); + + app.get( '/', function( req, res ) { res.status(200); res.send("Hello, delta notification is running"); } ); app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { - if( process.env["LOG_REQUESTS"] ) { + if( LOG_REQUESTS ) { console.log("Logging request body"); console.log(req.body); } @@ -43,99 +68,83 @@ app.post( '/', bodyParser.json({limit: '500mb'}), function( req, res ) { res.status(204).send(); } ); -async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ - services.map( async (entry, index) => { - entry.index = index; - // for each entity - if( process.env["DEBUG_DELTA_MATCH"] ) - console.log(`Checking if we want to send to ${entry.callback.url}`); +// log delta's ignored because of size +app.use((err, req, res, next) => { + if (err.type === 'entity.too.large') { + console.warn(`Payload too large for ${req.method} ${req.originalUrl}`); + return res.status(413).send('Payload too large'); + } - const matchSpec = entry.match; + // Pass other errors to the default handler + next(err); +}); - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); +app.use(errorHandler); + +async function informWatchers( changeSets, res, muCallIdTrail, muSessionId ){ + // Iterate over each unique match pattern + for (const matchKey in groupedServices) { + const firstEntry = groupedServices[matchKey][0]; + // can use first entry since it's part of grouping + const sendMatchesOnly = firstEntry.options.sendMatchesOnly; + let maybePatternFilteredChangesets = changeSets; + if (sendMatchesOnly) { + maybePatternFilteredChangesets = filterChangesetsOnPattern(changeSets, firstEntry); + } let allInserts = []; let allDeletes = []; - - originFilteredChangeSets.forEach( (change) => { + maybePatternFilteredChangesets.forEach( (change) => { allInserts = [...allInserts, ...change.insert]; allDeletes = [...allDeletes, ...change.delete]; } ); - const changedTriples = [...allInserts, ...allDeletes]; - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); - - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - - if( someTripleMatchedSpec ) { - // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) - console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - - if( entry.options && entry.options.gracePeriod ) { - sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); - } else { - const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets ); - sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); + changedTriples + .some( (triple) => tripleMatchesSpec( triple, firstEntry.match ) ); + const matchingServices = groupedServices[matchKey]; + matchingServices.forEach( async (entry) => { + if( DEBUG_TRIPLE_MATCHES_SPEC ) + console.log(`Triple matches spec? ${someTripleMatchedSpec}`); + + if( someTripleMatchedSpec ) { + // for each entity + if( DEBUG_DELTA_MATCH ) + console.log(`Checking if we want to send to ${entry.callback.url}`); + const matchSpec = entry.match; + const originFilteredChangeSets = await filterMatchesForOrigin( maybePatternFilteredChangesets, entry ); + + if ( originFilteredChangeSets.length > 0 ) { + if( DEBUG_TRIPLE_MATCHES_SPEC && entry.options.ignoreFromSelf ) + console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + + // inform matching entities + if( DEBUG_DELTA_SEND ) + console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); + + if( entry.options?.gracePeriod ) { + sendBundledRequest(entry, originFilteredChangeSets, muCallIdTrail, muSessionId); + } else { + const foldedChangeSets = foldChangeSets( entry, originFilteredChangeSets ); + sendRequest( entry, foldedChangeSets, muCallIdTrail, muSessionId ); + } + } } - } - } ); -} - -function tripleMatchesSpec( triple, matchSpec ) { - // form of triple is {s, p, o}, same as matchSpec - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); - - for( let key in matchSpec ){ - // key is one of s, p, o - const subMatchSpec = matchSpec[key]; - const subMatchValue = triple[key]; - - if( subMatchSpec && !subMatchValue ) - return false; - - for( let subKey in subMatchSpec ) - // we're now matching something like {type: "url", value: "http..."} - if( subMatchSpec[subKey] !== subMatchValue[subKey] ) - return false; - } - return true; // no false matches found, let's send a response -} - -async function filterMatchesForOrigin( changeSets, entry ) { - if( ! entry.options || !entry.options.ignoreFromSelf ) { - return changeSets; - } else { - try { - const originIpAddress = await getServiceIp( entry ); - return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); - } catch(e) { - console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`); - console.error(e); - return changeSets; - } + } ); } } -function hostnameForEntry( entry ) { - return (new URL(entry.callback.url)).hostname; +/** + * Normalizes an object by sorting its keys and converting it to a string. + * + * @param {Object} obj - The object to normalize. + * @returns {string} A string representation of the normalized object. + */ +function normalizeObject(obj) { + return JSON.stringify(Object.keys(obj) + .sort() + .reduce((acc, key) => { + acc[key] = obj[key]; + return acc; + }, {})); } - -async function getServiceIp(entry) { - const hostName = hostnameForEntry( entry ); - return new Promise( (resolve, reject) => { - dns.lookup( hostName, { family: 4 }, ( err, address) => { - if( err ) - reject( err ); - else - resolve( address ); - } ); - } ); -}; diff --git a/bundle-requests.js b/bundle-requests.js index 38b1226..233d728 100644 --- a/bundle-requests.js +++ b/bundle-requests.js @@ -1,5 +1,6 @@ import { foldChangeSets } from './folding'; import { sendRequest } from "./send-request.js"; +import { DEBUG_DELTA_SEND } from './env'; // map from bundle key to bundle object const bundles = {}; @@ -58,13 +59,13 @@ export const sendBundledRequest = ( existingBundle.bundledCallIdTrails.push(muCallIdTrail); // since an existing bundle exists, we don't need to send it after timeout, // the existing bundle will send us too - if (process.env["DEBUG_DELTA_SEND"]) { + if (DEBUG_DELTA_SEND) { console.log( `Adding to bundle for key ${bundleKey}, now contains ${existingBundle.changeSets.length} change sets` ); } } else { - if (process.env["DEBUG_DELTA_SEND"]) { + if (DEBUG_DELTA_SEND) { console.log( `Creating bundle for key ${bundleKey}, sending in ${entry.options.gracePeriod}ms` ); diff --git a/config/normalize-quad.js b/config/normalize-quad.js index c296860..7b51565 100644 --- a/config/normalize-quad.js +++ b/config/normalize-quad.js @@ -1,4 +1,4 @@ -const normalizeDate = process.env["NORMALIZE_DATETIME_IN_QUAD"]; +import { NORMALIZE_DATETIME_IN_QUAD as normalizeDate} from '../env'; export default function(quad) { if (normalizeDate && quad.object.datatype == 'http://www.w3.org/2001/XMLSchema#dateTime') { diff --git a/env.js b/env.js new file mode 100644 index 0000000..be7eeac --- /dev/null +++ b/env.js @@ -0,0 +1,8 @@ +export const DEBUG_DELTA_FOLD = process.env["DEBUG_DELTA_FOLD"]; +export const DEBUG_DELTA_MATCH = process.env["DEBUG_DELTA_MATCH"]; +export const DEBUG_DELTA_NOT_SENDING_EMPTY = process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]; +export const DEBUG_DELTA_SEND = process.env["DEBUG_DELTA_SEND"]; +export const DEBUG_TRIPLE_MATCHES_SPEC = process.env["DEBUG_TRIPLE_MATCHES_SPEC"]; +export const LOG_REQUESTS = process.env["LOG_REQUESTS"]; +export const LOG_SERVER_CONFIGURATION = process.env["LOG_SERVER_CONFIGURATION"]; +export const NORMALIZE_DATETIME_IN_QUAD = process.env["NORMALIZE_DATETIME_IN_QUAD"]; diff --git a/folding.js b/folding.js index d4a8f6e..efe316b 100644 --- a/folding.js +++ b/folding.js @@ -1,3 +1,4 @@ +import { DEBUG_DELTA_FOLD } from './env'; /** * Quads may be folded if * - a quad is deleted after it has been inserted @@ -44,7 +45,7 @@ export function foldChangeSets(entry, changeSets) { if (foldedInsertQuads.length) foldedChangeSets.push({ delete: [], insert: foldedInsertQuads }); - if (process.env["DEBUG_DELTA_FOLD"]) + if (DEBUG_DELTA_FOLD) console.log(`Folded changeset from:\n ${JSON.stringify(changeSets)}\nto:\n ${JSON.stringify(foldedChangeSets)}`); return foldedChangeSets; diff --git a/matching.js b/matching.js new file mode 100644 index 0000000..2b51a3b --- /dev/null +++ b/matching.js @@ -0,0 +1,81 @@ +import dns from 'dns'; +import { DEBUG_TRIPLE_MATCHES_SPEC } from './env'; +/** + * Filters the change sets based on a specified pattern. + * + * @param {Array} changeSets - An array of change set objects, + * each containing `insert` and `delete` properties. + * @param {Object} entry - An object containing the matching criteria. + * @param {Array} entry.match - The pattern used to filter the triples + * in the `insert` and `delete` arrays. + * @returns {Array} A new array of change set objects with + * filtered `insert` and `delete` properties. + */ +export function filterChangesetsOnPattern(changeSets, entry) { + const filteredChangesets = []; + for (const changeSet of changeSets) { + const { insert, delete: deleteSet, effectiveInsert, effectiveDelete } = changeSet; + const clonedChangeSet = { + ...changeSet, + insert: insert.filter((triple) => tripleMatchesSpec(triple, entry.match)), + delete: deleteSet.filter((triple) => tripleMatchesSpec(triple, entry.match)), + effectiveInsert: effectiveInsert.filter((triple) => tripleMatchesSpec(triple, entry.match)), + effectiveDelete: effectiveDelete.filter((triple) => tripleMatchesSpec(triple, entry.match)), + }; + filteredChangesets.push(clonedChangeSet); + }; + return filteredChangesets; +} + +export function tripleMatchesSpec( triple, matchSpec ) { + // form of triple is {s, p, o}, same as matchSpec + if(DEBUG_TRIPLE_MATCHES_SPEC) + console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); + + for( let key in matchSpec ){ + // key is one of s, p, o + const subMatchSpec = matchSpec[key]; + const subMatchValue = triple[key]; + + if( subMatchSpec && !subMatchValue ) + return false; + + for( let subKey in subMatchSpec ) + // we're now matching something like {type: "url", value: "http..."} + if( subMatchSpec[subKey] !== subMatchValue[subKey] ) + return false; + } + return true; // no false matches found, let's send a response +} + + +export async function filterMatchesForOrigin( changeSets, entry ) { + if( ! entry.options || !entry.options.ignoreFromSelf ) { + return changeSets; + } else { + try { + const originIpAddress = await getServiceIp( entry ); + return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); + } catch(e) { + console.error(`Could not filter changeset because an error was returned while looking up ip for ${entry.callback.url}`); + console.error(e); + return changeSets; + } + } +} + +export function hostnameForEntry( entry ) { + return (new URL(entry.callback.url)).hostname; +} + +async function getServiceIp(entry) { + const hostName = hostnameForEntry( entry ); + return new Promise( (resolve, reject) => { + dns.lookup( hostName, { family: 4 }, ( err, address) => { + if( err ) + reject( err ); + else + resolve( address ); + } ); + } ); +}; diff --git a/send-request.js b/send-request.js index e19a990..4e6cb7b 100644 --- a/send-request.js +++ b/send-request.js @@ -2,6 +2,8 @@ import http from "http"; import { uuid } from "mu"; const DEFAULT_RETRY_TIMEOUT = 250; +const DEBUG_DELTA_SEND = process.env["DEBUG_DELTA_SEND"]; +const DEBUG_DELTA_NOT_SENDING_EMPTY = process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]; function formatChangesetBody(changeSets, options) { if (options.resourceFormat == "v0.0.1") { @@ -115,7 +117,7 @@ export async function sendRequest( // we should send contents body = formatChangesetBody(changeSets, entry.options); } - if (process.env["DEBUG_DELTA_SEND"]) + if (DEBUG_DELTA_SEND) console.log(`Executing send ${method} to ${url}`); try { const keepAliveAgent = new http.Agent({ @@ -137,10 +139,10 @@ export async function sendRequest( retriesLeft ); } catch (error) { - console.log(error); + console.error(`Error sending delta to ${url}`, error); } } else { - if (process.env["DEBUG_DELTA_SEND"] || process.env["DEBUG_DELTA_NOT_SENDING_EMPTY"]) + if (DEBUG_DELTA_SEND || DEBUG_DELTA_NOT_SENDING_EMPTY) console.log(`Changeset empty. Not sending to ${entry.callback.method} ${entry.callback.url}`); } }