Skip to content

add preserveOrder stream option #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,23 @@ var through2Concurrent = require('./through2-concurrent');
var expect = require('expect.js');
var _ = require('underscore');

/**
 * Calls `fn(i)` for every i in 0..num-1, strictly in order.
 *
 * When a call returns a Promise, the next call is delayed until that promise
 * settles. Calls that return anything else are not awaited, so a purely
 * synchronous `fn` is invoked `num` times before `times` first yields.
 *
 * @param {number} num - How many times to call `fn`.
 * @param {function(number): (Promise|*)} fn - Callback receiving the index.
 * @returns {Promise<void>} Resolves once every call has completed; rejects
 *   with the first error thrown by `fn` or the first rejected promise.
 */
async function times(num, fn) {
  // `let` keeps the counter local: the previous undeclared `i` leaked a shared
  // global (and is a ReferenceError in strict mode / ES modules).
  for (let i = 0; i < num; i++) {
    const result = fn(i);
    if (result instanceof Promise) {
      await result;
    }
  }
}

// Handle on process.nextTick as it was when this file was loaded
// (presumably captured before the suite swaps in its own scheduler).
var oldNextTick = process.nextTick;

/**
 * Returns a promise that is resolved (with no value) from a callback
 * scheduled through the saved `oldNextTick`.
 *
 * @returns {Promise<undefined>}
 */
function wait() {
  return new Promise(function (done) {
    oldNextTick(function () {
      done();
    });
  });
}

describe('through2-concurrent', function () {
var nextTickScheduled, collectingThrough, transformCalls, flushCalls, finalCalls;
Expand All @@ -18,6 +34,7 @@ describe('through2-concurrent', function () {
nextTickScheduled = [];
fns.forEach(execute);
}
return wait();
}

beforeEach(function () {
Expand Down Expand Up @@ -267,4 +284,246 @@ describe('through2-concurrent', function () {
});
});
});

