From 5ca8162998e39c3c1b74096e684386f0f41aa28b Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Wed, 10 Aug 2022 16:51:34 +0200 Subject: [PATCH 1/6] chore: update unioniterator tests --- test/AsyncIterator-test.js | 19 +++++ test/SimpleTransformIterator-test.js | 12 --- test/UnionIterator-test.js | 119 ++++++++++----------------- 3 files changed, 64 insertions(+), 86 deletions(-) diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index a062dd9..3ba4c86 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -7,6 +7,7 @@ import { scheduleTask, isPromise, isIterator, + range, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1331,4 +1332,22 @@ describe('Type-checking functions', () => { expect(isIterator([][Symbol.iterator]())).to.equal(true); }); }); + + describe('Testing #append', () => { + it('Should append an array', async () => { + expect(await range(0, 1).append([2, 3, 4]).toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + it('Should append an iterator', async () => { + expect(await range(0, 1).append(range(2, 4)).toArray()).to.deep.equal([0, 1, 2, 3, 4]); + }); + }); + + describe('Testing #prepend', () => { + it('Should prepend an array', async () => { + expect(await range(0, 1).prepend([2, 3, 4]).toArray()).to.deep.equal([2, 3, 4, 0, 1]); + }); + it('Should prepend an iterator', async () => { + expect(await range(0, 1).prepend(range(2, 4)).toArray()).to.deep.equal([2, 3, 4, 0, 1]); + }); + }); }); diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index b531405..2c469c6 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -1108,10 +1108,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should prepend the items', () => { items.should.deep.equal(['i', 'ii', 'iii', 'a', 'b', 'c']); }); @@ -1138,10 +1134,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should append the items', () => { items.should.deep.equal(['a', 'b', 'c', 'I', 'II', 'III']); }); @@ -1168,10 +1160,6 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); - }); - it('should surround the items', () => { items.should.deep.equal(['i', 'ii', 'iii', 'a', 'b', 'c', 'I', 'II', 'III']); }); diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index 8ed66c2..4e32860 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -68,6 +68,17 @@ describe('UnionIterator', () => { (await toArray(iterator)).sort().should.eql([0, 1, 2]); }); + it('should include all data from 1 non-empty and 4 empty sources - with maxParallelIterators: 1', async () => { + const iterator = new UnionIterator([ + new EmptyIterator(), + new EmptyIterator(), + range(0, 2), + new EmptyIterator(), + new EmptyIterator(), + ], { maxParallelIterators: 1 }); + (await toArray(iterator)).sort().should.eql([0, 1, 2]); + }); + describe('when constructed with an array of 0 sources', () => { let iterator; before(() => { @@ -75,8 +86,10 @@ describe('UnionIterator', () => { iterator = new UnionIterator(sources); }); - it('should have ended', () => { - iterator.ended.should.be.true; + describe('before reading', () => { + it('should not have ended', () => { + iterator.ended.should.be.false; + }); }); }); @@ -84,7 +97,7 @@ describe('UnionIterator', () => { let iterator; before(() => { const sources = []; - iterator = new UnionIterator(sources, { autoStart: false }); + iterator = new UnionIterator(sources); }); describe('before reading', () => { @@ -136,7 +149,7 @@ describe('UnionIterator', () => { }); it('should have ended', () => { - iterator.ended.should.be.true; + iterator.ended.should.be.false; }); }); @@ -148,7 +161,7 @@ describe('UnionIterator', () => { }); it('should have ended', () => { - iterator.ended.should.be.true; + iterator.ended.should.be.false; }); }); @@ -156,7 +169,7 @@ describe('UnionIterator', () => { let iterator; before(() => { const sources = []; - iterator = new UnionIterator(new ArrayIterator(sources), { autoStart: false }); + iterator = new UnionIterator(new ArrayIterator(sources)); }); describe('before reading', () => { @@ -226,22 +239,9 @@ describe('UnionIterator', () => { }); describe('before reading', () => { - it('should have read the sources', () => { - sourceIterator.read.should.have.been.called; - }); - it('should not have ended', () => { iterator.ended.should.be.false; }); - - it('should pass errors', () => { - const callback = sinon.spy(); - const error = new Error('error'); - iterator.once('error', callback); - sourceIterator.emit('error', error); - callback.should.have.been.calledOnce; - callback.should.have.been.calledWith(error); - }); }); describe('after reading', () => { @@ -266,26 +266,17 @@ describe('UnionIterator', () => { const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; sourceIterator = new ArrayIterator(sources); sinon.spy(sourceIterator, 'read'); - iterator = new UnionIterator(sourceIterator, { autoStart: true }); + iterator = new UnionIterator(sourceIterator); }); describe('before reading', () => { - it('should have read the sources', () => { - sourceIterator.read.should.have.been.called; + it('should not have read the sources', () => { + sourceIterator.read.should.not.have.been.called; }); it('should not have ended', () => { iterator.ended.should.be.false; }); - - it('should pass errors', () => { - const callback = sinon.spy(); - const error = new Error('error'); - iterator.once('error', callback); - sourceIterator.emit('error', error); - callback.should.have.been.calledOnce; - callback.should.have.been.calledWith(error); - }); }); describe('after reading', () => { @@ -310,7 +301,7 @@ describe('UnionIterator', () => { const sources = [range(0, 2), range(3, 6)]; sourceIterator = new ArrayIterator(sources); sinon.spy(sourceIterator, 'read'); - iterator = new UnionIterator(sourceIterator, { autoStart: false }); + iterator = new UnionIterator(sourceIterator); }); describe('before reading', () => { @@ -321,15 +312,6 @@ describe('UnionIterator', () => { it('should not have ended', () => { iterator.ended.should.be.false; }); - - it('should pass errors', () => { - const callback = sinon.spy(); - const error = new Error('error'); - iterator.once('error', callback); - sourceIterator.emit('error', error); - callback.should.have.been.calledOnce; - callback.should.have.been.calledWith(error); - }); }); describe('after reading', () => { @@ -358,7 +340,7 @@ describe('UnionIterator', () => { const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; sourceIterator = new ArrayIterator(sources); sinon.spy(sourceIterator, 'read'); - iterator = new UnionIterator(sourceIterator, { autoStart: false }); + iterator = new UnionIterator(sourceIterator); }); describe('before reading', () => { @@ -369,15 +351,6 @@ describe('UnionIterator', () => { it('should not have ended', () => { iterator.ended.should.be.false; }); - - it('should pass errors', () => { - const callback = sinon.spy(); - const error = new Error('error'); - iterator.once('error', callback); - sourceIterator.emit('error', error); - callback.should.have.been.calledOnce; - callback.should.have.been.calledWith(error); - }); }); describe('after reading', () => { @@ -412,6 +385,9 @@ describe('UnionIterator', () => { }); it('should emit an error when the first iterator emits an error', () => { + iterator.read().should.eql(0); + iterator.read().should.eql(1); + const error = new Error('error'); const callback = sinon.spy(); iterator.on('error', callback); @@ -421,6 +397,11 @@ describe('UnionIterator', () => { }); it('should emit an error when the second iterator emits an error', () => { + iterator.read().should.eql(0); + iterator.read().should.eql(1); + iterator.read().should.eql(2); + iterator.read().should.eql(3); + const error = new Error('error'); const callback = sinon.spy(); iterator.on('error', callback); @@ -433,23 +414,9 @@ describe('UnionIterator', () => { (await toArray(iterator)).should.be.instanceof(Array); }); - it('should allow the _read method to be called multiple times', () => { - iterator._read(1, noop); - iterator._read(1, noop); - }); - it('should make a round-robin union of the data elements', async () => { (await toArray(iterator)).sort().should.eql([0, 1, 2, 3, 4, 5, 6]); }); - - it('should destroy the sources when closing', async () => { - iterator.close(); - - await new Promise(resolve => iterator.on('end', resolve)); - - sources[0].closed.should.be.true; - sources[1].closed.should.be.true; - }); }); describe('a UnionIterator with two sources without destroySources', () => { @@ -526,15 +493,15 @@ describe('UnionIterator', () => { it('should read 2 streams in round-robin order', async () => { // Read 4 buffered items expect(iterator.read()).to.equal(3); - expect(iterator.read()).to.equal(6); expect(iterator.read()).to.equal(4); - expect(iterator.read()).to.equal(7); + expect(iterator.read()).to.equal(5); + expect(iterator.read()).to.equal(6); // Buffer await new Promise(resolve => scheduleTask(resolve)); // Read remaining items - expect(iterator.read()).to.equal(5); + expect(iterator.read()).to.equal(7); expect(iterator.read()).to.be.null; }); @@ -580,9 +547,18 @@ describe('UnionIterator', () => { await new Promise(resolve => iterator.on('end', resolve)); sourcesIterator.closed.should.be.true; + }); + + it('should close iterators that have started to be read', async () => { + iterator.read(); + iterator.close(); + + await new Promise(resolve => iterator.on('end', resolve)); + + sourcesIterator.closed.should.be.true; sources[0].closed.should.be.true; - sources[1].closed.should.be.true; + sources[1].closed.should.be.false; }); }); @@ -611,9 +587,6 @@ describe('UnionIterator', () => { await new Promise(resolve => iterator.on('end', resolve)); sourcesIterator.closed.should.be.true; - - sources[0].closed.should.be.true; - sources[1].closed.should.be.true; }); }); @@ -662,5 +635,3 @@ function toArray(stream) { stream.on('end', () => resolve(array)); }); } - -function noop() { /* */ } From 153e79f13d0c0547698f6b69ab526073d10d17e8 Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Wed, 10 Aug 2022 16:52:14 +0200 Subject: [PATCH 2/6] perf: update asynciterator --- asynciterator.ts | 259 ++++++++++++++++++++++++++++------------------- 1 file changed, 154 insertions(+), 105 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 78f2a8e..8ea0597 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -482,7 +482,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that prepends items to this iterator */ prepend(items: T[] | AsyncIterator): AsyncIterator { - return this.transform({ prepend: items }); + return union([wrap(items), this]); } /** @@ -492,7 +492,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that appends items to this iterator */ append(items: T[] | AsyncIterator): AsyncIterator { - return this.transform({ append: items }); + return union([this, wrap(items)]); } /** @@ -503,7 +503,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that appends and prepends items to this iterator */ surround(prepend: AsyncIteratorOrArray, append: AsyncIteratorOrArray): AsyncIterator { - return this.transform({ prepend, append }); + return union([wrap(prepend), this, wrap(append)]); } /** @@ -1619,147 +1619,196 @@ export class MultiTransformIterator extends TransformIterator { } } +/** Key indicating the node of a source in a union buffer. */ +export const NODE = Symbol('node'); + +interface CircularNode { + last: CircularNode; + next: CircularNode; + value: T; +} + /** An iterator that generates items by reading from multiple other iterators. - @extends module:asynciterator.BufferedIterator + @extends module:asynciterator.AsyncIterator */ -export class UnionIterator extends BufferedIterator { - private _sources : InternalSource[] = []; - private _pending? : { loading: boolean, sources?: AsyncIterator>> }; - private _currentSource = -1; - protected _destroySources: boolean; +export class UnionIterator extends AsyncIterator { + private _sources : AsyncIterator>; + private _node: CircularNode> | null = null; + private _size: number = 0; + private _sourceStarted: boolean = false; + private _destroySources: boolean; + private _maxParallelIterators: number; /** Creates a new `UnionIterator`. - @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from + @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from. @param {object} [options] Settings of the iterator @param {boolean} [options.destroySource=true] Whether the sources should be destroyed when transformed iterator is closed or destroyed + @param {number} [options.maxParallelIterators=Infinity] The maximum number of iterators that can be read from simultaneously. + Setting equal to 1 will cause iterators to be read from sequentially and in order. */ constructor(sources: AsyncIteratorOrArray> | AsyncIteratorOrArray>> | - AsyncIteratorOrArray>>, - options: BufferedIteratorOptions & { destroySources?: boolean } = {}) { - super(options); - const autoStart = options.autoStart !== false; + AsyncIteratorOrArray>> | + MaybePromise>>>, + options: { destroySources?: boolean, maxParallelIterators?: number } = {}) { + super(); + this._listenSource(this._sources = wrap(sources).map>(wrap)); - // Sources have been passed as an iterator - if (isEventEmitter(sources)) { - sources.on('error', error => this.emit('error', error)); - this._pending = { loading: false, sources: sources as AsyncIterator>> }; - if (autoStart) - this._loadSources(); - } - // Sources have been passed as a non-empty array - else if (Array.isArray(sources) && sources.length > 0) { - for (const source of sources) - this._addSource(source as MaybePromise>); - } - // Sources are an empty list - else if (autoStart) { - this.close(); - } // Set other options this._destroySources = options.destroySources !== false; + this._maxParallelIterators = options.maxParallelIterators || Infinity; + + this.readable = this._sources.readable; + } + + // Adds the given source to the internal sources array + protected _listenSource(source: InternalSource) { + source[DESTINATION] = this; + source.on('error', destinationEmitError); + // TODO: On readable events we should start reading from the iterator + // that emitted the event + source.on('readable', destinationSetReadable); + source.on('end', destinationRemoveEmptySources); + } + + protected _unListenSource(source: InternalSource) { + source.removeListener('error', destinationEmitError); + source.removeListener('readable', destinationSetReadable); + source.removeListener('end', destinationRemoveEmptySources); + delete source[DESTINATION]; } - // Loads pending sources into the sources list - protected _loadSources() { - // Obtain sources iterator - const sources = this._pending!.sources!; - this._pending!.loading = true; + protected _addSource(source: InternalSource, successor = this._node) { + this._listenSource(source); + this._size++; - // Close immediately if done - if (sources.done) { - delete this._pending; - this.close(); + let node: CircularNode>; + if (successor === null) { + // There is no circular list so make one + node = { value: source } as CircularNode>; + node.next = node; + node.last = node; + this._node = node; } - // Otherwise, set up source reading else { - sources.on('data', source => { - this._addSource(source as MaybePromise>); - this._fillBufferAsync(); - }); - sources.on('end', () => { - delete this._pending; - this._fillBuffer(); - }); + // Insert the source into the circular structure + this._node = node = { + value: source, + next: successor, + last: successor.last, + }; + + successor.last.next = node; + successor.last = node; } + source[NODE] = node; } - // Adds the given source to the internal sources array - protected _addSource(source: MaybePromise>) { - if (isPromise(source)) - source = wrap(source) as any as InternalSource; - if (!source.done) { - this._sources.push(source); - source[DESTINATION] = this; - source.on('error', destinationEmitError); - source.on('readable', destinationFillBuffer); - source.on('end', destinationRemoveEmptySources); + protected _removeSource(value: InternalSource) { + const node = value[NODE]!; + this._unListenSource(value); + delete value[NODE]; + this._size--; + + if (this._size === 0) { + this._node = null; + } + else { + node.last.next = node.next; + node.next.last = node.last; + + // Make sure the root node no longer + // references a deleted iterator. + if (this._node === node) + this._node = node.next; } } - // Removes sources that will no longer emit items - protected _removeEmptySources() { - this._sources = this._sources.filter((source, index) => { - // Adjust the index of the current source if needed - if (source.done && index <= this._currentSource) - this._currentSource--; - return !source.done; - }); - this._fillBuffer(); - } - - // Reads items from the next sources - protected _read(count: number, done: () => void): void { - // Start source loading if needed - if (this._pending?.loading === false) - this._loadSources(); - - // Try to read `count` items - let lastCount = 0, item : T | null; - while (lastCount !== (lastCount = count)) { - // Try every source at least once - for (let i = 0; i < this._sources.length && count > 0; i++) { - // Pick the next source - this._currentSource = (this._currentSource + 1) % this._sources.length; - const source = this._sources[this._currentSource]; - // Attempt to read an item from that source - if ((item = source.read()) !== null) { - count--; - this._push(item); - } - } + public read(): T | null { + // TODO: Get rid of this when merging #45 + if (!this._sourceStarted) + this._sourceStarted = true; + + let { _sources, _size } = this; + let item: T | null; + let iterator: AsyncIterator | null; + while (_size > 0) { + iterator = this._node!.value; + + if (iterator.readable && (item = iterator.read()) !== null) + return item; + + // If the iterator is done, get rid of it from the circular list + if (iterator.done) + this._removeSource(iterator); + else + this._node = this._node!.next; + + _size--; + // TODO: See if this should be an else + } + + while (this._size < this._maxParallelIterators && (iterator = _sources.read()) !== null) { + // TODO - it would be nice to skip adding sources if it is a single or no + // element iterator. + this._addSource(iterator); + if ((item = iterator.read()) !== null) + return item; } - // Close this iterator if all of its sources have been read - if (!this._pending && this._sources.length === 0) + if (this._size === 0 && this._sources.done) this.close(); - done(); - } + else + this.readable = false; - protected _end(destroy: boolean = false) { - super._end(destroy); + return null; + } + public close() { + this._unListenSource(this._sources); // Destroy all sources that are still readable if (this._destroySources) { - for (const source of this._sources) - source.destroy(); - - // Also close the sources stream if applicable - if (this._pending) { - this._pending!.sources!.destroy(); - delete this._pending; + let { _node, _size } = this; + while (_size > 0) { + this._unListenSource(_node!.value); + _node!.value!.destroy(); + delete (_node as any)[NODE]; + _size--; + _node = _node!.next; } + this._node = null; } + this._sources.destroy(); + super.close(); } } function destinationRemoveEmptySources(this: InternalSource) { - (this[DESTINATION] as any)._removeEmptySources(); + // Eventually, rather than just removing empty sources I think + // we will want to find a way of specifically removing the source + // that emitted the end event (the trick is to do this without creating race conditions) + const destination = this[DESTINATION] as any; + if (NODE in this) { + destination._removeSource(this); + if (destination._size === 0 && destination._sources.done && destination._sourceStarted) + destination!.close(); + // Also capture the case where we need to just start re-filling the circular + + // else if (destination._size < destination._maxParallelIterators && destination._sources.readable) { + // TODO: Add a test case for this + this.readable = true; + // TODO: Future performance improvement - continue re-filling the circular linked list + // } + } + else { + destination._unListenSource(this); + if (destination._size === 0 && destination._sourceStarted) + destination!.close(); + } } - /** An iterator that copies items from another iterator. @extends module:asynciterator.TransformIterator @@ -2224,7 +2273,7 @@ type SourceExpression = (() => MaybePromise>); type InternalSource = - AsyncIterator & { [DESTINATION]?: AsyncIterator }; + AsyncIterator & { [DESTINATION]?: AsyncIterator; [NODE]?: CircularNode> }; // Returns a function that calls `fn` with `self` as `this` pointer. */ function bind(fn: T, self?: object): T { From 58c074a88da1210f22a30279026f774129248dd0 Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Wed, 10 Aug 2022 16:52:57 +0200 Subject: [PATCH 3/6] chore: add UnionIterator performance test --- perf/UnionIterator-perf.js | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 perf/UnionIterator-perf.js diff --git a/perf/UnionIterator-perf.js b/perf/UnionIterator-perf.js new file mode 100644 index 0000000..a1be93a --- /dev/null +++ b/perf/UnionIterator-perf.js @@ -0,0 +1,33 @@ +import { UnionIterator, range } from '../dist/asynciterator.js'; +import { promisifyEventEmitter } from 'event-emitter-promisify'; + +let it; + +// Warmup + +console.time('For loop with 5x10^9 elems'); +for (let i = 0; i < 5_000_000_000; i++) + ; +console.timeEnd('For loop with 5x10^9 elems'); + +console.time('UnionIterator 2x10^7 iterators'); +for (let i = 0; i < 5; i++) { + it = new UnionIterator([range(0, 10_000_000), range(0, 10_000_000)]); + await promisifyEventEmitter(it.on('data', () => { /* noop */ })); +} +console.timeEnd('UnionIterator 2x10^7 iterators'); + +console.time('UnionIterator 1000x500 iterators'); +for (let i = 0; i < 5; i++) { + it = new UnionIterator(range(0, 1000).map(() => range(0, 500))); + await promisifyEventEmitter(it.on('data', () => { /* noop */ })); +} +console.timeEnd('UnionIterator 1000x500 iterators'); + + +console.time('UnionIterator 1000x500 iterators - max parallelism of 1'); +for (let i = 0; i < 5; i++) { + it = new UnionIterator(range(0, 1000).map(() => range(0, 500)), { maxParallelIterators: 1 }); + await promisifyEventEmitter(it.on('data', () => { /* noop */ })); +} +console.timeEnd('UnionIterator 1000x500 iterators - max parallelism of 1'); From 12a5e0034eb51cacdb68500a129873cea3815ec0 Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Wed, 10 Aug 2022 16:53:24 +0200 Subject: [PATCH 4/6] chore: add event-emitter-promisify for perfomance test suite --- package-lock.json | 13 +++++++++++++ package.json | 1 + 2 files changed, 14 insertions(+) diff --git a/package-lock.json b/package-lock.json index 072a897..984020f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "c8": "^7.2.0", "chai": "^4.2.0", "eslint": "^8.0.0", + "event-emitter-promisify": "^1.1.0", "husky": "^4.2.5", "jaguarjs-jsdoc": "^1.1.0", "jsdoc": "^3.5.5", @@ -1789,6 +1790,12 @@ "node": ">=0.10.0" } }, + "node_modules/event-emitter-promisify": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/event-emitter-promisify/-/event-emitter-promisify-1.1.0.tgz", + "integrity": "sha512-uyHG8gjwYGDlKoo0Txtx/u1HI1ubj0FK0rVqI4O0s1EymQm4iAEMbrS5B+XFlSaS8SZ3xzoKX+YHRZk8Nk/bXg==", + "dev": true + }, "node_modules/fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", @@ -5461,6 +5468,12 @@ "integrity": "sha512-kVscqXk4OCp68SZ0dkgEKVi6/8ij300KBWTJq32P/dYeWTSwK41WyTxalN1eRmA5Z9UU/LX9D7FWSmV9SAYx6g==", "dev": true }, + "event-emitter-promisify": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/event-emitter-promisify/-/event-emitter-promisify-1.1.0.tgz", + "integrity": "sha512-uyHG8gjwYGDlKoo0Txtx/u1HI1ubj0FK0rVqI4O0s1EymQm4iAEMbrS5B+XFlSaS8SZ3xzoKX+YHRZk8Nk/bXg==", + "dev": true + }, "fast-deep-equal": { "version": "3.1.3", "resolved": "https://registry.npmjs.org/fast-deep-equal/-/fast-deep-equal-3.1.3.tgz", diff --git a/package.json b/package.json index b6498f9..ed6587a 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "c8": "^7.2.0", "chai": "^4.2.0", "eslint": "^8.0.0", + "event-emitter-promisify": "^1.1.0", "husky": "^4.2.5", "jaguarjs-jsdoc": "^1.1.0", "jsdoc": "^3.5.5", From 821058d2fac0535e4a913a22789f4ff129d773f9 Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Wed, 10 Aug 2022 17:44:57 +0200 Subject: [PATCH 5/6] chore: improve test coverage --- asynciterator.ts | 13 +++++++++---- test/UnionIterator-test.js | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 8ea0597..433a4c4 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1753,9 +1753,11 @@ export class UnionIterator extends AsyncIterator { while (this._size < this._maxParallelIterators && (iterator = _sources.read()) !== null) { // TODO - it would be nice to skip adding sources if it is a single or no // element iterator. - this._addSource(iterator); - if ((item = iterator.read()) !== null) - return item; + // if (!iterator.done) { + this._addSource(iterator); + if ((item = iterator.read()) !== null) + return item; + // } } if (this._size === 0 && this._sources.done) @@ -1798,7 +1800,10 @@ function destinationRemoveEmptySources(this: InternalSource) { // else if (destination._size < destination._maxParallelIterators && destination._sources.readable) { // TODO: Add a test case for this - this.readable = true; + if (this.readable) + this.emit('readable') + else + this.readable = true; // TODO: Future performance improvement - continue re-filling the circular linked list // } } diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index 4e32860..b07b86c 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -68,6 +68,27 @@ describe('UnionIterator', () => { (await toArray(iterator)).sort().should.eql([0, 1, 2]); }); + it('should include all data from 0 sources - with maxParallelIterators: 1', async () => { + const iterator = new UnionIterator([ + ], { maxParallelIterators: 1 }); + (await toArray(iterator)).sort().should.eql([]); + }); + + it('should include all data from 1 non-empty source - with maxParallelIterators: 1', async () => { + const iterator = new UnionIterator([ + range(0, 2), + ], { maxParallelIterators: 1 }); + (await toArray(iterator)).sort().should.eql([0, 1, 2]); + }); + + it('should include all data from 2 non-empty sources - with maxParallelIterators: 1', async () => { + const iterator = new UnionIterator([ + range(0, 2), + range(3, 4), + ], { maxParallelIterators: 1 }); + (await toArray(iterator)).sort().should.eql([0, 1, 2, 3, 4]); + }); + it('should include all data from 1 non-empty and 4 empty sources - with maxParallelIterators: 1', async () => { const iterator = new UnionIterator([ new EmptyIterator(), @@ -79,6 +100,23 @@ describe('UnionIterator', () => { (await toArray(iterator)).sort().should.eql([0, 1, 2]); }); + it('should include all data from 4 empty sources - with maxParallelIterators: 1', async () => { + const iterator = new UnionIterator([ + new EmptyIterator(), + new EmptyIterator(), + new EmptyIterator(), + new EmptyIterator(), + ], { maxParallelIterators: 1 }); + (await toArray(iterator)).sort().should.eql([]); + }); + + it('should include all data from 1 empty source - with maxParallelIterators: 1', async () => { + const iterator = new UnionIterator([ + new EmptyIterator(), + ], { maxParallelIterators: 1 }); + (await toArray(iterator)).sort().should.eql([]); + }); + describe('when constructed with an array of 0 sources', () => { let iterator; before(() => { From 6e7f093364bef006db6b06136aeb80eafce82d64 Mon Sep 17 00:00:00 2001 From: Jesse Wright <63333554+jeswr@users.noreply.github.com> Date: Thu, 11 Aug 2022 09:49:27 +0200 Subject: [PATCH 6/6] fix: fix failing on empty sources --- asynciterator.ts | 51 ++++++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/asynciterator.ts b/asynciterator.ts index 433a4c4..9d6d3cd 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -1740,24 +1740,21 @@ export class UnionIterator extends AsyncIterator { if (iterator.readable && (item = iterator.read()) !== null) return item; - // If the iterator is done, get rid of it from the circular list - if (iterator.done) - this._removeSource(iterator); - else - this._node = this._node!.next; + // TODO: in #45 - if the iterator is done, get rid of it from the circular list + // if (iterator.done) + // this._removeSource(iterator); + // else + this._node = this._node!.next; _size--; - // TODO: See if this should be an else } while (this._size < this._maxParallelIterators && (iterator = _sources.read()) !== null) { - // TODO - it would be nice to skip adding sources if it is a single or no - // element iterator. - // if (!iterator.done) { + if (!iterator.done) { this._addSource(iterator); - if ((item = iterator.read()) !== null) + if (iterator.readable && (item = iterator.read()) !== null) return item; - // } + } } if (this._size === 0 && this._sources.done) @@ -1794,23 +1791,31 @@ function destinationRemoveEmptySources(this: InternalSource) { const destination = this[DESTINATION] as any; if (NODE in this) { destination._removeSource(this); - if (destination._size === 0 && destination._sources.done && destination._sourceStarted) + if (destination._size === 0 && destination._sources.done && destination._sourceStarted) { destination!.close(); - // Also capture the case where we need to just start re-filling the circular - - // else if (destination._size < destination._maxParallelIterators && destination._sources.readable) { - // TODO: Add a test case for this - if (this.readable) - this.emit('readable') - else - this.readable = true; - // TODO: Future performance improvement - continue re-filling the circular linked list + return; + } + destination.readable = true; + // const _sources = destination._sources; + // let iterator; + // while (destination._size < destination._maxParallelIterators && iterator.readable && (iterator = _sources.read()) !== null) { + // if (!iterator.done) { + // destination._addSource(iterator); + // if (iterator.readable) { + // destination.readable = true; + // return; + // } + // } // } } else { destination._unListenSource(this); - if (destination._size === 0 && destination._sourceStarted) - destination!.close(); + if (destination._size === 0) { + if (destination._sourceStarted) + destination.close(); + else + destination.readable = true; + } } }