 # chuk_tool_processor/core/processor.py
 import asyncio
 import time
+import json
+import hashlib
 from typing import Any, Dict, List, Optional, Type, Union

-# imports
+# imports
 from chuk_tool_processor.models.tool_call import ToolCall
 from chuk_tool_processor.models.tool_result import ToolResult
 from chuk_tool_processor.registry import ToolRegistryInterface, ToolRegistryProvider
@@ -21,6 +23,7 @@ class ToolProcessor:
     Main class for processing tool calls from LLM responses.
     Combines parsing, execution, and result handling.
     """
+
     def __init__(
         self,
         registry: Optional[ToolRegistryInterface] = None,
@@ -33,11 +36,11 @@ def __init__(
         tool_rate_limits: Optional[Dict[str, tuple]] = None,
         enable_retries: bool = True,
         max_retries: int = 3,
-        parser_plugins: Optional[List[str]] = None
+        parser_plugins: Optional[List[str]] = None,
     ):
         """
         Initialize the tool processor.
-
+
         Args:
             registry: Tool registry to use. If None, uses the global registry.
             default_timeout: Default timeout for tool execution in seconds.
@@ -53,55 +56,55 @@ def __init__(
                 If None, uses all available parsers.
         """
         self.logger = get_logger("chuk_tool_processor.processor")
-
+
         # Use provided registry or global registry
         self.registry = registry or ToolRegistryProvider.get_registry()
-
+
         # Create base executor with in-process strategy
         self.strategy = InProcessStrategy(
             registry=self.registry,
             default_timeout=default_timeout,
-            max_concurrency=max_concurrency
+            max_concurrency=max_concurrency,
         )
-
+
         self.executor = ToolExecutor(
             registry=self.registry,
             default_timeout=default_timeout,
-            strategy=self.strategy
+            strategy=self.strategy,
         )
-
+
         # Apply optional wrappers
         if enable_retries:
             self.logger.debug("Enabling retry logic")
             self.executor = RetryableToolExecutor(
                 executor=self.executor,
-                default_config=RetryConfig(max_retries=max_retries)
+                default_config=RetryConfig(max_retries=max_retries),
             )
-
+
         if enable_rate_limiting:
             self.logger.debug("Enabling rate limiting")
             rate_limiter = RateLimiter(
                 global_limit=global_rate_limit,
-                tool_limits=tool_rate_limits
+                tool_limits=tool_rate_limits,
             )
             self.executor = RateLimitedToolExecutor(
                 executor=self.executor,
-                rate_limiter=rate_limiter
+                rate_limiter=rate_limiter,
             )
-
+
         if enable_caching:
             self.logger.debug("Enabling result caching")
             cache = InMemoryCache(default_ttl=cache_ttl)
             self.executor = CachingToolExecutor(
                 executor=self.executor,
                 cache=cache,
-                default_ttl=cache_ttl
+                default_ttl=cache_ttl,
             )
-
+
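+        # Resulting wrapper chain when all three flags are enabled
+        # (outermost first):
+        #   CachingToolExecutor -> RateLimitedToolExecutor
+        #     -> RetryableToolExecutor -> ToolExecutor
+        # so a cached hit is expected to short-circuit before rate limits
+        # or retries would apply.
+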
         # Discover plugins if not already done
         if not plugin_registry.list_plugins().get("parser", []):
             discover_default_plugins()
-
+
         # Get parser plugins
         if parser_plugins:
             self.parsers = [
@@ -112,63 +115,59 @@ def __init__(
         else:
             parser_names = plugin_registry.list_plugins().get("parser", [])
             self.parsers = [
-                plugin_registry.get_plugin("parser", name)
-                for name in parser_names
+                plugin_registry.get_plugin("parser", name) for name in parser_names
             ]
-
+
         self.logger.debug(f"Initialized with {len(self.parsers)} parser plugins")
-
+
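+    # Every parser in self.parsers is applied to the full input text; their
+    # results are merged and de-duplicated in _extract_tool_calls() below.
+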
     async def process_text(
         self,
         text: str,
         timeout: Optional[float] = None,
         use_cache: bool = True,
-        request_id: Optional[str] = None
+        request_id: Optional[str] = None,
     ) -> List[ToolResult]:
         """
         Process text to extract and execute tool calls.
-
+
         Args:
             text: Text to process.
             timeout: Optional timeout for execution.
             use_cache: Whether to use cached results.
             request_id: Optional request ID for logging.
-
+
         Returns:
             List of tool results.
         """
         # Create request context
         with request_logging(request_id) as req_id:
             self.logger.debug(f"Processing text ({len(text)} chars)")
-
+
             # Extract tool calls
             calls = await self._extract_tool_calls(text)
-
+
             if not calls:
                 self.logger.debug("No tool calls found")
                 return []
-
+
             self.logger.debug(f"Found {len(calls)} tool calls")
-
+
             # Execute tool calls
             with log_context_span("tool_execution", {"num_calls": len(calls)}):
                 # Check if any tools are unknown
-                tool_names = set(call.tool for call in calls)
-                unknown_tools = [
-                    name for name in tool_names
-                    if not self.registry.get_tool(name)
-                ]
-
+                tool_names = {call.tool for call in calls}
+                unknown_tools = [name for name in tool_names if not self.registry.get_tool(name)]
+
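+                # Unknown tools are only logged; every call is still passed to
+                # the executor, which is expected to report per-call failures.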
                 if unknown_tools:
                     self.logger.warning(f"Unknown tools: {unknown_tools}")
-
+
                 # Execute tools
                 results = await self.executor.execute(calls, timeout=timeout)
-
+
                 # Log metrics for each tool call
                 for call, result in zip(calls, results):
                     log_tool_call(call, result)
-
+
                     # Record metrics
                     duration = (result.end_time - result.start_time).total_seconds()
                     metrics.log_tool_execution(
@@ -177,64 +176,72 @@ async def process_text(
                         duration=duration,
                         error=result.error,
                         cached=getattr(result, "cached", False),
-                        attempts=getattr(result, "attempts", 1)
+                        attempts=getattr(result, "attempts", 1),
                     )
-
+
             return results
-
+
     async def _extract_tool_calls(self, text: str) -> List[ToolCall]:
         """
         Extract tool calls from text using all available parsers.
-
+
         Args:
             text: Text to parse.
-
+
         Returns:
             List of tool calls.
         """
-        all_calls = []
-
+        all_calls: List[ToolCall] = []
+
         # Try each parser
         with log_context_span("parsing", {"text_length": len(text)}):
             for parser in self.parsers:
                 parser_name = parser.__class__.__name__
-
+
                 with log_context_span(f"parser.{parser_name}", log_duration=True):
                     start_time = time.time()
-
+
                     try:
                         # Try to parse
                         calls = parser.try_parse(text)
-
+
                         # Log success
                         duration = time.time() - start_time
                         metrics.log_parser_metric(
                             parser=parser_name,
                             success=True,
                             duration=duration,
-                            num_calls=len(calls)
+                            num_calls=len(calls),
                         )
-
+
                         # Add calls to result
                         all_calls.extend(calls)
-
+
                     except Exception as e:
                         # Log failure
                         duration = time.time() - start_time
                         metrics.log_parser_metric(
                             parser=parser_name,
                             success=False,
                             duration=duration,
-                            num_calls=0
+                            num_calls=0,
                         )
                         self.logger.error(f"Parser {parser_name} failed: {str(e)}")
-
-        # Remove duplicates
-        unique_calls = {}
+
+        # ------------------------------------------------------------------ #
+        # Remove duplicates – use a stable digest instead of hashing a
+        # frozenset of argument items (which breaks on unhashable types).
+        # ------------------------------------------------------------------ #
+        def _args_digest(args: Dict[str, Any]) -> str:
+            """Return a stable hash for any JSON-serialisable payload."""
+            blob = json.dumps(args, sort_keys=True, default=str)
+            return hashlib.md5(blob.encode()).hexdigest()
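+        # For example (hypothetical arguments), the old key-building approach
+        #   hash(frozenset({"filters": {"status": "open"}}.items()))
+        # raises TypeError: unhashable type: 'dict', while json.dumps happily
+        # serialises nested dicts and lists. md5 is fine here: the digest is
+        # only a dedup key, not a security boundary.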
+
+        unique_calls: Dict[str, ToolCall] = {}
         for call in all_calls:
-            key = f"{call.tool}:{hash(frozenset(call.arguments.items()))}"
+            key = f"{call.tool}:{_args_digest(call.arguments)}"
             unique_calls[key] = call
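+            # Last occurrence wins: a later duplicate replaces the earlier call.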
-
+
         return list(unique_calls.values())


@@ -246,23 +253,23 @@ async def process_text(
     text: str,
     timeout: Optional[float] = None,
     use_cache: bool = True,
-    request_id: Optional[str] = None
+    request_id: Optional[str] = None,
 ) -> List[ToolResult]:
     """
     Process text with the default processor.
-
+
     Args:
         text: Text to process.
         timeout: Optional timeout for execution.
         use_cache: Whether to use cached results.
         request_id: Optional request ID for logging.
-
+
     Returns:
         List of tool results.
     """
     return await default_processor.process_text(
         text=text,
         timeout=timeout,
         use_cache=use_cache,
-        request_id=request_id
+        request_id=request_id,
     )
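For reference, a minimal usage sketch of the reworked module. The ToolProcessor class and the module-level process_text helper come straight from the diff above; the tool-call markup in llm_output and the ToolResult fields printed at the end (tool, result) are assumptions that depend on the registered parser plugins and the ToolResult model, not confirmed API.

    import asyncio

    from chuk_tool_processor.core.processor import ToolProcessor, process_text

    async def main() -> None:
        # Hypothetical LLM output; the actual tag format depends on which
        # parser plugins are installed.
        llm_output = 'Please run <tool name="search" args=\'{"query": "python"}\'/>'

        # Convenience path: the module-level helper backed by default_processor.
        results = await process_text(llm_output, timeout=5.0)

        # Or build a customised processor (wrappers applied as in __init__).
        processor = ToolProcessor(enable_caching=True, max_retries=2)
        results = await processor.process_text(llm_output, request_id="demo-1")

        for result in results:
            print(result.tool, result.error or result.result)  # assumed fields

    asyncio.run(main())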