feat(code-interpreter): Add convenience methods for file operations and package management #202
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,10 +4,11 @@ | |
| applications to start, stop, and invoke code execution in a managed sandbox environment. | ||
| """ | ||
|
|
||
| import base64 | ||
| import logging | ||
| import uuid | ||
| from contextlib import contextmanager | ||
| from typing import Dict, Generator, Optional | ||
| from typing import Any, Dict, Generator, List, Optional, Union | ||
|
|
||
| import boto3 | ||
|
|
||
|
|
@@ -31,6 +32,30 @@ class CodeInterpreter: | |
| client: The boto3 client for interacting with the service. | ||
| identifier (str, optional): The code interpreter identifier. | ||
| session_id (str, optional): The active session ID. | ||
|
|
||
| Basic Usage: | ||
| >>> from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter | ||
| >>> | ||
| >>> client = CodeInterpreter('us-west-2') | ||
| >>> client.start() | ||
| >>> | ||
| >>> # Execute code | ||
| >>> result = client.execute_code("print('Hello, World!')") | ||
| >>> | ||
| >>> # Install packages | ||
| >>> client.install_packages(['pandas', 'matplotlib']) | ||
| >>> | ||
| >>> # Upload and process data | ||
| >>> client.upload_file('data.csv', csv_content, description='Sales data') | ||
| >>> | ||
| >>> client.stop() | ||
|
|
||
| Context Manager Usage: | ||
| >>> from bedrock_agentcore.tools.code_interpreter_client import code_session | ||
| >>> | ||
| >>> with code_session('us-west-2') as client: | ||
| ... client.install_packages(['numpy']) | ||
| ... result = client.execute_code('import numpy as np; print(np.pi)') | ||
| """ | ||
|
|
||
| def __init__(self, region: str, session: Optional[boto3.Session] = None) -> None: | ||
|
|
@@ -62,6 +87,7 @@ def __init__(self, region: str, session: Optional[boto3.Session] = None) -> None | |
|
|
||
| self._identifier = None | ||
| self._session_id = None | ||
| self._file_descriptions: Dict[str, str] = {} | ||
|
|
||
| @property | ||
| def identifier(self) -> Optional[str]: | ||
|
|
@@ -404,6 +430,296 @@ def invoke(self, method: str, params: Optional[Dict] = None): | |
| arguments=params or {}, | ||
| ) | ||
|
|
||
| def upload_file( | ||
| self, | ||
| path: str, | ||
| content: Union[str, bytes], | ||
| description: str = "", | ||
| ) -> Dict[str, Any]: | ||
|
Are we not able to strengthen the return type further here? What's the expected shape we plan to return?
Contributor
Author
The return type is Dict[str, Any] because invoke() returns that. We could define a TypedDict for the response shape, but that would couple us to the API response structure. For now, keeping Dict[str, Any] gives flexibility. We can revisit if we want stricter typing across the SDK.
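One possible middle ground, sketched here as an assumption rather than the SDK's actual types: describe only the fields that `download_file` in this diff already reads (`stream` events carrying `result`, `content`, `type`, `resource`, `uri`, `text`, `blob`) with non-total `TypedDict`s. All class names and the `first_text` helper are hypothetical.

```python
from typing import List, TypedDict


class ResourcePayload(TypedDict, total=False):
    """Hypothetical name; fields mirror what download_file reads in this diff."""

    uri: str
    text: str
    blob: str  # base64-encoded bytes


class ContentItem(TypedDict, total=False):
    type: str
    resource: ResourcePayload


class ResultEvent(TypedDict, total=False):
    content: List[ContentItem]


class StreamEvent(TypedDict, total=False):
    result: ResultEvent


def first_text(events: List[StreamEvent]) -> str:
    """Return the first text resource found in a list of stream events."""
    for event in events:
        for item in event.get("result", {}).get("content", []):
            resource = item.get("resource", {})
            if item.get("type") == "resource" and "text" in resource:
                return resource["text"]
    raise LookupError("no text resource in events")
```

Because every key is optional (`total=False`), a type checker would flag misspelled keys without the SDK promising that the service always returns each field.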
||
| r"""Upload a file to the code interpreter environment. | ||
|
|
||
| This is a convenience wrapper around the writeFiles method that provides | ||
| a cleaner interface for file uploads with optional semantic descriptions. | ||
|
|
||
| Args: | ||
| path: Relative path where the file should be saved (e.g., 'data.csv', | ||
| 'scripts/analysis.py'). Must be relative to the working directory. | ||
| Absolute paths starting with '/' are not allowed. | ||
| content: File content as string (text files) or bytes (binary files). | ||
| Binary content will be base64 encoded automatically. | ||
| description: Optional semantic description of the file contents. | ||
| This is stored as metadata and can help LLMs understand | ||
| the data structure (e.g., "CSV with columns: date, revenue, product_id"). | ||
|
|
||
| Returns: | ||
| Dict containing the result of the write operation. | ||
|
|
||
| Raises: | ||
| ValueError: If path is absolute or content type is invalid. | ||
|
|
||
| Example: | ||
| >>> # Upload a CSV file | ||
| >>> client.upload_file( | ||
| ... path='sales_data.csv', | ||
| ... content='date,revenue\n2024-01-01,1000\n2024-01-02,1500', | ||
| ... description='Daily sales data with columns: date, revenue' | ||
| ... ) | ||
|
|
||
| >>> # Upload a Python script | ||
| >>> client.upload_file( | ||
| ... path='scripts/analyze.py', | ||
| ... content='import pandas as pd\ndf = pd.read_csv("sales_data.csv")' | ||
| ... ) | ||
| """ | ||
| if path.startswith("/"): | ||
| raise ValueError( | ||
| f"Path must be relative, not absolute. Got: {path}. Use paths like 'data.csv' or 'scripts/analysis.py'." | ||
| ) | ||
|
|
||
| # Handle binary content | ||
| if isinstance(content, bytes): | ||
| file_content = {"path": path, "blob": base64.b64encode(content).decode("utf-8")} | ||
| else: | ||
| file_content = {"path": path, "text": content} | ||
|
|
||
| if description: | ||
| self.logger.info("Uploading file: %s (%s)", path, description) | ||
| else: | ||
| self.logger.info("Uploading file: %s", path) | ||
|
|
||
| result = self.invoke("writeFiles", {"content": [file_content]}) | ||
|
|
||
| # Store description as metadata (available for future LLM context) | ||
| if description and hasattr(self, "_file_descriptions"): | ||
|
Do we need the hasattr check here? I see _file_descriptions is defined above as {}.
Contributor
Author
Will remove this.
||
| self._file_descriptions[path] = description | ||
|
|
||
| return result | ||
|
|
||
| def upload_files( | ||
| self, | ||
| files: List[Dict[str, str]], | ||
| ) -> Dict[str, Any]: | ||
| """Upload multiple files to the code interpreter environment. | ||
|
|
||
| Args: | ||
| files: List of file specifications, each containing: | ||
| - 'path': Relative file path | ||
| - 'content': File content (string or bytes) | ||
| - 'description': Optional semantic description | ||
|
|
||
| Returns: | ||
| Dict containing the result of the write operation. | ||
|
|
||
| Example: | ||
| >>> client.upload_files([ | ||
| ... {'path': 'data.csv', 'content': csv_data, 'description': 'Sales data'}, | ||
| ... {'path': 'config.json', 'content': json_config} | ||
| ... ]) | ||
| """ | ||
| file_contents = [] | ||
| for file_spec in files: | ||
| path = file_spec["path"] | ||
| content = file_spec["content"] | ||
|
|
||
| if path.startswith("/"): | ||
| raise ValueError(f"Path must be relative, not absolute. Got: {path}") | ||
|
|
||
| if isinstance(content, bytes): | ||
| file_contents.append({"path": path, "blob": base64.b64encode(content).decode("utf-8")}) | ||
| else: | ||
| file_contents.append({"path": path, "text": content}) | ||
|
|
||
| self.logger.info("Uploading %d files", len(files)) | ||
| return self.invoke("writeFiles", {"content": file_contents}) | ||
|
Didn't follow this. We are taking a list of files, and there is no batch file upload API in code interpreter? So should we just call the upload_file utility we have above? We also need to see how to handle partial failure in this case. Throwing an exception even if only one file upload fails should be OK; users will have to retry the entire batch. More advanced support would be to return a list of responses instead and let clients decide which ones to retry.
Contributor
Author
The writeFiles API does accept a list, so it is a batch operation at the API level. But your point about reusing upload_file and handling partial failures is valid. Two options:
I'd lean toward Option A for now, since the API handles the batch atomically: either all files succeed or all fail. We can add advanced partial-failure handling as a follow-up if users request it.
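Whichever option is chosen, the per-file validation and encoding could live in one place so that `upload_file` and `upload_files` cannot drift apart. The sketch below is an illustration, not code from this PR: `build_file_entry` and `build_write_files_params` are hypothetical helper names, and the explicit content-type check is an addition that matches the `Raises` note in the `upload_file` docstring.

```python
import base64
from typing import Any, Dict, List, Union


def build_file_entry(path: str, content: Union[str, bytes]) -> Dict[str, str]:
    """Validate one file and return its writeFiles entry (hypothetical helper)."""
    if path.startswith("/"):
        raise ValueError(f"Path must be relative, not absolute. Got: {path}")
    if isinstance(content, bytes):
        # Binary content is base64 encoded, as in the diff.
        return {"path": path, "blob": base64.b64encode(content).decode("utf-8")}
    if isinstance(content, str):
        return {"path": path, "text": content}
    raise ValueError(f"Content must be str or bytes, got: {type(content).__name__}")


def build_write_files_params(files: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build one writeFiles payload; any invalid file rejects the whole batch."""
    return {"content": [build_file_entry(f["path"], f["content"]) for f in files]}
```

Both methods could then call `self.invoke("writeFiles", ...)` with the resulting payload, and validation happens before any request is sent.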
||
|
|
||
| def install_packages( | ||
| self, | ||
| packages: List[str], | ||
| upgrade: bool = False, | ||
| ) -> Dict[str, Any]: | ||
| """Install Python packages in the code interpreter environment. | ||
|
|
||
| This is a convenience wrapper around executeCommand that handles | ||
| pip install commands with proper formatting. | ||
|
|
||
| Args: | ||
| packages: List of package names to install. Can include version | ||
| specifiers (e.g., ['pandas>=2.0', 'numpy', 'scikit-learn==1.3.0']). | ||
| upgrade: If True, adds --upgrade flag to update existing packages. | ||
|
|
||
| Returns: | ||
| Dict containing the command execution result with stdout/stderr. | ||
|
|
||
| Example: | ||
| >>> # Install multiple packages | ||
| >>> client.install_packages(['pandas', 'matplotlib', 'scikit-learn']) | ||
|
|
||
| >>> # Install with version constraints | ||
| >>> client.install_packages(['pandas>=2.0', 'numpy<2.0']) | ||
|
|
||
| >>> # Upgrade existing packages | ||
| >>> client.install_packages(['pandas'], upgrade=True) | ||
| """ | ||
| if not packages: | ||
| raise ValueError("At least one package name must be provided") | ||
|
|
||
| # Sanitize package names (basic validation) | ||
| for pkg in packages: | ||
| if any(char in pkg for char in [";", "&", "|", "`", "$"]): | ||
| raise ValueError(f"Invalid characters in package name: {pkg}") | ||
|
|
||
| packages_str = " ".join(packages) | ||
| upgrade_flag = "--upgrade " if upgrade else "" | ||
| command = f"pip install {upgrade_flag}{packages_str}" | ||
|
|
||
| self.logger.info("Installing packages: %s", packages_str) | ||
| return self.invoke("executeCommand", {"command": command}) | ||
|
|
||
| def download_file( | ||
|
Should we call this read/write file to keep it mapped to API input names? (We may introduce download/upload for some future operations related to blob storage upload/download.)
Contributor
Author
I think upload_file/download_file are more intuitive from a user perspective than write_file/read_file. The existing invoke('writeFiles') and invoke('readFiles') are the low-level API; these convenience methods add some semantic clarity. I suggest we keep upload_file/download_file for now, and if we add blob storage later, we can name those upload_to_s3/download_from_s3 or similar.
||
| self, | ||
| path: str, | ||
| ) -> str: | ||
| """Download/read a file from the code interpreter environment. | ||
|
|
||
| Args: | ||
| path: Path to the file to read. | ||
|
|
||
| Returns: | ||
| File content as string. | ||
|
|
||
| Raises: | ||
| FileNotFoundError: If the file doesn't exist. | ||
|
|
||
| Example: | ||
| >>> # Read a generated file | ||
| >>> content = client.download_file('output/results.csv') | ||
| >>> print(content) | ||
| """ | ||
| self.logger.info("Downloading file: %s", path) | ||
| result = self.invoke("readFiles", {"paths": [path]}) | ||
|
|
||
| # Parse the response to extract file content | ||
| # Response structure from the API | ||
| if "stream" in result: | ||
| for event in result["stream"]: | ||
| if "result" in event: | ||
| for content_item in event["result"].get("content", []): | ||
| if content_item.get("type") == "resource": | ||
| resource = content_item.get("resource", {}) | ||
| if "text" in resource: | ||
| return resource["text"] | ||
| elif "blob" in resource: | ||
| return base64.b64decode(resource["blob"]).decode("utf-8") | ||
|
|
||
| raise FileNotFoundError(f"Could not read file: {path}") | ||
|
|
||
| def download_files( | ||
| self, | ||
| paths: List[str], | ||
| ) -> Dict[str, str]: | ||
| """Download/read multiple files from the code interpreter environment. | ||
|
|
||
| Args: | ||
| paths: List of file paths to read. | ||
|
|
||
| Returns: | ||
| Dict mapping file paths to their contents. | ||
|
|
||
| Example: | ||
| >>> files = client.download_files(['data.csv', 'results.json']) | ||
| >>> print(files['data.csv']) | ||
| """ | ||
| self.logger.info("Downloading %d files", len(paths)) | ||
| result = self.invoke("readFiles", {"paths": paths}) | ||
|
|
||
| files = {} | ||
|
Can we reuse the download_file utility method above?
Contributor
Author
Yes, makes sense. Will refactor to reuse download_file.
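One way to remove the duplicated parsing, sketched under the assumption that the response has the shape the diff's own code expects (a `stream` of events whose `result.content` items carry a `resource` with `uri` and either `text` or `blob`). The function names are hypothetical, and this variant shares a parser rather than calling `download_file` once per path, which keeps a multi-file read to a single `readFiles` request.

```python
import base64
from typing import Any, Dict


def extract_files(result: Dict[str, Any]) -> Dict[str, str]:
    """Map file paths to contents from a readFiles-style response (hypothetical helper)."""
    files: Dict[str, str] = {}
    for event in result.get("stream", []):
        for item in event.get("result", {}).get("content", []):
            if item.get("type") != "resource":
                continue
            resource = item.get("resource", {})
            # Same URI handling as the diff: drop the file:// scheme.
            path = resource.get("uri", "").replace("file://", "")
            if "text" in resource:
                files[path] = resource["text"]
            elif "blob" in resource:
                files[path] = base64.b64decode(resource["blob"]).decode("utf-8")
    return files


def extract_single_file(result: Dict[str, Any], path: str) -> str:
    """Return the first file in the response, or raise as download_file does."""
    files = extract_files(result)
    if not files:
        raise FileNotFoundError(f"Could not read file: {path}")
    return next(iter(files.values()))
```

With this split, `download_files` would return `extract_files(result)` and `download_file` would return `extract_single_file(result, path)`.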
||
| if "stream" in result: | ||
| for event in result["stream"]: | ||
| if "result" in event: | ||
| for content_item in event["result"].get("content", []): | ||
| if content_item.get("type") == "resource": | ||
| resource = content_item.get("resource", {}) | ||
| uri = resource.get("uri", "") | ||
| file_path = uri.replace("file://", "") | ||
|
|
||
| if "text" in resource: | ||
| files[file_path] = resource["text"] | ||
| elif "blob" in resource: | ||
| files[file_path] = base64.b64decode(resource["blob"]).decode("utf-8") | ||
|
|
||
| return files | ||
|
|
||
| def execute_code( | ||
| self, | ||
| code: str, | ||
| language: str = "python", | ||
| clear_context: bool = False, | ||
|
Minor note: clear_context is only applicable to Python and is not supported for other languages yet.
Contributor
Author
Good point. Will update the docstring to clarify this limitation.
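Beyond the docstring note, one option would be to reject the unsupported combination up front. This is an assumption, not something agreed in this thread: the service may instead ignore the flag for other languages. `build_execute_code_params` is a hypothetical helper that mirrors the parameter dict built in `execute_code`.

```python
from typing import Any, Dict

VALID_LANGUAGES = ("python", "javascript", "typescript")


def build_execute_code_params(
    code: str,
    language: str = "python",
    clear_context: bool = False,
) -> Dict[str, Any]:
    """Build executeCode params, rejecting clear_context for non-Python code."""
    if language not in VALID_LANGUAGES:
        raise ValueError(f"Language must be one of {list(VALID_LANGUAGES)}, got: {language}")
    if clear_context and language != "python":
        # Assumed policy: fail loudly rather than silently ignoring the flag.
        raise ValueError("clear_context is only supported for language='python'")
    return {"code": code, "language": language, "clearContext": clear_context}
```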
||
| ) -> Dict[str, Any]: | ||
| """Execute code in the interpreter environment. | ||
|
|
||
| This is a convenience wrapper around the executeCode method with | ||
| typed parameters for better IDE support and validation. | ||
|
|
||
| Args: | ||
| code: The code to execute. | ||
| language: Programming language - 'python', 'javascript', or 'typescript'. | ||
| Default is 'python'. | ||
| clear_context: If True, clears all previous variable state before execution. | ||
|
Should we consider adding a first-class method to clear context? so that users can write code such as: so it's easy and clean to clear context instead of always needing to set it along with another execute code call.
Contributor
Author
Cleaner API. Will add.
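A minimal sketch of what such a method might look like, using a stand-in class so it runs without AWS access. Everything here is hypothetical: the `clear_context` method name, the `StubInterpreter` class, and in particular the assumption that sending a no-op `pass` statement with `clearContext` set to true is an acceptable way to reset state.

```python
from typing import Any, Dict, List, Optional, Tuple


class StubInterpreter:
    """Stand-in for CodeInterpreter that records invoke() calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Dict[str, Any]]] = []

    def invoke(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        self.calls.append((method, params or {}))
        return {"stream": []}

    def clear_context(self) -> Dict[str, Any]:
        """Reset Python variable state by running a no-op with clearContext=True (assumed)."""
        return self.invoke(
            "executeCode",
            {"code": "pass", "language": "python", "clearContext": True},
        )


client = StubInterpreter()
client.clear_context()
```

The reset then reads as a single call, separate from whatever code the user executes next.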
||
| Default is False (variables persist across calls). | ||
|
|
||
| Returns: | ||
| Dict containing execution results including stdout, stderr, exit_code. | ||
|
|
||
| Example: | ||
| >>> # Execute Python code | ||
| >>> result = client.execute_code(''' | ||
| ... import pandas as pd | ||
| ... df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]}) | ||
| ... print(df.describe()) | ||
| ... ''') | ||
|
|
||
| >>> # Clear context and start fresh | ||
| >>> result = client.execute_code('x = 10', clear_context=True) | ||
| """ | ||
| valid_languages = ["python", "javascript", "typescript"] | ||
| if language not in valid_languages: | ||
| raise ValueError(f"Language must be one of {valid_languages}, got: {language}") | ||
|
|
||
| self.logger.info("Executing %s code (%d chars)", language, len(code)) | ||
|
|
||
| return self.invoke( | ||
| "executeCode", | ||
| { | ||
| "code": code, | ||
| "language": language, | ||
| "clearContext": clear_context, | ||
| }, | ||
| ) | ||
|
|
||
| def execute_shell( | ||
|
Should we call this exec() or execute_command() to keep consistency with the API?
Contributor
Author
Sure, execute_command matches the API name better. Will rename.
||
| self, | ||
| command: str, | ||
| ) -> Dict[str, Any]: | ||
| """Execute a shell command in the interpreter environment. | ||
|
We also have a few command-task-related APIs that we could add wrappers for. This can also be a follow-up.
Contributor
Author
Agree these would be useful. We already have invoke() paths for these. Can add as a follow-up, since the core file/code operations are the priority.
||
|
|
||
| This is a convenience wrapper around executeCommand. | ||
|
|
||
| Args: | ||
| command: Shell command to execute. | ||
|
|
||
| Returns: | ||
| Dict containing command execution results. | ||
|
|
||
| Example: | ||
| >>> # List files | ||
| >>> result = client.execute_shell('ls -la') | ||
|
|
||
| >>> # Check Python version | ||
| >>> result = client.execute_shell('python --version') | ||
| """ | ||
| self.logger.info("Executing shell command: %s...", command[:50]) | ||
| return self.invoke("executeCommand", {"command": command}) | ||
|
|
||
|
|
||
| @contextmanager | ||
| def code_session( | ||
|
|
||
What if we could have subclasses to organize the utility methods based on:
so we would end up with:

Can we keep this as a follow-up enhancement? The current flat structure works and matches how other SDKs typically expose methods. We can layer subclasses on top in a future version without breaking changes.