Skip to content

[v2][adjuster] Implement span ID uniquifier adjuster to operate on otlp data model #6367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/query/app/querysvc/adjuster/adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

var _ Adjuster = (Func)(nil)

// Adjuster is an interface for modifying a trace object in place.
// If an issue is encountered that prevents modifications, an error should be returned.
// The caller must ensure that all spans in the ptrace.Traces argument
Expand All @@ -17,6 +19,14 @@ type Adjuster interface {
Adjust(ptrace.Traces) error
}

// Func is a type alias that wraps a function and makes an Adjuster from it.
type Func func(traces ptrace.Traces) error

// Adjust implements Adjuster interface for the Func alias.
func (f Func) Adjust(traces ptrace.Traces) error {
return f(traces)
}

// Sequence creates an adjuster that combines a series of adjusters
// applied in order. Errors from each step are accumulated and returned
// in the end as a single wrapper error. Errors do not interrupt the
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/ipattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

var _ Adjuster = (*IPAttributeAdjuster)(nil)

var ipAttributesToCorrect = map[string]struct{}{
"ip": {},
"peer.ipv4": {},
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/resourceattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/otelsemconv"
)

var _ Adjuster = (*ResourceAttributesAdjuster)(nil)

var libraryKeys = map[string]struct{}{
string(otelsemconv.TelemetrySDKLanguageKey): {},
string(otelsemconv.TelemetrySDKNameKey): {},
Expand Down
157 changes: 157 additions & 0 deletions cmd/query/app/querysvc/adjuster/spaniduniquifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"errors"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
)

const (
warningTooManySpans = "cannot assign unique span ID, too many spans in the trace"
)

// SpanIDUniquifier returns an adjuster that changes span ids for server
// spans (i.e. spans with tag: span.kind == server) if there is another
// client span that shares the same span ID. This is needed to deal with
// Zipkin-style clients that reuse the same span ID for both client and server
// side of an RPC call. Jaeger UI expects all spans to have unique IDs.
//
// This adjuster never returns any errors. Instead it records any issues
// it encounters in Span.Warnings.
func SpanIDUniquifier() Func {
return Func(func(traces ptrace.Traces) error {
adjuster := spanIDDeduper{
spansByID: make(map[pcommon.SpanID][]ptrace.Span),
maxUsedID: pcommon.NewSpanIDEmpty(),
}
return adjuster.adjust(traces)
})
}

type spanIDDeduper struct {
spansByID map[pcommon.SpanID][]ptrace.Span
maxUsedID pcommon.SpanID
}

func (d *spanIDDeduper) adjust(traces ptrace.Traces) error {
d.groupSpansByID(traces)
d.uniquifyServerSpanIDs(traces)
return nil
}

// groupSpansByID groups spans with the same ID returning a map id -> []Span
func (d *spanIDDeduper) groupSpansByID(traces ptrace.Traces) {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if spans, ok := d.spansByID[span.SpanID()]; ok {
d.spansByID[span.SpanID()] = append(spans, span)
} else {
d.spansByID[span.SpanID()] = []ptrace.Span{span}
}
}
}
}
}

func (d *spanIDDeduper) isSharedWithClientSpan(spanID pcommon.SpanID) bool {
spans := d.spansByID[spanID]
for _, span := range spans {
if span.Kind() == ptrace.SpanKindClient {
return true
}
}
return false
}

func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) {
oldToNewSpanIDs := make(map[pcommon.SpanID]pcommon.SpanID)
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
// only replace span IDs for server-side spans that share the ID with something else
if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) {
newID, err := d.makeUniqueSpanID()
if err != nil {
jptrace.AddWarning(span, err.Error())
continue
}
oldToNewSpanIDs[span.SpanID()] = newID
span.SetParentSpanID(span.SpanID()) // previously shared ID is the new parent
span.SetSpanID(newID)
}
}
}
}
d.swapParentIDs(traces, oldToNewSpanIDs)
}

