Skip to content

Commit 39fac0d

Browse files
committed
Switched async HTTP calls to the Pinecone async SDK
1 parent bc8cf24 commit 39fac0d

File tree

2 files changed

+142
-108
lines changed

2 files changed

+142
-108
lines changed

.dagger/src/semantic_router_ci/main.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,12 @@ async def test(
188188
)
189189
# Set env vars inside test container
190190
container = (
191-
container
192-
.with_env_variable("PINECONE_API_BASE_URL", pinecone_api_base_url)
191+
container.with_env_variable("PINECONE_API_BASE_URL", pinecone_api_base_url)
193192
.with_env_variable(
194193
"POSTGRES_HOST", os.environ.get("POSTGRES_HOST", "postgres")
195194
)
196-
.with_env_variable(
197-
"POSTGRES_PORT", os.environ.get("POSTGRES_PORT", "5432")
198-
)
199-
.with_env_variable(
200-
"POSTGRES_DB", os.environ.get("POSTGRES_DB", "postgres")
201-
)
195+
.with_env_variable("POSTGRES_PORT", os.environ.get("POSTGRES_PORT", "5432"))
196+
.with_env_variable("POSTGRES_DB", os.environ.get("POSTGRES_DB", "postgres"))
202197
.with_env_variable(
203198
"POSTGRES_USER", os.environ.get("POSTGRES_USER", "postgres")
204199
)

semantic_router/index/pinecone.py

Lines changed: 139 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
import json
44
import os
55
import time
6-
from datetime import datetime, timezone
7-
from json.decoder import JSONDecodeError
86
from typing import Any, Dict, List, Optional, Tuple, Union
97

108
import aiohttp
@@ -345,6 +343,7 @@ def _init_index(self, force_create: bool = False) -> Union[Any, None]:
345343
# If index creation is forbidden (likely quota), surface a clear
346344
# instruction to reuse an existing index instead of adding fallback logic.
347345
from pinecone.exceptions import ForbiddenException
346+
348347
if isinstance(e, ForbiddenException):
349348
raise RuntimeError(
350349
"Pinecone index creation forbidden (likely quota). "
@@ -386,7 +385,6 @@ def _init_index(self, force_create: bool = False) -> Union[Any, None]:
386385
else:
387386
time.sleep(0.2)
388387
elif index_exists:
389-
desc = self.client.describe_index(self.index_name)
390388
# Let the SDK pick the correct host (cloud or local) based on client configuration
391389
index = self.client.Index(self.index_name)
392390
self.index = index
@@ -1174,47 +1172,49 @@ async def _async_query(
11741172
:param include_metadata: Whether to include metadata in the results, defaults to False.
11751173
:type include_metadata: bool, optional
11761174
"""
1177-
params = {
1178-
"vector": vector,
1179-
"sparse_vector": sparse_vector,
1180-
"namespace": namespace,
1181-
"filter": filter,
1182-
"top_k": top_k,
1183-
"include_metadata": include_metadata,
1184-
"topK": top_k,
1185-
"includeMetadata": include_metadata,
1186-
}
1175+
# Params now passed directly via SDK below
11871176
if not (await self.ais_ready()):
11881177
raise ValueError("Async index is not initialized.")
1189-
# Ensure host is set for cloud/local
1190-
if not self.host:
1191-
desc = self.client.describe_index(self.index_name)
1192-
self.index_host = getattr(desc, "host", None)
1193-
if self._using_local_emulator:
1194-
self.host = "http://pinecone:5080"
1195-
elif self.index_host:
1196-
self.host = (
1197-
f"https://{self.index_host}"
1198-
if not str(self.index_host).startswith("http")
1199-
else str(self.index_host)
1178+
# Use Pinecone async SDK instead of manual HTTP
1179+
try:
1180+
from pinecone import PineconeAsyncio
1181+
except ImportError as e:
1182+
raise ImportError(
1183+
'Pinecone asyncio support not installed. Install with `pip install "pinecone[asyncio]"`.'
1184+
) from e
1185+
1186+
async with PineconeAsyncio(api_key=self.api_key) as apc:
1187+
# Resolve host via describe if not already known
1188+
index_host: Optional[str] = self.host or None
1189+
if not index_host:
1190+
desc = await apc.describe_index(self.index_name)
1191+
candidate = (
1192+
desc.get("host")
1193+
if isinstance(desc, dict)
1194+
else getattr(desc, "host", None)
1195+
)
1196+
if isinstance(candidate, str):
1197+
index_host = candidate
1198+
else:
1199+
index_host = None
1200+
if self._using_local_emulator and not index_host:
1201+
index_host = "http://pinecone:5080"
1202+
if not index_host:
1203+
raise ValueError(
1204+
"Could not resolve Pinecone index host for async query"
1205+
)
1206+
if not index_host.startswith("http"):
1207+
index_host = f"https://{index_host}"
1208+
1209+
async with apc.Index(host=index_host) as aindex:
1210+
return await aindex.query(
1211+
vector=vector,
1212+
sparse_vector=sparse_vector,
1213+
namespace=namespace,
1214+
filter=filter,
1215+
top_k=top_k,
1216+
include_metadata=include_metadata,
12001217
)
1201-
1202-
async with aiohttp.ClientSession() as session:
1203-
async with session.post(
1204-
f"{self.host}/vectors/query",
1205-
json=params,
1206-
headers=self.headers,
1207-
) as response:
1208-
if response.status != 200:
1209-
error_text = await response.text()
1210-
logger.error(f"Error in query response: {error_text}")
1211-
return {} # or handle the error as needed
1212-
1213-
try:
1214-
return await response.json(content_type=None)
1215-
except JSONDecodeError as e:
1216-
logger.error(f"JSON decode error: {e}")
1217-
return {}
12181218

12191219
async def ais_ready(self, client_only: bool = False) -> bool:
12201220
"""Checks if class attributes exist to be used for async operations.
@@ -1278,26 +1278,36 @@ async def _async_upsert(
12781278
if not (await self.ais_ready()):
12791279
raise ValueError("Async index is not initialized.")
12801280

1281-
params = {
1282-
"vectors": vectors,
1283-
"namespace": namespace,
1284-
}
1285-
1286-
if self.base_url and "api.pinecone.io" in self.base_url:
1287-
if not self.host.startswith("http"):
1288-
logger.error(f"host exists:{self.host}")
1289-
self.host = f"https://{self.host}"
1281+
# Params now passed directly via SDK below
12901282

1291-
elif self.host.startswith("localhost") and self.base_url:
1292-
self.host = f"http://{self.base_url.split(':')[-2].strip('/')}:{self.host.split(':')[-1]}"
1293-
async with aiohttp.ClientSession() as session:
1294-
async with session.post(
1295-
f"{self.host}/vectors/upsert",
1296-
json=params,
1297-
headers=self.headers,
1298-
) as response:
1299-
res = await response.json(content_type=None)
1300-
return res
1283+
# Use Pinecone async SDK for upsert
1284+
try:
1285+
from pinecone import PineconeAsyncio
1286+
except ImportError as e:
1287+
raise ImportError(
1288+
'Pinecone asyncio support not installed. Install with `pip install "pinecone[asyncio]"`.'
1289+
) from e
1290+
1291+
async with PineconeAsyncio(api_key=self.api_key) as apc:
1292+
index_host: Optional[str] = self.host or None
1293+
if not index_host:
1294+
desc = await apc.describe_index(self.index_name)
1295+
candidate = (
1296+
desc.get("host")
1297+
if isinstance(desc, dict)
1298+
else getattr(desc, "host", None)
1299+
)
1300+
index_host = candidate if isinstance(candidate, str) else None
1301+
if self._using_local_emulator and not index_host:
1302+
index_host = "http://pinecone:5080"
1303+
if not index_host:
1304+
raise ValueError(
1305+
"Could not resolve Pinecone index host for async upsert"
1306+
)
1307+
if not index_host.startswith("http"):
1308+
index_host = f"https://{index_host}"
1309+
async with apc.Index(host=index_host) as aindex:
1310+
return await aindex.upsert(vectors=vectors, namespace=namespace)
13011311

13021312
async def _async_create_index(
13031313
self,
@@ -1472,37 +1482,55 @@ async def _async_fetch_metadata(
14721482
elif self.host.startswith("localhost") and self.base_url:
14731483
self.host = f"http://{self.base_url.split(':')[-2].strip('/')}:{self.host.split(':')[-1]}"
14741484

1475-
url = f"{self.host}/vectors/fetch"
1476-
1477-
params = {
1478-
"ids": [vector_id],
1479-
}
1480-
1481-
if namespace:
1482-
params["namespace"] = [namespace]
1483-
elif self.namespace:
1484-
params["namespace"] = [self.namespace]
1485-
1486-
async with aiohttp.ClientSession() as session:
1487-
async with session.get(
1488-
url, params=params, headers=self.headers
1489-
) as response:
1490-
if response.status != 200:
1491-
error_text = await response.text()
1492-
logger.error(f"Error fetching metadata: {error_text}")
1493-
return {}
1494-
1485+
# Use Pinecone async SDK to fetch metadata
1486+
try:
1487+
from pinecone import PineconeAsyncio
1488+
except ImportError as e:
1489+
raise ImportError(
1490+
'Pinecone asyncio support not installed. Install with `pip install "pinecone[asyncio]"`.'
1491+
) from e
1492+
async with PineconeAsyncio(api_key=self.api_key) as apc:
1493+
index_host: Optional[str] = self.host or None
1494+
if not index_host:
1495+
desc = await apc.describe_index(self.index_name)
1496+
candidate = (
1497+
desc.get("host")
1498+
if isinstance(desc, dict)
1499+
else getattr(desc, "host", None)
1500+
)
1501+
index_host = candidate if isinstance(candidate, str) else None
1502+
if index_host and not str(index_host).startswith("http"):
1503+
index_host = f"https://{index_host}"
1504+
if self._using_local_emulator and not index_host:
1505+
index_host = "http://pinecone:5080"
1506+
if not index_host:
1507+
raise ValueError(
1508+
"Could not resolve Pinecone index host for async fetch"
1509+
)
1510+
async with apc.Index(host=index_host) as aindex:
1511+
data = await aindex.fetch(
1512+
ids=[vector_id], namespace=namespace or self.namespace
1513+
)
14951514
try:
1496-
response_data = await response.json(content_type=None)
1515+
if hasattr(data, "vectors"):
1516+
vectors = data.vectors
1517+
else:
1518+
vectors = (
1519+
data.get("vectors", []) if isinstance(data, dict) else []
1520+
)
1521+
if vectors:
1522+
first = (
1523+
vectors[0]
1524+
if isinstance(vectors, list)
1525+
else vectors.get(vector_id)
1526+
)
1527+
metadata = getattr(first, "metadata", None) or (
1528+
first.get("metadata") if isinstance(first, dict) else {}
1529+
)
1530+
return metadata or {}
14971531
except Exception as e:
1498-
logger.warning(f"No metadata found for vector {vector_id}: {e}")
1499-
return {}
1500-
1501-
return (
1502-
response_data.get("vectors", {})
1503-
.get(vector_id, {})
1504-
.get("metadata", {})
1505-
)
1532+
logger.error(f"Error parsing metadata response: {e}")
1533+
return {}
15061534

15071535
def __len__(self):
15081536
"""Returns the total number of vectors in the index. If the index is not initialized
@@ -1546,14 +1574,25 @@ async def _async_describe_index_stats(self):
15461574
:return: Index statistics.
15471575
:rtype: dict
15481576
"""
1549-
url = f"{self.index_host}/describe_index_stats"
1550-
1551-
async with aiohttp.ClientSession() as session:
1552-
async with session.post(
1553-
url,
1554-
headers=self.headers,
1555-
json={"namespace": self.namespace},
1556-
timeout=aiohttp.ClientTimeout(total=300),
1557-
) as response:
1558-
response.raise_for_status()
1559-
return await response.json()
1577+
# Use Pinecone async SDK to describe index stats
1578+
try:
1579+
from pinecone import PineconeAsyncio
1580+
except ImportError as e:
1581+
raise ImportError(
1582+
'Pinecone asyncio support not installed. Install with `pip install "pinecone[asyncio]"`.'
1583+
) from e
1584+
async with PineconeAsyncio(api_key=self.api_key) as apc:
1585+
index_host = self.host
1586+
if not index_host:
1587+
desc = await apc.describe_index(self.index_name)
1588+
index_host = (
1589+
desc.get("host")
1590+
if isinstance(desc, dict)
1591+
else getattr(desc, "host", None)
1592+
)
1593+
if index_host and not str(index_host).startswith("http"):
1594+
index_host = f"https://{index_host}"
1595+
if self._using_local_emulator and not index_host:
1596+
index_host = "http://pinecone:5080"
1597+
async with apc.Index(host=index_host) as aindex:
1598+
return await aindex.describe_index_stats(namespace=self.namespace)

0 commit comments

Comments
 (0)