1414import json
1515import os
1616import sys
17+ import logging
1718from pathlib import Path
1819from typing import Any , List , Tuple
1920
2021from colorama import Fore , Style , init as colorama_init
2122
2223colorama_init (autoreset = True )
2324
25+ # ═══════════════════════════════════════════════════════════════════════════
26+ # Fix for CancelledError during asyncio shutdown
27+ # ═══════════════════════════════════════════════════════════════════════════
28+
29+ def setup_asyncio_cleanup ():
30+ """Setup proper asyncio cleanup to prevent CancelledError warnings."""
31+ def handle_exception (loop , context ):
32+ exception = context .get ('exception' )
33+ if isinstance (exception , asyncio .CancelledError ):
34+ # Silently ignore CancelledError during shutdown
35+ return
36+
37+ # Log other exceptions normally
38+ loop .default_exception_handler (context )
39+
40+ # Set the exception handler for the current event loop
41+ try :
42+ loop = asyncio .get_running_loop ()
43+ loop .set_exception_handler (handle_exception )
44+ except RuntimeError :
45+ # No running loop yet, will set later
46+ pass
47+
48+ # Apply the fix
49+ setup_asyncio_cleanup ()
50+
51+ # Also suppress asyncio logger for CancelledError
52+ logging .getLogger ('asyncio' ).setLevel (logging .CRITICAL )
53+
2454# ─── local-package bootstrap ───────────────────────────────────────────────
2555PROJECT_ROOT = Path (__file__ ).resolve ().parents [1 ]
2656sys .path .insert (0 , str (PROJECT_ROOT ))
@@ -135,53 +165,106 @@ def show_results(title: str, calls: List[ToolCall], results: List[ToolResult]) -
135165 print (Style .DIM + "-" * 60 )
136166
137167
168+ async def graceful_shutdown ():
169+ """Perform graceful shutdown of all async tasks."""
170+ try :
171+ # Close the stream manager if it exists
172+ if hasattr (bootstrap_mcp , 'stream_manager' ):
173+ try :
174+ await bootstrap_mcp .stream_manager .close ()
175+ logger .debug ("Stream manager closed successfully" )
176+ except asyncio .CancelledError :
177+ logger .debug ("Stream manager close cancelled during shutdown" )
178+ except Exception as e :
179+ logger .error (f"Error closing stream manager: { e } " )
180+
181+ # Don't wait or cancel tasks during shutdown - let asyncio.run() handle it
182+ logger .debug ("Graceful shutdown completed" )
183+
184+ except Exception as e :
185+ logger .error (f"Error during graceful shutdown: { e } " )
186+
187+
138188# ─── main demo ──────────────────────────────────────────────────────────────
139189async def run_demo () -> None :
140190 print (Fore .GREEN + "=== MCP Time Tool-Calling Demo ===" + Style .RESET_ALL )
141191
142- await bootstrap_mcp ()
143-
144- registry = await ToolRegistryProvider .get_registry ()
145-
146- executor = ToolExecutor (
147- registry ,
148- strategy = InProcessStrategy (
149- registry ,
150- default_timeout = 5.0 ,
151- max_concurrency = 4 ,
152- ),
153- )
192+ # Setup asyncio exception handler for this loop
193+ loop = asyncio .get_running_loop ()
194+
195+ def handle_exception (loop , context ):
196+ exception = context .get ('exception' )
197+ if isinstance (exception , asyncio .CancelledError ):
198+ return # Ignore CancelledError
199+ loop .default_exception_handler (context )
200+
201+ loop .set_exception_handler (handle_exception )
154202
155- # sequential examples --------------------------------------------------
156- for title , plugin , raw in PLUGINS :
157- # new parser API is async
158- calls = await plugin .try_parse (raw )
159- results = await executor .execute (calls )
160- show_results (f"{ title } → sequential" , calls , results )
203+ try :
204+ await bootstrap_mcp ()
161205
162- # parallel demo --------------------------------------------------------
163- banner ("Parallel current-time calls" )
206+ registry = await ToolRegistryProvider .get_registry ()
164207
165- parallel_calls = [
166- ToolCall (
167- tool = f"{ NAMESPACE } .get_current_time" ,
168- arguments = {"timezone" : tz },
208+ executor = ToolExecutor (
209+ registry ,
210+ strategy = InProcessStrategy (
211+ registry ,
212+ default_timeout = 5.0 ,
213+ max_concurrency = 4 ,
214+ ),
169215 )
170- for tz in ["UTC" , "Europe/Paris" , "Asia/Kolkata" , "America/New_York" ]
171- ]
172-
173- parallel_results = await executor .execute (parallel_calls )
174- show_results ("Parallel run" , parallel_calls , parallel_results )
175-
176- # goodbye
177- await bootstrap_mcp .stream_manager .close () # type: ignore[attr-defined]
178-
179-
180- if __name__ == "__main__" :
181- import logging
182216
217+ # sequential examples --------------------------------------------------
218+ for title , plugin , raw in PLUGINS :
219+ # new parser API is async
220+ calls = await plugin .try_parse (raw )
221+ results = await executor .execute (calls )
222+ show_results (f"{ title } → sequential" , calls , results )
223+
224+ # parallel demo --------------------------------------------------------
225+ banner ("Parallel current-time calls" )
226+
227+ parallel_calls = [
228+ ToolCall (
229+ tool = f"{ NAMESPACE } .get_current_time" ,
230+ arguments = {"timezone" : tz },
231+ )
232+ for tz in ["UTC" , "Europe/Paris" , "Asia/Kolkata" , "America/New_York" ]
233+ ]
234+
235+ parallel_results = await executor .execute (parallel_calls )
236+ show_results ("Parallel run" , parallel_calls , parallel_results )
237+
238+ except KeyboardInterrupt :
239+ logger .info ("Demo interrupted by user" )
240+ except Exception as e :
241+ logger .error (f"Demo error: { e } " )
242+ raise
243+ finally :
244+ # Always perform graceful shutdown
245+ await graceful_shutdown ()
246+
247+
248+ def main ():
249+ """Main entry point with proper error handling."""
250+ # Set up logging
183251 logging .getLogger ("chuk_tool_processor" ).setLevel (
184252 getattr (logging , os .environ .get ("LOGLEVEL" , "INFO" ).upper ())
185253 )
254+
255+ try :
256+ # Run the demo with proper cleanup
257+ asyncio .run (run_demo ())
258+ print (Fore .GREEN + "\n ✅ Demo completed successfully!" + Style .RESET_ALL )
259+
260+ except KeyboardInterrupt :
261+ print (Fore .YELLOW + "\n 👋 Demo interrupted by user. Goodbye!" + Style .RESET_ALL )
262+
263+ except Exception as e :
264+ print (Fore .RED + f"\n ❌ Demo failed: { e } " + Style .RESET_ALL )
265+ logger .error (f"Demo failed: { e } " , exc_info = True )
266+ sys .exit (1 )
186267
187- asyncio .run (run_demo ())
268+
269+ if __name__ == "__main__" :
270+ main ()
0 commit comments