Skip to content

signaltometricsconnector: support gauges #40113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion connector/signaltometricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ signaltometrics:

`signaltometrics` produces a variety of metric types by utilizing [OTTL](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md)
to extract the relevant data for a metric type from the incoming data. The
component can produce the following metric types for each signal types:
component can produce the following metric types for each signal type:

- [Sum](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums)
- [Gauge](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#gauge)
- [Histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram)
- [Exponential Histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram)

Expand All @@ -91,6 +92,43 @@ sum:
[OTTL converters](https://pkg.go.dev/github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs#readme-converters)
can be used to transform the data.

#### Gauge

Gauge metrics aggregate the last value of a signal and have the following configuration:

```yaml
gauge:
value: <ottl_value_expression>
```

- [**Required**] `value`represents an OTTL expression to extract a numeric value from
the signal. Only OTTL expressions that return a value are accepted. The returned
value determines the value type of the `gauge` metric (`int` or `double`).
- For logs: Use e.g. `ExtractGrokPatterns` with a single key selector (see below).
- For other signals: Use a field such as `value_int`, `value_double`, or a valid OTTL expression.

**Examples:**

_Logs (with Grok pattern):_
```yaml
signaltometrics:
logs:
- name: logs.memory_mb
description: Extract memory_mb from log records
gauge:
value: ExtractGrokPatterns(body, "Memory usage %{NUMBER:memory_mb:int}MB")["memory_mb"]
```

_Traces:_
```yaml
signaltometrics:
spans:
- name: span.duration.gauge
description: Span duration as gauge
gauge:
value: Int(Seconds(end_time - start_time))
```

#### Histogram

Histogram metrics have the following configurations:
Expand Down
32 changes: 32 additions & 0 deletions connector/signaltometricsconnector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import (
"errors"
"fmt"
"regexp"
"strings"

"github.com/lightstep/go-expohisto/structure"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -163,6 +165,10 @@
Value string `mapstructure:"value"`
}

type Gauge struct {
Value string `mapstructure:"value"`
}

// MetricInfo defines the structure of the metric produced by the connector.
type MetricInfo struct {
Name string `mapstructure:"name"`
Expand All @@ -181,6 +187,7 @@
Histogram *Histogram `mapstructure:"histogram"`
ExponentialHistogram *ExponentialHistogram `mapstructure:"exponential_histogram"`
Sum *Sum `mapstructure:"sum"`
Gauge *Gauge `mapstructure:"gauge"`
// prevent unkeyed literal initialization
_ struct{}
}
Expand Down Expand Up @@ -251,6 +258,15 @@
return nil
}

func (mi *MetricInfo) validateGauge() error {
if mi.Gauge != nil {
if mi.Gauge.Value == "" {
return errors.New("value must be defined for gauge metrics")
}
}
return nil
}

// validateMetricInfo is an utility method validate all supported metric
// types defined for the metric info including any ottl expressions.
func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
Expand All @@ -266,6 +282,9 @@
if err := mi.validateSum(); err != nil {
return fmt.Errorf("sum validation failed: %w", err)
}
if err := mi.validateGauge(); err != nil {
return fmt.Errorf("gauge validation failed: %w", err)
}

// Exactly one metric should be defined. Also, validate OTTL expressions,
// note that, here we only evaluate if statements are valid. Check for
Expand Down Expand Up @@ -299,6 +318,19 @@
return fmt.Errorf("failed to parse value OTTL expression for summary: %w", err)
}
}
if mi.Gauge != nil {
metricsDefinedCount++
if _, err := parser.ParseValueExpression(mi.Gauge.Value); err != nil {
return fmt.Errorf("failed to parse value OTTL expression for gauge: %w", err)
}
// if ExtractGrokPatterns is used, validate the key selector
if strings.Contains(mi.Gauge.Value, "ExtractGrokPatterns") {
// Ensure a [key] selector is present after ExtractGrokPatterns
if !regexp.MustCompile(`ExtractGrokPatterns\([^)]*\)\s*\[[^\]]+\]`).MatchString(mi.Gauge.Value) {
return fmt.Errorf("ExtractGrokPatterns: a single key selector[key] is required for signal to gauge")

Check failure on line 330 in connector/signaltometricsconnector/config/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, connector)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 330 in connector/signaltometricsconnector/config/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, connector)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check would be true for all configured metric types so maybe in a follow-up PR we can generalize this as a common validation for ExtractGrokPatterns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not that important though and we can just create an issue and leave it for future too as we might need to think about how to (or if to) handle config validation for all funcs that return map (or other non-primitives)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #40118

}
if metricsDefinedCount != 1 {
return fmt.Errorf("exactly one of the metrics must be defined, %d found", metricsDefinedCount)
}
Expand Down
6 changes: 6 additions & 0 deletions connector/signaltometricsconnector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func TestConfig(t *testing.T) {
fullErrorForSignal(t, "profiles", "exactly one of the metrics must be defined"),
},
},
{
path: "invalid_grok_type_map",
errorMsgs: []string{
fullErrorForSignal(t, "logs", "ExtractGrokPatterns: a single key selector[key] is required for signal to gauge"),
},
},
{
path: "invalid_ottl_value_expression",
errorMsgs: []string{
Expand Down
3 changes: 3 additions & 0 deletions connector/signaltometricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestConnectorWithTraces(t *testing.T) {
"histograms",
"exponential_histograms",
"metric_identity",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -71,6 +72,7 @@ func TestConnectorWithMetrics(t *testing.T) {
"sum",
"histograms",
"exponential_histograms",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -104,6 +106,7 @@ func TestConnectorWithLogs(t *testing.T) {
"histograms",
"exponential_histograms",
"metric_identity",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package aggregator // import "github.com/open-telemetry/opentelemetry-collector-
import (
"context"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -27,6 +28,7 @@ type Aggregator[K any] struct {
smLookup map[[16]byte]pmetric.ScopeMetrics
valueCounts map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP
sums map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP
gauges map[model.MetricKey]map[[16]byte]map[[16]byte]*gaugeDP
timestamp time.Time
}

Expand All @@ -37,6 +39,7 @@ func NewAggregator[K any](metrics pmetric.Metrics) *Aggregator[K] {
smLookup: make(map[[16]byte]pmetric.ScopeMetrics),
valueCounts: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP),
sums: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP),
gauges: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*gaugeDP),
timestamp: time.Now(),
}
}
Expand Down Expand Up @@ -87,6 +90,27 @@ func (a *Aggregator[K]) Aggregate(
v, v,
)
}
case pmetric.MetricTypeGauge:
raw, err := md.Gauge.Value.Eval(ctx, tCtx)
if err != nil {
if strings.Contains(err.Error(), "key not found in map") {
// Gracefully skip missing keys in ExtractGrokPatterns
return nil
}
return fmt.Errorf("failed to execute OTTL value for gauge: %w", err)
}
if raw == nil {
return nil
}
switch v := raw.(type) {
case int64, float64:
return a.aggregateGauge(md, resAttrs, srcAttrs, v)
default:
return fmt.Errorf(
"failed to parse gauge OTTL value of type %T into int64 or float64: %v",
v, v,
)
}
}
return nil
}
Expand Down Expand Up @@ -145,11 +169,27 @@ func (a *Aggregator[K]) Finalize(mds []model.MetricDef[K]) {
dp.Copy(a.timestamp, destCounter.DataPoints().AppendEmpty())
}
}
for resID, dpMap := range a.gauges[md.Key] {
if md.Gauge == nil {
continue
}
metrics := a.smLookup[resID].Metrics()
destMetric := metrics.AppendEmpty()
destMetric.SetName(md.Key.Name)
destMetric.SetUnit(md.Key.Unit)
destMetric.SetDescription(md.Key.Description)
destGauge := destMetric.SetEmptyGauge()
destGauge.DataPoints().EnsureCapacity(len(dpMap))
for _, dp := range dpMap {
dp.Copy(a.timestamp, destGauge.DataPoints().AppendEmpty())
}
}
// If there are two metric defined with the same key required by metricKey
// then they will be aggregated within the same metric and produced
// together. Deleting the key ensures this while preventing duplicates.
delete(a.valueCounts, md.Key)
delete(a.sums, md.Key)
delete(a.gauges, md.Key)
}
}