// Object-mode suite for streams created with `preserveOrder: true`.
// NOTE(review): the two "should pass down the stream data ..." tests and the
// nested "without a flush argument or final option" suite construct their
// streams WITHOUT `preserveOrder: true` — confirm whether that is intended.
describe('object streams (preserveOrder=true)', function () {
// Reset the recorded calls and build a stream (maxConcurrency 4) whose
// transform, flush and final hooks only record the callback they were given,
// so each test decides when a given call completes.
beforeEach(function () {
transformCalls = [];
flushCalls = [];
finalCalls = [];

var final = function (callback) {
finalCalls.push({ callback: callback });
};
collectingThrough = through2Concurrent.obj(
{ maxConcurrency: 4, final: final, preserveOrder: true },
function (chunk, enc, callback) {
transformCalls.push({ chunk: chunk, enc: enc, callback: callback });
}, function (callback) {
flushCalls.push({ callback: callback });
});
});

// Ten writes are issued, but only four transform calls are expected to start.
it('should initiially allow the maximum specified concurrency', function () {
_.times(10, function (i) {
collectingThrough.write({ number: i });
});
runNextTicks();
expect(transformCalls.length).to.be(4);
});

// Completing calls 2 and 0 (out of order) must not start further transforms.
// Presumably the write that hit the limit is only acknowledged once every
// in-flight transform has finished — verify against the implementation.
it('(preserveOrder=true)should not feed more through once the concurrency drops below the given limit', function () {
_.times(10, function (i) {
collectingThrough.write({ number: i });
});
runNextTicks();
transformCalls[2].callback();
transformCalls[0].callback();
runNextTicks();
expect(transformCalls.length).to.be(4);
});

// More writes (10) than the concurrency limit (4): final must not run until
// the last outstanding transform completes, and flush only after final.
it('should wait for all transform calls to finish before running final and flush when processing > concurrency', async function () {
// Not awaited: the callback returns no promise, so `times` performs all ten
// writes synchronously before end() is called.
times(10, function (i) {
collectingThrough.write({ number: i });
});
collectingThrough.end();
await runNextTicks();
// Complete nine calls, newest first, letting the queue drain after each.
await times(9, async function (i) {
const poped = transformCalls.pop()
poped.callback();
await runNextTicks();
});

expect(transformCalls.length).to.be(1);
expect(finalCalls.length).to.be(0);
expect(flushCalls.length).to.be(0);
transformCalls[0].callback();

// Ensure that flush is called straight after final even if we let the
// event loop progress beforehand but not between the calls
await wait();
await runNextTicks();

expect(finalCalls.length).to.be(1);
expect(flushCalls.length).to.be(0);
finalCalls[0].callback();
expect(finalCalls.length).to.be(1);
expect(flushCalls.length).to.be(1);
});

// Fewer writes (3) than the concurrency limit (4): same final/flush ordering.
it('should wait for all transform calls to finish before running final and flush when processing < concurrency', async function () {
_.times(3, function (i) {
collectingThrough.write({ number: i });
});
collectingThrough.end();
await runNextTicks();
// NOTE(review): the async callback is handed to `_.times`, which presumably
// does not await it; both pop().callback() calls then happen synchronously and
// the returned promises are left floating. The > concurrency test above
// awaits `times(...)` instead — confirm which is intended.
_.times(2, async function (i) {
transformCalls.pop().callback();
await runNextTicks();
});
expect(transformCalls.length).to.be(1);
expect(finalCalls.length).to.be(0);
expect(flushCalls.length).to.be(0);
transformCalls[0].callback();

await wait();
await runNextTicks();
// Ensure that flush is called straight after final even if we let the
// event loop progress beforehand but not between the calls
expect(finalCalls.length).to.be(1);
expect(flushCalls.length).to.be(0);
finalCalls[0].callback();
expect(finalCalls.length).to.be(1);
expect(flushCalls.length).to.be(1);
});

// 'finish' must not fire while transforms or final are still pending.
it('should wait for everything to complete before emitting "finish"', async function () {
var finishCalled = false;
collectingThrough.on('finish', function () {
finishCalled = true;
});
collectingThrough.resume();
collectingThrough.write({ hello: 'world' });
collectingThrough.write({ hello: 'world' });
collectingThrough.end();
await runNextTicks();
expect(finishCalled).to.be(false);
transformCalls[0].callback();
await runNextTicks();
transformCalls[1].callback();
await runNextTicks();
expect(finishCalled).to.be(false);
await runNextTicks();
finalCalls[0].callback();
// runNextTicks does not set the stream to sync=false, causing 'finish' to get
// stuck because collectingThrough._writableState.pendingcb will always stay > 0.
// Waiting on wait() guarantees processing and the possibility of a 'finish' event.
// NOTE(review): the original comment said "setImmediate", but wait() in this
// file schedules through the saved process.nextTick.
await wait();
expect(finishCalled).to.be(true);
});

// 'end' must not fire until the transforms, final and flush have all completed.
it('should wait for everything to complete before emitting "end"', async function () {
var endCalled = false;
collectingThrough.on('end', function () {
endCalled = true;
});
collectingThrough.resume();
collectingThrough.write({ hello: 'world' });
collectingThrough.write({ hello: 'world' });
collectingThrough.end();
await runNextTicks();
expect(endCalled).to.be(false);
transformCalls[0].callback();
await runNextTicks();
transformCalls[1].callback();
await runNextTicks();
expect(endCalled).to.be(false);

await runNextTicks();
finalCalls[0].callback();
await runNextTicks();
expect(endCalled).to.be(false);
flushCalls[0].callback();
await runNextTicks();
expect(endCalled).to.be(true);
});

// Data pushed with this.push from transform, final and flush must reach
// 'data' listeners in that order.
it('should pass down the stream data added with this.push', function (done) {
var final = function (cb) {
this.push({ finished: true });
cb();
};
// Run this test with the saved process.nextTick; whatever was installed
// before is put back inside the timeout below.
const curNextick = process.nextTick;
process.nextTick = oldNextTick;
// NOTE(review): no `preserveOrder: true` here, and the flush function below
// never invokes its callback — confirm both are intended.
var passingThrough = through2Concurrent.obj(
{ maxConcurrency: 1, final: final },
function (chunk, enc, callback) {
this.push({ original: chunk });
callback();
}, function (callback) {
this.push({ flushed: true });
});
var out = [];
passingThrough.on('data', function (data) {
out.push(data);
});
passingThrough.write("Hello");
passingThrough.write("World");
passingThrough.end();
setTimeout(() => {
expect(out).to.eql([{ original: "Hello" }, { original: "World" }, { finished: true }, { flushed: true }]);
process.nextTick = curNextick;
done();
}, 0)
});

// Data handed to the transform callback as its second argument must reach
// 'data' listeners. NOTE(review): stream is built without `preserveOrder: true`.
it('should pass down the stream data added as arguments to the callback', async function () {
var passingThrough = through2Concurrent.obj(
{ maxConcurrency: 1 },
function (chunk, enc, callback) {
callback(null, { original: chunk });
});
var out = [];
passingThrough.on('data', function (data) {
out.push(data);
});
passingThrough.write("Hello");
passingThrough.write("World");
await wait();
passingThrough.end();
await runNextTicks();
expect(out).to.eql([{ original: "Hello" }, { original: "World" }]);
});

// Streams created with only a transform function (no flush, no final).
// NOTE(review): these streams are built without `preserveOrder: true`.
describe('without a flush argument or final option', function () {
beforeEach(function () {
transformCalls = [];
flushCalls = [];

collectingThrough = through2Concurrent.obj(
{ maxConcurrency: 4 },
function (chunk, enc, callback) {
transformCalls.push({ chunk: chunk, enc: enc, callback: callback });
});
});

it('should wait for everything to complete before emitting "finish"', async function () {
var finishCalled = false;
collectingThrough.on('finish', function () {
finishCalled = true;
});

collectingThrough.resume();
collectingThrough.write({ hello: 'world' });
collectingThrough.end();
// NOTE(review): the promise returned by runNextTicks() is not awaited here,
// unlike in the sibling tests.
runNextTicks();
expect(finishCalled).to.be(false);
transformCalls[0].callback();

// runNextTicks does not progress ticks and set the stream to sync=false,
// causing 'finish' to never fire because
// collectingThrough._writableState.pendingcb will always stay > 0.
// Waiting on wait() guarantees processing of the 'finish' event.
// NOTE(review): the original comment said "setImmediate", but wait() in this
// file schedules through the saved process.nextTick.
await wait();
await runNextTicks();
expect(finishCalled).to.be(true);
});

it('should wait for everything to complete before emitting "end"', async function () {
var endCalled = false;
collectingThrough.on('end', function () {
endCalled = true;
});
collectingThrough.resume();
collectingThrough.write({ hello: 'world' });
collectingThrough.end();
await runNextTicks();
expect(endCalled).to.be(false);
transformCalls[0].callback();
await runNextTicks();
await runNextTicks();
expect(endCalled).to.be(true);
});
});
});

});
62 changes: 62 additions & 0 deletions through2-concurrent.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ var through2 = require('through2');
/**
 * Default hook used when the caller supplies none: does no work and
 * immediately invokes the given callback with no arguments.
 *
 * @param {function(): void} cb - Completion callback to invoke.
 */
