Skip to content

Commit 0dc5c1f

Browse files
authored
[chore] [exporter/signalfx] Remove redundant traces shim (#31058)
Also, remove `ActiveServiceTracker.spansProcessed` field as redundant dependency on the shim
1 parent 633aea2 commit 0dc5c1f

File tree

8 files changed

+74
-325
lines changed

8 files changed

+74
-325
lines changed

exporter/signalfxexporter/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func createTracesExporter(
105105
ctx,
106106
set,
107107
cfg,
108-
tracker.AddSpans,
108+
tracker.ProcessTraces,
109109
exporterhelper.WithStart(tracker.Start),
110110
exporterhelper.WithShutdown(tracker.Shutdown))
111111
}

exporter/signalfxexporter/internal/apm/tracetracker/shims.go

Lines changed: 0 additions & 57 deletions
This file was deleted.

exporter/signalfxexporter/internal/apm/tracetracker/tracker.go

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@ package tracetracker // import "github.com/open-telemetry/opentelemetry-collecto
77
import (
88
"context"
99
"strings"
10-
"sync/atomic"
1110
"time"
1211

12+
"go.opentelemetry.io/collector/pdata/pcommon"
13+
"go.opentelemetry.io/collector/pdata/ptrace"
14+
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
15+
1316
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/correlations"
1417
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/log"
1518
)
@@ -55,9 +58,6 @@ type ActiveServiceTracker struct {
5558
// correlationClient is the client used for updating infrastructure correlation properties
5659
correlationClient correlations.CorrelationClient
5760

58-
// Internal metrics
59-
spansProcessed int64
60-
6161
// Map of dimensions to sync to with the key being the span attribute to lookup and the value being
6262
// the dimension to sync to.
6363
dimsToSyncSource map[string]string
@@ -119,29 +119,35 @@ func New(
119119
return a
120120
}
121121

122-
// AddSpansGeneric accepts a list of trace spans and uses them to update the
122+
// ProcessTraces accepts a list of trace spans and uses them to update the
123123
// current list of active services. This is thread-safe.
124-
func (a *ActiveServiceTracker) AddSpansGeneric(_ context.Context, spans SpanList) {
124+
func (a *ActiveServiceTracker) ProcessTraces(_ context.Context, traces ptrace.Traces) {
125125
// Take current time once since this is a system call.
126126
now := a.timeNow()
127127

128-
for i := 0; i < spans.Len(); i++ {
129-
a.processEnvironment(spans.At(i), now)
130-
a.processService(spans.At(i), now)
128+
for i := 0; i < traces.ResourceSpans().Len(); i++ {
129+
a.processEnvironment(traces.ResourceSpans().At(i).Resource(), now)
130+
a.processService(traces.ResourceSpans().At(i).Resource(), now)
131131
}
132-
133-
// Protected by lock above
134-
atomic.AddInt64(&a.spansProcessed, int64(spans.Len()))
135132
}
136133

137-
func (a *ActiveServiceTracker) processEnvironment(span Span, now time.Time) {
138-
if span.NumTags() == 0 {
134+
func (a *ActiveServiceTracker) processEnvironment(res pcommon.Resource, now time.Time) {
135+
attrs := res.Attributes()
136+
if attrs.Len() == 0 {
139137
return
140138
}
141-
environment, environmentFound := span.Environment()
142139

143-
// If spans are coming in with no environment, we use the same fallback value that is being set on the backend.
144-
if !environmentFound || strings.TrimSpace(environment) == "" {
140+
// Determine the environment value from the incoming spans.
141+
// First check "deployment.environment" attribute.
142+
// Then, try "environment" attribute (SignalFx schema).
143+
// Otherwise, use the same fallback value as set on the backend.
144+
var environment string
145+
if env, ok := attrs.Get(conventions.AttributeDeploymentEnvironment); ok {
146+
environment = env.Str()
147+
} else if env, ok = attrs.Get("environment"); ok {
148+
environment = env.Str()
149+
}
150+
if strings.TrimSpace(environment) == "" {
145151
environment = fallbackEnvironment
146152
}
147153

@@ -175,29 +181,30 @@ func (a *ActiveServiceTracker) processEnvironment(span Span, now time.Time) {
175181
for sourceAttr, dimName := range a.dimsToSyncSource {
176182
sourceAttr := sourceAttr
177183
dimName := dimName
178-
if dimValue, ok := span.Tag(sourceAttr); ok {
184+
if val, ok := attrs.Get(sourceAttr); ok {
179185
// Note that the value is not set on the cache key. We only send the first environment received for a
180186
// given pod/container, and we never delete the values set on the container/pod dimension.
181187
// So we only need to cache the dim name and dim value that have been associated with an environment.
182-
if exists := a.tenantEnvironmentCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: dimValue}, now); !exists {
188+
if exists := a.tenantEnvironmentCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: val.Str()}, now); !exists {
183189
a.correlationClient.Correlate(&correlations.Correlation{
184190
Type: correlations.Environment,
185191
DimName: dimName,
186-
DimValue: dimValue,
192+
DimValue: val.Str(),
187193
Value: environment,
188194
}, func(cor *correlations.Correlation, err error) {
189195
if err == nil {
190-
a.tenantEnvironmentCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: dimValue}, now)
196+
a.tenantEnvironmentCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: val.Str()}, now)
191197
}
192198
})
193199
}
194200
}
195201
}
196202
}
197203

198-
func (a *ActiveServiceTracker) processService(span Span, now time.Time) {
204+
func (a *ActiveServiceTracker) processService(res pcommon.Resource, now time.Time) {
199205
// Can't do anything if the spans don't have a local service name
200-
service, ok := span.ServiceName()
206+
serviceNameAttr, ok := res.Attributes().Get(conventions.AttributeServiceName)
207+
service := serviceNameAttr.Str()
201208
if !ok || service == "" {
202209
return
203210
}
@@ -234,19 +241,19 @@ func (a *ActiveServiceTracker) processService(span Span, now time.Time) {
234241
for sourceAttr, dimName := range a.dimsToSyncSource {
235242
sourceAttr := sourceAttr
236243
dimName := dimName
237-
if dimValue, ok := span.Tag(sourceAttr); ok {
244+
if val, ok := res.Attributes().Get(sourceAttr); ok {
238245
// Note that the value is not set on the cache key. We only send the first service received for a
239246
// given pod/container, and we never delete the values set on the container/pod dimension.
240247
// So we only need to cache the dim name and dim value that have been associated with a service.
241-
if exists := a.tenantServiceCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: dimValue}, now); !exists {
248+
if exists := a.tenantServiceCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: val.Str()}, now); !exists {
242249
a.correlationClient.Correlate(&correlations.Correlation{
243250
Type: correlations.Service,
244251
DimName: dimName,
245-
DimValue: dimValue,
252+
DimValue: val.Str(),
246253
Value: service,
247254
}, func(cor *correlations.Correlation, err error) {
248255
if err == nil {
249-
a.tenantServiceCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: dimValue}, now)
256+
a.tenantServiceCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: val.Str()}, now)
250257
}
251258
})
252259
}

