Skip to content

Commit c1a8a0b

Browse files
authored
Merge pull request #259 from FiveSheepCo/feat/support-streaming-comments-merge
Support comments in streaming response
2 parents bb07e02 + 2bbae9b commit c1a8a0b

File tree

4 files changed

+150
-54
lines changed

4 files changed

+150
-54
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//
2+
// StreamInterpreter.swift
3+
// OpenAI
4+
//
5+
// Created by Oleksii Nezhyborets on 03.02.2025.
6+
//
7+
8+
import Foundation
9+
10+
/// Incrementally interprets a server-sent-event response body and emits one decoded
/// `ResultType` per `data:` line.
///
/// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
/// 9.2.6 Interpreting an event stream
///
/// Deliberate simplifications compared to the specification:
/// - every `data:` line is treated as a complete event (blank lines are not used as event boundaries);
/// - a line without a `data:` prefix is decoded as a raw JSON payload, so a plain JSON error body
///   still surfaces as `APIErrorResponse`.
class StreamInterpreter<ResultType: Codable> {
    private let streamingCompletionMarker = "[DONE]"
    private let dataFieldPrefix = "data:"
    private let decoder = JSONDecoder()

    /// Raw bytes of the trailing line that has neither been terminated nor successfully decoded yet.
    /// Kept as bytes (not `String`) so a multi-byte UTF-8 character split across chunks survives.
    private var pendingBytes = Data()

    var onEventDispatched: ((ResultType) -> Void)?

    /// Feeds the next chunk of the response body into the interpreter.
    ///
    /// Every terminated line is interpreted immediately. The unterminated remainder is decoded
    /// opportunistically (a body may legitimately end without a newline) and otherwise buffered
    /// until more data arrives.
    ///
    /// - Parameter data: The bytes received from the network; may start or end mid-line.
    /// - Throws: `StreamingError.unknownContent` when a line is not valid UTF-8,
    ///   `APIErrorResponse` when a payload decodes as an API error, or the decoding error
    ///   when a terminated line holds a payload that is neither.
    func processData(_ data: Data) throws {
        guard !data.isEmpty else { return }

        var buffered = pendingBytes
        buffered.append(data)

        // Split on the LF / CR bytes only. Those bytes never occur inside a multi-byte UTF-8
        // sequence, and valid JSON cannot contain them unescaped, so payloads stay intact.
        let segments = buffered.split(
            omittingEmptySubsequences: false,
            whereSeparator: { $0 == 0x0A || $0 == 0x0D }
        )
        // The last segment is whatever follows the final line terminator (possibly nothing).
        let unterminated = Data(segments.last ?? Data())
        pendingBytes = unterminated

        for lineBytes in segments.dropLast() where !lineBytes.isEmpty {
            guard let line = String(data: lineBytes, encoding: .utf8) else {
                throw StreamingError.unknownContent
            }
            guard let payload = payload(fromLine: line), payload != streamingCompletionMarker else {
                continue
            }
            // Decode before touching the optional callback so errors surface even without a subscriber.
            let event = try decodeEvent(from: payload)
            onEventDispatched?(event)
        }

        try processUnterminatedLine(unterminated)
    }

    /// Tries to interpret the bytes that follow the last line terminator.
    ///
    /// On success (event dispatched, completion marker seen, or API error thrown) the buffer is
    /// cleared; otherwise the bytes stay in `pendingBytes` and are retried with the next chunk.
    private func processUnterminatedLine(_ bytes: Data) throws {
        guard !bytes.isEmpty else { return }

        guard let line = String(data: bytes, encoding: .utf8) else {
            // The chunk boundary may fall inside a multi-byte character: wait for the rest.
            if endsInsideUTF8Character(bytes) { return }
            pendingBytes.removeAll()
            throw StreamingError.unknownContent
        }
        // Comments and empty payloads may still be incomplete, so they simply stay buffered.
        guard let payload = payload(fromLine: line) else { return }

        if payload == streamingCompletionMarker {
            pendingBytes.removeAll()
            return
        }

        do {
            let event = try decodeEvent(from: payload)
            pendingBytes.removeAll()
            onEventDispatched?(event)
        } catch let apiError as APIErrorResponse {
            pendingBytes.removeAll()
            throw apiError
        } catch {
            // Chunk ends in a partial JSON: keep it buffered until the remainder arrives.
        }
    }

