Skip to content

Commit 1c9e8ef

Browse files
Frapschen authored and vincentfree committed
[connector/spanmetricsconnector] fix cardinality limit not working (open-telemetry#39664)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.-->
#### Description

Fixes open-telemetry#39645.

This PR makes the following changes:

1. Fix the bug, and add unit tests to ensure that the cardinality limit works as expected.
2. Add the missing check logic when adding exemplars to duration metrics.
3. In light of issue open-telemetry#36805, move all cardinality limit logic to the `internal/metrics` package and implement the check in each `GetOrCreate` method of the metrics types.
4. Modify overflow metrics to include only one attribute: `otel.metric.overflow: true`.
1 parent d575c8d commit 1c9e8ef

File tree

4 files changed

+276
-126
lines changed

4 files changed

+276
-126
lines changed

connector/spanmetricsconnector/connector.go

Lines changed: 19 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,7 @@ func initHistogramMetrics(cfg Config) metrics.HistogramMetrics {
157157
if cfg.Histogram.Exponential.MaxSize != 0 {
158158
maxSize = cfg.Histogram.Exponential.MaxSize
159159
}
160-
return metrics.NewExponentialHistogramMetrics(maxSize, cfg.Exemplars.MaxPerDataPoint)
160+
return metrics.NewExponentialHistogramMetrics(maxSize, cfg.Exemplars.MaxPerDataPoint, cfg.AggregationCardinalityLimit)
161161
}
162162

163163
var bounds []float64
@@ -175,7 +175,7 @@ func initHistogramMetrics(cfg Config) metrics.HistogramMetrics {
175175
}
176176
}
177177

178-
return metrics.NewExplicitHistogramMetrics(bounds, cfg.Exemplars.MaxPerDataPoint)
178+
return metrics.NewExplicitHistogramMetrics(bounds, cfg.Exemplars.MaxPerDataPoint, cfg.AggregationCardinalityLimit)
179179
}
180180

181181
// unitDivider returns a unit divider to convert nanoseconds to milliseconds or seconds.
@@ -391,48 +391,26 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
391391
}
392392

393393
key := p.buildKey(serviceName, span, p.dimensions, resourceAttr)
394-
var attributesFun metrics.BuildAttributesFun
395-
396-
// Note: we check cardinality limit here for sums metrics but it is the same
397-
// for histograms because both use the same key and attributes.
398-
if rm.sums.IsCardinalityLimitReached() {
399-
attributesFun = func() pcommon.Map {
400-
attributes := pcommon.NewMap()
401-
for _, d := range p.dimensions {
402-
if v, exists := utilattri.GetDimensionValue(d, span.Attributes(), resourceAttr); exists {
403-
v.CopyTo(attributes.PutEmpty(d.Name))
404-
}
405-
}
406-
attributes.PutBool(overflowKey, true)
407-
408-
return attributes
409-
}
410-
} else {
411-
attributesFun = func() pcommon.Map {
412-
attributes := p.buildAttributes(
413-
serviceName,
414-
span,
415-
resourceAttr,
416-
p.dimensions,
417-
ils.Scope(),
418-
)
419-
return attributes
420-
}
394+
attributesFun := func() pcommon.Map {
395+
return p.buildAttributes(serviceName, span, resourceAttr, p.dimensions, ils.Scope())
421396
}
422397

423-
if !p.config.Histogram.Disable {
424-
// aggregate histogram metrics
425-
h := histograms.GetOrCreate(key, attributesFun, startTimestamp)
426-
p.addExemplar(span, duration, h)
427-
h.Observe(duration)
428-
}
429398
// aggregate sums metrics
430-
s := sums.GetOrCreate(key, attributesFun, startTimestamp)
431-
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
399+
s, limitReached := sums.GetOrCreate(key, attributesFun, startTimestamp)
400+
if !limitReached && p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
432401
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
433402
}
434403
s.Add(1)
435404

405+
// aggregate histogram metrics
406+
if !p.config.Histogram.Disable {
407+
h, durationLimitReached := histograms.GetOrCreate(key, attributesFun, startTimestamp)
408+
if !durationLimitReached && p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
409+
p.addExemplar(span, duration, h)
410+
}
411+
h.Observe(duration)
412+
}
413+
436414
// aggregate events metrics
437415
if p.events.Enabled {
438416
for l := 0; l < span.Events().Len(); l++ {
@@ -451,21 +429,11 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
451429
})
452430

