Skip to content

Commit 2c023be

Browse files
committed
mcp support
1 parent b82e846 commit 2c023be

12 files changed

+831
-663
lines changed

examples/mcp_stdio_example.py

Lines changed: 99 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -1,151 +1,115 @@
1+
#!/usr/bin/env python
12
# examples/mcp_stdio_example.py
23
"""
3-
Example demonstrating flexible MCP integration with support for multiple transport types.
4+
Demo: wire a remote MCP “time” server into CHUK via **stdio** transport.
5+
6+
Prerequisites
7+
-------------
8+
Nothing but `uv` installed – the time-server will be fetched on-the-fly
9+
(`uvx mcp-server-time …`).
10+
11+
What it shows
12+
-------------
13+
1. create / reuse a minimal server-config JSON
14+
2. initialise the stdio StreamManager & register the remote tools
15+
3. list everything that landed in the registry
16+
4. look up the wrapper for `get_current_time` and call it directly
417
"""
18+
19+
from __future__ import annotations
20+
521
import asyncio
22+
import json
623
import os
724
import sys
8-
import json
25+
from pathlib import Path
926
from typing import Dict, List
1027

11-
# Add project root to path if running as script
12-
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
13-
14-
# CHUK Tool Processor imports
15-
from chuk_tool_processor.mcp import setup_mcp_stdio, setup_mcp_sse
16-
from chuk_tool_processor.registry import ToolRegistryProvider
17-
18-
async def stdio_example():
19-
"""Example using stdio transport."""
20-
print("\n=== MCP with Stdio Transport ===")
21-
22-
# Server configuration
23-
config_file = "server_config.json"
24-
25-
# Create or update config file for the example
26-
if not os.path.exists(config_file):
27-
server_config = {
28+
# --------------------------------------------------------------------- #
29+
# allow “poetry run python examples/…” style execution #
30+
# --------------------------------------------------------------------- #
31+
PROJECT_ROOT = Path(__file__).resolve().parents[1]
32+
sys.path.insert(0, str(PROJECT_ROOT))
33+
34+
# --------------------------------------------------------------------- #
35+
# CHUK imports #
36+
# --------------------------------------------------------------------- #
37+
from chuk_tool_processor.mcp.setup_mcp_stdio import setup_mcp_stdio
38+
from chuk_tool_processor.registry.provider import ToolRegistryProvider
39+
40+
41+
# --------------------------------------------------------------------- #
42+
# helper: pretty-print a namespace #
43+
# --------------------------------------------------------------------- #
44+
async def dump_namespace(namespace: str) -> None:
45+
registry = await ToolRegistryProvider.get_registry()
46+
47+
# ✅ list_tools() gives a plain list already
48+
tools = [t for t in await registry.list_tools() if t[0] == namespace]
49+
50+
print(f"Tools in namespace {namespace!r} ({len(tools)}):")
51+
for ns, name in tools:
52+
meta = await registry.get_metadata(name, ns)
53+
desc = meta.description if meta else "no description"
54+
print(f" • {ns}.{name:<20}{desc}")
55+
56+
# --------------------------------------------------------------------- #
57+
# main demo #
58+
# --------------------------------------------------------------------- #
59+
async def main() -> None:
60+
print("=== Flexible MCP integration demo ===\n")
61+
62+
# 1️⃣ write / reuse server-config
63+
cfg_path = PROJECT_ROOT / "server_config.json"
64+
if not cfg_path.exists():
65+
cfg = {
2866
"mcpServers": {
29-
"echo": {
30-
"command": "uv",
31-
"args": ["--directory", "/Users/christopherhay/chris-source/agent-x/mcp-servers/chuk-mcp-echo-server", "run", "src/chuk_mcp_echo_server/main.py"]
67+
"time": {
68+
"command": "uvx",
69+
"args": [
70+
"mcp-server-time",
71+
"--local-timezone=America/New_York",
72+
],
3273
}
3374
}
3475
}
35-
36-
with open(config_file, "w") as f:
37-
json.dump(server_config, f)
38-
print(f"Created config file: {config_file}")
76+
cfg_path.write_text(json.dumps(cfg, indent=2))
77+
print(f"Created demo config: {cfg_path}\n")
3978
else:
40-
print(f"Using existing config file: {config_file}")
41-
42-
servers = ["echo"]
43-
server_names = {0: "echo"}
44-
45-
try:
46-
processor, stream_manager = await setup_mcp_stdio(
47-
config_file=config_file,
48-
servers=servers,
49-
server_names=server_names,
50-
namespace="stdio"
51-
)
52-
53-
registry = ToolRegistryProvider.get_registry()
54-
tools = [t for t in registry.list_tools() if t[0] == "stdio"]
55-
print(f"Registered stdio tools ({len(tools)}):")
56-
for namespace, name in tools:
57-
metadata = registry.get_metadata(name, namespace)
58-
description = metadata.description if metadata else "No description"
59-
print(f" - {namespace}.{name}: {description}")
60-
61-
# Example LLM text with tool calls - use fully-qualified default namespace name
62-
llm_text = """
63-
I'll echo your message using stdio transport.
64-
65-
<tool name=\"stdio.echo\" args='{"message": "Hello from stdio transport!"}'/>
66-
"""
67-
68-
print("\nProcessing LLM text...")
69-
results = await processor.process_text(llm_text)
70-
71-
if results:
72-
print("\nResults:")
73-
for result in results:
74-
print(f"Tool: {result.tool}")
75-
if result.error:
76-
print(f" Error: {result.error}")
77-
else:
78-
print(f" Result: {json.dumps(result.result, indent=2) if isinstance(result.result, dict) else result.result}")
79-
print(f" Duration: {(result.end_time - result.start_time).total_seconds():.3f}s")
80-
else:
81-
print("\nNo tool calls found or executed.")
82-
83-
await stream_manager.close()
84-
85-
except Exception as e:
86-
print(f"Error in stdio example: {e}")
87-
import traceback
88-
traceback.print_exc()
89-
90-
async def sse_example():
91-
"""Example using SSE transport."""
92-
print("\n=== MCP with SSE Transport ===")
93-
94-
sse_servers = [
95-
{
96-
"name": "weather",
97-
"url": "https://api.example.com/sse/weather",
98-
"api_key": "your_api_key_here"
99-
}
100-
]
101-
server_names = {0: "weather"}
102-
103-
try:
104-
processor, stream_manager = await setup_mcp_sse(
105-
servers=sse_servers,
106-
server_names=server_names,
107-
namespace="sse"
108-
)
109-
110-
registry = ToolRegistryProvider.get_registry()
111-
tools = [t for t in registry.list_tools() if t[0] == "sse"]
112-
print(f"Registered SSE tools ({len(tools)}):")
113-
for namespace, name in tools:
114-
metadata = registry.get_metadata(name, namespace)
115-
description = metadata.description if metadata else "No description"
116-
print(f" - {namespace}.{name}: {description}")
117-
118-
print("\nNote: SSE transport is currently a placeholder implementation.")
119-
120-
await stream_manager.close()
121-
122-
except Exception as e:
123-
print(f"Error in SSE example: {e}")
124-
import traceback
125-
traceback.print_exc()
126-
127-
async def main():
128-
"""Run the examples."""
129-
print("\n=== Flexible MCP Integration Example ===")
130-
await stdio_example()
131-
#await sse_example()
132-
133-
registry = ToolRegistryProvider.get_registry()
134-
all_tools = registry.list_tools()
135-
136-
print("\n=== All Registered Tools ===")
137-
print(f"Total tools: {len(all_tools)}")
138-
139-
by_namespace = {}
140-
for namespace, name in all_tools:
141-
by_namespace.setdefault(namespace, []).append(name)
142-
143-
for namespace, tools in by_namespace.items():
144-
print(f"\nNamespace: {namespace} ({len(tools)} tools)")
145-
for name in tools:
146-
metadata = registry.get_metadata(name, namespace)
147-
description = metadata.description if metadata else "No description"
148-
print(f" - {name}: {description}")
79+
print(f"Using server config: {cfg_path}\n")
80+
81+
# 2️⃣ setup stdio transport + registry
82+
processor, stream_manager = await setup_mcp_stdio(
83+
config_file=str(cfg_path),
84+
servers=["time"],
85+
server_names={0: "time"},
86+
namespace="stdio",
87+
)
88+
89+
await dump_namespace("stdio")
90+
91+
# 3️⃣ look up the wrapper & call it directly
92+
print("\nExecuting stdio.get_current_time …")
93+
registry = await ToolRegistryProvider.get_registry()
94+
wrapper_cls = await registry.get_tool("get_current_time", "stdio")
95+
96+
if wrapper_cls is None:
97+
print("❌ tool not found in registry")
98+
else:
99+
wrapper = wrapper_cls() if callable(wrapper_cls) else wrapper_cls
100+
try:
101+
res = await wrapper.execute(timezone="America/New_York")
102+
print("\nResult:")
103+
if isinstance(res, (dict, list)):
104+
print(json.dumps(res, indent=2))
105+
else:
106+
print(res)
107+
except Exception as exc:
108+
print("❌ execution failed:", exc)
109+
110+
# 4️⃣ tidy-up
111+
await stream_manager.close()
112+
149113

150114
if __name__ == "__main__":
151115
asyncio.run(main())

0 commit comments

Comments
 (0)