Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,245 changes: 2,245 additions & 0 deletions asynciterator copy.ts

Large diffs are not rendered by default.

261 changes: 152 additions & 109 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ export class AsyncIterator<T> extends EventEmitter {
@param {Array|module:asynciterator.AsyncIterator} items Items to insert before this iterator's (remaining) items
@returns {module:asynciterator.AsyncIterator} A new iterator that prepends items to this iterator
*/
prepend(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ prepend: items });
prepend(items: IterableSource<T>): AsyncIterator<T> {
return union([wrap(items), this]);
}

/**
Expand All @@ -491,8 +491,8 @@ export class AsyncIterator<T> extends EventEmitter {
@param {Array|module:asynciterator.AsyncIterator} items Items to insert after this iterator's (remaining) items
@returns {module:asynciterator.AsyncIterator} A new iterator that appends items to this iterator
*/
append(items: T[] | AsyncIterator<T>): AsyncIterator<T> {
return this.transform({ append: items });
append(items: IterableSource<T>): AsyncIterator<T> {
return union([this, wrap(items)]);
}

/**
Expand Down Expand Up @@ -1619,147 +1619,189 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
}
}

/** Key indicating the node of a source in a union buffer. */
export const NODE = Symbol('node');

interface CircularNode<T> {
last: CircularNode<T>;
next: CircularNode<T>;
value: T;
}

/**
An iterator that generates items by reading from multiple other iterators.
@extends module:asynciterator.BufferedIterator
@extends module:asynciterator.AsyncIterator
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : InternalSource<T>[] = [];
private _pending? : { loading: boolean, sources?: AsyncIterator<MaybePromise<AsyncIterator<T>>> };
private _currentSource = -1;
protected _destroySources: boolean;
export class UnionIterator<T> extends AsyncIterator<T> {
private _sources : AsyncIterator<AsyncIterator<T>>;
private _node: CircularNode<AsyncIterator<T>> | null = null;
private _size: number = 0;
private _sourceStarted: boolean = false;
private _destroySources: boolean;
private _maxParallelIterators: number;

/**
Creates a new `UnionIterator`.
@param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from
@param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from.
@param {object} [options] Settings of the iterator
@param {boolean} [options.destroySource=true] Whether the sources should be destroyed when transformed iterator is closed or destroyed
@param {number} [options.maxParallelIterators=Infinity] The maximum number of iterators that can be read from simultaneously.
Setting equal to 1 will cause iterators to be read from sequentially and in order.
*/
constructor(sources: AsyncIteratorOrArray<AsyncIterator<T>> |
AsyncIteratorOrArray<Promise<AsyncIterator<T>>> |
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>>,
options: BufferedIteratorOptions & { destroySources?: boolean } = {}) {
super(options);
const autoStart = options.autoStart !== false;
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>> |
MaybePromise<AsyncIteratorOrArray<MaybePromise<IterableSource<T>>>>,
options: { destroySources?: boolean, maxParallelIterators?: number } = {}) {
super();
this._listenSource(this._sources = wrap(sources).map<AsyncIterator<T>>(wrap));

// Sources have been passed as an iterator
if (isEventEmitter(sources)) {
sources.on('error', error => this.emit('error', error));
this._pending = { loading: false, sources: sources as AsyncIterator<MaybePromise<AsyncIterator<T>>> };
if (autoStart)
this._loadSources();
}
// Sources have been passed as a non-empty array
else if (Array.isArray(sources) && sources.length > 0) {
for (const source of sources)
this._addSource(source as MaybePromise<InternalSource<T>>);
}
// Sources are an empty list
else if (autoStart) {
this.close();
}
// Set other options
this._destroySources = options.destroySources !== false;
this._maxParallelIterators = options.maxParallelIterators || Infinity;

this.readable = this._sources.readable;
}

// Loads pending sources into the sources list
protected _loadSources() {
// Obtain sources iterator
const sources = this._pending!.sources!;
this._pending!.loading = true;
// Adds the given source to the internal sources array
protected _listenSource(source: InternalSource<any>) {
source[DESTINATION] = this;
source.on('error', destinationEmitError);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the second of two behaviors I am not happy with. In particular; errors will only be propogated once we have pulled it from this._source. An error may occur before this.

// TODO: On readable events we should start reading from the iterator
// that emitted the event
source.on('readable', destinationSetReadable);
source.on('end', destinationRemoveEmptySources);
}

protected _unListenSource(source: InternalSource<any>) {
source.removeListener('error', destinationEmitError);
source.removeListener('readable', destinationSetReadable);
source.removeListener('end', destinationRemoveEmptySources);
delete source[DESTINATION];
}

protected _addSource(source: InternalSource<any>, successor = this._node) {
this._listenSource(source);
this._size++;

let node: CircularNode<AsyncIterator<T>>;
if (successor === null) {
// There is no circular list so make one
node = { value: source } as CircularNode<AsyncIterator<T>>;
node.next = node;
node.last = node;
this._node = node;
} else {
// Insert the source into the circular structure
this._node = node = {
value: source,
next: successor,
last: successor.last
};

// Close immediately if done
if (sources.done) {
delete this._pending;
this.close();
}
// Otherwise, set up source reading
else {
sources.on('data', source => {
this._addSource(source as MaybePromise<InternalSource<T>>);
this._fillBufferAsync();
});
sources.on('end', () => {
delete this._pending;
this._fillBuffer();
});
successor.last.next = node;
successor.last = node;
}
source[NODE] = node;
}

// Adds the given source to the internal sources array
protected _addSource(source: MaybePromise<InternalSource<T>>) {
if (isPromise(source))
source = wrap<T>(source) as any as InternalSource<T>;
if (!source.done) {
this._sources.push(source);
source[DESTINATION] = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('end', destinationRemoveEmptySources);
protected _removeSource(value: InternalSource<T>) {
const node = value[NODE]!;
this._unListenSource(value);
delete value[NODE];
this._size--;

if (this._size === 0) {
this._node = null;
} else {
node.last.next = node.next;
node.next.last = node.last;

// Make sure the root node no longer
// references a deleted iterator.
if (this._node === node) {
this._node = node.next;
}
}
}

// Removes sources that will no longer emit items
protected _removeEmptySources() {
this._sources = this._sources.filter((source, index) => {
// Adjust the index of the current source if needed
if (source.done && index <= this._currentSource)
this._currentSource--;
return !source.done;
});
this._fillBuffer();
}

// Reads items from the next sources
protected _read(count: number, done: () => void): void {
// Start source loading if needed
if (this._pending?.loading === false)
this._loadSources();

// Try to read `count` items
let lastCount = 0, item : T | null;
while (lastCount !== (lastCount = count)) {
// Try every source at least once
for (let i = 0; i < this._sources.length && count > 0; i++) {
// Pick the next source
this._currentSource = (this._currentSource + 1) % this._sources.length;
const source = this._sources[this._currentSource];
// Attempt to read an item from that source
if ((item = source.read()) !== null) {
count--;
this._push(item);
}
public read(): T | null {
// TODO: Get rid of this when merging #45
if (!this._sourceStarted)
this._sourceStarted = true;

let { _sources, _size } = this;
let item: T | null;
let iterator: AsyncIterator<T> | null;
while (_size > 0) {
iterator = this._node!.value;

if (iterator.readable && (item = iterator.read()) !== null)
return item;

// If the iterator is done, get rid of it from the circular list
if (iterator.done) {
this._removeSource(iterator);
} else {
this._node = this._node!.next;
}
_size--;
// TODO: See if this should be an else
}

// Close this iterator if all of its sources have been read
if (!this._pending && this._sources.length === 0)
this.close();
done();
while (this._size < this._maxParallelIterators && (iterator = _sources.read()) !== null) {
// TODO - it would be nice to skip adding sources if it is a single or no
// element iterator.
this._addSource(iterator);
if ((item = iterator.read()) !== null)
return item;
}
this.readable = false;
return null;
}

protected _end(destroy: boolean = false) {
super._end(destroy);

public close() {
this._unListenSource(this._sources);
// Destroy all sources that are still readable
if (this._destroySources) {
for (const source of this._sources)
source.destroy();

// Also close the sources stream if applicable
if (this._pending) {
this._pending!.sources!.destroy();
delete this._pending;
let { _node, _size } = this;
while (_size > 0) {
this._unListenSource(_node!.value);
_node!.value!.destroy();
delete (_node as any)[NODE];
_size--;
_node = _node!.next
}
this._node = null;
}
this._sources.destroy();
super.close();
}
}

function destinationRemoveEmptySources<T>(this: InternalSource<T>) {
(this[DESTINATION] as any)._removeEmptySources();
// Eventually, rather than just removing empty sources I think
// we will want to find a way of specifically removing the source
// that emitted the end event (the trick is to do this without creating race conditions)
const destination = this[DESTINATION];
if (NODE in this) {
(this[DESTINATION] as any)._removeSource(this);
if ((destination as any)._size === 0 && (destination as any)._sources.done) {
// (this[DESTINATION] as any).readable = true;
(destination as any).readable = true;
// TODO: Also capture the case where we need to just start re-filling the circular
// list
}
} else {
(this[DESTINATION] as any)._unListenSource(this);
if ((destination as any)._size === 0) {
// (this[DESTINATION] as any).readable = true;
(destination as any).readable = true;
}
}
// (destination as any).readable = true;
}


/**
An iterator that copies items from another iterator.
@extends module:asynciterator.TransformIterator
Expand Down Expand Up @@ -2166,7 +2208,8 @@ export function fromIterable<T>(source: Iterable<T> | Iterator<T>): AsyncIterato
*/
export function union<T>(sources: AsyncIteratorOrArray<AsyncIterator<T>> |
AsyncIteratorOrArray<Promise<AsyncIterator<T>>> |
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>>) {
AsyncIteratorOrArray<MaybePromise<AsyncIterator<T>>> |
MaybePromise<AsyncIteratorOrArray<MaybePromise<IterableSource<T>>>>) {
return new UnionIterator<T>(sources);
}

Expand Down Expand Up @@ -2227,7 +2270,7 @@ type SourceExpression<T> =
(() => MaybePromise<AsyncIterator<T>>);

type InternalSource<T> =
AsyncIterator<T> & { [DESTINATION]?: AsyncIterator<any> };
AsyncIterator<T> & { [DESTINATION]?: AsyncIterator<any>; [NODE]?: CircularNode<AsyncIterator<any>> };

// Returns a function that calls `fn` with `self` as `this` pointer. */
function bind<T extends Function>(fn: T, self?: object): T {
Expand Down
Loading