Skip to content

[v1][adjuster] Change v1 adjuster interface to not return error and modify trace in place #6426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ func (aH *APIHandler) search(w http.ResponseWriter, r *http.Request) {
func (aH *APIHandler) tracesToResponse(traces []*model.Trace, adjust bool, uiErrors []structuredError) *structuredResponse {
uiTraces := make([]*ui.Trace, len(traces))
for i, v := range traces {
uiTrace, uiErr := aH.convertModelToUI(v, adjust)
if uiErr != nil {
uiErrors = append(uiErrors, *uiErr)
}
uiTrace := aH.convertModelToUI(v, adjust)
uiTraces[i] = uiTrace
}

Expand Down Expand Up @@ -364,24 +361,12 @@ func (aH *APIHandler) metrics(w http.ResponseWriter, r *http.Request, getMetrics
aH.writeJSON(w, r, m)
}

func (aH *APIHandler) convertModelToUI(trc *model.Trace, adjust bool) (*ui.Trace, *structuredError) {
var errs []error
func (aH *APIHandler) convertModelToUI(trc *model.Trace, adjust bool) *ui.Trace {
if adjust {
var err error
trc, err = aH.queryService.Adjust(trc)
if err != nil {
errs = append(errs, err)
}
trc = aH.queryService.Adjust(trc)
}
uiTrace := uiconv.FromDomain(trc)
var uiError *structuredError
if err := errors.Join(errs...); err != nil {
uiError = &structuredError{
Msg: err.Error(),
TraceID: uiTrace.TraceID,
}
}
return uiTrace, uiError
return uiTrace
}

func (*APIHandler) deduplicateDependencies(dependencies []model.DependencyLink) []ui.DependencyLink {
Expand Down
40 changes: 0 additions & 40 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand All @@ -59,7 +58,6 @@ func (m *IoReaderMock) Read(b []byte) (int, error) {
var (
errStorageMsg = "storage error"
errStorage = errors.New(errStorageMsg)
errAdjustment = errors.New("adjustment error")

httpClient = &http.Client{
Timeout: 2 * time.Second,
Expand Down Expand Up @@ -375,25 +373,6 @@ func TestGetTraceNotFound(t *testing.T) {
require.EqualError(t, err, parsedError(404, "trace not found"))
}

func TestGetTraceAdjustmentFailure(t *testing.T) {
ts := initializeTestServerWithHandler(
t,
querysvc.QueryServiceOptions{
Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
return trace, errAdjustment
}),
},
)
ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("spanstore.GetTraceParameters")).
Return(mockTrace, nil).Once()

var response structuredResponse
err := getJSON(ts.server.URL+`/api/traces/123456`, &response)
require.NoError(t, err)
assert.Len(t, response.Errors, 1)
assert.EqualValues(t, errAdjustment.Error(), response.Errors[0].Msg)
}

func TestGetTraceBadTraceID(t *testing.T) {
ts := initializeTestServer(t)

Expand Down Expand Up @@ -564,25 +543,6 @@ func TestSearchByTraceIDFailure(t *testing.T) {
require.EqualError(t, err, parsedError(500, whatsamattayou))
}

func TestSearchModelConversionFailure(t *testing.T) {
ts := initializeTestServerWithOptions(
t,
&tenancy.Manager{},
querysvc.QueryServiceOptions{
Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
return trace, errAdjustment
}),
},
)
ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Return([]*model.Trace{mockTrace}, nil).Once()
var response structuredResponse
err := getJSON(ts.server.URL+`/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms`, &response)
require.NoError(t, err)
assert.Len(t, response.Errors, 1)
assert.EqualValues(t, errAdjustment.Error(), response.Errors[0].Msg)
}

func TestSearchDBFailure(t *testing.T) {
ts := initializeTestServer(t)
ts.spanReader.On("FindTraces", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("*spanstore.TraceQueryParameters")).
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
}

// Adjust applies adjusters to the trace.
func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) {
func (qs QueryService) Adjust(trace *model.Trace) *model.Trace {

Check warning on line 131 in cmd/query/app/querysvc/query_service.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/querysvc/query_service.go#L131

Added line #L131 was not covered by tests
return qs.options.Adjuster.Adjust(trace)
}

Expand Down
20 changes: 0 additions & 20 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
Expand All @@ -31,8 +30,6 @@ import (
const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond)

var (
errAdjustment = errors.New("adjustment error")

defaultDependencyLookbackDuration = time.Hour * 24

mockTraceID = model.NewTraceID(0, 123456)
Expand Down Expand Up @@ -80,14 +77,6 @@ func withArchiveSpanWriter() testOption {
}
}

func withAdjuster() testOption {
return func(_ *testQueryService, options *QueryServiceOptions) {
options.Adjuster = adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
return trace, errAdjustment
})
}
}