// swapParentIDs corrects ParentSpanID of all spans that are children of the server
// spans whose IDs we made unique.
func (*spanIDDeduper) swapParentIDs(
traces ptrace.Traces,
oldToNewSpanIDs map[pcommon.SpanID]pcommon.SpanID,
) {
resourceSpans := traces.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
rs := resourceSpans.At(i)
scopeSpans := rs.ScopeSpans()
for j := 0; j < scopeSpans.Len(); j++ {
ss := scopeSpans.At(j)
spans := ss.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if parentID, ok := oldToNewSpanIDs[span.ParentSpanID()]; ok {
if span.SpanID() != parentID {
span.SetParentSpanID(parentID)
}
}
}
}
}
}

// makeUniqueSpanID returns a new ID that is not used in the trace,
// or an error if such ID cannot be generated, which is unlikely,
// given that the whole space of span IDs is 2^64.
func (d *spanIDDeduper) makeUniqueSpanID() (pcommon.SpanID, error) {
id := incrementSpanID(d.maxUsedID)
for id != pcommon.NewSpanIDEmpty() {
if _, exists := d.spansByID[id]; !exists {
d.maxUsedID = id
return id, nil
}
id = incrementSpanID(id)
}
return pcommon.NewSpanIDEmpty(), errors.New(warningTooManySpans)
}

func incrementSpanID(spanID pcommon.SpanID) pcommon.SpanID {
newID := spanID
for i := len(newID) - 1; i >= 0; i-- {
newID[i]++
if newID[i] != 0 {
break
}
}
return newID
}
108 changes: 108 additions & 0 deletions cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
)

var (
clientSpanID = pcommon.SpanID([]byte{0, 0, 0, 0, 0, 0, 0, 1})
anotherSpanID = pcommon.SpanID([]byte{1, 0, 0, 0, 0, 0, 0, 0})
)

func makeTraces() ptrace.Traces {
traceID := pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 3})

traces := ptrace.NewTraces()
spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

clientSpan := spans.AppendEmpty()
clientSpan.SetTraceID(traceID)
clientSpan.SetSpanID(clientSpanID)
clientSpan.SetKind(ptrace.SpanKindClient)

serverSpan := spans.AppendEmpty()
serverSpan.SetTraceID(traceID)
serverSpan.SetSpanID(clientSpanID) // shared span ID
serverSpan.SetKind(ptrace.SpanKindServer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you dropped the edge case of two server spans sharing client span id? Maybe it's for the better - we always wanted to support partial spans where we could have 2+ span "chunks" sharing the ID which should be merged into a single span. To support partial spans and this use case of dual server spans sharing client span ID we'd need to be very careful about examining the Resource/Scope of the spans to ensure they are identical (or one is empty).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Yeah I dropped it given that the behaviour is undefined and in favour of keeping the implementation the same as the existing adjuster. Happy to revisit this again in the future if we want to support partial spans as you suggested.


anotherServerSpan := spans.AppendEmpty()
anotherServerSpan.SetTraceID(traceID)
anotherServerSpan.SetSpanID(clientSpanID) // shared span ID
anotherServerSpan.SetKind(ptrace.SpanKindServer)

anotherSpan := spans.AppendEmpty()
anotherSpan.SetTraceID(traceID)
anotherSpan.SetSpanID(anotherSpanID)

return traces
}

