@@ -1,12 +1,12 @@
 import asyncio
-import json
 import re
 import ssl
 from dataclasses import dataclass
 from typing import Dict, List, Optional, Tuple
 from urllib.parse import unquote, urljoin, urlparse
 
 import structlog
+from litellm.types.utils import Delta, ModelResponse, StreamingChoices
 
 from codegate.ca.codegate_ca import CertificateAuthority
 from codegate.config import Config
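The import swap above is the core of the change: instead of json.dumps-ing raw dicts onto the wire, each record is rebuilt as one of litellm's typed streaming objects and serialized through pydantic. A minimal standalone sketch of one such chunk, with placeholder id and model values (everything else mirrors the hunk below):

# Minimal sketch: build one streaming chunk from litellm's typed primitives.
# The id and model strings here are placeholders for illustration only.
from litellm.types.utils import Delta, ModelResponse, StreamingChoices

chunk = ModelResponse(
    id="chatcmpl-123",
    choices=[
        StreamingChoices(
            finish_reason=None,
            index=0,
            delta=Delta(content="Hello", role="assistant"),
            logprobs=None,
        )
    ],
    created=0,
    model="gpt-4",
    object="chat.completion.chunk",
)

# Same serialization call the new _process_stream uses below.
print(chunk.model_dump_json(exclude_none=True, exclude_unset=True))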
@@ -568,29 +568,51 @@ def connection_made(self, transport: asyncio.Transport) -> None:
 
     async def _process_stream(self):
         try:
+
             async def stream_iterator():
                 while True:
                     incoming_record = await self.stream_queue.get()
-                    yield incoming_record
-
-            async for record in stream_iterator():
-                print("received from stream")
-                print(record)
-                if record["type"] == "done":
-                    sse_data = b"data: [DONE]\n\n"
-                    # Add chunk size for DONE message too
-                    chunk_size = hex(len(sse_data))[2:] + "\r\n"
-                    self._proxy_transport_write(chunk_size.encode())
-                    self._proxy_transport_write(sse_data)
-                    self._proxy_transport_write(b"\r\n")
-                    # Now send the final zero chunk
-                    self._proxy_transport_write(b"0\r\n\r\n")
-                else:
-                    sse_data = f"data: {json.dumps(record['content'])}\n\n".encode("utf-8")
-                    chunk_size = hex(len(sse_data))[2:] + "\r\n"
-                    self._proxy_transport_write(chunk_size.encode())
-                    self._proxy_transport_write(sse_data)
-                    self._proxy_transport_write(b"\r\n")
+                    record_content = incoming_record.get("content", {})
+
+                    streaming_choices = []
+                    for choice in record_content.get("choices", []):
+                        streaming_choices.append(
+                            StreamingChoices(
+                                finish_reason=choice.get("finish_reason", None),
+                                index=0,
+                                delta=Delta(
+                                    content=choice.get("delta", {}).get("content"), role="assistant"
+                                ),
+                                logprobs=None,
+                            )
+                        )
+
+                    # Convert record to ModelResponse
+                    mr = ModelResponse(
+                        id=record_content.get("id", ""),
+                        choices=streaming_choices,
+                        created=record_content.get("created", 0),
+                        model=record_content.get("model", ""),
+                        object="chat.completion.chunk",
+                    )
+                    yield mr
+
+            async for record in self.output_pipeline_instance.process_stream(stream_iterator()):
+                chunk = record.model_dump_json(exclude_none=True, exclude_unset=True)
+                sse_data = f"data:{chunk}\n\n".encode("utf-8")
+                chunk_size = hex(len(sse_data))[2:] + "\r\n"
+                self._proxy_transport_write(chunk_size.encode())
+                self._proxy_transport_write(sse_data)
+                self._proxy_transport_write(b"\r\n")
+
+            sse_data = b"data: [DONE]\n\n"
+            # Add chunk size for DONE message too
+            chunk_size = hex(len(sse_data))[2:] + "\r\n"
+            self._proxy_transport_write(chunk_size.encode())
+            self._proxy_transport_write(sse_data)
+            self._proxy_transport_write(b"\r\n")
+            # Now send the final zero chunk
+            self._proxy_transport_write(b"0\r\n\r\n")
 
         except Exception as e:
             logger.error(f"Error processing stream: {e}")
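Both the old and the new version hand-roll HTTP chunked transfer encoding around each SSE event: a hex payload length followed by CRLF, the payload, a trailing CRLF, and finally a zero-length chunk ("0\r\n\r\n") to close the stream. A hypothetical frame_chunk helper, not part of the diff, that captures the framing the handler repeats for every event:

# Hypothetical helper (not in the codebase): wrap one SSE payload in an
# HTTP chunked transfer-encoding frame: "<hex length>\r\n<payload>\r\n".
def frame_chunk(payload: bytes) -> bytes:
    return hex(len(payload))[2:].encode() + b"\r\n" + payload + b"\r\n"

# "data: [DONE]\n\n" is 14 bytes, so the hex size line is "e".
assert frame_chunk(b"data: [DONE]\n\n") == b"e\r\ndata: [DONE]\n\n\r\n"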
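The other structural shift is that the handler no longer consumes stream_iterator() directly: the converted chunks are routed through self.output_pipeline_instance.process_stream(...), which the call site suggests is itself an async iterator over ModelResponse objects. A toy sketch of that wrapping pattern, using a pass-through stage over strings (the process_stream signature here is inferred from the call site, not confirmed against codegate's pipeline code):

import asyncio
from typing import AsyncIterator


async def process_stream(stream: AsyncIterator[str]) -> AsyncIterator[str]:
    # A pass-through pipeline stage: it can inspect, rewrite, or drop each
    # record before yielding it onward to whatever writes the transport.
    async for record in stream:
        yield record.upper()  # stand-in for real output-pipeline logic


async def main():
    async def stream_iterator():
        for word in ("hello", "world"):
            yield word

    # Mirrors the shape of the new code: iterate the pipeline's output,
    # not the raw iterator.
    async for record in process_stream(stream_iterator()):
        print(record)  # HELLO, WORLD


asyncio.run(main())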