Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Add workspace-namespaced APIs for messages and alerts #676

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions src/codegate/api/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import requests
import structlog
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.routing import APIRoute

Expand All @@ -14,12 +14,15 @@
)
from codegate.api.dashboard.request_models import AlertConversation, Conversation
from codegate.db.connection import DbReader, alert_queue
from codegate.workspaces import crud

logger = structlog.get_logger("codegate")

dashboard_router = APIRouter()
db_reader = None

wscrud = crud.WorkspaceCrud()


def uniq_name(route: APIRoute):
return f"v1_{route.name}"
Expand Down Expand Up @@ -48,9 +51,14 @@ def get_messages(db_reader: DbReader = Depends(get_db_reader)) -> List[Conversat
"""
Get all the messages from the database and return them as a list of conversations.
"""
prompts_outputs = asyncio.run(db_reader.get_prompts_with_output())
try:
active_ws = asyncio.run(wscrud.get_active_workspace())
prompts_outputs = asyncio.run(db_reader.get_prompts_with_output(active_ws.id))

return asyncio.run(parse_messages_in_conversations(prompts_outputs))
return asyncio.run(parse_messages_in_conversations(prompts_outputs))
except Exception as e:
logger.error(f"Error getting messages: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")


@dashboard_router.get(
Expand All @@ -60,8 +68,15 @@ def get_alerts(db_reader: DbReader = Depends(get_db_reader)) -> List[Optional[Al
"""
Get all the messages from the database and return them as a list of conversations.
"""
alerts_prompt_output = asyncio.run(db_reader.get_alerts_with_prompt_and_output())
return asyncio.run(parse_get_alert_conversation(alerts_prompt_output))
try:
active_ws = asyncio.run(wscrud.get_active_workspace())
alerts_prompt_output = asyncio.run(
db_reader.get_alerts_with_prompt_and_output(active_ws.id)
)
return asyncio.run(parse_get_alert_conversation(alerts_prompt_output))
except Exception as e:
logger.error(f"Error getting alerts: {str(e)}")
raise HTTPException(status_code=500, detail="Internal server error")


async def generate_sse_events() -> AsyncGenerator[str, None]:
Expand Down
54 changes: 51 additions & 3 deletions src/codegate/api/v1.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from typing import List, Optional

from fastapi import APIRouter, HTTPException, Response
from fastapi.routing import APIRoute
from pydantic import ValidationError

from codegate.api import v1_models
from codegate.api.dashboard.dashboard import dashboard_router
from codegate.db.connection import AlreadyExistsError
from codegate.api.dashboard import dashboard
from codegate.api.dashboard.request_models import AlertConversation, Conversation
from codegate.db.connection import AlreadyExistsError, DbReader
from codegate.workspaces import crud

v1 = APIRouter()
v1.include_router(dashboard_router)
v1.include_router(dashboard.dashboard_router)
wscrud = crud.WorkspaceCrud()

# This is a singleton object
dbreader = DbReader()


def uniq_name(route: APIRoute):
return f"v1_{route.name}"
Expand Down Expand Up @@ -92,3 +98,45 @@ async def delete_workspace(workspace_name: str):
raise HTTPException(status_code=500, detail="Internal server error")

return Response(status_code=204)


@v1.get(
"/workspaces/{workspace_name}/alerts",
tags=["Workspaces"],
generate_unique_id_function=uniq_name,
)
async def get_workspace_alerts(workspace_name: str) -> List[Optional[AlertConversation]]:
"""Get alerts for a workspace."""
try:
ws = await wscrud.get_workspace_by_name(workspace_name)
except crud.WorkspaceDoesNotExistError:
raise HTTPException(status_code=404, detail="Workspace does not exist")
except Exception:
raise HTTPException(status_code=500, detail="Internal server error")

try:
alerts = await dbreader.get_alerts_with_prompt_and_output(ws.id)
return await dashboard.parse_get_alert_conversation(alerts)
except Exception:
raise HTTPException(status_code=500, detail="Internal server error")


@v1.get(
"/workspaces/{workspace_name}/messages",
tags=["Workspaces"],
generate_unique_id_function=uniq_name,
)
async def get_workspace_messages(workspace_name: str) -> List[Conversation]:
"""Get messages for a workspace."""
try:
ws = await wscrud.get_workspace_by_name(workspace_name)
except crud.WorkspaceDoesNotExistError:
raise HTTPException(status_code=404, detail="Workspace does not exist")
except Exception:
raise HTTPException(status_code=500, detail="Internal server error")

try:
prompts_outputs = await dbreader.get_prompts_with_output(ws.id)
return await dashboard.parse_messages_in_conversations(prompts_outputs)
except Exception:
raise HTTPException(status_code=500, detail="Internal server error")
18 changes: 14 additions & 4 deletions src/codegate/db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ async def _exec_select_conditions_to_pydantic(
raise e
return None

async def get_prompts_with_output(self) -> List[GetPromptWithOutputsRow]:
async def get_prompts_with_output(self, workpace_id: str) -> List[GetPromptWithOutputsRow]:
sql = text(
"""
SELECT
Expand All @@ -375,13 +375,19 @@ async def get_prompts_with_output(self) -> List[GetPromptWithOutputsRow]:
o.timestamp as output_timestamp
FROM prompts p
LEFT JOIN outputs o ON p.id = o.prompt_id
WHERE p.workspace_id = :workspace_id
ORDER BY o.timestamp DESC
"""
)
prompts = await self._execute_select_pydantic_model(GetPromptWithOutputsRow, sql)
conditions = {"workspace_id": workpace_id}
prompts = await self._exec_select_conditions_to_pydantic(
GetPromptWithOutputsRow, sql, conditions, should_raise=True
)
return prompts

async def get_alerts_with_prompt_and_output(self) -> List[GetAlertsWithPromptAndOutputRow]:
async def get_alerts_with_prompt_and_output(
self, workspace_id: str
) -> List[GetAlertsWithPromptAndOutputRow]:
sql = text(
"""
SELECT
Expand All @@ -402,10 +408,14 @@ async def get_alerts_with_prompt_and_output(self) -> List[GetAlertsWithPromptAnd
FROM alerts a
LEFT JOIN prompts p ON p.id = a.prompt_id
LEFT JOIN outputs o ON p.id = o.prompt_id
WHERE p.workspace_id = :workspace_id
ORDER BY a.timestamp DESC
"""
)
prompts = await self._execute_select_pydantic_model(GetAlertsWithPromptAndOutputRow, sql)
conditions = {"workspace_id": workspace_id}
prompts = await self._exec_select_conditions_to_pydantic(
GetAlertsWithPromptAndOutputRow, sql, conditions, should_raise=True
)
return prompts

async def get_workspaces(self) -> List[WorkspaceActive]:
Expand Down
Loading