-
Notifications
You must be signed in to change notification settings - Fork 2.6k
[v2][storage] Create v2 query service to operate on otlp data model #6343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
ad0de07
24278bb
41f1d51
a33ab6c
ffad629
1d721ac
a6df503
60a875a
d31a5e5
2c7a848
d32ba48
ecab080
9ee2d38
57ef9e7
40d9a68
32491f5
3d6af5f
ab95ab7
6737ca9
005d8d7
7171785
e5a4b1b
f887d09
959d2d4
d468249
f6d157e
d2f1c95
6f69d63
9250285
1e0c5b3
c14fc4f
73fbb19
5020f89
e29eafe
2c716bf
28e5fd0
5ae23f2
a79bcac
5636427
529bcda
806a265
f163d55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package querysvc | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"time" | ||
|
||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/ptrace" | ||
|
||
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster" | ||
"github.com/jaegertracing/jaeger/model" | ||
"github.com/jaegertracing/jaeger/pkg/iter" | ||
"github.com/jaegertracing/jaeger/storage_v2/depstore" | ||
"github.com/jaegertracing/jaeger/storage_v2/tracestore" | ||
) | ||
|
||
var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") | ||
|
||
const ( | ||
defaultMaxClockSkewAdjust = time.Second | ||
) | ||
|
||
// QueryServiceOptions has optional members of QueryService | ||
type QueryServiceOptionsV2 struct { | ||
ArchiveTraceReader tracestore.Reader | ||
ArchiveTraceWriter tracestore.Writer | ||
Adjuster adjuster.Adjuster | ||
} | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// StorageCapabilities is a feature flag for query service | ||
type StorageCapabilitiesV2 struct { | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ArchiveStorage bool `json:"archiveStorage"` | ||
// TODO: Maybe add metrics Storage here | ||
// SupportRegex bool | ||
// SupportTagFilter bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro do you have the context for these items? do we still want to do them? |
||
} | ||
|
||
// QueryService contains span utils required by the query-service. | ||
type QueryServiceV2 struct { | ||
traceReader tracestore.Reader | ||
dependencyReader depstore.Reader | ||
options QueryServiceOptionsV2 | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// NewQueryService returns a new QueryService. | ||
func NewQueryServiceV2( | ||
traceReader tracestore.Reader, | ||
dependencyReader depstore.Reader, | ||
options QueryServiceOptionsV2, | ||
) *QueryServiceV2 { | ||
qsvc := &QueryServiceV2{ | ||
traceReader: traceReader, | ||
dependencyReader: dependencyReader, | ||
options: options, | ||
} | ||
|
||
if qsvc.options.Adjuster == nil { | ||
qsvc.options.Adjuster = adjuster.Sequence(adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
return qsvc | ||
} | ||
yurishkuro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// GetTrace is the queryService implementation of tracestore.Reader.GetTrace | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (qs QueryServiceV2) GetTraces(ctx context.Context, traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { | ||
getTracesIter := qs.traceReader.GetTraces(ctx, traceIDs...) | ||
return func(yield func([]ptrace.Traces, error) bool) { | ||
foundTraceIDs := make(map[pcommon.TraceID]struct{}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can move code from L94-L112 into a helper function
and reuse it for main reader, archive reader, and in FindTraces (which is currently incorrect as it does not pre-aggreate the chunks). |
||
getTracesIter(func(traces []ptrace.Traces, err error) bool { | ||
if err != nil { | ||
return yield(nil, err) | ||
} | ||
for _, trace := range traces { | ||
resources := trace.ResourceSpans() | ||
for i := 0; i < resources.Len(); i++ { | ||
scopes := resources.At(i).ScopeSpans() | ||
for j := 0; j < scopes.Len(); j++ { | ||
spans := scopes.At(j).Spans() | ||
for k := 0; k < spans.Len(); k++ { | ||
span := spans.At(k) | ||
foundTraceIDs[span.TraceID()] = struct{}{} | ||
} | ||
} | ||
} | ||
} | ||
return yield(traces, nil) | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}) | ||
if qs.options.ArchiveTraceReader != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. somewhere you should be checking if an error was already encountered or the caller's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro there's a check on line 97 - or are you talking about a different check? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More comprehensively - no matter what happens inside your lambda function once you get to this point you still continue, even though the caller's There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro Thanks for the explanation. I made this change - let me know if it makes sense. |
||
missingTraceIDs := []tracestore.GetTraceParams{} | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for _, id := range traceIDs { | ||
if _, found := foundTraceIDs[id.TraceID]; !found { | ||
missingTraceIDs = append(missingTraceIDs, id) | ||
} | ||
} | ||
if len(missingTraceIDs) > 0 { | ||
qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...)(yield) | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} | ||
} | ||
|
||
// GetServices is the queryService implementation of tracestore.Reader.GetServices | ||
func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) { | ||
return qs.traceReader.GetServices(ctx) | ||
} | ||
|
||
// GetOperations is the queryService implementation of tracestore.Reader.GetOperations | ||
func (qs QueryServiceV2) GetOperations( | ||
ctx context.Context, | ||
query tracestore.OperationQueryParameters, | ||
) ([]tracestore.Operation, error) { | ||
return qs.traceReader.GetOperations(ctx, query) | ||
} | ||
|
||
// FindTraces is the queryService implementation of tracestore.Reader.FindTraces | ||
func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] { | ||
return qs.traceReader.FindTraces(ctx, query) | ||
} | ||
|
||
// ArchiveTrace is the queryService utility to archive traces. | ||
func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error { | ||
if qs.options.ArchiveTraceWriter == nil { | ||
return errNoArchiveSpanStorage | ||
} | ||
getTracesIter := qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID}) | ||
var archiveErr error | ||
getTracesIter(func(traces []ptrace.Traces, err error) bool { | ||
if err != nil { | ||
archiveErr = err | ||
return false | ||
} | ||
for _, trace := range traces { | ||
err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) | ||
if err != nil { | ||
archiveErr = err | ||
return false | ||
} | ||
} | ||
return true | ||
}) | ||
return archiveErr | ||
} | ||
|
||
// Adjust applies adjusters to the trace. | ||
func (qs QueryServiceV2) Adjust(tracesIter iter.Seq[[]ptrace.Traces]) { | ||
tracesIter(func(traces []ptrace.Traces) bool { | ||
for _, trace := range traces { | ||
qs.options.Adjuster.Adjust(trace) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you cannot assume here that |
||
} | ||
return true | ||
}) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yurishkuro Did I understand correctly in that this is what we wanted here? Or did you mean that we should change the underlying adjusters themselves to work on Seq? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, you understood correctly. However, because adjusting needs to re-arrange the data I think this func should return |
||
|
||
// GetDependencies implements depstore.Reader.GetDependencies | ||
func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { | ||
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ | ||
StartTime: endTs.Add(-lookback), | ||
EndTime: endTs, | ||
}) | ||
} | ||
|
||
// GetCapabilities returns the features supported by the query service. | ||
func (qs QueryServiceV2) GetCapabilities() StorageCapabilities { | ||
return StorageCapabilities{ | ||
ArchiveStorage: qs.options.hasArchiveStorage(), | ||
} | ||
mahadzaryab1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// hasArchiveStorage returns true if archive storage reader/writer are initialized. | ||
func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool { | ||
return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.