Skip to content

Commit ec66e29

Browse files
committed
massively improved timeouts and sse
1 parent d9cc05f commit ec66e29

File tree

12 files changed

+2425
-194
lines changed

12 files changed

+2425
-194
lines changed

examples/debug_timeout_strategy.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
#!/usr/bin/env python
2+
# examples/debug_timeout_strategy.py
3+
"""
4+
Debug the timeout handling in InProcessStrategy to identify why
5+
some calls use wrong timeout values.
6+
"""
7+
8+
import asyncio
9+
import sys
10+
from pathlib import Path
11+
12+
# Add project root to path
13+
PROJECT_ROOT = Path(__file__).resolve().parents[1]
14+
sys.path.insert(0, str(PROJECT_ROOT))
15+
16+
from chuk_tool_processor.execution.strategies.inprocess_strategy import InProcessStrategy
17+
from chuk_tool_processor.execution.tool_executor import ToolExecutor
18+
from chuk_tool_processor.registry.provider import ToolRegistryProvider
19+
from chuk_tool_processor.models.tool_call import ToolCall
20+
from chuk_tool_processor.mcp.setup_mcp_sse import setup_mcp_sse
21+
22+
async def debug_timeout_handling():
23+
"""Debug timeout configuration in the execution strategy."""
24+
25+
print("=== Debugging Timeout Strategy Handling ===\n")
26+
27+
# Connect to mock server
28+
try:
29+
print("1. Connecting to mock MCP SSE server...")
30+
_, stream_manager = await setup_mcp_sse(
31+
servers=[
32+
{
33+
"name": "mock_server",
34+
"url": "http://localhost:8000",
35+
}
36+
],
37+
server_names={0: "mock_server"},
38+
namespace="sse",
39+
)
40+
print("✅ Connected successfully")
41+
except Exception as e:
42+
print(f"❌ Connection failed: {e}")
43+
return
44+
45+
registry = await ToolRegistryProvider.get_registry()
46+
47+
# Test different timeout configurations
48+
timeout_configs = [
49+
{"timeout": 1.0, "description": "1 second timeout"},
50+
{"timeout": 2.0, "description": "2 second timeout"},
51+
{"timeout": 3.0, "description": "3 second timeout"},
52+
]
53+
54+
for config in timeout_configs:
55+
timeout = config["timeout"]
56+
description = config["description"]
57+
58+
print(f"\n2. Testing {description}...")
59+
60+
# Create strategy with specific timeout
61+
strategy = InProcessStrategy(
62+
registry,
63+
default_timeout=timeout,
64+
max_concurrency=1, # Single call to isolate issue
65+
)
66+
67+
executor = ToolExecutor(registry=registry, strategy=strategy)
68+
69+
# Create a test call
70+
test_call = ToolCall(
71+
tool="perplexity_search",
72+
namespace="sse",
73+
arguments={"query": f"Test query with {timeout}s timeout"}
74+
)
75+
76+
print(f" Strategy timeout: {strategy.default_timeout}")
77+
print(f" Executing tool call...")
78+
79+
try:
80+
import time
81+
start_time = time.time()
82+
83+
results = await executor.execute([test_call])
84+
85+
duration = time.time() - start_time
86+
result = results[0]
87+
88+
print(f" ✅ Completed in {duration:.3f}s")
89+
print(f" Result duration: {(result.end_time - result.start_time).total_seconds():.3f}s")
90+
91+
if result.error:
92+
print(f" ⚠️ Error: {result.error}")
93+
else:
94+
print(f" 📝 Success: {str(result.result)[:100]}...")
95+
96+
except Exception as e:
97+
duration = time.time() - start_time
98+
print(f" ❌ Failed after {duration:.3f}s: {e}")
99+
100+
# Test parallel execution with timeout debugging
101+
print(f"\n3. Testing parallel execution timeout behavior...")
102+
103+
strategy = InProcessStrategy(
104+
registry,
105+
default_timeout=2.0, # 2 second timeout
106+
max_concurrency=3,
107+
)
108+
109+
executor = ToolExecutor(registry=registry, strategy=strategy)
110+
111+
# Create multiple parallel calls
112+
parallel_calls = [
113+
ToolCall(
114+
tool="perplexity_search",
115+
namespace="sse",
116+
arguments={"query": f"Parallel test query {i}"}
117+
)
118+
for i in range(3)
119+
]
120+
121+
print(f" Strategy timeout: {strategy.default_timeout}")
122+
print(f" Max concurrency: {strategy.max_concurrency}")
123+
print(f" Executing {len(parallel_calls)} parallel calls...")
124+
125+
try:
126+
import time
127+
start_time = time.time()
128+
129+
results = await executor.execute(parallel_calls)
130+
131+
total_duration = time.time() - start_time
132+
print(f" ✅ All completed in {total_duration:.3f}s")
133+
134+
for i, (call, result) in enumerate(zip(parallel_calls, results)):
135+
duration = (result.end_time - result.start_time).total_seconds()
136+
status = "✅" if result.error is None else "❌"
137+
138+
print(f" Call {i+1}: {status} {duration:.3f}s")
139+
if result.error:
140+
print(f" Error: {result.error}")
141+
else:
142+
print(f" Success: {str(result.result)[:50]}...")
143+
144+
except Exception as e:
145+
total_duration = time.time() - start_time
146+
print(f" ❌ Parallel execution failed after {total_duration:.3f}s: {e}")
147+
148+
# Investigate individual wrapper timeout settings
149+
print(f"\n4. Investigating individual tool wrapper timeouts...")
150+
151+
for tool_name in ["perplexity_search", "perplexity_deep_research", "perplexity_quick_fact"]:
152+
wrapper_cls = await registry.get_tool(tool_name, "sse")
153+
if wrapper_cls:
154+
try:
155+
wrapper = wrapper_cls()
156+
157+
# Check if wrapper has timeout attributes
158+
timeout_attrs = []
159+
for attr in dir(wrapper):
160+
if 'timeout' in attr.lower():
161+
value = getattr(wrapper, attr, None)
162+
timeout_attrs.append(f"{attr}={value}")
163+
164+
print(f" 🔧 {tool_name}:")
165+
if timeout_attrs:
166+
print(f" Timeout attributes: {', '.join(timeout_attrs)}")
167+
else:
168+
print(f" No timeout attributes found")
169+
170+
# Check metadata
171+
metadata = await registry.get_metadata(tool_name, "sse")
172+
if metadata:
173+
if hasattr(metadata, 'timeout'):
174+
print(f" Metadata timeout: {metadata.timeout}")
175+
else:
176+
print(f" No metadata timeout")
177+
178+
except Exception as e:
179+
print(f" ❌ {tool_name}: Failed to inspect - {e}")
180+
181+
# Cleanup
182+
await stream_manager.close()
183+
print(f"\n✅ Timeout debugging completed!")
184+
185+
if __name__ == "__main__":
186+
asyncio.run(debug_timeout_handling())

0 commit comments

Comments
 (0)