Skip to content

Commit 090527b

Browse files
authored
[connector/spanmetricsconnector] fix cardinality limit not working (#39664)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Fixes #39645. This PR makes the following changes: 1. Fix the bug and add unit tests to ensure that the cardinality limit works as expected. 2. Add the missing check logic when adding exemplars to duration metrics. 3. In light of issue #36805, move all cardinality limit logic to the internal/metrics package and implement the check in each GetOrCreate method of the metrics types. 4. Modify overflow metrics to include only one attribute: `otel.metric.overflow: true`.
1 parent e7eef16 commit 090527b

File tree

4 files changed

+276
-126
lines changed

4 files changed

+276
-126
lines changed

connector/spanmetricsconnector/connector.go

Lines changed: 19 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func initHistogramMetrics(cfg Config) metrics.HistogramMetrics {
157157
if cfg.Histogram.Exponential.MaxSize != 0 {
158158
maxSize = cfg.Histogram.Exponential.MaxSize
159159
}
160-
return metrics.NewExponentialHistogramMetrics(maxSize, cfg.Exemplars.MaxPerDataPoint)
160+
return metrics.NewExponentialHistogramMetrics(maxSize, cfg.Exemplars.MaxPerDataPoint, cfg.AggregationCardinalityLimit)
161161
}
162162

163163
var bounds []float64
@@ -175,7 +175,7 @@ func initHistogramMetrics(cfg Config) metrics.HistogramMetrics {
175175
}
176176
}
177177

178-
return metrics.NewExplicitHistogramMetrics(bounds, cfg.Exemplars.MaxPerDataPoint)
178+
return metrics.NewExplicitHistogramMetrics(bounds, cfg.Exemplars.MaxPerDataPoint, cfg.AggregationCardinalityLimit)
179179
}
180180

181181
// unitDivider returns a unit divider to convert nanoseconds to milliseconds or seconds.
@@ -391,48 +391,26 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
391391
}
392392

393393
key := p.buildKey(serviceName, span, p.dimensions, resourceAttr)
394-
var attributesFun metrics.BuildAttributesFun
395-
396-
// Note: we check cardinality limit here for sums metrics but it is the same
397-
// for histograms because both use the same key and attributes.
398-
if rm.sums.IsCardinalityLimitReached() {
399-
attributesFun = func() pcommon.Map {
400-
attributes := pcommon.NewMap()
401-
for _, d := range p.dimensions {
402-
if v, exists := utilattri.GetDimensionValue(d, span.Attributes(), resourceAttr); exists {
403-
v.CopyTo(attributes.PutEmpty(d.Name))
404-
}
405-
}
406-
attributes.PutBool(overflowKey, true)
407-
408-
return attributes
409-
}
410-
} else {
411-
attributesFun = func() pcommon.Map {
412-
attributes := p.buildAttributes(
413-
serviceName,
414-
span,
415-
resourceAttr,
416-
p.dimensions,
417-
ils.Scope(),
418-
)
419-
return attributes
420-
}
394+
attributesFun := func() pcommon.Map {
395+
return p.buildAttributes(serviceName, span, resourceAttr, p.dimensions, ils.Scope())
421396
}
422397

423-
if !p.config.Histogram.Disable {
424-
// aggregate histogram metrics
425-
h := histograms.GetOrCreate(key, attributesFun, startTimestamp)
426-
p.addExemplar(span, duration, h)
427-
h.Observe(duration)
428-
}
429398
// aggregate sums metrics
430-
s := sums.GetOrCreate(key, attributesFun, startTimestamp)
431-
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
399+
s, limitReached := sums.GetOrCreate(key, attributesFun, startTimestamp)
400+
if !limitReached && p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
432401
s.AddExemplar(span.TraceID(), span.SpanID(), duration)
433402
}
434403
s.Add(1)
435404

405+
// aggregate histogram metrics
406+
if !p.config.Histogram.Disable {
407+
h, durationLimitReached := histograms.GetOrCreate(key, attributesFun, startTimestamp)
408+
if !durationLimitReached && p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
409+
p.addExemplar(span, duration, h)
410+
}
411+
h.Observe(duration)
412+
}
413+
436414
// aggregate events metrics
437415
if p.events.Enabled {
438416
for l := 0; l < span.Events().Len(); l++ {
@@ -451,21 +429,11 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
451429
})
452430

