Commit 4be973d

mahadzaryab1 authored and ekefan committed
[v2][adjuster] Implement adjuster for deduplicating spans (jaegertracing#6391)
## Which problem is this PR solving?
- Towards jaegertracing#6344

## Description of the changes
- Implemented an adjuster to deduplicate spans.
- The span deduplication is done by marshalling each span into protobuf bytes and applying the FNV hash algorithm to it.

## How was this change tested?
- Added unit tests

## Checklist
- [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
1 parent 7aa5c0f commit 4be973d
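
The commit message above describes the dedup key as the FNV hash of each span's protobuf serialization. As a rough illustration of that idea (a minimal sketch, not part of this commit; the helper name `spanKey` is hypothetical), the snippet below marshals a single-span trace with pdata's `ProtoMarshaler` and hashes the bytes with FNV-1a 64, so byte-identical spans yield identical keys:

```go
package main

import (
	"fmt"
	"hash/fnv"

	"go.opentelemetry.io/collector/pdata/ptrace"
)

// spanKey returns the FNV-1a 64-bit hash of a trace's protobuf
// serialization, mirroring the dedup key described in the PR.
func spanKey(m ptrace.Marshaler, td ptrace.Traces) (uint64, error) {
	b, err := m.MarshalTraces(td)
	if err != nil {
		return 0, err
	}
	h := fnv.New64a()
	h.Write(b) // hash.Hash.Write never returns an error
	return h.Sum64(), nil
}

func main() {
	// Build a trace containing a single span.
	td := ptrace.NewTraces()
	span := td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
	span.SetName("GET /api/traces")

	key, err := spanKey(&ptrace.ProtoMarshaler{}, td)
	if err != nil {
		panic(err)
	}
	fmt.Printf("dedup key: %x\n", key)
}
```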

File tree

3 files changed: +405 additions, 0 deletions

Lines changed: 83 additions & 0 deletions
@@ -0,0 +1,83 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package adjuster

import (
    "fmt"
    "hash/fnv"

    "go.opentelemetry.io/collector/pdata/ptrace"

    "github.com/jaegertracing/jaeger/internal/jptrace"
)

var _ Adjuster = (*SpanHashDeduper)(nil)

// SpanHash creates an adjuster that deduplicates spans by removing all but one span
// with the same hash code. This is particularly useful for scenarios where spans
// may be duplicated during archival, such as with ElasticSearch archival.
//
// The hash code is generated by serializing the span into protobuf bytes and applying
// the FNV hashing algorithm to the serialized data.
//
// To ensure consistent hash codes, this adjuster should be executed after
// SortAttributesAndEvents, which normalizes the order of collections within the span.
func SpanHash() SpanHashDeduper {
    return SpanHashDeduper{
        marshaler: &ptrace.ProtoMarshaler{},
    }
}

type SpanHashDeduper struct {
    marshaler ptrace.Marshaler
}

func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) {
    spansByHash := make(map[uint64]ptrace.Span)
    resourceSpans := traces.ResourceSpans()
    for i := 0; i < resourceSpans.Len(); i++ {
        rs := resourceSpans.At(i)
        scopeSpans := rs.ScopeSpans()
        // hashTrace is a reusable single-span trace: each span is copied into it,
        // together with its resource and scope attributes, so the hash covers the
        // span in its full context.
        hashTrace := ptrace.NewTraces()
        hashResourceSpan := hashTrace.ResourceSpans().AppendEmpty()
        hashScopeSpan := hashResourceSpan.ScopeSpans().AppendEmpty()
        hashSpan := hashScopeSpan.Spans().AppendEmpty()
        rs.Resource().Attributes().CopyTo(hashResourceSpan.Resource().Attributes())
        for j := 0; j < scopeSpans.Len(); j++ {
            ss := scopeSpans.At(j)
            spans := ss.Spans()
            ss.Scope().Attributes().CopyTo(hashScopeSpan.Scope().Attributes())
            dedupedSpans := ptrace.NewSpanSlice()
            for k := 0; k < spans.Len(); k++ {
                span := spans.At(k)
                span.CopyTo(hashSpan)
                h, err := s.computeHashCode(
                    hashTrace,
                )
                if err != nil {
                    // If the span cannot be hashed, keep it and attach a warning
                    // instead of silently dropping it.
                    jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err))
                    span.CopyTo(dedupedSpans.AppendEmpty())
                    continue
                }
                // Keep only the first span seen with a given hash code.
                if _, ok := spansByHash[h]; !ok {
                    spansByHash[h] = span
                    span.CopyTo(dedupedSpans.AppendEmpty())
                }
            }
            dedupedSpans.CopyTo(spans)
        }
    }
}

func (s *SpanHashDeduper) computeHashCode(
    hashTrace ptrace.Traces,
) (uint64, error) {
    b, err := s.marshaler.MarshalTraces(hashTrace)
    if err != nil {
        return 0, err
    }
    hasher := fnv.New64a()
    hasher.Write(b) // the writer in the Hash interface never returns an error
    return hasher.Sum64(), nil
}
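
For context, a hypothetical example test in the same package (sketch only, not part of this commit; the test name is made up) shows how a caller might apply the deduper. It assumes attribute and event ordering has already been normalized, e.g. by the SortAttributesAndEvents adjuster mentioned in the doc comment, so that identical spans serialize to identical bytes:

```go
package adjuster

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

func TestSpanHashDeduperExample(t *testing.T) {
	// Build a trace with two byte-identical spans.
	traces := ptrace.NewTraces()
	spans := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()
	for i := 0; i < 2; i++ {
		spans.AppendEmpty().SetName("duplicated-span")
	}

	deduper := SpanHash()
	deduper.Adjust(traces)

	// Both spans serialize to the same protobuf bytes and therefore the same
	// FNV-64a hash, so only one copy survives.
	assert.Equal(t, 1, spans.Len())
}
```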