function cbNoop (cb) {
cb();
}
/**
 * Removes, in place, the first element of `arr` that is strictly equal
 * (`===`) to `item`. The array is left untouched when nothing matches.
 *
 * @param {Array} arr - Array to modify.
 * @param {*} item - Value to look for.
 * @returns {undefined}
 */
function removeFirst(arr, item) {
  const position = arr.indexOf(item);
  if (position !== -1) {
    arr.splice(position, 1);
  }
}

module.exports = function concurrentThrough (options, transform, flush) {
var concurrent = 0, lastCallback = null, pendingFinish = null;
Expand All @@ -18,6 +27,59 @@ module.exports = function concurrentThrough (options, transform, flush) {

var maxConcurrency = options.maxConcurrency || 16;

if (options.preserveOrder) {
const sendArr = [];
const doFinal = options.final || cbNoop;
options.final = function(finalCb) {
Promise.all(sendArr).then(() => {
process.nextTick(() => {
doFinal.call(this, finalCb);
})
});
};

const fnFlush = function(flushCb) {
if (flush) {
flush.call(this, flushCb);
} else {
flushCb();
}
};

const fnTransform = async function(data, encoding, callback) {
const sendP = new Promise((resolve, reject) => {
transform.call(this, data, encoding, (err, sendData) => {
if (err) {
reject(err);
} else {
resolve(sendData);
}
});
});
sendArr.push(sendP);

// throttle
if (sendArr.length >= maxConcurrency) {
await Promise.all(sendArr.slice());
const sendData = await sendP;

removeFirst(sendArr, sendP);
callback(null, sendData);
return;
}

process.nextTick(() => {
callback();
});
await Promise.all(sendArr.slice());
const sendData = await sendP;

removeFirst(sendArr, sendP);
this.push(sendData);
};
return through2(options, fnTransform, fnFlush);
}

function _transform (message, enc, callback) {
var self = this;
var callbackCalled = false;
Expand Down