Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

feat: remove not needed encryption of secrets #1123

Merged
merged 9 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/codegate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from codegate.config import Config, ConfigurationError
from codegate.db.connection import init_db_sync, init_session_if_not_exists
from codegate.pipeline.factory import PipelineFactory
from codegate.pipeline.secrets.manager import SecretsManager
from codegate.pipeline.sensitive_data.manager import SensitiveDataManager
from codegate.providers import crud as provendcrud
from codegate.providers.copilot.provider import CopilotProvider
from codegate.server import init_app
Expand Down Expand Up @@ -331,8 +331,8 @@ def serve( # noqa: C901
click.echo("Existing Certificates are already present.")

# Initialize secrets manager and pipeline factory
secrets_manager = SecretsManager()
pipeline_factory = PipelineFactory(secrets_manager)
sensitive_data_manager = SensitiveDataManager()
pipeline_factory = PipelineFactory(sensitive_data_manager)

app = init_app(pipeline_factory)

Expand Down
27 changes: 8 additions & 19 deletions src/codegate/pipeline/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,23 @@
from codegate.clients.clients import ClientType
from codegate.db.models import Alert, AlertSeverity, Output, Prompt
from codegate.extract_snippets.message_extractor import CodeSnippet
from codegate.pipeline.secrets.manager import SecretsManager
from codegate.pipeline.sensitive_data.manager import SensitiveDataManager

logger = structlog.get_logger("codegate")


@dataclass
class PipelineSensitiveData:
manager: SecretsManager
manager: SensitiveDataManager
session_id: str
api_key: Optional[str] = None
model: Optional[str] = None
provider: Optional[str] = None
api_base: Optional[str] = None

def secure_cleanup(self):
"""Securely cleanup sensitive data for this session"""
if self.manager is None or self.session_id == "":
return

self.manager.cleanup_session(self.session_id)
self.session_id = ""

# Securely wipe the API key using the same method as secrets manager
if self.api_key is not None:
api_key_bytes = bytearray(self.api_key.encode())
self.manager.crypto.wipe_bytearray(api_key_bytes)
self.api_key = None

self.model = None


Expand Down Expand Up @@ -274,19 +263,19 @@ class InputPipelineInstance:
def __init__(
self,
pipeline_steps: List[PipelineStep],
secret_manager: SecretsManager,
sensitive_data_manager: SensitiveDataManager,
is_fim: bool,
client: ClientType = ClientType.GENERIC,
):
self.pipeline_steps = pipeline_steps
self.secret_manager = secret_manager
self.sensitive_data_manager = sensitive_data_manager
self.is_fim = is_fim
self.context = PipelineContext(client=client)

# we create the sensitive context here so that it is not shared between individual requests
# TODO: could we get away with just generating the session ID for an instance?
self.context.sensitive = PipelineSensitiveData(
manager=self.secret_manager,
manager=self.sensitive_data_manager,
session_id=str(uuid.uuid4()),
)
self.context.metadata["is_fim"] = is_fim
Expand Down Expand Up @@ -343,20 +332,20 @@ class SequentialPipelineProcessor:
def __init__(
self,
pipeline_steps: List[PipelineStep],
secret_manager: SecretsManager,
sensitive_data_manager: SensitiveDataManager,
client_type: ClientType,
is_fim: bool,
):
self.pipeline_steps = pipeline_steps
self.secret_manager = secret_manager
self.sensitive_data_manager = sensitive_data_manager
self.is_fim = is_fim
self.instance = self._create_instance(client_type)

def _create_instance(self, client_type: ClientType) -> InputPipelineInstance:
"""Create a new pipeline instance for processing a request"""
return InputPipelineInstance(
self.pipeline_steps,
self.secret_manager,
self.sensitive_data_manager,
self.is_fim,
client_type,
)
Expand Down
14 changes: 7 additions & 7 deletions src/codegate/pipeline/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
PiiRedactionNotifier,
PiiUnRedactionStep,
)
from codegate.pipeline.secrets.manager import SecretsManager
from codegate.pipeline.secrets.secrets import (
CodegateSecrets,
SecretRedactionNotifier,
SecretUnredactionStep,
)
from codegate.pipeline.sensitive_data.manager import SensitiveDataManager
from codegate.pipeline.system_prompt.codegate import SystemPrompt


class PipelineFactory:
def __init__(self, secrets_manager: SecretsManager):
self.secrets_manager = secrets_manager
def __init__(self, sensitive_data_manager: SensitiveDataManager):
self.sensitive_data_manager = sensitive_data_manager