453431
eKey := p.buildKey(serviceName, span, eDimensions, rscAndEventAttrs)
454-
if rm.events.IsCardinalityLimitReached() {
455-
attributesFun = func() pcommon.Map {
456-
attributes := pcommon.NewMap()
457-
rscAndEventAttrs.CopyTo(attributes)
458-
attributes.PutBool(overflowKey, true)
459-
460-
return attributes
461-
}
462-
} else {
463-
attributesFun = func() pcommon.Map {
464-
return p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
465-
}
432+
attributesFun = func() pcommon.Map {
433+
return p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions, ils.Scope())
466434
}
467-
e := events.GetOrCreate(eKey, attributesFun, startTimestamp)
468-
if p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
435+
e, eventLimitReached := events.GetOrCreate(eKey, attributesFun, startTimestamp)
436+
if !eventLimitReached && p.config.Exemplars.Enabled && !span.TraceID().IsEmpty() {
469437
e.AddExemplar(span.TraceID(), span.SpanID(), duration)
470438
}
471439
e.Add(1)

connector/spanmetricsconnector/connector_test.go

Lines changed: 109 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,42 @@ func TestConsumeTraces(t *testing.T) {
912912
}
913913
}
914914

915+
func TestCallsMetricsInitialise(t *testing.T) {
916+
traces := buildSampleTrace()
917+
918+
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clockwork.NewFakeClock())
919+
require.NoError(t, err)
920+
921+
ctx := metadata.NewIncomingContext(context.Background(), nil)
922+
err = p.Start(ctx, componenttest.NewNopHost())
923+
defer func() { sdErr := p.Shutdown(ctx); require.NoError(t, sdErr) }()
924+
require.NoError(t, err)
925+
926+
err = p.ConsumeTraces(ctx, traces)
927+
assert.NoError(t, err)
928+
929+
verifyDataPointValue := func(t *testing.T, pmetrics pmetric.Metrics, value int64) {
930+
assert.NotNil(t, pmetrics)
931+
require.NotEmpty(t, pmetrics.ResourceMetrics().Len())
932+
rm := pmetrics.ResourceMetrics().At(0)
933+
require.NotEmpty(t, rm.ScopeMetrics().Len())
934+
sm := rm.ScopeMetrics().At(0)
935+
require.NotEmpty(t, sm.Metrics().Len())
936+
m := sm.Metrics().At(0)
937+
require.NotEmpty(t, m.Sum().DataPoints().Len())
938+
dp := m.Sum().DataPoints().At(0)
939+
require.Equal(t, value, dp.IntValue())
940+
}
941+
942+
// first call buildMetrics(), it will emit zero value
943+
pmetrics := p.buildMetrics()
944+
verifyDataPointValue(t, pmetrics, 0)
945+
946+
// second call buildMetrics(), it will emit actual value
947+
pmetrics = p.buildMetrics()
948+
verifyDataPointValue(t, pmetrics, 1)
949+
}
950+
915951
func TestResourceMetricsCache(t *testing.T) {
916952
p, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, clockwork.NewFakeClock())
917953
require.NoError(t, err)
@@ -1372,7 +1408,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
13721408
{
13731409
name: "initialize histogram with no config provided",
13741410
config: Config{},
1375-
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil),
1411+
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil, 0),
13761412
},
13771413
{
13781414
name: "Disable histogram",
@@ -1390,7 +1426,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
13901426
Unit: metrics.Milliseconds,
13911427
},
13921428
},
1393-
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil),
1429+
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsMs, nil, 0),
13941430
},
13951431
{
13961432
name: "initialize explicit histogram with default bounds (seconds)",
@@ -1399,7 +1435,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
13991435
Unit: metrics.Seconds,
14001436
},
14011437
},
1402-
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsSeconds, nil),
1438+
want: metrics.NewExplicitHistogramMetrics(defaultHistogramBucketsSeconds, nil, 0),
14031439
},
14041440
{
14051441
name: "initialize explicit histogram with bounds (seconds)",
@@ -1414,7 +1450,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14141450
},
14151451
},
14161452
},
1417-
want: metrics.NewExplicitHistogramMetrics([]float64{0.1, 1}, nil),
1453+
want: metrics.NewExplicitHistogramMetrics([]float64{0.1, 1}, nil, 0),
14181454
},
14191455
{
14201456
name: "initialize explicit histogram with bounds (ms)",
@@ -1429,7 +1465,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14291465
},
14301466
},
14311467
},
1432-
want: metrics.NewExplicitHistogramMetrics([]float64{100, 1000}, nil),
1468+
want: metrics.NewExplicitHistogramMetrics([]float64{100, 1000}, nil, 0),
14331469
},
14341470
{
14351471
name: "initialize exponential histogram",
@@ -1441,7 +1477,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14411477
},
14421478
},
14431479
},
1444-
want: metrics.NewExponentialHistogramMetrics(10, nil),
1480+
want: metrics.NewExponentialHistogramMetrics(10, nil, 0),
14451481
},
14461482
{
14471483
name: "initialize exponential histogram with default max buckets count",
@@ -1451,7 +1487,7 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
14511487
Exponential: &ExponentialHistogramConfig{},
14521488
},
14531489
},
1454-
want: metrics.NewExponentialHistogramMetrics(structure.DefaultMaxSize, nil),
1490+
want: metrics.NewExponentialHistogramMetrics(structure.DefaultMaxSize, nil, 0),
14551491
},
14561492
}
14571493
for _, tt := range tests {
@@ -1975,36 +2011,42 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
19752011

19762012
// Create spans for the resources
19772013
traces := ptrace.NewTraces()
2014+
19782015
rspan1 := traces.ResourceSpans().AppendEmpty()
19792016
resource1.CopyTo(rspan1.Resource())
1980-
ils := rspan1.ScopeSpans().AppendEmpty()
2017+
ils1 := rspan1.ScopeSpans().AppendEmpty()
19812018

19822019
rspan2 := traces.ResourceSpans().AppendEmpty()
19832020
resource2.CopyTo(rspan2.Resource())
19842021
ils2 := rspan2.ScopeSpans().AppendEmpty()
19852022

19862023
// Add spans with different names to trigger overflow
1987-
for i := 0; i < 3; i++ {
1988-
span := ils.Spans().AppendEmpty()
1989-
span.SetName(fmt.Sprintf("operation%d", i))
1990-
span.SetKind(ptrace.SpanKindServer)
1991-
span.Attributes().PutStr("http.method", "GET")
2024+
for i := 0; i < 5; i++ {
2025+
span1 := ils1.Spans().AppendEmpty()
2026+
span1.SetName(fmt.Sprintf("operation%d", i))
2027+
span1.SetKind(ptrace.SpanKindServer)
2028+
span1.Attributes().PutStr("http.method", "GET")
19922029

19932030
span2 := ils2.Spans().AppendEmpty()
19942031
span2.SetName(fmt.Sprintf("operation%d", i))
19952032
span2.SetKind(ptrace.SpanKindServer)
19962033
span2.Attributes().PutStr("http.method", "GET")
19972034
}
19982035

2036+
// Send two batches of spans to the connector to ensure it consumes more spans data,
2037+
// avoiding potential edge-case traps.
2038+
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
19992039
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
20002040

2001-
metrics := connector.buildMetrics()
2041+
// Ignore the first buildMetrics call, which emits zero datapoint values.
2042+
_ = connector.buildMetrics()
20022043

2003-
resourceMetrics := metrics.ResourceMetrics()
2004-
assert.Equal(t, 2, resourceMetrics.Len()) // 2 resources
2044+
pmetrics := connector.buildMetrics()
2045+
rmetrics := pmetrics.ResourceMetrics()
2046+
assert.Equal(t, 2, rmetrics.Len()) // 2 ResourceMetrics
20052047

2006-
for i := 0; i < resourceMetrics.Len(); i++ {
2007-
rm := resourceMetrics.At(i)
2048+
for i := 0; i < rmetrics.Len(); i++ {
2049+
rm := rmetrics.At(i)
20082050
serviceName, _ := rm.Resource().Attributes().Get("service.name")
20092051

20102052
// Each resource should have:
@@ -2013,11 +2055,13 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
20132055
metricCount := 0
20142056
overflowCount := 0
20152057

2016-
metrics := rm.ScopeMetrics().At(0).Metrics()
2017-
for j := 0; j < metrics.Len(); j++ {
2018-
metric := metrics.At(j)
2058+
assert.Equal(t, 1, rm.ScopeMetrics().Len()) // one ScopeMetrics
2059+
metricsSlice := rm.ScopeMetrics().At(0).Metrics()
2060+
for j := 0; j < metricsSlice.Len(); j++ {
2061+
metric := metricsSlice.At(j)
20192062
if metric.Name() == buildMetricName(DefaultNamespace, metricNameCalls) {
20202063
dps := metric.Sum().DataPoints()
2064+
assert.Equal(t, 3, dps.Len()) // three DataPoints
20212065
for k := 0; k < dps.Len(); k++ {
20222066
dp := dps.At(k)
20232067
if _, exists := dp.Attributes().Get(overflowKey); exists {
@@ -2026,14 +2070,7 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
20262070
overflowVal, exists := attrs.Get(overflowKey)
20272071
assert.True(t, exists)
20282072
assert.True(t, overflowVal.Bool())
2029-
_, exists = attrs.Get("region")
2030-
assert.True(t, exists)
2031-
_, exists = attrs.Get(spanNameKey)
2032-
assert.False(t, exists)
2033-
_, exists = attrs.Get(spanKindKey)
2034-
assert.False(t, exists)
2035-
_, exists = attrs.Get(statusCodeKey)
2036-
assert.False(t, exists)
2073+
assert.Equal(t, int64(6), dp.IntValue()) // overflow datapoints have value of 6
20372074
} else {
20382075
metricCount++
20392076
attrs := dp.Attributes()
@@ -2047,6 +2084,37 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
20472084
assert.True(t, exists)
20482085
_, exists = attrs.Get("region")
20492086
assert.True(t, exists)
2087+
assert.Equal(t, int64(2), dp.IntValue()) // normal datapoints have value of 2
2088+
}
2089+
}
2090+
}
2091+
if metric.Name() == buildMetricName(DefaultNamespace, metricNameDuration) {
2092+
dps := metric.Histogram().DataPoints()
2093+
assert.Equal(t, 3, dps.Len()) // three DataPoints
2094+
for k := 0; k < dps.Len(); k++ {
2095+
assert.Equal(t, 3, dps.Len()) // three DataPoints
2096+
for k := 0; k < dps.Len(); k++ {
2097+
dp := dps.At(k)
2098+
if _, exists := dp.Attributes().Get(overflowKey); exists {
2099+
attrs := dp.Attributes()
2100+
overflowVal, exists := attrs.Get(overflowKey)
2101+
assert.True(t, exists)
2102+
assert.True(t, overflowVal.Bool())
2103+
assert.Equal(t, uint64(6), dp.Count()) // overflow datapoints have value of 6
2104+
} else {
2105+
attrs := dp.Attributes()
2106+
_, exists := attrs.Get(serviceNameKey)
2107+
assert.True(t, exists)
2108+
_, exists = attrs.Get(spanNameKey)
2109+
assert.True(t, exists)
2110+
_, exists = attrs.Get(spanKindKey)
2111+
assert.True(t, exists)
2112+
_, exists = attrs.Get(statusCodeKey)
2113+
assert.True(t, exists)
2114+
_, exists = attrs.Get("region")
2115+
assert.True(t, exists)
2116+
assert.Equal(t, uint64(2), dp.Count()) // normal datapoints have value of 2
2117+
}
20502118
}
20512119
}
20522120
}
@@ -2086,27 +2154,28 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
20862154

20872155
// Add 3 different events to trigger overflow
20882156
events := span.Events()
2089-
for i := 0; i < 3; i++ {
2157+
for i := 0; i < 5; i++ {
20902158
event := events.AppendEmpty()
20912159
event.SetName(fmt.Sprintf("event%d", i))
20922160
event.Attributes().PutStr("event.name", fmt.Sprintf("event%d", i))
20932161
}
20942162

2095-
// First consume to reach the limit
2163+
// Send two batches of spans to the connector to ensure it consumes more spans data,
2164+
// avoiding potential edge-case traps.
20962165
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
2097-
2098-
// Second consume to trigger overflow
20992166
assert.NoError(t, connector.ConsumeTraces(context.Background(), traces))
21002167

2101-
metrics := connector.buildMetrics()
2168+
// Ignore the first buildMetrics call, which emits zero datapoint values.
2169+
_ = connector.buildMetrics()
2170+
pmetrics := connector.buildMetrics()
21022171

2103-
resourceMetrics := metrics.ResourceMetrics()
2104-
assert.Equal(t, 1, resourceMetrics.Len())
2172+
rmetrics := pmetrics.ResourceMetrics()
2173+
assert.Equal(t, 1, rmetrics.Len())
21052174

2106-
rm := resourceMetrics.At(0)
2107-
serviceName, _ := rm.Resource().Attributes().Get("service.name")
2175+
rm := rmetrics.At(0)
21082176

21092177
// Check events metric
2178+
assert.Equal(t, 1, rm.ScopeMetrics().Len())
21102179
metricsSlice := rm.ScopeMetrics().At(0).Metrics()
21112180
var eventsMetric pmetric.Metric
21122181
for i := 0; i < metricsSlice.Len(); i++ {
@@ -2125,10 +2194,7 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
21252194
dp := dps.At(i)
21262195
if _, exists := dp.Attributes().Get(overflowKey); exists {
21272196
overflowCount++
2128-
// Verify overflow metric has service name
2129-
serviceNameAttr, ok := dp.Attributes().Get(serviceNameKey)
2130-
assert.True(t, ok)
2131-
assert.Equal(t, serviceName.Str(), serviceNameAttr.Str())
2197+
assert.Equal(t, int64(6), dp.IntValue())
21322198
} else {
21332199
normalCount++
21342200
// Verify normal metric has event name
@@ -2143,6 +2209,7 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
21432209
assert.True(t, exists)
21442210
_, exists = attrs.Get(statusCodeKey)
21452211
assert.True(t, exists)
2212+
assert.Equal(t, int64(2), dp.IntValue())
21462213
}
21472214
}
21482215

0 commit comments

Comments
 (0)