Skip to content

Commit 6cf35c1

Browse files
authored
Merge pull request #3 from DaPulse/feature/ran/review_fixes_new_sqs_infra
Feature/ran/review fixes new sqs infra
2 parents 33a4a88 + b40b69d commit 6cf35c1

File tree

4 files changed

+120
-71
lines changed

4 files changed

+120
-71
lines changed

dist/consumer.d.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ export interface ConsumerOptions {
66
attributeNames?: string[];
77
messageAttributeNames?: string[];
88
stopped?: boolean;
9-
concurencyLimit?: number;
9+
concurrencyLimit?: number;
1010
batchSize?: number;
1111
visibilityTimeout?: number;
1212
waitTimeSeconds?: number;
1313
authenticationErrorTimeout?: number;
1414
pollingWaitTimeMs?: number;
15-
pollingWaitTimeMsBatchSizeZero?: number;
15+
msDelayOnEmptyBatchSize?: number;
1616
terminateVisibilityTimeout?: boolean;
1717
sqs?: SQS;
1818
region?: string;
@@ -21,26 +21,31 @@ export interface ConsumerOptions {
2121
handleMessageBatch?(messages: SQSMessage[], consumer: Consumer): Promise<void>;
2222
pollingStartedInstrumentCallback?(eventData: object): void;
2323
pollingFinishedInstrumentCallback?(eventData: object): void;
24-
batchSizeUpdatedInstrument?(eventData: object): void;
24+
batchStartedInstrumentCallBack?(eventData: object): void;
25+
batchFinishedInstrumentCallBack?(eventData: object): void;
26+
batchFailedInstrumentCallBack?(eventData: object): void;
2527
}
2628
export declare class Consumer extends EventEmitter {
2729
private queueUrl;
2830
private handleMessage;
2931
private handleMessageBatch;
3032
private pollingStartedInstrumentCallback?;
3133
private pollingFinishedInstrumentCallback?;
32-
private batchSizeUpdatedInstrument?;
34+
private batchStartedInstrumentCallBack?;
35+
private batchFinishedInstrumentCallBack?;
36+
private batchFailedInstrumentCallBack?;
3337
private handleMessageTimeout;
3438
private attributeNames;
3539
private messageAttributeNames;
3640
private stopped;
37-
private concurencyLimit;
41+
private concurrencyLimit;
42+
private freeConcurrentSlots;
3843
private batchSize;
3944
private visibilityTimeout;
4045
private waitTimeSeconds;
4146
private authenticationErrorTimeout;
4247
private pollingWaitTimeMs;
43-
private pollingWaitTimeMsBatchSizeZero;
48+
private msDelayOnEmptyBatchSize;
4449
private terminateVisibilityTimeout;
4550
private sqs;
4651
constructor(options: ConsumerOptions);
@@ -50,7 +55,6 @@ export declare class Consumer extends EventEmitter {
5055
stop(): void;
5156
reportMessageFromBatchFinished(message: SQSMessage, error: Error): Promise<void>;
5257
private reportNumberOfMessagesReceived;
53-
private updateBatchSize;
5458
private handleSqsResponse;
5559
private processMessage;
5660
private receiveMessage;

dist/consumer.js

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Object.defineProperty(exports, "__esModule", { value: true });
33
const SQS = require("aws-sdk/clients/sqs");
44
const Debug = require("debug");
5+
const crypto = require("crypto");
56
const events_1 = require("events");
67
const bind_1 = require("./bind");
78
const errors_1 = require("./errors");
@@ -11,6 +12,9 @@ const requiredOptions = [
1112
// only one of handleMessage / handleMessagesBatch is required
1213
'handleMessage|handleMessageBatch'
1314
];
15+
function generateUuid() {
16+
return crypto.randomBytes(16).toString('hex');
17+
}
1418
function createTimeout(duration) {
1519
let timeout;
1620
const pending = new Promise((_, reject) => {
@@ -67,19 +71,19 @@ class Consumer extends events_1.EventEmitter {
6771
this.handleMessageBatch = options.handleMessageBatch;
6872
this.pollingStartedInstrumentCallback = options.pollingStartedInstrumentCallback;
6973
this.pollingFinishedInstrumentCallback = options.pollingFinishedInstrumentCallback;
70-
this.batchSizeUpdatedInstrument = options.batchSizeUpdatedInstrument;
7174
this.handleMessageTimeout = options.handleMessageTimeout;
7275
this.attributeNames = options.attributeNames || [];
7376
this.messageAttributeNames = options.messageAttributeNames || [];
7477
this.stopped = true;
7578
this.batchSize = options.batchSize || 1;
76-
this.concurencyLimit = options.concurencyLimit || 30;
79+
this.concurrencyLimit = options.concurrencyLimit || 30;
80+
this.freeConcurrentSlots = this.concurrencyLimit;
7781
this.visibilityTimeout = options.visibilityTimeout;
7882
this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false;
7983
this.waitTimeSeconds = options.waitTimeSeconds || 20;
8084
this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000;
8185
this.pollingWaitTimeMs = options.pollingWaitTimeMs || 0;
82-
this.pollingWaitTimeMsBatchSizeZero = options.pollingWaitTimeMsBatchSizeZero || 5;
86+
this.msDelayOnEmptyBatchSize = options.msDelayOnEmptyBatchSize || 5;
8387
this.sqs =
8488
options.sqs ||
8589
new SQS({
@@ -106,8 +110,7 @@ class Consumer extends events_1.EventEmitter {
106110
}
107111
async reportMessageFromBatchFinished(message, error) {
108112
debug('Message from batch has finised');
109-
this.concurencyLimit++;
110-
this.updateBatchSize();
113+
this.freeConcurrentSlots++;
111114
try {
112115
if (error)
113116
throw error;
@@ -119,22 +122,8 @@ class Consumer extends events_1.EventEmitter {
119122
}
120123
}
121124
reportNumberOfMessagesReceived(numberOfMessages) {
122-
debug('decrementing next batch size');
123-
this.concurencyLimit = this.concurencyLimit - numberOfMessages;
124-
this.updateBatchSize();
125-
}
126-
updateBatchSize() {
127-
debug('Updating next batch size');
128-
this.batchSize = Math.min(10, this.concurencyLimit);
129-
// instrument current batch size
130-
if (this.batchSizeUpdatedInstrument) {
131-
this.batchSizeUpdatedInstrument({
132-
instanceId: process.env.HOSTNAME,
133-
queueUrl: this.queueUrl,
134-
currentConcurencyLimit: this.concurencyLimit,
135-
batchSize: this.batchSize
136-
});
137-
}
125+
debug('Reducing number of messages received from freeConcurrentSlots');
126+
this.freeConcurrentSlots = this.freeConcurrentSlots - numberOfMessages;
138127
}
139128
async handleSqsResponse(response) {
140129
debug('Received SQS response');
@@ -256,22 +245,22 @@ class Consumer extends events_1.EventEmitter {
256245
this.emit('stopped');
257246
return;
258247
}
248+
const pollBatchSize = Math.min(this.batchSize, this.freeConcurrentSlots);
259249
debug('Polling for messages');
260250
if (this.pollingStartedInstrumentCallback) {
261251
this.pollingStartedInstrumentCallback({
262252
instanceId: process.env.HOSTNAME,
263253
queueUrl: this.queueUrl,
264-
// instrument i am about to request this.batchSize messages
265-
batchSize: this.batchSize
254+
pollBatchSize
266255
});
267256
}
268257
let currentPollingTimeout = this.pollingWaitTimeMs;
269-
if (this.batchSize > 0) {
258+
if (pollBatchSize > 0) {
270259
const receiveParams = {
271260
QueueUrl: this.queueUrl,
272261
AttributeNames: this.attributeNames,
273262
MessageAttributeNames: this.messageAttributeNames,
274-
MaxNumberOfMessages: this.batchSize,
263+
MaxNumberOfMessages: pollBatchSize,
275264
WaitTimeSeconds: this.waitTimeSeconds,
276265
VisibilityTimeout: this.visibilityTimeout
277266
};
@@ -293,15 +282,45 @@ class Consumer extends events_1.EventEmitter {
293282
});
294283
}
295284
else {
296-
setTimeout(this.poll, this.pollingWaitTimeMsBatchSizeZero);
285+
setTimeout(this.poll, this.msDelayOnEmptyBatchSize);
297286
}
298287
}
299288
async processMessageBatch(messages) {
300289
messages.forEach(message => {
301290
this.emit('message_received', message);
302291
});
303292
this.reportNumberOfMessagesReceived(messages.length);
304-
this.handleMessageBatch(messages, this);
293+
const batchUuid = generateUuid();
294+
if (this.batchStartedInstrumentCallBack) {
295+
this.batchStartedInstrumentCallBack({
296+
instanceId: process.env.HOSTNAME,
297+
queueUrl: this.queueUrl,
298+
batchUuid,
299+
numberOfMessages: messages.length
300+
});
301+
}
302+
try {
303+
await this.handleMessageBatch(messages, this);
304+
if (this.batchFinishedInstrumentCallBack) {
305+
this.batchFinishedInstrumentCallBack({
306+
instanceId: process.env.HOSTNAME,
307+
queueUrl: this.queueUrl,
308+
batchUuid,
309+
numberOfMessages: messages.length
310+
});
311+
}
312+
}
313+
catch (err) {
314+
if (this.batchFailedInstrumentCallBack) {
315+
this.batchFailedInstrumentCallBack({
316+
instanceId: process.env.HOSTNAME,
317+
queueUrl: this.queueUrl,
318+
batchUuid,
319+
numberOfMessages: messages.length
320+
});
321+
}
322+
throw err;
323+
}
305324
}
306325
}
307326
exports.Consumer = Consumer;

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "sqs-consumer",
3-
"version": "5.7.7",
3+
"version": "5.7.8",
44
"description": "Build SQS-based Node applications without the boilerplate",
55
"main": "dist/index.js",
66
"types": "dist/index.d.ts",

0 commit comments

Comments
 (0)