Skip to content

Commit b3a760c

Browse files
perebajArthurSens
authored andcommitted
[reciever/prometheusremotewritereceiver] Handle multiple timeseries with same metric name+type+unit (open-telemetry#38453)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR belongs to part of the Linux foundation mentee program. Here we are dealing with data points slices that must be added to the same metric when the incoming time series have some attributes in common. Like same resource, metricName, scopeName, scopeVersion, unitRef and timeseries type. The reference to this implementation can be found https://opentelemetry.io/docs/specs/otel/metrics/data-model/#opentelemetry-protocol-data-model Besides that. We are creating 2 new test cases. The first one, to validate the described behavior above. The second is to validate an error case. When the unitRef passed doesn't match with the symbols slice, causing a panic. This test case is important to guarantee that the error is being handled correctly. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes part of open-telemetry#37277. Bullet point `Handle multiple timeseries with same metric name+type+unit` --------- Co-authored-by: Arthur Silva Sens <[email protected]>
1 parent 7e46ef3 commit b3a760c

File tree

4 files changed

+219
-53
lines changed

4 files changed

+219
-53
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: prometheusremotewritereciever
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Separate timeseries with the same labels are now translated into the same OTLP metric.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37791]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: timeseries that belongs to the same metric should be added to the same datapoints slice.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/prometheusremotewritereceiver/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/cespare/xxhash/v2 v2.3.0
77
github.com/gogo/protobuf v1.3.2
88
github.com/golang/snappy v1.0.0
9+
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.122.0
910
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.122.0
1011
github.com/prometheus/prometheus v0.300.1
1112
github.com/stretchr/testify v1.10.0
@@ -120,3 +121,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
120121
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
121122

122123
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
124+
125+
replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

receiver/prometheusremotewritereceiver/receiver.go

Lines changed: 83 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"go.opentelemetry.io/collector/pdata/pmetric"
2626
"go.opentelemetry.io/collector/receiver"
2727
"go.uber.org/zap/zapcore"
28+
29+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
2830
)
2931

3032
func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
@@ -176,11 +178,13 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
176178
// This cache is called "intra" because in the future we'll have a "interRequestCache" to cache resourceAttributes
177179
// between requests based on the metric "target_info".
178180
intraRequestCache = make(map[uint64]pmetric.ResourceMetrics)
181+
// The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type
182+
// TODO: use the appropriate hash function.
183+
metricCache = make(map[string]pmetric.Metric)
179184
)
180185

181186
for _, ts := range req.Timeseries {
182187
ls := ts.ToLabels(&labelsBuilder, req.Symbols)
183-
184188
if !ls.Has(labels.MetricName) {
185189
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("missing metric name in labels"))
186190
continue
@@ -201,17 +205,78 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
201205
intraRequestCache[hashedLabels] = rm
202206
}
203207

208+
scopeName, scopeVersion := prw.extractScopeInfo(ls)
209+
metricName := ls.Get(labels.MetricName)
210+
// TODO: Like UnitRef, we should assign the HelpRef to the metric.
211+
if ts.Metadata.UnitRef >= uint32(len(req.Symbols)) {
212+
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unit ref %d is out of bounds of symbolsTable", ts.Metadata.UnitRef))
213+
continue
214+
}
215+
unit := req.Symbols[ts.Metadata.UnitRef]
216+
217+
resourceID := identity.OfResource(rm.Resource())
218+
// Temporary approach to generate the metric key.
219+
// TODO: Replace this with a proper hashing function.
220+
// The definition of the metric uniqueness is based on the following document. Ref: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#opentelemetry-protocol-data-model
221+
metricKey := fmt.Sprintf("%s:%s:%s:%s:%s:%d",
222+
resourceID.String(), // Resource identity
223+
scopeName, // Scope name
224+
scopeVersion, // Scope version
225+
metricName, // Metric name
226+
unit, // Unit
227+
ts.Metadata.Type) // Metric type
228+
229+
var scope pmetric.ScopeMetrics
230+
var foundScope bool
231+
for i := 0; i < rm.ScopeMetrics().Len(); i++ {
232+
s := rm.ScopeMetrics().At(i)
233+
if s.Scope().Name() == scopeName && s.Scope().Version() == scopeVersion {
234+
scope = s
235+
foundScope = true
236+
break
237+
}
238+
}
239+
if !foundScope {
240+
scope = rm.ScopeMetrics().AppendEmpty()
241+
scope.Scope().SetName(scopeName)
242+
scope.Scope().SetVersion(scopeVersion)
243+
}
244+
245+
metric, exists := metricCache[metricKey]
246+
// If the metric does not exist, we create an empty metric and add it to the cache.
247+
if !exists {
248+
metric = scope.Metrics().AppendEmpty()
249+
metric.SetName(metricName)
250+
metric.SetUnit(unit)
251+
252+
switch ts.Metadata.Type {
253+
case writev2.Metadata_METRIC_TYPE_GAUGE:
254+
metric.SetEmptyGauge()
255+
case writev2.Metadata_METRIC_TYPE_COUNTER:
256+
sum := metric.SetEmptySum()
257+
sum.SetIsMonotonic(true)
258+
sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
259+
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
260+
metric.SetEmptyHistogram()
261+
case writev2.Metadata_METRIC_TYPE_SUMMARY:
262+
metric.SetEmptySummary()
263+
}
264+
265+
metricCache[metricKey] = metric
266+
}
267+
268+
// Otherwise, we append the samples to the existing metric.
204269
switch ts.Metadata.Type {
205-
case writev2.Metadata_METRIC_TYPE_COUNTER:
206-
prw.addCounterDatapoints(rm, ls, ts)
207270
case writev2.Metadata_METRIC_TYPE_GAUGE:
208-
prw.addGaugeDatapoints(rm, ls, ts)
209-
case writev2.Metadata_METRIC_TYPE_SUMMARY:
210-
prw.addSummaryDatapoints(rm, ls, ts)
271+
addNumberDatapoints(metric.Gauge().DataPoints(), ls, ts)
272+
case writev2.Metadata_METRIC_TYPE_COUNTER:
273+
addNumberDatapoints(metric.Sum().DataPoints(), ls, ts)
211274
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
212-
prw.addHistogramDatapoints(rm, ls, ts)
275+
addHistogramDatapoints(metric.Histogram().DataPoints(), ls, ts)
276+
case writev2.Metadata_METRIC_TYPE_SUMMARY:
277+
addSummaryDatapoints(metric.Summary().DataPoints(), ls, ts)
213278
default:
214-
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName)))
279+
badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, metricName))
215280
}
216281
}
217282

@@ -235,46 +300,9 @@ func parseJobAndInstance(dest pcommon.Map, job, instance string) {
235300
}
236301
}
237302

238-
func (prw *prometheusRemoteWriteReceiver) addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
239-
// TODO: Implement this function
240-
}
241-
242-
func (prw *prometheusRemoteWriteReceiver) addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) {
243-
// TODO: Cache metric name+type+unit and look up cache before creating new empty metric.
244-
// In OTel name+type+unit is the unique identifier of a metric and we should not create
245-
// a new metric if it already exists.
246-
247-
scopeName, scopeVersion := prw.extractScopeInfo(ls)
248-
249-
// Check if the name and version present in the labels are already present in the ResourceMetrics.
250-
// If it is not present, we should create a new ScopeMetrics.
251-
// Otherwise, we should append to the existing ScopeMetrics.
252-
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
253-
scope := rm.ScopeMetrics().At(j)
254-
if scopeName == scope.Scope().Name() && scopeVersion == scope.Scope().Version() {
255-
addDatapoints(scope.Metrics().AppendEmpty().SetEmptyGauge().DataPoints(), ls, ts)
256-
return
257-
}
258-
}
259-
260-
scope := rm.ScopeMetrics().AppendEmpty()
261-
scope.Scope().SetName(scopeName)
262-
scope.Scope().SetVersion(scopeVersion)
263-
m := scope.Metrics().AppendEmpty().SetEmptyGauge()
264-
addDatapoints(m.DataPoints(), ls, ts)
265-
}
266-
267-
func (prw *prometheusRemoteWriteReceiver) addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
268-
// TODO: Implement this function
269-
}
270-
271-
func (prw *prometheusRemoteWriteReceiver) addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
272-
// TODO: Implement this function
273-
}
274-
275-
// addDatapoints adds the labels to the datapoints attributes.
303+
// addNumberDatapoints adds the labels to the datapoints attributes.
276304
// TODO: We're still not handling the StartTimestamp.
277-
func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts writev2.TimeSeries) {
305+
func addNumberDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts writev2.TimeSeries) {
278306
// Add samples from the timeseries
279307
for _, sample := range ts.Samples {
280308
dp := datapoints.AppendEmpty()
@@ -295,6 +323,14 @@ func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts
295323
}
296324
}
297325

