Skip to content

Commit 361bc6c

Browse files
stream: prevent dead lock when Duplex generator is "thrown"
1 parent 5ad2ca9 commit 361bc6c

File tree

2 files changed

+63
-10
lines changed

2 files changed

+63
-10
lines changed

lib/internal/streams/duplexify.js

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ module.exports = function duplexify(body, name) {
218218
};
219219

220220
function fromAsyncGen(fn, destructor) {
221-
let { promise, resolve } = PromiseWithResolvers();
221+
let { promise, resolve, reject } = PromiseWithResolvers();
222222
const ac = new AbortController();
223223
const signal = ac.signal;
224224

@@ -231,7 +231,7 @@ function fromAsyncGen(fn, destructor) {
231231
if (done) return;
232232
if (signal.aborted)
233233
throw new AbortError(undefined, { cause: signal.reason });
234-
({ promise, resolve } = PromiseWithResolvers());
234+
({ promise, resolve, reject } = PromiseWithResolvers());
235235
// Next line will "break" the loop if the generator is returned/thrown.
236236
yield chunk;
237237
}
@@ -242,6 +242,13 @@ function fromAsyncGen(fn, destructor) {
242242
try {
243243
return await originalReturn.call(this, value);
244244
} finally {
245+
if (resolve) {
246+
const _resolve = resolve;
247+
resolve = null;
248+
reject = null;
249+
_resolve({ done: true, cb: () => {} });
250+
}
251+
245252
if (promise) {
246253
const _promise = promise;
247254
promise = null;
@@ -258,13 +265,22 @@ function fromAsyncGen(fn, destructor) {
258265
try {
259266
return await originalThrow.call(this, err);
260267
} finally {
268+
// asyncGenerator.throw(undefined) should cause a callback error
269+
const error = err || new AbortError();
270+
271+
if (reject) {
272+
const _reject = reject;
273+
reject = null;
274+
resolve = null;
275+
_reject(error);
276+
}
277+
261278
if (promise) {
262279
const _promise = promise;
263280
promise = null;
264281
const { cb } = await _promise;
265282

266-
// asyncGenerator.throw(undefined) should cause a callback error
267-
process.nextTick(cb, err ?? new AbortError());
283+
process.nextTick(cb, error);
268284
}
269285
}
270286
};
@@ -274,14 +290,24 @@ function fromAsyncGen(fn, destructor) {
274290
return {
275291
value,
276292
write(chunk, encoding, cb) {
277-
const _resolve = resolve;
278-
resolve = null;
279-
_resolve({ chunk, done: false, cb });
293+
if (resolve) {
294+
const _resolve = resolve;
295+
resolve = null;
296+
reject = null;
297+
_resolve({ chunk, done: false, cb });
298+
} else {
299+
cb(new AbortError());
300+
}
280301
},
281302
final(cb) {
282-
const _resolve = resolve;
283-
resolve = null;
284-
_resolve({ done: true, cb });
303+
if (resolve) {
304+
const _resolve = resolve;
305+
resolve = null;
306+
reject = null;
307+
_resolve({ done: true, cb });
308+
} else {
309+
cb(new AbortError());
310+
}
285311
},
286312
destroy(err, cb) {
287313
ac.abort();

test/parallel/test-stream-pipeline.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,3 +1723,30 @@ tmpdir.refresh();
17231723
});
17241724
src.destroy(new Error('problem'));
17251725
}
1726+
1727+
{
1728+
async function* myAsyncGenerator(ag) {
1729+
for await (const data of ag) {
1730+
yield data;
1731+
}
1732+
}
1733+
1734+
const duplexStream = Duplex.from(myAsyncGenerator);
1735+
1736+
const r = new Readable({
1737+
read() {
1738+
this.push('data1\n');
1739+
throw new Error('booom');
1740+
},
1741+
});
1742+
1743+
const w = new Writable({
1744+
write(chunk, encoding, callback) {
1745+
callback();
1746+
},
1747+
});
1748+
1749+
pipeline(r, duplexStream, w, common.mustCall((err) => {
1750+
assert.deepStrictEqual(err, new Error('booom'));
1751+
}));
1752+
}

0 commit comments

Comments
 (0)