Expand Down Expand Up @@ -193,6 +233,26 @@ func (a *Aggregator[K]) aggregateDouble(
return nil
}

func (a *Aggregator[K]) aggregateGauge(
md model.MetricDef[K],
resAttrs, srcAttrs pcommon.Map,
v any,
) error {
resID := a.getResourceID(resAttrs)
attrID := pdatautil.MapHash(srcAttrs)
if _, ok := a.gauges[md.Key]; !ok {
a.gauges[md.Key] = make(map[[16]byte]map[[16]byte]*gaugeDP)
}
if _, ok := a.gauges[md.Key][resID]; !ok {
a.gauges[md.Key][resID] = make(map[[16]byte]*gaugeDP)
}
if _, ok := a.gauges[md.Key][resID][attrID]; !ok {
a.gauges[md.Key][resID][attrID] = newGaugeDP(srcAttrs)
}
a.gauges[md.Key][resID][attrID].Aggregate(v)
return nil
}

func (a *Aggregator[K]) aggregateValueCount(
md model.MetricDef[K],
resAttrs, srcAttrs pcommon.Map,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator"

import (
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// gaugeDP is a data point for gauge metrics.
type gaugeDP struct {
attrs pcommon.Map
val any
}

func newGaugeDP(attrs pcommon.Map) *gaugeDP {
return &gaugeDP{
attrs: attrs,
}
}

func (dp *gaugeDP) Aggregate(v any) {
switch v := v.(type) {
case float64, int64:
dp.val = v
default:
panic("unexpected usage of gauge datapoint, only double or int value expected")
}
}

// Copy copies the gauge data point to the destination number data point
func (dp *gaugeDP) Copy(
timestamp time.Time,
dest pmetric.NumberDataPoint,
) {
dp.attrs.CopyTo(dest.Attributes())
switch dp.val.(type) {

Check failure on line 40 in connector/signaltometricsconnector/internal/aggregator/gaugedp.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, connector)

typeSwitchVar: 2 cases can benefit from type switch with assignment (gocritic)

Check failure on line 40 in connector/signaltometricsconnector/internal/aggregator/gaugedp.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, connector)

typeSwitchVar: 2 cases can benefit from type switch with assignment (gocritic)
case float64:
dest.SetDoubleValue(dp.val.(float64))
case int64:
dest.SetIntValue(dp.val.(int64))
}
// TODO determine appropriate start time
dest.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
}
28 changes: 28 additions & 0 deletions connector/signaltometricsconnector/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ func (s *Sum[K]) fromConfig(
return nil
}

