Skip to content

Commit ba6228f

Browse files
authored
[v2][query] Implement helper to buffer sequence of traces (#6401)
## Which problem is this PR solving? - Towards #6337 ## Description of the changes - This PR implements a helper `AggregateTraces` in the `jptrace` package to aggregate a sequence of `[]ptrace.Traces` to `ptrace.Traces`. This was done by combining contiguous trace chunks together based on the traceID. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab <[email protected]>
1 parent 774bf9f commit ba6228f

File tree

3 files changed

+192
-0
lines changed

3 files changed

+192
-0
lines changed

internal/jptrace/aggregator.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (c) 2024 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package jptrace
5+
6+
import (
7+
"iter"
8+
9+
"go.opentelemetry.io/collector/pdata/pcommon"
10+
"go.opentelemetry.io/collector/pdata/ptrace"
11+
)
12+
13+
// AggregateTraces aggregates a sequence of trace batches into individual traces.
14+
//
15+
// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces.
16+
func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] {
17+
return func(yield func(trace ptrace.Traces, err error) bool) {
18+
currentTrace := ptrace.NewTraces()
19+
currentTraceID := pcommon.NewTraceIDEmpty()
20+
21+
tracesSeq(func(traces []ptrace.Traces, err error) bool {
22+
if err != nil {
23+
yield(ptrace.NewTraces(), err)
24+
return false
25+
}
26+
for _, trace := range traces {
27+
resources := trace.ResourceSpans()
28+
traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID()
29+
if currentTraceID == traceID {
30+
mergeTraces(trace, currentTrace)
31+
} else {
32+
if currentTrace.ResourceSpans().Len() > 0 {
33+
if !yield(currentTrace, nil) {
34+
return false
35+
}
36+
}
37+
currentTrace = trace
38+
currentTraceID = traceID
39+
}
40+
}
41+
return true
42+
})
43+
if currentTrace.ResourceSpans().Len() > 0 {
44+
yield(currentTrace, nil)
45+
}
46+
}
47+
}
48+
49+
func mergeTraces(src, dest ptrace.Traces) {
50+
resources := src.ResourceSpans()
51+
for i := 0; i < resources.Len(); i++ {
52+
resource := resources.At(i)
53+
resource.CopyTo(dest.ResourceSpans().AppendEmpty())
54+
}
55+
}

