@@ -15,6 +15,7 @@ import (

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
+	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
	"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
	"go.opentelemetry.io/collector/extension/xextension/storage"
	"go.opentelemetry.io/collector/pipeline"
@@ -92,15 +93,12 @@ type persistentQueue[T any] struct {
	isRequestSized bool

	// mu guards everything declared below.
-	mu                       sync.Mutex
-	hasMoreElements          *sync.Cond
-	hasMoreSpace             *cond
-	readIndex                uint64
-	writeIndex               uint64
-	currentlyDispatchedItems []uint64
-	queueSize                int64
-	refClient                int64
-	stopped                  bool
+	mu              sync.Mutex
+	hasMoreElements *sync.Cond
+	hasMoreSpace    *cond
+	metadata        persistentqueue.QueueMetadata
+	refClient       int64
+	stopped         bool
}

// newPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
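The persistentqueue.QueueMetadata type introduced by the new import is not shown in this diff. Judging by the fields it absorbs, it presumably bundles the queue's persistent bookkeeping along these lines (a sketch inferred from usage here, not the actual definition in the internal package):

package persistentqueue

// QueueMetadata groups the bookkeeping that persistentQueue previously kept
// as individual fields. Names match their usage in this diff; types are
// inferred from the fields they replace.
type QueueMetadata struct {
	ReadIndex                uint64   // index of the next item to read
	WriteIndex               uint64   // index of the next item to write
	CurrentlyDispatchedItems []uint64 // indexes handed out but not yet completed
	QueueSize                int64    // current size in the configured sizer's units
}

Collecting these fields into one struct keeps every piece of state that must survive a restart in a single place, which presumably also simplifies persisting it as one unit later.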
@@ -129,7 +127,7 @@ func (pq *persistentQueue[T]) Start(ctx context.Context, host component.Host) er
func (pq *persistentQueue[T]) Size() int64 {
	pq.mu.Lock()
	defer pq.mu.Unlock()
-	return pq.queueSize
+	return pq.metadata.QueueSize
}

func (pq *persistentQueue[T]) Capacity() int64 {
@@ -151,11 +149,11 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex

	err := pq.client.Batch(ctx, riOp, wiOp)
	if err == nil {
-		pq.readIndex, err = bytesToItemIndex(riOp.Value)
+		pq.metadata.ReadIndex, err = bytesToItemIndex(riOp.Value)
	}

	if err == nil {
-		pq.writeIndex, err = bytesToItemIndex(wiOp.Value)
+		pq.metadata.WriteIndex, err = bytesToItemIndex(wiOp.Value)
	}

	if err != nil {
@@ -164,11 +162,11 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
		} else {
			pq.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
		}
-		pq.readIndex = 0
-		pq.writeIndex = 0
+		pq.metadata.ReadIndex = 0
+		pq.metadata.WriteIndex = 0
	}

-	queueSize := pq.writeIndex - pq.readIndex
+	queueSize := pq.metadata.WriteIndex - pq.metadata.ReadIndex

	// If the queue is sized by the number of requests, no need to read the queue size from storage.
	if queueSize > 0 && !pq.isRequestSized {
@@ -177,7 +175,7 @@ func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Contex
		}
	}
	//nolint:gosec
-	pq.queueSize = int64(queueSize)
+	pq.metadata.QueueSize = int64(queueSize)
}

// restoreQueueSizeFromStorage restores the queue size from storage.
@@ -222,7 +220,7 @@ func (pq *persistentQueue[T]) backupQueueSize(ctx context.Context) error {
	}

	//nolint:gosec
-	return pq.client.Set(ctx, queueSizeKey, itemIndexToBytes(uint64(pq.queueSize)))
+	return pq.client.Set(ctx, queueSizeKey, itemIndexToBytes(uint64(pq.metadata.QueueSize)))
}

// unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex.
@@ -247,7 +245,7 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
// putInternal is the internal version that requires caller to hold the mutex lock.
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
	reqSize := pq.set.sizer.Sizeof(req)
-	for pq.queueSize+reqSize > pq.set.capacity {
+	for pq.metadata.QueueSize+reqSize > pq.set.capacity {
		if !pq.set.blockOnOverflow {
			return ErrQueueIsFull
		}
@@ -263,20 +261,20 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {

	// Carry out a transaction where we both add the item and update the write index
	ops := []*storage.Operation{
-		storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)),
-		storage.SetOperation(getItemKey(pq.writeIndex), reqBuf),
+		storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.metadata.WriteIndex+1)),
+		storage.SetOperation(getItemKey(pq.metadata.WriteIndex), reqBuf),
	}
	if err = pq.client.Batch(ctx, ops...); err != nil {
		return err
	}

-	pq.writeIndex++
-	pq.queueSize += reqSize
+	pq.metadata.WriteIndex++
+	pq.metadata.QueueSize += reqSize
	pq.hasMoreElements.Signal()

	// Back up the queue size to storage every 10 writes. The stored value is used to recover the queue size
	// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
-	if (pq.writeIndex % 10) == 5 {
+	if (pq.metadata.WriteIndex % 10) == 5 {
		if err := pq.backupQueueSize(ctx); err != nil {
			pq.logger.Error("Error writing queue size to storage", zap.Error(err))
		}
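Note the staggering between the two backup points: the write path above flushes the size when WriteIndex % 10 == 5, while the read path later in this diff flushes when ReadIndex % 10 == 0, presumably so the two backups do not fire together when the indexes advance in lockstep. A minimal illustration of that cadence (hypothetical helper, not part of the PR):

// sizeBackupDue mirrors the staggered checks in putInternal and onDone:
// a backup is due after every 10th write (offset 5) and every 10th read.
func sizeBackupDue(writeIndex, readIndex uint64) (afterWrite, afterRead bool) {
	return writeIndex%10 == 5, readIndex%10 == 0
}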
@@ -296,11 +294,11 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
	}

	// Read until either a successful retrieved element or no more elements in the storage.
-	for pq.readIndex != pq.writeIndex {
+	for pq.metadata.ReadIndex != pq.metadata.WriteIndex {
		index, req, consumed := pq.getNextItem(ctx)
		// Ensure the used size and the channel size are in sync.
-		if pq.readIndex == pq.writeIndex {
-			pq.queueSize = 0
+		if pq.metadata.ReadIndex == pq.metadata.WriteIndex {
+			pq.metadata.QueueSize = 0
			pq.hasMoreSpace.Signal()
		}
		if consumed {
@@ -320,14 +318,14 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
// returns false.
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
-	index := pq.readIndex
+	index := pq.metadata.ReadIndex
	// Increase here, so even if errors happen below, it always iterates
-	pq.readIndex++
-	pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index)
+	pq.metadata.ReadIndex++
+	pq.metadata.CurrentlyDispatchedItems = append(pq.metadata.CurrentlyDispatchedItems, index)
	getOp := storage.GetOperation(getItemKey(index))
	err := pq.client.Batch(ctx,
-		storage.SetOperation(readIndexKey, itemIndexToBytes(pq.readIndex)),
-		storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems)),
+		storage.SetOperation(readIndexKey, itemIndexToBytes(pq.metadata.ReadIndex)),
+		storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.metadata.CurrentlyDispatchedItems)),
		getOp)

	var request T
@@ -364,12 +362,12 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr erro
		pq.mu.Unlock()
	}()

-	pq.queueSize -= elSize
+	pq.metadata.QueueSize -= elSize
	// The size might be not in sync with the queue in case it's restored from the disk
	// because we don't flush the current queue size on the disk on every read/write.
	// In that case we need to make sure it doesn't go below 0.
-	if pq.queueSize < 0 {
-		pq.queueSize = 0
+	if pq.metadata.QueueSize < 0 {
+		pq.metadata.QueueSize = 0
	}
	pq.hasMoreSpace.Signal()

@@ -385,7 +383,7 @@ func (pq *persistentQueue[T]) onDone(index uint64, elSize int64, consumeErr erro

	// Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
	// in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
-	if (pq.readIndex % 10) == 0 {
+	if (pq.metadata.ReadIndex % 10) == 0 {
		if qsErr := pq.backupQueueSize(context.Background()); qsErr != nil {
			pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))
		}
@@ -463,16 +461,16 @@ func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Co

// itemDispatchingFinish removes the item from the list of currently dispatched items and deletes it from the persistent queue
func (pq *persistentQueue[T]) itemDispatchingFinish(ctx context.Context, index uint64) error {
-	lenCDI := len(pq.currentlyDispatchedItems)
+	lenCDI := len(pq.metadata.CurrentlyDispatchedItems)
	for i := 0; i < lenCDI; i++ {
-		if pq.currentlyDispatchedItems[i] == index {
-			pq.currentlyDispatchedItems[i] = pq.currentlyDispatchedItems[lenCDI-1]
-			pq.currentlyDispatchedItems = pq.currentlyDispatchedItems[:lenCDI-1]
+		if pq.metadata.CurrentlyDispatchedItems[i] == index {
+			pq.metadata.CurrentlyDispatchedItems[i] = pq.metadata.CurrentlyDispatchedItems[lenCDI-1]
+			pq.metadata.CurrentlyDispatchedItems = pq.metadata.CurrentlyDispatchedItems[:lenCDI-1]
			break
		}
	}

-	setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.currentlyDispatchedItems))
+	setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.metadata.CurrentlyDispatchedItems))
	deleteOp := storage.DeleteOperation(getItemKey(index))
	if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil {
		// got an error, try to gracefully handle it
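The removal loop in itemDispatchingFinish above is the swap-with-last idiom: overwrite the matched entry with the final element, then truncate the slice. That deletes in O(1) without preserving order, which is fine for a set of dispatched indexes. The same idiom in isolation (illustrative helper, not part of the PR):

// removeFirst deletes the first occurrence of v from s by swapping it with
// the last element and truncating; element order is not preserved.
func removeFirst(s []uint64, v uint64) []uint64 {
	for i, x := range s {
		if x == v {
			s[i] = s[len(s)-1]
			return s[:len(s)-1]
		}
	}
	return s
}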