Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 250 additions & 4 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,15 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return this.transform({ map: self ? map.bind(self) : map });
return new MappingIterator(this, self ? map.bind(self) : map);
}

/**
MultiMaps items according to a synchronous generator (hence no need for buffering)
@param {Function} multiMap The function to multiMap items with
*/
multiMap<D>(multiMap: (item: T) => Generator<D>): AsyncIterator<D> {
return new MultiMappingIterator(this, multiMap);
}

/**
Expand All @@ -469,7 +477,7 @@ export class AsyncIterator<T> extends EventEmitter {
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.transform({ filter: self ? filter.bind(self) : filter });
return new FilteringIterator(this, self ? filter.bind(self) : filter);
}

/**
Expand Down Expand Up @@ -510,7 +518,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.transform({ offset });
return new SkippingIterator(this, offset);
}

/**
Expand All @@ -520,7 +528,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
*/
take(limit: number): AsyncIterator<T> {
return this.transform({ limit });
return new LimitingIterator(this, limit);
}

/**
Expand Down Expand Up @@ -1251,6 +1259,244 @@ function destinationFillBuffer<S>(this: InternalSource<S>) {
(this._destination as any)._fillBuffer();
}

export class SynchronousTransformIterator<S, D = S> extends AsyncIterator<D> {
protected readonly _source: AsyncIterator<S>;

constructor(source: AsyncIterator<S>) {
/* eslint-disable no-use-before-define */
super();
this._source = source;
const cleanup = () => {
source.removeListener('end', onEnd);
source.removeListener('readable', onReadable);
};
const onEnd = () => {
cleanup();
this.close();
};
const onReadable = () => {
this.readable = true;
};
source.on('end', onEnd);
source.on('readable', onReadable);
}

protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) {
super._destroy(cause, callback);
this._source.destroy(cause);
}
}

export class MultiMappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
protected readonly _map: (item: S) => Generator<D>;
private generator?: Generator<D>;

constructor(source: AsyncIterator<S>, map: (item: S) => Generator<D>) {
super(source);
this._map = map;
}

read(): D | null {
let _item;

// eslint-disable-next-line no-constant-condition
while (true) {
if (!this.generator) {
if ((_item = this._source.read()) === null)
return null;
this.generator = this._map(_item);
}
if (!(_item = this.generator.next()).done)
return _item.value;
this.generator = undefined;
}
}
}

export class MappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
protected readonly _map: (item: S) => D;

constructor(source: AsyncIterator<S>, map: (item: S) => D) {
super(source);
this._map = map;
}

read(): D | null {
const item = this._source.read();
if (item !== null)
return this._map(item);
return null;
}

map<T>(map: (item: D) => T, self?: any): AsyncIterator<T> {
return new MultiMapFilterTransformIterator(this._source, {
filter: false,
function: self ? map.bind(self) : map,
next: {
filter: false,
function: this._map,
},
});
}

filter(filter: (item: D) => boolean, self?: any): AsyncIterator<D> {
return new MultiMapFilterTransformIterator(this._source, {
filter: true,
function: self ? filter.bind(self) : filter,
next: {
filter: false,
function: this._map,
},
});
}
}

export class FilteringIterator<T> extends SynchronousTransformIterator<T> {
protected readonly _filter: (item: T) => boolean;

constructor(source: AsyncIterator<T>, filter: (item: T) => boolean) {
super(source);
this._filter = filter;
}

read(): T | null {
let item;
while ((item = this._source.read()) !== null) {
if (this._filter(item))
return item;
}
return null;
}

map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return new MultiMapFilterTransformIterator(this._source, {
filter: false,
function: self ? map.bind(self) : map,
next: {
filter: true,
function: this._filter,
},
});
}

filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return new MultiMapFilterTransformIterator(this._source, {
filter: true,
function: self ? filter.bind(self) : filter,
next: {
filter: true,
function: this._filter,
},
});
}
}

export class SkippingIterator<T> extends SynchronousTransformIterator<T> {
protected readonly _skip: number;
protected _skipped: number;

constructor(source: AsyncIterator<T>, skip: number) {
super(source);
this._skip = skip;
this._skipped = 0;
}

read(): T | null {
let item;
while ((item = this._source.read()) !== null) {
if (this._skipped < this._skip)
this._skipped += 1;
else
return item;
}
return null;
}
}

export class LimitingIterator<T> extends SynchronousTransformIterator<T> {
protected readonly _limit: number;
protected _count: number;

constructor(source: AsyncIterator<T>, limit: number) {
super(source);
this._limit = limit;
this._count = 0;
}

read(): T | null {
const item = this._source.read();
if (item !== null) {
if (this._count < this._limit) {
this._count += 1;
return item;
}
this.close();
return null;
}
return null;
}
}

interface Transform {
filter: boolean,
function: Function,
next?: Transform
}

export class MultiMapFilterTransformIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
private _transformation?: (item: S) => D | null;

constructor(source: AsyncIterator<S>, private transforms: Transform) {
super(source);
}

