Skip to content

Commit 1166427

Browse files
committed
added more debug
1 parent ec66e29 commit 1166427

File tree

7 files changed

+274
-21
lines changed

7 files changed

+274
-21
lines changed

examples/debug_timeout_source.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
#!/usr/bin/env python
2+
# examples/debug_timeout_source.py
3+
"""
4+
Trace the exact source of the 10-second timeout by instrumenting all timeout-related calls.
5+
"""
6+
7+
import asyncio
8+
import sys
9+
import time
10+
import inspect
11+
from pathlib import Path
12+
from unittest.mock import patch, AsyncMock
13+
14+
# Add project root to path
15+
PROJECT_ROOT = Path(__file__).resolve().parents[1]
16+
sys.path.insert(0, str(PROJECT_ROOT))
17+
18+
from chuk_tool_processor.execution.strategies.inprocess_strategy import InProcessStrategy
19+
from chuk_tool_processor.execution.tool_executor import ToolExecutor
20+
from chuk_tool_processor.registry.provider import ToolRegistryProvider
21+
from chuk_tool_processor.models.tool_call import ToolCall
22+
from chuk_tool_processor.mcp.setup_mcp_sse import setup_mcp_sse
23+
24+
# Store original functions
25+
original_wait_for = asyncio.wait_for
26+
27+
def get_call_stack():
28+
"""Get a simplified call stack for debugging."""
29+
stack = inspect.stack()
30+
relevant_frames = []
31+
for frame in stack[2:8]: # Skip this function and the wrapper
32+
filename = Path(frame.filename).name
33+
relevant_frames.append(f"{filename}:{frame.lineno}:{frame.function}")
34+
return " -> ".join(relevant_frames)
35+
36+
async def traced_wait_for(coro, timeout=None):
37+
"""Trace all asyncio.wait_for calls with their timeouts."""
38+
caller_stack = get_call_stack()
39+
print(f" 🔍 asyncio.wait_for(timeout={timeout}s) from: {caller_stack}")
40+
41+
start_time = time.time()
42+
try:
43+
result = await original_wait_for(coro, timeout)
44+
duration = time.time() - start_time
45+
print(f" ✅ wait_for completed in {duration:.3f}s (limit: {timeout}s)")
46+
return result
47+
except asyncio.TimeoutError:
48+
duration = time.time() - start_time
49+
print(f" ⏰ wait_for TIMED OUT after {duration:.3f}s (limit: {timeout}s)")
50+
raise
51+
except Exception as e:
52+
duration = time.time() - start_time
53+
print(f" ❌ wait_for failed after {duration:.3f}s: {e}")
54+
raise
55+
56+
async def debug_timeout_source():
57+
"""Debug to find the exact source of the 10-second timeout."""
58+
59+
print("=== Tracing Timeout Sources ===\n")
60+
61+
# Patch asyncio.wait_for globally
62+
with patch('asyncio.wait_for', traced_wait_for):
63+
# Also patch it in specific modules that might import it
64+
with patch('chuk_tool_processor.execution.strategies.inprocess_strategy.asyncio.wait_for', traced_wait_for):
65+
with patch('chuk_tool_processor.mcp.stream_manager.asyncio.wait_for', traced_wait_for):
66+
67+
# Connect to mock server
68+
try:
69+
print("1. Connecting to mock MCP SSE server...")
70+
_, stream_manager = await setup_mcp_sse(
71+
servers=[
72+
{
73+
"name": "mock_server",
74+
"url": "http://localhost:8020",
75+
}
76+
],
77+
server_names={0: "mock_server"},
78+
namespace="sse",
79+
)
80+
print("✅ Connected successfully\n")
81+
except Exception as e:
82+
print(f"❌ Connection failed: {e}")
83+
return
84+
85+
registry = await ToolRegistryProvider.get_registry()
86+
87+
# Test with 1 second timeout to see where the 10s comes from
88+
print(f"2. Testing 1 second timeout with comprehensive tracing...")
89+
90+
strategy = InProcessStrategy(
91+
registry,
92+
default_timeout=1.0
93+
)
94+
95+
executor = ToolExecutor(registry=registry, strategy=strategy)
96+
97+
test_call = ToolCall(
98+
tool="perplexity_search",
99+
namespace="sse",
100+
arguments={"query": "Timeout tracing test"}
101+
)
102+
103+
print(f" Strategy default_timeout: {strategy.default_timeout}")
104+
print(f" Executing with full timeout tracing...\n")
105+
106+
try:
107+
start_time = time.time()
108+
109+
results = await executor.execute([test_call])
110+
111+
duration = time.time() - start_time
112+
result = results[0]
113+
114+
print(f"\n ✅ Total execution completed in {duration:.3f}s")
115+
print(f" Result duration: {(result.end_time - result.start_time).total_seconds():.3f}s")
116+
117+
if result.error:
118+
print(f" ⚠️ Error: {result.error}")
119+
120+
# Check if error message contains timeout info
121+
if "timeout" in result.error.lower():
122+
print(f" 🔍 Timeout error detected! Message: {result.error}")
123+
else:
124+
print(f" 📝 Success")
125+
126+
except Exception as e:
127+
duration = time.time() - start_time
128+
print(f"\n ❌ Failed after {duration:.3f}s: {e}")
129+
130+
# Also test the StreamManager call_tool directly
131+
print(f"\n3. Testing StreamManager.call_tool directly...")
132+
133+
try:
134+
start_time = time.time()
135+
136+
# Call the stream manager directly to see if it has its own timeout
137+
result = await stream_manager.call_tool(
138+
tool_name="perplexity_search",
139+
arguments={"query": "Direct StreamManager test"}
140+
)
141+
142+
duration = time.time() - start_time
143+
print(f" ✅ Direct call completed in {duration:.3f}s")
144+
145+
if result.get("isError"):
146+
print(f" ⚠️ Error: {result.get('error')}")
147+
else:
148+
print(f" 📝 Success")
149+
150+
except Exception as e:
151+
duration = time.time() - start_time
152+
print(f" ❌ Direct call failed after {duration:.3f}s: {e}")
153+
154+
# Test transport layer directly
155+
print(f"\n4. Testing transport layer...")
156+
157+
transport = stream_manager.transports.get("mock_server")
158+
if transport:
159+
try:
160+
start_time = time.time()
161+
162+
result = await transport.call_tool(
163+
"perplexity_search",
164+
{"query": "Direct transport test"}
165+
)
166+
167+
duration = time.time() - start_time
168+
print(f" ✅ Transport call completed in {duration:.3f}s")
169+
170+
if result.get("isError"):
171+
print(f" ⚠️ Error: {result.get('error')}")
172+
else:
173+
print(f" 📝 Success")
174+
175+
except Exception as e:
176+
duration = time.time() - start_time
177+
print(f" ❌ Transport call failed after {duration:.3f}s: {e}")
178+
else:
179+
print(" ❌ No transport found for mock_server")
180+
181+
# Cleanup
182+
await stream_manager.close()
183+
print(f"\n✅ Timeout source tracing completed!")
184+
185+
if __name__ == "__main__":
186+
asyncio.run(debug_timeout_source())