    /// Extracts the payload of a single line.
    ///
    /// - Returns: `nil` for comment lines (starting with `:`) and for lines without content;
    ///   otherwise the text after a leading `data:` prefix, or the whole line when there is no prefix.
    private func payload(fromLine rawLine: String) -> String? {
        let line = rawLine.trimmingCharacters(in: .whitespacesAndNewlines)
        if line.hasPrefix(":") { return nil }

        // Strip the field name only at the start of the line, so a payload whose text contains
        // "data:" is left untouched.
        let body = line.hasPrefix(dataFieldPrefix)
            ? String(line.dropFirst(dataFieldPrefix.count))
            : line
        let payload = body.trimmingCharacters(in: .whitespacesAndNewlines)
        return payload.isEmpty ? nil : payload
    }

    /// Decodes a payload as `ResultType`.
    ///
    /// - Throws: `APIErrorResponse` when the payload decodes as an API error instead,
    ///   otherwise the original decoding error.
    private func decodeEvent(from payload: String) throws -> ResultType {
        let jsonData = Data(payload.utf8)
        do {
            return try decoder.decode(ResultType.self, from: jsonData)
        } catch {
            if let apiError = try? decoder.decode(APIErrorResponse.self, from: jsonData) {
                throw apiError
            }
            throw error
        }
    }

    /// Tells whether non-UTF-8 `bytes` become valid UTF-8 once the last 1–3 bytes are removed,
    /// i.e. whether the data merely stops in the middle of a multi-byte character.
    /// Expects `bytes` to be non-empty.
    private func endsInsideUTF8Character(_ bytes: Data) -> Bool {
        (1...min(3, bytes.count)).contains { dropped in
            String(data: bytes.dropLast(dropped), encoding: .utf8) != nil
        }
    }
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//
2+
// File.swift
3+
// OpenAI
4+
//
5+
// Created by Oleksii Nezhyborets on 03.02.2025.
6+
//
7+
8+
import Foundation
9+
10+
/// Errors raised while interpreting a streamed response.
enum StreamingError: Error {
    /// Thrown by `StreamInterpreter` when received content cannot be converted to or from UTF-8 text.
    case unknownContent

