Skip to content

Commit 763713b

Browse files
authored
[statsdreceiver]add aggregation for StatsD receiver (#1670)
* add aggregation for StatsD receiver, support metrics types: gauge and counter * revise readme for StatsD receiver * Use Distinct in OTel-Go SDK as the description field for statsDMetric
1 parent a35b1fa commit 763713b

File tree

15 files changed

+1088
-310
lines changed

15 files changed

+1088
-310
lines changed

receiver/statsdreceiver/README.md

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# StatsD Receiver
22

3-
StatsD receiver for ingesting StatsD messages into the OpenTelemetry Collector.
3+
StatsD receiver for ingesting StatsD messages(https://github.com/statsd/statsd/blob/master/docs/metric_types.md) into the OpenTelemetry Collector.
44

55
Supported pipeline types: metrics
66

@@ -12,27 +12,46 @@ The following settings are required:
1212

1313
- `endpoint` (default = `localhost:8125`): Address and port to listen on.
1414

15+
16+
The Following settings are optional:
17+
18+
- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)
19+
1520
Example:
1621

1722
```yaml
1823
receivers:
1924
statsd:
2025
statsd/2:
2126
endpoint: "localhost:8127"
27+
aggregation_interval: 70s
2228
```
2329
2430
The full list of settings exposed for this receiver are documented [here](./config.go)
2531
with detailed sample configurations [here](./testdata/config.yaml).
2632
2733
## Aggregation
2834
29-
Currently the `statsdreceiver` is not providing any aggregation. There are
30-
ideas such as the [Metrics Transform Processor
31-
Proposal](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/332)
32-
that intend to enable control over Metric aggregation in a processor.
33-
34-
An alternative will be to implement some simple aggregation in this receiver.
35-
35+
Aggregation is done in statsD receiver. The default aggregation interval is 60s. The receiver only aggregates the metrics with the same metric name, metric type, label keys and label values. After each aggregation interval, the receiver will send all metrics (after aggregation) in this aggregation interval to the following workflow.
36+
37+
It supports:
38+
39+
Gauge(transferred to double):
40+
- statsdTestMetric1:500|g|#mykey:myvalue
41+
statsdTestMetric1:400|g|#mykey:myvalue
42+
(get the latest value: 400)
43+
- statsdTestMetric1:500|g|#mykey:myvalue
44+
statsdTestMetric1:+2|g|#mykey:myvalue
45+
statsdTestMetric1:-1|g|#mykey:myvalue
46+
(get the value after calculation: 501)
47+
48+
Counter(transferred to int):
49+
- statsdTestMetric1:3000|c|#mykey:myvalue
50+
statsdTestMetric1:4000|c|#mykey:myvalue
51+
(get the value after incrementation: 7000)
52+
- statsdTestMetric1:3000|c|#mykey:myvalue
53+
statsdTestMetric1:20|c|@0.8|#mykey:myvalue
54+
(get the value after incrementation with sample rate: 3000+20/0.8=3025)
3655
## Metrics
3756
3857
General format is:
@@ -43,14 +62,15 @@ General format is:
4362

4463
`<name>:<value>|c|@<sample-rate>|#<tag1-key>:<tag1-value>`
4564

65+
it supports sample rate
66+
4667
### Gauge
4768

4869
`<name>:<value>|g|@<sample-rate>|#<tag1-key>:<tag1-value>`
4970

5071
### Timer
5172

52-
`<name>:<value>|ms|@<sample-rate>|#<tag1-key>:<tag1-value>`
53-
73+
TODO: add support for timer
5474

5575
## Testing
5676

@@ -60,6 +80,7 @@ General format is:
6080
receivers:
6181
statsd:
6282
endpoint: "localhost:8125" # default
83+
aggregation_interval: 60s # default
6384
6485
exporters:
6586
file:

receiver/statsdreceiver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package statsdreceiver
1616

1717
import (
18+
"time"
19+
1820
"go.opentelemetry.io/collector/config/configmodels"
1921
"go.opentelemetry.io/collector/config/confignet"
2022
)
@@ -23,4 +25,5 @@ import (
2325
type Config struct {
2426
configmodels.ReceiverSettings `mapstructure:",squash"`
2527
NetAddr confignet.NetAddr `mapstructure:",squash"`
28+
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
2629
}

receiver/statsdreceiver/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package statsdreceiver
1717
import (
1818
"path"
1919
"testing"
20+
"time"
2021

2122
"github.com/stretchr/testify/assert"
2223
"github.com/stretchr/testify/require"
@@ -54,5 +55,6 @@ func TestLoadConfig(t *testing.T) {
5455
Endpoint: "localhost:12345",
5556
Transport: "custom_transport",
5657
},
58+
AggregationInterval: 70 * time.Second,
5759
}, r1)
5860
}

receiver/statsdreceiver/factory.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package statsdreceiver
1616

1717
import (
1818
"context"
19+
"time"
1920

2021
"go.opentelemetry.io/collector/component"
2122
"go.opentelemetry.io/collector/config/configmodels"
@@ -26,9 +27,10 @@ import (
2627

2728
const (
2829
// The value of "type" key in configuration.
29-
typeStr = "statsd"
30-
defaultBindEndpoint = "localhost:8125"
31-
defaultTransport = "udp"
30+
typeStr = "statsd"
31+
defaultBindEndpoint = "localhost:8125"
32+
defaultTransport = "udp"
33+
defaultAggregationInterval = 60 * time.Second
3234
)
3335

3436
// NewFactory creates a factory for the StatsD receiver.
@@ -50,6 +52,7 @@ func createDefaultConfig() configmodels.Receiver {
5052
Endpoint: defaultBindEndpoint,
5153
Transport: defaultTransport,
5254
},
55+
AggregationInterval: defaultAggregationInterval,
5356
}
5457
}
5558

receiver/statsdreceiver/factory_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,15 @@ func TestCreateReceiver(t *testing.T) {
4141
assert.NoError(t, err)
4242
assert.NotNil(t, tReceiver, "receiver creation failed")
4343
}
44+
45+
func TestCreateMetricsReceiverWithNilConsumer(t *testing.T) {
46+
receiver, err := createMetricsReceiver(
47+
context.Background(),
48+
component.ReceiverCreateParams{Logger: zap.NewNop()},
49+
createDefaultConfig(),
50+
nil,
51+
)
52+
53+
assert.Error(t, err, "nil consumer")
54+
assert.Nil(t, receiver)
55+
}

receiver/statsdreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/stretchr/testify v1.6.1
88
go.opencensus.io v0.22.5
99
go.opentelemetry.io/collector v0.16.1-0.20201207152538-326931de8c32
10+
go.opentelemetry.io/otel v0.13.0
1011
go.uber.org/zap v1.16.0
1112
google.golang.org/protobuf v1.25.0
1213
)

receiver/statsdreceiver/protocol/parser.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@ import (
2020

2121
// Parser is something that can map input StatsD strings to OTLP Metric representations.
2222
type Parser interface {
23-
Parse(in string) (*metricspb.Metric, error)
23+
Initialize() error
24+
GetMetrics() []*metricspb.Metric
25+
Aggregate(line string) error
2426
}

0 commit comments

Comments
 (0)