Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions mongodb-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export type GetOptions = {
export type PingOptions = {
visibility?: number;
resetTries?: boolean;
resetAck?: boolean;
};

export type BaseMessage<T = any> = {
Expand Down Expand Up @@ -205,6 +206,10 @@ export class MongoDBQueue<T = any> {
};
}

if (opts.resetAck) {
update.$unset = {ack: 1};
}

const msg = await this.col.findOneAndUpdate(query, update, options);
if (!msg.value) {
throw new Error('Queue.ping(): Unidentified ack : ' + ack);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@reedsy/mongodb-queue",
"version": "8.1.0",
"version": "8.2.0",
"description": "Message queues which uses MongoDB.",
"main": "mongodb-queue.js",
"scripts": {
Expand Down
27 changes: 27 additions & 0 deletions test/ping.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,33 @@ setup().then(({client, db}) => {
t.end();
});

test('ping: reset ack', async function(t) {
const queue = new MongoDBQueue(db, 'ping', {visibility: 3});
let msg;
let id;

id = await queue.add('Hello, World!');
t.ok(id, 'There is an id returned when adding a message.');
msg = await queue.get();
const ack = msg.ack;
// message should reset in three seconds
t.ok(msg.id, 'Got a msg.id (sanity check)');
await timeout(2000);
id = await queue.ping(msg.ack, {resetAck: true});
t.ok(id, 'Received an id when acking this message');
// wait until the msg has returned to the queue
await timeout(6000);
msg = await queue.get();
t.notEqual(ack, msg.ack, 'Ack was reset');
await queue.ack(msg.ack);
msg = await queue.get();
// no more messages
t.ok(!msg, 'No msg received');

t.pass('Finished test ok');
t.end();
});

test('client.close()', function(t) {
t.pass('client.close()');
client.close();
Expand Down