protected transformation(_item: S): D | null {
if (!this._transformation) {
let _transforms: Transform | undefined = this.transforms;

const { filter, function: func } = _transforms!;

this._transformation = filter ?
((item: any) => func(item) ? item : null) :
func as any;

while ((_transforms = _transforms!.next) !== undefined) {
const { filter: _filter, function: _func } = _transforms;
const t = this._transformation!;

this._transformation = _filter ?
(item: any) => _func(item) ? t(item) : null :
(item: any) => t(_func(item));
}
}
return this._transformation!(_item);
}

read(): D | null {
let item;
while ((item = this._source.read()) !== null) {
if ((item = this.transformation(item)) !== null)
return item;
}
return null;
}

map<T>(map: (item: D) => T, self?: any): AsyncIterator<T> {
return new MultiMapFilterTransformIterator(this._source, {
filter: false,
function: self ? map.bind(self) : map,
next: this.transforms,
});
}

filter(filter: (item: D) => boolean, self?: any): AsyncIterator<D> {
return new MultiMapFilterTransformIterator(this._source, {
filter: true,
function: self ? filter.bind(self) : filter,
next: this.transforms,
});
}
}

/**
An iterator that generates items based on a source iterator
Expand Down
84 changes: 84 additions & 0 deletions test/AsyncIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ENDED,
DESTROYED,
scheduleTask,
range,
} from '../dist/asynciterator.js';

import { EventEmitter } from 'events';
Expand Down Expand Up @@ -1307,4 +1308,87 @@ describe('AsyncIterator', () => {
});
});
});
describe('Testing chains fo maps and filters', () => {
let iterator;
beforeEach(() => {
iterator = range(0, 2);
});
it('Should handle no transforms', async () => {
iterator.read().should.equal(0);
iterator.read().should.equal(1);
iterator.read().should.equal(2);
});
it('Should handle no transforms arrayified', async () => {
(await iterator.toArray()).should.deep.equal([0, 1, 2]);
});
it('Should apply maps that doubles correctly', async () => {
(await iterator.map(x => x * 2).toArray()).should.deep.equal([0, 2, 4]);
});
it('Should apply maps that doubles correctly', async () => {
(await iterator.map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']);
});
it('Should apply filter correctly', async () => {
(await iterator.filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]);
});
it('Should apply filter then map correctly', async () => {
(await iterator.filter(x => x % 2 === 0).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x2']);
});
it('Should apply map then filter correctly (1)', async () => {
(await iterator.map(x => x).filter(x => x % 2 === 0).toArray()).should.deep.equal([0, 2]);
});
it('Should apply map then filter to false correctly', async () => {
(await iterator.map(x => `x${x}`).filter(x => true).toArray()).should.deep.equal(['x0', 'x1', 'x2']);
});
it('Should apply map then filter to true correctly', async () => {
(await iterator.map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]);
});
it('Should apply filter to false then map correctly', async () => {
(await iterator.filter(x => true).map(x => `x${x}`).toArray()).should.deep.equal(['x0', 'x1', 'x2']);
});
it('Should apply filter to true then map correctly', async () => {
(await iterator.filter(x => false).map(x => `x${x}`).filter(x => false).toArray()).should.deep.equal([]);
});
it('Should apply filter one then double', async () => {
(await iterator.filter(x => x !== 1).map(x => x * 2).toArray()).should.deep.equal([0, 4]);
});
it('Should apply double then filter one', async () => {
(await iterator.map(x => x * 2).filter(x => x !== 1).toArray()).should.deep.equal([0, 2, 4]);
});
it('Should apply map then filter correctly', async () => {
(await iterator.map(x => `x${x}`).filter(x => (x[1] === '0')).toArray()).should.deep.equal(['x0']);
});
it('Should correctly apply 3 filters', async () => {
(await range(0, 5).filter(x => x !== 1).filter(x => x !== 2).filter(x => x !== 2).toArray()).should.deep.equal([0, 3, 4, 5]);
});
it('Should correctly apply 3 maps', async () => {
(await range(0, 1).map(x => x * 2).map(x => `z${x}`).map(x => `y${x}`).toArray()).should.deep.equal(['yz0', 'yz2']);
});
it('Should correctly apply a map, followed by a filter, followed by another map', async () => {
(await range(0, 1).map(x => x * 2).filter(x => x !== 2).map(x => `y${x}`).toArray()).should.deep.equal(['y0']);
});
it('Should correctly apply a filter-map-filter', async () => {
(await range(0, 2).filter(x => x !== 1).map(x => x * 3).filter(x => x !== 6).toArray()).should.deep.equal([0]);
});
it('Should handle transforms', async () => {
iterator = iterator.multiMap(function* (data) {
yield `x${data}`;
yield `y${data}`;
});
(await iterator.toArray()).should.deep.equal(['x0', 'y0', 'x1', 'y1', 'x2', 'y2']);
});
it('Should handle transforms and maps', async () => {
iterator = iterator.multiMap(function* (data) {
yield `x${data}`;
yield `y${data}`;
}).map(x => `z${x}`);
(await iterator.toArray()).should.deep.equal(['zx0', 'zy0', 'zx1', 'zy1', 'zx2', 'zy2']);
});
it('Should handle maps and transforms', async () => {
iterator = iterator.map(x => `z${x}`).multiMap(function* (data) {
yield `x${data}`;
yield `y${data}`;
});
(await iterator.toArray()).should.deep.equal(['xz0', 'yz0', 'xz1', 'yz1', 'xz2', 'yz2']);
});
});
});
Loading