diff --git a/app.js b/app.js index 97c710a..44b0c2e 100644 --- a/app.js +++ b/app.js @@ -3,140 +3,148 @@ import request from 'request'; import services from '/config/rules.js'; import bodyParser from 'body-parser'; import dns from 'dns'; +import metrics from './lib/metrics'; // Also parse application/json as json -app.use( bodyParser.json( { +app.use(bodyParser.json({ type: function(req) { - return /^application\/json/.test( req.get('content-type') ); + return /^application\/json/.test(req.get('content-type')); }, limit: '500mb' -} ) ); +})); // Log server config if requested -if( process.env["LOG_SERVER_CONFIGURATION"] ) - console.log(JSON.stringify( services )); +if (process.env['LOG_SERVER_CONFIGURATION']) + console.log(JSON.stringify(services)); -app.get( '/', function( req, res ) { +app.get('/', function(req, res) { res.status(200); - res.send("Hello, delta notification is running"); -} ); + res.send('Hello, delta notification is running'); +}); -app.post( '/', function( req, res ) { - if( process.env["LOG_REQUESTS"] ) { - console.log("Logging request body"); +app.get('/metric', function(req, res) { + res.status(200).send(metrics.getReport()); +}); + +app.post('/', function(req, res) { + if (process.env['LOG_REQUESTS']) { + console.log('Logging request body'); console.log(req.body); } const changeSets = req.body.changeSets; - const originalMuCallIdTrail = JSON.parse( req.get('mu-call-id-trail') || "[]" ); + const originalMuCallIdTrail = JSON.parse(req.get('mu-call-id-trail') || '[]'); const originalMuCallId = req.get('mu-call-id'); - const muCallIdTrail = JSON.stringify( [...originalMuCallIdTrail, originalMuCallId] ); + const muCallIdTrail = JSON.stringify([...originalMuCallIdTrail, originalMuCallId]); - changeSets.forEach( (change) => { + changeSets.forEach((change) => { change.insert = change.insert || []; change.delete = change.delete || []; - } ); + }); // inform watchers - informWatchers( changeSets, res, muCallIdTrail ); + informWatchers(changeSets, res, muCallIdTrail); // push relevant data to interested actors res.status(204).send(); -} ); +}); -async function informWatchers( changeSets, res, muCallIdTrail ){ - services.map( async (entry) => { - // for each entity - if( process.env["DEBUG_DELTA_MATCH"] ) - console.log(`Checking if we want to send to ${entry.callback.url}`); +async function informWatchers(changeSets, res, muCallIdTrail) { - const matchSpec = entry.match; + services.map(async (entry) => { + try { + // for each entity + if (process.env['DEBUG_DELTA_MATCH']) + console.log(`Checking if we want to send to ${entry.callback.url}`); - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + const matchSpec = entry.match; - let allInserts = []; - let allDeletes = []; + const originFilteredChangeSets = await filterMatchesForOrigin(changeSets, entry); + if (process.env['DEBUG_TRIPLE_MATCHES_SPEC'] && entry.options.ignoreFromSelf) + console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry(entry)}`); - originFilteredChangeSets.forEach( (change) => { - allInserts = [...allInserts, ...change.insert]; - allDeletes = [...allDeletes, ...change.delete]; - } ); + let allInserts = []; + let allDeletes = []; - const changedTriples = [...allInserts, ...allDeletes]; + originFilteredChangeSets.forEach((change) => { + allInserts = [...allInserts, ...change.insert]; + allDeletes = [...allDeletes, ...change.delete]; + }); - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); + const changedTriples = [...allInserts, ...allDeletes]; - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); + const someTripleMatchedSpec = + changedTriples.some((triple) => tripleMatchesSpec(triple, matchSpec)); - if( someTripleMatchedSpec ) { - // inform matching entities - if( process.env["DEBUG_DELTA_SEND"] ) - console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); + if (process.env['DEBUG_TRIPLE_MATCHES_SPEC']) + console.log(`Triple matches spec? ${someTripleMatchedSpec}`); - if( entry.options && entry.options.gracePeriod ) { - setTimeout( - () => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ), - entry.options.gracePeriod ); - } else { - sendRequest( entry, originFilteredChangeSets, muCallIdTrail ); + if (someTripleMatchedSpec) { + // inform matching entities + if (process.env['DEBUG_DELTA_SEND']) + console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); + + if (entry.options && entry.options.gracePeriod) { + setTimeout( + () => sendRequest(entry, originFilteredChangeSets, muCallIdTrail), + entry.options.gracePeriod); + } else { + sendRequest(entry, originFilteredChangeSets, muCallIdTrail); + } } + } catch (error) { + metrics.addRequest({ + url: entry.callback.url, + method: entry.callback.method, + error + }); } - } ); + }); } -function tripleMatchesSpec( triple, matchSpec ) { +function tripleMatchesSpec(triple, matchSpec) { // form of triple is {s, p, o}, same as matchSpec - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) + if (process.env['DEBUG_TRIPLE_MATCHES_SPEC']) console.log(`Does ${JSON.stringify(triple)} match ${JSON.stringify(matchSpec)}?`); - for( let key in matchSpec ){ + for (let key in matchSpec) { // key is one of s, p, o const subMatchSpec = matchSpec[key]; const subMatchValue = triple[key]; - if( subMatchSpec && !subMatchValue ) + if (subMatchSpec && !subMatchValue) return false; - for( let subKey in subMatchSpec ) - // we're now matching something like {type: "url", value: "http..."} - if( subMatchSpec[subKey] !== subMatchValue[subKey] ) + for (let subKey in subMatchSpec) + // we're now matching something like {type: "url", value: "http..."} + if (subMatchSpec[subKey] !== subMatchValue[subKey]) return false; } return true; // no false matches found, let's send a response } - -function formatChangesetBody( changeSets, options ) { - if( options.resourceFormat == "v0.0.1" ) { +function formatChangesetBody(changeSets, options) { + if (options.resourceFormat == 'v0.0.1') { return JSON.stringify( - changeSets.map( (change) => { - return { - inserts: change.insert, - deletes: change.delete - }; - } ) ); + changeSets.map((change) => { + return { + inserts: change.insert, + deletes: change.delete + }; + })); } - if( options.resourceFormat == "v0.0.0-genesis" ) { + if (options.resourceFormat == 'v0.0.0-genesis') { // [{delta: {inserts, deletes}] - const newOptions = Object.assign({}, options, { resourceFormat: "v0.0.1" }); - const newFormat = JSON.parse( formatChangesetBody( changeSets, newOptions ) ); + const newOptions = Object.assign({}, options, {resourceFormat: 'v0.0.1'}); + const newFormat = JSON.parse(formatChangesetBody(changeSets, newOptions)); return JSON.stringify({ // graph: Not available delta: { - inserts: newFormat - .flatMap( ({inserts}) => inserts) - .map( ({subject,predicate,object}) => - ( { s: subject.value, p: predicate.value, o: object.value } ) ), - deletes: newFormat - .flatMap( ({deletes}) => deletes) - .map( ({subject,predicate,object}) => - ( { s: subject.value, p: predicate.value, o: object.value } ) ) + inserts: newFormat.flatMap(({inserts}) => inserts).map(({subject, predicate, object}) => + ({s: subject.value, p: predicate.value, o: object.value})), + deletes: newFormat.flatMap(({deletes}) => deletes).map(({subject, predicate, object}) => + ({s: subject.value, p: predicate.value, o: object.value})) } }); } else { @@ -144,17 +152,22 @@ function formatChangesetBody( changeSets, options ) { } } -async function sendRequest( entry, changeSets, muCallIdTrail ) { +async function sendRequest(entry, changeSets, muCallIdTrail) { let requestObject; // will contain request information // construct the requestObject const method = entry.callback.method; const url = entry.callback.url; - const headers = { "Content-Type": "application/json", "MU-AUTH-ALLOWED-GROUPS": changeSets[0].allowedGroups, "mu-call-id-trail": muCallIdTrail, "mu-call-id": uuid() }; - - if( entry.options && entry.options.resourceFormat ) { + const headers = { + 'Content-Type': 'application/json', + 'MU-AUTH-ALLOWED-GROUPS': changeSets[0].allowedGroups, + 'mu-call-id-trail': muCallIdTrail, + 'mu-call-id': uuid() + }; + + if (entry.options && entry.options.resourceFormat) { // we should send contents - const body = formatChangesetBody( changeSets, entry.options ); + const body = formatChangesetBody(changeSets, entry.options); // TODO: we now assume the mu-auth-allowed-groups will be the same // for each changeSet. that's a simplification and we should not @@ -167,46 +180,54 @@ async function sendRequest( entry, changeSets, muCallIdTrail ) { }; } else { // we should only inform - requestObject = { url, method, headers }; + requestObject = {url, method, headers}; } - if( process.env["DEBUG_DELTA_SEND"] ) + if (process.env['DEBUG_DELTA_SEND']) console.log(`Executing send ${method} to ${url}`); - request( requestObject, function( error, response, body ) { - if( error ) { + request(requestObject, function(error, response, body) { + if (error) { console.log(`Could not send request ${method} ${url}`); console.log(error); console.log(`NOT RETRYING`); // TODO: retry a few times when delta's fail to send + metrics.addRequest({ + url, + method, + error + }); } - if( response ) { - // console.log( body ); + if (response) { + metrics.addRequest({ + url, + method + }); } }); } -async function filterMatchesForOrigin( changeSets, entry ) { - if( ! entry.options || !entry.options.ignoreFromSelf ) { +async function filterMatchesForOrigin(changeSets, entry) { + if (!entry.options || !entry.options.ignoreFromSelf) { return changeSets; } else { - const originIpAddress = await getServiceIp( entry ); - return changeSets.filter( (changeSet) => changeSet.origin != originIpAddress ); + const originIpAddress = await getServiceIp(entry); + return changeSets.filter((changeSet) => changeSet.origin != originIpAddress); } } -function hostnameForEntry( entry ) { +function hostnameForEntry(entry) { return (new URL(entry.callback.url)).hostname; } async function getServiceIp(entry) { - const hostName = hostnameForEntry( entry ); - return new Promise( (resolve, reject) => { - dns.lookup( hostName, { family: 4 }, ( err, address) => { - if( err ) - reject( err ); + const hostName = hostnameForEntry(entry); + return new Promise((resolve, reject) => { + dns.lookup(hostName, {family: 4}, (err, address) => { + if (err) + reject(err); else - resolve( address ); - } ); - } ); + resolve(address); + }); + }); }; diff --git a/lib/metrics.js b/lib/metrics.js new file mode 100644 index 0000000..7a87b71 --- /dev/null +++ b/lib/metrics.js @@ -0,0 +1,75 @@ +import dayjs from 'dayjs'; + + +const TIME = parseInt(process.env['METRICS_KEEP_DELTAS_LAST_TIME']) || 3; +const UNITE = process.env['METRICS_KEEP_DELTAS_LAST_UNITE'] || 'h'; + +class Metrics { + + constructor() { + this._requests = []; + + // NOTE: set interval to clean up request metrics every hour + const now = dayjs(); + const nextHour = now.hour(now.hour() + 1).minute(0).second(0).millisecond(0); + setTimeout(() => { + this.collectGarbage(); + setInterval(this.collectGarbage.bind(this), 1000 * 60 * 60); + }, nextHour.diff(now, 's')); + } + + get requests() { + const response = this.getLastRequests(TIME, UNITE); + response['failed'] = response.filter(request => !!request.error); + return response; + } + + collectGarbage() { + // NOTE: we slice to remove any unwanted meta-data (clean-copy) + this._requests = this.requests.slice(); + } + + addRequest(request) { + request['time'] = dayjs(); + this._requests.push(request); + } + + getLastRequests(time = 1, unit = 'm') { + return this._requests.filter(value => dayjs().diff(value.time, unit) <= time); + } + + getReport() { + const report = { + delta_delivery_failed: { + count: 0 + } + }; + + report['delta_delivery_total'] = this.requests.length; + report.delta_delivery_failed.count = this.requests.failed.length; + + const helper = {}; + report.delta_delivery_failed.requests = this.requests.failed.reduce((result, request) => { + const key = `${request.url}-${request.method}`; + if (!helper[key]) { + helper[key] = Object.assign({}, { + url: request.url, + method: request.method, + count: 1 + }); + result.push(helper[key]); + } else { + helper[key].count++; + } + return result; + }, []); + + return report; + } + +} + +const metric = new Metrics(); + +// NOTE: we only expose a singleton for service wide use. +export default metric; \ No newline at end of file diff --git a/package.json b/package.json index 89a224e..65d3bd0 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ }, "homepage": "https://github.com/mu-semtech/delta-notifier#readme", "dependencies": { + "dayjs": "^1.10.5", "request": "^2.88.0" } }