Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Obfuscate text in all messages, not just the last user one + display feedback on the secrets we obfuscated #193

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 90 additions & 32 deletions src/codegate/pipeline/secrets/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _extend_match_boundaries(self, text: str, start: int, end: int) -> tuple[int

def _redeact_text(
self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
) -> str:
) -> tuple[str, int]:
"""
Find and encrypt secrets in the given text.

Expand All @@ -86,12 +86,12 @@ def _redeact_text(
session_id: ..
context: The pipeline context to be able to log alerts
Returns:
Protected text with encrypted values
Tuple containing protected text with encrypted values and the count of redacted secrets
"""
# Find secrets in the text
matches = CodegateSignatures.find_in_string(text)
if not matches:
return text
return text, 0

logger.debug(f"Found {len(matches)} secrets in the user message")

Expand Down Expand Up @@ -145,60 +145,59 @@ def _redeact_text(
}
)

# Convert back to string
protected_string = "".join(protected_text)
# Convert back to string
protected_string = "".join(protected_text)

# Log the findings
logger.info("\nFound secrets:")
# Log the findings
logger.info("\nFound secrets:")

for secret in found_secrets:
logger.info(f"\nService: {secret['service']}")
logger.info(f"Type: {secret['type']}")
logger.info(f"Original: {secret['original']}")
logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>")
for secret in found_secrets:
logger.info(f"\nService: {secret['service']}")
logger.info(f"Type: {secret['type']}")
logger.info(f"Original: {secret['original']}")
logger.info(f"Encrypted: REDACTED<${secret['encrypted']}>")

logger.info(f"\nProtected text:\n{protected_string}")
return "".join(protected_text)
print(f"\nProtected text:\n{protected_string}")
return protected_string, len(found_secrets)

# NOTE(review): this span appears to be a rendered diff hunk without +/- markers or
# indentation, so superseded and replacement lines of process() both appear below
# (e.g. the two docstring summaries). Confirm against the raw diff before editing.
async def process(
self, request: ChatCompletionRequest, context: PipelineContext
) -> PipelineResult:
"""
Process the request to find and protect secrets.
Process the request to find and protect secrets in all messages.

Args:
request: The chat completion request
context: The pipeline context

Returns:
PipelineResult containing the processed request
PipelineResult containing the processed request and context with redaction metadata
"""
# Both the secrets manager and the session id are mandatory; a missing or
# wrongly-typed manager, or an empty session id, raises ValueError.
secrets_manager = context.sensitive.manager
if not secrets_manager or not isinstance(secrets_manager, SecretsManager):
# Should this be an error?
raise ValueError("Secrets manager not found in context")
session_id = context.sensitive.session_id
if not session_id:
raise ValueError("Session ID not found in context")

# Last-user-message-only flow (presumably the lines this change removes -- confirm).
last_user_message = self.get_last_user_message(request)
extracted_string = None
extracted_index = None
if last_user_message:
extracted_string = last_user_message[0]
extracted_index = last_user_message[1]
# All-messages flow: every message with truthy "content" is passed through
# _redeact_text and the per-message redaction counts are summed.
# NOTE(review): if request is a plain dict, .copy() is shallow, so the item
# assignments below would also be visible through the original request's
# message objects -- confirm whether that is intended.
new_request = request.copy()
total_redacted = 0

# Process all messages
for i, message in enumerate(new_request["messages"]):
if "content" in message and message["content"]:
# Protect the text
# NOTE(review): content is handed to _redeact_text unchanged; presumably it
# is always a str here -- confirm for list-style content parts.
protected_string, redacted_count = self._redeact_text(
message["content"], secrets_manager, session_id, context
)
new_request["messages"][i]["content"] = protected_string
total_redacted += redacted_count

if not extracted_string:
return PipelineResult(request=request, context=context)
logger.info(f"Total secrets redacted: {total_redacted}")

# Protect the text
protected_string = self._redeact_text(
extracted_string, secrets_manager, session_id, context
)
# Store the count in context metadata
# The same key is read back by SecretRedactionNotifier.process_chunk.
context.metadata["redacted_secrets_count"] = total_redacted

