@@ -253,17 +253,22 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
253
253
return err
254
254
}
255
255
}
256
+ // Operations will include item and write index (and context if spancontext feature enabled)
257
+ ops := make ([]* storage.Operation , 2 , 3 )
258
+ ops [0 ] = storage .SetOperation (writeIndexKey , itemIndexToBytes (pq .metadata .WriteIndex + 1 ))
256
259
257
260
reqBuf , err := pq .set .encoding .Marshal (req )
258
261
if err != nil {
259
262
return err
260
263
}
264
+ ops [1 ] = storage .SetOperation (getItemKey (pq .metadata .WriteIndex ), reqBuf )
261
265
262
- // Carry out a transaction where we both add the item and update the write index
263
- ops := []* storage.Operation {
264
- storage .SetOperation (writeIndexKey , itemIndexToBytes (pq .metadata .WriteIndex + 1 )),
265
- storage .SetOperation (getItemKey (pq .metadata .WriteIndex ), reqBuf ),
266
+ contextBuf := marshalSpanContext (ctx )
267
+ if len (contextBuf ) > 0 {
268
+ ops = append (ops , storage .SetOperation (getContextKey (pq .metadata .WriteIndex ), contextBuf ))
266
269
}
270
+
271
+ // Carry out a transaction where we add the item/context and update the write index
267
272
if err = pq .client .Batch (ctx , ops ... ); err != nil {
268
273
return err
269
274
}
@@ -295,7 +300,13 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
295
300
296
301
// Read until either a successful retrieved element or no more elements in the storage.
297
302
for pq .metadata .ReadIndex != pq .metadata .WriteIndex {
298
- index , req , consumed := pq .getNextItem (ctx )
303
+ index , req , consumed , restoredContext , err := pq .getNextItem (ctx )
304
+ if err != nil {
305
+ pq .logger .Debug ("Failed to dispatch item" , zap .Error (err ))
306
+ if err = pq .itemDispatchingFinish (ctx , index ); err != nil {
307
+ pq .logger .Error ("Error deleting item from queue" , zap .Error (err ))
308
+ }
309
+ }
299
310
// Ensure the used size and the channel size are in sync.
300
311
if pq .metadata .ReadIndex == pq .metadata .WriteIndex {
301
312
pq .metadata .QueueSize = 0
@@ -304,7 +315,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
304
315
if consumed {
305
316
id := indexDonePool .Get ().(* indexDone )
306
317
id .reset (index , pq .set .sizer .Sizeof (req ), pq )
307
- return context . Background () , req , id , true
318
+ return restoredContext , req , id , true
308
319
}
309
320
}
310
321
@@ -317,37 +328,44 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
317
328
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
318
329
// finished, onDone should be called with the index to clean up the storage. If no new item is available,
319
330
// returns false.
320
- func (pq * persistentQueue [T ]) getNextItem (ctx context.Context ) (uint64 , T , bool ) {
331
+ func (pq * persistentQueue [T ]) getNextItem (ctx context.Context ) (uint64 , T , bool , context. Context , error ) {
321
332
index := pq .metadata .ReadIndex
322
333
// Increase here, so even if errors happen below, it always iterates
323
334
pq .metadata .ReadIndex ++
324
335
pq .metadata .CurrentlyDispatchedItems = append (pq .metadata .CurrentlyDispatchedItems , index )
325
336
getOp := storage .GetOperation (getItemKey (index ))
326
- err := pq . client . Batch ( ctx ,
327
- storage .SetOperation (readIndexKey , itemIndexToBytes (pq .metadata .ReadIndex )),
328
- storage .SetOperation (currentlyDispatchedItemsKey , itemIndexArrayToBytes (pq .metadata .CurrentlyDispatchedItems )),
329
- getOp )
337
+ ops := make ([] * storage. Operation , 3 , 4 )
338
+ ops [ 0 ] = storage .SetOperation (readIndexKey , itemIndexToBytes (pq .metadata .ReadIndex ))
339
+ ops [ 1 ] = storage .SetOperation (currentlyDispatchedItemsKey , itemIndexArrayToBytes (pq .metadata .CurrentlyDispatchedItems ))
340
+ ops [ 2 ] = getOp
330
341
331
- var request T
332
- if err == nil {
333
- request , err = pq .set .encoding .Unmarshal (getOp .Value )
342
+ // Only add context operation if feature gate is enabled
343
+ var ctxOp * storage.Operation
344
+ if persistRequestContextFeatureGate .IsEnabled () {
345
+ ctxOp = storage .GetOperation (getContextKey (index ))
346
+ ops = append (ops , ctxOp )
334
347
}
335
348
349
+ var request T
350
+ restoredContext := context .Background ()
351
+ err := pq .client .Batch (ctx , ops ... )
336
352
if err != nil {
337
- pq .logger .Debug ("Failed to dispatch item" , zap .Error (err ))
338
- // We need to make sure that currently dispatched items list is cleaned
339
- if err = pq .itemDispatchingFinish (ctx , index ); err != nil {
340
- pq .logger .Error ("Error deleting item from queue" , zap .Error (err ))
341
- }
342
-
343
- return 0 , request , false
353
+ return 0 , request , false , restoredContext , err
354
+ }
355
+ request , err = pq .set .encoding .Unmarshal (getOp .Value )
356
+ if err != nil {
357
+ return 0 , request , false , restoredContext , err
344
358
}
345
359
360
+ // Only try to restore context if feature gate is enabled
361
+ if persistRequestContextFeatureGate .IsEnabled () {
362
+ restoredContext = unmarshalSpanContext (ctxOp .Value )
363
+ }
346
364
// Increase the reference count, so the client is not closed while the request is being processed.
347
365
// The client cannot be closed because we hold the lock since last we checked `stopped`.
348
366
pq .refClient ++
349
367
350
- return index , request , true
368
+ return index , request , true , restoredContext , nil
351
369
}
352
370
353
371
// onDone should be called to remove the item of the given index from the queue once processing is finished.
@@ -414,13 +432,29 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
414
432
415
433
pq .logger .Info ("Fetching items left for dispatch by consumers" , zap .Int (zapNumberOfItems ,
416
434
len (dispatchedItems )))
417
- retrieveBatch := make ([]* storage.Operation , len (dispatchedItems ))
418
- cleanupBatch := make ([]* storage.Operation , len (dispatchedItems ))
435
+
436
+ // Calculate batch sizes based on whether context persistence is enabled
437
+ batchSize := len (dispatchedItems )
438
+ if persistRequestContextFeatureGate .IsEnabled () {
439
+ batchSize *= 2
440
+ }
441
+
442
+ retrieveBatch := make ([]* storage.Operation , batchSize )
443
+ cleanupBatch := make ([]* storage.Operation , batchSize )
444
+
419
445
for i , it := range dispatchedItems {
420
- key := getItemKey (it )
421
- retrieveBatch [i ] = storage .GetOperation (key )
422
- cleanupBatch [i ] = storage .DeleteOperation (key )
446
+ reqKey := getItemKey (it )
447
+ retrieveBatch [i ] = storage .GetOperation (reqKey )
448
+ cleanupBatch [i ] = storage .DeleteOperation (reqKey )
449
+
450
+ if persistRequestContextFeatureGate .IsEnabled () {
451
+ // store the context keys at the end of the batch
452
+ ctxKey := getContextKey (it )
453
+ retrieveBatch [len (dispatchedItems )+ i ] = storage .GetOperation (ctxKey )
454
+ cleanupBatch [len (dispatchedItems )+ i ] = storage .DeleteOperation (ctxKey )
455
+ }
423
456
}
457
+
424
458
retrieveErr := pq .client .Batch (ctx , retrieveBatch ... )
425
459
cleanupErr := pq .client .Batch (ctx , cleanupBatch ... )
426
460
@@ -434,18 +468,27 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co
434
468
}
435
469
436
470
errCount := 0
437
- for _ , op := range retrieveBatch {
471
+ // Iterate over the request entries only: when the span context is persisted, each request's
472
+ // context sits at the corresponding index in the second half of retrieveBatch.
473
+ for idx := 0 ; idx < len (dispatchedItems ); idx ++ {
474
+ op := retrieveBatch [idx ]
438
475
if op .Value == nil {
439
476
pq .logger .Warn ("Failed retrieving item" , zap .String (zapKey , op .Key ), zap .Error (errValueNotSet ))
440
477
continue
441
478
}
479
+ restoredContext := ctx
442
480
req , err := pq .set .encoding .Unmarshal (op .Value )
443
481
// If error happened or item is nil, it will be efficiently ignored
444
482
if err != nil {
445
483
pq .logger .Warn ("Failed unmarshalling item" , zap .String (zapKey , op .Key ), zap .Error (err ))
446
484
continue
447
485
}
448
- if pq .putInternal (ctx , req ) != nil {
486
+ // We will then retrieve the context from the back half of the batch list, see above
487
+ if persistRequestContextFeatureGate .IsEnabled () {
488
+ ctxOp := retrieveBatch [len (dispatchedItems )+ idx ]
489
+ restoredContext = unmarshalSpanContext (ctxOp .Value )
490
+ }
491
+ if pq .putInternal (restoredContext , req ) != nil {
449
492
errCount ++
450
493
}
451
494
}
@@ -470,9 +513,12 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u
470
513
}
471
514
}
472
515
473
- setOp := storage .SetOperation (currentlyDispatchedItemsKey , itemIndexArrayToBytes (pq .metadata .CurrentlyDispatchedItems ))
474
- deleteOp := storage .DeleteOperation (getItemKey (index ))
475
- if err := pq .client .Batch (ctx , setOp , deleteOp ); err != nil {
516
+ setOps := []* storage.Operation {storage .SetOperation (currentlyDispatchedItemsKey , itemIndexArrayToBytes (pq .metadata .CurrentlyDispatchedItems ))}
517
+ deleteOps := []* storage.Operation {storage .DeleteOperation (getItemKey (index ))}
518
+ if persistRequestContextFeatureGate .IsEnabled () {
519
+ deleteOps = append (deleteOps , storage .DeleteOperation (getContextKey (index )))
520
+ }
521
+ if err := pq .client .Batch (ctx , append (setOps , deleteOps ... )... ); err != nil {
476
522
// got an error, try to gracefully handle it
477
523
pq .logger .Warn ("Failed updating currently dispatched items, trying to delete the item first" ,
478
524
zap .Error (err ))
@@ -481,12 +527,12 @@ func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index u
481
527
return nil
482
528
}
483
529
484
- if err := pq .client .Batch (ctx , deleteOp ); err != nil {
530
+ if err := pq .client .Batch (ctx , deleteOps ... ); err != nil {
485
531
// Return an error here, as this indicates an issue with the underlying storage medium
486
532
return fmt .Errorf ("failed deleting item from queue, got error from storage: %w" , err )
487
533
}
488
534
489
- if err := pq .client .Batch (ctx , setOp ); err != nil {
535
+ if err := pq .client .Batch (ctx , setOps ... ); err != nil {
490
536
// even if this fails, we still have the right dispatched items in memory
491
537
// at worst, we'll have the wrong list in storage, and we'll discard the nonexistent items during startup
492
538
return fmt .Errorf ("failed updating currently dispatched items, but deleted item successfully: %w" , err )
0 commit comments