Skip to content

Commit ed30d5d

Browse files
authored
[v2][storage] Create v2 query service to operate on otlp data model (#6343)
## Which problem is this PR solving? - Towards #6337 ## Description of the changes - Implement a v2 version of the query service that operates on the OTLP data model. This PR will be followed up by a series of PRs where this this new query service will be updated with the existing handlers. Once all the handlers have been migrated to use this query service, we can remove the old one. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Mahad Zaryab <[email protected]>
1 parent 5901fd8 commit ed30d5d

File tree

9 files changed

+724
-11
lines changed

9 files changed

+724
-11
lines changed

cmd/jaeger/internal/integration/trace_reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (r *traceReader) GetServices(ctx context.Context) ([]string, error) {
9090
return res.Services, nil
9191
}
9292

93-
func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) {
93+
func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) {
9494
var operations []tracestore.Operation
9595
res, err := r.client.GetOperations(ctx, &api_v3.GetOperationsRequest{
9696
Service: query.ServiceName,
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright (c) 2024 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package querysvc
5+
6+
import (
7+
"testing"
8+
9+
"github.com/jaegertracing/jaeger/pkg/testutils"
10+
)
11+
12+
func TestMain(m *testing.M) {
13+
testutils.VerifyGoLeaks(m)
14+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
// Copyright (c) 2024 The Jaeger Authors.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package querysvc
5+
6+
import (
7+
"context"
8+
"errors"
9+
"time"
10+
11+
"go.opentelemetry.io/collector/pdata/pcommon"
12+
"go.opentelemetry.io/collector/pdata/ptrace"
13+
14+
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster"
15+
"github.com/jaegertracing/jaeger/internal/jptrace"
16+
"github.com/jaegertracing/jaeger/model"
17+
"github.com/jaegertracing/jaeger/pkg/iter"
18+
"github.com/jaegertracing/jaeger/storage_v2/depstore"
19+
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
20+
)
21+
22+
var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")
23+
24+
const (
25+
defaultMaxClockSkewAdjust = time.Second
26+
)
27+
28+
// QueryServiceOptions holds the configuration options for the query service.
29+
type QueryServiceOptions struct {
30+
// ArchiveTraceReader is used to read archived traces from the storage.
31+
ArchiveTraceReader tracestore.Reader
32+
// ArchiveTraceWriter is used to write traces to the archive storage.
33+
ArchiveTraceWriter tracestore.Writer
34+
// Adjuster is used to adjust traces before they are returned to the client.
35+
// If not set, the default adjuster will be used.
36+
Adjuster adjuster.Adjuster
37+
}
38+
39+
// StorageCapabilities is a feature flag for query service
40+
type StorageCapabilities struct {
41+
ArchiveStorage bool `json:"archiveStorage"`
42+
// TODO: Maybe add metrics Storage here
43+
// SupportRegex bool
44+
// SupportTagFilter bool
45+
}
46+
47+
// QueryService provides methods to query data from the storage.
48+
type QueryService struct {
49+
traceReader tracestore.Reader
50+
dependencyReader depstore.Reader
51+
options QueryServiceOptions
52+
}
53+
54+
// GetTraceParams defines the parameters for retrieving traces using the GetTraces function.
55+
type GetTraceParams struct {
56+
// TraceIDs is a slice of trace identifiers to fetch.
57+
TraceIDs []tracestore.GetTraceParams
58+
// RawTraces indicates whether to retrieve raw traces.
59+
// If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster.
60+
RawTraces bool
61+
}
62+
63+
// TraceQueryParams represents the parameters for querying a batch of traces.
64+
type TraceQueryParams struct {
65+
tracestore.TraceQueryParams
66+
// RawTraces indicates whether to retrieve raw traces.
67+
// If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster.
68+
RawTraces bool
69+
}
70+
71+
func NewQueryService(
72+
traceReader tracestore.Reader,
73+
dependencyReader depstore.Reader,
74+
options QueryServiceOptions,
75+
) *QueryService {
76+
qsvc := &QueryService{
77+
traceReader: traceReader,
78+
dependencyReader: dependencyReader,
79+
options: options,
80+
}
81+
82+
if qsvc.options.Adjuster == nil {
83+
qsvc.options.Adjuster = adjuster.Sequence(
84+
adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...)
85+
}
86+
return qsvc
87+
}
88+
89+
// GetTraces retrieves traces with given trace IDs from the primary reader,
90+
// and if any of them are not found it then queries the archive reader.
91+
// The iterator is single-use: once consumed, it cannot be used again.
92+
func (qs QueryService) GetTraces(
93+
ctx context.Context,
94+
params GetTraceParams,
95+
) iter.Seq2[[]ptrace.Traces, error] {
96+
getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...)
97+
return func(yield func([]ptrace.Traces, error) bool) {
98+
foundTraceIDs, proceed := qs.receiveTraces(getTracesIter, yield, params.RawTraces)
99+
if proceed && qs.options.ArchiveTraceReader != nil {
100+
var missingTraceIDs []tracestore.GetTraceParams
101+
for _, id := range params.TraceIDs {
102+
if _, found := foundTraceIDs[id.TraceID]; !found {
103+
missingTraceIDs = append(missingTraceIDs, id)
104+
}
105+
}
106+
if len(missingTraceIDs) > 0 {
107+
getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...)
108+
qs.receiveTraces(getArchiveTracesIter, yield, params.RawTraces)
109+
}
110+
}
111+
}
112+
}
113+
114+
func (qs QueryService) GetServices(ctx context.Context) ([]string, error) {
115+
return qs.traceReader.GetServices(ctx)
116+
}
117+
118+
func (qs QueryService) GetOperations(
119+
ctx context.Context,
120+
query tracestore.OperationQueryParams,
121+
) ([]tracestore.Operation, error) {
122+
return qs.traceReader.GetOperations(ctx, query)
123+
}
124+
125+
func (qs QueryService) FindTraces(
126+
ctx context.Context,
127+
query TraceQueryParams,
128+
) iter.Seq2[[]ptrace.Traces, error] {
129+
return func(yield func([]ptrace.Traces, error) bool) {
130+
tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams)
131+
qs.receiveTraces(tracesIter, yield, query.RawTraces)
132+
}
133+
}
134+
135+
// ArchiveTrace archives a trace specified by the given query parameters.
136+
// If the ArchiveTraceWriter is not configured, it returns
137+
// an error indicating that there is no archive span storage available.
138+
func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error {
139+
if qs.options.ArchiveTraceWriter == nil {
140+
return errNoArchiveSpanStorage
141+
}
142+
getTracesIter := qs.GetTraces(
143+
ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}},
144+
)
145+
var archiveErr error
146+
getTracesIter(func(traces []ptrace.Traces, err error) bool {
147+
if err != nil {
148+
archiveErr = err
149+
return false
150+
}
151+
for _, trace := range traces {
152+
err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace)
153+
if err != nil {
154+
archiveErr = errors.Join(archiveErr, err)
155+
}
156+
}
157+
return true
158+
})
159+
return archiveErr
160+
}
161+
162+
func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
163+
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{
164+
StartTime: endTs.Add(-lookback),
165+
EndTime: endTs,
166+
})
167+
}
168+
169+
func (qs QueryService) GetCapabilities() StorageCapabilities {
170+
return StorageCapabilities{
171+
ArchiveStorage: qs.options.hasArchiveStorage(),
172+
}
173+
}
174+
175+
func (opts *QueryServiceOptions) hasArchiveStorage() bool {
176+
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil
177+
}
178+
179+
func (qs QueryService) receiveTraces(
180+
seq iter.Seq2[[]ptrace.Traces, error],
181+
yield func([]ptrace.Traces, error) bool,
182+
rawTraces bool,
183+
) (map[pcommon.TraceID]struct{}, bool) {
184+
aggregatedTraces := jptrace.AggregateTraces(seq)
185+
foundTraceIDs := make(map[pcommon.TraceID]struct{})
186+
proceed := true
187+
aggregatedTraces(func(trace ptrace.Traces, err error) bool {
188+
if err != nil {
189+
proceed = yield(nil, err)
190+
return proceed
191+
}
192+
if !rawTraces {
193+
qs.options.Adjuster.Adjust(trace)
194+
}
195+
jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool {
196+
foundTraceIDs[span.TraceID()] = struct{}{}
197+
return true
198+
})
199+
proceed = yield([]ptrace.Traces{trace}, nil)
200+
return proceed
201+
})
202+
return foundTraceIDs, proceed
203+
}

0 commit comments

Comments
 (0)