diff --git a/.chloggen/separate_timeseries_same_labels_same_datapoints.yaml b/.chloggen/separate_timeseries_same_labels_same_datapoints.yaml new file mode 100644 index 0000000000000..17aa66a9baf09 --- /dev/null +++ b/.chloggen/separate_timeseries_same_labels_same_datapoints.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewritereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Separate timeseries with the same labels are now translated into the same OTLP metric. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37791] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Timeseries that belong to the same metric should be added to the same datapoints slice. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 4905ef0d95fc9..8404bba76e49f 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -6,6 +6,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v1.0.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.122.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.122.0 github.com/prometheus/prometheus v0.300.1 github.com/stretchr/testify v1.10.0 @@ -120,3 +121,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 3be0428794743..9cf6d2fb2e5fb 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -25,6 +25,8 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { @@ -176,11 +178,13 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // This cache is called "intra" because in the future we'll have a "interRequestCache" to cache resourceAttributes // between requests based on the metric "target_info". 
intraRequestCache = make(map[uint64]pmetric.ResourceMetrics) + // The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type + // TODO: use the appropriate hash function. + metricCache = make(map[string]pmetric.Metric) ) for _, ts := range req.Timeseries { ls := ts.ToLabels(&labelsBuilder, req.Symbols) - if !ls.Has(labels.MetricName) { badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("missing metric name in labels")) continue @@ -201,17 +205,78 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr intraRequestCache[hashedLabels] = rm } + scopeName, scopeVersion := prw.extractScopeInfo(ls) + metricName := ls.Get(labels.MetricName) + // TODO: Like UnitRef, we should assign the HelpRef to the metric. + if ts.Metadata.UnitRef >= uint32(len(req.Symbols)) { + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unit ref %d is out of bounds of symbolsTable", ts.Metadata.UnitRef)) + continue + } + unit := req.Symbols[ts.Metadata.UnitRef] + + resourceID := identity.OfResource(rm.Resource()) + // Temporary approach to generate the metric key. + // TODO: Replace this with a proper hashing function. + // The definition of the metric uniqueness is based on the following document. 
Ref: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#opentelemetry-protocol-data-model + metricKey := fmt.Sprintf("%s:%s:%s:%s:%s:%d", + resourceID.String(), // Resource identity + scopeName, // Scope name + scopeVersion, // Scope version + metricName, // Metric name + unit, // Unit + ts.Metadata.Type) // Metric type + + var scope pmetric.ScopeMetrics + var foundScope bool + for i := 0; i < rm.ScopeMetrics().Len(); i++ { + s := rm.ScopeMetrics().At(i) + if s.Scope().Name() == scopeName && s.Scope().Version() == scopeVersion { + scope = s + foundScope = true + break + } + } + if !foundScope { + scope = rm.ScopeMetrics().AppendEmpty() + scope.Scope().SetName(scopeName) + scope.Scope().SetVersion(scopeVersion) + } + + metric, exists := metricCache[metricKey] + // If the metric does not exist, we create an empty metric and add it to the cache. + if !exists { + metric = scope.Metrics().AppendEmpty() + metric.SetName(metricName) + metric.SetUnit(unit) + + switch ts.Metadata.Type { + case writev2.Metadata_METRIC_TYPE_GAUGE: + metric.SetEmptyGauge() + case writev2.Metadata_METRIC_TYPE_COUNTER: + sum := metric.SetEmptySum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + case writev2.Metadata_METRIC_TYPE_HISTOGRAM: + metric.SetEmptyHistogram() + case writev2.Metadata_METRIC_TYPE_SUMMARY: + metric.SetEmptySummary() + } + + metricCache[metricKey] = metric + } + + // Otherwise, we append the samples to the existing metric. 
switch ts.Metadata.Type { - case writev2.Metadata_METRIC_TYPE_COUNTER: - prw.addCounterDatapoints(rm, ls, ts) case writev2.Metadata_METRIC_TYPE_GAUGE: - prw.addGaugeDatapoints(rm, ls, ts) - case writev2.Metadata_METRIC_TYPE_SUMMARY: - prw.addSummaryDatapoints(rm, ls, ts) + addNumberDatapoints(metric.Gauge().DataPoints(), ls, ts) + case writev2.Metadata_METRIC_TYPE_COUNTER: + addNumberDatapoints(metric.Sum().DataPoints(), ls, ts) case writev2.Metadata_METRIC_TYPE_HISTOGRAM: - prw.addHistogramDatapoints(rm, ls, ts) + addHistogramDatapoints(metric.Histogram().DataPoints(), ls, ts) + case writev2.Metadata_METRIC_TYPE_SUMMARY: + addSummaryDatapoints(metric.Summary().DataPoints(), ls, ts) default: - badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName))) + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, metricName)) } } @@ -235,46 +300,9 @@ func parseJobAndInstance(dest pcommon.Map, job, instance string) { } } -func (prw *prometheusRemoteWriteReceiver) addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { - // TODO: Implement this function -} - -func (prw *prometheusRemoteWriteReceiver) addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) { - // TODO: Cache metric name+type+unit and look up cache before creating new empty metric. - // In OTel name+type+unit is the unique identifier of a metric and we should not create - // a new metric if it already exists. - - scopeName, scopeVersion := prw.extractScopeInfo(ls) - - // Check if the name and version present in the labels are already present in the ResourceMetrics. - // If it is not present, we should create a new ScopeMetrics. - // Otherwise, we should append to the existing ScopeMetrics. 
- for j := 0; j < rm.ScopeMetrics().Len(); j++ { - scope := rm.ScopeMetrics().At(j) - if scopeName == scope.Scope().Name() && scopeVersion == scope.Scope().Version() { - addDatapoints(scope.Metrics().AppendEmpty().SetEmptyGauge().DataPoints(), ls, ts) - return - } - } - - scope := rm.ScopeMetrics().AppendEmpty() - scope.Scope().SetName(scopeName) - scope.Scope().SetVersion(scopeVersion) - m := scope.Metrics().AppendEmpty().SetEmptyGauge() - addDatapoints(m.DataPoints(), ls, ts) -} - -func (prw *prometheusRemoteWriteReceiver) addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { - // TODO: Implement this function -} - -func (prw *prometheusRemoteWriteReceiver) addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { - // TODO: Implement this function -} - -// addDatapoints adds the labels to the datapoints attributes. +// addNumberDatapoints adds the labels to the datapoints attributes. // TODO: We're still not handling the StartTimestamp. -func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts writev2.TimeSeries) { +func addNumberDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts writev2.TimeSeries) { // Add samples from the timeseries for _, sample := range ts.Samples { dp := datapoints.AppendEmpty() @@ -295,6 +323,14 @@ func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, ts } } +func addSummaryDatapoints(_ pmetric.SummaryDataPointSlice, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addHistogramDatapoints(_ pmetric.HistogramDataPointSlice, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + // extractScopeInfo extracts the scope name and version from the labels. If the labels do not contain the scope name/version, // it will use the default values from the settings. 
func (prw *prometheusRemoteWriteReceiver) extractScopeInfo(ls labels.Labels) (string, string) { diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index ab31ce72183f1..163b23e23eea0 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -160,6 +160,19 @@ func TestTranslateV2(t *testing.T) { }, expectError: `duplicate label "__name__" in labels`, }, + { + name: "UnitRef bigger than symbols length", + request: &writev2.Request{ + Symbols: []string{"", "__name__", "test"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 3}, + LabelsRefs: []uint32{1, 2}, + }, + }, + }, + expectError: "unit ref 3 is out of bounds of symbolsTable", + }, { name: "valid request", request: writeV2RequestFixture, @@ -175,13 +188,17 @@ func TestTranslateV2(t *testing.T) { // Since we don't define the labels otel_scope_name and otel_scope_version, the default values coming from the receiver settings will be used. 
sm1.Scope().SetName("OpenTelemetry Collector") sm1.Scope().SetVersion("latest") - dp1 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + metrics1 := sm1.Metrics().AppendEmpty() + metrics1.SetName("test_metric1") + metrics1.SetUnit("") + + dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty() dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond))) dp1.SetDoubleValue(1.0) dp1.Attributes().PutStr("d", "e") dp1.Attributes().PutStr("foo", "bar") - dp2 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp2 := metrics1.Gauge().DataPoints().AppendEmpty() dp2.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) dp2.SetDoubleValue(2.0) dp2.Attributes().PutStr("d", "e") @@ -195,7 +212,11 @@ func TestTranslateV2(t *testing.T) { sm2 := rm2.ScopeMetrics().AppendEmpty() sm2.Scope().SetName("OpenTelemetry Collector") sm2.Scope().SetVersion("latest") - dp3 := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + metrics2 := sm2.Metrics().AppendEmpty() + metrics2.SetName("test_metric1") + metrics2.SetUnit("") + + dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty() dp3.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) dp3.SetDoubleValue(2.0) dp3.Attributes().PutStr("d", "e") @@ -249,13 +270,16 @@ func TestTranslateV2(t *testing.T) { sm1 := rm1.ScopeMetrics().AppendEmpty() sm1.Scope().SetName("scope1") sm1.Scope().SetVersion("v1") + metrics1 := sm1.Metrics().AppendEmpty() + metrics1.SetName("test_metric") + metrics1.SetUnit("") - dp1 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty() dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond))) dp1.SetDoubleValue(1.0) dp1.Attributes().PutStr("d", "e") - dp2 := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp2 := metrics1.Gauge().DataPoints().AppendEmpty() dp2.SetTimestamp(pcommon.Timestamp(2 * 
int64(time.Millisecond))) dp2.SetDoubleValue(2.0) dp2.Attributes().PutStr("d", "e") @@ -263,8 +287,11 @@ func TestTranslateV2(t *testing.T) { sm2 := rm1.ScopeMetrics().AppendEmpty() sm2.Scope().SetName("scope2") sm2.Scope().SetVersion("v2") + metrics2 := sm2.Metrics().AppendEmpty() + metrics2.SetName("test_metric") + metrics2.SetUnit("") - dp3 := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty() dp3.SetTimestamp(pcommon.Timestamp(3 * int64(time.Millisecond))) dp3.SetDoubleValue(3.0) dp3.Attributes().PutStr("foo", "bar") @@ -273,6 +300,79 @@ func TestTranslateV2(t *testing.T) { }(), expectedStats: remote.WriteResponseStats{}, }, + { + name: "separate timeseries - same labels - should be same datapointslice", + request: &writev2.Request{ + Symbols: []string{ + "", + "__name__", "test_metric", // 1, 2 + "job", "service-x/test", // 3, 4 + "instance", "107cn001", // 5, 6 + "otel_scope_name", "scope1", // 7, 8 + "otel_scope_version", "v1", // 9, 10 + "d", "e", // 11, 12 + "foo", "bar", // 13, 14 + "f", "g", // 15, 16 + "seconds", "milliseconds", // 17, 18 + }, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 17}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 17}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + // Unit changed, so it should be a different metric. 
+ Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE, UnitRef: 18}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14}, + Samples: []writev2.Sample{{Value: 3, Timestamp: 3}}, + }, + }, + }, + expectedMetrics: func() pmetric.Metrics { + expected := pmetric.NewMetrics() + rm1 := expected.ResourceMetrics().AppendEmpty() + rmAttributes1 := rm1.Resource().Attributes() + rmAttributes1.PutStr("service.namespace", "service-x") + rmAttributes1.PutStr("service.name", "test") + rmAttributes1.PutStr("service.instance.id", "107cn001") + + sm1 := rm1.ScopeMetrics().AppendEmpty() + sm1.Scope().SetName("scope1") + sm1.Scope().SetVersion("v1") + + // Expected to have 2 metrics and 3 data points. + // The first metric should have 2 data points. + // The second metric should have 1 data point. + metrics1 := sm1.Metrics().AppendEmpty() + metrics1.SetName("test_metric") + metrics1.SetUnit("seconds") + dp1 := metrics1.SetEmptyGauge().DataPoints().AppendEmpty() + dp1.SetTimestamp(pcommon.Timestamp(1 * int64(time.Millisecond))) + dp1.SetDoubleValue(1.0) + dp1.Attributes().PutStr("d", "e") + + dp2 := metrics1.Gauge().DataPoints().AppendEmpty() + dp2.SetTimestamp(pcommon.Timestamp(2 * int64(time.Millisecond))) + dp2.SetDoubleValue(2.0) + dp2.Attributes().PutStr("d", "e") + + metrics2 := sm1.Metrics().AppendEmpty() + metrics2.SetName("test_metric") + metrics2.SetUnit("milliseconds") + dp3 := metrics2.SetEmptyGauge().DataPoints().AppendEmpty() + dp3.SetTimestamp(pcommon.Timestamp(3 * int64(time.Millisecond))) + dp3.SetDoubleValue(3.0) + dp3.Attributes().PutStr("foo", "bar") + + return expected + }(), + }, } { t.Run(tc.name, func(t *testing.T) { metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)