Skip to content

Commit 523ce77

Browse files
authored
Merge pull request #4 from DaPulse/addition/ran/missing_function_callbacks
addition of data in instrumentation and version upgrade
2 parents 6cf35c1 + 2e8c900 commit 523ce77

File tree

3 files changed

+53
-38
lines changed

3 files changed

+53
-38
lines changed

dist/consumer.js

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ class Consumer extends events_1.EventEmitter {
7171
this.handleMessageBatch = options.handleMessageBatch;
7272
this.pollingStartedInstrumentCallback = options.pollingStartedInstrumentCallback;
7373
this.pollingFinishedInstrumentCallback = options.pollingFinishedInstrumentCallback;
74+
this.batchStartedInstrumentCallBack = options.batchStartedInstrumentCallBack;
75+
this.batchFinishedInstrumentCallBack = options.batchFinishedInstrumentCallBack;
76+
this.batchFailedInstrumentCallBack = options.batchFailedInstrumentCallBack;
7477
this.handleMessageTimeout = options.handleMessageTimeout;
7578
this.attributeNames = options.attributeNames || [];
7679
this.messageAttributeNames = options.messageAttributeNames || [];
@@ -135,7 +138,8 @@ class Consumer extends events_1.EventEmitter {
135138
this.pollingFinishedInstrumentCallback({
136139
instanceId: process.env.HOSTNAME,
137140
queueUrl: this.queueUrl,
138-
messagesReceived: numberOfMessages
141+
messagesReceived: numberOfMessages,
142+
freeConcurrentSlots: this.freeConcurrentSlots
139143
});
140144
}
141145
if (response) {
@@ -251,7 +255,8 @@ class Consumer extends events_1.EventEmitter {
251255
this.pollingStartedInstrumentCallback({
252256
instanceId: process.env.HOSTNAME,
253257
queueUrl: this.queueUrl,
254-
pollBatchSize
258+
pollBatchSize,
259+
freeConcurrentSlots: this.freeConcurrentSlots
255260
});
256261
}
257262
let currentPollingTimeout = this.pollingWaitTimeMs;
@@ -296,31 +301,34 @@ class Consumer extends events_1.EventEmitter {
296301
instanceId: process.env.HOSTNAME,
297302
queueUrl: this.queueUrl,
298303
batchUuid,
299-
numberOfMessages: messages.length
304+
numberOfMessages: messages.length,
305+
freeConcurrentSlots: this.freeConcurrentSlots
300306
});
301307
}
302-
try {
303-
await this.handleMessageBatch(messages, this);
308+
this.handleMessageBatch(messages, this)
309+
.then(() => {
304310
if (this.batchFinishedInstrumentCallBack) {
305311
this.batchFinishedInstrumentCallBack({
306312
instanceId: process.env.HOSTNAME,
307313
queueUrl: this.queueUrl,
308314
batchUuid,
309-
numberOfMessages: messages.length
315+
numberOfMessages: messages.length,
316+
freeConcurrentSlots: this.freeConcurrentSlots
310317
});
311318
}
312-
}
313-
catch (err) {
319+
})
320+
.catch(err => {
314321
if (this.batchFailedInstrumentCallBack) {
315322
this.batchFailedInstrumentCallBack({
316323
instanceId: process.env.HOSTNAME,
317324
queueUrl: this.queueUrl,
318325
batchUuid,
319-
numberOfMessages: messages.length
326+
numberOfMessages: messages.length,
327+
freeConcurrentSlots: this.freeConcurrentSlots,
328+
error: err
320329
});
321330
}
322-
throw err;
323-
}
331+
});
324332
}
325333
}
326334
exports.Consumer = Consumer;

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "sqs-consumer",
3-
"version": "5.7.8",
3+
"version": "5.7.81",
44
"description": "Build SQS-based Node applications without the boilerplate",
55
"main": "dist/index.js",
66
"types": "dist/index.d.ts",

src/consumer.ts

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ export class Consumer extends EventEmitter {
140140
this.handleMessageBatch = options.handleMessageBatch;
141141
this.pollingStartedInstrumentCallback = options.pollingStartedInstrumentCallback;
142142
this.pollingFinishedInstrumentCallback = options.pollingFinishedInstrumentCallback;
143+
this.batchStartedInstrumentCallBack = options.batchStartedInstrumentCallBack;
144+
this.batchFinishedInstrumentCallBack = options.batchFinishedInstrumentCallBack;
145+
this.batchFailedInstrumentCallBack = options.batchFailedInstrumentCallBack;
143146
this.handleMessageTimeout = options.handleMessageTimeout;
144147
this.attributeNames = options.attributeNames || [];
145148
this.messageAttributeNames = options.messageAttributeNames || [];
@@ -216,7 +219,8 @@ export class Consumer extends EventEmitter {
216219
this.pollingFinishedInstrumentCallback({
217220
instanceId: process.env.HOSTNAME,
218221
queueUrl: this.queueUrl,
219-
messagesReceived: numberOfMessages
222+
messagesReceived: numberOfMessages,
223+
freeConcurrentSlots: this.freeConcurrentSlots
220224
});
221225
}
222226

@@ -334,7 +338,8 @@ export class Consumer extends EventEmitter {
334338
this.pollingStartedInstrumentCallback({
335339
instanceId: process.env.HOSTNAME,
336340
queueUrl: this.queueUrl,
337-
pollBatchSize
341+
pollBatchSize,
342+
freeConcurrentSlots: this.freeConcurrentSlots
338343
});
339344
}
340345

@@ -384,32 +389,34 @@ export class Consumer extends EventEmitter {
384389
instanceId: process.env.HOSTNAME,
385390
queueUrl: this.queueUrl,
386391
batchUuid,
387-
numberOfMessages: messages.length
392+
numberOfMessages: messages.length,
393+
freeConcurrentSlots: this.freeConcurrentSlots
388394
});
389395
}
390396

391-
try {
392-
await this.handleMessageBatch(messages, this);
393-
394-
if (this.batchFinishedInstrumentCallBack) {
395-
this.batchFinishedInstrumentCallBack({
396-
instanceId: process.env.HOSTNAME,
397-
queueUrl: this.queueUrl,
398-
batchUuid,
399-
numberOfMessages: messages.length
400-
});
401-
}
402-
} catch (err) {
403-
if (this.batchFailedInstrumentCallBack) {
404-
this.batchFailedInstrumentCallBack({
405-
instanceId: process.env.HOSTNAME,
406-
queueUrl: this.queueUrl,
407-
batchUuid,
408-
numberOfMessages: messages.length
409-
});
410-
}
411-
412-
throw err;
413-
}
397+
this.handleMessageBatch(messages, this)
398+
.then(() => {
399+
if (this.batchFinishedInstrumentCallBack) {
400+
this.batchFinishedInstrumentCallBack({
401+
instanceId: process.env.HOSTNAME,
402+
queueUrl: this.queueUrl,
403+
batchUuid,
404+
numberOfMessages: messages.length,
405+
freeConcurrentSlots: this.freeConcurrentSlots
406+
});
407+
}
408+
})
409+
.catch(err => {
410+
if (this.batchFailedInstrumentCallBack) {
411+
this.batchFailedInstrumentCallBack({
412+
instanceId: process.env.HOSTNAME,
413+
queueUrl: this.queueUrl,
414+
batchUuid,
415+
numberOfMessages: messages.length,
416+
freeConcurrentSlots: this.freeConcurrentSlots,
417+
error: err
418+
});
419+
}
420+
});
414421
}
415422
}

0 commit comments

Comments
 (0)