@@ -5,6 +5,8 @@ import { findDoubleNewlineIndex, LineDecoder } from '../internal/decoders/line';
5
5
import { ReadableStreamToAsyncIterable } from '../internal/shims' ;
6
6
import { isAbortError } from '../internal/errors' ;
7
7
import { encodeUTF8 } from '../internal/utils/bytes' ;
8
+ import { loggerFor } from '../internal/utils/log' ;
9
+ import type { OpenAI } from '../client' ;
8
10
9
11
import { APIError } from './error' ;
10
12
@@ -18,16 +20,24 @@ export type ServerSentEvent = {
18
20
19
21
export class Stream < Item > implements AsyncIterable < Item > {
20
22
controller : AbortController ;
23
+ #client: OpenAI | undefined ;
21
24
22
25
constructor (
23
26
private iterator : ( ) => AsyncIterator < Item > ,
24
27
controller : AbortController ,
28
+ client ?: OpenAI ,
25
29
) {
26
30
this . controller = controller ;
31
+ this . #client = client ;
27
32
}
28
33
29
- static fromSSEResponse < Item > ( response : Response , controller : AbortController ) : Stream < Item > {
34
+ static fromSSEResponse < Item > (
35
+ response : Response ,
36
+ controller : AbortController ,
37
+ client ?: OpenAI ,
38
+ ) : Stream < Item > {
30
39
let consumed = false ;
40
+ const logger = client ? loggerFor ( client ) : console ;
31
41
32
42
async function * iterator ( ) : AsyncIterator < Item , any , undefined > {
33
43
if ( consumed ) {
@@ -54,8 +64,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
54
64
try {
55
65
data = JSON . parse ( sse . data ) ;
56
66
} catch ( e ) {
57
- console . error ( `Could not parse message into JSON:` , sse . data ) ;
58
- console . error ( `From chunk:` , sse . raw ) ;
67
+ logger . error ( `Could not parse message into JSON:` , sse . data ) ;
68
+ logger . error ( `From chunk:` , sse . raw ) ;
59
69
throw e ;
60
70
}
61
71
@@ -91,14 +101,18 @@ export class Stream<Item> implements AsyncIterable<Item> {
91
101
}
92
102
}
93
103
94
- return new Stream ( iterator , controller ) ;
104
+ return new Stream ( iterator , controller , client ) ;
95
105
}
96
106
97
107
/**
98
108
* Generates a Stream from a newline-separated ReadableStream
99
109
* where each item is a JSON value.
100
110
*/
101
- static fromReadableStream < Item > ( readableStream : ReadableStream , controller : AbortController ) : Stream < Item > {
111
+ static fromReadableStream < Item > (
112
+ readableStream : ReadableStream ,
113
+ controller : AbortController ,
114
+ client ?: OpenAI ,
115
+ ) : Stream < Item > {
102
116
let consumed = false ;
103
117
104
118
async function * iterLines ( ) : AsyncGenerator < string , void , unknown > {
@@ -138,7 +152,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
138
152
}
139
153
}
140
154
141
- return new Stream ( iterator , controller ) ;
155
+ return new Stream ( iterator , controller , client ) ;
142
156
}
143
157
144
158
[ Symbol . asyncIterator ] ( ) : AsyncIterator < Item > {
@@ -168,8 +182,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
168
182
} ;
169
183
170
184
return [
171
- new Stream ( ( ) => teeIterator ( left ) , this . controller ) ,
172
- new Stream ( ( ) => teeIterator ( right ) , this . controller ) ,
185
+ new Stream ( ( ) => teeIterator ( left ) , this . controller , this . #client ) ,
186
+ new Stream ( ( ) => teeIterator ( right ) , this . controller , this . #client ) ,
173
187
] ;
174
188
}
175
189
0 commit comments