diff --git a/asynciterator.ts b/asynciterator.ts index 000fd10..6db2bf7 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -456,7 +456,23 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator */ map(map: (item: T) => D, self?: any): AsyncIterator { - return this.transform({ map: self ? map.bind(self) : map }); + return new MappingIterator(this, self ? map.bind(self) : map); + } + + /** + Possibly maps items, return null if no item should be emitted. + @param {Function} maybeMap The function to multiMap items with + */ + maybeMap(map: (item: T) => D | null, self?: any): AsyncIterator { + return new MaybeMappingIterator(this, self ? map.bind(self) : map); + } + + /** + MultiMaps items according to a synchronous generator (hence no need for buffering) + @param {Function} multiMap The function to multiMap items with + */ + multiMap(multiMap: (item: T) => Generator): AsyncIterator { + return new MultiMappingIterator(this, multiMap); } /** @@ -469,7 +485,7 @@ export class AsyncIterator extends EventEmitter { filter(filter: (item: T) => item is K, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator; filter(filter: (item: T) => boolean, self?: any): AsyncIterator { - return this.transform({ filter: self ? filter.bind(self) : filter }); + return new FilteringIterator(this, self ? filter.bind(self) : filter); } /** @@ -510,7 +526,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items */ skip(offset: number): AsyncIterator { - return this.transform({ offset }); + return new SkippingIterator(this, offset); } /** @@ -520,7 +536,7 @@ export class AsyncIterator extends EventEmitter { @returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items */ take(limit: number): AsyncIterator { - return this.transform({ limit }); + return new LimitingIterator(this, limit); } /** @@ -785,6 +801,11 @@ export class IntegerIterator extends AsyncIterator { @extends module:asynciterator.AsyncIterator */ export class BufferedIterator extends AsyncIterator { + static ensureInit(iterator: BufferedIterator) { + if (iterator._state === INIT) + iterator._init(true); + } + private _buffer: LinkedList = new LinkedList(); private _maxBufferSize = 4; protected _reading = true; @@ -1251,6 +1272,328 @@ function destinationFillBuffer(this: InternalSource) { (this._destination as any)._fillBuffer(); } +export abstract class SynchronousTransformIterator extends AsyncIterator { + protected _source: AsyncIterator; + protected _sourceStarted: boolean; + + get fastInfo(): Transform | false { + return false; + } + + protected constructor(source: AsyncIterator) { + /* eslint-disable no-use-before-define */ + super(); + this._source = source; + this._sourceStarted = false; + const cleanup = () => { + source.removeListener('end', onSourceEnd); + source.removeListener('error', onSourceError); + source.removeListener('readable', onSourceReadable); + taskScheduler(() => { + // Delayed as there might be pending tasks using the source at the + // time that cleanup() is called. + delete this._source; + }); + }; + const onSourceEnd = () => { + cleanup(); + this.close(); + }; + const onSourceError = (err: Error) => { + this.emit('error', err); + }; + const onSourceReadable = () => { + if (this.readable) { + // TODO: I'm not completely sure as to why this is needed but without + // the following line, some use cases relying on flow mode (i.e. + // consuming items via `on('data', (data) => {})`) do not work. + // It looks like the debouncing that happens in `set readable()` + // in `AsyncIterator` prevents the event from firing as `this` + // is already readable. + this.emit('readable'); + } + else { + this.readable = true; + } + }; + source.on('end', onSourceEnd); + source.on('error', onSourceError); + source.on('readable', onSourceReadable); + if (source.done) + onSourceEnd(); + else if (source.readable) + onSourceReadable(); + } + + protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) { + super._destroy(cause, callback); + this._source.destroy(cause); + } + + protected _readSource(): S | null { + if (!this._sourceStarted) { + this._sourceStarted = true; + if (this._source instanceof BufferedIterator) + BufferedIterator.ensureInit(this._source); + } + return this._source.read(); + } + + map(map: (item: D) => T, self?: any): AsyncIterator { + const next = this.fastInfo; + if (!next) + return super.map(map, self); + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + const next = this.fastInfo; + if (!next) + return super.filter(filter, self); + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next, + }); + } + + maybeMap(map: (item: D) => T | null, self?: any): AsyncIterator { + const next = this.fastInfo; + if (!next) + return super.maybeMap(map, self); + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + nullable: true, + function: self ? map.bind(self) : map, + next, + }); + } +} + +export class MultiMappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => Generator; + private generator?: Generator; + + constructor(source: AsyncIterator, map: (item: S) => Generator) { + super(source); + this._map = map; + } + + read(): D | null { + let _item; + + // eslint-disable-next-line no-constant-condition + while (true) { + if (!this.generator) { + if ((_item = this._readSource()) === null) + return null; + this.generator = this._map(_item); + } + if (!(_item = this.generator.next()).done) + return _item.value; + this.generator = undefined; + } + } +} + +export class MaybeMappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => D | null; + + constructor(source: AsyncIterator, map: (item: S) => D | null) { + super(source); + this._map = map; + } + + get fastInfo(): Transform { + return { + filter: false, + nullable: true, + function: this._map, + }; + } + + read(): D | null { + let item; + while ((item = this._readSource()) !== null) { + if ((item = this._map(item)) !== null) + return item; + } + return null; + } +} + +export class MappingIterator extends SynchronousTransformIterator { + protected readonly _map: (item: S) => D; + + constructor(source: AsyncIterator, map: (item: S) => D) { + super(source); + this._map = map; + } + + getInfo(): Transform { + return { + filter: false, + function: this._map, + }; + } + + read(): D | null { + const item = this._readSource(); + if (item !== null) + return this._map(item); + return null; + } +} + +export class FilteringIterator extends SynchronousTransformIterator { + protected readonly _filter: (item: T) => boolean; + + constructor(source: AsyncIterator, filter: (item: T) => boolean) { + super(source); + this._filter = filter; + } + + getInfo(): Transform { + return { + filter: true, + function: this._filter, + }; + } + + read(): T | null { + let item; + while ((item = this._readSource()) !== null) { + if (this._filter(item)) + return item; + } + return null; + } +} + +export class SkippingIterator extends SynchronousTransformIterator { + protected readonly _skip: number; + protected _skipped: number; + + constructor(source: AsyncIterator, skip: number) { + super(source); + this._skip = skip; + this._skipped = 0; + } + + read(): T | null { + let item; + while ((item = this._readSource()) !== null) { + if (this._skipped < this._skip) + this._skipped += 1; + else + return item; + } + return null; + } +} + +export class LimitingIterator extends SynchronousTransformIterator { + protected readonly _limit: number; + protected _count: number; + + constructor(source: AsyncIterator, limit: number) { + super(source); + this._limit = limit; + this._count = 0; + } + + read(): T | null { + const item = this._readSource(); + if (item !== null) { + if (this._count < this._limit) { + this._count += 1; + return item; + } + this.close(); + return null; + } + return null; + } +} + +interface Transform { + filter: boolean, + nullable?: true, + function: Function, + next?: Transform +} + +export class MultiMapFilterTransformIterator extends SynchronousTransformIterator { + private _transformation?: (item: S) => D | null; + + constructor(source: AsyncIterator, private transforms: Transform) { + super(source); + } + + protected transformation(_item: S): D | null { + if (!this._transformation) { + let _transforms: Transform | undefined = this.transforms; + + const { filter, function: func } = _transforms!; + + this._transformation = filter ? + ((item: any) => func(item) ? item : null) : + func as any; + + while ((_transforms = _transforms!.next) !== undefined) { + const { filter: _filter, function: _func, nullable: _nullable } = _transforms; + const t = this._transformation!; + + // eslint-disable-next-line no-nested-ternary + this._transformation = _filter ? + (item: any) => _func(item) ? t(item) : null : ( + _nullable === true ? + (item: any) => ((item = _func(item)) === null ? null : t(item)) : + (item: any) => t(_func(item)) + ); + } + } + return this._transformation!(_item); + } + + read(): D | null { + let item; + while ((item = this._readSource()) !== null) { + if ((item = this.transformation(item)) !== null) + return item; + } + return null; + } + + map(map: (item: D) => T, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + function: self ? map.bind(self) : map, + next: this.transforms, + }); + } + + filter(filter: (item: D) => boolean, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: true, + function: self ? filter.bind(self) : filter, + next: this.transforms, + }); + } + + maybeMap(map: (item: D) => T | null, self?: any): AsyncIterator { + return new MultiMapFilterTransformIterator(this._source, { + filter: false, + nullable: true, + function: self ? map.bind(self) : map, + next: this.transforms, + }); + } +} /** An iterator that generates items based on a source iterator @@ -1514,7 +1857,7 @@ export class MultiTransformIterator extends TransformIterator { */ export class UnionIterator extends BufferedIterator { private _sources : InternalSource[] = []; - private _pending? : { sources?: AsyncIterator> }; + private _pending? : { sources?: AsyncIterator>> }; private _currentSource = -1; /** @@ -1522,7 +1865,9 @@ export class UnionIterator extends BufferedIterator { @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from @param {object} [options] Settings of the iterator */ - constructor(sources: AsyncIteratorOrArray>, + constructor(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>, options: BufferedIteratorOptions = {}) { super(options); const autoStart = options.autoStart !== false; @@ -1530,14 +1875,14 @@ export class UnionIterator extends BufferedIterator { // Sources have been passed as an iterator if (isEventEmitter(sources)) { sources.on('error', error => this.emit('error', error)); - this._pending = { sources }; + this._pending = { sources: sources as AsyncIterator>> }; if (autoStart) this._loadSources(); } // Sources have been passed as a non-empty array else if (Array.isArray(sources) && sources.length > 0) { for (const source of sources) - this._addSource(source as InternalSource); + this._addSource(source as MaybePromise>); } // Sources are an empty list else if (autoStart) { @@ -1559,7 +1904,7 @@ export class UnionIterator extends BufferedIterator { // Otherwise, set up source reading else { sources.on('data', source => { - this._addSource(source as InternalSource); + this._addSource(source as MaybePromise>); this._fillBufferAsync(); }); sources.on('end', () => { @@ -1570,7 +1915,9 @@ export class UnionIterator extends BufferedIterator { } // Adds the given source to the internal sources array - protected _addSource(source: InternalSource) { + protected _addSource(source: MaybePromise>) { + if (isPromise(source)) + source = wrap(source) as any as InternalSource; if (!source.done) { this._sources.push(source); source._destination = this; @@ -1848,17 +2195,182 @@ class HistoryReader { } } +export type AsyncIteratorLike = EventEmitter & { + on: (event: string | symbol, listener: (...args: any[]) => void) => AsyncIterator; read: () => T | null }; + +/* eslint-disable arrow-body-style */ +export const isAsyncIteratorLike = (item: { [key: string]: any }): item is AsyncIteratorLike => { + return isFunction(item.on) && isFunction(item.read); +}; + +export const isIterator = (item: { [key: string]: any }): item is Iterator => { + return isFunction(item.next); +}; + +export const isIterable = (item: { [key: string]: any }): item is Iterable => { + return Symbol.iterator in item; +}; + +export class WrappingIterator extends AsyncIterator { + protected _source?: AsyncIteratorLike; + protected _sourceStarted: boolean; + + constructor(sourceOrPromise: WrapSource | Promise>, options: WrapOptions = {}) { + super(); + this._sourceStarted = false; + if (isPromise(sourceOrPromise)) { + sourceOrPromise + .then(source => { + WrappingIterator._wrapSource(source, this, options); + }) + .catch(err => { + this.emit('error', err); + }); + } + else { + WrappingIterator._wrapSource(sourceOrPromise, this, options); + } + } + + protected static _wrapSource(source: WrapSource, iterator: WrappingIterator, options: WrapOptions = {}) { + try { + const wrappedSource = isAsyncIteratorLike(source) ? source : _wrap(source, options); + + const cleanup = () => { + wrappedSource.removeListener('end', onSourceEnd); + wrappedSource.removeListener('error', onSourceError); + wrappedSource.removeListener('readable', onSourceReadable); + scheduleTask(() => { + delete iterator._source; + }); + }; + + const onSourceReadable = () => { + if (iterator.readable) { + // TODO: I'm not completely sure as to why this is needed but without + // the following line, some use cases relying on flow mode (i.e. + // consuming items via `on('data', (data) => {})`) do not work. + // It looks like the debouncing that happens in `set readable()` + // in `AsyncIterator` prevents the event from firing as `this` + // is already readable. + iterator.emit('readable'); + } + else { + iterator.readable = true; + } + }; + + const onSourceEnd = () => { + iterator.close(); + cleanup(); + }; + + const onSourceError = (err: Error) => { + iterator.emit('error', err); + }; + + wrappedSource.on('end', onSourceEnd); + wrappedSource.on('error', onSourceError); + wrappedSource.on('readable', onSourceReadable); + + iterator._source = wrappedSource; + + if (wrappedSource instanceof AsyncIterator && wrappedSource.readable) + onSourceReadable(); + } + catch (err) { + scheduleTask(() => iterator.emit('error', err)); + } + } + + read(): T | null { + if (this._source) { + if (!this._sourceStarted && this._source instanceof BufferedIterator) + BufferedIterator.ensureInit(this._source); + return this._source.read(); + } + return null; + } +} + +export class IteratorIterator extends AsyncIterator { + constructor(private source: Iterator) { + super(); + this.readable = true; + } + + read(): T | null { + const item = this.source.next(); + if (item.done) { + this.close(); + return null; + } + return item.value; + } +} + +export interface WrapOptions { + prioritizeIterable?: boolean; + letIteratorThrough?: boolean; +} + +export type WrapSource = T[] | EventEmitter | Iterator | Iterable; + +const _wrap = (source: WrapSource, options: WrapOptions = {}): AsyncIterator => { + if (options.letIteratorThrough && source instanceof AsyncIterator) + return source; + if (options.prioritizeIterable) { + if (isIterator(source)) + return fromIterator(source); + if (isIterable(source)) + return fromIterable(source); + } + if (Array.isArray(source)) + return fromArray(source); + if (isAsyncIteratorLike(source)) + return fromAsyncIteratorLike(source); + if (!options.prioritizeIterable) { + if (isIterator(source)) + return fromIterator(source); + if (isIterable(source)) + return fromIterable(source); + } + throw new Error(`Unsupported source ${source}`); +}; + /** - Creates an iterator that wraps around a given iterator or readable stream. + Creates an iterator that wraps around a given array, iterator, iterable or + readable stream. Use this to convert an iterator-like object into a full-featured AsyncIterator. After this operation, only read the returned iterator instead of the given one. @function - @param {module:asynciterator.AsyncIterator|Readable} [source] The source this iterator generates items from + @param {module:asynciterator.AsyncIterator|Readable} [sourceOrPromise] The source this iterator generates items from @param {object} [options] Settings of the iterator @returns {module:asynciterator.AsyncIterator} A new iterator with the items from the given iterator */ -export function wrap(source: EventEmitter | Promise, options?: TransformIteratorOptions) { - return new TransformIterator(source as AsyncIterator | Promise>, options); +export function wrap( + sourceOrPromise: WrapSource | Promise>, + options: TransformIteratorOptions & WrapOptions = {}, +): AsyncIterator { + // For backward compatibility, passing TransformIteratorOptions results in + // an instance of TransformIterator. + // TODO: consider dropping this in the next major version + if ('maxBufferSize' in options || 'autoStart' in options || 'optional' in options || 'destroySource' in options) + return new TransformIterator(sourceOrPromise as AsyncIterator | Promise>, options); + // If the source is promisified, we *need* to use a WrappingIterator as this + // function is a synchronous one. + if (isPromise(sourceOrPromise)) + return new WrappingIterator(sourceOrPromise, options); + // The _wrap function synchronously return an iterator or throws on + // unsupported sources. However, for backward-compatiblity we need + // to relay errors as events of an AsyncIterator instance. + // TODO: consider dropping this in the next major version + try { + return _wrap(sourceOrPromise as WrapSource, options); + } + catch (err) { + return new WrappingIterator(sourceOrPromise); + } } /** @@ -1880,15 +2392,42 @@ export function single(item: T) { Creates an iterator for the given array. @param {Array} items the items */ -export function fromArray(items: Iterable) { +export function fromArray(items: Iterable): AsyncIterator { return new ArrayIterator(items); } +/** + Creates an iterator for the given ES2015 Iterable. + @param {Iterable} iterable the iterable + */ +export function fromIterable(iterable: Iterable): AsyncIterator { + return new IteratorIterator(iterable[Symbol.iterator]()); +} + +/** + Creates an iterator for the given ES2015 Iterator. + @param {Iterable} iterator the iterator + */ +export function fromIterator(iterator: Iterator): AsyncIterator { + return new IteratorIterator(iterator); +} + +/** + * Creates an iterator for the given iterator-like object + * (AsyncIterator, stream.Readable, ...). + * @param {AsyncIteratorLike} iterator + */ +export function fromAsyncIteratorLike(iterator: AsyncIteratorLike): AsyncIterator { + return new WrappingIterator(iterator); +} + /** Creates an iterator containing all items from the given iterators. @param {Array} items the items */ -export function union(sources: AsyncIteratorOrArray>) { +export function union(sources: AsyncIteratorOrArray> | + AsyncIteratorOrArray>> | + AsyncIteratorOrArray>>) { return new UnionIterator(sources); } diff --git a/package-lock.json b/package-lock.json index 831df0f..d438e01 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "license": "MIT", "devDependencies": { "@babel/cli": "^7.10.1", diff --git a/package.json b/package.json index 03b490b..edcab1f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "asynciterator", - "version": "3.3.0", + "version": "3.4.0", "description": "An asynchronous iterator library for advanced object pipelines.", "author": "Ruben Verborgh ", "type": "module", diff --git a/test/AsyncIterator-test.js b/test/AsyncIterator-test.js index 37a74d6..5f0dfef 100644 --- a/test/AsyncIterator-test.js +++ b/test/AsyncIterator-test.js @@ -5,6 +5,7 @@ import { ENDED, DESTROYED, scheduleTask, + range, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1307,4 +1308,96 @@ describe('AsyncIterator', () => { }); }); }); + describe('Testing chains fo maps and filters', () => { + let iterator; + beforeEach(() => { + iterator = range(0, 2); + }); + it('Should handle no transforms', async () => { + iterator.read().should.equal(0); + iterator.read().should.equal(1); + iterator.read().should.equal(2); + }); + it('Should handle no transforms arrayified', async () => { + (await iterator.toArray()).should.deep.equal([0, 1, 2]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => x * 2).maybeMap(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 12]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.maybeMap(x => x === 2 ? null : x * 3).toArray()).should.deep.equal([0, 3]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.maybeMap(x => x === 2 ? null : x * 3).maybeMap(x => x === 0 ? null : x * 3).toArray()).should.deep.equal([9]); + }); + it('Should apply maps that doubles correctly', async () => { + (await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter correctly', async () => { + (await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply filter then map correctly', async () => { + (await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']); + }); + it('Should apply map then filter correctly (1)', async () => { + (await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]); + }); + it('Should apply map then filter to false correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply map then filter to true correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter to false then map correctly', async () => { + (await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']); + }); + it('Should apply filter to true then map correctly', async () => { + (await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]); + }); + it('Should apply filter one then double', async () => { + (await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]); + }); + it('Should apply double then filter one', async () => { + (await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]); + }); + it('Should apply map then filter correctly', async () => { + (await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']); + }); + it('Should correctly apply 3 filters', async () => { + (await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]); + }); + it('Should correctly apply 3 maps', async () => { + (await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']); + }); + it('Should correctly apply a map, followed by a filter, followed by another map', async () => { + (await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']); + }); + it('Should correctly apply a filter-map-filter', async () => { + (await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]); + }); + it('Should handle transforms', async () => { + iterator = iterator.multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }); + (await iterator.toArray()).should.deep.equal(['x0', 'y0', 'x1', 'y1', 'x2', 'y2']); + }); + it('Should handle transforms and maps', async () => { + iterator = iterator.multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }).map(x => `z${x}`); + (await iterator.toArray()).should.deep.equal(['zx0', 'zy0', 'zx1', 'zy1', 'zx2', 'zy2']); + }); + it('Should handle maps and transforms', async () => { + iterator = iterator.map(x => `z${x}`).multiMap(function* (data) { + yield `x${data}`; + yield `y${data}`; + }); + (await iterator.toArray()).should.deep.equal(['xz0', 'yz0', 'xz1', 'yz1', 'xz2', 'yz2']); + }); + }); }); diff --git a/test/FilteringIterator-test.js b/test/FilteringIterator-test.js new file mode 100644 index 0000000..f9d85ef --- /dev/null +++ b/test/FilteringIterator-test.js @@ -0,0 +1,62 @@ +import { + AsyncIterator, + ArrayIterator, + FilteringIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('FilteringIterator', () => { + describe('The FilteringIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new FilteringIterator(new ArrayIterator([]), item => true); + }); + + it('should be a FilteringIterator object', () => { + instance.should.be.an.instanceof(FilteringIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A FilteringIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new FilteringIterator(source, item => item % 2 === 0); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items mapped according to the mapping function', () => { + items.should.deep.equal([0, 2, 4, 6]); + }); + }); + }); + + describe('A FilteringIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new FilteringIterator(new ArrayIterator([]), () => true); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); +}); diff --git a/test/IteratorIterator-test.js b/test/IteratorIterator-test.js new file mode 100644 index 0000000..169ddeb --- /dev/null +++ b/test/IteratorIterator-test.js @@ -0,0 +1,29 @@ +import { + fromIterator, + fromIterable, + IteratorIterator, +} from '../dist/asynciterator.js'; + +function *testYield() { + yield 1; + yield 2; + yield 3; +} + +describe('IteratorIterator', () => { + it('Should wrap correctly', async () => { + (await new IteratorIterator(testYield()).toArray()).should.deep.equal([1, 2, 3]); + }); +}); + +describe('fromIterator', () => { + it('Should wrap correctly', async () => { + (await fromIterator(testYield()).toArray()).should.deep.equal([1, 2, 3]); + }); +}); + +describe('fromIterable', () => { + it('Should wrap correctly', async () => { + (await fromIterable({ [Symbol.iterator]: testYield }).toArray()).should.deep.equal([1, 2, 3]); + }); +}); diff --git a/test/LimitingIterator-test.js b/test/LimitingIterator-test.js new file mode 100644 index 0000000..2f8601b --- /dev/null +++ b/test/LimitingIterator-test.js @@ -0,0 +1,86 @@ +import { + AsyncIterator, + ArrayIterator, + LimitingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('LimitingIterator', () => { + describe('The LimitingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new LimitingIterator(new ArrayIterator([]), 10); + }); + + it('should be a LimitingIterator object', () => { + instance.should.be.an.instanceof(LimitingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A LimitingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new LimitingIterator(source, 4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items limited to the specified limit', () => { + items.should.deep.equal([0, 1, 2, 3]); + }); + }); + }); + + describe('A LimitingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([]), 10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A LimitingIterator with a limit of 0 items', () => { + it('should not emit any items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2]), 0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A LimitingIterator with a limit of Infinity items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new LimitingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); +}); diff --git a/test/MappingIterator-test.js b/test/MappingIterator-test.js new file mode 100644 index 0000000..5f5ae2d --- /dev/null +++ b/test/MappingIterator-test.js @@ -0,0 +1,62 @@ +import { + AsyncIterator, + ArrayIterator, + MappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('MappingIterator', () => { + describe('The MappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new MappingIterator(new ArrayIterator([]), item => item); + }); + + it('should be a MappingIterator object', () => { + instance.should.be.an.instanceof(MappingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A MappingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new MappingIterator(source, item => item * 2); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items mapped according to the mapping function', () => { + items.should.deep.equal([0, 2, 4, 6, 8, 10, 12]); + }); + }); + }); + + describe('A MappingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new MappingIterator(new ArrayIterator([]), item => item); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); +}); diff --git a/test/SimpleTransformIterator-test.js b/test/SimpleTransformIterator-test.js index e033b8e..71ef501 100644 --- a/test/SimpleTransformIterator-test.js +++ b/test/SimpleTransformIterator-test.js @@ -7,6 +7,10 @@ import { ArrayIterator, IntegerIterator, scheduleTask, + MappingIterator, + FilteringIterator, + SkippingIterator, + LimitingIterator, } from '../dist/asynciterator.js'; import { EventEmitter } from 'events'; @@ -1110,8 +1114,8 @@ describe('SimpleTransformIterator', () => { result.on('end', done); }); - it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + it('should be a MappingIterator', () => { + result.should.be.an.instanceof(MappingIterator); }); it('should execute the map function on all items in order', () => { @@ -1146,7 +1150,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(MappingIterator); }); it('should execute the map function on all items in order', () => { @@ -1185,7 +1189,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(FilteringIterator); }); it('should execute the filter function on all items in order', () => { @@ -1219,7 +1223,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(FilteringIterator); }); it('should execute the filter function on all items in order', () => { @@ -1347,7 +1351,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(SkippingIterator); }); it('should skip the given number of items', () => { @@ -1377,7 +1381,7 @@ describe('SimpleTransformIterator', () => { }); it('should be a SimpleTransformIterator', () => { - result.should.be.an.instanceof(SimpleTransformIterator); + result.should.be.an.instanceof(LimitingIterator); }); it('should take the given number of items', () => { diff --git a/test/SkippingIterator-test.js b/test/SkippingIterator-test.js new file mode 100644 index 0000000..c1f912a --- /dev/null +++ b/test/SkippingIterator-test.js @@ -0,0 +1,86 @@ +import { + AsyncIterator, + ArrayIterator, + SkippingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +describe('SkippingIterator', () => { + describe('The SkippingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new SkippingIterator(new ArrayIterator([]), 10); + }); + + it('should be a SkippingIterator object', () => { + instance.should.be.an.instanceof(SkippingIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A SkippingIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new SkippingIterator(source, 4); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return items skipping the specified amount', () => { + items.should.deep.equal([4, 5, 6]); + }); + }); + }); + + describe('A SkippingIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new SkippingIterator(new ArrayIterator([]), 10); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A SkippingIterator with a limit of 0 items', () => { + it('should emit all items', done => { + const items = []; + const iterator = new SkippingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), 0); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + done(); + }); + }); + }); + + describe('A SkippingIterator with a limit of Infinity items', () => { + it('should skip all items', done => { + const items = []; + const iterator = new SkippingIterator(new ArrayIterator([0, 1, 2, 3, 4, 5, 6]), Infinity); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); +}); diff --git a/test/SynchronousTransformIterator-test.js b/test/SynchronousTransformIterator-test.js new file mode 100644 index 0000000..4c6d347 --- /dev/null +++ b/test/SynchronousTransformIterator-test.js @@ -0,0 +1,83 @@ +import { + AsyncIterator, + ArrayIterator, + SynchronousTransformIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class _SynchronousTransformIterator extends SynchronousTransformIterator { + read() { + return this._readSource(); + } +} + +describe('SynchronousTransformIterator', () => { + describe('The SynchronousTransformIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { + instance = new _SynchronousTransformIterator(new ArrayIterator([])); + }); + + it('should be a SynchronousTransformIterator object', () => { + instance.should.be.an.instanceof(SynchronousTransformIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + describe('A SynchronousTransformIterator', () => { + let iterator, source; + before(() => { + source = new ArrayIterator([0, 1, 2, 3, 4, 5, 6]); + iterator = new _SynchronousTransformIterator(source); + }); + + describe('when reading items', () => { + const items = []; + before(done => { + iterator.on('data', item => { items.push(item); }); + iterator.on('end', done); + }); + + it('should return all items', () => { + items.should.deep.equal([0, 1, 2, 3, 4, 5, 6]); + }); + }); + }); + + describe('A SynchronousTransformIterator with a source that emits 0 items', () => { + it('should not return any items', done => { + const items = []; + const iterator = new _SynchronousTransformIterator(new ArrayIterator([])); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + + describe('A SynchronousTransformIterator with a source that is already ended', () => { + it('should not return any items', done => { + const items = []; + const source = new ArrayIterator([]); + source.on('end', () => { + const iterator = new _SynchronousTransformIterator(source); + iterator.on('data', item => { items.push(item); }); + iterator.on('end', () => { + items.should.deep.equal([]); + done(); + }); + }); + }); + }); +}); diff --git a/test/TransformIterator-test.js b/test/TransformIterator-test.js index e5d8116..a6cdcab 100644 --- a/test/TransformIterator-test.js +++ b/test/TransformIterator-test.js @@ -4,7 +4,6 @@ import { EmptyIterator, ArrayIterator, TransformIterator, - wrap, scheduleTask, } from '../dist/asynciterator.js'; @@ -32,23 +31,6 @@ describe('TransformIterator', () => { instance.should.be.an.instanceof(EventEmitter); }); }); - - describe('the result when called through `wrap`', () => { - let instance; - before(() => { instance = wrap(); }); - - it('should be an TransformIterator object', () => { - instance.should.be.an.instanceof(TransformIterator); - }); - - it('should be an AsyncIterator object', () => { - instance.should.be.an.instanceof(AsyncIterator); - }); - - it('should be an EventEmitter object', () => { - instance.should.be.an.instanceof(EventEmitter); - }); - }); }); describe('A TransformIterator', () => { diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js index 308f52d..c7714ec 100644 --- a/test/UnionIterator-test.js +++ b/test/UnionIterator-test.js @@ -260,6 +260,50 @@ describe('UnionIterator', () => { }); }); + describe('when constructed with an iterator and with autoStart and one source a promise', () => { + let iterator, sourceIterator; + before(() => { + const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; + sourceIterator = new ArrayIterator(sources); + sinon.spy(sourceIterator, 'read'); + iterator = new UnionIterator(sourceIterator, { autoStart: true }); + }); + + describe('before reading', () => { + it('should have read the sources', () => { + sourceIterator.read.should.have.been.called; + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + + it('should pass errors', () => { + const callback = sinon.spy(); + const error = new Error('error'); + iterator.once('error', callback); + sourceIterator.emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('after reading', () => { + let items; + before(async () => { + items = (await toArray(iterator)).sort(); + }); + + it('should have emitted all items', () => { + items.should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + }); + describe('when constructed with an iterator and without autoStart', () => { let iterator, sourceIterator; before(() => { @@ -308,6 +352,54 @@ describe('UnionIterator', () => { }); }); + describe('when constructed with an iterator and without autoStart and one source as a promise', () => { + let iterator, sourceIterator; + before(() => { + const sources = [Promise.resolve(range(0, 2)), range(3, 6)]; + sourceIterator = new ArrayIterator(sources); + sinon.spy(sourceIterator, 'read'); + iterator = new UnionIterator(sourceIterator, { autoStart: false }); + }); + + describe('before reading', () => { + it('should not have read the sources', () => { + sourceIterator.read.should.not.have.been.called; + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + + it('should pass errors', () => { + const callback = sinon.spy(); + const error = new Error('error'); + iterator.once('error', callback); + sourceIterator.emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('after reading', () => { + let items; + before(async () => { + items = (await toArray(iterator)).sort(); + }); + + it('should have read the sources', () => { + sourceIterator.read.should.have.been.called; + }); + + it('should have emitted all items', () => { + items.should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + }); + describe('a UnionIterator with two sources', () => { let iterator, sources; diff --git a/test/WrappingIterator-test.js b/test/WrappingIterator-test.js new file mode 100644 index 0000000..d187567 --- /dev/null +++ b/test/WrappingIterator-test.js @@ -0,0 +1,145 @@ +import { AsyncIterator, ArrayIterator, WrappingIterator, wrap, fromArray } from '../dist/asynciterator.js'; +import { EventEmitter } from 'events'; + +describe('WrappingIterator', () => { + describe('The WrappingIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + + before(() => { + instance = new WrappingIterator(new ArrayIterator([])); + }); + + it('should be a WrappingIterator object', () => { + instance.should.be.an.instanceof(WrappingIterator); + }); + + it('should be a AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + describe('the result when called with new and with an invalid source', () => { + it('should emit an error', done => { + const source = {}; + const wrapped = new WrappingIterator(source); + wrapped.on('error', err => { + err; + done(); + }); + }); + }); + describe('with an empty source iterator', () => { + it('should end when the source iterator ends and letIteratorThrough is not set', done => { + const source = new ArrayIterator([]); + const wrapped = new WrappingIterator(source); + wrapped.on('end', () => { + done(); + }); + }); + it('should end when the source iterator ends and letIteratorThrough is set to true', done => { + const source = new ArrayIterator([]); + const wrapped = new WrappingIterator(source, { letIteratorThrough: true }); + wrapped.on('end', () => { + done(); + }); + }); + }); + describe('with a non-empty source', () => { + it('should end when the source ends', done => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = new WrappingIterator(source); + wrapped.on('data', item => { item; }).on('end', () => { + done(); + }); + }); + it('should emit items from the source before ending', done => { + const array = [0, 1, 2, 3, 4]; + const source = new ArrayIterator(array); + const wrapped = new WrappingIterator(source); + let i = 0; + wrapped + .on('data', item => { + item.should.equal(array[i++]); + }) + .on('end', () => { + done(); + }); + }); + }); + describe('with a promise of a non-empty source', () => { + it('should emit items from the source before ending', done => { + const array = [0, 1, 2, 3, 4]; + const source = new ArrayIterator(array); + const wrapped = new WrappingIterator(Promise.resolve(source)); + let i = 0; + wrapped + .on('data', item => { + item.should.equal(array[i++]); + }) + .on('end', () => { + done(); + }); + }); + }); + describe('source with read, on and iterable methods', () => { + let obj; + + beforeEach(() => { + obj = fromArray([1]); + obj[Symbol.iterator] = function * () { + yield 'x'; + yield 'y'; + }; + }); + + it('should prioritize the read method', async () => { + (await wrap(obj).toArray()).should.deep.equal([1]); + }); + it('should use the iterator when correctly set-up', async () => { + (await wrap(obj, { prioritizeIterable: true }).toArray()).should.deep.equal(['x', 'y']); + }); + it('wrapping should produce a new object', async () => { + wrap(obj).should.not.equal(obj); + }); + }); + describe('source that emits an error', () => { + it('relay the error', done => { + const err = new Error('some error'); + const source = new ArrayIterator([0, 1, 2, 3]); + const iterator = new WrappingIterator(source); + iterator.on('error', iteratorErr => { + expect(iteratorErr).to.equal(err); + done(); + }); + source.emit('error', err); + }); + }); + describe('promise of a source that rejects', () => { + it('emit the error', done => { + const err = new Error('some error'); + const iterator = new WrappingIterator(Promise.reject(err)); + iterator.on('error', iteratorErr => { + expect(iteratorErr).to.equal(err); + done(); + }); + }); + }); + describe('promise of a source', () => { + it('read null until the promise resolves', done => { + const promise = new Promise(resolve => { + setTimeout(() => { + resolve(new ArrayIterator([0, 1, 2, 3])); + }, 10); + }); + const iterator = new WrappingIterator(promise); + expect(iterator.readable).to.equal(false); + expect(iterator.read()).to.equal(null); + done(); + }); + }); +}); diff --git a/test/wrap-test.js b/test/wrap-test.js new file mode 100644 index 0000000..127082b --- /dev/null +++ b/test/wrap-test.js @@ -0,0 +1,69 @@ + +import { + wrap, + ArrayIterator, + TransformIterator, + WrappingIterator, +} from '../dist/asynciterator.js'; + +import { EventEmitter } from 'events'; + +class IteratorLike extends EventEmitter { + constructor() { + super(); + this._count = 0; + } + + read() { + if (this._count >= 5) { + this.emit('end'); + return null; + } + return this._count++; + } +} + +describe('The wrap() function', () => { + it('should not let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is not set', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source); + wrapped.should.not.equal(source); + wrapped.should.be.instanceof(WrappingIterator); + }); + + it('should not let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is set to false', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source, { letIteratorThrough: false }); + wrapped.should.not.equal(source); + wrapped.should.be.instanceof(WrappingIterator); + }); + + it('should let an instance of AsyncIterator pass through without wrapping if letIteratorThrough option is set to true', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const wrapped = wrap(source, { letIteratorThrough: true }); + wrapped.should.equal(source); + wrapped.should.be.instanceof(ArrayIterator); + }); + + it('should emit an error when an incompatible source is passed', done => { + const source = {}; + const wrapped = wrap(source); + wrapped.on('error', err => { + err; + done(); + }); + }); + + it('should return a TransformIterator when transform options are passed', () => { + const source = new ArrayIterator([0, 1, 2, 3, 4]); + const options = { maxBufferSize: 42 }; + const wrapped = wrap(source, options); + wrapped.should.be.instanceof(TransformIterator); + }); + + it('should return a WrappingIterator when no options object is passed', () => { + const source = new IteratorLike(); + const wrapped = wrap(source); + wrapped.should.be.instanceof(WrappingIterator); + }); +});