diff --git a/tests.js b/tests.js index 7709bbb..b851e59 100644 --- a/tests.js +++ b/tests.js @@ -2,7 +2,23 @@ var through2Concurrent = require('./through2-concurrent'); var expect = require('expect.js'); var _ = require('underscore'); +async function times(num, fn) { + for (i = 0; i < num; i++) { + const result = fn(i); + if (result instanceof Promise) { + await result; + } + } +} + var oldNextTick = process.nextTick; +function wait() { + return new Promise((resolve) => { + oldNextTick(() => { + resolve(); + }) + }) +} describe('through2-concurrent', function () { var nextTickScheduled, collectingThrough, transformCalls, flushCalls, finalCalls; @@ -18,6 +34,7 @@ describe('through2-concurrent', function () { nextTickScheduled = []; fns.forEach(execute); } + return wait(); } beforeEach(function () { @@ -267,4 +284,246 @@ describe('through2-concurrent', function () { }); }); }); + + describe('object streams (preserveOrder=true)', function () { + beforeEach(function () { + transformCalls = []; + flushCalls = []; + finalCalls = []; + + var final = function (callback) { + finalCalls.push({ callback: callback }); + }; + collectingThrough = through2Concurrent.obj( + { maxConcurrency: 4, final: final, preserveOrder: true }, + function (chunk, enc, callback) { + transformCalls.push({ chunk: chunk, enc: enc, callback: callback }); + }, function (callback) { + flushCalls.push({ callback: callback }); + }); + }); + + it('should initiially allow the maximum specified concurrency', function () { + _.times(10, function (i) { + collectingThrough.write({ number: i }); + }); + runNextTicks(); + expect(transformCalls.length).to.be(4); + }); + + it('(preserveOrder=true)should not feed more through once the concurrency drops below the given limit', function () { + _.times(10, function (i) { + collectingThrough.write({ number: i }); + }); + runNextTicks(); + transformCalls[2].callback(); + transformCalls[0].callback(); + runNextTicks(); + expect(transformCalls.length).to.be(4); + }); + + it('should wait for all transform calls to finish before running final and flush when processing > concurrency', async function () { + times(10, function (i) { + collectingThrough.write({ number: i }); + }); + collectingThrough.end(); + await runNextTicks(); + await times(9, async function (i) { + const poped = transformCalls.pop() + poped.callback(); + await runNextTicks(); + }); + + expect(transformCalls.length).to.be(1); + expect(finalCalls.length).to.be(0); + expect(flushCalls.length).to.be(0); + transformCalls[0].callback(); + + // Ensure that flush is called straight after final even if we let the + // event loop progress beforehand but not between the calls + await wait(); + await runNextTicks(); + + expect(finalCalls.length).to.be(1); + expect(flushCalls.length).to.be(0); + finalCalls[0].callback(); + expect(finalCalls.length).to.be(1); + expect(flushCalls.length).to.be(1); + }); + + it('should wait for all transform calls to finish before running final and flush when processing < concurrency', async function () { + _.times(3, function (i) { + collectingThrough.write({ number: i }); + }); + collectingThrough.end(); + await runNextTicks(); + _.times(2, async function (i) { + transformCalls.pop().callback(); + await runNextTicks(); + }); + expect(transformCalls.length).to.be(1); + expect(finalCalls.length).to.be(0); + expect(flushCalls.length).to.be(0); + transformCalls[0].callback(); + + await wait(); + await runNextTicks(); + // Ensure that flush is called straight after final even if we let the + // event loop progress beforehand but not between the calls + expect(finalCalls.length).to.be(1); + expect(flushCalls.length).to.be(0); + finalCalls[0].callback(); + expect(finalCalls.length).to.be(1); + expect(flushCalls.length).to.be(1); + }); + + it('should wait for everything to complete before emitting "finish"', async function () { + var finishCalled = false; + collectingThrough.on('finish', function () { + finishCalled = true; + }); + collectingThrough.resume(); + collectingThrough.write({ hello: 'world' }); + collectingThrough.write({ hello: 'world' }); + collectingThrough.end(); + await runNextTicks(); + expect(finishCalled).to.be(false); + transformCalls[0].callback(); + await runNextTicks(); + transformCalls[1].callback(); + await runNextTicks(); + expect(finishCalled).to.be(false); + await runNextTicks(); + finalCalls[0].callback(); + // runNextTicks does not set stream to sync=false, causing 'finish' to get stuck + // because collectingThrough._writableState.pendingcb will always stay > 0 + // setImmediate will guarantee processing and possiblity of 'finish' event + await wait(); + expect(finishCalled).to.be(true); + }); + + it('should wait for everything to complete before emitting "end"', async function () { + var endCalled = false; + collectingThrough.on('end', function () { + endCalled = true; + }); + collectingThrough.resume(); + collectingThrough.write({ hello: 'world' }); + collectingThrough.write({ hello: 'world' }); + collectingThrough.end(); + await runNextTicks(); + expect(endCalled).to.be(false); + transformCalls[0].callback(); + await runNextTicks(); + transformCalls[1].callback(); + await runNextTicks(); + expect(endCalled).to.be(false); + + await runNextTicks(); + finalCalls[0].callback(); + await runNextTicks(); + expect(endCalled).to.be(false); + flushCalls[0].callback(); + await runNextTicks(); + expect(endCalled).to.be(true); + }); + + it('should pass down the stream data added with this.push', function (done) { + var final = function (cb) { + this.push({ finished: true }); + cb(); + }; + const curNextick = process.nextTick; + process.nextTick = oldNextTick; + var passingThrough = through2Concurrent.obj( + { maxConcurrency: 1, final: final }, + function (chunk, enc, callback) { + this.push({ original: chunk }); + callback(); + }, function (callback) { + this.push({ flushed: true }); + }); + var out = []; + passingThrough.on('data', function (data) { + out.push(data); + }); + passingThrough.write("Hello"); + passingThrough.write("World"); + passingThrough.end(); + setTimeout(() => { + expect(out).to.eql([{ original: "Hello" }, { original: "World" }, { finished: true }, { flushed: true }]); + process.nextTick = curNextick; + done(); + }, 0) + }); + + it('should pass down the stream data added as arguments to the callback', async function () { + var passingThrough = through2Concurrent.obj( + { maxConcurrency: 1 }, + function (chunk, enc, callback) { + callback(null, { original: chunk }); + }); + var out = []; + passingThrough.on('data', function (data) { + out.push(data); + }); + passingThrough.write("Hello"); + passingThrough.write("World"); + await wait(); + passingThrough.end(); + await runNextTicks(); + expect(out).to.eql([{ original: "Hello" }, { original: "World" }]); + }); + + describe('without a flush argument or final option', function () { + beforeEach(function () { + transformCalls = []; + flushCalls = []; + + collectingThrough = through2Concurrent.obj( + { maxConcurrency: 4 }, + function (chunk, enc, callback) { + transformCalls.push({ chunk: chunk, enc: enc, callback: callback }); + }); + }); + + it('should wait for everything to complete before emitting "finish"', async function () { + var finishCalled = false; + collectingThrough.on('finish', function () { + finishCalled = true; + }); + + collectingThrough.resume(); + collectingThrough.write({ hello: 'world' }); + collectingThrough.end(); + runNextTicks(); + expect(finishCalled).to.be(false); + transformCalls[0].callback(); + + // runNextTicks does not progress ticks and set stream to sync=false, causing + // 'finish' to never fire because collectingThrough._writableState.pendingcb + // will always stay > 0 setImmediate will guarantee processing of 'finish' event + await wait(); + await runNextTicks(); + expect(finishCalled).to.be(true); + }); + + it('should wait for everything to complete before emitting "end"', async function () { + var endCalled = false; + collectingThrough.on('end', function () { + endCalled = true; + }); + collectingThrough.resume(); + collectingThrough.write({ hello: 'world' }); + collectingThrough.end(); + await runNextTicks(); + expect(endCalled).to.be(false); + transformCalls[0].callback(); + await runNextTicks(); + await runNextTicks(); + expect(endCalled).to.be(true); + }); + }); + }); + }); diff --git a/through2-concurrent.js b/through2-concurrent.js index e268828..b6b147d 100644 --- a/through2-concurrent.js +++ b/through2-concurrent.js @@ -6,6 +6,15 @@ var through2 = require('through2'); function cbNoop (cb) { cb(); } +function removeFirst(arr, item) { + arr.some((v, index) => { + if (v === item) { + arr.splice(index, 1); + return true; + } + return false; + }); +} module.exports = function concurrentThrough (options, transform, flush) { var concurrent = 0, lastCallback = null, pendingFinish = null; @@ -18,6 +27,59 @@ module.exports = function concurrentThrough (options, transform, flush) { var maxConcurrency = options.maxConcurrency || 16; + if (options.preserveOrder) { + const sendArr = []; + const doFinal = options.final || cbNoop; + options.final = function(finalCb) { + Promise.all(sendArr).then(() => { + process.nextTick(() => { + doFinal.call(this, finalCb); + }) + }); + }; + + const fnFlush = function(flushCb) { + if (flush) { + flush.call(this, flushCb); + } else { + flushCb(); + } + }; + + const fnTransform = async function(data, encoding, callback) { + const sendP = new Promise((resolve, reject) => { + transform.call(this, data, encoding, (err, sendData) => { + if (err) { + reject(err); + } else { + resolve(sendData); + } + }); + }); + sendArr.push(sendP); + + // throttle + if (sendArr.length >= maxConcurrency) { + await Promise.all(sendArr.slice()); + const sendData = await sendP; + + removeFirst(sendArr, sendP); + callback(null, sendData); + return; + } + + process.nextTick(() => { + callback(); + }); + await Promise.all(sendArr.slice()); + const sendData = await sendP; + + removeFirst(sendArr, sendP); + this.push(sendData); + }; + return through2(options, fnTransform, fnFlush); + } + function _transform (message, enc, callback) { var self = this; var callbackCalled = false;