exporter/signalfxexporter/internal/apm/tracetracker/tracker_test.go

Lines changed: 34 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import (
1212
"time"
1313

1414
"github.com/stretchr/testify/assert"
15+
"go.opentelemetry.io/collector/pdata/pcommon"
16+
"go.opentelemetry.io/collector/pdata/ptrace"
17+
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
1518

1619
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/correlations"
1720
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/log"
@@ -26,17 +29,15 @@ func advanceTime(a *ActiveServiceTracker, minutes int64) {
2629
a.timeNow = func() time.Time { return newNow }
2730
}
2831

29-
// mergeStringMaps merges n maps with a later map's keys overriding earlier maps
30-
func mergeStringMaps(maps ...map[string]string) map[string]string {
31-
ret := map[string]string{}
32-
32+
// newResourceWithAttrs creates a new resource with the given attributes.
33+
func newResourceWithAttrs(maps ...map[string]string) pcommon.Resource {
34+
res := pcommon.NewResource()
3335
for _, m := range maps {
3436
for k, v := range m {
35-
ret[k] = v
37+
res.Attributes().PutStr(k, v)
3638
}
3739
}
38-
39-
return ret
40+
return res
4041
}
4142

4243
func TestExpiration(t *testing.T) {
@@ -46,32 +47,24 @@ func TestExpiration(t *testing.T) {
4647
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
4748
setTime(a, time.Unix(100, 0))
4849

49-
a.AddSpansGeneric(context.Background(), fakeSpanList{
50-
{
51-
serviceName: "one",
52-
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment1"}),
53-
},
54-
{
55-
serviceName: "two",
56-
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment2"}),
57-
},
58-
{
59-
serviceName: "three",
60-
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment3"}),
61-
},
62-
})
50+
fakeTraces := ptrace.NewTraces()
51+
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "one", "environment": "environment1"}).
52+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
53+
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "two", "environment": "environment2"}).
54+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
55+
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "three", "environment": "environment3"}).
56+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
57+
a.ProcessTraces(context.Background(), fakeTraces)
6358

6459
assert.Equal(t, int64(3), a.hostServiceCache.ActiveCount, "activeServiceCount is not properly tracked")
6560
assert.Equal(t, int64(3), a.hostEnvironmentCache.ActiveCount, "activeEnvironmentCount is not properly tracked")
6661

6762
advanceTime(a, 4)
6863

