diff --git a/src/codegate/providers/copilot/provider.py b/src/codegate/providers/copilot/provider.py index 4ba7359d..b514f3bc 100644 --- a/src/codegate/providers/copilot/provider.py +++ b/src/codegate/providers/copilot/provider.py @@ -575,13 +575,6 @@ def _ensure_output_processor(self) -> None: # Already initialized, no need to reinitialize return - # this is a hotfix - we shortcut before selecting the output pipeline for FIM - # because our FIM output pipeline is actually empty as of now. We should fix this - # but don't have any immediate need. - is_fim = self.proxy.context_tracking.metadata.get("is_fim", False) - if is_fim: - return - logger.debug("Tracking context for pipeline processing") self.sse_processor = SSEProcessor() is_fim = self.proxy.context_tracking.metadata.get("is_fim", False) @@ -601,16 +594,23 @@ async def _process_stream(self): async def stream_iterator(): while True: incoming_record = await self.stream_queue.get() + record_content = incoming_record.get("content", {}) streaming_choices = [] for choice in record_content.get("choices", []): + is_fim = self.proxy.context_tracking.metadata.get("is_fim", False) + if is_fim: + content = choice.get("text", "") + else: + content = choice.get("delta", {}).get("content") + streaming_choices.append( StreamingChoices( finish_reason=choice.get("finish_reason", None), index=0, delta=Delta( - content=choice.get("delta", {}).get("content"), role="assistant" + content=content, role="assistant" ), logprobs=None, ) @@ -658,6 +658,9 @@ def _process_chunk(self, chunk: bytes): self.stream_queue.put_nowait(record) def _proxy_transport_write(self, data: bytes): + if not self.proxy.transport or self.proxy.transport.is_closing(): + logger.error("Proxy transport not available") + return self.proxy.transport.write(data) def data_received(self, data: bytes) -> None: