Skip to content

Commit 3254370

Browse files
authored
Change OpenCensus type to OpenTelemetry type for statsD receiver (#2733)
1 parent f8c550f commit 3254370

File tree

8 files changed

+319
-1354
lines changed

8 files changed

+319
-1354
lines changed

receiver/statsdreceiver/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ The Following settings are optional:
1919

2020
- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)
2121

22-
- `enable_metric_type: true`(default value is false): Enbale the statsd receiver to be able to emit the mertic type(gauge, counter, timer(in the future), histogram(in the future)) as a lable.
22+
- `enable_metric_type: true`(default value is false): Enable the statsd receiver to be able to emit the metric type(gauge, counter, timer(in the future), histogram(in the future)) as a label.
2323

2424
Example:
2525

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package protocol
16+
17+
import (
18+
"time"
19+
20+
"go.opentelemetry.io/collector/consumer/pdata"
21+
)
22+
23+
func buildCounterMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
24+
dp := pdata.NewIntDataPoint()
25+
dp.SetValue(parsedMetric.intvalue)
26+
dp.SetTimestamp(pdata.TimestampFromTime(timeNow))
27+
for i, key := range parsedMetric.labelKeys {
28+
dp.LabelsMap().Insert(key, parsedMetric.labelValues[i])
29+
}
30+
31+
nm := pdata.NewMetric()
32+
nm.SetName(parsedMetric.description.name)
33+
if parsedMetric.unit != "" {
34+
nm.SetUnit(parsedMetric.unit)
35+
}
36+
nm.SetDataType(pdata.MetricDataTypeIntSum)
37+
nm.IntSum().DataPoints().Append(dp)
38+
nm.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
39+
nm.IntSum().SetIsMonotonic(true)
40+
41+
ilm := pdata.NewInstrumentationLibraryMetrics()
42+
ilm.Metrics().Append(nm)
43+
44+
return ilm
45+
}
46+
47+
func buildGaugeMetric(parsedMetric statsDMetric, timeNow time.Time) pdata.InstrumentationLibraryMetrics {
48+
dp := pdata.NewDoubleDataPoint()
49+
dp.SetValue(parsedMetric.floatvalue)
50+
dp.SetTimestamp(pdata.TimestampFromTime(timeNow))
51+
for i, key := range parsedMetric.labelKeys {
52+
dp.LabelsMap().Insert(key, parsedMetric.labelValues[i])
53+
}
54+
55+
nm := pdata.NewMetric()
56+
nm.SetName(parsedMetric.description.name)
57+
if parsedMetric.unit != "" {
58+
nm.SetUnit(parsedMetric.unit)
59+
}
60+
nm.SetDataType(pdata.MetricDataTypeDoubleGauge)
61+
nm.DoubleGauge().DataPoints().Append(dp)
62+
63+
ilm := pdata.NewInstrumentationLibraryMetrics()
64+
ilm.Metrics().Append(nm)
65+
66+
return ilm
67+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2020, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package protocol
16+
17+
import (
18+
"testing"
19+
"time"
20+
21+
"github.com/stretchr/testify/assert"
22+
"go.opentelemetry.io/collector/consumer/pdata"
23+
)
24+
25+
func TestBuildCounterMetric(t *testing.T) {
26+
timeNow := time.Now()
27+
metricDescription := statsDMetricdescription{
28+
name: "testCounter",
29+
}
30+
parsedMetric := statsDMetric{
31+
description: metricDescription,
32+
intvalue: 32,
33+
unit: "meter",
34+
labelKeys: []string{"mykey"},
35+
labelValues: []string{"myvalue"},
36+
}
37+
metric := buildCounterMetric(parsedMetric, timeNow)
38+
expectedMetric := pdata.NewInstrumentationLibraryMetrics()
39+
expectedMetric.Metrics().Resize(1)
40+
expectedMetric.Metrics().At(0).SetName("testCounter")
41+
expectedMetric.Metrics().At(0).SetUnit("meter")
42+
expectedMetric.Metrics().At(0).SetDataType(pdata.MetricDataTypeIntSum)
43+
expectedMetric.Metrics().At(0).IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta)
44+
expectedMetric.Metrics().At(0).IntSum().SetIsMonotonic(true)
45+
expectedMetric.Metrics().At(0).IntSum().DataPoints().Resize(1)
46+
expectedMetric.Metrics().At(0).IntSum().DataPoints().At(0).SetValue(32)
47+
expectedMetric.Metrics().At(0).IntSum().DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(timeNow))
48+
expectedMetric.Metrics().At(0).IntSum().DataPoints().At(0).LabelsMap().Insert("mykey", "myvalue")
49+
assert.Equal(t, metric, expectedMetric)
50+
}
51+
52+
func TestBuildGaugeMetric(t *testing.T) {
53+
timeNow := time.Now()
54+
metricDescription := statsDMetricdescription{
55+
name: "testGauge",
56+
}
57+
parsedMetric := statsDMetric{
58+
description: metricDescription,
59+
floatvalue: 32.3,
60+
unit: "meter",
61+
labelKeys: []string{"mykey", "mykey2"},
62+
labelValues: []string{"myvalue", "myvalue2"},
63+
}
64+
metric := buildGaugeMetric(parsedMetric, timeNow)
65+
expectedMetric := pdata.NewInstrumentationLibraryMetrics()
66+
expectedMetric.Metrics().Resize(1)
67+
expectedMetric.Metrics().At(0).SetName("testGauge")
68+
expectedMetric.Metrics().At(0).SetUnit("meter")
69+
expectedMetric.Metrics().At(0).SetDataType(pdata.MetricDataTypeDoubleGauge)
70+
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().Resize(1)
71+
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).SetValue(32.3)
72+
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).SetTimestamp(pdata.TimestampFromTime(timeNow))
73+
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Insert("mykey", "myvalue")
74+
expectedMetric.Metrics().At(0).DoubleGauge().DataPoints().At(0).LabelsMap().Insert("mykey2", "myvalue2")
75+
assert.Equal(t, metric, expectedMetric)
76+
}

