Skip to content

Commit 76645ca

Browse files
committed
improved timeouts, pydantic models etc
1 parent 8c4111d commit 76645ca

File tree

13 files changed

+677
-202
lines changed

13 files changed

+677
-202
lines changed

README.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,79 @@ async def test_calculator():
913913

914914
## Configuration
915915

916+
### Timeout Configuration
917+
918+
CHUK Tool Processor uses a unified timeout configuration system that applies to all MCP transports (HTTP Streamable, SSE, STDIO) and the StreamManager. Instead of managing dozens of individual timeout values, there are just **4 logical timeout categories**:
919+
920+
```python
921+
from chuk_tool_processor.mcp.transport import TimeoutConfig
922+
923+
# Create custom timeout configuration
924+
timeout_config = TimeoutConfig(
925+
connect=30.0, # Connection establishment, initialization, session discovery
926+
operation=30.0, # Normal operations (tool calls, listing tools/resources/prompts)
927+
quick=5.0, # Fast health checks and pings
928+
shutdown=2.0 # Cleanup and shutdown operations
929+
)
930+
```
931+
932+
**Using timeout configuration with StreamManager:**
933+
934+
```python
935+
from chuk_tool_processor.mcp.stream_manager import StreamManager
936+
from chuk_tool_processor.mcp.transport import TimeoutConfig
937+
938+
# Create StreamManager with custom timeouts
939+
timeout_config = TimeoutConfig(
940+
connect=60.0, # Longer for slow initialization
941+
operation=45.0, # Longer for heavy operations
942+
quick=3.0, # Faster health checks
943+
shutdown=5.0 # More time for cleanup
944+
)
945+
946+
manager = StreamManager(timeout_config=timeout_config)
947+
```
948+
949+
**Timeout categories explained:**
950+
951+
| Category | Default | Used For | Examples |
952+
|----------|---------|----------|----------|
953+
| `connect` | 30.0s | Connection setup, initialization, discovery | HTTP connection, SSE session discovery, STDIO subprocess launch |
954+
| `operation` | 30.0s | Normal tool operations | Tool calls, listing tools/resources/prompts, get_tools() |
955+
| `quick` | 5.0s | Fast health/status checks | Ping operations, health checks |
956+
| `shutdown` | 2.0s | Cleanup and teardown | Transport close, connection cleanup |
957+
958+
**Why this matters:**
959+
-**Simple**: 4 timeout values instead of 20+
960+
-**Consistent**: Same timeout behavior across all transports
961+
-**Configurable**: Adjust timeouts based on your environment (slow networks, large datasets, etc.)
962+
-**Type-safe**: Pydantic validation ensures correct values
963+
964+
**Example: Adjusting for slow environments**
965+
966+
```python
967+
from chuk_tool_processor.mcp import setup_mcp_stdio
968+
from chuk_tool_processor.mcp.transport import TimeoutConfig
969+
970+
# For slow network or resource-constrained environments
971+
slow_timeouts = TimeoutConfig(
972+
connect=120.0, # Allow more time for package downloads
973+
operation=60.0, # Allow more time for heavy operations
974+
quick=10.0, # Be patient with health checks
975+
shutdown=10.0 # Allow thorough cleanup
976+
)
977+
978+
processor, manager = await setup_mcp_stdio(
979+
config_file="mcp_config.json",
980+
servers=["sqlite"],
981+
namespace="db",
982+
initialization_timeout=120.0
983+
)
984+
985+
# Set custom timeouts on the manager
986+
manager.timeout_config = slow_timeouts
987+
```
988+
916989
### Environment Variables
917990

918991
| Variable | Default | Description |

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "chuk-tool-processor"
7-
version = "0.6.28"
7+
version = "0.6.29"
88
description = "Async-native framework for registering, discovering, and executing tools referenced in LLM responses"
99
readme = "README.md"
1010
requires-python = ">=3.11"

src/chuk_tool_processor/mcp/mcp_tool.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ async def _record_success(self) -> None:
370370
self._circuit_open = False
371371
self._circuit_open_time = None
372372
self.connection_state = ConnectionState.HEALTHY
373-
logger.info(f"Circuit breaker closed for tool '{self.tool_name}' after successful execution")
373+
logger.debug(f"Circuit breaker closed for tool '{self.tool_name}' after successful execution")
374374

375375
async def _record_failure(self, is_connection_error: bool = False) -> None:
376376
"""Record a failed execution."""
@@ -407,7 +407,7 @@ def _is_circuit_open(self) -> bool:
407407
self._circuit_open = False
408408
self._circuit_open_time = None
409409
self.connection_state = ConnectionState.HEALTHY
410-
logger.info(f"Circuit breaker reset for tool '{self.tool_name}' after timeout")
410+
logger.debug(f"Circuit breaker reset for tool '{self.tool_name}' after timeout")
411411
return False
412412

