Skip to content

Commit ee94c77

Browse files
committed
(open-telemetry#31671) produce delta temporality span metrics with timestamps representing an uninterrupted series. This can avoid significant memory usage compared to producing cumulative span metrics, as long as a downstream component can convert from delta back to cumulative, which can depend on the timestamps being uninterrupted.
1 parent 17fe4f8 commit ee94c77

File tree

10 files changed

+395
-40
lines changed

10 files changed

+395
-40
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: spanmetrics
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Produce delta temporality span metrics with StartTimeUnixNano and TimeUnixNano values representing an uninterrupted series
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31671, 30688]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: This allows producing delta span metrics instead of the more memory-intensive cumulative metrics, specifically when a downstream component can convert the delta metrics to cumulative.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

connector/spanmetricsconnector/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ The following settings can be optionally configured:
120120
- `enabled`: (default: `false`): enabling will add the events metric.
121121
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
122122
- `resource_metrics_key_attributes`: Filter the resource attributes used to produce the resource metrics key map hash. Use this in case changing resource attributes (e.g. process id) are breaking counter metrics.
123+
- `delta_temporality`: Configuration exclusive to generating delta temporality span metrics
124+
- `metric_timestamp_cache_size` (default `10000`): Size of the cache used to keep track of a metric's TimeUnixNano the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0.
123125

124126
## Examples
125127

connector/spanmetricsconnector/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ var defaultHistogramBucketsMs = []float64{
2323
2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
2424
}
2525

26+
var defaultDeltaTimestampCacheSize = 10000
27+
2628
// Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
2729
type Dimension struct {
2830
Name string `mapstructure:"name"`
@@ -79,6 +81,9 @@ type Config struct {
7981

8082
// Events defines the configuration for events section of spans.
8183
Events EventsConfig `mapstructure:"events"`
84+
85+
// DeltaTemporalityConfig is configuration that's exclusive to generating delta span metrics
86+
DeltaTemporalityConfig *DeltaTemporalityConfig `mapstructure:"delta_temporality"`
8287
}
8388

8489
type HistogramConfig struct {
@@ -109,6 +114,11 @@ type EventsConfig struct {
109114
Dimensions []Dimension `mapstructure:"dimensions"`
110115
}
111116

117+
type DeltaTemporalityConfig struct {
118+
// TimestampCacheSize controls the size of the cache used to keep track of a metric's TimestampUnixNano the last time it was flushed
119+
TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`
120+
}
121+
112122
var _ component.ConfigValidator = (*Config)(nil)
113123

114124
// Validate checks if the processor configuration is valid
@@ -139,6 +149,13 @@ func (c Config) Validate() error {
139149
return fmt.Errorf("invalid metrics_expiration: %v, the duration should be positive", c.MetricsExpiration)
140150
}
141151

152+
if c.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta && c.GetDeltaTimestampCacheSize() <= 0 {
153+
return fmt.Errorf(
154+
"invalid delta timestamp cache size: %v, the maximum number of the items in the cache should be positive",
155+
c.GetDeltaTimestampCacheSize(),
156+
)
157+
}
158+
142159
return nil
143160
}
144161

@@ -151,6 +168,13 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
151168
return pmetric.AggregationTemporalityCumulative
152169
}
153170

171+
func (c Config) GetDeltaTimestampCacheSize() int {
172+
if c.DeltaTemporalityConfig != nil && c.DeltaTemporalityConfig.TimestampCacheSize != nil {
173+
return *c.DeltaTemporalityConfig.TimestampCacheSize
174+
}
175+
return defaultDeltaTimestampCacheSize
176+
}
177+
154178
// validateDimensions checks duplicates for reserved dimensions and additional dimensions.
155179
func validateDimensions(dimensions []Dimension) error {
156180
labelNames := make(map[string]struct{})

connector/spanmetricsconnector/config_test.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ func TestLoadConfig(t *testing.T) {
2727

2828
defaultMethod := "GET"
2929
defaultMaxPerDatapoint := 5
30+
customTimestampCacheSize := 123
3031
tests := []struct {
31-
id component.ID
32-
expected component.Config
33-
errorMessage string
32+
id component.ID
33+
expected component.Config
34+
errorMessage string
35+
extraAssertions func(config *Config)
3436
}{
3537
{
3638
id: component.NewIDWithName(metadata.Type, "default"),
@@ -125,6 +127,47 @@ func TestLoadConfig(t *testing.T) {
125127
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
126128
},
127129
},
130+
{
131+
id: component.NewIDWithName(metadata.Type, "custom_delta_timestamp_cache_size"),
132+
expected: &Config{
133+
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
134+
DeltaTemporalityConfig: &DeltaTemporalityConfig{TimestampCacheSize: &customTimestampCacheSize},
135+
DimensionsCacheSize: defaultDimensionsCacheSize,
136+
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
137+
MetricsFlushInterval: 15 * time.Second,
138+
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
139+
},
140+
},
141+
{
142+
id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size"),
143+
expected: &Config{
144+
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
145+
DimensionsCacheSize: defaultDimensionsCacheSize,
146+
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
147+
MetricsFlushInterval: 15 * time.Second,
148+
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
149+
},
150+
extraAssertions: func(config *Config) {
151+
assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
152+
},
153+
},
154+
{
155+
id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size2"),
156+
expected: &Config{
157+
AggregationTemporality: "AGGREGATION_TEMPORALITY_DELTA",
158+
DimensionsCacheSize: defaultDimensionsCacheSize,
159+
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
160+
MetricsFlushInterval: 15 * time.Second,
161+
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
162+
},
163+
extraAssertions: func(config *Config) {
164+
assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
165+
},
166+
},
167+
{
168+
id: component.NewIDWithName(metadata.Type, "invalid_delta_timestamp_cache_size"),
169+
errorMessage: "invalid delta timestamp cache size: 0, the maximum number of the items in the cache should be positive",
170+
},
128171
}
129172

130173
for _, tt := range tests {
@@ -143,6 +186,9 @@ func TestLoadConfig(t *testing.T) {
143186
}
144187
assert.NoError(t, component.ValidateConfig(cfg))
145188
assert.Equal(t, tt.expected, cfg)
189+
if tt.extraAssertions != nil {
190+
tt.extraAssertions(cfg.(*Config))
191+
}
146192
})
147193
}
148194
}

connector/spanmetricsconnector/connector.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package spanmetricsconnector // import "github.com/open-telemetry/opentelemetry-
66
import (
77
"bytes"
88
"context"
9+
"github.com/hashicorp/golang-lru/v2/simplelru"
910
"sync"
1011
"time"
1112

@@ -72,6 +73,9 @@ type connectorImp struct {
7273
eDimensions []dimension
7374

7475
events EventsConfig
76+
77+
// Tracks the last TimestampUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics.
78+
lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
7579
}
7680

7781
type resourceMetrics struct {
@@ -125,6 +129,16 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
125129
resourceMetricsKeyAttributes[attr] = s
126130
}
127131

132+
var lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
133+
if cfg.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
134+
lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, v pcommon.Timestamp) {
135+
logger.Info("Evicting cached delta timestamp", zap.String("key", string(k)))
136+
})
137+
if err != nil {
138+
return nil, err
139+
}
140+
}
141+
128142
return &connectorImp{
129143
logger: logger,
130144
config: *cfg,
@@ -133,6 +147,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
133147
dimensions: newDimensions(cfg.Dimensions),
134148
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
135149
metricKeyToDimensions: metricKeyToDimensionsCache,
150+
lastDeltaTimestamps: lastDeltaTimestamps,
136151
ticker: ticker,
137152
done: make(chan struct{}),
138153
eDimensions: newDimensions(cfg.Events.Dimensions),
@@ -251,6 +266,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) {
251266
// buildMetrics collects the computed raw metrics data and builds OTLP metrics.
252267
func (p *connectorImp) buildMetrics() pmetric.Metrics {
253268
m := pmetric.NewMetrics()
269+
timestamp := pcommon.NewTimestampFromTime(time.Now())
254270

255271
p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) {
256272
rm := m.ResourceMetrics().AppendEmpty()
@@ -259,23 +275,46 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
259275
sm := rm.ScopeMetrics().AppendEmpty()
260276
sm.Scope().SetName("spanmetricsconnector")
261277

278+
/**
279+
* To represent an uninterrupted stream of metrics as per the spec, the (StartTimestamp, Timestamp)'s of successive data points should be:
280+
* - For cumulative metrics: (T1, T2), (T1, T3), (T1, T4) ...
281+
* - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ...
282+
*/
283+
deltaMetricKeys := make(map[metrics.Key]bool)
284+
startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp {
285+
startTime := rawMetrics.startTimestamp
286+
if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta {
287+
if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok {
288+
startTime = lastTimestamp
289+
}
290+
// Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update.
291+
deltaMetricKeys[mk] = true
292+
}
293+
return startTime
294+
}
295+
262296
sums := rawMetrics.sums
263297
metric := sm.Metrics().AppendEmpty()
264298
metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls))
265-
sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
299+
sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
266300
if !p.config.Histogram.Disable {
267301
histograms := rawMetrics.histograms
268302
metric = sm.Metrics().AppendEmpty()
269303
metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration))
270304
metric.SetUnit(p.config.Histogram.Unit.String())
271-
histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
305+
histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
272306
}
273307

274308
events := rawMetrics.events
275309
if p.events.Enabled {
276310
metric = sm.Metrics().AppendEmpty()
277311
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
278-
events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality())
312+
events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality())
313+
}
314+
315+
for mk := range deltaMetricKeys {
316+
// For delta metrics, cache the current data point's timestamp, which will be the start timestamp for the next data points in the series
317+
p.lastDeltaTimestamps.Add(mk, timestamp)
279318
}
280319
})
281320

@@ -322,6 +361,7 @@ func (p *connectorImp) resetState() {
322361
// and span metadata such as name, kind, status_code and any additional
323362
// dimensions the user has configured.
324363
func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
364+
startTimestamp := pcommon.NewTimestampFromTime(time.Now())
325365
for i := 0; i < traces.ResourceSpans().Len(); i++ {
326366
rspans := traces.ResourceSpans().At(i)
327367
resourceAttr := rspans.Resource().Attributes()
@@ -330,7 +370,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
330370
continue
331371
}
332372

333-
rm := p.getOrCreateResourceMetrics(resourceAttr)
373+
rm := p.getOrCreateResourceMetrics(resourceAttr, startTimestamp)
334374
sums := rm.sums
335375
histograms := rm.histograms
336376
events := rm.events
@@ -427,7 +467,7 @@ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
427467
return pdatautil.MapHash(m)
428468
}
429469

430-
func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
470+
func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimestamp pcommon.Timestamp) *resourceMetrics {
431471
key := p.createResourceKey(attr)
432472
v, ok := p.resourceMetrics.Get(key)
433473
if !ok {
@@ -436,7 +476,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet
436476
sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
437477
events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint),
438478
attributes: attr,
439-
startTimestamp: pcommon.NewTimestampFromTime(time.Now()),
479+
startTimestamp: startTimestamp,
440480
}
441481
p.resourceMetrics.Add(key, v)
442482
}

0 commit comments

Comments
 (0)