326+
func addSummaryDatapoints(_ pmetric.SummaryDataPointSlice, _ labels.Labels, _ writev2.TimeSeries) {
327+
// TODO: Implement this function
328+
}
329+
330+
func addHistogramDatapoints(_ pmetric.HistogramDataPointSlice, _ labels.Labels, _ writev2.TimeSeries) {
331+
// TODO: Implement this function
332+
}
333+
298334
// extractScopeInfo extracts the scope name and version from the labels. If the labels do not contain the scope name/version,
299335
// it will use the default values from the settings.
300336
func (prw *prometheusRemoteWriteReceiver) extractScopeInfo(ls labels.Labels) (string, string) {

receiver/prometheusremotewritereceiver/receiver_test.go

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,19 @@ func TestTranslateV2(t *testing.T) {
160160
},
161161
expectError: `duplicate label "__name__" in labels`,
162162
},
163+
{
164+
name: "UnitRef bigger than symbols length",
165+
request: &writev2.Request{
166+
Symbols: []string{"", "__name__", "test"},
167+
Timeseries: []writev2.TimeSeries{
168+
{
169+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 3},
170+
LabelsRefs: []uint32{1, 2},
171+
},
172+
},
173+
},
174+
expectError: "unit ref 3 is out of bounds of symbolsTable",
175+
},
163176
{
164177
name: "valid request",
165178
request: writeV2RequestFixture,
@@ -175,13 +188,17 @@ func TestTranslateV2(t *testing.T) {
175188
// Since we don't define the labels otel_scope_name and otel_scope_version, the default values coming from the receiver settings will be used.
176189
sm1.Scope().SetName("OpenTelemetry Collector")
177190
sm1.Scope().SetVersion("latest")
178-
dp1 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
191+
metrics1 := sm1.Metrics().AppendEmpty()
192+
metrics1.SetName("test_metric1")
193+
metrics1.SetUnit("")
194+
195+
dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty()
179196
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
180197
dp1.SetDoubleValue(1.0)
181198
dp1.Attributes().PutStr("d", "e")
182199
dp1.Attributes().PutStr("foo", "bar")
183200

184-
dp2 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
201+
dp2 := metrics1.Gauge().DataPoints().AppendEmpty()
185202
dp2.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
186203
dp2.SetDoubleValue(2.0)
187204
dp2.Attributes().PutStr("d", "e")
@@ -195,7 +212,11 @@ func TestTranslateV2(t *testing.T) {
195212
sm2 := rm2.ScopeMetrics().AppendEmpty()
196213
sm2.Scope().SetName("OpenTelemetry Collector")
197214
sm2.Scope().SetVersion("latest")
198-
dp3 := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
215+
metrics2 := sm2.Metrics().AppendEmpty()
216+
metrics2.SetName("test_metric1")
217+
metrics2.SetUnit("")
218+
219+
dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty()
199220
dp3.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
200221
dp3.SetDoubleValue(2.0)
201222
dp3.Attributes().PutStr("d", "e")
@@ -249,22 +270,28 @@ func TestTranslateV2(t *testing.T) {
249270
sm1 := rm1.ScopeMetrics().AppendEmpty()
250271
sm1.Scope().SetName("scope1")
251272
sm1.Scope().SetVersion("v1")
273+
metrics1 := sm1.Metrics().AppendEmpty()
274+
metrics1.SetName("test_metric")
275+
metrics1.SetUnit("")
252276

253-
dp1 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
277+
dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty()
254278
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
255279
dp1.SetDoubleValue(1.0)
256280
dp1.Attributes().PutStr("d", "e")
257281

258-
dp2 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
282+
dp2 := metrics1.Gauge().DataPoints().AppendEmpty()
259283
dp2.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
260284
dp2.SetDoubleValue(2.0)
261285
dp2.Attributes().PutStr("d", "e")
262286

263287
sm2 := rm1.ScopeMetrics().AppendEmpty()
264288
sm2.Scope().SetName("scope2")
265289
sm2.Scope().SetVersion("v2")
290+
metrics2 := sm2.Metrics().AppendEmpty()
291+
metrics2.SetName("test_metric")
292+
metrics2.SetUnit("")
266293

267-
dp3 := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty()
294+
dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty()
268295
dp3.SetTimestamp(pcommon.Timestamp(3 * int64(time.Millisecond)))
269296
dp3.SetDoubleValue(3.0)
270297
dp3.Attributes().PutStr("foo", "bar")
@@ -273,6 +300,79 @@ func TestTranslateV2(t *testing.T) {
273300
}(),
274301
expectedStats: remote.WriteResponseStats{},
275302
},
303+
{
304+
name: "separate timeseries - same labels - should be same datapointslice",
305+
request: &writev2.Request{
306+
Symbols: []string{
307+
"",
308+
"__name__", "test_metric", // 1, 2
309+
"job", "service-x/test", // 3, 4
310+
"instance", "107cn001", // 5, 6
311+
"otel_scope_name", "scope1", // 7, 8
312+
"otel_scope_version", "v1", // 9, 10
313+
"d", "e", // 11, 12
314+
"foo", "bar", // 13, 14
315+
"f", "g", // 15, 16
316+
"seconds", "milliseconds", // 17, 18
317+
},
318+
Timeseries: []writev2.TimeSeries{
319+
{
320+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 17},
321+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
322+
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
323+
},
324+
{
325+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 17},
326+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
327+
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
328+
},
329+
{
330+
// Unit changed, so it should be a different metric.
331+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 18},
332+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14},
333+
Samples: []writev2.Sample{{Value: 3, Timestamp: 3}},
334+
},
335+
},
336+
},
337+
expectedMetrics: func() pmetric.Metrics {
338+
expected := pmetric.NewMetrics()
339+
rm1 := expected.ResourceMetrics().AppendEmpty()
340+
rmAttributes1 := rm1.Resource().Attributes()
341+
rmAttributes1.PutStr("service.namespace", "service-x")
342+
rmAttributes1.PutStr("service.name", "test")
343+
rmAttributes1.PutStr("service.instance.id", "107cn001")
344+
345+
sm1 := rm1.ScopeMetrics().AppendEmpty()
346+
sm1.Scope().SetName("scope1")
347+
sm1.Scope().SetVersion("v1")
348+
349+
// Expected to have 2 metrics and 3 data points.
350+
// The first metric should have 2 data points.
351+
// The second metric should have 1 data point.
352+
metrics1 := sm1.Metrics().AppendEmpty()
353+
metrics1.SetName("test_metric")
354+
metrics1.SetUnit("seconds")
355+
dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty()
356+
dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond)))
357+
dp1.SetDoubleValue(1.0)
358+
dp1.Attributes().PutStr("d", "e")
359+
360+
dp2 := metrics1.Gauge().DataPoints().AppendEmpty()
361+
dp2.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond)))
362+
dp2.SetDoubleValue(2.0)
363+
dp2.Attributes().PutStr("d", "e")
364+
365+
metrics2 := sm1.Metrics().AppendEmpty()
366+
metrics2.SetName("test_metric")
367+
metrics2.SetUnit("milliseconds")
368+
dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty()
369+
dp3.SetTimestamp(pcommon.Timestamp(3 * int64(time.Millisecond)))
370+
dp3.SetDoubleValue(3.0)
371+
dp3.Attributes().PutStr("foo", "bar")
372+
373+
return expected
374+
}(),
375+
},
276376
} {
277377
t.Run(tc.name, func(t *testing.T) {
278378
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)

0 commit comments

Comments
 (0)