413413
return True
@@ -462,12 +462,12 @@ def reset_circuit_breaker(self) -> None:
462462
self._circuit_open_time = None
463463
self._consecutive_failures = 0
464464
self.connection_state = ConnectionState.HEALTHY
465-
logger.info(f"Circuit breaker manually reset for tool '{self.tool_name}'")
465+
logger.debug(f"Circuit breaker manually reset for tool '{self.tool_name}'")
466466

467467
def disable_resilience(self) -> None:
468468
"""Disable resilience features for this tool instance."""
469469
self.enable_resilience = False
470-
logger.info(f"Resilience features disabled for tool '{self.tool_name}'")
470+
logger.debug(f"Resilience features disabled for tool '{self.tool_name}'")
471471

472472
def set_stream_manager(self, stream_manager: StreamManager | None) -> None:
473473
"""
@@ -482,7 +482,7 @@ def set_stream_manager(self, stream_manager: StreamManager | None) -> None:
482482
if self._circuit_open:
483483
self._circuit_open = False
484484
self._circuit_open_time = None
485-
logger.info(f"Circuit breaker closed for tool '{self.tool_name}' due to new stream manager")
485+
logger.debug(f"Circuit breaker closed for tool '{self.tool_name}' due to new stream manager")
486486
else:
487487
self.connection_state = ConnectionState.DISCONNECTED
488488

src/chuk_tool_processor/mcp/stream_manager.py

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
MCPBaseTransport,
2222
SSETransport,
2323
StdioTransport,
24+
TimeoutConfig,
2425
)
2526

2627
logger = get_logger("chuk_tool_processor.mcp.stream_manager")
@@ -38,15 +39,15 @@ class StreamManager:
3839
- HTTP Streamable (modern replacement for SSE, spec 2025-03-26) with graceful headers handling
3940
"""
4041

41-
def __init__(self) -> None:
42+
def __init__(self, timeout_config: TimeoutConfig | None = None) -> None:
4243
self.transports: dict[str, MCPBaseTransport] = {}
4344
self.server_info: list[dict[str, Any]] = []
4445
self.tool_to_server_map: dict[str, str] = {}
4546
self.server_names: dict[int, str] = {}
4647
self.all_tools: list[dict[str, Any]] = []
4748
self._lock = asyncio.Lock()
4849
self._closed = False # Track if we've been closed
49-
self._shutdown_timeout = 2.0 # Maximum time to spend on shutdown
50+
self.timeout_config = timeout_config or TimeoutConfig()
5051

