Skip to content

signaltometricsconnector: support gauges #40113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion connector/signaltometricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ signaltometrics:

`signaltometrics` produces a variety of metric types by utilizing [OTTL](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md)
to extract the relevant data for a metric type from the incoming data. The
component can produce the following metric types for each signal types:
component can produce the following metric types for each signal type:

- [Sum](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums)
- [Gauge](#gauge)
- [Histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#histogram)
- [Exponential Histogram](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram)

Expand All @@ -91,6 +92,31 @@ sum:
[OTTL converters](https://pkg.go.dev/github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs#readme-converters)
can be used to transform the data.

#### Gauge

Gauge metrics are supported for logs and allow extracting numeric values from log bodies using Grok patterns via the `ExtractGrokPatterns` OTTL function. The gauges are aggregated by last value.

Gauge metrics have the following configuration:

```yaml
gauge:
value: ExtractGrokPatterns(target, pattern)
```

- [**Required**] `value`: An OTTL expression that must use `ExtractGrokPatterns` to extract a value from the log body. The Grok pattern must:
- Contain **exactly one** Grok pattern (e.g., `%{NUMBER:memory_mb:int}`)
- Use `int`, `long`, `double`, or `float` as the data type for the extracted value.

**Example:**
```yaml
signaltometrics:
logs:
- name: logs.memory_mb
description: Extract memory_mb from log records
gauge:
value: ExtractGrokPatterns(body, "Memory usage %{NUMBER:memory_mb:int}MB")
```

#### Histogram

Histogram metrics have the following configurations:
Expand Down
50 changes: 50 additions & 0 deletions connector/signaltometricsconnector/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package config // import "github.com/open-telemetry/opentelemetry-collector-cont
import (
"errors"
"fmt"
"regexp"
"slices"
"strings"

"github.com/lightstep/go-expohisto/structure"
"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -163,6 +166,10 @@ type Sum struct {
Value string `mapstructure:"value"`
}

// Gauge defines a gauge metric produced by the connector. The value is
// extracted with an OTTL value expression and aggregated by last value.
type Gauge struct {
	// Value is a required OTTL value expression that extracts the
	// numeric value for the gauge from the incoming data.
	Value string `mapstructure:"value"`
}

// MetricInfo defines the structure of the metric produced by the connector.
type MetricInfo struct {
Name string `mapstructure:"name"`
Expand All @@ -181,6 +188,7 @@ type MetricInfo struct {
Histogram *Histogram `mapstructure:"histogram"`
ExponentialHistogram *ExponentialHistogram `mapstructure:"exponential_histogram"`
Sum *Sum `mapstructure:"sum"`
Gauge *Gauge `mapstructure:"gauge"`
// prevent unkeyed literal initialization
_ struct{}
}
Expand Down Expand Up @@ -251,6 +259,15 @@ func (mi *MetricInfo) validateSum() error {
return nil
}

// validateGauge ensures that a gauge config, when present, has a value
// expression defined. A missing gauge section is valid.
func (mi *MetricInfo) validateGauge() error {
	if mi.Gauge == nil {
		return nil
	}
	if mi.Gauge.Value == "" {
		return errors.New("value must be defined for gauge metrics")
	}
	return nil
}

// validateMetricInfo is a utility method to validate all supported metric
// types defined for the metric info including any ottl expressions.
func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
Expand All @@ -266,6 +283,9 @@ func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
if err := mi.validateSum(); err != nil {
return fmt.Errorf("sum validation failed: %w", err)
}
if err := mi.validateGauge(); err != nil {
return fmt.Errorf("gauge validation failed: %w", err)
}

// Exactly one metric should be defined. Also, validate OTTL expressions,
// note that, here we only evaluate if statements are valid. Check for
Expand Down Expand Up @@ -299,6 +319,36 @@ func validateMetricInfo[K any](mi MetricInfo, parser ottl.Parser[K]) error {
return fmt.Errorf("failed to parse value OTTL expression for summary: %w", err)
}
}
if mi.Gauge != nil {
metricsDefinedCount++
if _, err := parser.ParseValueExpression(mi.Gauge.Value); err != nil {
return fmt.Errorf("failed to parse value OTTL expression for gauge: %w", err)
}
// Custom validation: fail if ExtractGrokPatterns is used with multiple patterns or configured with unsupported type
if strings.Contains(mi.Gauge.Value, "ExtractGrokPatterns") {
// Count the number of grok patterns
count := strings.Count(mi.Gauge.Value, "%{")
if count > 1 {
return fmt.Errorf("ExtractGrokPatterns: only exactly one grok pattern is supported for logs to gauge, found %d", count)
}
// Validate grok pattern types
// Example: %{NUMBER:foo:int} or %{NUMBER:bar:float}
grokPatternRe := regexp.MustCompile(`%\{[^}]+\}`)
matches := grokPatternRe.FindAllString(mi.Gauge.Value, -1)
supportedTypes := []string{"int", "float", "double", "long"}
for _, p := range matches {
// Remove %{ and }
inner := p[2 : len(p)-1]
parts := strings.Split(inner, ":")
if len(parts) >= 3 {
typePart := parts[2]
if !slices.Contains(supportedTypes, typePart) {
return fmt.Errorf("ExtractGrokPatterns: only int, float, double, and long types are supported for logs to gauge, found '%s' in pattern '%s'", typePart, p)
}
}
}
}
}
if metricsDefinedCount != 1 {
return fmt.Errorf("exactly one of the metrics must be defined, %d found", metricsDefinedCount)
}
Expand Down
12 changes: 12 additions & 0 deletions connector/signaltometricsconnector/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ func TestConfig(t *testing.T) {
fullErrorForSignal(t, "profiles", "exactly one of the metrics must be defined"),
},
},
{
path: "multiple_grok_patterns",
errorMsgs: []string{
fullErrorForSignal(t, "logs", "ExtractGrokPatterns: only exactly one grok pattern is supported for logs to gauge"),
},
},
{
path: "invalid_grok_type",
errorMsgs: []string{
fullErrorForSignal(t, "logs", "ExtractGrokPatterns: only int, float, double, and long types are supported for logs to gauge"),
},
},
{
path: "invalid_ottl_value_expression",
errorMsgs: []string{
Expand Down
1 change: 1 addition & 0 deletions connector/signaltometricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestConnectorWithLogs(t *testing.T) {
"histograms",
"exponential_histograms",
"metric_identity",
"gauge",
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Aggregator[K any] struct {
smLookup map[[16]byte]pmetric.ScopeMetrics
valueCounts map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP
sums map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP
gauges map[model.MetricKey]map[[16]byte]map[[16]byte]*gaugeDP
timestamp time.Time
}

Expand All @@ -37,6 +38,7 @@ func NewAggregator[K any](metrics pmetric.Metrics) *Aggregator[K] {
smLookup: make(map[[16]byte]pmetric.ScopeMetrics),
valueCounts: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP),
sums: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP),
gauges: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*gaugeDP),
timestamp: time.Now(),
}
}
Expand Down Expand Up @@ -87,6 +89,28 @@ func (a *Aggregator[K]) Aggregate(
v, v,
)
}
case pmetric.MetricTypeGauge:
// aggregated gauges are currently supported when used with the ExtractGrokPatterns OTTL function, where the result is a pcommon.Map with named capture groups
raw, err := md.Gauge.Value.Eval(ctx, tCtx)
if err != nil {
return fmt.Errorf("failed to execute OTTL value for gauge: %w", err)
}
v, ok := raw.(pcommon.Map)
if !ok {
return fmt.Errorf("failed to parse gauge OTTL value of type %T into pcommon.Map: %v", v, v)
}
v.Range(func(k string, v pcommon.Value) bool {
// Check if the value is a numeric type
switch v.Type() {
case pcommon.ValueTypeInt, pcommon.ValueTypeDouble:
return a.aggregateGauge(md, resAttrs, srcAttrs, v) == nil
default:
return true
}
})
if err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -145,11 +169,27 @@ func (a *Aggregator[K]) Finalize(mds []model.MetricDef[K]) {
dp.Copy(a.timestamp, destCounter.DataPoints().AppendEmpty())
}
}
if md.Gauge == nil {
continue
}
for resID, dpMap := range a.gauges[md.Key] {
metrics := a.smLookup[resID].Metrics()
destMetric := metrics.AppendEmpty()
destMetric.SetName(md.Key.Name)
destMetric.SetUnit(md.Key.Unit)
destMetric.SetDescription(md.Key.Description)
destGauge := destMetric.SetEmptyGauge()
destGauge.DataPoints().EnsureCapacity(len(dpMap))
for _, dp := range dpMap {
dp.Copy(a.timestamp, destGauge.DataPoints().AppendEmpty())
}
}
// If there are two metrics defined with the same key required by metricKey
// then they will be aggregated within the same metric and produced
// together. Deleting the key ensures this while preventing duplicates.
delete(a.valueCounts, md.Key)
delete(a.sums, md.Key)
delete(a.gauges, md.Key)
}
}

Expand Down Expand Up @@ -193,6 +233,26 @@ func (a *Aggregator[K]) aggregateDouble(
return nil
}

// aggregateGauge records v as the latest value for the gauge identified
// by the metric definition, the resource attributes, and the source
// attributes. Gauges are aggregated by last value: a repeated
// (metric, resource, attributes) combination overwrites the previous value.
func (a *Aggregator[K]) aggregateGauge(
	md model.MetricDef[K],
	resAttrs, srcAttrs pcommon.Map,
	v pcommon.Value,
) error {
	resID := a.getResourceID(resAttrs)
	attrID := pdatautil.MapHash(srcAttrs)
	// Look up each nesting level once and reuse the result instead of
	// re-indexing a.gauges[md.Key][resID][attrID] on every access.
	byRes, ok := a.gauges[md.Key]
	if !ok {
		byRes = make(map[[16]byte]map[[16]byte]*gaugeDP)
		a.gauges[md.Key] = byRes
	}
	byAttr, ok := byRes[resID]
	if !ok {
		byAttr = make(map[[16]byte]*gaugeDP)
		byRes[resID] = byAttr
	}
	dp, ok := byAttr[attrID]
	if !ok {
		dp = newGaugeDP(srcAttrs)
		byAttr[attrID] = dp
	}
	dp.Aggregate(v)
	return nil
}