receiver/statsdreceiver/protocol/parser.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
package protocol
1616

1717
import (
18-
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
18+
"go.opentelemetry.io/collector/consumer/pdata"
1919
)
2020

2121
// Parser is something that can map input StatsD strings to OTLP Metric representations.
2222
type Parser interface {
2323
Initialize(enableMetricType bool) error
24-
GetMetrics() []*metricspb.Metric
24+
GetMetrics() pdata.Metrics
2525
Aggregate(line string) error
2626
}

receiver/statsdreceiver/protocol/statsd_parser.go

Lines changed: 29 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ import (
2121
"strings"
2222
"time"
2323

24-
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
24+
"go.opentelemetry.io/collector/consumer/pdata"
2525
"go.opentelemetry.io/otel/attribute"
26-
"google.golang.org/protobuf/types/known/timestamppb"
2726
)
2827

2928
var (
@@ -39,8 +38,8 @@ const TagMetricType = "metric_type"
3938

4039
// StatsDParser supports the Parse method for parsing StatsD messages with Tags.
4140
type StatsDParser struct {
42-
gauges map[statsDMetricdescription]*metricspb.Metric
43-
counters map[statsDMetricdescription]*metricspb.Metric
41+
gauges map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
42+
counters map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics
4443
enableMetricType bool
4544
}
4645

@@ -51,10 +50,9 @@ type statsDMetric struct {
5150
floatvalue float64
5251
addition bool
5352
unit string
54-
metricType metricspb.MetricDescriptor_Type
5553
sampleRate float64
56-
labelKeys []*metricspb.LabelKey
57-
labelValues []*metricspb.LabelValue
54+
labelKeys []string
55+
labelValues []string
5856
}
5957

6058
type statsDMetricdescription struct {
@@ -64,32 +62,34 @@ type statsDMetricdescription struct {
6462
}
6563

6664
func (p *StatsDParser) Initialize(enableMetricType bool) error {
67-
p.gauges = make(map[statsDMetricdescription]*metricspb.Metric)
68-
p.counters = make(map[statsDMetricdescription]*metricspb.Metric)
65+
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
66+
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
6967
p.enableMetricType = enableMetricType
7068
return nil
7169
}
7270

7371
// get the metrics preparing for flushing and reset the state
74-
func (p *StatsDParser) GetMetrics() []*metricspb.Metric {
75-
var metrics []*metricspb.Metric
72+
func (p *StatsDParser) GetMetrics() pdata.Metrics {
73+
metrics := pdata.NewMetrics()
74+
metrics.ResourceMetrics().Resize(1)
75+
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Resize(0)
7676

7777
for _, metric := range p.gauges {
78-
metrics = append(metrics, metric)
78+
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Append(metric)
7979
}
8080

8181
for _, metric := range p.counters {
82-
metrics = append(metrics, metric)
82+
metrics.ResourceMetrics().At(0).InstrumentationLibraryMetrics().Append(metric)
8383
}
8484

85-
p.gauges = make(map[statsDMetricdescription]*metricspb.Metric)
86-
p.counters = make(map[statsDMetricdescription]*metricspb.Metric)
85+
p.gauges = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
86+
p.counters = make(map[statsDMetricdescription]pdata.InstrumentationLibraryMetrics)
8787

8888
return metrics
8989
}
9090

91-
var timeNowFunc = func() int64 {
92-
return time.Now().Unix()
91+
var timeNowFunc = func() time.Time {
92+
return time.Now()
9393
}
9494

9595
//aggregate for each metric line
@@ -102,30 +102,25 @@ func (p *StatsDParser) Aggregate(line string) error {
102102
case "g":
103103
_, ok := p.gauges[parsedMetric.description]
104104
if !ok {
105-
metricPoint := buildPoint(parsedMetric)
106-
p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
105+
p.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
107106
} else {
108107
if parsedMetric.addition {
109-
savedValue := p.gauges[parsedMetric.description].GetTimeseries()[0].Points[0].GetDoubleValue()
108+
savedValue := p.gauges[parsedMetric.description].Metrics().At(0).DoubleGauge().DataPoints().At(0).Value()
110109
parsedMetric.floatvalue = parsedMetric.floatvalue + savedValue
111-
metricPoint := buildPoint(parsedMetric)
112-
p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
110+
p.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
113111
} else {
114-
metricPoint := buildPoint(parsedMetric)
115-
p.gauges[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
112+
p.gauges[parsedMetric.description] = buildGaugeMetric(parsedMetric, timeNowFunc())
116113
}
117114
}
118115

119116
case "c":
120117
_, ok := p.counters[parsedMetric.description]
121118
if !ok {
122-
metricPoint := buildPoint(parsedMetric)
123-
p.counters[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
119+
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
124120
} else {
125-
savedValue := p.counters[parsedMetric.description].GetTimeseries()[0].Points[0].GetInt64Value()
121+
savedValue := p.counters[parsedMetric.description].Metrics().At(0).IntSum().DataPoints().At(0).Value()
126122
parsedMetric.intvalue = parsedMetric.intvalue + savedValue
127-
metricPoint := buildPoint(parsedMetric)
128-
p.counters[parsedMetric.description] = buildMetric(parsedMetric, metricPoint)
123+
p.counters[parsedMetric.description] = buildCounterMetric(parsedMetric, timeNowFunc())
129124
}
130125
}
131126

@@ -188,11 +183,8 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
188183
if len(tagParts) != 2 {
189184
return result, fmt.Errorf("invalid tag format: %s", tagParts)
190185
}
191-
result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: tagParts[0]})
192-
result.labelValues = append(result.labelValues, &metricspb.LabelValue{
193-
Value: tagParts[1],
194-
HasValue: true,
195-
})
186+
result.labelKeys = append(result.labelKeys, tagParts[0])
187+
result.labelValues = append(result.labelValues, tagParts[1])
196188
kvs = append(kvs, attribute.String(tagParts[0], tagParts[1]))
197189
}
198190

@@ -207,7 +199,6 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
207199
return result, fmt.Errorf("gauge: parse metric value string: %s", result.value)
208200
}
209201
result.floatvalue = f
210-
result.metricType = metricspb.MetricDescriptor_GAUGE_DOUBLE
211202
case "c":
212203
f, err := strconv.ParseFloat(result.value, 64)
213204
if err != nil {
@@ -218,7 +209,6 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
218209
i = int64(f / result.sampleRate)
219210
}
220211
result.intvalue = i
221-
result.metricType = metricspb.MetricDescriptor_GAUGE_INT64
222212
}
223213

224214
// add metric_type dimension for all metrics
@@ -231,11 +221,9 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err
231221
case "c":
232222
metricType = "counter"
233223
}
234-
result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: TagMetricType})
235-
result.labelValues = append(result.labelValues, &metricspb.LabelValue{
236-
Value: metricType,
237-
HasValue: true,
238-
})
224+
result.labelKeys = append(result.labelKeys, TagMetricType)
225+
result.labelValues = append(result.labelValues, metricType)
226+
239227
kvs = append(kvs, attribute.String(TagMetricType, metricType))
240228
}
241229

