Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

feat: add alerts sse notification endpoint #227

Merged
merged 3 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/codegate/dashboard/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import asyncio
from typing import List
from typing import List, AsyncGenerator

import structlog
from fastapi import APIRouter
from fastapi.responses import StreamingResponse

from codegate.dashboard.post_processing import (
parse_get_alert_conversation,
parse_messages_in_conversations,
)
from codegate.dashboard.request_models import AlertConversation, Conversation
from codegate.db.connection import DbReader
from codegate.db.connection import DbReader, alert_queue

logger = structlog.get_logger("codegate")

Expand All @@ -34,3 +35,19 @@ def get_alerts() -> List[AlertConversation]:
"""
alerts_prompt_output = asyncio.run(db_reader.get_alerts_with_prompt_and_output())
return asyncio.run(parse_get_alert_conversation(alerts_prompt_output))


async def generate_sse_events() -> AsyncGenerator[str, None]:
"""
SSE generator from queue
"""
while True:
message = await alert_queue.get()
yield f"data: {message}\n\n"

@dashboard_router.get("/dashboard/alerts_notification")
async def stream_sse():
"""
Send alerts event
"""
return StreamingResponse(generate_sse_events(), media_type="text/event-stream")
7 changes: 4 additions & 3 deletions src/codegate/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import uuid
from pathlib import Path
from typing import AsyncGenerator, AsyncIterator, List, Optional

import structlog
from litellm import ChatCompletionRequest, ModelResponse
from pydantic import BaseModel
Expand All @@ -20,7 +19,7 @@
)

logger = structlog.get_logger("codegate")

alert_queue = asyncio.Queue()

class DbCodeGate:

Expand Down Expand Up @@ -213,7 +212,9 @@ async def record_alerts(self, alerts: List[Alert]) -> None:
async with asyncio.TaskGroup() as tg:
for alert in alerts:
try:
tg.create_task(self._insert_pydantic_model(alert, sql))
result = tg.create_task(self._insert_pydantic_model(alert, sql))
if result and alert.trigger_category == "critical":
await alert_queue.put(f"New alert detected: {alert.timestamp}")
except Exception as e:
logger.error(f"Failed to record alert: {alert}.", error=str(e))
return None
Expand Down
Loading