Skip to content

Commit 835a1df

Browse files
committed
[exporterhelper] Preserve request span context in the persistent queue
1 parent d800ad3 commit 835a1df

File tree

8 files changed

+408
-31
lines changed

8 files changed

+408
-31
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add an option to preserve request span context in the persistent queue
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: []
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Currently, it's only available under "exporter.PersistRequestContext" feature gate which can be enabled by adding
20+
`--feature-gates=exporter.PersistRequestContext` to the collector command line.
21+
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
5+
6+
import (
7+
"context"
8+
"encoding/binary"
9+
"errors"
10+
"fmt"
11+
"io"
12+
"math"
13+
14+
"go.opentelemetry.io/otel/propagation"
15+
16+
"go.opentelemetry.io/collector/featuregate"
17+
)
18+
19+
// persistRequestContextFeatureGate controls whether request context should be preserved in the persistent queue.
20+
var persistRequestContextFeatureGate = featuregate.GlobalRegistry().MustRegister(
21+
"exporter.PersistRequestContext",
22+
featuregate.StageAlpha,
23+
featuregate.WithRegisterFromVersion("v0.128.0"),
24+
featuregate.WithRegisterDescription("controls whether context should be stored alongside requests in the persistent queue"),
25+
)
26+
27+
type Encoding[T any] interface {
28+
// Marshal is a function that can marshal a request into bytes.
29+
Marshal(T) ([]byte, error)
30+
31+
// Unmarshal is a function that can unmarshal bytes into a request.
32+
Unmarshal([]byte) (T, error)
33+
}
34+
35+
// Encoder provides an interface for marshaling and unmarshaling requests along with their context.
36+
type Encoder[T any] struct {
37+
encoding Encoding[T]
38+
}
39+
40+
func NewEncoder[T any](encoding Encoding[T]) Encoder[T] {
41+
return Encoder[T]{
42+
encoding: encoding,
43+
}
44+
}
45+
46+
// requestDataKey is the key used to store request data in bytesMap.
47+
const requestDataKey = "req"
48+
49+
var tracePropagator = propagation.TraceContext{}
50+
51+
func (re Encoder[T]) Marshal(ctx context.Context, req T) ([]byte, error) {
52+
if !persistRequestContextFeatureGate.IsEnabled() {
53+
return re.encoding.Marshal(req)
54+
}
55+
56+
bm := newBytesMap()
57+
tracePropagator.Inject(ctx, &bytesMapCarrier{bytesMap: bm})
58+
reqBuf, err := re.encoding.Marshal(req)
59+
if err != nil {
60+
return nil, err
61+
}
62+
if err := bm.set(requestDataKey, reqBuf); err != nil {
63+
return nil, fmt.Errorf("failed to marshal request: %w", err)
64+
}
65+
66+
return *bm, nil
67+
}
68+
69+
func (re Encoder[T]) Unmarshal(b []byte) (T, context.Context, error) {
70+
if !persistRequestContextFeatureGate.IsEnabled() {
71+
req, err := re.encoding.Unmarshal(b)
72+
return req, context.Background(), err
73+
}
74+
75+
bm := bytesMapFromBytes(b)
76+
if bm == nil {
77+
// Fall back to unmarshalling of the request alone.
78+
// This can happen if the data persisted by the version that doesn't support the context unmarshaling.
79+
req, err := re.encoding.Unmarshal(b)
80+
return req, context.Background(), err
81+
}
82+
ctx := tracePropagator.Extract(context.Background(), &bytesMapCarrier{bytesMap: bm})
83+
reqBuf, err := bm.get(requestDataKey)
84+
var req T
85+
if err != nil {
86+
return req, context.Background(), fmt.Errorf("failed to read serialized request data: %w", err)
87+
}
88+
req, err = re.encoding.Unmarshal(reqBuf)
89+
return req, ctx, err
90+
}
91+
92+
// bytesMap is a slice of bytes that represents a map-like structure for storing key-value pairs.
93+
// It's optimized for efficient memory usage for low number of key-value pairs with big values.
94+
// The format is a sequence of key-value pairs encoded as:
95+
// - 1 byte length of the key
96+
// - key bytes
97+
// - 4 byte length of the value
98+
// - value bytes
99+
type bytesMap []byte
100+
101+
// prefix bytes to denote the bytesMap serialization: 0x00 magic byte + 0x01 version of the encoder.
102+
const (
103+
magicByte = byte(0x00)
104+
formatV1Byte = byte(0x01)
105+
prefixBytesLen = 2
106+
initialCapacity = 256
107+
)
108+
109+
func newBytesMap() *bytesMap {
110+
bm := bytesMap(make([]byte, 0, initialCapacity))
111+
bm = append(bm, magicByte, formatV1Byte)
112+
return &bm
113+
}
114+
115+
// set sets the specified key in the map. Must be called only once for each key.
116+
func (bm *bytesMap) set(key string, val []byte) error {
117+
if len(key) > math.MaxUint8 {
118+
return errors.New("key param is too long")
119+
}
120+
valSize := len(val)
121+
if uint64(valSize) > math.MaxUint32 {
122+
return fmt.Errorf("value is too large to persist, size %d", valSize)
123+
}
124+
125+
*bm = append(*bm, byte(len(key)))
126+
*bm = append(*bm, key...)
127+
128+
var lenBuf [4]byte
129+
binary.LittleEndian.PutUint32(lenBuf[:], uint32(valSize)) //nolint:gosec // disable G115
130+
*bm = append(*bm, lenBuf[:]...)
131+
*bm = append(*bm, val...)
132+
133+
return nil
134+
}
135+
136+
// get scans sequentially for the first matching key and returns the value as bytes.
137+
func (bm *bytesMap) get(k string) ([]byte, error) {
138+
for i := prefixBytesLen; i < len(*bm); {
139+
kl := int([]byte(*bm)[i])
140+
i++
141+
142+
if i+kl > len(*bm) {
143+
return nil, io.ErrUnexpectedEOF
144+
}
145+
key := string([]byte(*bm)[i : i+kl])
146+
i += kl
147+
148+
if i+4 > len(*bm) {
149+
return nil, io.ErrUnexpectedEOF
150+
}
151+
vLen := binary.LittleEndian.Uint32([]byte(*bm)[i:])
152+
i += 4
153+
154+
if i+int(vLen) > len(*bm) {
155+
return nil, io.ErrUnexpectedEOF
156+
}
157+
val := []byte(*bm)[i : i+int(vLen)]
158+
i += int(vLen)
159+
160+
if key == k {
161+
return val, nil
162+
}
163+
}
164+
return nil, nil
165+
}
166+
167+
// keys returns header names in encounter order.
168+
func (bm *bytesMap) keys() []string {
169+
var out []string
170+
for i := prefixBytesLen; i < len(*bm); {
171+
kl := int([]byte(*bm)[i])
172+
i++
173+
174+
if i+kl > len(*bm) {
175+
break // malformed entry
176+
}
177+
out = append(out, string([]byte(*bm)[i:i+kl]))
178+
i += kl
179+
180+
if i+4 > len(*bm) {
181+
break // malformed entry
182+
}
183+
vLen := binary.LittleEndian.Uint32([]byte(*bm)[i:])
184+
i += 4 + int(vLen)
185+
}
186+
return out
187+
}
188+
189+
func bytesMapFromBytes(b []byte) *bytesMap {
190+
if len(b) < prefixBytesLen || b[0] != magicByte || b[1] != formatV1Byte {
191+
return nil
192+
}
193+
return (*bytesMap)(&b)
194+
}
195+
196+
// bytesMapCarrier implements propagation.TextMapCarrier on top of bytesMap.
197+
type bytesMapCarrier struct {
198+
*bytesMap
199+
}
200+
201+
var _ propagation.TextMapCarrier = (*bytesMapCarrier)(nil)
202+
203+
// Set appends a new string entry; if the key already exists it is left unchanged.
204+
func (c *bytesMapCarrier) Set(k, v string) {
205+
_ = c.set(k, []byte(v))
206+
}
207+
208+
// Get scans sequentially for the first matching key.
209+
func (c *bytesMapCarrier) Get(k string) string {
210+
v, _ := c.get(k)
211+
return string(v)
212+
}
213+
214+
// Keys returns header names in encounter order.
215+
func (c *bytesMapCarrier) Keys() []string {
216+
return c.keys()
217+
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
5+
6+
import (
7+
"context"
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/otel/trace"
13+
14+
"go.opentelemetry.io/collector/featuregate"
15+
)
16+
17+
func TestBytesMap(t *testing.T) {
18+
data := []struct {
19+
key string
20+
val []byte
21+
}{
22+
{"key1", []byte("value1")},
23+
{"key2", []byte("value2")},
24+
{"key3", []byte("value3")},
25+
{"key4", []byte("value4")},
26+
}
27+
28+
bm := newBytesMap()
29+
for _, d := range data {
30+
err := bm.set(d.key, d.val)
31+
require.NoError(t, err)
32+
}
33+
34+
assert.Equal(t, []string{"key1", "key2", "key3", "key4"}, bm.keys())
35+
36+
buf, err := bm.get("key2")
37+
require.NoError(t, err)
38+
assert.Equal(t, []byte("value2"), buf)
39+
buf, err = bm.get("key4")
40+
require.NoError(t, err)
41+
assert.Equal(t, []byte("value4"), buf)
42+
}
43+
44+
func TestBytesMapCarrier(t *testing.T) {
45+
carrier := &bytesMapCarrier{bytesMap: newBytesMap()}
46+
47+
carrier.Set("key1", "val1")
48+
carrier.Set("key2", "val2")
49+
50+
assert.Equal(t, []string{"key1", "key2"}, carrier.Keys())
51+
assert.Equal(t, "val2", carrier.Get("key2"))
52+
assert.Equal(t, "val1", carrier.Get("key1"))
53+
}
54+
55+
func TestEncoder(t *testing.T) {
56+
traceID, err := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10")
57+
require.NoError(t, err)
58+
spanID, err := trace.SpanIDFromHex("0102030405060708")
59+
require.NoError(t, err)
60+
spanCtx := trace.NewSpanContext(trace.SpanContextConfig{
61+
TraceID: traceID,
62+
SpanID: spanID,
63+
TraceFlags: 0x01,
64+
TraceState: trace.TraceState{},
65+
Remote: true,
66+
})
67+
ctx := trace.ContextWithSpanContext(context.Background(), spanCtx)
68+
req := uint64(123)
69+
enc := NewEncoder(Uint64Encoding{})
70+
fgOrigState := persistRequestContextFeatureGate.IsEnabled()
71+
72+
tests := []struct {
73+
name string
74+
fgEnabledOnWrite bool
75+
fgEnabledOnRead bool
76+
wantSpanCtx trace.SpanContext
77+
wantReadErr error
78+
}{
79+
{
80+
name: "feature_gate_disabled_on_write_and_read",
81+
},
82+
{
83+
name: "feature_gate_enabled_on_write_and_read",
84+
fgEnabledOnWrite: true,
85+
fgEnabledOnRead: true,
86+
wantSpanCtx: spanCtx,
87+
},
88+
{
89+
name: "feature_gate_disabled_on_write_enabled_on_read",
90+
fgEnabledOnRead: true,
91+
},
92+
{
93+
name: "feature_gate_enabled_on_write_disabled_on_read",
94+
fgEnabledOnWrite: true,
95+
wantReadErr: ErrInvalidValue,
96+
},
97+
}
98+
99+
for _, tt := range tests {
100+
t.Run(tt.name, func(t *testing.T) {
101+
require.NoError(t, featuregate.GlobalRegistry().Set(persistRequestContextFeatureGate.ID(), tt.fgEnabledOnWrite))
102+
defer func() {
103+
require.NoError(t, featuregate.GlobalRegistry().Set(persistRequestContextFeatureGate.ID(), fgOrigState))
104+
}()
105+
buf, err := enc.Marshal(ctx, req)
106+
require.NoError(t, err)
107+
108+
require.NoError(t, featuregate.GlobalRegistry().Set(persistRequestContextFeatureGate.ID(), tt.fgEnabledOnRead))
109+
gotReq, gotCtx, err := enc.Unmarshal(buf)
110+
assert.Equal(t, tt.wantReadErr, err)
111+
if err == nil {
112+
assert.Equal(t, req, gotReq)
113+
gotSpanCtx := trace.SpanContextFromContext(gotCtx)
114+
assert.Equal(t, tt.wantSpanCtx, gotSpanCtx)
115+
}
116+
})
117+
}
118+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package persistentqueue // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch/internal/persistentqueue"
5+
6+
import (
7+
"encoding/binary"
8+
"errors"
9+
)
10+
11+
var ErrInvalidValue = errors.New("invalid value")
12+
13+
type Uint64Encoding struct{}
14+
15+
var _ Encoding[uint64] = Uint64Encoding{}
16+
17+
func (Uint64Encoding) Marshal(val uint64) ([]byte, error) {
18+
return binary.LittleEndian.AppendUint64([]byte{}, val), nil
19+
}
20+
21+
func (Uint64Encoding) Unmarshal(bytes []byte) (uint64, error) {
22+
if len(bytes) != 8 {
23+
return 0, ErrInvalidValue
24+
}
25+
return binary.LittleEndian.Uint64(bytes), nil
26+
}

0 commit comments

Comments
 (0)