Skip to content

Commit 3d5bf37

Browse files
committed
debugging sse
1 parent 46c648d commit 3d5bf37

File tree

7 files changed

+1269
-57
lines changed

7 files changed

+1269
-57
lines changed

debug/simple_sse_debug.py

Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
#!/usr/bin/env python
2+
"""
3+
working_debug_script.py - Clean working async SSE MCP debug script
4+
Based on the successful persistent SSE connection pattern
5+
"""
6+
7+
import asyncio
8+
import json
9+
import os
10+
import uuid
11+
from pathlib import Path
12+
from typing import Dict, Any, Optional
13+
14+
import httpx
15+
from dotenv import load_dotenv
16+
17+
# Load .env file
18+
PROJECT_ROOT = Path(__file__).resolve().parents[1]
19+
env_file = PROJECT_ROOT / ".env"
20+
if env_file.exists():
21+
load_dotenv(env_file)
22+
print(f"✓ Loaded .env file from {env_file}")
23+
24+
class WorkingSSEDebugClient:
25+
"""Clean working async SSE MCP client for debugging."""
26+
27+
def __init__(self, server_url: str, bearer_token: Optional[str] = None):
28+
self.server_url = server_url.rstrip('/')
29+
self.bearer_token = bearer_token
30+
self.session_id = None
31+
self.message_url = None
32+
self.pending_requests: Dict[str, asyncio.Future] = {}
33+
self.sse_task = None
34+
self.client = None
35+
self.sse_response = None
36+
self.sse_stream_context = None
37+
38+
def _get_headers(self) -> Dict[str, str]:
39+
"""Get headers with auth if available."""
40+
headers = {}
41+
if self.bearer_token:
42+
headers['Authorization'] = f'Bearer {self.bearer_token}'
43+
return headers
44+
45+
async def connect(self) -> bool:
46+
"""Connect using the working persistent SSE pattern."""
47+
self.client = httpx.AsyncClient(timeout=60.0)
48+
49+
sse_url = f"{self.server_url}/sse"
50+
print(f"🔗 Connecting to SSE: {sse_url}")
51+
52+
try:
53+
print("📡 Using streaming request (proven to work)...")
54+
55+
# Use streaming approach that works
56+
self.sse_stream_context = self.client.stream('GET', sse_url, headers=self._get_headers())
57+
self.sse_response = await self.sse_stream_context.__aenter__()
58+
59+
if self.sse_response.status_code != 200:
60+
print(f"❌ SSE connection failed: {self.sse_response.status_code}")
61+
text = await self.sse_response.atext()
62+
print(f"Error: {text}")
63+
return False
64+
65+
print(f"✅ SSE streaming connection established: {self.sse_response.status_code}")
66+
67+
# Start persistent SSE processing
68+
self.sse_task = asyncio.create_task(self._process_sse_stream())
69+
70+
# Wait for session discovery
71+
print("⏳ Waiting for session discovery...")
72+
for i in range(50): # 5 seconds max
73+
if self.message_url:
74+
break
75+
await asyncio.sleep(0.1)
76+
if i % 10 == 0 and i > 0:
77+
print(f"⏳ Still waiting... ({i/10:.1f}s)")
78+
79+
if not self.message_url:
80+
print("❌ Failed to get session info from SSE")
81+
return False
82+
83+
print(f"✅ Session ready: {self.session_id}")
84+
print(f"📍 Message URL: {self.message_url}")
85+
return True
86+
87+
except Exception as e:
88+
print(f"❌ Connection failed: {e}")
89+
import traceback
90+
traceback.print_exc()
91+
return False
92+
93+
async def _process_sse_stream(self):
94+
"""Process the persistent SSE stream for session discovery and responses."""
95+
try:
96+
print("👂 Starting persistent SSE stream processing...")
97+
event_count = 0
98+
99+
async for line in self.sse_response.aiter_lines():
100+
line = line.strip()
101+
if not line:
102+
continue
103+
104+
event_count += 1
105+
print(f"📡 SSE #{event_count}: {line}")
106+
107+
# Handle initial session setup
108+
if not self.message_url and line.startswith('data:') and '/messages/' in line:
109+
endpoint_path = line.split(':', 1)[1].strip()
110+
self.message_url = f"{self.server_url}{endpoint_path}"
111+
112+
if 'session_id=' in endpoint_path:
113+
self.session_id = endpoint_path.split('session_id=')[1].split('&')[0]
114+
115+
print(f"✅ Got session info from SSE")
116+
continue
117+
118+
# Handle JSON-RPC responses
119+
await self._handle_sse_response(line)
120+
121+
except Exception as e:
122+
print(f"❌ SSE stream error: {e}")
123+
import traceback
124+
traceback.print_exc()
125+
126+
async def _handle_sse_response(self, line: str):
127+
"""Handle SSE line that might contain JSON-RPC response."""
128+
try:
129+
if not line.startswith('data:'):
130+
return
131+
132+
data_part = line.split(':', 1)[1].strip()
133+
134+
# Skip pings, empty data, and session announcements
135+
if not data_part or data_part.startswith('ping') or '/messages/' in data_part:
136+
return
137+
138+
# Try to parse as JSON-RPC response
139+
try:
140+
response_data = json.loads(data_part)
141+
142+
if 'jsonrpc' in response_data and 'id' in response_data:
143+
request_id = str(response_data['id'])
144+
print(f"🎉 Received JSON-RPC response for ID: {request_id}")
145+
146+
# Resolve pending request
147+
if request_id in self.pending_requests:
148+
future = self.pending_requests.pop(request_id)
149+
if not future.done():
150+
future.set_result(response_data)
151+
print(f"✅ Resolved pending request: {request_id}")
152+
else:
153+
print(f"⚠️ No pending request for ID: {request_id}")
154+
155+
except json.JSONDecodeError:
156+
print(f"📡 Non-JSON data: {data_part}")
157+
158+
except Exception as e:
159+
print(f"⚠️ Error handling SSE response: {e}")
160+
161+
async def send_request(self, method: str, params: Dict[str, Any] = None, timeout: float = 30.0) -> Dict[str, Any]:
162+
"""Send MCP request and wait for async response."""
163+
if not self.message_url:
164+
raise RuntimeError("Not connected - call connect() first")
165+
166+
request_id = str(uuid.uuid4())
167+
message = {
168+
"jsonrpc": "2.0",
169+
"id": request_id,
170+
"method": method,
171+
"params": params or {}
172+
}
173+
174+
print(f"📤 Sending {method} (ID: {request_id})")
175+
176+
# Create future for response
177+
future = asyncio.Future()
178+
self.pending_requests[request_id] = future
179+
180+
try:
181+
# Send message via separate client
182+
headers = {
183+
'Content-Type': 'application/json',
184+
**self._get_headers()
185+
}
186+
187+
async with httpx.AsyncClient(timeout=10.0) as send_client:
188+
response = await send_client.post(self.message_url, headers=headers, json=message)
189+
print(f"📨 Message response: {response.status_code}")
190+
191+
if response.status_code == 202:
192+
print(f"✅ Message accepted - waiting for response via persistent SSE...")
193+
194+
# Wait for async response
195+
try:
196+
result = await asyncio.wait_for(future, timeout=timeout)
197+
print(f"✅ Got response for {method}")
198+
return result
199+
except asyncio.TimeoutError:
200+
self.pending_requests.pop(request_id, None)
201+
raise TimeoutError(f"Timeout waiting for {method} response after {timeout}s")
202+
203+
elif response.status_code == 200:
204+
# Immediate response
205+
self.pending_requests.pop(request_id, None)
206+
return response.json()
207+
else:
208+
self.pending_requests.pop(request_id, None)
209+
raise RuntimeError(f"Request failed: {response.status_code} - {response.text}")
210+
211+
except Exception as e:
212+
self.pending_requests.pop(request_id, None)
213+
raise
214+
215+
async def send_notification(self, method: str, params: Dict[str, Any] = None):
216+
"""Send MCP notification (no response expected)."""
217+
if not self.message_url:
218+
raise RuntimeError("Not connected - call connect() first")
219+
220+
message = {
221+
"jsonrpc": "2.0",
222+
"method": method,
223+
"params": params or {}
224+
}
225+
226+
print(f"📢 Sending notification: {method}")
227+
228+
headers = {
229+
'Content-Type': 'application/json',
230+
**self._get_headers()
231+
}
232+
233+
async with httpx.AsyncClient(timeout=10.0) as send_client:
234+
response = await send_client.post(self.message_url, headers=headers, json=message)
235+
print(f"📨 Notification response: {response.status_code}")
236+
return response.status_code
237+
238+
async def close(self):
239+
"""Clean up connections."""
240+
if self.sse_task:
241+
self.sse_task.cancel()
242+
if self.sse_stream_context:
243+
try:
244+
await self.sse_stream_context.__aexit__(None, None, None)
245+
except:
246+
pass
247+
if self.client:
248+
await self.client.aclose()
249+
250+
def get_server_config():
251+
"""Get server configuration from environment."""
252+
url_map = os.getenv('MCP_SERVER_URL_MAP', '{}')
253+
bearer_token = os.getenv('MCP_BEARER_TOKEN')
254+
255+
try:
256+
url_config = json.loads(url_map)
257+
except json.JSONDecodeError:
258+
return None, None
259+
260+
server_url = url_config.get('perplexity_server')
261+
return server_url, bearer_token
262+
263+
async def debug_mcp_server():
264+
"""Debug the MCP server with working async SSE client."""
265+
print("🔍 Working Async SSE MCP Debug Script")
266+
print("=" * 50)
267+
268+
server_url, bearer_token = get_server_config()
269+
if not server_url:
270+
print("❌ No server configuration found")
271+
return
272+
273+
print(f"🎯 Server: {server_url}")
274+
print(f"🔑 Token: {'SET' if bearer_token else 'NOT SET'}")
275+
276+
client = WorkingSSEDebugClient(server_url, bearer_token)
277+
278+
try:
279+
# Step 1: Connect
280+
print(f"\n{'='*20} CONNECTING {'='*20}")
281+
if not await client.connect():
282+
print("❌ Connection failed - stopping debug")
283+
return
284+
285+
# Step 2: Initialize
286+
print(f"\n{'='*20} INITIALIZING {'='*20}")
287+
init_response = await client.send_request("initialize", {
288+
"protocolVersion": "2024-11-05",
289+
"capabilities": {},
290+
"clientInfo": {
291+
"name": "debug-client",
292+
"version": "1.0.0"
293+
}
294+
})
295+
296+
print(f"✅ Initialize successful!")
297+
print(f"📋 Server: {init_response.get('result', {}).get('serverInfo', {})}")
298+
print(f"📋 Capabilities: {init_response.get('result', {}).get('capabilities', {})}")
299+
300+
# Step 3: Send initialized notification
301+
print(f"\n{'='*20} SENDING INITIALIZED {'='*20}")
302+
await client.send_notification("notifications/initialized")
303+
304+
# Step 4: List tools (try different approaches)
305+
print(f"\n{'='*20} LISTING TOOLS {'='*20}")
306+
307+
# Try with empty params
308+
try:
309+
tools_response = await client.send_request("tools/list", {})
310+
if 'error' not in tools_response:
311+
print(f"✅ Tools list successful!")
312+
tools = tools_response.get('result', {}).get('tools', [])
313+
print(f"📋 Found {len(tools)} tools:")
314+
for i, tool in enumerate(tools[:5]): # Show first 5
315+
name = tool.get('name', 'unknown')
316+
desc = tool.get('description', 'No description')
317+
print(f" {i+1}. {name}: {desc}")
318+
else:
319+
print(f"❌ Tools list failed: {tools_response.get('error', {}).get('message', 'Unknown error')}")
320+
321+
# Try without params
322+
print(f"🔄 Retrying without params...")
323+
tools_response = await client.send_request("tools/list")
324+
if 'error' not in tools_response:
325+
print(f"✅ Tools list successful (no params)!")
326+
tools = tools_response.get('result', {}).get('tools', [])
327+
print(f"📋 Found {len(tools)} tools")
328+
else:
329+
print(f"❌ Still failed: {tools_response.get('error', {}).get('message', 'Unknown error')}")
330+
331+
except Exception as e:
332+
print(f"❌ Tools list error: {e}")
333+
334+
# Step 5: Try other methods
335+
print(f"\n{'='*20} TESTING OTHER METHODS {'='*20}")
336+
337+
test_methods = [
338+
("resources/list", {}),
339+
("prompts/list", {}),
340+
]
341+
342+
for method, params in test_methods:
343+
try:
344+
print(f"🔧 Testing {method}...")
345+
response = await client.send_request(method, params, timeout=10.0)
346+
if 'error' not in response:
347+
print(f" ✅ {method} successful!")
348+
else:
349+
error_msg = response.get('error', {}).get('message', 'Unknown error')
350+
print(f" ❌ {method} failed: {error_msg}")
351+
except Exception as e:
352+
print(f" ❌ {method} error: {e}")
353+
354+
print(f"\n🎉 Debug completed successfully!")
355+
356+
except Exception as e:
357+
print(f"❌ Debug failed: {e}")
358+
import traceback
359+
traceback.print_exc()
360+
finally:
361+
await client.close()
362+
363+
if __name__ == "__main__":
364+
asyncio.run(debug_mcp_server())

0 commit comments

Comments
 (0)