@@ -82,6 +82,9 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
82
82
max_pool_connections : int = 10
83
83
"""Max number of connections for async Dynamodb operations"""
84
84
85
+ keepalive_timeout : float = 12.0
86
+ """Keep-alive timeout in seconds for async Dynamodb connections."""
87
+
85
88
86
89
class DynamoDBOnlineStore (OnlineStore ):
87
90
"""
@@ -97,7 +100,9 @@ class DynamoDBOnlineStore(OnlineStore):
97
100
98
101
async def initialize (self , config : RepoConfig ):
99
102
await _get_aiodynamodb_client (
100
- config .online_store .region , config .online_store .max_pool_connections
103
+ config .online_store .region ,
104
+ config .online_store .max_pool_connections ,
105
+ config .online_store .keepalive_timeout ,
101
106
)
102
107
103
108
async def close (self ):
@@ -272,7 +277,9 @@ async def online_write_batch_async(
272
277
for entity_key , features , timestamp , _ in _latest_data_to_write (data )
273
278
]
274
279
client = await _get_aiodynamodb_client (
275
- online_config .region , config .online_store .max_pool_connections
280
+ online_config .region ,
281
+ online_config .max_pool_connections ,
282
+ online_config .keepalive_timeout ,
276
283
)
277
284
await dynamo_write_items_async (client , table_name , items )
278
285
@@ -377,7 +384,9 @@ def to_tbl_resp(raw_client_response):
377
384
entity_id_batches .append (entity_id_batch )
378
385
379
386
client = await _get_aiodynamodb_client (
380
- online_config .region , online_config .max_pool_connections
387
+ online_config .region ,
388
+ online_config .max_pool_connections ,
389
+ online_config .keepalive_timeout ,
381
390
)
382
391
response_batches = await asyncio .gather (
383
392
* [
@@ -536,14 +545,19 @@ def _get_aioboto_session():
536
545
return _aioboto_session
537
546
538
547
539
- async def _get_aiodynamodb_client (region : str , max_pool_connections : int ):
548
+ async def _get_aiodynamodb_client (
549
+ region : str , max_pool_connections : int , keepalive_timeout : float
550
+ ):
540
551
global _aioboto_client
541
552
if _aioboto_client is None :
542
553
logger .debug ("initializing the aiobotocore dynamodb client" )
543
554
client_context = _get_aioboto_session ().create_client (
544
555
"dynamodb" ,
545
556
region_name = region ,
546
- config = AioConfig (max_pool_connections = max_pool_connections ),
557
+ config = AioConfig (
558
+ max_pool_connections = max_pool_connections ,
559
+ connector_args = {"keepalive_timeout" : keepalive_timeout },
560
+ ),
547
561
)
548
562
context_stack = contextlib .AsyncExitStack ()
549
563
_aioboto_client = await context_stack .enter_async_context (client_context )
0 commit comments