# Update the user message
new_request = request.copy()
new_request["messages"][extracted_index]["content"] = protected_string
return PipelineResult(request=new_request, context=context)


Expand Down Expand Up @@ -293,3 +292,62 @@ async def process_chunk(

# No markers or partial markers, let pipeline handle the chunk normally
return [chunk]


class SecretRedactionNotifier(OutputPipelineStep):
    """Output pipeline step that tells the user how many secrets were redacted.

    The count is read from ``input_context.metadata["redacted_secrets_count"]``.
    When it is non-zero, a notification chunk is emitted once, immediately before
    the first streamed chunk that carries a delta role, and the counter is reset
    to zero so later chunks pass through untouched.
    """

    @property
    def name(self) -> str:
        """Return the identifier of this pipeline step."""
        return "secret-redaction-notifier"

    def _create_chunk(self, original_chunk: ModelResponse, content: str) -> ModelResponse:
        """
        Create a new chunk with the given content, preserving the original chunk's metadata.

        Args:
            original_chunk: Chunk whose ``id``, ``created`` and ``model`` are copied
            content: Text placed in the new chunk's assistant delta

        Returns:
            A single-choice ``chat.completion.chunk`` response carrying ``content``
        """
        return ModelResponse(
            id=original_chunk.id,
            choices=[
                StreamingChoices(
                    finish_reason=None,
                    index=0,
                    delta=Delta(content=content, role="assistant"),
                    logprobs=None,
                )
            ],
            created=original_chunk.created,
            model=original_chunk.model,
            object="chat.completion.chunk",
        )

    async def process_chunk(
        self,
        chunk: ModelResponse,
        context: OutputPipelineContext,
        input_context: Optional[PipelineContext] = None,
    ) -> list[ModelResponse]:
        """
        Process a single chunk of the stream.

        Args:
            chunk: The streamed chunk to inspect
            context: The output pipeline context (not used by this step)
            input_context: The input pipeline context holding the redaction count

        Returns:
            ``[notification_chunk, chunk]`` for the first role-bearing chunk when
            secrets were redacted, otherwise ``[chunk]`` unchanged
        """
        if (
            not input_context
            or not input_context.metadata
            or input_context.metadata.get("redacted_secrets_count", 0) == 0
        ):
            return [chunk]

        # A chunk without any choices has no delta to inspect; pass it through
        # instead of raising IndexError on choices[0].
        if not chunk.choices:
            return [chunk]

        # Only the first chunk carries a delta role; later chunks do not.
        if not chunk.choices[0].delta.role:
            return [chunk]

        redacted_count = input_context.metadata["redacted_secrets_count"]
        secret_text = "secret" if redacted_count == 1 else "secrets"
        notification_chunk = self._create_chunk(
            chunk,
            f"\n🛡️ Codegate prevented {redacted_count} {secret_text} from being leaked by redacting them.\n\n",  # noqa
        )

        # Reset the counter so the notification is emitted only once.
        input_context.metadata["redacted_secrets_count"] = 0

        # The notification goes first so it is shown ahead of the model output.
        return [notification_chunk, chunk]
7 changes: 6 additions & 1 deletion src/codegate/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from codegate.pipeline.extract_snippets.output import CodeCommentStep
from codegate.pipeline.output import OutputPipelineProcessor, OutputPipelineStep
from codegate.pipeline.secrets.manager import SecretsManager
from codegate.pipeline.secrets.secrets import CodegateSecrets, SecretUnredactionStep
from codegate.pipeline.secrets.secrets import (
CodegateSecrets,
SecretRedactionNotifier,
SecretUnredactionStep,
)
from codegate.pipeline.secrets.signatures import CodegateSignatures
from codegate.pipeline.system_prompt.codegate import SystemPrompt
from codegate.pipeline.version.version import CodegateVersion
Expand Down Expand Up @@ -50,6 +54,7 @@ def init_app() -> FastAPI:
fim_pipeline = SequentialPipelineProcessor(fim_steps)

output_steps: List[OutputPipelineStep] = [
SecretRedactionNotifier(),
SecretUnredactionStep(),
CodeCommentStep(),
]
Expand Down