internal/jptrace/aggregator_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) 2024 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package jptrace
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
"go.opentelemetry.io/collector/pdata/pcommon"
12+
"go.opentelemetry.io/collector/pdata/ptrace"
13+
)
14+
15+
func TestAggregateTraces_AggregatesSpansWithSameTraceID(t *testing.T) {
16+
trace1 := ptrace.NewTraces()
17+
resource1 := trace1.ResourceSpans().AppendEmpty()
18+
scope1 := resource1.ScopeSpans().AppendEmpty()
19+
span1 := scope1.Spans().AppendEmpty()
20+
span1.SetTraceID(pcommon.TraceID([16]byte{1}))
21+
span1.SetName("span1")
22+
23+
trace1Continued := ptrace.NewTraces()
24+
resource2 := trace1Continued.ResourceSpans().AppendEmpty()
25+
scope2 := resource2.ScopeSpans().AppendEmpty()
26+
span2 := scope2.Spans().AppendEmpty()
27+
span2.SetTraceID(pcommon.TraceID([16]byte{1}))
28+
span2.SetName("span2")
29+
30+
trace2 := ptrace.NewTraces()
31+
resource3 := trace2.ResourceSpans().AppendEmpty()
32+
scope3 := resource3.ScopeSpans().AppendEmpty()
33+
span3 := scope3.Spans().AppendEmpty()
34+
span3.SetTraceID(pcommon.TraceID([16]byte{2}))
35+
span3.SetName("span3")
36+
37+
trace3 := ptrace.NewTraces()
38+
resource4 := trace3.ResourceSpans().AppendEmpty()
39+
scope4 := resource4.ScopeSpans().AppendEmpty()
40+
span4 := scope4.Spans().AppendEmpty()
41+
span4.SetTraceID(pcommon.TraceID([16]byte{3}))
42+
span4.SetName("span4")
43+
44+
tracesSeq := func(yield func([]ptrace.Traces, error) bool) {
45+
yield([]ptrace.Traces{trace1, trace1Continued, trace2}, nil)
46+
yield([]ptrace.Traces{trace3}, nil)
47+
}
48+
49+
var result []ptrace.Traces
50+
AggregateTraces(tracesSeq)(func(trace ptrace.Traces, _ error) bool {
51+
result = append(result, trace)
52+
return true
53+
})
54+
55+
require.Len(t, result, 3)
56+
57+
require.Equal(t, 2, result[0].ResourceSpans().Len())
58+
require.Equal(t, 1, result[1].ResourceSpans().Len())
59+
require.Equal(t, 1, result[2].ResourceSpans().Len())
60+
61+
gotSpan1 := result[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
62+
require.Equal(t, gotSpan1.TraceID(), pcommon.TraceID([16]byte{1}))
63+
require.Equal(t, "span1", gotSpan1.Name())
64+
65+
gotSpan2 := result[0].ResourceSpans().At(1).ScopeSpans().At(0).Spans().At(0)
66+
require.Equal(t, gotSpan2.TraceID(), pcommon.TraceID([16]byte{1}))
67+
require.Equal(t, "span2", gotSpan2.Name())
68+
69+
gotSpan3 := result[1].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
70+
require.Equal(t, gotSpan3.TraceID(), pcommon.TraceID([16]byte{2}))
71+
require.Equal(t, "span3", gotSpan3.Name())
72+
73+
gotSpan4 := result[2].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
74+
require.Equal(t, gotSpan4.TraceID(), pcommon.TraceID([16]byte{3}))
75+
require.Equal(t, "span4", gotSpan4.Name())
76+
}
77+
78+
func TestAggregateTraces_YieldsErrorFromTracesSeq(t *testing.T) {
79+
trace1 := ptrace.NewTraces()
80+
resource1 := trace1.ResourceSpans().AppendEmpty()
81+
scope1 := resource1.ScopeSpans().AppendEmpty()
82+
span1 := scope1.Spans().AppendEmpty()
83+
span1.SetTraceID(pcommon.TraceID([16]byte{1}))
84+
span1.SetName("span1")
85+
86+
tracesSeq := func(yield func([]ptrace.Traces, error) bool) {
87+
if !yield(nil, assert.AnError) {
88+
return
89+
}
90+
yield([]ptrace.Traces{trace1}, nil) // should not get here
91+
}
92+
aggregatedSeq := AggregateTraces(tracesSeq)
93+
94+
var lastResult ptrace.Traces
95+
var lastErr error
96+
aggregatedSeq(func(trace ptrace.Traces, e error) bool {
97+
lastResult = trace
98+
if e != nil {
99+
lastErr = e
100+
}
101+
return true
102+
})
103+
104+
require.ErrorIs(t, lastErr, assert.AnError)
105+
require.Equal(t, ptrace.NewTraces(), lastResult)
106+
}
107+
108+
func TestAggregateTraces_RespectsEarlyReturn(t *testing.T) {
109+
trace1 := ptrace.NewTraces()
110+
resource1 := trace1.ResourceSpans().AppendEmpty()
111+
scope1 := resource1.ScopeSpans().AppendEmpty()
112+
span1 := scope1.Spans().AppendEmpty()
113+
span1.SetTraceID(pcommon.TraceID([16]byte{1}))
114+
span1.SetName("span1")
115+
116+
trace2 := ptrace.NewTraces()
117+
resource2 := trace2.ResourceSpans().AppendEmpty()
118+
scope2 := resource2.ScopeSpans().AppendEmpty()
119+
span2 := scope2.Spans().AppendEmpty()
120+
span2.SetTraceID(pcommon.TraceID([16]byte{2}))
121+
span2.SetName("span2")
122+
123+
tracesSeq := func(yield func([]ptrace.Traces, error) bool) {
124+
yield([]ptrace.Traces{trace1}, nil)
125+
yield([]ptrace.Traces{trace2}, nil)
126+
}
127+
aggregatedSeq := AggregateTraces(tracesSeq)
128+
129+
var lastResult ptrace.Traces
130+
aggregatedSeq(func(trace ptrace.Traces, _ error) bool {
131+
lastResult = trace
132+
return false
133+
})
134+
135+
require.Equal(t, trace1, lastResult)
136+
}

storage_v2/tracestore/reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type Reader interface {
2222
// Chunking requirements:
2323
// - A single ptrace.Traces chunk MUST NOT contain spans from multiple traces.
2424
// - Large traces MAY be split across multiple, *consecutive* ptrace.Traces chunks.
25+
// - Each returned ptrace.Traces object MUST NOT be empty.
2526
//
2627
// Edge cases:
2728
// - If no spans are found for any given trace ID, the ID is ignored.

0 commit comments

Comments
 (0)