    /// NOTE(review): not thrown anywhere in the visible code — presumably reserved for
    /// empty payloads; confirm against callers before relying on it.
    case emptyContent
}
Lines changed: 12 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//
22
// StreamingSession.swift
3-
//
3+
//
44
//
55
// Created by Sergii Kryvoblotskyi on 18/04/2023.
66
//
@@ -11,27 +11,22 @@ import FoundationNetworking
1111
#endif
1212

1313
final class StreamingSession<ResultType: Codable>: NSObject, Identifiable, URLSessionDelegate, URLSessionDataDelegate {
14-
15-
enum StreamingError: Error {
16-
case unknownContent
17-
case emptyContent
18-
}
19-
2014
var onReceiveContent: ((StreamingSession, ResultType) -> Void)?
2115
var onProcessingError: ((StreamingSession, Error) -> Void)?
2216
var onComplete: ((StreamingSession, Error?) -> Void)?
2317

24-
private let streamingCompletionMarker = "[DONE]"
2518
private let urlRequest: URLRequest
2619
private lazy var urlSession: URLSession = {
2720
let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
2821
return session
2922
}()
3023

31-
private var previousChunkBuffer = ""
24+
private let interpreter = StreamInterpreter<ResultType>()
3225

3326
init(urlRequest: URLRequest) {
3427
self.urlRequest = urlRequest
28+
super.init()
29+
subscribeToParser()
3530
}
3631

3732
func perform() {
@@ -45,54 +40,17 @@ final class StreamingSession<ResultType: Codable>: NSObject, Identifiable, URLSe
4540
}
4641

4742
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
48-
guard let stringContent = String(data: data, encoding: .utf8) else {
49-
onProcessingError?(self, StreamingError.unknownContent)
50-
return
43+
do {
44+
try interpreter.processData(data)
45+
} catch {
46+
onProcessingError?(self, error)
5147
}
52-
processJSON(from: stringContent)
5348
}
5449

55-
}
56-
57-
extension StreamingSession {
58-
59-
private func processJSON(from stringContent: String) {
60-
if stringContent.isEmpty {
61-
return
62-
}
63-
let jsonObjects = "\(previousChunkBuffer)\(stringContent)"
64-
.trimmingCharacters(in: .whitespacesAndNewlines)
65-
.components(separatedBy: "data:")
66-
.map { $0.trimmingCharacters(in: .whitespacesAndNewlines) }
67-
.filter { $0.isEmpty == false }
68-
69-
previousChunkBuffer = ""
70-
71-
guard jsonObjects.isEmpty == false, jsonObjects.first != streamingCompletionMarker else {
72-
return
73-
}
74-
jsonObjects.enumerated().forEach { (index, jsonContent) in
75-
guard jsonContent != streamingCompletionMarker && !jsonContent.isEmpty else {
76-
return
77-
}
78-
guard let jsonData = jsonContent.data(using: .utf8) else {
79-
onProcessingError?(self, StreamingError.unknownContent)
80-
return
81-
}
82-
let decoder = JSONDecoder()
83-
do {
84-
let object = try decoder.decode(ResultType.self, from: jsonData)
85-
onReceiveContent?(self, object)
86-
} catch {
87-
if let decoded = try? decoder.decode(APIErrorResponse.self, from: jsonData) {
88-
onProcessingError?(self, decoded)
89-
} else if index == jsonObjects.count - 1 {
90-
previousChunkBuffer = "data: \(jsonContent)" // Chunk ends in a partial JSON
91-
} else {
92-
onProcessingError?(self, error)
93-
}
94-
}
50+
private func subscribeToParser() {
51+
interpreter.onEventDispatched = { [weak self] content in
52+
guard let self else { return }
53+
self.onReceiveContent?(self, content)
9554
}
9655
}
97-
9856
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
//
2+
// StreamInterpreterTests.swift
3+
// OpenAI
4+
//
5+
// Created by Oleksii Nezhyborets on 03.02.2025.
6+
//
7+
8+
import Testing
9+
import Foundation
10+
@testable import OpenAI
11+
12+
/// Exercises `StreamInterpreter` with captured event-stream payloads.
struct StreamInterpreterTests {
    let interpreter = StreamInterpreter<ChatStreamResult>()

    /// Three data events followed by the `[DONE]` terminator must produce exactly three results.
    @Test func testParseShortMessageResponseStream() throws {
        var received: [ChatStreamResult] = []
        interpreter.onEventDispatched = { result in
            received.append(result)
        }

        for payload in [chatCompletionChunk(), chatCompletionChunkTermination()] {
            try interpreter.processData(payload)
        }

        #expect(received.count == 3)
    }

    // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
    // If the line starts with a U+003A COLON character (:)
    // - Ignore the line.
    @Test func testIgnoresLinesStartingWithColon() throws {
        var received: [ChatStreamResult] = []
        interpreter.onEventDispatched = { result in
            received.append(result)
        }

        try interpreter.processData(chatCompletionChunkWithComment())

        #expect(received.count == 1)
    }

    // MARK: - Fixtures

    /// A chunk holding 3 objects, captured from a real response (per the original author's note).
    /// It is a very short response that contains just "Hi".
    private func chatCompletionChunk() -> Data {
        Data("data: {\"id\":\"chatcmpl-AwnboO5ZnaUyii9xxC5ZVmM5vGark\",\"object\":\"chat.completion.chunk\",\"created\":1738577084,\"model\":\"gpt-4-0613\",\"service_tier\":\"default\",\"system_fingerprint\":null,\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"\",\"refusal\":null},\"logprobs\":null,\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-AwnboO5ZnaUyii9xxC5ZVmM5vGark\",\"object\":\"chat.completion.chunk\",\"created\":1738577084,\"model\":\"gpt-4-0613\",\"service_tier\":\"default\",\"system_fingerprint\":null,\"choices\":[{\"index\":0,\"delta\":{\"content\":\"Hi\"},\"logprobs\":null,\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-AwnboO5ZnaUyii9xxC5ZVmM5vGark\",\"object\":\"chat.completion.chunk\",\"created\":1738577084,\"model\":\"gpt-4-0613\",\"service_tier\":\"default\",\"system_fingerprint\":null,\"choices\":[{\"index\":0,\"delta\":{},\"logprobs\":null,\"finish_reason\":\"stop\"}]}\n\n".utf8)
    }

    /// A single data event preceded by a comment line (a line starting with `:`).
    private func chatCompletionChunkWithComment() -> Data {
        Data(": OPENROUTER PROCESSING\n\ndata: {\"id\":\"chatcmpl-AwnboO5ZnaUyii9xxC5ZVmM5vGark\",\"object\":\"chat.completion.chunk\",\"created\":1738577084,\"model\":\"gpt-4-0613\",\"service_tier\":\"default\",\"system_fingerprint\":null,\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"\",\"refusal\":null},\"logprobs\":null,\"finish_reason\":null}]}\n\n".utf8)
    }

    /// The terminating `[DONE]` event of a stream.
    private func chatCompletionChunkTermination() -> Data {
        Data("data: [DONE]\n\n".utf8)
    }
}

0 commit comments

Comments
 (0)