func (a *Aggregator[K]) aggregateValueCount(
md model.MetricDef[K],
resAttrs, srcAttrs pcommon.Map,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator"

import (
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

// gaugeDP is a data point for gauge metrics.
type gaugeDP struct {
	// attrs are the source attributes identifying this data point.
	attrs pcommon.Map
	// val holds the most recently aggregated value (last value wins);
	// Aggregate only accepts int or double values.
	val pcommon.Value
}

// newGaugeDP creates a gauge data point carrying the given source
// attributes. The value is unset until Aggregate is called.
func newGaugeDP(attrs pcommon.Map) *gaugeDP {
	dp := new(gaugeDP)
	dp.attrs = attrs
	return dp
}

// Aggregate records v as the current value of the gauge; last value wins.
// Only int and double values are accepted — any other type indicates a
// programming error and panics.
func (dp *gaugeDP) Aggregate(v pcommon.Value) {
	if t := v.Type(); t != pcommon.ValueTypeDouble && t != pcommon.ValueTypeInt {
		panic("unexpected usage of gauge datapoint, only double or int value expected")
	}
	dp.val = v
}

// Copy copies the gauge data point to the destination number data point,
// carrying over the attributes and setting the typed (int or double) value.
func (dp *gaugeDP) Copy(
	timestamp time.Time,
	dest pmetric.NumberDataPoint,
) {
	dp.attrs.CopyTo(dest.Attributes())
	if t := dp.val.Type(); t == pcommon.ValueTypeInt {
		dest.SetIntValue(dp.val.Int())
	} else if t == pcommon.ValueTypeDouble {
		dest.SetDoubleValue(dp.val.Double())
	}
	// TODO determine appropriate start time
	dest.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
}
28 changes: 28 additions & 0 deletions connector/signaltometricsconnector/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,26 @@ func (s *Sum[K]) fromConfig(
return nil
}

// Gauge is the internal model for a gauge metric, holding the parsed
// OTTL value expression used to extract the gauge value.
type Gauge[K any] struct {
	// Value is the parsed OTTL expression that produces the gauge value.
	Value *ottl.ValueExpression[K]
}

// fromConfig initializes the gauge model from its user-facing
// configuration by parsing the configured OTTL value expression.
// A nil config is a no-op, leaving the gauge undefined.
//
// Receiver renamed s -> g: `s` was a copy-paste from Sum.fromConfig;
// Go convention is a receiver initial matching the type.
func (g *Gauge[K]) fromConfig(
	mi *config.Gauge,
	parser ottl.Parser[K],
) error {
	if mi == nil {
		return nil
	}

	var err error
	g.Value, err = parser.ParseValueExpression(mi.Value)
	if err != nil {
		return fmt.Errorf("failed to parse value OTTL expression for gauge: %w", err)
	}
	return nil
}

type MetricDef[K any] struct {
Key MetricKey
IncludeResourceAttributes []AttributeKeyValue
Expand All @@ -114,6 +134,7 @@ type MetricDef[K any] struct {
ExponentialHistogram *ExponentialHistogram[K]
ExplicitHistogram *ExplicitHistogram[K]
Sum *Sum[K]
Gauge *Gauge[K]
}

func (md *MetricDef[K]) FromMetricInfo(
Expand Down Expand Up @@ -167,6 +188,13 @@ func (md *MetricDef[K]) FromMetricInfo(
return fmt.Errorf("failed to parse sum config: %w", err)
}
}
if mi.Gauge != nil {
md.Key.Type = pmetric.MetricTypeGauge
md.Gauge = new(Gauge[K])
if err := md.Gauge.fromConfig(mi.Gauge, parser); err != nil {
return fmt.Errorf("failed to parse gauge config: %w", err)
}
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Test fixture: the grok pattern types the extracted value as `string`,
# which gauge validation rejects (only int, long, double, float allowed).
signaltometrics:
  logs:
    - name: logs.invalid_grok_type
      description: Invalid grok type in pattern
      gauge:
        value: ExtractGrokPatterns(body, "Memory usage %{NUMBER:memory_mb:string}MB")
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Test fixture: the value expression contains two grok patterns, which
# gauge validation rejects (exactly one pattern is supported).
signaltometrics:
  logs:
    - name: logs.memory_mb
      description: Extract memory_mb from log records
      gauge:
        value: ExtractGrokPatterns(body, "Memory usage %{NUMBER:memory_mb:int}MB CPU usage %{NUMBER:cpu:float}")
Loading