453431
eKey := p.buildKey(serviceName, span, eDimensions, rscAndEventAttrs)
454-
if rm.events.IsCardinalityLimitReached() {
455-
attributesFun = func() pcommon.Map {
456-
attributes := pcommon.NewMap()
457-
rscAndEventAttrs.CopyTo(attributes)
458-
attributes.PutBool(overflowKey, true)
459-
460-
return attributes
461-
}
462-
} else {
463-
attributesFun = func() pcommon.Map {
464-
return p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
465-
}
432+
attributesFun = func() pcommon.Map {
433+
return p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
466434
}
467-
e := events.GetOrCreate(eKey, attributesFun, startTimestamp)
468-
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
435+
e, eventLimitReached := events.GetOrCreate(eKey, attributesFun, startTimestamp)
436+
if !eventLimitReached && p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
469437
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
470438
}
471439
e.Add(1)

connector/spanmetricsconnector/connector_test.go

Lines changed: 109 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -912,6 +912,42 @@ func TestConsumeTraces(t *testing.T) {
912912
}
913913
}
914914

915+
func TestCallsMetricsInitialise(t *testing.T) {
916+
traces := buildSampleTrace()
917+
918+
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clockwork.NewFakeClock())
919+
require.NoError(t, err)
920+
921+
ctx := metadata.NewIncomingContext(context.Background(), nil)
922+
err = p.Start(ctx, componenttest.NewNopHost())
923+
defer func() { sdErr := p.Shutdown(ctx); require.NoError(t, sdErr) }()
924+
require.NoError(t, err)
925+
926+
err = p.ConsumeTraces(ctx, traces)
927+
assert.NoError(t, err)
928+
929+
verifyDataPointValue := func(t *testing.T, pmetrics pmetric.Metrics, value int64) {
930+
assert.NotNil(t, pmetrics)
931+
require.NotEmpty(t, pmetrics.ResourceMetrics().Len())
932+
rm := pmetrics.ResourceMetrics().At(0)
933+
require.NotEmpty(t, rm.ScopeMetrics().Len())
934+
sm := rm.ScopeMetrics().At(0)
935+
require.NotEmpty(t, sm.Metrics().Len())
936+
m := sm.Metrics().At(0)
937+
require.NotEmpty(t, m.Sum().DataPoints().Len())
938+
dp := m.Sum().DataPoints().At(0)
939+
require.Equal(t, value, dp.IntValue())
940+
}
941+
942+
// first call buildMetrics(), it will emit zero value
943+
pmetrics := p.buildMetrics()
944+
verifyDataPointValue(t, pmetrics, 0)
945+
946+
// second call buildMetrics(), it will emit actual value
947+
pmetrics = p.buildMetrics()
948+
verifyDataPointValue(t, pmetrics, 1)
949+
}
950+
915951
func TestResourceMetricsCache(t *testing.T) {
916952
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clockwork.NewFakeClock())
917953
require.NoError(t, err)
@@ -1372,7 +1408,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
13721408
{
13731409
name: "initialize histogram with no config provided",
13741410
config: Config{},
1375-
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil),
1411+
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil, 0),
13761412
},
13771413
{
13781414
name: "Disable histogram",
@@ -1390,7 +1426,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
13901426
Unit: metrics.Milliseconds,
13911427
},
13921428
},
1393-
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil),
1429+
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil, 0),
13941430
},
13951431
{
13961432
name: "initialize explicit histogram with default bounds (seconds)",
@@ -1399,7 +1435,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
13991435
Unit: metrics.Seconds,
14001436
},
14011437
},
1402-
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsSeconds, nil),
1438+
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsSeconds, nil, 0),
14031439
},
14041440
{
14051441
name: "initialize explicit histogram with bounds (seconds)",
@@ -1414,7 +1450,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14141450
},
14151451
},
14161452
},
1417-
want: metrics.NewExplicitHistogramMetrics([]float64{0.1, 1}, nil),
1453+
want: metrics.NewExplicitHistogramMetrics([]float64{0.1, 1}, nil, 0),
14181454
},
14191455
{
14201456
name: "initialize explicit histogram with bounds (ms)",
@@ -1429,7 +1465,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14291465
},
14301466
},
14311467
},
1432-
want: metrics.NewExplicitHistogramMetrics([]float64{100, 1000}, nil),
1468+
want: metrics.NewExplicitHistogramMetrics([]float64{100, 1000}, nil, 0),
14331469
},
14341470
{
14351471
name: "initialize exponential histogram",
@@ -1441,7 +1477,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14411477
},
14421478
},
14431479
},
1444-
want: metrics.NewExponentialHistogramMetrics(10, nil),
1480+
want: metrics.NewExponentialHistogramMetrics(10, nil, 0),
14451481
},
14461482
{
14471483
name: "initialize exponential histogram with default max buckets count",
@@ -1451,7 +1487,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14511487
Exponential: &ExponentialHistogramConfig{},
14521488
},
14531489
},
1454-
want: metrics.NewExponentialHistogramMetrics(structure.DefaultMaxSize, nil),
1490+
want: metrics.NewExponentialHistogramMetrics(structure.DefaultMaxSize, nil, 0),
14551491
},
14561492
}
14571493
for _, tt := range tests {
@@ -1975,36 +2011,42 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
19752011

19762012
// Create spans for the resources
19772013
traces := ptrace.NewTraces()
2014+
19782015
rspan1 := traces.ResourceSpans().AppendEmpty()
19792016
resource1.CopyTo(rspan1.Resource())
1980-
ils := rspan1.ScopeSpans().AppendEmpty()
2017+
ils1 := rspan1.ScopeSpans().AppendEmpty()
19812018

19822019
rspan2 := traces.ResourceSpans().AppendEmpty()
19832020
resource2.CopyTo(rspan2.Resource())
19842021
ils2 := rspan2.ScopeSpans().AppendEmpty()
19852022

19862023
// Add spans with different names to trigger overflow
1987-
for i := 0; i < 3; i++ {
1988-
span := ils.Spans().AppendEmpty()
1989-
span.SetName(fmt.Sprintf("operation%d", i))
1990-
span.SetKind(ptrace.SpanKindServer)
1991-
span.Attributes().PutStr("http.method", "GET")
2024+
for i := 0; i < 5; i++ {
2025+
span1 := ils1.Spans().AppendEmpty()
2026+
span1.SetName(fmt.Sprintf("operation%d", i))
2027+
span1.SetKind(ptrace.SpanKindServer)
2028+
span1.Attributes().PutStr("http.method", "GET")
19922029

19932030
span2 := ils2.Spans().AppendEmpty()
19942031
span2.SetName(fmt.Sprintf("operation%d", i))
19952032
span2.SetKind(ptrace.SpanKindServer)
19962033
span2.Attributes().PutStr("http.method", "GET")
19972034
}
19982035

2036+
// Send two batches of spans to the connector to ensure it consumes more spans data,
2037+
// avoiding potential edge-case traps.
2038+
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
19992039
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
20002040

2001-
metrics := connector.buildMetrics()
2041+
// Ignore the first buildMetrics call, which emits zero datapoint values.
2042+
_ = connector.buildMetrics()
20022043

2003-
resourceMetrics := metrics.ResourceMetrics()
2004-
assert.Equal(t, 2, resourceMetrics.Len()) // 2 resources
2044+
pmetrics := connector.buildMetrics()
2045+
rmetrics := pmetrics.ResourceMetrics()
2046+
assert.Equal(t, 2, rmetrics.Len()) // 2 ResourceMetrics
20052047

2006-
for i := 0; i < resourceMetrics.Len(); i++ {
2007-
rm := resourceMetrics.At(i)
2048+
for i := 0; i < rmetrics.Len(); i++ {
2049+
rm := rmetrics.At(i)
20082050
serviceName, _ := rm.Resource().Attributes().Get("service.name")
20092051

20102052
// Each resource should have:
@@ -2013,11 +2055,13 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
20132055
metricCount := 0
20142056
overflowCount := 0
20152057

2016-
metrics := rm.ScopeMetrics().At(0).Metrics()
2017-
for j := 0; j < metrics.Len(); j++ {
2018-
metric := metrics.At(j)
2058+
assert.Equal(t, 1, rm.ScopeMetrics().Len()) // one ScopeMetrics
2059+
metricsSlice := rm.ScopeMetrics().At(0).Metrics()
2060+
for j := 0; j < metricsSlice.Len(); j++ {
2061+
metric := metricsSlice.At(j)
20192062
if metric.Name() == buildMetricName(DefaultNamespace, metricNameCalls) {
20202063
dps := metric.Sum().DataPoints()
2064+
assert.Equal(t, 3, dps.Len()) // three DataPoints
20212065
for k := 0; k < dps.Len(); k++ {
20222066
dp := dps.At(k)
20232067
if _, exists := dp.Attributes().Get(overflowKey); exists {
@@ -2026,14 +2070,7 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
20262070
overflowVal, exists := attrs.Get(overflowKey)
20272071
assert.True(t, exists)
20282072
assert.True(t, overflowVal.Bool())
2029-
_, exists = attrs.Get("region")
2030-
assert.True(t, exists)
2031-
_, exists = attrs.Get(spanNameKey)
2032-
assert.False(t, exists)
2033-
_, exists = attrs.Get(spanKindKey)
2034-
assert.False(t, exists)
2035-
_, exists = attrs.Get(statusCodeKey)
2036-
assert.False(t, exists)
2073+
assert.Equal(t, int64(6), dp.IntValue()) // overflow datapoints have value of 6
20372074
} else {
20382075
metricCount++
20392076
attrs := dp.Attributes()
@@ -2047,6 +2084,37 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
20472084
assert.True(t, exists)
20482085
_, exists = attrs.Get("region")
20492086
assert.True(t, exists)
2087+
assert.Equal(t, int64(2), dp.IntValue()) // normal datapoints have value of 2
2088+
}
2089+
}
2090+
}
2091+
if metric.Name() == buildMetricName(DefaultNamespace, metricNameDuration) {
2092+
dps := metric.Histogram().DataPoints()
2093+
assert.Equal(t, 3, dps.Len()) // three DataPoints
2094+
for k := 0; k < dps.Len(); k++ {
2095+
assert.Equal(t, 3, dps.Len()) // three DataPoints
2096+
for k := 0; k < dps.Len(); k++ {
2097+
dp := dps.At(k)
2098+
if _, exists := dp.Attributes().Get(overflowKey); exists {
2099+
attrs := dp.Attributes()
2100+
overflowVal, exists := attrs.Get(overflowKey)
2101+
assert.True(t, exists)
2102+
assert.True(t, overflowVal.Bool())
2103+
assert.Equal(t, uint64(6), dp.Count()) // overflow datapoints have value of 6
2104+
} else {
2105+
attrs := dp.Attributes()
2106+
_, exists := attrs.Get(serviceNameKey)
2107+
assert.True(t, exists)
2108+
_, exists = attrs.Get(spanNameKey)
2109+
assert.True(t, exists)
2110+
_, exists = attrs.Get(spanKindKey)
2111+
assert.True(t, exists)
2112+
_, exists = attrs.Get(statusCodeKey)
2113+
assert.True(t, exists)
2114+
_, exists = attrs.Get("region")
2115+
assert.True(t, exists)
2116+
assert.Equal(t, uint64(2), dp.Count()) // normal datapoints have value of 2
2117+
}
20502118
}
20512119
}
20522120
}
@@ -2086,27 +2154,28 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
20862154

20872155
// Add 3 different events to trigger overflow
20882156
events := span.Events()
2089-
for i := 0; i < 3; i++ {
2157+
for i := 0; i < 5; i++ {
20902158
event := events.AppendEmpty()
20912159
event.SetName(fmt.Sprintf("event%d", i))
20922160
event.Attributes().PutStr("event.name", fmt.Sprintf("event%d", i))
20932161
}
20942162

2095-
// First consume to reach the limit
2163+
// Send two batches of spans to the connector to ensure it consumes more spans data,
2164+
// avoiding potential edge-case traps.
20962165
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
2097-
2098-
// Second consume to trigger overflow
20992166
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
21002167

2101-
metrics := connector.buildMetrics()
2168+
// Ignore the first buildMetrics call, which emits zero datapoint values.
2169+
_ = connector.buildMetrics()
2170+
pmetrics := connector.buildMetrics()
21022171

2103-
resourceMetrics := metrics.ResourceMetrics()
2104-
assert.Equal(t, 1, resourceMetrics.Len())
2172+
rmetrics := pmetrics.ResourceMetrics()
2173+
assert.Equal(t, 1, rmetrics.Len())
21052174

2106-
rm := resourceMetrics.At(0)
2107-
serviceName, _ := rm.Resource().Attributes().Get("service.name")
2175+
rm := rmetrics.At(0)
21082176

21092177
// Check events metric
2178+
assert.Equal(t, 1, rm.ScopeMetrics().Len())
21102179
metricsSlice := rm.ScopeMetrics().At(0).Metrics()
21112180
var eventsMetric pmetric.Metric
21122181
for i := 0; i < metricsSlice.Len(); i++ {
@@ -2125,10 +2194,7 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
21252194
dp := dps.At(i)
21262195
if _, exists := dp.Attributes().Get(overflowKey); exists {
21272196
overflowCount++
2128-
// Verify overflow metric has service name
2129-
serviceNameAttr, ok := dp.Attributes().Get(serviceNameKey)
2130-
assert.True(t, ok)
2131-
assert.Equal(t, serviceName.Str(), serviceNameAttr.Str())
2197+
assert.Equal(t, int64(6), dp.IntValue())
21322198
} else {
21332199
normalCount++
21342200
// Verify normal metric has event name
@@ -2143,6 +2209,7 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
21432209
assert.True(t, exists)
21442210
_, exists = attrs.Get(statusCodeKey)
21452211
assert.True(t, exists)
2212+
assert.Equal(t, int64(2), dp.IntValue())
21462213
}
21472214
}
21482215

0 commit comments

Comments
 (0)