Skip to content

Commit 063c920

Browse files
committed
execution strategiest now async
1 parent f0b130a commit 063c920

File tree

13 files changed

+1068
-891
lines changed

13 files changed

+1068
-891
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""
2+
examples/demo_strategies_wrappers.py
3+
====================================
4+
5+
Demonstrates, in one run:
6+
7+
• In-process execution via ToolExecutor
8+
• Retry logic (fails once, then succeeds) via RetryableToolExecutor
9+
• Global rate-limit via RateLimitedToolExecutor
10+
• Result caching with 2-second TTL via CachingToolExecutor
11+
12+
Logging for the initial flaky failure is silenced, so the run is tidy.
13+
14+
Expected flow
15+
-------------
16+
1. Echo – cache MISS → stored
17+
2. Echo – cache HIT (machine == "cache")
18+
3. wait 3 s – cache entry expires
19+
4. Echo – MISS again
20+
5. Flaky – first failure, one retry, success (result == 14)
21+
6. Two rapid echo calls – rate-limit pauses second ≈1 s
22+
"""
23+
from __future__ import annotations
24+
25+
import asyncio
26+
import logging
27+
import time
28+
29+
from chuk_tool_processor.execution.tool_executor import ToolExecutor
30+
from chuk_tool_processor.execution.wrappers.caching import (
31+
CachingToolExecutor,
32+
InMemoryCache,
33+
)
34+
from chuk_tool_processor.execution.wrappers.rate_limiting import (
35+
RateLimiter,
36+
RateLimitedToolExecutor,
37+
)
38+
from chuk_tool_processor.execution.wrappers.retry import (
39+
RetryConfig,
40+
RetryableToolExecutor,
41+
)
42+
from chuk_tool_processor.models.tool_call import ToolCall
43+
from chuk_tool_processor.registry.interface import ToolRegistryInterface
44+
45+
# --------------------------------------------------------------------------- #
46+
# 0. Silence noisy ERROR log from first flaky failure
47+
# --------------------------------------------------------------------------- #
48+
lib_log = logging.getLogger(
49+
"chuk_tool_processor.execution.inprocess_strategy"
50+
)
51+
lib_log.setLevel(logging.CRITICAL) # higher than ERROR
52+
lib_log.propagate = False # prevent bubbling to root
53+
54+
55+
# --------------------------------------------------------------------------- #
56+
# 1. Demo tools
57+
# --------------------------------------------------------------------------- #
58+
class EchoTool:
59+
async def execute(self, *, text: str):
60+
return f"echo: {text}"
61+
62+
63+
class FlakyTool:
64+
"""Fails once, succeeds on subsequent calls."""
65+
66+
def __init__(self):
67+
self._called = False
68+
69+
async def _aexecute(self, value: int):
70+
if not self._called:
71+
self._called = True
72+
raise RuntimeError("flaky failure – try again")
73+
return value * 2
74+
75+
76+
# --------------------------------------------------------------------------- #
77+
# 2. Registry with stateful FlakyTool instance
78+
# --------------------------------------------------------------------------- #
79+
class DemoRegistry(ToolRegistryInterface):
80+
def __init__(self):
81+
self._tools = {
82+
"echo": EchoTool, # class: new instance per exec
83+
"flaky": FlakyTool(), # instance: state survives retry
84+
}
85+
86+
def get_tool(self, name):
87+
return self._tools.get(name)
88+
89+
90+
# --------------------------------------------------------------------------- #
91+
# 3. Build executor stack
92+
# --------------------------------------------------------------------------- #
93+
base_exec = ToolExecutor(DemoRegistry())
94+
95+
retry_exec = RetryableToolExecutor(
96+
base_exec,
97+
default_config=RetryConfig(
98+
max_retries=2,
99+
base_delay=0.2, # short delay for demo
100+
jitter=False,
101+
retry_on_exceptions=[RuntimeError],
102+
),
103+
)
104+
105+
rate_limiter = RateLimiter(global_limit=1, global_period=1) # 1 req / sec
106+
rl_exec = RateLimitedToolExecutor(retry_exec, rate_limiter)
107+
108+
cache_exec = CachingToolExecutor(
109+
rl_exec,
110+
cache=InMemoryCache(default_ttl=2), # 2-second TTL
111+
default_ttl=2,
112+
)
113+
114+
echo_call = ToolCall(tool="echo", arguments={"text": "hi"})
115+
flaky_call = ToolCall(tool="flaky", arguments={"value": 7})
116+
117+
# --------------------------------------------------------------------------- #
118+
async def main():
119+
print("\n--- 1) Echo (MISS) ---------------------------------------")
120+
print((await cache_exec.execute([echo_call]))[0].model_dump())
121+
122+
print("\n--- 2) Echo (HIT) ----------------------------------------")
123+
print((await cache_exec.execute([echo_call]))[0].model_dump())
124+
125+
print("\n--- waiting 3 s for cache to expire ----------------------")
126+
await asyncio.sleep(3)
127+
128+
print("\n--- 3) Echo (MISS again, TTL expired) --------------------")
129+
print((await cache_exec.execute([echo_call]))[0].model_dump())
130+
131+
print("\n--- 4) Flaky tool (retry succeeds) -----------------------")
132+
print((await cache_exec.execute([flaky_call]))[0].model_dump())
133+
134+
print("\n--- 5) Rate-limit demo -----------------------------------")
135+
start = time.monotonic()
136+
await cache_exec.execute([ToolCall(tool="echo", arguments={"text": "one"})])
137+
await cache_exec.execute([ToolCall(tool="echo", arguments={"text": "two"})])
138+
print(f"Elapsed ≈{time.monotonic() - start:.2f}s (≥ 1 s by limit)")
139+
140+
141+
if __name__ == "__main__":
142+
asyncio.run(main())

src/chuk_tool_processor/execution/strategies/inprocess_strategy.py

Lines changed: 29 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
1-
"""
2-
In-process execution strategy with sync/async support.
3-
4-
This version prefers the public `execute()` wrapper (with validation and
5-
defaults) over the private `_execute` implementation, fixing missing-argument
6-
errors for `ValidatedTool` subclasses.
7-
"""
8-
1+
# chuk_tool_processor/execution/strategies/inprocess_strategy.py
92
from __future__ import annotations
103

114
import asyncio
125
import inspect
136
import os
147
from datetime import datetime, timezone
15-
from typing import Any, List, Optional
8+
from typing import Any, List
169

1710
from chuk_tool_processor.core.exceptions import ToolExecutionError
1811
from chuk_tool_processor.models.execution_strategy import ExecutionStrategy
@@ -23,9 +16,15 @@
2316

2417
logger = get_logger("chuk_tool_processor.execution.inprocess_strategy")
2518

19+
from contextlib import asynccontextmanager
20+
21+
# Async no-op context manager when no semaphore
22+
@asynccontextmanager
23+
async def _noop_cm():
24+
yield
2625

2726
class InProcessStrategy(ExecutionStrategy):
28-
"""Run tools inside the current interpreter, concurrently."""
27+
"""Execute tools concurrently in the current event-loop."""
2928

3029
def __init__(
3130
self,
@@ -37,9 +36,6 @@ def __init__(
3736
self.default_timeout = default_timeout
3837
self._sem = asyncio.Semaphore(max_concurrency) if max_concurrency else None
3938

40-
# ------------------------------------------------------------------ #
41-
# public API
42-
# ------------------------------------------------------------------ #
4339
async def run(
4440
self,
4541
calls: List[ToolCall],
@@ -51,9 +47,6 @@ async def run(
5147
]
5248
return await asyncio.gather(*tasks)
5349

54-
# ------------------------------------------------------------------ #
55-
# helpers
56-
# ------------------------------------------------------------------ #
5750
async def _execute_single_call(
5851
self,
5952
call: ToolCall,
@@ -75,68 +68,37 @@ async def _execute_single_call(
7568
pid=pid,
7669
)
7770

78-
try:
79-
run = self._run_with_timeout
80-
if self._sem is None:
81-
return await run(impl, call, timeout, start, machine, pid)
82-
async with self._sem:
83-
return await run(impl, call, timeout, start, machine, pid)
84-
except Exception as exc: # pragma: no cover – safety net
85-
logger.exception("Unexpected error while executing %s", call.tool)
71+
tool = impl() if inspect.isclass(impl) else impl
72+
guard = self._sem if self._sem is not None else _noop_cm()
73+
74+
# Determine correct async entry-point, even on bound methods
75+
if hasattr(tool, "_aexecute") and inspect.iscoroutinefunction(type(tool)._aexecute):
76+
fn = tool._aexecute
77+
elif hasattr(tool, "execute") and inspect.iscoroutinefunction(tool.execute):
78+
fn = tool.execute
79+
else:
8680
return ToolResult(
8781
tool=call.tool,
8882
result=None,
89-
error=f"Unexpected error: {exc}",
83+
error=(
84+
"Tool must implement async '_aexecute' or 'execute'."
85+
),
9086
start_time=start,
9187
end_time=datetime.now(timezone.utc),
9288
machine=machine,
9389
pid=pid,
9490
)
9591

96-
# ------------------------------------------------------------------ #
97-
# core execution with timeout
98-
# ------------------------------------------------------------------ #
99-
async def _run_with_timeout(
100-
self,
101-
impl: Any,
102-
call: ToolCall,
103-
timeout: float | None,
104-
start: datetime,
105-
machine: str,
106-
pid: int,
107-
) -> ToolResult:
108-
tool = impl() if isinstance(impl, type) else impl
109-
110-
# ------------------------------------------------------------------
111-
# Entry-point selection order:
112-
# 1. `_aexecute` (async special case)
113-
# 2. `execute` (public wrapper WITH validation & defaults)
114-
# 3. `_execute` (fallback / legacy)
115-
# ------------------------------------------------------------------
116-
if hasattr(tool, "_aexecute") and inspect.iscoroutinefunction(tool._aexecute):
117-
fn = tool._aexecute
118-
is_async = True
119-
elif hasattr(tool, "execute"):
120-
fn = tool.execute
121-
is_async = inspect.iscoroutinefunction(fn)
122-
elif hasattr(tool, "_execute"):
123-
fn = tool._execute
124-
is_async = inspect.iscoroutinefunction(fn)
125-
else:
126-
raise ToolExecutionError(
127-
f"Tool '{call.tool}' must implement _execute, execute or _aexecute"
128-
)
129-
13092
async def _invoke():
131-
if is_async:
132-
return await fn(**call.arguments)
133-
loop = asyncio.get_running_loop()
134-
return await loop.run_in_executor(None, lambda: fn(**call.arguments))
93+
return await fn(**call.arguments)
13594

13695
try:
137-
result_val = (
138-
await asyncio.wait_for(_invoke(), timeout) if timeout else await _invoke()
139-
)
96+
async with guard:
97+
result_val = (
98+
await asyncio.wait_for(_invoke(), timeout)
99+
if timeout
100+
else await _invoke()
101+
)
140102
return ToolResult(
141103
tool=call.tool,
142104
result=result_val,
@@ -157,6 +119,7 @@ async def _invoke():
157119
pid=pid,
158120
)
159121
except Exception as exc:
122+
logger.exception("Error while executing %s", call.tool)
160123
return ToolResult(
161124
tool=call.tool,
162125
result=None,

0 commit comments

Comments
 (0)