examples/debug_timeout_strategy.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ async def debug_timeout_handling():
3131
servers=[
3232
{
3333
"name": "mock_server",
34-
"url": "http://localhost:8000",
34+
"url": "http://localhost:8020",
3535
}
3636
],
3737
server_names={0: "mock_server"},
@@ -49,6 +49,7 @@ async def debug_timeout_handling():
4949
{"timeout": 1.0, "description": "1 second timeout"},
5050
{"timeout": 2.0, "description": "2 second timeout"},
5151
{"timeout": 3.0, "description": "3 second timeout"},
52+
{"timeout": 15.0, "description": "15 second timeout"},
5253
]
5354

5455
for config in timeout_configs:
@@ -60,8 +61,7 @@ async def debug_timeout_handling():
6061
# Create strategy with specific timeout
6162
strategy = InProcessStrategy(
6263
registry,
63-
default_timeout=timeout,
64-
max_concurrency=1, # Single call to isolate issue
64+
default_timeout=timeout
6565
)
6666

6767
executor = ToolExecutor(registry=registry, strategy=strategy)
@@ -102,8 +102,7 @@ async def debug_timeout_handling():
102102

103103
strategy = InProcessStrategy(
104104
registry,
105-
default_timeout=2.0, # 2 second timeout
106-
max_concurrency=3,
105+
default_timeout=2.0 # 2 second timeout
107106
)
108107

109108
executor = ToolExecutor(registry=registry, strategy=strategy)
@@ -119,7 +118,6 @@ async def debug_timeout_handling():
119118
]
120119

121120
print(f" Strategy timeout: {strategy.default_timeout}")
122-
print(f" Max concurrency: {strategy.max_concurrency}")
123121
print(f" Executing {len(parallel_calls)} parallel calls...")
124122

125123
try:

