Skip to content

Commit bd30fa2

Browse files
committed
🎉 feat: refactor sse
1 parent fe59894 commit bd30fa2

File tree

10 files changed

+387
-81
lines changed

10 files changed

+387
-81
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
# 1.3.16 - 22 Aug 2025
2+
Improvement:
3+
- `sse` now infer type
4+
- `sse` now accepts `ReadableStream` to return stream as `text/event-stream`
5+
- refactor SSE handler
6+
- support returning `ReadbleStream` from generator or async generator
7+
8+
Bug fix:
9+
- static response now use callback clone instead of bind
10+
111
# 1.3.15 - 21 Aug 2025
212
Bug fix:
313
- ValidationError.detail only handle custom error

example/a.ts

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
1-
import { Elysia, t, validationDetail } from '../src'
1+
import { Elysia, sse, t } from '../src'
2+
import { streamResponse } from '../src/adapter/utils'
3+
import { req } from '../test/utils'
24

3-
new Elysia()
4-
.onError(({ error, code }) => {
5-
if (code === 'VALIDATION') return error.detail(error.message)
6-
})
7-
.post('/', () => 'Hello World!', {
8-
body: t.Object({
9-
x: t.Number({
10-
error: 'x must be a number'
11-
}),
5+
const app = new Elysia()
6+
.get('/', function () {
7+
return new ReadableStream({
8+
async start(controller) {
9+
controller.enqueue('a')
10+
await Bun.sleep(100)
11+
controller.enqueue('b')
12+
await Bun.sleep(100)
13+
controller.close()
14+
}
1215
})
1316
})
1417
.listen(3000)
18+
19+
const response = await app.handle(req('/'))
20+
21+
for await (const a of streamResponse(response)) {
22+
console.log(a)
23+
}

src/adapter/bun/handler-native.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,6 @@ export const createNativeStaticHandler = (
3939
if (!response.headers.has('content-type'))
4040
response.headers.append('content-type', 'text/plain')
4141

42-
return () => response.clone() as any
42+
return () => response.clone() as Response
4343
}
4444
}

src/adapter/bun/handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,7 +664,7 @@ export const createStaticHandler = (
664664
!hooks.beforeHandle?.length &&
665665
!hooks.afterHandle?.length
666666
)
667-
return response.clone.bind(response) as any
667+
return () => response.clone() as Response
668668
}
669669

670670
const handleResponse = createResponseHandler({

src/adapter/utils.ts

Lines changed: 69 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -147,28 +147,48 @@ type CreateHandlerParameter = {
147147
mapCompactResponse(response: unknown, request?: Request): Response
148148
}
149149

150+
const asyncIterator = Symbol.asyncIterator
151+
150152
export const createStreamHandler =
151153
({ mapResponse, mapCompactResponse }: CreateHandlerParameter) =>
152154
async (
153-
generator: Generator | AsyncGenerator,
155+
generator: Generator | AsyncGenerator | ReadableStream,
154156
set?: Context['set'],
155157
request?: Request
156158
) => {
157-
let init = generator.next()
159+
// Since ReadableStream doesn't have next, init might be undefined
160+
let init = (generator as Generator).next?.() as
161+
| IteratorResult<unknown>
162+
| undefined
163+
158164
if (init instanceof Promise) init = await init
159165

160-
if (typeof init?.done === 'undefined' || init?.done) {
166+
// Generator or ReadableStream is returned from a generator function
167+
if (init?.value instanceof ReadableStream) {
168+
// @ts-ignore
169+
generator = init.value
170+
} else if (init && (typeof init?.done === 'undefined' || init?.done)) {
161171
if (set) return mapResponse(init.value, set, request)
162172
return mapCompactResponse(init.value, request)
163173
}
164174

165-
const contentType =
166-
// @ts-ignore
167-
init.value && typeof init.value?.stream
168-
? 'text/event-stream'
169-
: init.value && typeof init.value === 'object'
170-
? 'application/json'
171-
: 'text/plain'
175+
const isSSE =
176+
// @ts-ignore First SSE result is wrapped with sse()
177+
init?.value?.sse ??
178+
// @ts-ignore ReadableStream is wrapped with sse()
179+
generator?.sse ??
180+
// User explicitly set content-type to SSE
181+
set?.headers['content-type']?.startsWith('text/event-stream')
182+
183+
const format = isSSE
184+
? (data: string) => `data: ${data}\n\n`
185+
: (data: string) => data
186+
187+
const contentType = isSSE
188+
? 'text/event-stream'
189+
: init?.value && typeof init?.value === 'object'
190+
? 'application/json'
191+
: 'text/plain'
172192

173193
if (set?.headers) {
174194
if (!set.headers['transfer-encoding'])
@@ -177,7 +197,7 @@ export const createStreamHandler =
177197
set.headers['content-type'] = contentType
178198
if (!set.headers['cache-control'])
179199
set.headers['cache-control'] = 'no-cache'
180-
} else {
200+
} else
181201
set = {
182202
status: 200,
183203
headers: {
@@ -187,7 +207,6 @@ export const createStreamHandler =
187207
connection: 'keep-alive'
188208
}
189209
}
190-
}
191210

192211
return new Response(
193212
new ReadableStream({
@@ -199,57 +218,58 @@ export const createStreamHandler =
199218

200219
try {
201220
controller.close()
202-
} catch {
203-
// nothing
204-
}
221+
} catch {}
205222
})
206223

207-
if (init.value !== undefined && init.value !== null) {
224+
if (!init || init.value instanceof ReadableStream) {
225+
} else if (
226+
init.value !== undefined &&
227+
init.value !== null
228+
) {
208229
// @ts-ignore
209-
if (init.value.toStream)
230+
if (init.value.toSSE)
210231
// @ts-ignore
211-
controller.enqueue(init.value.toStream())
232+
controller.enqueue(init.value.toSSE())
212233
else if (typeof init.value === 'object')
213234
try {
214235
controller.enqueue(
215-
Buffer.from(JSON.stringify(init.value))
236+
format(JSON.stringify(init.value))
216237
)
217238
} catch {
218239
controller.enqueue(
219-
Buffer.from(init.value.toString())
240+
format(init.value.toString())
220241
)
221242
}
222-
else
223-
controller.enqueue(
224-
Buffer.from(init.value.toString())
225-
)
243+
else controller.enqueue(format(init.value.toString()))
226244
}
227245

228-
for await (const chunk of generator) {
229-
if (end) break
230-
if (chunk === undefined || chunk === null) continue
246+
try {
247+
for await (const chunk of generator) {
248+
if (end) break
249+
if (chunk === undefined || chunk === null) continue
231250

232-
// @ts-ignore
233-
if (chunk.toStream)
234251
// @ts-ignore
235-
controller.enqueue(chunk.toStream())
236-
else if (typeof chunk === 'object')
237-
try {
238-
controller.enqueue(
239-
Buffer.from(JSON.stringify(chunk))
240-
)
241-
} catch {
242-
controller.enqueue(
243-
Buffer.from(chunk.toString())
244-
)
245-
}
246-
else controller.enqueue(Buffer.from(chunk.toString()))
247-
248-
// Wait for the next event loop
249-
// Otherwise the data will be mixed up
250-
await new Promise<void>((resolve) =>
251-
setTimeout(() => resolve(), 0)
252-
)
252+
if (chunk.toSSE)
253+
// @ts-ignore
254+
controller.enqueue(chunk.toSSE())
255+
else if (typeof chunk === 'object')
256+
try {
257+
controller.enqueue(
258+
format(JSON.stringify(chunk))
259+
)
260+
} catch {
261+
controller.enqueue(format(chunk.toString()))
262+
}
263+
else controller.enqueue(format(chunk.toString()))
264+
265+
// Wait for the next event loop
266+
// Otherwise the data will be mixed up
267+
await new Promise<void>((resolve) =>
268+
setTimeout(() => resolve(), 0)
269+
)
270+
}
271+
} catch (error) {
272+
console.warn(error)
253273
}
254274

255275
try {
@@ -276,7 +296,8 @@ export async function* streamResponse(response: Response) {
276296
const { done, value } = await reader.read()
277297
if (done) break
278298

279-
yield decoder.decode(value)
299+
if (typeof value === 'string') yield value
300+
else yield decoder.decode(value)
280301
}
281302
} finally {
282303
reader.releaseLock()

src/adapter/web-standard/handler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -699,7 +699,7 @@ export const createStaticHandler = (
699699
!hooks.beforeHandle?.length &&
700700
!hooks.afterHandle?.length
701701
)
702-
return response.clone.bind(response) as any
702+
return () => response.clone() as Response
703703
}
704704

705705
const handleResponse = createResponseHandler({

src/types.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,14 +1612,14 @@ type _CreateEden<
16121612
[x in Path]: Property
16131613
}
16141614

1615-
type RemoveStartinSlash<T> = T extends `/${infer Rest}` ? Rest : T
1615+
type RemoveStartingSlash<T> = T extends `/${infer Rest}` ? Rest : T
16161616

16171617
export type CreateEden<
16181618
Path extends string,
16191619
Property extends Record<string, unknown> = {}
16201620
> = Path extends '' | '/'
16211621
? Property
1622-
: _CreateEden<RemoveStartinSlash<Path>, Property>
1622+
: _CreateEden<RemoveStartingSlash<Path>, Property>
16231623

16241624
export type ComposeElysiaResponse<
16251625
Schema extends RouteSchema,
@@ -1652,7 +1652,9 @@ type _ComposeElysiaResponse<Schema extends RouteSchema, Handle> = Prettify<
16521652
>['response']
16531653
: Handle extends Generator<infer A, infer B, infer C>
16541654
? AsyncGenerator<A, B, C>
1655-
: Replace<Handle, ElysiaFile, File>
1655+
: Handle extends ReadableStream<infer A>
1656+
? AsyncGenerator<A, void, unknown>
1657+
: Replace<Handle, ElysiaFile, File>
16561658
}) &
16571659
ExtractErrorFromHandle<Handle> &
16581660
({} extends Omit<Schema['response'], 200>
@@ -1843,8 +1845,9 @@ type SetContentType =
18431845
| 'application/zip'
18441846
| 'text/css'
18451847
| 'text/csv'
1846-
| 'text/html'
18471848
| 'text/calendar'
1849+
| 'text/event-stream'
1850+
| 'text/html'
18481851
| 'text/javascript'
18491852
| 'text/plain'
18501853
| 'text/xml'
@@ -2036,13 +2039,16 @@ export type MergeTypeModule<
20362039
B extends TModule<any, any>
20372040
> = TModule<Prettify<UnwrapTypeModule<A> & UnwrapTypeModule<B>>>
20382041

2039-
export type SSEPayload = {
2042+
export type SSEPayload<
2043+
Data extends unknown = unknown,
2044+
Event extends string | undefined = string | undefined
2045+
> = {
20402046
/** id of the event */
20412047
id?: string | number | null
20422048
/** event name */
2043-
event?: string
2049+
event?: Event
20442050
/** retry in millisecond */
20452051
retry?: number
20462052
/** data to send */
2047-
data?: unknown
2053+
data?: Data
20482054
}

src/utils.ts

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import type {
1919
AfterResponseHandler,
2020
SchemaValidator,
2121
AnyLocalHook,
22-
SSEPayload
22+
SSEPayload,
23+
Prettify
2324
} from './types'
2425
import { ElysiaFile } from './universal/file'
2526

@@ -1129,6 +1130,10 @@ export const supportPerMethodInlineHandler = (() => {
11291130
return true
11301131
})()
11311132

1133+
type FormatSSEPayload<T = unknown> = T extends string
1134+
? { readonly data: T }
1135+
: Prettify<SSEPayload<T>>
1136+
11321137
/**
11331138
* Return a Server Sent Events (SSE) payload
11341139
*
@@ -1145,18 +1150,44 @@ export const supportPerMethodInlineHandler = (() => {
11451150
* })
11461151
* }
11471152
*/
1148-
export const sse = (
1149-
payload: string | SSEPayload
1150-
): SSEPayload & { toStream(): string } => {
1151-
if (typeof payload === 'string')
1152-
payload = {
1153-
data: payload
1154-
}
1153+
export const sse = <
1154+
const T extends
1155+
| string
1156+
| SSEPayload
1157+
| Generator
1158+
| AsyncGenerator
1159+
| ReadableStream
1160+
>(
1161+
_payload: T
1162+
): T extends string
1163+
? { readonly data: T }
1164+
: T extends SSEPayload
1165+
? T
1166+
: T extends ReadableStream<infer A>
1167+
? ReadableStream<FormatSSEPayload<A>>
1168+
: T extends Generator<infer A, infer B, infer C>
1169+
? Generator<FormatSSEPayload<A>, B, C>
1170+
: T extends AsyncGenerator<infer A, infer B, infer C>
1171+
? AsyncGenerator<FormatSSEPayload<A>, B, C>
1172+
: T => {
1173+
if (_payload instanceof ReadableStream) {
1174+
// @ts-expect-error
1175+
_payload.sse = true
1176+
return _payload as any
1177+
}
1178+
1179+
const payload: SSEPayload =
1180+
typeof _payload === 'string'
1181+
? { data: _payload }
1182+
: (_payload as SSEPayload)
11551183

1156-
if (payload.id === undefined) payload.id = randomId()
1184+
// if (payload.id === undefined) payload.id = randomId()
1185+
1186+
// @ts-ignore
1187+
payload.sse = true
11571188

11581189
// @ts-ignore
1159-
payload.toStream = () => {
1190+
payload.toSSE = () => {
11601191
let payloadString = ''
11611192

11621193
if (payload.id !== undefined && payload.id !== null)

0 commit comments

Comments
 (0)