From 2dddc0805a71d0c60ff461579c90a6e01726f435 Mon Sep 17 00:00:00 2001 From: Giuseppe Scuglia Date: Fri, 6 Dec 2024 18:20:38 +0100 Subject: [PATCH 1/3] feat: add alerts sse notification endpoint --- src/codegate/db/connection.py | 7 ++++--- src/codegate/server.py | 4 ++++ src/codegate/sse.py | 22 ++++++++++++++++++++++ 3 files changed, 30 insertions(+), 3 deletions(-) create mode 100644 src/codegate/sse.py diff --git a/src/codegate/db/connection.py b/src/codegate/db/connection.py index feafefd3..e64bac21 100644 --- a/src/codegate/db/connection.py +++ b/src/codegate/db/connection.py @@ -5,7 +5,6 @@ import uuid from pathlib import Path from typing import AsyncGenerator, AsyncIterator, List, Optional - import structlog from litellm import ChatCompletionRequest, ModelResponse from pydantic import BaseModel @@ -20,7 +19,7 @@ ) logger = structlog.get_logger("codegate") - +alert_queue = asyncio.Queue() class DbCodeGate: @@ -213,7 +212,9 @@ async def record_alerts(self, alerts: List[Alert]) -> None: async with asyncio.TaskGroup() as tg: for alert in alerts: try: - tg.create_task(self._insert_pydantic_model(alert, sql)) + result = tg.create_task(self._insert_pydantic_model(alert, sql)) + if result and alert.trigger_category == "critical": + await alert_queue.put(f"New alert detected: {alert.timestamp}") except Exception as e: logger.error(f"Failed to record alert: {alert}.", error=str(e)) return None diff --git a/src/codegate/server.py b/src/codegate/server.py index e5114b05..7b1e7ee4 100644 --- a/src/codegate/server.py +++ b/src/codegate/server.py @@ -6,6 +6,7 @@ from codegate import __description__, __version__ from codegate.config import Config from codegate.dashboard.dashboard import dashboard_router +from codegate.sse import router as sse_router from codegate.pipeline.base import PipelineStep, SequentialPipelineProcessor from codegate.pipeline.codegate_context_retriever.codegate import CodegateContextRetriever from codegate.pipeline.extract_snippets.extract_snippets import CodeSnippetExtractor @@ -134,4 +135,7 @@ async def health_check(): # Include the routes for the dashboard app.include_router(dashboard_router) + # Include the routes for the SSE + app.include_router(sse_router) + return app diff --git a/src/codegate/sse.py b/src/codegate/sse.py new file mode 100644 index 00000000..b6517dee --- /dev/null +++ b/src/codegate/sse.py @@ -0,0 +1,22 @@ +from fastapi import APIRouter +from fastapi.responses import StreamingResponse +from typing import AsyncGenerator + +from codegate.db.connection import alert_queue + +router = APIRouter(tags=["SSE"]) + +async def generate_sse_events() -> AsyncGenerator[str, None]: + """ + SSE generator from queue + """ + while True: + message = await alert_queue.get() + yield f"data: {message}\n\n" + +@router.get("/alerts_notification") +async def stream_sse(): + """ + Send alerts event + """ + return StreamingResponse(generate_sse_events(), media_type="text/event-stream") \ No newline at end of file From af46a59555bb6485ed3376ca073b5c00762e1738 Mon Sep 17 00:00:00 2001 From: Giuseppe Scuglia Date: Mon, 9 Dec 2024 12:40:01 +0100 Subject: [PATCH 2/3] move sse endpoint under dashboard route --- src/codegate/dashboard/dashboard.py | 21 +++++++++++++++++++-- src/codegate/server.py | 4 ---- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/codegate/dashboard/dashboard.py b/src/codegate/dashboard/dashboard.py index 2814c494..b844d224 100644 --- a/src/codegate/dashboard/dashboard.py +++ b/src/codegate/dashboard/dashboard.py @@ -1,15 +1,16 @@ import asyncio -from typing import List +from typing import List, AsyncGenerator import structlog from fastapi import APIRouter +from fastapi.responses import StreamingResponse from codegate.dashboard.post_processing import ( parse_get_alert_conversation, parse_messages_in_conversations, ) from codegate.dashboard.request_models import AlertConversation, Conversation -from codegate.db.connection import DbReader +from codegate.db.connection import DbReader, alert_queue logger = structlog.get_logger("codegate") @@ -34,3 +35,19 @@ def get_alerts() -> List[AlertConversation]: """ alerts_prompt_output = asyncio.run(db_reader.get_alerts_with_prompt_and_output()) return asyncio.run(parse_get_alert_conversation(alerts_prompt_output)) + + +async def generate_sse_events() -> AsyncGenerator[str, None]: + """ + SSE generator from queue + """ + while True: + message = await alert_queue.get() + yield f"data: {message}\n\n" + +@dashboard_router.get("/dashboard/alerts_notification") +async def stream_sse(): + """ + Send alerts event + """ + return StreamingResponse(generate_sse_events(), media_type="text/event-stream") diff --git a/src/codegate/server.py b/src/codegate/server.py index 7b1e7ee4..e5114b05 100644 --- a/src/codegate/server.py +++ b/src/codegate/server.py @@ -6,7 +6,6 @@ from codegate import __description__, __version__ from codegate.config import Config from codegate.dashboard.dashboard import dashboard_router -from codegate.sse import router as sse_router from codegate.pipeline.base import PipelineStep, SequentialPipelineProcessor from codegate.pipeline.codegate_context_retriever.codegate import CodegateContextRetriever from codegate.pipeline.extract_snippets.extract_snippets import CodeSnippetExtractor @@ -135,7 +134,4 @@ async def health_check(): # Include the routes for the dashboard app.include_router(dashboard_router) - # Include the routes for the SSE - app.include_router(sse_router) - return app From f75bf61216536eb36e882163ddba4207f6b540c1 Mon Sep 17 00:00:00 2001 From: Giuseppe Scuglia Date: Mon, 9 Dec 2024 12:46:25 +0100 Subject: [PATCH 3/3] leftover --- src/codegate/sse.py | 22 ---------------------- 1 file changed, 22 deletions(-) delete mode 100644 src/codegate/sse.py diff --git a/src/codegate/sse.py b/src/codegate/sse.py deleted file mode 100644 index b6517dee..00000000 --- a/src/codegate/sse.py +++ /dev/null @@ -1,22 +0,0 @@ -from fastapi import APIRouter -from fastapi.responses import StreamingResponse -from typing import AsyncGenerator - -from codegate.db.connection import alert_queue - -router = APIRouter(tags=["SSE"]) - -async def generate_sse_events() -> AsyncGenerator[str, None]: - """ - SSE generator from queue - """ - while True: - message = await alert_queue.get() - yield f"data: {message}\n\n" - -@router.get("/alerts_notification") -async def stream_sse(): - """ - Send alerts event - """ - return StreamingResponse(generate_sse_events(), media_type="text/event-stream") \ No newline at end of file