diff --git a/.chloggen/add-scale-function.yaml b/.chloggen/add-scale-function.yaml new file mode 100644 index 0000000000000..202fecf428832 --- /dev/null +++ b/.chloggen/add-scale-function.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/transform + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `scale_metric` function that scales all data points in a metric. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [16214] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' change_logs: [] diff --git a/processor/transformprocessor/README.md b/processor/transformprocessor/README.md index daab05325aec3..f823e018c2bbf 100644 --- a/processor/transformprocessor/README.md +++ b/processor/transformprocessor/README.md @@ -212,6 +212,7 @@ In addition to OTTL functions, the processor defines its own functions to help w - [convert_summary_count_val_to_sum](#convert_summary_count_val_to_sum) - [convert_summary_sum_val_to_sum](#convert_summary_sum_val_to_sum) - [copy_metric](#copy_metric) +- [scale_metric](#scale_metric) ### convert_sum_to_gauge @@ -347,6 +348,21 @@ Examples: - `copy_metric(desc="new desc") where description == "old desc"` +### scale_metric + +`scale_metric(factor, Optional[unit])` + +The `scale_metric` function multiplies the values in the data points in the metric by the float value `factor`. +If the optional string `unit` is provided, the metric's unit will be set to this value. +Both integer and double data points are supported; integer values are truncated after scaling. + +Supported metric types are `Gauge`, `Sum`, `Histogram`, and `Summary`. + +Examples: + +- `scale_metric(0.1)`: Scale the metric by a factor of `0.1`. The unit of the metric will not be modified. +- `scale_metric(10.0, "kWh")`: Scale the metric by a factor of `10.0` and set the unit to `kWh`. 
+ ## Examples ### Perform transformation if field does not exist diff --git a/processor/transformprocessor/internal/metrics/func_scale.go b/processor/transformprocessor/internal/metrics/func_scale.go new file mode 100644 index 0000000000000..60ad4305db9bf --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_scale.go @@ -0,0 +1,130 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/metrics" + +import ( + "context" + "errors" + "fmt" + + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +type ScaleArguments struct { + Multiplier float64 + Unit ottl.Optional[ottl.StringGetter[ottlmetric.TransformContext]] +} + +func newScaleMetricFactory() ottl.Factory[ottlmetric.TransformContext] { + return ottl.NewFactory("scale_metric", &ScaleArguments{}, createScaleFunction) +} + +func createScaleFunction(_ ottl.FunctionContext, oArgs ottl.Arguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + args, ok := oArgs.(*ScaleArguments) + + if !ok { + return nil, fmt.Errorf("ScaleFactory args must be of type *ScaleArguments[K]") + } + + return Scale(*args) +} + +func Scale(args ScaleArguments) (ottl.ExprFunc[ottlmetric.TransformContext], error) { + return func(ctx context.Context, tCtx ottlmetric.TransformContext) (any, error) { + metric := tCtx.GetMetric() + + var unit *string + if !args.Unit.IsEmpty() { + u, err := args.Unit.Get().Get(ctx, tCtx) + if err != nil { + return nil, fmt.Errorf("could not get unit from ScaleArguments: %w", err) + } + unit = &u + } + + switch metric.Type() { + case pmetric.MetricTypeGauge: + scaleMetric(metric.Gauge().DataPoints(), args.Multiplier) + case 
pmetric.MetricTypeHistogram: + scaleHistogram(metric.Histogram().DataPoints(), args.Multiplier) + case pmetric.MetricTypeSummary: + scaleSummarySlice(metric.Summary().DataPoints(), args.Multiplier) + case pmetric.MetricTypeSum: + scaleMetric(metric.Sum().DataPoints(), args.Multiplier) + case pmetric.MetricTypeExponentialHistogram: + return nil, errors.New("exponential histograms are not supported by the 'scale_metric' function") + default: + return nil, fmt.Errorf("unsupported metric type: '%v'", metric.Type()) + } + if unit != nil { + metric.SetUnit(*unit) + } + + return nil, nil + }, nil +} + +func scaleExemplar(ex *pmetric.Exemplar, multiplier float64) { + switch ex.ValueType() { + case pmetric.ExemplarValueTypeInt: + ex.SetIntValue(int64(float64(ex.IntValue()) * multiplier)) + case pmetric.ExemplarValueTypeDouble: + ex.SetDoubleValue(ex.DoubleValue() * multiplier) + } +} + +func scaleSummarySlice(values pmetric.SummaryDataPointSlice, multiplier float64) { + for i := 0; i < values.Len(); i++ { + dp := values.At(i) + + dp.SetSum(dp.Sum() * multiplier) + + for i := 0; i < dp.QuantileValues().Len(); i++ { + qv := dp.QuantileValues().At(i) + qv.SetValue(qv.Value() * multiplier) + } + } +} + +func scaleHistogram(datapoints pmetric.HistogramDataPointSlice, multiplier float64) { + for i := 0; i < datapoints.Len(); i++ { + dp := datapoints.At(i) + + if dp.HasSum() { + dp.SetSum(dp.Sum() * multiplier) + } + if dp.HasMin() { + dp.SetMin(dp.Min() * multiplier) + } + if dp.HasMax() { + dp.SetMax(dp.Max() * multiplier) + } + + for bounds, bi := dp.ExplicitBounds(), 0; bi < bounds.Len(); bi++ { + bounds.SetAt(bi, bounds.At(bi)*multiplier) + } + + for exemplars, ei := dp.Exemplars(), 0; ei < exemplars.Len(); ei++ { + exemplar := exemplars.At(ei) + scaleExemplar(&exemplar, multiplier) + } + } +} + +func scaleMetric(points pmetric.NumberDataPointSlice, multiplier float64) { + for i := 0; i < points.Len(); i++ { + dp := points.At(i) + switch dp.ValueType() { + case 
pmetric.NumberDataPointValueTypeInt: + dp.SetIntValue(int64(float64(dp.IntValue()) * multiplier)) + + case pmetric.NumberDataPointValueTypeDouble: + dp.SetDoubleValue(dp.DoubleValue() * multiplier) + default: + } + } +} diff --git a/processor/transformprocessor/internal/metrics/func_scale_test.go b/processor/transformprocessor/internal/metrics/func_scale_test.go new file mode 100644 index 0000000000000..29441bb047fbf --- /dev/null +++ b/processor/transformprocessor/internal/metrics/func_scale_test.go @@ -0,0 +1,204 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" +) + +func TestScale(t *testing.T) { + type testCase struct { + name string + args ScaleArguments + valueFunc func() pmetric.Metric + wantFunc func() pmetric.Metric + wantErr bool + } + tests := []testCase{ + { + name: "scale gauge float metric", + valueFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptyGauge() + metric.Gauge().DataPoints().AppendEmpty().SetDoubleValue(10.0) + + return metric + }, + args: ScaleArguments{ + Multiplier: 10.0, + Unit: ottl.NewTestingOptional[ottl.StringGetter[ottlmetric.TransformContext]](ottl.StandardStringGetter[ottlmetric.TransformContext]{ + Getter: func(_ context.Context, _ ottlmetric.TransformContext) (any, error) { + return "kWh", nil + }, + }), + }, + wantFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptyGauge() + metric.SetUnit("kWh") + metric.Gauge().DataPoints().AppendEmpty().SetDoubleValue(100.0) + + return metric + 
}, + wantErr: false, + }, + { + name: "scale gauge int metric", + valueFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptyGauge() + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(10) + + return metric + }, + args: ScaleArguments{ + Multiplier: 10.0, + }, + wantFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptyGauge() + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(100.0) + + return metric + }, + wantErr: false, + }, + { + name: "scale sum metric", + valueFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptySum() + metric.Sum().DataPoints().AppendEmpty().SetDoubleValue(10.0) + + return metric + }, + args: ScaleArguments{ + Multiplier: 10.0, + }, + wantFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptySum() + metric.Sum().DataPoints().AppendEmpty().SetDoubleValue(100.0) + + return metric + }, + wantErr: false, + }, + { + name: "scale histogram metric", + valueFunc: func() pmetric.Metric { + metric := getTestScalingHistogramMetric(1, 4, 1, 3, []float64{1, 10}, []uint64{1, 2}, []float64{1.0}, 1, 1) + return metric + }, + args: ScaleArguments{ + Multiplier: 10.0, + }, + wantFunc: func() pmetric.Metric { + metric := getTestScalingHistogramMetric(1, 40, 10, 30, []float64{10, 100}, []uint64{1, 2}, []float64{10.0}, 1, 1) + return metric + }, + wantErr: false, + }, + { + name: "scale summary metric", + valueFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + dp := metric.SetEmptySummary().DataPoints().AppendEmpty() + dp.SetSum(10.0) + qv := dp.QuantileValues().AppendEmpty() + qv.SetValue(10.0) + + return metric + }, + args: ScaleArguments{ + Multiplier: 10.0, + }, + wantFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + dp := metric.SetEmptySummary().DataPoints().AppendEmpty() + dp.SetSum(100.0) + qv 
:= dp.QuantileValues().AppendEmpty() + qv.SetValue(100.0) + + return metric + }, + wantErr: false, + }, + { + name: "unsupported: exponential histogram metric", + valueFunc: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetEmptyExponentialHistogram() + return metric + }, + args: ScaleArguments{ + Multiplier: 10.0, + }, + wantFunc: func() pmetric.Metric { + // value should not be modified + metric := pmetric.NewMetric() + metric.SetEmptyExponentialHistogram() + return metric + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + target := ottlmetric.NewTransformContext( + tt.valueFunc(), + pmetric.NewMetricSlice(), + pcommon.NewInstrumentationScope(), + pcommon.NewResource(), + pmetric.NewScopeMetrics(), + pmetric.NewResourceMetrics(), + ) + + expressionFunc, _ := Scale(tt.args) + _, err := expressionFunc(context.Background(), target) + + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.EqualValues(t, tt.wantFunc(), target.GetMetric()) + }) + } +} + +func getTestScalingHistogramMetric(count uint64, sum, min, max float64, bounds []float64, bucketCounts []uint64, exemplars []float64, start, timestamp pcommon.Timestamp) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test-metric") + metric.SetEmptyHistogram() + histogramDatapoint := metric.Histogram().DataPoints().AppendEmpty() + histogramDatapoint.SetCount(count) + histogramDatapoint.SetSum(sum) + histogramDatapoint.SetMin(min) + histogramDatapoint.SetMax(max) + histogramDatapoint.ExplicitBounds().FromRaw(bounds) + histogramDatapoint.BucketCounts().FromRaw(bucketCounts) + for i := 0; i < len(exemplars); i++ { + exemplar := histogramDatapoint.Exemplars().AppendEmpty() + exemplar.SetTimestamp(1) + exemplar.SetDoubleValue(exemplars[i]) + } + histogramDatapoint.SetStartTimestamp(start) + histogramDatapoint.SetTimestamp(timestamp) + return metric +} diff --git 
a/processor/transformprocessor/internal/metrics/functions.go b/processor/transformprocessor/internal/metrics/functions.go index 162e208c362ae..18c7b460f198b 100644 --- a/processor/transformprocessor/internal/metrics/functions.go +++ b/processor/transformprocessor/internal/metrics/functions.go @@ -49,6 +49,7 @@ func MetricFunctions() map[string]ottl.Factory[ottlmetric.TransformContext] { newExtractSumMetricFactory(), newExtractCountMetricFactory(), newCopyMetricFactory(), + newScaleMetricFactory(), ) if useConvertBetweenSumAndGaugeMetricContext.IsEnabled() { diff --git a/processor/transformprocessor/internal/metrics/functions_test.go b/processor/transformprocessor/internal/metrics/functions_test.go index ec8b09743c7f9..60ef54812c913 100644 --- a/processor/transformprocessor/internal/metrics/functions_test.go +++ b/processor/transformprocessor/internal/metrics/functions_test.go @@ -37,6 +37,7 @@ func Test_MetricFunctions(t *testing.T) { expected["extract_sum_metric"] = newExtractSumMetricFactory() expected["extract_count_metric"] = newExtractCountMetricFactory() expected["copy_metric"] = newCopyMetricFactory() + expected["scale_metric"] = newScaleMetricFactory() defer testutil.SetFeatureGateForTest(t, useConvertBetweenSumAndGaugeMetricContext, true)() actual := MetricFunctions() diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 03f0b31de2a5f..69c250aea76ae 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -200,6 +200,21 @@ func Test_ProcessMetrics_MetricContext(t *testing.T) { newMetric.SetUnit("s") }, }, + { + statements: []string{`scale_metric(10.0,"s") where name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).SetDoubleValue(10.0) + 
td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).SetDoubleValue(37.0) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetUnit("s") + }, + }, + { + statements: []string{`scale_metric(10.0) where name == "operationA"`}, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).SetDoubleValue(10.0) + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).SetDoubleValue(37.0) + }, + }, } for _, tt := range tests {