69-
a.AddSpansGeneric(context.Background(), fakeSpanList{
70-
{
71-
serviceName: "two",
72-
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment2"}),
73-
},
74-
})
64+
fakeTraces = ptrace.NewTraces()
65+
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "two", "environment": "environment2"}).
66+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
67+
a.ProcessTraces(context.Background(), fakeTraces)
7568

7669
advanceTime(a, 2)
7770
a.Purge()
@@ -148,11 +141,12 @@ func TestCorrelationEmptyEnvironment(t *testing.T) {
148141
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
149142
wg.Wait() // wait for the initial fetch of hostIDDims to complete
150143

151-
a.AddSpansGeneric(context.Background(), fakeSpanList{
152-
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
153-
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
154-
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
155-
})
144+
fakeTraces := ptrace.NewTraces()
145+
fakeResource := newResourceWithAttrs(hostIDDims, containerLevelIDDims)
146+
fakeResource.CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
147+
fakeResource.CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
148+
fakeResource.CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
149+
a.ProcessTraces(context.Background(), fakeTraces)
156150

157151
cors := correlationClient.getCorrelations()
158152
assert.Equal(t, 4, len(cors), "expected 4 correlations to be made")
@@ -194,20 +188,14 @@ func TestCorrelationUpdates(t *testing.T) {
194188

195189
setTime(a, time.Unix(100, 0))
196190

197-
a.AddSpansGeneric(context.Background(), fakeSpanList{
198-
{
199-
serviceName: "one",
200-
tags: mergeStringMaps(hostIDDims, mergeStringMaps(containerLevelIDDims, map[string]string{"environment": "environment1"})),
201-
},
202-
{
203-
serviceName: "two",
204-
tags: mergeStringMaps(hostIDDims, mergeStringMaps(containerLevelIDDims, map[string]string{"environment": "environment2"})),
205-
},
206-
{
207-
serviceName: "three",
208-
tags: mergeStringMaps(hostIDDims, mergeStringMaps(containerLevelIDDims, map[string]string{"environment": "environment3"})),
209-
},
210-
})
191+
fakeTraces := ptrace.NewTraces()
192+
newResourceWithAttrs(containerLevelIDDims, map[string]string{conventions.AttributeServiceName: "one", "environment": "environment1"}).
193+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
194+
newResourceWithAttrs(containerLevelIDDims, map[string]string{conventions.AttributeServiceName: "two", "environment": "environment2"}).
195+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
196+
newResourceWithAttrs(containerLevelIDDims, map[string]string{conventions.AttributeServiceName: "three", "environment": "environment3"}).
197+
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
198+
a.ProcessTraces(context.Background(), fakeTraces)
211199

212200
assert.Equal(t, int64(3), a.hostServiceCache.ActiveCount, "activeServiceCount is not properly tracked")
213201
assert.Equal(t, int64(3), a.hostEnvironmentCache.ActiveCount, "activeEnvironmentCount is not properly tracked")

exporter/signalfxexporter/internal/correlation/correlation.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ func newCorrelationClient(cfg *Config, accessToken configopaque.String, params e
8080
}, nil
8181
}
8282

83-
// AddSpans processes the provided spans to correlate the services and environment observed
83+
// ProcessTraces processes the provided spans to correlate the services and environment observed
8484
// to the resources (host, pods, etc.) emitting the spans.
85-
func (cor *Tracker) AddSpans(ctx context.Context, traces ptrace.Traces) error {
85+
func (cor *Tracker) ProcessTraces(ctx context.Context, traces ptrace.Traces) error {
8686
if cor == nil || traces.ResourceSpans().Len() == 0 {
8787
return nil
8888
}
@@ -116,7 +116,7 @@ func (cor *Tracker) AddSpans(ctx context.Context, traces ptrace.Traces) error {
116116
})
117117

118118
if cor.traceTracker != nil {
119-
cor.traceTracker.AddSpansGeneric(ctx, spanListWrap{traces.ResourceSpans()})
119+
cor.traceTracker.ProcessTraces(ctx, traces)
120120
}
121121

122122
return nil

exporter/signalfxexporter/internal/correlation/correlation_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ func TestTrackerAddSpans(t *testing.T) {
3333
attr.PutStr("host.name", "localhost")
3434

3535
// Add empty first, should ignore.
36-
assert.NoError(t, tracker.AddSpans(context.Background(), ptrace.NewTraces()))
36+
assert.NoError(t, tracker.ProcessTraces(context.Background(), ptrace.NewTraces()))
3737
assert.Nil(t, tracker.traceTracker)
3838

39-
assert.NoError(t, tracker.AddSpans(context.Background(), traces))
39+
assert.NoError(t, tracker.ProcessTraces(context.Background(), traces))
4040

4141
assert.NotNil(t, tracker.traceTracker, "trace tracker should be set")
4242

0 commit comments

Comments
 (0)