type Gauge[K any] struct {
Value *ottl.ValueExpression[K]
}

func (s *Gauge[K]) fromConfig(
mi *config.Gauge,
parser ottl.Parser[K],
) error {
if mi == nil {
return nil
}

var err error
s.Value, err = parser.ParseValueExpression(mi.Value)
if err != nil {
return fmt.Errorf("failed to parse value OTTL expression for gauge: %w", err)
}
return nil
}

type MetricDef[K any] struct {
Key MetricKey
IncludeResourceAttributes []AttributeKeyValue
Expand All @@ -114,6 +134,7 @@ type MetricDef[K any] struct {
ExponentialHistogram *ExponentialHistogram[K]
ExplicitHistogram *ExplicitHistogram[K]
Sum *Sum[K]
Gauge *Gauge[K]
}

func (md *MetricDef[K]) FromMetricInfo(
Expand Down Expand Up @@ -167,6 +188,13 @@ func (md *MetricDef[K]) FromMetricInfo(
return fmt.Errorf("failed to parse sum config: %w", err)
}
}
if mi.Gauge != nil {
md.Key.Type = pmetric.MetricTypeGauge
md.Gauge = new(Gauge[K])
if err := md.Gauge.fromConfig(mi.Gauge, parser); err != nil {
return fmt.Errorf("failed to parse gauge config: %w", err)
}
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
signaltometrics:
logs:
- name: logs.memory_mb
description: Extract memory_mb from log records
gauge:
value: ExtractGrokPatterns(body, "Memory usage %{NUMBER:memory_mb:int}MB")
Loading
Loading