Skip to content
277 changes: 273 additions & 4 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,15 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that maps the items from this iterator
*/
map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return this.transform({ map: self ? map.bind(self) : map });
return new MappingIterator(this, self ? map.bind(self) : map);
}

/**
MultiMaps items according to a synchronous generator (hence no need for buffering)
@param {Function} multiMap The function to multiMap items with
*/
multiMap<D>(multiMap: (item: T) => Generator<D>): AsyncIterator<D> {
return new MultiMappingIterator(this, multiMap);
}

/**
Expand All @@ -469,7 +477,7 @@ export class AsyncIterator<T> extends EventEmitter {
filter<K extends T>(filter: (item: T) => item is K, self?: any): AsyncIterator<K>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T>;
filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return this.transform({ filter: self ? filter.bind(self) : filter });
return new FilteringIterator(this, self ? filter.bind(self) : filter);
}

/**
Expand Down Expand Up @@ -510,7 +518,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that skips the given number of items
*/
skip(offset: number): AsyncIterator<T> {
return this.transform({ offset });
return new SkippingIterator(this, offset);
}

/**
Expand All @@ -520,7 +528,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator with at most the given number of items
*/
take(limit: number): AsyncIterator<T> {
return this.transform({ limit });
return new LimitingIterator(this, limit);
}

/**
Expand Down Expand Up @@ -1251,6 +1259,267 @@ function destinationFillBuffer<S>(this: InternalSource<S>) {
(this._destination as any)._fillBuffer();
}

export class SynchronousTransformIterator<S, D = S> extends AsyncIterator<D> {
protected _source: AsyncIterator<S>;

constructor(source: AsyncIterator<S>) {
/* eslint-disable no-use-before-define */
super();
this._source = source;
const cleanup = () => {
source.removeListener('end', onSourceEnd);
source.removeListener('error', onSourceError);
source.removeListener('readable', onSourceReadable);
taskScheduler(() => {
// Delayed as there might be pending tasks using the source at the
// time that cleanup() is called.
delete this._source;
});
};
const onSourceEnd = () => {
cleanup();
this.close();
};
const onSourceError = (err: Error) => {
this.emit('error', err);
};
const onSourceReadable = () => {
if (this.readable) {
// TODO: I'm not completely sure as to why this is needed but without
// the following line, some use cases relying on flow mode (i.e.
// consuming items via `on('data', (data) => {})`) do not work.
// It looks like the debouncing that happens in `set readable()`
// in `AsyncIterator` prevents the event from firing as `this`
// is already readable.
this.emit('readable');
}
else {
this.readable = true;
}
};
source.on('end', onSourceEnd);
source.on('error', onSourceError);
source.on('readable', onSourceReadable);
if (source.readable)
onSourceReadable();
}

protected _destroy(cause: Error | undefined, callback: (error?: Error) => void) {
super._destroy(cause, callback);
this._source.destroy(cause);
}
}

export class MultiMappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
protected readonly _map: (item: S) => Generator<D>;
private generator?: Generator<D>;

constructor(source: AsyncIterator<S>, map: (item: S) => Generator<D>) {
super(source);
this._map = map;
}

read(): D | null {
let _item;

// eslint-disable-next-line no-constant-condition
while (true) {
if (!this.generator) {
if ((_item = this._source.read()) === null)
return null;
this.generator = this._map(_item);
}
if (!(_item = this.generator.next()).done)
return _item.value;
this.generator = undefined;
}
}
}

export class MappingIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
protected readonly _map: (item: S) => D;

constructor(source: AsyncIterator<S>, map: (item: S) => D) {
super(source);
this._map = map;
}

read(): D | null {
const item = this._source.read();
if (item !== null)
return this._map(item);
return null;
}

map<T>(map: (item: D) => T, self?: any): AsyncIterator<T> {
return new MultiMapFilterTransformIterator(this._source, {
filter: false,
function: self ? map.bind(self) : map,
next: {
filter: false,
function: this._map,
},
});
}

filter(filter: (item: D) => boolean, self?: any): AsyncIterator<D> {
return new MultiMapFilterTransformIterator(this._source, {
filter: true,
function: self ? filter.bind(self) : filter,
next: {
filter: false,
function: this._map,
},
});
}
}

export class FilteringIterator<T> extends SynchronousTransformIterator<T> {
protected readonly _filter: (item: T) => boolean;

constructor(source: AsyncIterator<T>, filter: (item: T) => boolean) {
super(source);
this._filter = filter;
}

read(): T | null {
let item;
while ((item = this._source.read()) !== null) {
if (this._filter(item))
return item;
}
return null;
}

map<D>(map: (item: T) => D, self?: any): AsyncIterator<D> {
return new MultiMapFilterTransformIterator(this._source, {
filter: false,
function: self ? map.bind(self) : map,
next: {
filter: true,
function: this._filter,
},
});
}

filter(filter: (item: T) => boolean, self?: any): AsyncIterator<T> {
return new MultiMapFilterTransformIterator(this._source, {
filter: true,
function: self ? filter.bind(self) : filter,
next: {
filter: true,
function: this._filter,
},
});
}
}

export class SkippingIterator<T> extends SynchronousTransformIterator<T> {
protected readonly _skip: number;
protected _skipped: number;

constructor(source: AsyncIterator<T>, skip: number) {
super(source);
this._skip = skip;
this._skipped = 0;
}

read(): T | null {
let item;
while ((item = this._source.read()) !== null) {
if (this._skipped < this._skip)
this._skipped += 1;
else
return item;
}
return null;
}
}

export class LimitingIterator<T> extends SynchronousTransformIterator<T> {
protected readonly _limit: number;
protected _count: number;

constructor(source: AsyncIterator<T>, limit: number) {
super(source);
this._limit = limit;
this._count = 0;
}

read(): T | null {
const item = this._source.read();
if (item !== null) {
if (this._count < this._limit) {
this._count += 1;
return item;
}
this.close();
return null;
}
return null;
}
}

interface Transform {
filter: boolean,
function: Function,
next?: Transform
}

export class MultiMapFilterTransformIterator<S, D = S> extends SynchronousTransformIterator<S, D> {
private _transformation?: (item: S) => D | null;

constructor(source: AsyncIterator<S>, private transforms: Transform) {
super(source);
}

protected transformation(_item: S): D | null {
if (!this._transformation) {
let _transforms: Transform | undefined = this.transforms;

const { filter, function: func } = _transforms!;

this._transformation = filter ?
((item: any) => func(item) ? item : null) :
func as any;

while ((_transforms = _transforms!.next) !== undefined) {
const { filter: _filter, function: _func } = _transforms;
const t = this._transformation!;

this._transformation = _filter ?
(item: any) => _func(item) ? t(item) : null :
(item: any) => t(_func(item));
}
}
return this._transformation!(_item);
}

read(): D | null {
let item;
while ((item = this._source.read()) !== null) {
if ((item = this.transformation(item)) !== null)
return item;
}
return null;
}

map<T>(map: (item: D) => T, self?: any): AsyncIterator<T> {
return new MultiMapFilterTransformIterator(this._source, {
filter: false,
function: self ? map.bind(self) : map,
next: this.transforms,
});
}

filter(filter: (item: D) => boolean, self?: any): AsyncIterator<D> {
return new MultiMapFilterTransformIterator(this._source, {
filter: true,
function: self ? filter.bind(self) : filter,
next: this.transforms,
});
}
}

/**
An iterator that generates items based on a source iterator
Expand Down
Loading