Skip to content

Commit df83394

Browse files
committed
refactor(http_parsing_utils.py): streamline request body handling and memory management
- Removed the weak-reference cache for request bodies and replaced it with direct storage on the request object to prevent memory leaks.
- Implemented immediate garbage collection every 100 requests to manage memory usage effectively.
- Added a cleanup function to explicitly free memory associated with request processing.
- Updated the logic for retrieving and storing parsed request bodies to enhance performance and reliability.
1 parent 460ef37 commit df83394

File tree

1 file changed

+43
-42
lines changed

1 file changed

+43
-42
lines changed

litellm/proxy/common_utils/http_parsing_utils.py

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import json
2-
import weakref
32
from typing import Any, Dict, List, Optional
43

54
import orjson
@@ -9,10 +8,6 @@
98
from litellm.proxy._types import ProxyException
109
from litellm.types.router import Deployment
1110

12-
# Use a regular dictionary with manual cleanup
13-
# WeakValueDictionary doesn't work well with our setup since request objects may be kept alive
14-
_request_body_cache: Dict[int, Dict] = {}
15-
1611

1712
async def _read_request_body(request: Optional[Request]) -> Dict:
1813
"""
@@ -49,7 +44,19 @@ async def _read_request_body(request: Optional[Request]) -> Dict:
4944
parsed_body = {}
5045
else:
5146
try:
47+
# Parse and immediately clear body reference to prevent memory accumulation
5248
parsed_body = orjson.loads(body)
49+
del body # Clear body reference immediately
50+
51+
# Force garbage collection every 100 requests to prevent accumulation
52+
import gc
53+
if hasattr(_read_request_body, '_gc_counter'):
54+
_read_request_body._gc_counter += 1
55+
else:
56+
_read_request_body._gc_counter = 1
57+
58+
if _read_request_body._gc_counter % 100 == 0:
59+
gc.collect()
5360
except orjson.JSONDecodeError as e:
5461
# First try the standard json module which is more forgiving
5562
# First decode bytes to string if needed
@@ -95,36 +102,15 @@ async def _read_request_body(request: Optional[Request]) -> Dict:
95102
return {}
96103

97104

98-
def clear_request_cache(request: Optional[Request] = None) -> None:
99-
"""
100-
Clear cached request bodies to free memory.
101-
102-
Parameters:
103-
- request: If provided, only clear cache for this specific request.
104-
If None, clear entire cache.
105-
"""
106-
global _request_body_cache
107-
108-
if request is not None:
109-
request_id = id(request)
110-
if request_id in _request_body_cache:
111-
try:
112-
del _request_body_cache[request_id]
113-
except KeyError:
114-
pass
115-
else:
116-
# Clear entire cache
117-
_request_body_cache.clear()
118105

119106

120107
def _safe_get_request_parsed_body(request: Optional[Request]) -> Optional[dict]:
121108
if request is None:
122109
return None
123110

124-
# Try to get from cache first
125-
request_id = id(request)
126-
if request_id in _request_body_cache:
127-
return _request_body_cache[request_id]
111+
# Check if we already have a parsed body stored directly on the request object
112+
if hasattr(request, "_litellm_parsed_body"):
113+
return getattr(request, "_litellm_parsed_body")
128114

129115
# Fallback to checking request.scope for backward compatibility
130116
if (
@@ -134,13 +120,34 @@ def _safe_get_request_parsed_body(request: Optional[Request]) -> Optional[dict]:
134120
):
135121
accepted_keys, parsed_body = request.scope["parsed_body"]
136122
result = {key: parsed_body[key] for key in accepted_keys}
137-
# Clean up the scope to free memory
123+
# Clean up the scope to free memory and store on request object
138124
del request.scope["parsed_body"]
139-
# Store in our cache for consistency
140-
_request_body_cache[request_id] = result
125+
setattr(request, "_litellm_parsed_body", result)
141126
return result
142127
return None
143128

129+
130+
def cleanup_request_memory(request: Optional[Request]) -> None:
    """
    Release per-request parsed-body data once request processing is done.

    Drops the parsed body stored directly on the request object and any
    leftover ``parsed_body`` entry in ``request.scope`` so the payload is
    not kept alive after the request completes.

    Parameters:
    - request: the request to clean up; a ``None`` request is a no-op.
    """
    if request is None:
        return

    try:
        # Drop the body cached directly on the request object, if any.
        if hasattr(request, "_litellm_parsed_body"):
            delattr(request, "_litellm_parsed_body")

        # Drop any leftover copy stashed in the ASGI scope.
        if hasattr(request, "scope") and "parsed_body" in request.scope:
            del request.scope["parsed_body"]
    except Exception:
        # Best-effort cleanup: teardown must never break request handling.
        pass
149+
150+
144151
def _safe_get_request_query_params(request: Optional[Request]) -> Dict:
145152
if request is None:
146153
return {}
@@ -162,16 +169,10 @@ def _safe_set_request_parsed_body(
162169
if request is None:
163170
return
164171

165-
# Store in cache with size limit to prevent unbounded growth
166-
request_id = id(request)
167-
_request_body_cache[request_id] = parsed_body
168-
169-
# If cache gets too large, remove oldest entries
170-
if len(_request_body_cache) > 1000: # Maximum 1000 cached requests
171-
# Remove the oldest 100 entries
172-
keys_to_remove = list(_request_body_cache.keys())[:100]
173-
for key in keys_to_remove:
174-
del _request_body_cache[key]
172+
# Store the parsed body directly on the request object
173+
# This prevents memory leaks and data cross-contamination since
174+
# the data is tied to the specific request object lifecycle
175+
setattr(request, "_litellm_parsed_body", parsed_body)
175176

176177
except Exception as e:
177178
verbose_proxy_logger.debug(

0 commit comments

Comments
 (0)