func TestSpanIDUniquifierTriggered(t *testing.T) {
traces := makeTraces()
deduper := SpanIDUniquifier()
require.NoError(t, deduper.Adjust(traces))

spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

clientSpan := spans.At(0)
assert.Equal(t, clientSpanID, clientSpan.SpanID(), "client span ID should not change")

serverSpan := spans.At(1)
assert.EqualValues(t, []byte{0, 0, 0, 0, 0, 0, 0, 2}, serverSpan.SpanID(), "server span ID should be reassigned")
assert.EqualValues(t, []byte{0, 0, 0, 0, 0, 0, 0, 3}, serverSpan.ParentSpanID(), "next server span should be this server span's parent")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect serverSpan.parentSpanId == clientSpanID. If it's id=3, there was no such span in the trace before, so it's a broken link.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro That's what i was expecting as well but this is a direct translation of what was there before. If you take a look at the old implementation, oldToNewSpanIDs gets overwritten with the latest trace that it encounters which is what causes this. Do we want to change the behaviour here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was due to the undefined behaviour resulting from a having two server spans sharing the same clientID. This is now fixed.


anotherServerSpan := spans.At(2)
assert.EqualValues(t, []byte{0, 0, 0, 0, 0, 0, 0, 3}, anotherServerSpan.SpanID(), "server span ID should be reassigned")
assert.EqualValues(t, clientSpanID, anotherServerSpan.ParentSpanID(), "client span should be server span's parent")
Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro does this expectation make sense to you? it takes the last repeated span and sets that to be the parent and then adjusts the repeated ones before to have that be the parent. So in this case, the hierarchy becomes clientSpan -> anotherServerSpan -> serverSpan.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first, when you have one client and two server spans sharing the same ID, it's doesn't really make sense and the behavior of adjuster is not well defined, because the condition already violates "shared span id" notion from Zipkin. I think it's technically possible to have if a retry from client was done without creating a new client span. I think a reasonable thing to do in such case is to assign unique IDs to both server spans, such that we get a tree (but not a chain that you have):

clientSpan -> serverSpan
clientSpan -> anotherServerSpan
``
In addition, if there is a 4th span whose `parentId == clientSpanId`, then that 4th span's parent ID must be overwritten to the newly generated span ID. Here we have an ambiguity - it could be either `serverSpan.spanId` or `anotherServerSpan.spanId`, there's no way for us to know (and we should arbitrarily pick first or last reassigned `server` span).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Ah okay. I'm happy to remove this test if the behaviour is undefined then because it didn't exist in the old implementation either.


thirdSpan := spans.At(3)
assert.Equal(t, anotherSpanID, thirdSpan.SpanID(), "3rd span ID should not change")
}

func TestSpanIDUniquifierNotTriggered(t *testing.T) {
traces := makeTraces()
spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

// only copy server span and random span
newSpans := ptrace.NewSpanSlice()
spans.At(1).CopyTo(newSpans.AppendEmpty())
spans.At(3).CopyTo(newSpans.AppendEmpty())
newSpans.CopyTo(spans)

deduper := SpanIDUniquifier()
require.NoError(t, deduper.Adjust(traces))

gotSpans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans()

serverSpanID := clientSpanID // for better readability
serverSpan := gotSpans.At(0)
assert.Equal(t, serverSpanID, serverSpan.SpanID(), "server span ID should be unchanged")

thirdSpan := gotSpans.At(1)
assert.Equal(t, anotherSpanID, thirdSpan.SpanID(), "3rd span ID should not change")
}

func TestSpanIDUniquifierError(t *testing.T) {
traces := makeTraces()

maxID := pcommon.SpanID([8]byte{255, 255, 255, 255, 255, 255, 255, 255})

deduper := &spanIDDeduper{spansByID: make(map[pcommon.SpanID][]ptrace.Span), maxUsedID: maxID}
require.NoError(t, deduper.adjust(traces))

span := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(1)
warnings := jptrace.GetWarnings(span)
require.Len(t, warnings, 1)
require.Equal(t, "cannot assign unique span ID, too many spans in the trace", warnings[0])
}
2 changes: 2 additions & 0 deletions cmd/query/app/querysvc/adjuster/spanlinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
)

var _ Adjuster = (*LinksAdjuster)(nil)

// SpanLinks creates an adjuster that removes span links with empty trace IDs.
func SpanLinks() LinksAdjuster {
return LinksAdjuster{}
Expand Down
Loading