diff --git a/mongodb-queue.ts b/mongodb-queue.ts index ed38b4a..8cbd6f0 100644 --- a/mongodb-queue.ts +++ b/mongodb-queue.ts @@ -39,6 +39,7 @@ export type GetOptions = { export type PingOptions = { visibility?: number; resetTries?: boolean; + resetAck?: boolean; }; export type BaseMessage = { @@ -205,6 +206,10 @@ export class MongoDBQueue { }; } + if (opts.resetAck) { + update.$unset = {ack: 1}; + } + const msg = await this.col.findOneAndUpdate(query, update, options); if (!msg.value) { throw new Error('Queue.ping(): Unidentified ack : ' + ack); diff --git a/package.json b/package.json index 59e6cce..adf7ba0 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@reedsy/mongodb-queue", - "version": "8.1.0", + "version": "8.2.0", "description": "Message queues which uses MongoDB.", "main": "mongodb-queue.js", "scripts": { diff --git a/test/ping.js b/test/ping.js index 54ec2f8..eedca1a 100644 --- a/test/ping.js +++ b/test/ping.js @@ -109,6 +109,33 @@ setup().then(({client, db}) => { t.end(); }); + test('ping: reset ack', async function(t) { + const queue = new MongoDBQueue(db, 'ping', {visibility: 3}); + let msg; + let id; + + id = await queue.add('Hello, World!'); + t.ok(id, 'There is an id returned when adding a message.'); + msg = await queue.get(); + const ack = msg.ack; + // message should reset in three seconds + t.ok(msg.id, 'Got a msg.id (sanity check)'); + await timeout(2000); + id = await queue.ping(msg.ack, {resetAck: true}); + t.ok(id, 'Received an id when acking this message'); + // wait until the msg has returned to the queue + await timeout(6000); + msg = await queue.get(); + t.notEqual(ack, msg.ack, 'Ack was reset'); + await queue.ack(msg.ack); + msg = await queue.get(); + // no more messages + t.ok(!msg, 'No msg received'); + + t.pass('Finished test ok'); + t.end(); + }); + test('client.close()', function(t) { t.pass('client.close()'); client.close();