Skip to content

Commit 7f04365

Browse files
committed
Updated according to jguiton's comments
1 parent 3ac26d3 commit 7f04365

File tree

6 files changed

+24
-116
lines changed

6 files changed

+24
-116
lines changed

exporter/exporterhelper/internal/batch_sender_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ func TestBatchSenderWithTimeout(t *testing.T) {
622622
assert.EqualValues(t, 12, sink.ItemsCount())
623623
})
624624
}
625-
runTest("enable_queue_batcher", true)
625+
// When queue_batcher is enabled, we don't propagate context deadline.
626626
runTest("disable_queue_batcher", false)
627627
}
628628

exporter/exporterhelper/internal/batcher/batch_context.go

Lines changed: 8 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package batcher // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
55
import (
66
"context"
7-
"time"
87

98
"go.opentelemetry.io/otel/trace"
109
)
@@ -13,81 +12,20 @@ type traceContextKeyType int
1312

1413
const batchSpanLinksKey traceContextKeyType = iota
1514

16-
// batchContext links the underlying context to all incoming span contexts.
17-
type batchContext struct {
18-
deadline time.Time
19-
deadlineOk bool
20-
deadlineCtx context.Context
21-
ctx context.Context
22-
}
23-
24-
func SpanLinksFromContext(ctx context.Context) []trace.Link {
15+
// LinksFromContext returns a list of trace links registered in the context.
16+
func LinksFromContext(ctx context.Context) []trace.Link {
2517
if ctx == nil {
2618
return []trace.Link{}
2719
}
28-
29-
if bctx, ok := ctx.(batchContext); ok {
30-
if links, ok := bctx.Value(batchSpanLinksKey).([]trace.Link); ok {
31-
return links
32-
}
33-
} else {
34-
panic("lalal")
20+
if links, ok := ctx.Value(batchSpanLinksKey).([]trace.Link); ok {
21+
return links
3522
}
36-
return []trace.Link{}
23+
return []trace.Link{trace.LinkFromContext(ctx)}
3724
}
3825

39-
func newBatchContext(ctx context.Context) batchContext {
40-
deadline, ok := ctx.Deadline()
41-
underlyingCtx := context.WithValue(
26+
func contextWithMergedLinks(ctx1 context.Context, ctx2 context.Context) context.Context {
27+
return context.WithValue(
4228
context.Background(),
4329
batchSpanLinksKey,
44-
[]trace.Link{trace.LinkFromContext(ctx)})
45-
return batchContext{
46-
deadline: deadline,
47-
deadlineOk: ok,
48-
deadlineCtx: ctx,
49-
ctx: underlyingCtx,
50-
}
51-
}
52-
53-
// Merge links the span from incoming context to the span from the first context.
54-
func (mc batchContext) Merge(other context.Context) batchContext {
55-
deadline, deadlineOk := mc.Deadline()
56-
deadlineCtx := mc.deadlineCtx
57-
if otherDeadline, ok := other.Deadline(); ok {
58-
deadlineOk = true
59-
if deadline.Before(otherDeadline) {
60-
deadline = otherDeadline
61-
deadlineCtx = other
62-
}
63-
}
64-
65-
links := append(SpanLinksFromContext(mc), trace.LinkFromContext(other))
66-
underlyingCtx := context.WithValue(
67-
mc.ctx,
68-
batchSpanLinksKey,
69-
links)
70-
return batchContext{
71-
deadline: deadline,
72-
deadlineOk: deadlineOk,
73-
deadlineCtx: deadlineCtx,
74-
ctx: underlyingCtx,
75-
}
76-
}
77-
78-
// Deadline returns the latest deadline of all context.
79-
func (mc batchContext) Deadline() (time.Time, bool) {
80-
return mc.deadline, mc.deadlineOk
81-
}
82-
83-
func (mc batchContext) Done() <-chan struct{} {
84-
return mc.deadlineCtx.Done()
85-
}
86-
87-
func (mc batchContext) Err() error {
88-
return mc.deadlineCtx.Err()
89-
}
90-
91-
func (mc batchContext) Value(key any) any {
92-
return mc.ctx.Value(key)
30+
append(LinksFromContext(ctx1), LinksFromContext(ctx2)...))
9331
}

exporter/exporterhelper/internal/batcher/batch_context_test.go

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,47 +6,13 @@ package batcher
66
import (
77
"context"
88
"testing"
9-
"time"
109

1110
"github.com/stretchr/testify/require"
11+
"go.opentelemetry.io/otel/trace"
1212

1313
"go.opentelemetry.io/collector/component/componenttest"
14-
"go.opentelemetry.io/otel/trace"
1514
)
1615

17-
func TestBatchContextDeadline(t *testing.T) {
18-
now := time.Now()
19-
ctx1 := context.Background()
20-
batchContext := newBatchContext(ctx1)
21-
22-
var ok bool
23-
deadline, ok := batchContext.Deadline()
24-
require.Equal(t, time.Time{}, deadline)
25-
require.False(t, ok)
26-
27-
ctx2, cancel2 := context.WithDeadline(context.Background(), now.Add(200))
28-
defer cancel2()
29-
batchContext = batchContext.Merge(ctx2)
30-
31-
deadline, ok = batchContext.Deadline()
32-
require.True(t, ok)
33-
require.Equal(t, now.Add(200), deadline)
34-
35-
ctx3, cancel3 := context.WithDeadline(context.Background(), now.Add(300))
36-
defer cancel3()
37-
ctx4, cancel4 := context.WithDeadline(context.Background(), now.Add(100))
38-
defer cancel4()
39-
batchContext = batchContext.Merge(ctx3)
40-
batchContext = batchContext.Merge(ctx4)
41-
42-
deadline, ok = batchContext.Deadline()
43-
require.True(t, ok)
44-
require.Equal(t, now.Add(300), deadline)
45-
46-
time.Sleep(300)
47-
require.Equal(t, ctx3.Err(), batchContext.Err())
48-
}
49-
5016
func TestBatchContextLink(t *testing.T) {
5117
tracerProvider := componenttest.NewTelemetry().NewTelemetrySettings().TracerProvider
5218
tracer := tracerProvider.Tracer("go.opentelemetry.io/collector/exporter/exporterhelper")
@@ -55,19 +21,17 @@ func TestBatchContextLink(t *testing.T) {
5521

5622
ctx2, span2 := tracer.Start(ctx1, "span2")
5723
defer span2.End()
58-
batchContext := newBatchContext(ctx2)
5924

6025
ctx3, span3 := tracer.Start(ctx1, "span3")
6126
defer span3.End()
62-
batchContext = batchContext.Merge(ctx3)
6327

6428
ctx4, span4 := tracer.Start(ctx1, "span4")
6529
defer span4.End()
66-
batchContext = batchContext.Merge(ctx4)
6730

68-
span2.AddEvent("This is an event.")
31+
batchContext := contextWithMergedLinks(ctx2, ctx3)
32+
batchContext = contextWithMergedLinks(batchContext, ctx4)
6933

70-
actualLinks := SpanLinksFromContext(batchContext)
34+
actualLinks := LinksFromContext(batchContext)
7135
require.Len(t, actualLinks, 3)
7236
require.Equal(t, trace.SpanContextFromContext(ctx2), actualLinks[0].SpanContext)
7337
require.Equal(t, trace.SpanContextFromContext(ctx3), actualLinks[1].SpanContext)

exporter/exporterhelper/internal/batcher/default_batcher.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
type batch struct {
20-
ctx batchContext
20+
ctx context.Context
2121
req request.Request
2222
done multiDone
2323
}
@@ -86,7 +86,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
8686
// Do not flush the last item and add it to the current batch.
8787
reqList = reqList[:len(reqList)-1]
8888
qb.currentBatch = &batch{
89-
ctx: newBatchContext(ctx),
89+
ctx: ctx,
9090
req: lastReq,
9191
done: multiDone{done},
9292
}
@@ -122,7 +122,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
122122
// Logic on how to deal with the current batch:
123123
qb.currentBatch.req = reqList[0]
124124
qb.currentBatch.done = append(qb.currentBatch.done, done)
125-
qb.currentBatch.ctx = qb.currentBatch.ctx.Merge(ctx)
125+
qb.currentBatch.ctx = contextWithMergedLinks(qb.currentBatch.ctx, ctx)
126126

127127
// Save the "currentBatch" if we need to flush it, because we want to execute flush without holding the lock, and
128128
// cannot unlock and re-lock because we are not done processing all the responses.
@@ -142,7 +142,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
142142
// Do not flush the last item and add it to the current batch.
143143
reqList = reqList[:len(reqList)-1]
144144
qb.currentBatch = &batch{
145-
ctx: newBatchContext(ctx),
145+
ctx: ctx,
146146
req: lastReq,
147147
done: multiDone{done},
148148
}

exporter/exporterhelper/internal/obs_report_sender.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error {
9696
// StartOp creates the span used to trace the operation. Returning
9797
// the updated context and the created span.
9898
func (ors *obsReportSender[K]) startOp(ctx context.Context) context.Context {
99-
spanLinks := batcher.SpanLinksFromContext(ctx)
99+
spanLinks := batcher.LinksFromContext(ctx)
100100

101101
// This span should contain the links to spans of all batched requests.
102102
ctx, _ = ors.tracer.Start(ctx, ors.spanName, ors.spanAttrs, trace.WithLinks(spanLinks...))

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import (
77
"context"
88
"errors"
99

10+
"go.opentelemetry.io/otel/trace"
1011
"go.uber.org/zap"
1112

1213
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/exporter/exporterbatcher"
1415
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/batcher"
16+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
1517
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1618
"go.opentelemetry.io/collector/exporter/exporterqueue"
1719
"go.opentelemetry.io/collector/featuregate"
@@ -111,7 +113,11 @@ func NewQueueSender(
111113
}
112114

113115
q, err := newObsQueue(qSet, qf(context.Background(), qSet, qCfg, func(ctx context.Context, req request.Request, done exporterqueue.Done) {
116+
// TODO: move start of span to enqueue instead to dequeue.
117+
// Figure out how to preserve span context across persistent storage.
118+
ctx, _ = metadata.Tracer(qSet.ExporterSettings.TelemetrySettings).Start(ctx, "exporter/enqueue")
114119
done.OnDone(exportFunc(ctx, req))
120+
trace.SpanFromContext(ctx).End()
115121
}))
116122
if err != nil {
117123
return nil, err

0 commit comments

Comments
 (0)