Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 163 additions & 104 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that prepends items to this iterator
*/
prepend(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ prepend: items });
return union([wrap(items), this]);
}

/**
Expand All @@ -492,7 +492,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that appends items to this iterator
*/
append(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ append: items });
return union([this, wrap(items)]);
}

/**
Expand All @@ -503,7 +503,7 @@ export class AsyncIterator<T> extends EventEmitter {
@returns {module:asynciterator.AsyncIterator} A new iterator that appends and prepends items to this iterator
*/
surround(prepend: AsyncIteratorOrArray<T>, append: AsyncIteratorOrArray<T>): AsyncIterator<T> {
return this.transform({ prepend, append });
return union([wrap(prepend), this, wrap(append)]);
}

/**
Expand Down Expand Up @@ -1619,147 +1619,206 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
}
}

/** Key indicating the node of a source in a union buffer. */
export const NODE = Symbol('node');

interface CircularNode<T> {
last: CircularNode<T>;
next: CircularNode<T>;
value: T;
}

/**
An iterator that generates items by reading from multiple other iterators.
@extends module:asynciterator.BufferedIterator
@extends module:asynciterator.AsyncIterator
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : InternalSource<T>[] = [];
private _pending? : { loading: boolean, sources?: AsyncIterator<MaybePromise<AsyncIterator<T>>> };
private _currentSource = -1;
protected _destroySources: boolean;
export class UnionIterator<T> extends AsyncIterator<T> {
private _sources : AsyncIterator<AsyncIterator<T>>;
private _node: CircularNode<AsyncIterator<T>> | null = null;
private _size: number = 0;
private _sourceStarted: boolean = false;
private _destroySources: boolean;
private _maxParallelIterators: number;

/**
Creates a new `UnionIterator`.
@param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from
@param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from.
@param {object} [options] Settings of the iterator
@param {boolean} [options.destroySource=true] Whether the sources should be destroyed when transformed iterator is closed or destroyed
@param {number} [options.maxParallelIterators=Infinity] The maximum number of iterators that can be read from simultaneously.
Setting equal to 1 will cause iterators to be read from sequentially and in order.
*/
constructor(sources: AsyncIteratorOrArray<AsyncIterator<T>> |
AsyncIteratorOrArray<Promise<AsyncIterator<T>>> |
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>>,
options: BufferedIteratorOptions & { destroySources?: boolean } = {}) {
super(options);
const autoStart = options.autoStart !== false;
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>> |
MaybePromise<AsyncIteratorOrArray<MaybePromise<IterableSource<T>>>>,
options: { destroySources?: boolean, maxParallelIterators?: number } = {}) {
super();
this._listenSource(this._sources = wrap(sources).map<AsyncIterator<T>>(wrap));

// Sources have been passed as an iterator
if (isEventEmitter(sources)) {
sources.on('error', error => this.emit('error', error));
this._pending = { loading: false, sources: sources as AsyncIterator<MaybePromise<AsyncIterator<T>>> };
if (autoStart)
this._loadSources();
}
// Sources have been passed as a non-empty array
else if (Array.isArray(sources) && sources.length > 0) {
for (const source of sources)
this._addSource(source as MaybePromise<InternalSource<T>>);
}
// Sources are an empty list
else if (autoStart) {
this.close();
}
// Set other options
this._destroySources = options.destroySources !== false;
this._maxParallelIterators = options.maxParallelIterators || Infinity;

this.readable = this._sources.readable;
}

// Adds the given source to the internal sources array
protected _listenSource(source: InternalSource<any>) {
source[DESTINATION] = this;
source.on('error', destinationEmitError);
// TODO: On readable events we should start reading from the iterator
// that emitted the event
source.on('readable', destinationSetReadable);
source.on('end', destinationRemoveEmptySources);
}

protected _unListenSource(source: InternalSource<any>) {
source.removeListener('error', destinationEmitError);
source.removeListener('readable', destinationSetReadable);
source.removeListener('end', destinationRemoveEmptySources);
delete source[DESTINATION];
}

