1+ #!/usr/bin/env python
2+ # examples/mcp_http_streamable_example.py
3+ """
4+ Demo: wire a remote MCP server via **HTTP Streamable** transport to CHUK.
5+
6+ Prerequisites
7+ -------------
8+ - A running HTTP Streamable MCP server at localhost:8000
9+ - The server should expose HTTP tools (http_greet, session_info, etc.)
10+
11+ What it shows
12+ -------------
13+ 1. Connect to an HTTP Streamable MCP server at localhost:8000 with proper MCP initialization
14+ 2. Register the remote tools in the local CHUK registry
15+ 3. List everything that landed in the registry
16+ 4. Look up the wrapper for HTTP tools and call them directly
17+ 5. Test multiple queries with different HTTP Streamable tools
18+ 6. Demonstrate the modern single-endpoint approach (spec 2025-03-26)
19+ """
20+
21+ from __future__ import annotations
22+
23+ import asyncio
24+ import json
25+ import os
26+ import sys
27+ from pathlib import Path
28+ from typing import Dict , List
29+
30+ # --------------------------------------------------------------------- #
31+ # allow "poetry run python examples/β¦" style execution #
32+ # --------------------------------------------------------------------- #
33+ PROJECT_ROOT = Path (__file__ ).resolve ().parents [1 ]
34+ sys .path .insert (0 , str (PROJECT_ROOT ))
35+
36+ # --------------------------------------------------------------------- #
37+ # CHUK imports #
38+ # --------------------------------------------------------------------- #
39+ from chuk_tool_processor .mcp .setup_mcp_http_streamable import setup_mcp_http_streamable
40+ from chuk_tool_processor .registry .provider import ToolRegistryProvider
41+
42+
43+ # --------------------------------------------------------------------- #
44+ # helper: pretty-print a namespace #
45+ # --------------------------------------------------------------------- #
46+ async def dump_namespace (namespace : str ) -> None :
47+ registry = await ToolRegistryProvider .get_registry ()
48+
49+ # β
list_tools() gives a plain list already
50+ tools = [t for t in await registry .list_tools () if t [0 ] == namespace ]
51+
52+ print (f"Tools in namespace { namespace !r} ({ len (tools )} ):" )
53+ for ns , name in tools :
54+ meta = await registry .get_metadata (name , ns )
55+ desc = meta .description if meta else "no description"
56+ print (f" β’ { ns } .{ name :<30} β { desc } " )
57+
58+ # --------------------------------------------------------------------- #
59+ # main demo #
60+ # --------------------------------------------------------------------- #
61+ async def main () -> None :
62+ print ("=== MCP HTTP Streamable Integration Demo ===\n " )
63+ print ("Using modern MCP Streamable HTTP transport (spec 2025-03-26)" )
64+ print ("This replaces the deprecated SSE transport with better infrastructure compatibility.\n " )
65+
66+ # 1οΈβ£ setup HTTP Streamable transport + registry
67+ try :
68+ print ("π Connecting to MCP HTTP Streamable server..." )
69+ processor , stream_manager = await setup_mcp_http_streamable (
70+ servers = [
71+ {
72+ "name" : "http_demo_server" ,
73+ "url" : "http://localhost:8000" , # Single endpoint approach
74+ # Optional: add API key if your server requires it
75+ # "api_key": "your-api-key-here"
76+ # Optional: add session ID for stateful connections
77+ # "session_id": "demo-session-123"
78+ }
79+ ],
80+ server_names = {0 : "http_demo_server" },
81+ namespace = "http" ,
82+ )
83+ print ("β
Successfully connected to MCP HTTP Streamable server!" )
84+ except Exception as e :
85+ print (f"β Failed to connect to HTTP Streamable server: { e } " )
86+ print ("Make sure you have an HTTP Streamable MCP server running at http://localhost:8000" )
87+ print ("The server should expose HTTP tools like 'http_greet', 'session_info'" )
88+ return
89+
90+ # 2οΈβ£ show what tools we got
91+ await dump_namespace ("http" )
92+
93+ # 3οΈβ£ test http_greet tool
94+ print ("\n " + "=" * 60 )
95+ print ("Testing http_greet tool..." )
96+ registry = await ToolRegistryProvider .get_registry ()
97+ wrapper_cls = await registry .get_tool ("http_greet" , "http" )
98+
99+ if wrapper_cls is None :
100+ print ("β http_greet tool not found in registry" )
101+ print ("Available tools:" )
102+ tools = await registry .list_tools ("http" )
103+ for ns , name in tools :
104+ print (f" - { name } " )
105+ await stream_manager .close ()
106+ return
107+
108+ # 4οΈβ£ execute http_greet with sample parameters
109+ wrapper = wrapper_cls () if callable (wrapper_cls ) else wrapper_cls
110+ try :
111+ name = "HTTP Streamable User"
112+ style = "formal"
113+ print (f"π Greeting: { name } ({ style } style)" )
114+
115+ res = await wrapper .execute (name = name , style = style )
116+ print ("\n π Result:" )
117+ print (res )
118+ except Exception as exc :
119+ print (f"β http_greet execution failed: { exc } " )
120+
121+ # 5οΈβ£ test multiple HTTP Streamable tools
122+ print ("\n " + "=" * 60 )
123+ print ("Testing all available HTTP Streamable tools..." )
124+
125+ test_scenarios = [
126+ {
127+ "tool" : "http_greet" ,
128+ "args" : {"name" : "Alice" , "style" : "casual" },
129+ "description" : "Casual greeting via HTTP transport"
130+ },
131+ {
132+ "tool" : "session_info" ,
133+ "args" : {},
134+ "description" : "Get current HTTP session information"
135+ },
136+ {
137+ "tool" : "http_counter" ,
138+ "args" : {"increment" : 3 },
139+ "description" : "Increment session counter"
140+ },
141+ {
142+ "tool" : "slow_operation" ,
143+ "args" : {"duration" : 2 },
144+ "description" : "Test slow operation (may use streaming)"
145+ }
146+ ]
147+
148+ for i , scenario in enumerate (test_scenarios , 1 ):
149+ tool_name = scenario ["tool" ]
150+ args = scenario ["args" ]
151+ description = scenario ["description" ]
152+
153+ print (f"\n π Test { i } : { tool_name } - { description } " )
154+ print (f" Args: { args } " )
155+
156+ # Get tool wrapper
157+ tool_wrapper_cls = await registry .get_tool (tool_name , "http" )
158+
159+ if tool_wrapper_cls is None :
160+ print (f" β οΈ Tool '{ tool_name } ' not available" )
161+ continue
162+
163+ try :
164+ tool_wrapper = tool_wrapper_cls () if callable (tool_wrapper_cls ) else tool_wrapper_cls
165+ res = await tool_wrapper .execute (** args )
166+
167+ print (" β
Success!" )
168+ if isinstance (res , str ):
169+ # Truncate long responses for readability
170+ if len (res ) > 200 :
171+ res = res [:200 ] + "..."
172+ print (f" π Response: { res } " )
173+ elif isinstance (res , (dict , list )):
174+ print (f" π Result: { json .dumps (res , indent = 6 )} " )
175+ else :
176+ print (f" π Result: { res } " )
177+
178+ except Exception as exc :
179+ print (f" β Failed: { exc } " )
180+
181+ # 6οΈβ£ demonstrate HTTP Streamable advantages
182+ print ("\n " + "=" * 60 )
183+ print ("HTTP Streamable Transport Advantages:" )
184+ print ("π Single endpoint approach (/mcp)" )
185+ print ("π Better infrastructure compatibility" )
186+ print ("β‘ Supports both immediate JSON and streaming SSE responses" )
187+ print ("π‘οΈ Enhanced error handling and retry logic" )
188+ print ("π§ Stateless operation when streaming not needed" )
189+ print ("π Modern replacement for deprecated SSE transport" )
190+
191+ # 7οΈβ£ show session persistence if supported
192+ print ("\n " + "=" * 60 )
193+ print ("Testing session persistence..." )
194+
195+ # Call counter multiple times to show session state
196+ counter_wrapper_cls = await registry .get_tool ("http_counter" , "http" )
197+ if counter_wrapper_cls :
198+ counter_wrapper = counter_wrapper_cls () if callable (counter_wrapper_cls ) else counter_wrapper_cls
199+
200+ try :
201+ print ("π First counter call..." )
202+ res1 = await counter_wrapper .execute (increment = 1 )
203+ print (f" Result: { res1 } " )
204+
205+ print ("π Second counter call..." )
206+ res2 = await counter_wrapper .execute (increment = 2 )
207+ print (f" Result: { res2 } " )
208+
209+ # Get session info to see the accumulated state
210+ session_wrapper_cls = await registry .get_tool ("session_info" , "http" )
211+ if session_wrapper_cls :
212+ session_wrapper = session_wrapper_cls () if callable (session_wrapper_cls ) else session_wrapper_cls
213+ session_info = await session_wrapper .execute ()
214+ print (f"π Final session state: { session_info } " )
215+
216+ except Exception as exc :
217+ print (f"β Session persistence test failed: { exc } " )
218+
219+ # 8οΈβ£ show server capabilities
220+ print ("\n " + "=" * 60 )
221+ print ("MCP HTTP Streamable Server Information:" )
222+ print ("β
HTTP Streamable transport (spec 2025-03-26)" )
223+ print ("β
Single /mcp endpoint for all communication" )
224+ print ("β
JSON-RPC 2.0 protocol compliance" )
225+ print ("β
Tool discovery and execution" )
226+ print ("β
Session management support" )
227+ print ("β
Optional streaming for complex operations" )
228+ print ("β
Better infrastructure compatibility than SSE" )
229+
230+ # 9οΈβ£ tidy-up
231+ print ("\n π Closing connections..." )
232+ await stream_manager .close ()
233+ print ("β
HTTP Streamable demo completed successfully!" )
234+
235+
236+ if __name__ == "__main__" :
237+ asyncio .run (main ())
0 commit comments