def create_input_pipeline(self, client_type: ClientType) -> SequentialPipelineProcessor:
input_steps: List[PipelineStep] = [
Expand All @@ -32,7 +32,7 @@ def create_input_pipeline(self, client_type: ClientType) -> SequentialPipelinePr
# and without obfuscating the secrets, we'd leak the secrets during those
# later steps
CodegateSecrets(),
CodegatePii(),
CodegatePii(self.sensitive_data_manager),
CodegateCli(),
CodegateContextRetriever(),
SystemPrompt(
Expand All @@ -41,19 +41,19 @@ def create_input_pipeline(self, client_type: ClientType) -> SequentialPipelinePr
]
return SequentialPipelineProcessor(
input_steps,
self.secrets_manager,
self.sensitive_data_manager,
client_type,
is_fim=False,
)

def create_fim_pipeline(self, client_type: ClientType) -> SequentialPipelineProcessor:
fim_steps: List[PipelineStep] = [
CodegateSecrets(),
CodegatePii(),
CodegatePii(self.sensitive_data_manager),
]
return SequentialPipelineProcessor(
fim_steps,
self.secrets_manager,
self.sensitive_data_manager,
client_type,
is_fim=True,
)
Expand Down
120 changes: 18 additions & 102 deletions src/codegate/pipeline/pii/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,16 @@
import uuid
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Optional

import structlog
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

from codegate.db.models import AlertSeverity
from codegate.pipeline.base import PipelineContext
from codegate.pipeline.sensitive_data.session_store import SessionStore

logger = structlog.get_logger("codegate.pii.analyzer")


class PiiSessionStore:
"""
A class to manage PII (Personally Identifiable Information) session storage.

Attributes:
session_id (str): The unique identifier for the session. If not provided, a new UUID
is generated. mappings (Dict[str, str]): A dictionary to store mappings between UUID
placeholders and PII.

Methods:
add_mapping(pii: str) -> str:
Adds a PII string to the session store and returns a UUID placeholder for it.

get_pii(uuid_placeholder: str) -> str:
Retrieves the PII string associated with the given UUID placeholder. If the placeholder
is not found, returns the placeholder itself.
"""

def __init__(self, session_id: str = None):
self.session_id = session_id or str(uuid.uuid4())
self.mappings: Dict[str, str] = {}

def add_mapping(self, pii: str) -> str:
uuid_placeholder = f"<{str(uuid.uuid4())}>"
self.mappings[uuid_placeholder] = pii
return uuid_placeholder

def get_pii(self, uuid_placeholder: str) -> str:
return self.mappings.get(uuid_placeholder, uuid_placeholder)


class PiiAnalyzer:
"""
PiiAnalyzer class for analyzing and anonymizing text containing PII.
Expand All @@ -52,12 +21,12 @@ class PiiAnalyzer:
Get or create the singleton instance of PiiAnalyzer.
analyze:
text (str): The text to analyze for PII.
Tuple[str, List[Dict[str, Any]], PiiSessionStore]: The anonymized text, a list of
Tuple[str, List[Dict[str, Any]], SessionStore]: The anonymized text, a list of
found PII details, and the session store.
entities (List[str]): The PII entities to analyze for.
restore_pii:
anonymized_text (str): The text with anonymized PII.
session_store (PiiSessionStore): The PiiSessionStore used for anonymization.
session_store (SessionStore): The SessionStore used for anonymization.
str: The text with original PII restored.
"""

Expand Down Expand Up @@ -95,13 +64,11 @@ def __init__(self):
# Create analyzer with custom NLP engine
self.analyzer = AnalyzerEngine(nlp_engine=nlp_engine)
self.anonymizer = AnonymizerEngine()
self.session_store = PiiSessionStore()
self.session_store = SessionStore()

PiiAnalyzer._instance = self

def analyze(
self, text: str, context: Optional[PipelineContext] = None
) -> Tuple[str, List[Dict[str, Any]], PiiSessionStore]:
def analyze(self, text: str, context: Optional[PipelineContext] = None) -> List:
# Prioritize credit card detection first
entities = [
"PHONE_NUMBER",
Expand All @@ -125,81 +92,30 @@ def analyze(
language="en",
score_threshold=0.3, # Lower threshold to catch more potential matches
)
return analyzer_results

# Track found PII
found_pii = []

# Only anonymize if PII was found
if analyzer_results:
# Log each found PII instance and anonymize
anonymized_text = text
for result in analyzer_results:
pii_value = text[result.start : result.end]
uuid_placeholder = self.session_store.add_mapping(pii_value)
pii_info = {
"type": result.entity_type,
"value": pii_value,
"score": result.score,
"start": result.start,
"end": result.end,
"uuid_placeholder": uuid_placeholder,
}
found_pii.append(pii_info)
anonymized_text = anonymized_text.replace(pii_value, uuid_placeholder)

# Log each PII detection with its UUID mapping
logger.info(
"PII detected and mapped",
pii_type=result.entity_type,
score=f"{result.score:.2f}",
uuid=uuid_placeholder,
# Don't log the actual PII value for security
value_length=len(pii_value),
session_id=self.session_store.session_id,
)

# Log summary of all PII found in this analysis
if found_pii and context:
# Create notification string for alert
notify_string = (
f"**PII Detected** 🔒\n"
f"- Total PII Found: {len(found_pii)}\n"
f"- Types Found: {', '.join(set(p['type'] for p in found_pii))}\n"
)
context.add_alert(
self._name,
trigger_string=notify_string,
severity_category=AlertSeverity.CRITICAL,
)

logger.info(
"PII analysis complete",
total_pii_found=len(found_pii),
pii_types=[p["type"] for p in found_pii],
session_id=self.session_store.session_id,
)

# Return the anonymized text, PII details, and session store
return anonymized_text, found_pii, self.session_store

# If no PII found, return original text, empty list, and session store
return text, [], self.session_store

def restore_pii(self, anonymized_text: str, session_store: PiiSessionStore) -> str:
def restore_pii(self, session_id: str, anonymized_text: str) -> str:
"""
Restore the original PII (Personally Identifiable Information) in the given anonymized text.

This method replaces placeholders in the anonymized text with their corresponding original
PII values using the mappings stored in the provided PiiSessionStore.
PII values using the mappings stored in the SessionStore for the given session ID.

Args:
anonymized_text (str): The text containing placeholders for PII.
session_store (PiiSessionStore): The session store containing mappings of placeholders
session_id (str): The ID of the session that holds the mappings of placeholders
to original PII.

Returns:
str: The text with the original PII restored.
"""
for uuid_placeholder, original_pii in session_store.mappings.items():
session_data = self.session_store.get_by_session_id(session_id)
if not session_data:
logger.warning(
"No active PII session found for given session ID. Unable to restore PII."
)
return anonymized_text

for uuid_placeholder, original_pii in session_data.items():
anonymized_text = anonymized_text.replace(uuid_placeholder, original_pii)
return anonymized_text
Loading