@@ -6,21 +6,18 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
6
6
import (
7
7
"context"
8
8
"encoding/binary"
9
- "encoding/hex"
10
9
"encoding/json"
11
10
"errors"
12
11
"fmt"
13
12
"strconv"
14
13
"sync"
15
14
16
- "go.opentelemetry.io/otel/trace"
17
15
"go.uber.org/zap"
18
16
19
17
"go.opentelemetry.io/collector/component"
20
18
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
21
19
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
22
20
"go.opentelemetry.io/collector/extension/xextension/storage"
23
- "go.opentelemetry.io/collector/featuregate"
24
21
"go.opentelemetry.io/collector/pipeline"
25
22
)
26
23
@@ -37,24 +34,14 @@ const (
37
34
// queueMetadataKey is the new single key for all queue metadata.
38
35
// TODO: Enable when https://github.com/open-telemetry/opentelemetry-collector/issues/12890 is done
39
36
//nolint:unused
40
- queueMetadataKey = "qmv0"
41
- errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
37
+ queueMetadataKey = "qmv0"
42
38
)
43
39
44
40
var (
45
41
errValueNotSet = errors .New ("value not set" )
46
42
errInvalidValue = errors .New ("invalid value" )
47
43
errNoStorageClient = errors .New ("no storage client extension found" )
48
44
errWrongExtensionType = errors .New ("requested extension is not a storage extension" )
49
-
50
- // persistRequestContextFeatureGate controls whether request context should be persisted in the queue.
51
- persistRequestContextFeatureGate = featuregate .GlobalRegistry ().MustRegister (
52
- "exporter.PersistRequestContext" ,
53
- featuregate .StageAlpha ,
54
- featuregate .WithRegisterFromVersion ("v0.127.0" ),
55
- featuregate .WithRegisterDescription ("controls whether context should be stored alongside requests in the persistent queue" ),
56
- featuregate .WithRegisterReferenceURL ("https://github.com/open-telemetry/opentelemetry-collector/pull/12934" ),
57
- )
58
45
)
59
46
60
47
var indexDonePool = sync.Pool {
@@ -258,80 +245,6 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
258
245
return pq .putInternal (ctx , req )
259
246
}
260
247
261
- // necessary due to SpanContext and SpanContextConfig not supporting Unmarshal interface,
262
- // see https://github.com/open-telemetry/opentelemetry-go/issues/1819.
263
- type spanContext struct {
264
- TraceID string
265
- SpanID string
266
- TraceFlags string
267
- TraceState string
268
- Remote bool
269
- }
270
-
271
- func localSpanContextFromTraceSpanContext (sc trace.SpanContext ) spanContext {
272
- return spanContext {
273
- TraceID : sc .TraceID ().String (),
274
- SpanID : sc .SpanID ().String (),
275
- TraceFlags : sc .TraceFlags ().String (),
276
- TraceState : sc .TraceState ().String (),
277
- Remote : sc .IsRemote (),
278
- }
279
- }
280
-
281
- func contextWithLocalSpanContext (ctx context.Context , sc spanContext ) context.Context {
282
- traceID , err := trace .TraceIDFromHex (sc .TraceID )
283
- if err != nil {
284
- return ctx
285
- }
286
- spanID , err := trace .SpanIDFromHex (sc .SpanID )
287
- if err != nil {
288
- return ctx
289
- }
290
- traceFlags , err := traceFlagsFromHex (sc .TraceFlags )
291
- if err != nil {
292
- return ctx
293
- }
294
- traceState , err := trace .ParseTraceState (sc .TraceState )
295
- if err != nil {
296
- return ctx
297
- }
298
-
299
- return trace .ContextWithSpanContext (ctx , trace .NewSpanContext (trace.SpanContextConfig {
300
- TraceID : traceID ,
301
- SpanID : spanID ,
302
- TraceFlags : * traceFlags ,
303
- TraceState : traceState ,
304
- Remote : sc .Remote ,
305
- }))
306
- }
307
-
308
- // requestContext wraps trace.SpanContext to allow for unmarshaling as well as
309
- // future metadata key/value pairs to be added.
310
- type requestContext struct {
311
- SpanContext spanContext
312
- }
313
-
314
- // reverse of code in trace library https://github.com/open-telemetry/opentelemetry-go/blob/v1.35.0/trace/trace.go#L143-L168
315
- func traceFlagsFromHex (hexStr string ) (* trace.TraceFlags , error ) {
316
- decoded , err := hex .DecodeString (hexStr )
317
- if err != nil {
318
- return nil , err
319
- }
320
- if len (decoded ) != 1 {
321
- return nil , errors .New (errInvalidTraceFlagsLength )
322
- }
323
- traceFlags := trace .TraceFlags (decoded [0 ])
324
- return & traceFlags , nil
325
- }
326
-
327
- func getAndMarshalSpanContext (ctx context.Context ) ([]byte , error ) {
328
- if ! persistRequestContextFeatureGate .IsEnabled () {
329
- return nil , nil
330
- }
331
- rc := localSpanContextFromTraceSpanContext (trace .SpanContextFromContext (ctx ))
332
- return json .Marshal (requestContext {SpanContext : rc })
333
- }
334
-
335
248
// putInternal is the internal version that requires caller to hold the mutex lock.
336
249
func (pq * persistentQueue [T ]) putInternal (ctx context.Context , req T ) error {
337
250
reqSize := pq .set .sizer .Sizeof (req )
@@ -668,10 +581,6 @@ func getItemKey(index uint64) string {
668
581
return strconv .FormatUint (index , 10 )
669
582
}
670
583
671
- func getContextKey (index uint64 ) string {
672
- return strconv .FormatUint (index , 10 ) + "_context"
673
- }
674
-
675
584
func itemIndexToBytes (value uint64 ) []byte {
676
585
return binary .LittleEndian .AppendUint64 ([]byte {}, value )
677
586
}
0 commit comments