Skip to content

Migrate legacy queue metadata to a consolidated format and update related tests #13126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 129 additions & 35 deletions exporter/exporterhelper/internal/queue/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
zapErrorCount = "errorCount"
zapNumberOfItems = "numberOfItems"

readIndexKey = "ri"
writeIndexKey = "wi"
currentlyDispatchedItemsKey = "di"
legacyReadIndexKey = "ri"
legacyWriteIndexKey = "wi"
legacyCurrentlyDispatchedItemsKey = "di"

// queueMetadataKey is the new single key for all queue metadata.
// TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
//nolint:unused
queueMetadataKey = "qmv0"
)

Expand Down Expand Up @@ -160,10 +158,20 @@
}

func (pq *persistentQueue[T]) initPersistentContiguousStorage(ctx context.Context) {
riOp := storage.GetOperation(readIndexKey)
wiOp := storage.GetOperation(writeIndexKey)
if err := pq.loadQueueMetadata(ctx); err == nil {
pq.logger.Info("Loaded consolidated queue metadata.")
return
} else if !errors.Is(err, errValueNotSet) {
pq.logger.Error("Failed to load consolidated queue metadata", zap.Error(err))
return
}

err := pq.client.Batch(ctx, riOp, wiOp)
pq.logger.Info("Queue metadata not found, attempting migration from legacy format.")
riOp := storage.GetOperation(legacyReadIndexKey)
wiOp := storage.GetOperation(legacyWriteIndexKey)
diOp := storage.GetOperation(legacyCurrentlyDispatchedItemsKey)

err := pq.client.Batch(ctx, riOp, wiOp, diOp)
if err == nil {
pq.metadata.ReadIndex, err = bytesToItemIndex(riOp.Value)
}
Expand All @@ -172,14 +180,77 @@
pq.metadata.WriteIndex, err = bytesToItemIndex(wiOp.Value)
}

if err == nil {
pq.metadata.CurrentlyDispatchedItems, err = bytesToItemIndexArray(diOp.Value)
}

if err != nil {
if errors.Is(err, errValueNotSet) {
pq.logger.Info("Initializing new persistent queue")
} else {
pq.logger.Error("Failed getting read/write index, starting with new ones", zap.Error(err))
pq.logger.Error("Failed to load legacy queue metadata from storage, initializing with defaults", zap.Error(err))

Check warning on line 191 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L191

Added line #L191 was not covered by tests
}
pq.metadata.ReadIndex = 0
pq.metadata.WriteIndex = 0
pq.metadata.CurrentlyDispatchedItems = nil
}

if err := pq.backupCurrentMetadata(ctx); err != nil {
pq.logger.Error("Failed to persist migrated queue metadata", zap.Error(err))
return
}

Check warning on line 201 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L199-L201

Added lines #L199 - L201 were not covered by tests
pq.cleanupLegacyKeys(ctx)
}

// loadQueueMetadata reads the consolidated metadata blob from storage and
// unmarshals it into pq.metadata. It returns errValueNotSet when the key
// exists but holds no data (or was never written), so callers can
// distinguish "not migrated yet" from a genuine storage failure.
func (pq *persistentQueue[T]) loadQueueMetadata(ctx context.Context) error {
	raw, err := pq.client.Get(ctx, queueMetadataKey)
	if err != nil {
		return err
	}
	if len(raw) == 0 {
		// Key absent or empty: signal the caller to fall back to migration.
		return errValueNotSet
	}
	if err := pq.metadata.Unmarshal(raw); err != nil {
		return err
	}

	pq.logger.Info("Loaded queue metadata",
		zap.Int64("itemsSize", pq.metadata.ItemsSize),
		zap.Int64("bytesSize", pq.metadata.BytesSize),
		zap.Uint64("readIndex", pq.metadata.ReadIndex),
		zap.Uint64("writeIndex", pq.metadata.WriteIndex),
		zap.Int("dispatchedItems", len(pq.metadata.CurrentlyDispatchedItems)))
	return nil
}

func (pq *persistentQueue[T]) backupCurrentMetadata(ctx context.Context) error {
metadataBytes, err := metadataToBytes(&pq.metadata)
if err != nil {
return err
}

Check warning on line 234 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L233-L234

Added lines #L233 - L234 were not covered by tests

if err := pq.client.Set(ctx, queueMetadataKey, metadataBytes); err != nil {
pq.logger.Error("Failed to persist current metadata to storage", zap.Error(err))
return err
}

Check warning on line 239 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L237-L239

Added lines #L237 - L239 were not covered by tests
return nil
}

func (pq *persistentQueue[T]) cleanupLegacyKeys(ctx context.Context) {
ops := []*storage.Operation{
storage.DeleteOperation(legacyReadIndexKey),
storage.DeleteOperation(legacyWriteIndexKey),
storage.DeleteOperation(legacyCurrentlyDispatchedItemsKey),
}

if err := pq.client.Batch(ctx, ops...); err != nil {
pq.logger.Warn("Failed to cleanup legacy metadata keys", zap.Error(err))

Check warning on line 251 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L251

Added line #L251 was not covered by tests
} else {
pq.logger.Info("Successfully migrated to consolidated metadata format")
}
}

Expand Down Expand Up @@ -233,20 +304,29 @@
return err
}

// Carry out a transaction where we both add the item and update the write index
itemIndex := pq.metadata.WriteIndex
pq.metadata.ItemsSize += pq.itemsSizer.Sizeof(req)
pq.metadata.BytesSize += pq.bytesSizer.Sizeof(req)
pq.metadata.WriteIndex++

metadataBytes, err := metadataToBytes(&pq.metadata)
if err != nil {
return err
}

Check warning on line 315 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L314-L315