@@ -255,57 +243,3 @@ func contains(slice []string, element string) bool {
255243
}
256244
return false
257245
}
258-
259-
func buildMetric(metric statsDMetric, point *metricspb.Point) *metricspb.Metric {
260-
return &metricspb.Metric{
261-
MetricDescriptor: &metricspb.MetricDescriptor{
262-
Name: metric.description.name,
263-
Type: metric.metricType,
264-
LabelKeys: metric.labelKeys,
265-
Unit: metric.unit,
266-
},
267-
Timeseries: []*metricspb.TimeSeries{
268-
{
269-
LabelValues: metric.labelValues,
270-
Points: []*metricspb.Point{
271-
point,
272-
},
273-
},
274-
},
275-
}
276-
}
277-
278-
func buildPoint(parsedMetric statsDMetric) *metricspb.Point {
279-
now := &timestamppb.Timestamp{
280-
Seconds: timeNowFunc(),
281-
}
282-
283-
switch parsedMetric.description.statsdMetricType {
284-
case "c":
285-
return buildCounterPoint(parsedMetric, now)
286-
case "g":
287-
return buildGaugePoint(parsedMetric, now)
288-
}
289-
290-
return nil
291-
}
292-
293-
func buildCounterPoint(parsedMetric statsDMetric, now *timestamppb.Timestamp) *metricspb.Point {
294-
point := &metricspb.Point{
295-
Timestamp: now,
296-
Value: &metricspb.Point_Int64Value{
297-
Int64Value: parsedMetric.intvalue,
298-
},
299-
}
300-
return point
301-
}
302-
303-
func buildGaugePoint(parsedMetric statsDMetric, now *timestamppb.Timestamp) *metricspb.Point {
304-
point := &metricspb.Point{
305-
Timestamp: now,
306-
Value: &metricspb.Point_DoubleValue{
307-
DoubleValue: parsedMetric.floatvalue,
308-
},
309-
}
310-
return point
311-
}

0 commit comments

Comments
 (0)