func initializeTestService(optionAppliers ...testOption) *testQueryService {
readStorage := &spanstoremocks.Reader{}
traceReader := v1adapter.NewTraceReader(readStorage)
Expand Down Expand Up @@ -307,15 +296,6 @@ func TestArchiveTraceSuccess(t *testing.T) {
require.NoError(t, err)
}

// Test QueryService.Adjust()
func TestTraceAdjustmentFailure(t *testing.T) {
tqs := initializeTestService(withAdjuster())

_, err := tqs.queryService.Adjust(mockTrace)
require.Error(t, err)
assert.EqualValues(t, errAdjustment.Error(), err.Error())
}

// Test QueryService.GetDependencies()
func TestGetDependencies(t *testing.T) {
tqs := initializeTestService()
Expand Down
35 changes: 8 additions & 27 deletions model/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,38 @@
package adjuster

import (
"errors"

"github.com/jaegertracing/jaeger/model"
)

// Adjuster applies certain modifications to a Trace object.
// It returns adjusted Trace, which can be the same Trace updated in place.
// If it detects a problem with the trace that prevents it from applying
// adjustments, it must still return the original trace, and the error.
// adjustments, it must still return the original trace.
type Adjuster interface {
Adjust(trace *model.Trace) (*model.Trace, error)
Adjust(trace *model.Trace) *model.Trace
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to return the trace? I don't think any adjusters actually create a new one, they always return the original trace that may be changed in place. So I would make it consistent with the v2 Adapter API to simplify things.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro was thinking the same thing as well - done

}

// Func wraps a function of appropriate signature and makes an Adjuster from it.
type Func func(trace *model.Trace) (*model.Trace, error)
type Func func(trace *model.Trace) *model.Trace

// Adjust implements Adjuster interface.
func (f Func) Adjust(trace *model.Trace) (*model.Trace, error) {
func (f Func) Adjust(trace *model.Trace) *model.Trace {
return f(trace)
}

// Sequence creates an adjuster that combines a series of adjusters
// applied in order. Errors from each step are accumulated and returned
// in the end as a single wrapper error. Errors do not interrupt the
// sequence of adapters.
// applied in order.
func Sequence(adjusters ...Adjuster) Adjuster {
return sequence{adjusters: adjusters}
}

// FailFastSequence is similar to Sequence() but returns immediately
// if any adjuster returns an error.
func FailFastSequence(adjusters ...Adjuster) Adjuster {
return sequence{adjusters: adjusters, failFast: true}
}

type sequence struct {
adjusters []Adjuster
failFast bool
}

func (c sequence) Adjust(trace *model.Trace) (*model.Trace, error) {
var errs []error
func (c sequence) Adjust(trace *model.Trace) *model.Trace {
for _, adjuster := range c.adjusters {
var err error
trace, err = adjuster.Adjust(trace)
if err != nil {
if c.failFast {
return trace, err
}
errs = append(errs, err)
}
trace = adjuster.Adjust(trace)
}
return trace, errors.Join(errs...)
return trace
}
46 changes: 10 additions & 36 deletions model/adjuster/adjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,28 @@
package adjuster_test

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
)

func TestSequences(t *testing.T) {
// mock adjuster that increments span ID
adj := adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
adj := adjuster.Func(func(trace *model.Trace) *model.Trace {
trace.Spans[0].SpanID++
return trace, nil
return trace
})

adjErr := errors.New("mock adjuster error")
failingAdj := adjuster.Func(func(trace *model.Trace) (*model.Trace, error) {
return trace, adjErr
})
seqAdjuster := adjuster.Sequence(adj, adj)

span := &model.Span{}
trace := model.Trace{Spans: []*model.Span{span}}

adjTrace := seqAdjuster.Adjust(&trace)

testCases := []struct {
adjuster adjuster.Adjuster
err string
lastSpanID int
}{
{
adjuster: adjuster.Sequence(adj, failingAdj, adj, failingAdj),
err: fmt.Sprintf("%s\n%s", adjErr, adjErr),
lastSpanID: 2,
},
{
adjuster: adjuster.FailFastSequence(adj, failingAdj, adj, failingAdj),
err: adjErr.Error(),
lastSpanID: 1,
},
}

for _, testCase := range testCases {
span := &model.Span{}
trace := model.Trace{Spans: []*model.Span{span}}

adjTrace, err := testCase.adjuster.Adjust(&trace)

assert.Equal(t, span, adjTrace.Spans[0], "same trace & span returned")
assert.EqualValues(t, testCase.lastSpanID, span.SpanID, "expect span ID to be incremented")
require.EqualError(t, err, testCase.err)
}
assert.Equal(t, span, adjTrace.Spans[0], "same trace & span returned")
assert.EqualValues(t, 2, span.SpanID, "expect span ID to be incremented")
}
4 changes: 2 additions & 2 deletions model/adjuster/bad_span_references.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

// SpanReferences creates an adjuster that removes invalid span references, e.g. with traceID==0
func SpanReferences() Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
return Func(func(trace *model.Trace) *model.Trace {
adjuster := spanReferenceAdjuster{}
for _, span := range trace.Spans {
adjuster.adjust(span)
}
return trace, nil
return trace
})
}

Expand Down
4 changes: 1 addition & 3 deletions model/adjuster/bad_span_references_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
)
Expand All @@ -29,8 +28,7 @@ func TestSpanReferencesAdjuster(t *testing.T) {
},
},
}
trace, err := SpanReferences().Adjust(trace)
require.NoError(t, err)
trace = SpanReferences().Adjust(trace)
assert.Empty(t, trace.Spans[0].References)
assert.Empty(t, trace.Spans[1].References)
assert.Len(t, trace.Spans[2].References, 2)
Expand Down
7 changes: 3 additions & 4 deletions model/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ import (
// The algorithm assumes that all spans have unique IDs, so the trace may need
// to go through another adjuster first, such as ZipkinSpanIDUniquifier.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
// Any issues encountered by the adjuster are recorded in Span.Warnings.
func ClockSkew(maxDelta time.Duration) Adjuster {
return Func(func(trace *model.Trace) (*model.Trace, error) {
return Func(func(trace *model.Trace) *model.Trace {
adjuster := &clockSkewAdjuster{
trace: trace,
maxDelta: maxDelta,
Expand All @@ -35,7 +34,7 @@ func ClockSkew(maxDelta time.Duration) Adjuster {
skew := clockSkew{hostKey: n.hostKey}
adjuster.adjustNode(n, nil, skew)
}
return adjuster.trace, nil
return adjuster.trace
})
}

Expand Down
3 changes: 1 addition & 2 deletions model/adjuster/clockskew_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ func TestClockSkewAdjuster(t *testing.T) {
testCase := tt // capture loop var
t.Run(testCase.description, func(t *testing.T) {
adjuster := ClockSkew(tt.maxAdjust)
trace, err := adjuster.Adjust(makeTrace(testCase.trace))
require.NoError(t, err)
trace := adjuster.Adjust(makeTrace(testCase.trace))
if testCase.err != "" {
var err string
for _, span := range trace.Spans {
Expand Down
4 changes: 2 additions & 2 deletions model/adjuster/ip_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ func IPTagAdjuster() Adjuster {
}
}

return Func(func(trace *model.Trace) (*model.Trace, error) {
return Func(func(trace *model.Trace) *model.Trace {
for _, span := range trace.Spans {
adjustTags(span.Tags)
adjustTags(span.Process.Tags)
model.KeyValues(span.Process.Tags).Sort()
}
return trace, nil
return trace
})
}
4 changes: 1 addition & 3 deletions model/adjuster/ip_tag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
)
Expand Down Expand Up @@ -36,8 +35,7 @@ func TestIPTagAdjuster(t *testing.T) {
},
},
}
trace, err := IPTagAdjuster().Adjust(trace)
require.NoError(t, err)
trace = IPTagAdjuster().Adjust(trace)

expectedSpanTags := model.KeyValues{
model.Int64("a", 42),
Expand Down
Loading
Loading