examples/reliable_test_sse_server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ async def process_message(session_id: str, method: str, msg_id: str, message: di
340340
def main():
341341
"""Run the reliable test server."""
342342
print("🚀 Starting Reliable Test MCP SSE Server...")
343-
print("📡 Server will be available at: http://localhost:8000")
343+
print("📡 Server will be available at: http://localhost:8020")
344344
print("🔧 Available tools:")
345345
for tool in TOOLS:
346346
print(f" - {tool['name']}: {tool['description']}")
@@ -352,7 +352,7 @@ def main():
352352
uvicorn.run(
353353
app,
354354
host="0.0.0.0",
355-
port=8000,
355+
port=8020,
356356
log_level="warning", # Reduce uvicorn noise
357357
access_log=False
358358
)

examples/test_sse_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
logger = get_logger("mcp-mock-sse-demo")
5353

5454
# ─── config / bootstrap ─────────────────────────────────────────────────────
55-
SSE_SERVER_URL = "http://localhost:8000"
55+
SSE_SERVER_URL = "http://localhost:8020"
5656
SERVER_NAME = "mock_perplexity_server"
5757
NAMESPACE = "sse" # where remote tools will be registered
5858

src/chuk_tool_processor/mcp/mcp_tool.py

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ def __init__(
3636
servers: Optional[List[str]] = None,
3737
server_names: Optional[Dict[int, str]] = None,
3838
namespace: str = "stdio",
39+
default_timeout: Optional[float] = None, # Add default timeout support
3940
) -> None:
4041
self.tool_name = tool_name
4142
self._sm: Optional[StreamManager] = stream_manager
43+
self.default_timeout = default_timeout or 30.0 # Default to 30s if not specified
4244

4345
# Boot-strap parameters (only needed if _sm is None)
4446
self._cfg_file = cfg_file
@@ -78,21 +80,56 @@ async def _ensure_stream_manager(self) -> StreamManager:
7880
return self._sm # type: ignore[return-value]
7981

8082
# ------------------------------------------------------------------ #
81-
async def execute(self, **kwargs: Any) -> Any:
83+
async def execute(self, timeout: Optional[float] = None, **kwargs: Any) -> Any:
8284
"""
83-
Forward the call to the remote MCP tool.
85+
Forward the call to the remote MCP tool with timeout support.
86+
87+
Args:
88+
timeout: Optional timeout for this specific call. If not provided,
89+
uses the instance's default_timeout.
90+
**kwargs: Arguments to pass to the MCP tool.
91+
92+
Returns:
93+
The result from the MCP tool call.
8494
8595
Raises
8696
------
8797
RuntimeError
8898
If the server returns an error payload.
99+
asyncio.TimeoutError
100+
If the call times out.
89101
"""
90102
sm = await self._ensure_stream_manager()
91-
result = await sm.call_tool(tool_name=self.tool_name, arguments=kwargs)
103+
104+
# Use provided timeout, fall back to instance default, then global default
105+
effective_timeout = timeout if timeout is not None else self.default_timeout
106+
107+
logger.debug("Calling MCP tool '%s' with timeout: %ss", self.tool_name, effective_timeout)
108+
109+
try:
110+
# Pass timeout directly to StreamManager instead of wrapping with wait_for
111+
result = await sm.call_tool(
112+
tool_name=self.tool_name,
113+
arguments=kwargs,
114+
timeout=effective_timeout
115+
)
116+
117+
if result.get("isError"):
118+
err = result.get("error", "Unknown error")
119+
logger.error("Remote MCP error from '%s': %s", self.tool_name, err)
120+
raise RuntimeError(err)
121+
122+
return result.get("content")
123+
124+
except asyncio.TimeoutError:
125+
logger.warning("MCP tool '%s' timed out after %ss", self.tool_name, effective_timeout)
126+
raise
127+
except Exception as e:
128+
logger.error("Error calling MCP tool '%s': %s", self.tool_name, e)
129+
raise
92130

93-
if result.get("isError"):
94-
err = result.get("error", "Unknown error")
95-
logger.error("Remote MCP error from '%s': %s", self.tool_name, err)
96-
raise RuntimeError(err)
97-
98-
return result.get("content")
131+
# ------------------------------------------------------------------ #
132+
# Legacy method name support
133+
async def _aexecute(self, timeout: Optional[float] = None, **kwargs: Any) -> Any:
134+
"""Legacy alias for execute() method."""
135+
return await self.execute(timeout=timeout, **kwargs)

src/chuk_tool_processor/mcp/stream_manager.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,16 +265,48 @@ async def call_tool(
265265
tool_name: str,
266266
arguments: Dict[str, Any],
267267
server_name: Optional[str] = None,
268+
timeout: Optional[float] = None, # Add timeout parameter
268269
) -> Dict[str, Any]:
270+
"""
271+
Call a tool on the appropriate server with timeout support.
272+
273+
Args:
274+
tool_name: Name of the tool to call
275+
arguments: Arguments to pass to the tool
276+
server_name: Optional server name (auto-detected if not provided)
277+
timeout: Optional timeout for the call
278+
279+
Returns:
280+
Dictionary containing the tool result or error
281+
"""
269282
server_name = server_name or self.get_server_for_tool(tool_name)
270283
if not server_name or server_name not in self.transports:
271284
# wording kept exactly for unit-test expectation
272285
return {
273286
"isError": True,
274287
"error": f"No server found for tool: {tool_name}",
275288
}
276-
return await self.transports[server_name].call_tool(tool_name, arguments)
277-
289+
290+
transport = self.transports[server_name]
291+
292+
# Apply timeout if specified
293+
if timeout is not None:
294+
logger.debug("Calling tool '%s' with %ss timeout", tool_name, timeout)
295+
try:
296+
return await asyncio.wait_for(
297+
transport.call_tool(tool_name, arguments),
298+
timeout=timeout
299+
)
300+
except asyncio.TimeoutError:
301+
logger.warning("Tool '%s' timed out after %ss", tool_name, timeout)
302+
return {
303+
"isError": True,
304+
"error": f"Tool call timed out after {timeout}s",
305+
}
306+
else:
307+
# No timeout specified, call directly
308+
return await transport.call_tool(tool_name, arguments)
309+
278310
# ------------------------------------------------------------------ #
279311
# shutdown #
280312
# ------------------------------------------------------------------ #

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)