Added lines #L314 - L315 were not covered by tests

// Carry out a transaction where we both add the item and update the metadata.
ops := []*storage.Operation{
storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.metadata.WriteIndex+1)),
storage.SetOperation(getItemKey(pq.metadata.WriteIndex), reqBuf),
storage.SetOperation(queueMetadataKey, metadataBytes),
storage.SetOperation(getItemKey(itemIndex), reqBuf),
}
if err = pq.client.Batch(ctx, ops...); err != nil {
pq.metadata.ItemsSize -= pq.itemsSizer.Sizeof(req)
pq.metadata.BytesSize -= pq.bytesSizer.Sizeof(req)
pq.metadata.WriteIndex--
return err
}

pq.metadata.ItemsSize += pq.itemsSizer.Sizeof(req)
pq.metadata.BytesSize += pq.bytesSizer.Sizeof(req)
pq.metadata.WriteIndex++
pq.hasMoreElements.Signal()

return nil
}

Expand Down Expand Up @@ -291,14 +371,20 @@
// Increase here, so even if errors happen below, it always iterates
pq.metadata.ReadIndex++
pq.metadata.CurrentlyDispatchedItems = append(pq.metadata.CurrentlyDispatchedItems, index)
getOp := storage.GetOperation(getItemKey(index))
err := pq.client.Batch(ctx,
storage.SetOperation(readIndexKey, itemIndexToBytes(pq.metadata.ReadIndex)),
storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.metadata.CurrentlyDispatchedItems)),
getOp)

var req T
restoredCtx := context.Background()

metadataBytes, err := metadataToBytes(&pq.metadata)
if err != nil {
return 0, req, restoredCtx, false
}

Check warning on line 381 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L380-L381

Added lines #L380 - L381 were not covered by tests

getOp := storage.GetOperation(getItemKey(index))
err = pq.client.Batch(ctx,
storage.SetOperation(queueMetadataKey, metadataBytes),
getOp)

if err == nil {
restoredCtx, req, err = pq.encoding.Unmarshal(getOp.Value)
}
Expand Down Expand Up @@ -338,9 +424,6 @@
return
}

if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}
pq.metadata.BytesSize -= bytesSize
if pq.metadata.BytesSize < 0 {
pq.metadata.BytesSize = 0
Expand All @@ -356,26 +439,22 @@
pq.metadata.ItemsSize = 0
}

if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
pq.logger.Error("Error deleting item from queue", zap.Error(err))
}

Check warning on line 444 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L443-L444

Added lines #L443 - L444 were not covered by tests

// More space available after data are removed from the storage.
pq.hasMoreSpace.Signal()
}

// retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items at the back of the queue.
func (pq *persistentQueue[T]) retrieveAndEnqueueNotDispatchedReqs(ctx context.Context) {
var dispatchedItems []uint64

pq.mu.Lock()
defer pq.mu.Unlock()
pq.logger.Debug("Checking if there are items left for dispatch by consumers")
itemKeysBuf, err := pq.client.Get(ctx, currentlyDispatchedItemsKey)
if err == nil {
dispatchedItems, err = bytesToItemIndexArray(itemKeysBuf)
}
if err != nil {
pq.logger.Error("Could not fetch items left for dispatch by consumers", zap.Error(err))
return
}

dispatchedItems := pq.metadata.CurrentlyDispatchedItems

if len(dispatchedItems) == 0 {
pq.logger.Debug("No items left for dispatch by consumers")
Expand All @@ -384,6 +463,8 @@

pq.logger.Info("Fetching items left for dispatch by consumers", zap.Int(zapNumberOfItems,
len(dispatchedItems)))
pq.metadata.CurrentlyDispatchedItems = nil

retrieveBatch := make([]*storage.Operation, len(dispatchedItems))
cleanupBatch := make([]*storage.Operation, len(dispatchedItems))
for i, it := range dispatchedItems {
Expand All @@ -410,6 +491,10 @@
continue
}
reqCtx, req, err := pq.encoding.Unmarshal(op.Value)
// Subtract the item size from the queue size before re-enqueuing to avoid double counting.
pq.metadata.ItemsSize -= pq.itemsSizer.Sizeof(req)
pq.metadata.BytesSize -= pq.bytesSizer.Sizeof(req)

// If error happened or item is nil, it will be efficiently ignored
if err != nil {
pq.logger.Warn("Failed unmarshalling item", zap.String(zapKey, op.Key), zap.Error(err))
Expand Down Expand Up @@ -440,7 +525,12 @@
}
}

setOp := storage.SetOperation(currentlyDispatchedItemsKey, itemIndexArrayToBytes(pq.metadata.CurrentlyDispatchedItems))
metadataBytes, err := metadataToBytes(&pq.metadata)
if err != nil {
return err
}

Check warning on line 531 in exporter/exporterhelper/internal/queue/persistent_queue.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/internal/queue/persistent_queue.go#L530-L531

Added lines #L530 - L531 were not covered by tests

setOp := storage.SetOperation(queueMetadataKey, metadataBytes)
deleteOp := storage.DeleteOperation(getItemKey(index))
if err := pq.client.Batch(ctx, setOp, deleteOp); err != nil {
// got an error, try to gracefully handle it
Expand Down Expand Up @@ -537,6 +627,10 @@
return val, nil
}

// metadataToBytes serializes the queue metadata into the byte slice stored
// under queueMetadataKey. It is a thin wrapper over the generated Marshal
// method, kept as a named helper so all call sites share one encoding point.
func metadataToBytes(meta *PersistentMetadata) ([]byte, error) {
return meta.Marshal()
}

type indexDone struct {
index uint64
itemsSize int64
Expand Down
Loading
Loading