@@ -5,6 +5,8 @@ import { findDoubleNewlineIndex, LineDecoder } from '../internal/decoders/line';
5
5
import { ReadableStreamToAsyncIterable } from '../internal/shims' ;
6
6
import { isAbortError } from '../internal/errors' ;
7
7
import { encodeUTF8 } from '../internal/utils/bytes' ;
8
+ import { loggerFor } from '../internal/utils/log' ;
9
+ import type { Opencode } from '../client' ;
8
10
9
11
type Bytes = string | ArrayBuffer | Uint8Array | null | undefined ;
10
12
@@ -16,16 +18,24 @@ export type ServerSentEvent = {
16
18
17
19
export class Stream < Item > implements AsyncIterable < Item > {
18
20
controller : AbortController ;
21
+ #client: Opencode | undefined ;
19
22
20
23
constructor (
21
24
private iterator : ( ) => AsyncIterator < Item > ,
22
25
controller : AbortController ,
26
+ client ?: Opencode ,
23
27
) {
24
28
this . controller = controller ;
29
+ this . #client = client ;
25
30
}
26
31
27
- static fromSSEResponse < Item > ( response : Response , controller : AbortController ) : Stream < Item > {
32
+ static fromSSEResponse < Item > (
33
+ response : Response ,
34
+ controller : AbortController ,
35
+ client ?: Opencode ,
36
+ ) : Stream < Item > {
28
37
let consumed = false ;
38
+ const logger = client ? loggerFor ( client ) : console ;
29
39
30
40
async function * iterator ( ) : AsyncIterator < Item , any , undefined > {
31
41
if ( consumed ) {
@@ -38,8 +48,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
38
48
try {
39
49
yield JSON . parse ( sse . data ) ;
40
50
} catch ( e ) {
41
- console . error ( `Could not parse message into JSON:` , sse . data ) ;
42
- console . error ( `From chunk:` , sse . raw ) ;
51
+ logger . error ( `Could not parse message into JSON:` , sse . data ) ;
52
+ logger . error ( `From chunk:` , sse . raw ) ;
43
53
throw e ;
44
54
}
45
55
}
@@ -54,14 +64,18 @@ export class Stream<Item> implements AsyncIterable<Item> {
54
64
}
55
65
}
56
66
57
- return new Stream ( iterator , controller ) ;
67
+ return new Stream ( iterator , controller , client ) ;
58
68
}
59
69
60
70
/**
61
71
* Generates a Stream from a newline-separated ReadableStream
62
72
* where each item is a JSON value.
63
73
*/
64
- static fromReadableStream < Item > ( readableStream : ReadableStream , controller : AbortController ) : Stream < Item > {
74
+ static fromReadableStream < Item > (
75
+ readableStream : ReadableStream ,
76
+ controller : AbortController ,
77
+ client ?: Opencode ,
78
+ ) : Stream < Item > {
65
79
let consumed = false ;
66
80
67
81
async function * iterLines ( ) : AsyncGenerator < string , void , unknown > {
@@ -101,7 +115,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
101
115
}
102
116
}
103
117
104
- return new Stream ( iterator , controller ) ;
118
+ return new Stream ( iterator , controller , client ) ;
105
119
}
106
120
107
121
[ Symbol . asyncIterator ] ( ) : AsyncIterator < Item > {
@@ -131,8 +145,8 @@ export class Stream<Item> implements AsyncIterable<Item> {
131
145
} ;
132
146
133
147
return [
134
- new Stream ( ( ) => teeIterator ( left ) , this . controller ) ,
135
- new Stream ( ( ) => teeIterator ( right ) , this . controller ) ,
148
+ new Stream ( ( ) => teeIterator ( left ) , this . controller , this . #client ) ,
149
+ new Stream ( ( ) => teeIterator ( right ) , this . controller , this . #client ) ,
136
150
] ;
137
151
}
138
152
0 commit comments