@@ -38,7 +38,8 @@ use nautilus_model::{
38
38
identifiers:: { ClientId , Symbol , Venue } ,
39
39
instruments:: Instrument ,
40
40
} ;
41
- use tokio:: sync:: mpsc;
41
+ use tokio:: { sync:: mpsc, task:: JoinHandle } ;
42
+ use tokio_util:: sync:: CancellationToken ;
42
43
43
44
use crate :: {
44
45
historical:: { DatabentoHistoricalClient , RangeQueryParams } ,
@@ -99,6 +100,10 @@ pub struct DatabentoDataClient {
99
100
loader : DatabentoDataLoader ,
100
101
/// Feed handler command senders per dataset.
101
102
cmd_channels : Arc < Mutex < AHashMap < String , mpsc:: UnboundedSender < LiveCommand > > > > ,
103
+ /// Task handles for life cycle management.
104
+ task_handles : Arc < Mutex < Vec < JoinHandle < ( ) > > > > ,
105
+ /// Cancellation token for graceful shutdown.
106
+ cancellation_token : CancellationToken ,
102
107
/// Publisher to venue mapping.
103
108
publisher_venue_map : Arc < IndexMap < PublisherId , Venue > > ,
104
109
/// Symbol to venue mapping (for caching).
@@ -143,6 +148,8 @@ impl DatabentoDataClient {
143
148
historical,
144
149
loader,
145
150
cmd_channels : Arc :: new ( Mutex :: new ( AHashMap :: new ( ) ) ) ,
151
+ task_handles : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
152
+ cancellation_token : CancellationToken :: new ( ) ,
146
153
publisher_venue_map : Arc :: new ( publisher_venue_map) ,
147
154
symbol_venue_map : Arc :: new ( RwLock :: new ( AHashMap :: new ( ) ) ) ,
148
155
} )
@@ -167,9 +174,9 @@ impl DatabentoDataClient {
167
174
if !channels. contains_key ( dataset) {
168
175
tracing:: info!( "Creating new feed handler for dataset: {dataset}" ) ;
169
176
let cmd_tx = self . initialize_live_feed ( dataset. to_string ( ) ) ?;
170
- channels. insert ( dataset. to_string ( ) , cmd_tx) ;
177
+ channels. insert ( dataset. to_string ( ) , cmd_tx. clone ( ) ) ;
171
178
172
- self . send_command_to_dataset ( dataset, LiveCommand :: Start ) ? ;
179
+ tracing :: debug! ( "Feed handler created for dataset: {dataset}, channel stored" ) ;
173
180
}
174
181
175
182
Ok ( ( ) )
@@ -214,50 +221,79 @@ impl DatabentoDataClient {
214
221
self . config . bars_timestamp_on_close ,
215
222
) ;
216
223
217
- // Spawn the feed handler task
218
- tokio:: spawn ( async move {
219
- if let Err ( e) = feed_handler. run ( ) . await {
220
- tracing:: error!( "Feed handler error: {e}" ) ;
224
+ let cancellation_token = self . cancellation_token . clone ( ) ;
225
+
226
+ // Spawn the feed handler task with cancellation support
227
+ let feed_handle = tokio:: spawn ( async move {
228
+ tokio:: select! {
229
+ result = feed_handler. run( ) => {
230
+ if let Err ( e) = result {
231
+ tracing:: error!( "Feed handler error: {e}" ) ;
232
+ }
233
+ }
234
+ _ = cancellation_token. cancelled( ) => {
235
+ tracing:: info!( "Feed handler cancelled" ) ;
236
+ }
221
237
}
222
238
} ) ;
223
239
224
- // Spawn message processing task
225
- tokio:: spawn ( async move {
240
+ let cancellation_token = self . cancellation_token . clone ( ) ;
241
+
242
+ // Spawn message processing task with cancellation support
243
+ let msg_handle = tokio:: spawn ( async move {
226
244
let mut msg_rx = msg_rx;
227
- while let Some ( msg) = msg_rx. recv ( ) . await {
228
- match msg {
229
- LiveMessage :: Data ( data) => {
230
- tracing:: debug!( "Received data: {data:?}" ) ;
231
- // TODO: Forward to message bus or data engine
232
- }
233
- LiveMessage :: Instrument ( instrument) => {
234
- tracing:: debug!( "Received instrument: {}" , instrument. id( ) ) ;
235
- // TODO: Forward to cache or instrument manager
236
- }
237
- LiveMessage :: Status ( status) => {
238
- tracing:: debug!( "Received status: {status:?}" ) ;
239
- // TODO: Forward to appropriate handler
240
- }
241
- LiveMessage :: Imbalance ( imbalance) => {
242
- tracing:: debug!( "Received imbalance: {imbalance:?}" ) ;
243
- // TODO: Forward to appropriate handler
245
+ loop {
246
+ tokio:: select! {
247
+ msg = msg_rx. recv( ) => {
248
+ match msg {
249
+ Some ( LiveMessage :: Data ( data) ) => {
250
+ tracing:: debug!( "Received data: {data:?}" ) ;
251
+ // TODO: Forward to message bus or data engine
252
+ }
253
+ Some ( LiveMessage :: Instrument ( instrument) ) => {
254
+ tracing:: debug!( "Received instrument: {}" , instrument. id( ) ) ;
255
+ // TODO: Forward to cache or instrument manager
256
+ }
257
+ Some ( LiveMessage :: Status ( status) ) => {
258
+ tracing:: debug!( "Received status: {status:?}" ) ;
259
+ // TODO: Forward to appropriate handler
260
+ }
261
+ Some ( LiveMessage :: Imbalance ( imbalance) ) => {
262
+ tracing:: debug!( "Received imbalance: {imbalance:?}" ) ;
263
+ // TODO: Forward to appropriate handler
264
+ }
265
+ Some ( LiveMessage :: Statistics ( statistics) ) => {
266
+ tracing:: debug!( "Received statistics: {statistics:?}" ) ;
267
+ // TODO: Forward to appropriate handler
268
+ }
269
+ Some ( LiveMessage :: Error ( error) ) => {
270
+ tracing:: error!( "Feed handler error: {error}" ) ;
271
+ // TODO: Handle error appropriately
272
+ }
273
+ Some ( LiveMessage :: Close ) => {
274
+ tracing:: info!( "Feed handler closed" ) ;
275
+ break ;
276
+ }
277
+ None => {
278
+ tracing:: debug!( "Message channel closed" ) ;
279
+ break ;
280
+ }
281
+ }
244
282
}
245
- LiveMessage :: Statistics ( statistics) => {
246
- tracing:: debug!( "Received statistics: {statistics:?}" ) ;
247
- // TODO: Forward to appropriate handler
248
- }
249
- LiveMessage :: Error ( error) => {
250
- tracing:: error!( "Feed handler error: {error}" ) ;
251
- // TODO: Handle error appropriately
252
- }
253
- LiveMessage :: Close => {
254
- tracing:: info!( "Feed handler closed" ) ;
283
+ _ = cancellation_token. cancelled( ) => {
284
+ tracing:: info!( "Message processing cancelled" ) ;
255
285
break ;
256
286
}
257
287
}
258
288
}
259
289
} ) ;
260
290
291
+ {
292
+ let mut handles = self . task_handles . lock ( ) . unwrap ( ) ;
293
+ handles. push ( feed_handle) ;
294
+ handles. push ( msg_handle) ;
295
+ }
296
+
261
297
Ok ( cmd_tx)
262
298
}
263
299
}
@@ -280,6 +316,9 @@ impl DataClient for DatabentoDataClient {
280
316
fn stop ( & self ) -> anyhow:: Result < ( ) > {
281
317
tracing:: debug!( "Stopping Databento data client" ) ;
282
318
319
+ // Signal cancellation to all running tasks
320
+ self . cancellation_token . cancel ( ) ;
321
+
283
322
// Send close command to all active feed handlers
284
323
let channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
285
324
for ( dataset, tx) in channels. iter ( ) {
@@ -317,6 +356,9 @@ impl DataClient for DatabentoDataClient {
317
356
async fn disconnect ( & self ) -> anyhow:: Result < ( ) > {
318
357
tracing:: debug!( "Disconnecting Databento data client" ) ;
319
358
359
+ // Signal cancellation to all running tasks
360
+ self . cancellation_token . cancel ( ) ;
361
+
320
362
// Send close command to all active feed handlers
321
363
{
322
364
let channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
@@ -327,9 +369,22 @@ impl DataClient for DatabentoDataClient {
327
369
}
328
370
}
329
371
372
+ // Wait for all spawned tasks to complete
373
+ let handles = {
374
+ let mut task_handles = self . task_handles . lock ( ) . unwrap ( ) ;
375
+ std:: mem:: take ( & mut * task_handles)
376
+ } ;
377
+
378
+ for handle in handles {
379
+ if let Err ( e) = handle. await {
380
+ if !e. is_cancelled ( ) {
381
+ tracing:: error!( "Task join error: {e}" ) ;
382
+ }
383
+ }
384
+ }
385
+
330
386
self . is_connected . store ( false , Ordering :: Relaxed ) ;
331
387
332
- // Clear all command senders
333
388
{
334
389
let mut channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
335
390
channels. clear ( ) ;
@@ -352,8 +407,18 @@ impl DataClient for DatabentoDataClient {
352
407
tracing:: debug!( "Subscribe quotes: {cmd:?}" ) ;
353
408
354
409
let dataset = self . get_dataset_for_venue ( cmd. instrument_id . venue ) ?;
410
+ let was_new_handler = {
411
+ let channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
412
+ !channels. contains_key ( & dataset)
413
+ } ;
414
+
355
415
self . get_or_create_feed_handler ( & dataset) ?;
356
416
417
+ // Start the feed handler if it was newly created
418
+ if was_new_handler {
419
+ self . send_command_to_dataset ( & dataset, LiveCommand :: Start ) ?;
420
+ }
421
+
357
422
let symbol = instrument_id_to_symbol_string (
358
423
cmd. instrument_id ,
359
424
& mut self . symbol_venue_map . write ( ) . unwrap ( ) ,
@@ -373,8 +438,18 @@ impl DataClient for DatabentoDataClient {
373
438
tracing:: debug!( "Subscribe trades: {cmd:?}" ) ;
374
439
375
440
let dataset = self . get_dataset_for_venue ( cmd. instrument_id . venue ) ?;
441
+ let was_new_handler = {
442
+ let channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
443
+ !channels. contains_key ( & dataset)
444
+ } ;
445
+
376
446
self . get_or_create_feed_handler ( & dataset) ?;
377
447
448
+ // Start the feed handler if it was newly created
449
+ if was_new_handler {
450
+ self . send_command_to_dataset ( & dataset, LiveCommand :: Start ) ?;
451
+ }
452
+
378
453
let symbol = instrument_id_to_symbol_string (
379
454
cmd. instrument_id ,
380
455
& mut self . symbol_venue_map . write ( ) . unwrap ( ) ,
@@ -394,8 +469,18 @@ impl DataClient for DatabentoDataClient {
394
469
tracing:: debug!( "Subscribe book deltas: {cmd:?}" ) ;
395
470
396
471
let dataset = self . get_dataset_for_venue ( cmd. instrument_id . venue ) ?;
472
+ let was_new_handler = {
473
+ let channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
474
+ !channels. contains_key ( & dataset)
475
+ } ;
476
+
397
477
self . get_or_create_feed_handler ( & dataset) ?;
398
478
479
+ // Start the feed handler if it was newly created
480
+ if was_new_handler {
481
+ self . send_command_to_dataset ( & dataset, LiveCommand :: Start ) ?;
482
+ }
483
+
399
484
let symbol = instrument_id_to_symbol_string (
400
485
cmd. instrument_id ,
401
486
& mut self . symbol_venue_map . write ( ) . unwrap ( ) ,
@@ -418,8 +503,18 @@ impl DataClient for DatabentoDataClient {
418
503
tracing:: debug!( "Subscribe instrument status: {cmd:?}" ) ;
419
504
420
505
let dataset = self . get_dataset_for_venue ( cmd. instrument_id . venue ) ?;
506
+ let was_new_handler = {
507
+ let channels = self . cmd_channels . lock ( ) . unwrap ( ) ;
508
+ !channels. contains_key ( & dataset)
509
+ } ;
510
+
421
511
self . get_or_create_feed_handler ( & dataset) ?;
422
512
513
+ // Start the feed handler if it was newly created
514
+ if was_new_handler {
515
+ self . send_command_to_dataset ( & dataset, LiveCommand :: Start ) ?;
516
+ }
517
+
423
518
let symbol = instrument_id_to_symbol_string (
424
519
cmd. instrument_id ,
425
520
& mut self . symbol_venue_map . write ( ) . unwrap ( ) ,
0 commit comments