// Loads pending sources into the sources list
protected _loadSources() {
// Obtain sources iterator
const sources = this._pending!.sources!;
this._pending!.loading = true;
protected _addSource(source: InternalSource<any>, successor = this._node) {
this._listenSource(source);
this._size++;

// Close immediately if done
if (sources.done) {
delete this._pending;
this.close();
let node: CircularNode<AsyncIterator<T>>;
if (successor === null) {
// There is no circular list so make one
node = { value: source } as CircularNode<AsyncIterator<T>>;
node.next = node;
node.last = node;
this._node = node;
}
// Otherwise, set up source reading
else {
sources.on('data', source => {
this._addSource(source as MaybePromise<InternalSource<T>>);
this._fillBufferAsync();
});
sources.on('end', () => {
delete this._pending;
this._fillBuffer();
});
// Insert the source into the circular structure
this._node = node = {
value: source,
next: successor,
last: successor.last,
};

successor.last.next = node;
successor.last = node;
}
source[NODE] = node;
}

// Adds the given source to the internal sources array
protected _addSource(source: MaybePromise<InternalSource<T>>) {
if (isPromise(source))
source = wrap<T>(source) as any as InternalSource<T>;
if (!source.done) {
this._sources.push(source);
source[DESTINATION] = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('end', destinationRemoveEmptySources);
protected _removeSource(value: InternalSource<T>) {
const node = value[NODE]!;
this._unListenSource(value);
delete value[NODE];
this._size--;

if (this._size === 0) {
this._node = null;
}
else {
node.last.next = node.next;
node.next.last = node.last;

// Make sure the root node no longer
// references a deleted iterator.
if (this._node === node)
this._node = node.next;
}
}

// Removes sources that will no longer emit items
protected _removeEmptySources() {
this._sources = this._sources.filter((source, index) => {
// Adjust the index of the current source if needed
if (source.done && index <= this._currentSource)
this._currentSource--;
return !source.done;
});
this._fillBuffer();
}

// Reads items from the next sources
protected _read(count: number, done: () => void): void {
// Start source loading if needed
if (this._pending?.loading === false)
this._loadSources();

// Try to read `count` items
let lastCount = 0, item : T | null;
while (lastCount !== (lastCount = count)) {
// Try every source at least once
for (let i = 0; i < this._sources.length && count > 0; i++) {
// Pick the next source
this._currentSource = (this._currentSource + 1) % this._sources.length;
const source = this._sources[this._currentSource];
// Attempt to read an item from that source
if ((item = source.read()) !== null) {
count--;
this._push(item);
}
public read(): T | null {
// TODO: Get rid of this when merging #45
if (!this._sourceStarted)
this._sourceStarted = true;

let { _sources, _size } = this;
let item: T | null;
let iterator: AsyncIterator<T> | null;
while (_size > 0) {
iterator = this._node!.value;

if (iterator.readable && (item = iterator.read()) !== null)
return item;

// TODO: in #45 - if the iterator is done, get rid of it from the circular list
// if (iterator.done)
// this._removeSource(iterator);
// else
this._node = this._node!.next;

_size--;
}

while (this._size < this._maxParallelIterators && (iterator = _sources.read()) !== null) {
if (!iterator.done) {
this._addSource(iterator);
if (iterator.readable && (item = iterator.read()) !== null)
return item;
}
}

// Close this iterator if all of its sources have been read
if (!this._pending && this._sources.length === 0)
if (this._size === 0 && this._sources.done)
this.close();
done();
}
else
this.readable = false;

protected _end(destroy: boolean = false) {
super._end(destroy);
return null;
}

public close() {
this._unListenSource(this._sources);
// Destroy all sources that are still readable
if (this._destroySources) {
for (const source of this._sources)
source.destroy();

// Also close the sources stream if applicable
if (this._pending) {
this._pending!.sources!.destroy();
delete this._pending;
let { _node, _size } = this;
while (_size > 0) {
this._unListenSource(_node!.value);
_node!.value!.destroy();
delete (_node as any)[NODE];
_size--;
_node = _node!.next;
}
this._node = null;
}
this._sources.destroy();
super.close();
}
}

function destinationRemoveEmptySources<T>(this: InternalSource<T>) {
(this[DESTINATION] as any)._removeEmptySources();
// Eventually, rather than just removing empty sources I think
// we will want to find a way of specifically removing the source
// that emitted the end event (the trick is to do this without creating race conditions)
const destination = this[DESTINATION] as any;
if (NODE in this) {
destination._removeSource(this);
if (destination._size === 0 && destination._sources.done && destination._sourceStarted) {
destination!.close();
return;
}
destination.readable = true;
// const _sources = destination._sources;
// let iterator;
// while (destination._size < destination._maxParallelIterators && iterator.readable && (iterator = _sources.read()) !== null) {
// if (!iterator.done) {
// destination._addSource(iterator);
// if (iterator.readable) {
// destination.readable = true;
// return;
// }
// }
// }
}
else {
destination._unListenSource(this);
if (destination._size === 0) {
if (destination._sourceStarted)
destination.close();
else
destination.readable = true;
}
}
}


/**
An iterator that copies items from another iterator.
@extends module:asynciterator.TransformIterator
Expand Down Expand Up @@ -2224,7 +2283,7 @@ type SourceExpression<T> =
(() => MaybePromise<AsyncIterator<T>>);

type InternalSource<T> =
AsyncIterator<T> & { [DESTINATION]?: AsyncIterator<any> };
AsyncIterator<T> & { [DESTINATION]?: AsyncIterator<any>; [NODE]?: CircularNode<AsyncIterator<any>> };

// Returns a function that calls `fn` with `self` as `this` pointer. */
function bind<T extends Function>(fn: T, self?: object): T {
Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"c8": "^7.2.0",
"chai": "^4.2.0",
"eslint": "^8.0.0",
"event-emitter-promisify": "^1.1.0",
"husky": "^4.2.5",
"jaguarjs-jsdoc": "^1.1.0",
"jsdoc": "^3.5.5",
Expand Down
Loading