5152
# ------------------------------------------------------------------ #
5253
# factory helpers with enhanced error handling #
@@ -251,8 +252,12 @@ async def initialize(
251252
self.transports[server_name] = transport
252253

253254
# Ping and get tools with timeout protection (use longer timeouts for slow servers)
254-
status = "Up" if await asyncio.wait_for(transport.send_ping(), timeout=30.0) else "Down"
255-
tools = await asyncio.wait_for(transport.get_tools(), timeout=30.0)
255+
status = (
256+
"Up"
257+
if await asyncio.wait_for(transport.send_ping(), timeout=self.timeout_config.operation)
258+
else "Down"
259+
)
260+
tools = await asyncio.wait_for(transport.get_tools(), timeout=self.timeout_config.operation)
256261

257262
for t in tools:
258263
name = t.get("name")
@@ -333,8 +338,12 @@ async def initialize_with_sse(
333338

334339
self.transports[name] = transport
335340
# Use longer timeouts for slow servers (ping can take time after initialization)
336-
status = "Up" if await asyncio.wait_for(transport.send_ping(), timeout=30.0) else "Down"
337-
tools = await asyncio.wait_for(transport.get_tools(), timeout=30.0)
341+
status = (
342+
"Up"
343+
if await asyncio.wait_for(transport.send_ping(), timeout=self.timeout_config.operation)
344+
else "Down"
345+
)
346+
tools = await asyncio.wait_for(transport.get_tools(), timeout=self.timeout_config.operation)
338347

339348
for t in tools:
340349
tname = t.get("name")
@@ -415,8 +424,12 @@ async def initialize_with_http_streamable(
415424

416425
self.transports[name] = transport
417426
# Use longer timeouts for slow servers (ping can take time after initialization)
418-
status = "Up" if await asyncio.wait_for(transport.send_ping(), timeout=30.0) else "Down"
419-
tools = await asyncio.wait_for(transport.get_tools(), timeout=30.0)
427+
status = (
428+
"Up"
429+
if await asyncio.wait_for(transport.send_ping(), timeout=self.timeout_config.operation)
430+
else "Down"
431+
)
432+
tools = await asyncio.wait_for(transport.get_tools(), timeout=self.timeout_config.operation)
420433

421434
for t in tools:
422435
tname = t.get("name")
@@ -462,7 +475,7 @@ async def list_tools(self, server_name: str) -> list[dict[str, Any]]:
462475
transport = self.transports[server_name]
463476

464477
try:
465-
tools = await asyncio.wait_for(transport.get_tools(), timeout=10.0)
478+
tools = await asyncio.wait_for(transport.get_tools(), timeout=self.timeout_config.operation)
466479
logger.debug("Found %d tools for server %s", len(tools), server_name)
467480
return tools
468481
except TimeoutError:
@@ -481,7 +494,7 @@ async def ping_servers(self) -> list[dict[str, Any]]:
481494

482495
async def _ping_one(name: str, tr: MCPBaseTransport):
483496
try:
484-
ok = await asyncio.wait_for(tr.send_ping(), timeout=5.0)
497+
ok = await asyncio.wait_for(tr.send_ping(), timeout=self.timeout_config.quick)
485498
except Exception:
486499
ok = False
487500
return {"server": name, "ok": ok}
@@ -496,7 +509,7 @@ async def list_resources(self) -> list[dict[str, Any]]:
496509

497510
async def _one(name: str, tr: MCPBaseTransport):
498511
try:
499-
res = await asyncio.wait_for(tr.list_resources(), timeout=10.0)
512+
res = await asyncio.wait_for(tr.list_resources(), timeout=self.timeout_config.operation)
500513
resources = res.get("resources", []) if isinstance(res, dict) else res
501514
for item in resources:
502515
item = dict(item)
@@ -516,7 +529,7 @@ async def list_prompts(self) -> list[dict[str, Any]]:
516529

517530
async def _one(name: str, tr: MCPBaseTransport):
518531
try:
519-
res = await asyncio.wait_for(tr.list_prompts(), timeout=10.0)
532+
res = await asyncio.wait_for(tr.list_prompts(), timeout=self.timeout_config.operation)
520533
prompts = res.get("prompts", []) if isinstance(res, dict) else res
521534
for item in prompts:
522535
item = dict(item)
@@ -643,7 +656,7 @@ async def _concurrent_close(self, transport_items: list[tuple[str, MCPBaseTransp
643656
try:
644657
results = await asyncio.wait_for(
645658
asyncio.gather(*[task for _, task in close_tasks], return_exceptions=True),
646-
timeout=self._shutdown_timeout,
659+
timeout=self.timeout_config.shutdown,
647660
)
648661

649662
# Process results
@@ -666,7 +679,8 @@ async def _concurrent_close(self, transport_items: list[tuple[str, MCPBaseTransp
666679
# Brief wait for cancellations to complete
667680
with contextlib.suppress(TimeoutError):
668681
await asyncio.wait_for(
669-
asyncio.gather(*[task for _, task in close_tasks], return_exceptions=True), timeout=0.5
682+
asyncio.gather(*[task for _, task in close_tasks], return_exceptions=True),
683+
timeout=self.timeout_config.shutdown,
670684
)
671685

672686
async def _sequential_close(self, transport_items: list[tuple[str, MCPBaseTransport]], close_results: list) -> None:
@@ -675,7 +689,7 @@ async def _sequential_close(self, transport_items: list[tuple[str, MCPBaseTransp
675689
try:
676690
await asyncio.wait_for(
677691
self._close_single_transport(name, transport),
678-
timeout=0.5, # Short timeout per transport
692+
timeout=self.timeout_config.shutdown,
679693
)
680694
logger.debug("Closed transport: %s", name)
681695
close_results.append((name, True, None))
@@ -767,7 +781,7 @@ async def health_check(self) -> dict[str, Any]:
767781

768782
for name, transport in self.transports.items():
769783
try:
770-
ping_ok = await asyncio.wait_for(transport.send_ping(), timeout=5.0)
784+
ping_ok = await asyncio.wait_for(transport.send_ping(), timeout=self.timeout_config.quick)
771785
health_info["transports"][name] = {
772786
"status": "healthy" if ping_ok else "unhealthy",
773787
"ping_success": ping_ok,

src/chuk_tool_processor/mcp/transport/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@
1111

1212
from .base_transport import MCPBaseTransport
1313
from .http_streamable_transport import HTTPStreamableTransport
14+
from .models import (
15+
HeadersConfig,
16+
ServerInfo,
17+
TimeoutConfig,
18+
TransportMetrics,
19+
)
1420
from .sse_transport import SSETransport
1521
from .stdio_transport import StdioTransport
1622

@@ -19,4 +25,8 @@
1925
"StdioTransport",
2026
"SSETransport",
2127
"HTTPStreamableTransport",
28+
"TimeoutConfig",
29+
"TransportMetrics",
30+
"ServerInfo",
31+
"HeadersConfig",
2232
]

0 commit comments

Comments
 (0)