Skip to content

Commit d1e2549

Browse files
author
wenzhe
committed
add preserveOrder stream option
1 parent ce8b0dc commit d1e2549

File tree

1 file changed

+60
-0
lines changed

1 file changed

+60
-0
lines changed

through2-concurrent.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@ var through2 = require('through2');
66
function cbNoop (cb) {
77
cb();
88
}
9+
function removeFirst(arr, item) {
10+
arr.some((v, index) => {
11+
if (v === item) {
12+
arr.splice(index, 1);
13+
return true;
14+
}
15+
return false;
16+
});
17+
}
918

1019
module.exports = function concurrentThrough (options, transform, flush) {
1120
var concurrent = 0, lastCallback = null, pendingFinish = null;
@@ -18,6 +27,57 @@ module.exports = function concurrentThrough (options, transform, flush) {
1827

1928
var maxConcurrency = options.maxConcurrency || 16;
2029

30+
if (options.preserveOrder) {
31+
const sendArr = [];
32+
const doFinal = options.final || cbNoop;
33+
options.final = function(finalCb) {
34+
Promise.all(sendArr).then(() => {
35+
doFinal.call(this, finalCb);
36+
});
37+
};
38+
39+
const fnFlush = function(flushCb) {
40+
if (flush) {
41+
flush.call(this, flushCb);
42+
} else {
43+
flushCb();
44+
}
45+
};
46+
47+
const fnTransform = async function(data, encoding, callback) {
48+
const sendP = new Promise((resolve, reject) => {
49+
transform.call(this, data, encoding, (err, sendData) => {
50+
if (err) {
51+
reject(err);
52+
} else {
53+
resolve(sendData);
54+
}
55+
});
56+
});
57+
sendArr.push(sendP);
58+
59+
// throttle
60+
if (sendArr.length >= maxConcurrency) {
61+
await Promise.all(sendArr.slice());
62+
const sendData = await sendP;
63+
64+
removeFirst(sendArr, sendP);
65+
callback(null, sendData);
66+
return;
67+
}
68+
69+
process.nextTick(() => {
70+
callback();
71+
});
72+
await Promise.all(sendArr.slice());
73+
const sendData = await sendP;
74+
75+
removeFirst(sendArr, sendP);
76+
this.push(sendData);
77+
};
78+
return through2(options, fnTransform, fnFlush);
79+
}
80+
2181
function _transform (message, enc, callback) {
2282
var self = this;
2383
var callbackCalled = false;

0 commit comments

Comments
 (0)