Skip to content

Commit 016b111

Browse files
authored
fix(provider-utils): make ReadableStream.cancel() properly finalize async iterators (#9834)
## Background `convertAsyncIteratorToReadableStream` didn't notify its source when a consumer called `cancel()`. This left async generators running even after the stream was cancelled. This affects any code that adapts `convertAsyncIteratorToReadableStream`, e.g. LlamaIndex adapter ## Manual Verification Checked a real streaming flow (OpenAI via LlamaIndex): read a few chunks, then cancel. Next read returns done=true and no further chunks are produced. ## Summary - Fixed `convertAsyncIteratorToReadableStream` to properly call `iterator.return()` when the stream is cancelled - Added unit tests
1 parent 4fe6e10 commit 016b111

File tree

3 files changed

+96
-1
lines changed

3 files changed

+96
-1
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@ai-sdk/provider-utils': patch
3+
---
4+
5+
fix(provider-utils): make ReadableStream.cancel() properly finalize async iterators
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { convertAsyncIteratorToReadableStream } from './convert-async-iterator-to-readable-stream';
2+
import { describe, it, expect } from 'vitest';
3+
4+
async function* makeGenerator(onFinally: () => void) {
5+
try {
6+
let i = 0;
7+
while (true) {
8+
await new Promise(r => setTimeout(r, 0));
9+
yield i++;
10+
}
11+
} finally {
12+
onFinally();
13+
}
14+
}
15+
16+
describe('convertAsyncIteratorToReadableStream', () => {
17+
it('calls iterator.return() on cancel and triggers finally', async () => {
18+
let finallyCalled = false;
19+
const it = makeGenerator(() => {
20+
finallyCalled = true;
21+
});
22+
const stream = convertAsyncIteratorToReadableStream(it);
23+
const reader = stream.getReader();
24+
25+
await reader.read();
26+
27+
await reader.cancel('stop');
28+
29+
// give microtasks a tick for finally to run
30+
await new Promise(r => setTimeout(r, 0));
31+
32+
expect(finallyCalled).toBe(true);
33+
});
34+
35+
it('does not enqueue further values after cancel', async () => {
36+
const it = makeGenerator(() => {});
37+
const stream = convertAsyncIteratorToReadableStream(it);
38+
const reader = stream.getReader();
39+
40+
await reader.read();
41+
await reader.cancel('stop');
42+
43+
const { done, value } = await reader.read();
44+
expect(done).toBe(true);
45+
expect(value).toBeUndefined();
46+
});
47+
48+
it('works with iterator without return() method', async () => {
49+
const it: AsyncIterator<number> = {
50+
async next() {
51+
return { value: 42, done: false };
52+
},
53+
};
54+
const stream = convertAsyncIteratorToReadableStream(it);
55+
const reader = stream.getReader();
56+
57+
const { value } = await reader.read();
58+
expect(value).toBe(42);
59+
60+
await expect(reader.cancel()).resolves.toBeUndefined();
61+
});
62+
63+
it('ignores errors from iterator.return()', async () => {
64+
const it: AsyncIterator<number> = {
65+
async next() {
66+
return { value: 1, done: false };
67+
},
68+
async return() {
69+
throw new Error('return() failed');
70+
},
71+
};
72+
const stream = convertAsyncIteratorToReadableStream(it);
73+
const reader = stream.getReader();
74+
75+
await reader.read();
76+
await expect(reader.cancel()).resolves.toBeUndefined();
77+
});
78+
});

packages/provider-utils/src/convert-async-iterator-to-readable-stream.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
export function convertAsyncIteratorToReadableStream<T>(
99
iterator: AsyncIterator<T>,
1010
): ReadableStream<T> {
11+
let cancelled = false;
12+
1113
return new ReadableStream<T>({
1214
/**
1315
* Called when the consumer wants to pull more data from the stream.
@@ -16,6 +18,7 @@ export function convertAsyncIteratorToReadableStream<T>(
1618
* @returns {Promise<void>}
1719
*/
1820
async pull(controller) {
21+
if (cancelled) return;
1922
try {
2023
const { value, done } = await iterator.next();
2124
if (done) {
@@ -30,6 +33,15 @@ export function convertAsyncIteratorToReadableStream<T>(
3033
/**
3134
* Called when the consumer cancels the stream.
3235
*/
33-
cancel() {},
36+
async cancel(reason?: unknown) {
37+
cancelled = true;
38+
if (iterator.return) {
39+
try {
40+
await iterator.return(reason);
41+
} catch {
42+
// intentionally ignore errors during cancellation
43+
}
44+
}
45+
},
3446
});
3547
}

0 commit comments

Comments
 (0)