diff --git a/README.md b/README.md index cb44e1f..d9b5b6c 100644 --- a/README.md +++ b/README.md @@ -69,14 +69,41 @@ The exported property contains an array of definitions, each linking a match to - `options.resourceFormat`: Version format describing the format of the contents. Keys may be added to this format, but they may not be removed. Filter the properties as needed. - `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future. - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). + - `options.matchOnEffective`: Only match with effective inserts and deletions, not all inserts and deletions. Default: false. + - `options.requestPerMuCallIdTrail`: Execute one request per muCallIdTrail. mu-authorization batches multiple requests, which might have different idTrails. ## Delta formats The delta may be offered in multiple formats. Versions should match the exact string. Specify `options.resourceFormat` to indicate the specific resourceformat. +#### v0.0.2 + +v0.0.2 is the last format added. It makes a distinction between all inserts and deletions and effective inserts and deletions. An inserts is effective if the triple is not yet in the triplestore. A deletion is effective if the triple was present in the triplestore. + +This version uses an updated mu-authorization that batches inserts and deletions together to improve performance. This imposes the need for an `index` field. This `index` is incremental starting from Unix time in milliseconds. + +```json + [ + { "inserts": [{"subject": { "type": "uri", "value": "http://mu.semte.ch/" }, + "predicate": { "type": "uri", "value": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" }, + "object": { "type": "uri", "value": "https://schema.org/Project" }}, + {"subject": { "type": "uri", "value": "http://mu.semte.ch/" }, + "predicate": { "type": "uri", "value": "http://purl.org/dc/terms/modified" }, + "object": { "type": "literal", "value": "https://schema.org/Project", "datatype": "http://www.w3.org/2001/XMLSchema#dateTime"}}], + "deletes": [], + "effectiveInserts": [{"subject": { "type": "uri", "value": "http://mu.semte.ch/" }, + "predicate": { "type": "uri", "value": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type" }, + "object": { "type": "uri", "value": "https://schema.org/Project" }}], + "effectiveDeletions": [], + "index" :1629367246152 } + ] +``` + + + ### v0.0.1 -v0.0.1 is the latest format of the delta messages. It may be extended with authorization rights etc. in the future. The value encoding follows the [json-sparql spec RDF term encoding](https://www.w3.org/TR/sparql11-results-json/#select-encode-terms). For example: +v0.0.1 may be extended with authorization rights etc. in the future. The value encoding follows the [json-sparql spec RDF term encoding](https://www.w3.org/TR/sparql11-results-json/#select-encode-terms). For example: ```json [ @@ -95,7 +122,7 @@ v0.0.1 is the latest format of the delta messages. It may be extended with autho Genesis format as described by the initial Delta service PoC. It looks like: ```json - { + { "delta": { "inserts": [{"s": "http://mu.semte.ch/", "p": "http://www.w3.org/1999/02/22-rdf-syntax-ns#type", @@ -115,7 +142,7 @@ Debugging can be enabled in the service by setting environment variables. The f - `DEBUG_DELTA_SEND`: Logs all delta messages that are being sent to clients - `DEBUG_DELTA_MATCH`: Logs a check for each target block, indicating a check will occur - `DEBUG_TRIPLE_MATCHES_SPEC`: Extensive logging for triples matching a given specification. Handy when requests are unexpectedly not sent. - + ## Extending You are encouraged to help figure out how to best extend this service. Fork this repository. Run an experiment. Open an issue or PR describing your experiment. Feel free to open up an issue if you would like to discuss a possible extension. diff --git a/app.js b/app.js index 97c710a..75b3b5d 100644 --- a/app.js +++ b/app.js @@ -5,208 +5,269 @@ import bodyParser from 'body-parser'; import dns from 'dns'; // Also parse application/json as json -app.use( bodyParser.json( { - type: function(req) { - return /^application\/json/.test( req.get('content-type') ); +app.use(bodyParser.json({ + type: function (req) { + return /^application\/json/.test(req.get('content-type')); }, limit: '500mb' -} ) ); +})); // Log server config if requested -if( process.env["LOG_SERVER_CONFIGURATION"] ) - console.log(JSON.stringify( services )); +if (process.env["LOG_SERVER_CONFIGURATION"]) + console.log(JSON.stringify(services)); -app.get( '/', function( req, res ) { +app.get('/', function (req, res) { res.status(200); res.send("Hello, delta notification is running"); -} ); +}); -app.post( '/', function( req, res ) { - if( process.env["LOG_REQUESTS"] ) { +app.post('/', function (req, res) { + if (process.env["LOG_REQUESTS"]) { console.log("Logging request body"); console.log(req.body); } const changeSets = req.body.changeSets; - const originalMuCallIdTrail = JSON.parse( req.get('mu-call-id-trail') || "[]" ); + // const originalMuCallIdTrail = JSON.parse(req.get('mu-call-id-trail') || "[]"); const originalMuCallId = req.get('mu-call-id'); - const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] ); + // const muCallIdTrail = JSON.stringify([...originalMuCallIdTrail, originalMuCallId]); - changeSets.forEach( (change) => { + changeSets.forEach((change) => { + change.effectiveInserts = change.effectiveInserts || []; + change.effectiveDeletes = change.effectiveDeletes || []; change.insert = change.insert || []; change.delete = change.delete || []; - } ); + }); // inform watchers - informWatchers( changeSets, res, muCallIdTrail ); + informWatchers(changeSets, res, originalMuCallId); // push relevant data to interested actors res.status(204).send(); -} ); +}); + +function getMatchOnEffective(entry) { + if (!(entry.options && "matchOnEffective" in entry.options)) return false; // DEFAULT + return entry.options.matchOnEffective; +} + +function getRequestPerMuCallIdTrail(entry) { + if (!(entry.options && "requestPerMuCallIdTrail" in entry.options)) return true; // DEFAULT + return entry.options.requestPerMuCallIdTrail; +} -async function informWatchers( changeSets, res, muCallIdTrail ){ - services.map( async (entry) => { +async function informWatchers(changeSets, res, originalMuCallId) { + services.map(async (entry) => { // for each entity - if( process.env["DEBUG_DELTA_MATCH"] ) + if (process.env["DEBUG_DELTA_MATCH"]) console.log(`Checking if we want to send to ${entry.callback.url}`); const matchSpec = entry.match; - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + const originFilteredChangeSets = await filterMatchesForOrigin(changeSets, entry); + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf) + console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry(entry)}`); - let allInserts = []; - let allDeletes = []; - originFilteredChangeSets.forEach( (change) => { - allInserts = [...allInserts, ...change.insert]; - allDeletes = [...allDeletes, ...change.delete]; - } ); + const triple_matches_f = (triple) => tripleMatchesSpec(triple, matchSpec); - const changedTriples = [...allInserts, ...allDeletes]; + const someTripleMatchedSpec = getMatchOnEffective(entry) ? originFilteredChangeSets.some((change) => + change.effectiveInserts.some(triple_matches_f) || + change.effectiveDeletes.some(triple_matches_f) + ) : originFilteredChangeSets.some((change) => + change.insert.some(triple_matches_f) || + change.delete.some(triple_matches_f) + ); - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - if( someTripleMatchedSpec ) { + if (someTripleMatchedSpec) { // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) + if (process.env["DEBUG_DELTA_SEND"]) console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); - if( entry.options && entry.options.gracePeriod ) { + if (entry.options && entry.options.gracePeriod) { setTimeout( - () => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ), - entry.options.gracePeriod ); + () => sendRequest(entry, originFilteredChangeSets, originalMuCallId), + entry.options.gracePeriod); } else { - sendRequest( entry, originFilteredChangeSets, muCallIdTrail ); + sendRequest(entry, originFilteredChangeSets, originalMuCallId); } } - } ); + }); } -function tripleMatchesSpec( triple, matchSpec ) { +function tripleMatchesSpec(triple, matchSpec) { // form of triple is {s, p, o}, same as matchSpec - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); - for( let key in matchSpec ){ + for (let key in matchSpec) { // key is one of s, p, o const subMatchSpec = matchSpec[key]; const subMatchValue = triple[key]; - if( subMatchSpec && !subMatchValue ) + if (subMatchSpec && !subMatchValue) return false; - for( let subKey in subMatchSpec ) + for (let subKey in subMatchSpec) // we're now matching something like {type: "url", value: "http..."} - if( subMatchSpec[subKey] !== subMatchValue[subKey] ) + if (subMatchSpec[subKey] !== subMatchValue[subKey]) return false; } return true; // no false matches found, let's send a response } -function formatChangesetBody( changeSets, options ) { - if( options.resourceFormat == "v0.0.1" ) { - return JSON.stringify( - changeSets.map( (change) => { - return { - inserts: change.insert, - deletes: change.delete - }; - } ) ); - } - if( options.resourceFormat == "v0.0.0-genesis" ) { - // [{delta: {inserts, deletes}] - const newOptions = Object.assign({}, options, { resourceFormat: "v0.0.1" }); - const newFormat = JSON.parse( formatChangesetBody( changeSets, newOptions ) ); - return JSON.stringify({ - // graph: Not available - delta: { - inserts: newFormat - .flatMap( ({inserts}) => inserts) - .map( ({subject,predicate,object}) => - ( { s: subject.value, p: predicate.value, o: object.value } ) ), - deletes: newFormat - .flatMap( ({deletes}) => deletes) - .map( ({subject,predicate,object}) => - ( { s: subject.value, p: predicate.value, o: object.value } ) ) - } - }); - } else { - throw `Unknown resource format ${options.resourceFormat}`; +function formatChangesetBody(changeSets, options) { + switch (options.resourceFormat) { + case "v0.0.2": + return formatV002(changeSets, options); + case "v0.0.1": + return formatV001(changeSets, options); + case "v0.0.0-genesis": + return formatV000Genesis(changeSets, options); + default: + throw `Unknown resource format ${options.resourceFormat}`; } } -async function sendRequest( entry, changeSets, muCallIdTrail ) { - let requestObject; // will contain request information + +function formatV002(changeSets, options) { + return JSON.stringify( + changeSets.map((change) => { + return { + inserts: change.insert, + deletes: change.delete, + effectiveInserts: change.effectiveInserts, + effectiveDeletes: change.effectiveDeletes, + index: change.index + }; + })); +} + +function formatV001(changeSets, options) { + return JSON.stringify( + changeSets.map((change) => { + return { + inserts: change.insert, + deletes: change.delete + }; + })); +} + +function formatV000Genesis(changeSets, options) { + const newOptions = Object.assign({}, options, { resourceFormat: "v0.0.1" }); + const newFormat = JSON.parse(formatV001(changeSets, newOptions)); + return JSON.stringify({ + // graph: Not available + delta: { + inserts: newFormat + .flatMap(({ inserts }) => inserts) + .map(({ subject, predicate, object }) => + ({ s: subject.value, p: predicate.value, o: object.value })), + deletes: newFormat + .flatMap(({ deletes }) => deletes) + .map(({ subject, predicate, object }) => + ({ s: subject.value, p: predicate.value, o: object.value })) + } + }); +} + +function createMuCallIdTrail(trail, originalMuCallId) { + const originalMuCallIdTrail = JSON.parse(trail); + const muCallIdTrail = JSON.stringify([...originalMuCallIdTrail, originalMuCallId]); + return muCallIdTrail; +} + +async function sendRequest(entry, changeSets, originalMuCallId) { + const requestObjects = []; // will contain request information + + let changesPerMuCallIdTrail = {}; + + if (getRequestPerMuCallIdTrail(entry)) { + for (let change of changeSets) { + const trail = change.muCallIdTrail || '[]'; + if (!changesPerMuCallIdTrail[trail]) changesPerMuCallIdTrail[trail] = []; + changesPerMuCallIdTrail[trail].push(change); + } + } else { + // Generic purposes, just one element to loop over + changesPerMuCallIdTrail[changeSets[0].muCallIdTrail || '[]'] = changeSets; + } // construct the requestObject const method = entry.callback.method; const url = entry.callback.url; - const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": changeSets[0].allowedGroups, "mu-call-id-trail": muCallIdTrail, "mu-call-id": uuid() }; - if( entry.options && entry.options.resourceFormat ) { - // we should send contents - const body = formatChangesetBody( changeSets, entry.options ); + for (let muCallIdTrail in changesPerMuCallIdTrail) { + const full_trail = createMuCallIdTrail(muCallIdTrail, originalMuCallId); - // TODO: we now assume the mu-auth-allowed-groups will be the same - // for each changeSet. that's a simplification and we should not - // depend on it. + if (entry.options && entry.options.resourceFormat) { + const current_changes = changesPerMuCallIdTrail[muCallIdTrail]; - requestObject = { - url, method, - headers, - body: body - }; - } else { - // we should only inform - requestObject = { url, method, headers }; + const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": current_changes[0].allowedGroups, "mu-call-id-trail": full_trail, "mu-call-id": uuid() }; + // we should send contents + const body = formatChangesetBody(current_changes, entry.options); + + // TODO: we now assume the mu-auth-allowed-groups will be the same + // for each changeSet. that's a simplification and we should not + // depend on it. + + requestObjects.push({ + url, method, + headers, + body: body + }); + } else { + // we should only inform + requestObjects.push({ url, method, headers }); + } } - if( process.env["DEBUG_DELTA_SEND"] ) + + if (process.env["DEBUG_DELTA_SEND"]) console.log(`Executing send ${method} to ${url}`); - request( requestObject, function( error, response, body ) { - if( error ) { - console.log(`Could not send request ${method} ${url}`); - console.log(error); - console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send - } + for (let requestObject of requestObjects) { + request(requestObject, function (error, response, body) { + if (error) { + console.log(`Could not send request ${method} ${url}`); + console.log(error); + console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send + } - if( response ) { - // console.log( body ); - } - }); + if (response) { + // console.log( body ); + } + }); + } } -async function filterMatchesForOrigin( changeSets, entry ) { - if( ! entry.options || !entry.options.ignoreFromSelf ) { +async function filterMatchesForOrigin(changeSets, entry) { + if (!entry.options || !entry.options.ignoreFromSelf) { return changeSets; } else { - const originIpAddress = await getServiceIp( entry ); - return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); + const originIpAddress = await getServiceIp(entry); + return changeSets.filter((changeSet) => changeSet.origin != originIpAddress); } } -function hostnameForEntry( entry ) { +function hostnameForEntry(entry) { return (new URL(entry.callback.url)).hostname; } async function getServiceIp(entry) { - const hostName = hostnameForEntry( entry ); - return new Promise( (resolve, reject) => { - dns.lookup( hostName, { family: 4 }, ( err, address) => { - if( err ) - reject( err ); + const hostName = hostnameForEntry(entry); + return new Promise((resolve, reject) => { + dns.lookup(hostName, { family: 4 }, (err, address) => { + if (err) + reject(err); else - resolve( address ); - } ); - } ); + resolve(address); + }); + }); };