Skip to content

Commit 279ccad

Browse files
authored
[chore] Add unified queueState struct (#13067)
Adds a single key structure to store information about the persistent queue. #12890
1 parent 633b6de commit 279ccad

File tree

7 files changed

+297
-1
lines changed

7 files changed

+297
-1
lines changed

Makefile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ gotidy:
9494
gogenerate:
9595
cd cmd/mdatagen && $(GOCMD) install .
9696
@$(MAKE) for-all-target TARGET="generate"
97+
$(MAKE) genproto_internal
9798
$(MAKE) fmt
9899

99100
.PHONY: addlicense
@@ -260,6 +261,17 @@ gensemconv: $(SEMCONVGEN) $(SEMCONVKIT)
260261
$(SEMCONVGEN) -o semconv/${SPECTAG} -t semconv/template.j2 -s ${SPECTAG} -i ${SPECPATH}/model/. --only=attribute_group -p conventionType=attribute_group -f generated_attribute_group.go
261262
$(SEMCONVKIT) -output "semconv/$(SPECTAG)" -tag "$(SPECTAG)"
262263

264+
INTERNAL_PROTO_SRC_DIRS := exporter/exporterhelper/internal/queuebatch/internal/persistentqueue
265+
# INTERNAL_PROTO_SRC_DIRS += path/to/other/proto/dirs
266+
INTERNAL_PROTO_FILES := $(foreach dir,$(INTERNAL_PROTO_SRC_DIRS),$(wildcard $(dir)/*.proto))
267+
INTERNAL_PROTOC := $(DOCKERCMD) run --rm -u ${shell id -u} -v${PWD}:${PWD} -w${PWD} ${DOCKER_PROTOBUF} --proto_path=${PWD} --go_out=${PWD}
268+
269+
.PHONY: genproto_internal
270+
genproto_internal:
271+
@echo "Generating Go code for internal proto files"
272+
@echo "Found proto files: $(INTERNAL_PROTO_FILES)"
273+
$(foreach file,$(INTERNAL_PROTO_FILES),$(call exec-command,$(INTERNAL_PROTOC) --go_opt=paths=source_relative $(file)))
274+
263275
ALL_MOD_PATHS := "" $(ALL_MODULES:.%=%)
264276

265277
.PHONY: prepare-contrib

exporter/exporterhelper/internal/queuebatch/internal/persistentqueue/meta.pb.go

Lines changed: 209 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
syntax = "proto3";
2+
3+
package opentelemetry.collector.exporter.exporterhelper.internal.queuebatch.internal.persistentqueue;
4+
5+
option go_package = "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue";
6+
7+
// QueueMetadata holds all persistent metadata for the queue.
8+
message QueueMetadata{
9+
// Sizer type configuration (bytes=0, items=1, requests=2)
10+
int32 sizer_type_value = 1;
11+
12+
// Index of the next item to be read from the queue
13+
uint64 read_index = 2;
14+
15+
// Index where the next item will be written to the queue
16+
uint64 write_index = 3;
17+
18+
// Current total size of the queue (in bytes, items, or requests)
19+
sfixed64 queue_size = 4;
20+
21+
// List of item indices currently being processed by consumers
22+
repeated fixed64 currently_dispatched_items = 5;
23+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
5+
6+
import (
7+
"fmt"
8+
9+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
10+
)
11+
12+
// Numeric constants for protobuf serialization
13+
const (
14+
SizerTypeBytesValue int32 = iota // 0
15+
SizerTypeItemsValue // 1
16+
SizerTypeRequestsValue // 2
17+
)
18+
19+
// SizerTypeToInt32 converts SizerType to int32 for protobuf serialization
20+
func SizerTypeToInt32(sizerType request.SizerType) (int32, error) {
21+
switch sizerType {
22+
case request.SizerTypeBytes:
23+
return SizerTypeBytesValue, nil
24+
case request.SizerTypeItems:
25+
return SizerTypeItemsValue, nil
26+
case request.SizerTypeRequests:
27+
return SizerTypeRequestsValue, nil
28+
default:
29+
return -1, fmt.Errorf("invalid sizer type: %v", sizerType)
30+
}
31+
}
32+
33+
// SizerTypeFromInt32 creates SizerType from int32 representation
34+
func SizerTypeFromInt32(value int32) (request.SizerType, error) {
35+
switch value {
36+
case SizerTypeBytesValue:
37+
return request.SizerTypeBytes, nil
38+
case SizerTypeItemsValue:
39+
return request.SizerTypeItems, nil
40+
case SizerTypeRequestsValue:
41+
return request.SizerTypeRequests, nil
42+
default:
43+
return request.SizerType{}, fmt.Errorf("invalid sizer type value: %d", value)
44+
}
45+
}

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ const (
2929
writeIndexKey = "wi"
3030
currentlyDispatchedItemsKey = "di"
3131
queueSizeKey = "si"
32+
33+
// queueMetadataKey is the new single key for all queue metadata.
34+
// TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
35+
//nolint:unused
36+
queueMetadataKey = "qmv0"
3237
)
3338

3439
var (
@@ -46,6 +51,7 @@ var indexDonePool = sync.Pool{
4651

4752
type persistentQueueSettings[T any] struct {
4853
sizer request.Sizer[T]
54+
sizerType request.SizerType
4955
capacity int64
5056
blockOnOverflow bool
5157
signal pipeline.Signal

exporter/exporterhelper/internal/queuebatch/queue_batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ func newQueueBatch(
9898
} else {
9999
q = newAsyncQueue(newPersistentQueue[request.Request](persistentQueueSettings[request.Request]{
100100
sizer: sizer,
101+
sizerType: cfg.Sizer,
101102
capacity: cfg.QueueSize,
102103
blockOnOverflow: cfg.BlockOnOverflow,
103104
signal: set.Signal,

exporter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ require (
2828
go.uber.org/goleak v1.3.0
2929
go.uber.org/multierr v1.11.0
3030
go.uber.org/zap v1.27.0
31+
google.golang.org/protobuf v1.36.6
3132
)
3233

3334
require (
@@ -64,7 +65,6 @@ require (
6465
golang.org/x/text v0.24.0 // indirect
6566
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
6667
google.golang.org/grpc v1.72.2 // indirect
67-
google.golang.org/protobuf v1.36.6 // indirect
6868
gopkg.in/yaml.v3 v3.0.1 // indirect
6969
sigs.k8s.io/yaml v1.4.0 // indirect
7070
)

0 commit comments

Comments
 (0)