Skip to content

Commit d1937d6

Browse files
ZenoCC-Pengatoulme
andauthored
[receiver/collectd] Migrate from OpenCensus data model to pdata (#24770)
**Description:** Migrate from OpenCensus data model to pdata on collectdreceiver. Using golden to compare metrics for testing. Since wavefrontreceiver depends on collectdreceiver, adjustments are necessary in the go.mod and go.sum files. **Link to tracking Issue:** [20760](#20760) --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent c1fb7f1 commit d1937d6

File tree

10 files changed

+377
-685
lines changed

10 files changed

+377
-685
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: enhancement
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: receiver/collectdreceiver
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: Migrate from opencensus to pdata, change collectd, test to match pdata format.
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [20760]

receiver/collectdreceiver/collectd.go

Lines changed: 82 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,12 @@ import (
99
"strings"
1010
"time"
1111

12-
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
13-
"google.golang.org/protobuf/types/known/timestamppb"
12+
"go.opentelemetry.io/collector/pdata/pcommon"
13+
"go.opentelemetry.io/collector/pdata/pmetric"
1414

1515
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/sanitize"
1616
)
1717

18-
const (
19-
collectDMetricDerive = "derive"
20-
collectDMetricGauge = "gauge"
21-
collectDMetricCounter = "counter"
22-
collectDMetricAbsolute = "absolute"
23-
)
24-
2518
type collectDRecord struct {
2619
Dsnames []*string `json:"dsnames"`
2720
Dstypes []*string `json:"dstypes"`
@@ -38,31 +31,31 @@ type collectDRecord struct {
3831
Severity *string `json:"severity"`
3932
}
4033

41-
func (r *collectDRecord) isEvent() bool {
42-
return r.Time != nil && r.Severity != nil && r.Message != nil
34+
func (cdr *collectDRecord) isEvent() bool {
35+
return cdr.Time != nil && cdr.Severity != nil && cdr.Message != nil
4336
}
4437

45-
func (r *collectDRecord) protoTime() *timestamppb.Timestamp {
46-
if r.Time == nil {
47-
return nil
38+
func (cdr *collectDRecord) protoTime() pcommon.Timestamp {
39+
// Return 1970-01-01 00:00:00 +0000 UTC.
40+
if cdr.Time == nil {
41+
return pcommon.NewTimestampFromTime(time.Unix(0, 0))
4842
}
49-
ts := time.Unix(0, int64(float64(time.Second)**r.Time))
50-
return timestamppb.New(ts)
43+
ts := time.Unix(0, int64(float64(time.Second)**cdr.Time))
44+
return pcommon.NewTimestampFromTime(ts)
5145
}
5246

53-
func (r *collectDRecord) startTimestamp(mdType metricspb.MetricDescriptor_Type) *timestamppb.Timestamp {
54-
if mdType == metricspb.MetricDescriptor_CUMULATIVE_DISTRIBUTION || mdType == metricspb.MetricDescriptor_CUMULATIVE_DOUBLE || mdType == metricspb.MetricDescriptor_CUMULATIVE_INT64 {
55-
return timestamppb.New(time.Unix(0, int64((*r.Time-*r.Interval)*float64(time.Second))))
47+
func (cdr *collectDRecord) startTimestamp(metricType string) pcommon.Timestamp {
48+
if metricType == "cumulative" {
49+
return pcommon.NewTimestampFromTime(time.Unix(0, int64((*cdr.Time-*cdr.Interval)*float64(time.Second))))
5650
}
57-
return nil
51+
return pcommon.NewTimestampFromTime(time.Unix(0, 0))
5852
}
5953

60-
func (r *collectDRecord) appendToMetrics(metrics []*metricspb.Metric, defaultLabels map[string]string) ([]*metricspb.Metric, error) {
54+
func (cdr *collectDRecord) appendToMetrics(scopeMetrics pmetric.ScopeMetrics, defaultLabels map[string]string) error {
6155
// Ignore if record is an event instead of data point
62-
if r.isEvent() {
56+
if cdr.isEvent() {
6357
recordEventsReceived()
64-
return metrics, nil
65-
58+
return nil
6659
}
6760

6861
recordMetricsReceived()
@@ -71,125 +64,120 @@ func (r *collectDRecord) appendToMetrics(metrics []*metricspb.Metric, defaultLab
7164
labels[k] = v
7265
}
7366

74-
for i := range r.Dsnames {
75-
if i < len(r.Dstypes) && i < len(r.Values) && r.Values[i] != nil {
76-
dsType, dsName, val := r.Dstypes[i], r.Dsnames[i], r.Values[i]
77-
metricName, usedDsName := r.getReasonableMetricName(i, labels)
67+
for i := range cdr.Dsnames {
68+
if i < len(cdr.Dstypes) && i < len(cdr.Values) && cdr.Values[i] != nil {
69+
dsType, dsName, val := cdr.Dstypes[i], cdr.Dsnames[i], cdr.Values[i]
70+
metricName, usedDsName := cdr.getReasonableMetricName(i, labels)
7871

79-
addIfNotNullOrEmpty(labels, "plugin", r.Plugin)
80-
parseAndAddLabels(labels, r.PluginInstance, r.Host)
72+
addIfNotNullOrEmpty(labels, "plugin", cdr.Plugin)
73+
parseAndAddLabels(labels, cdr.PluginInstance, cdr.Host)
8174
if !usedDsName {
8275
addIfNotNullOrEmpty(labels, "dsname", dsName)
8376
}
8477

85-
metric, err := r.newMetric(metricName, dsType, val, labels)
78+
metric, err := cdr.newMetric(metricName, dsType, val, labels)
8679
if err != nil {
87-
return metrics, fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
80+
return fmt.Errorf("error processing metric %s: %w", sanitize.String(metricName), err)
8881
}
89-
metrics = append(metrics, metric)
90-
82+
newMetric := scopeMetrics.Metrics().AppendEmpty()
83+
metric.MoveTo(newMetric)
9184
}
9285
}
93-
return metrics, nil
86+
return nil
9487
}
9588

96-
func (r *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (*metricspb.Metric, error) {
97-
metric := &metricspb.Metric{}
98-
point, isDouble, err := r.newPoint(val)
89+
// Create new metric, get labels, then setting attribute and metric info
90+
func (cdr *collectDRecord) newMetric(name string, dsType *string, val *json.Number, labels map[string]string) (pmetric.Metric, error) {
91+
attributes := setAttributes(labels)
92+
metric, err := cdr.setMetric(name, dsType, val, attributes)
9993
if err != nil {
100-
return metric, fmt.Errorf("error processing metric %s: %w", name, err)
94+
return pmetric.Metric{}, fmt.Errorf("error processing metric %s: %w", name, err)
10195
}
96+
return metric, nil
97+
}
10298

103-
lKeys, lValues := labelKeysAndValues(labels)
104-
metricType := r.metricType(dsType, isDouble)
105-
metric.MetricDescriptor = &metricspb.MetricDescriptor{
106-
Name: name,
107-
Type: metricType,
108-
LabelKeys: lKeys,
109-
}
110-
metric.Timeseries = []*metricspb.TimeSeries{
111-
{
112-
StartTimestamp: r.startTimestamp(metricType),
113-
LabelValues: lValues,
114-
Points: []*metricspb.Point{point},
115-
},
99+
func setAttributes(labels map[string]string) pcommon.Map {
100+
attributes := pcommon.NewMap()
101+
for k, v := range labels {
102+
attributes.PutStr(k, v)
116103
}
117-
118-
return metric, nil
104+
return attributes
119105
}
120106

121-
func (r *collectDRecord) metricType(dsType *string, isDouble bool) metricspb.MetricDescriptor_Type {
122-
val := ""
107+
// Set new metric info with name, datapoint, time, attributes
108+
func (cdr *collectDRecord) setMetric(name string, dsType *string, val *json.Number, atr pcommon.Map) (pmetric.Metric, error) {
109+
110+
typ := ""
111+
metric := pmetric.NewMetric()
112+
123113
if dsType != nil {
124-
val = *dsType
114+
typ = *dsType
125115
}
126116

127-
switch val {
128-
case collectDMetricCounter, collectDMetricDerive:
129-
return metricCumulative(isDouble)
117+
metric.SetName(name)
118+
dataPoint := setDataPoint(typ, metric)
119+
dataPoint.SetTimestamp(cdr.protoTime())
120+
atr.CopyTo(dataPoint.Attributes())
130121

131-
// Prometheus collectd exporter just ignores it. We use gauge for it as it seems the
132-
// closes type. https://github.com/prometheus/collectd_exporter/blob/master/main.go#L109-L129
133-
case collectDMetricGauge, collectDMetricAbsolute:
134-
return metricGauge(isDouble)
122+
if pointVal, err := val.Int64(); err == nil {
123+
dataPoint.SetIntValue(pointVal)
124+
} else if pointVal, err := val.Float64(); err == nil {
125+
dataPoint.SetDoubleValue(pointVal)
126+
} else {
127+
return pmetric.Metric{}, fmt.Errorf("value could not be decoded: %w", err)
135128
}
136-
return metricGauge(isDouble)
129+
return metric, nil
137130
}
138131

139-
func (r *collectDRecord) newPoint(val *json.Number) (*metricspb.Point, bool, error) {
140-
p := &metricspb.Point{
141-
Timestamp: r.protoTime(),
142-
}
143-
144-
isDouble := true
145-
if v, err := val.Int64(); err == nil {
146-
isDouble = false
147-
p.Value = &metricspb.Point_Int64Value{Int64Value: v}
148-
} else {
149-
v, err := val.Float64()
150-
if err != nil {
151-
return nil, isDouble, fmt.Errorf("value could not be decoded: %w", err)
152-
}
153-
p.Value = &metricspb.Point_DoubleValue{DoubleValue: v}
132+
// check type to decide metric type and return data point
133+
func setDataPoint(typ string, metric pmetric.Metric) pmetric.NumberDataPoint {
134+
var dataPoint pmetric.NumberDataPoint
135+
switch typ {
136+
case "derive", "counter":
137+
sum := metric.SetEmptySum()
138+
sum.SetIsMonotonic(true)
139+
dataPoint = sum.DataPoints().AppendEmpty()
140+
default:
141+
dataPoint = metric.SetEmptyGauge().DataPoints().AppendEmpty()
154142
}
155-
return p, isDouble, nil
143+
return dataPoint
156144
}
157145

158146
// getReasonableMetricName creates metrics names by joining them (if non empty) type.typeinstance
159147
// if there are more than one dsname append .dsname for the particular uint. if there's only one it
160148
// becomes a dimension.
161-
func (r *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
149+
func (cdr *collectDRecord) getReasonableMetricName(index int, attrs map[string]string) (string, bool) {
162150
usedDsName := false
163151
capacity := 0
164-
if r.TypeS != nil {
165-
capacity += len(*r.TypeS)
152+
if cdr.TypeS != nil {
153+
capacity += len(*cdr.TypeS)
166154
}
167-
if r.TypeInstance != nil {
168-
capacity += len(*r.TypeInstance)
155+
if cdr.TypeInstance != nil {
156+
capacity += len(*cdr.TypeInstance)
169157
}
170158
parts := make([]byte, 0, capacity)
171159

172-
if !isNilOrEmpty(r.TypeS) {
173-
parts = append(parts, *r.TypeS...)
160+
if !isNilOrEmpty(cdr.TypeS) {
161+
parts = append(parts, *cdr.TypeS...)
174162
}
175-
parts = r.pointTypeInstance(attrs, parts)
176-
if r.Dsnames != nil && !isNilOrEmpty(r.Dsnames[index]) && len(r.Dsnames) > 1 {
163+
parts = cdr.pointTypeInstance(attrs, parts)
164+
if cdr.Dsnames != nil && !isNilOrEmpty(cdr.Dsnames[index]) && len(cdr.Dsnames) > 1 {
177165
if len(parts) > 0 {
178166
parts = append(parts, '.')
179167
}
180-
parts = append(parts, *r.Dsnames[index]...)
168+
parts = append(parts, *cdr.Dsnames[index]...)
181169
usedDsName = true
182170
}
183171
return string(parts), usedDsName
184172
}
185173

186174
// pointTypeInstance extracts information from the TypeInstance field and appends to the metric name when possible.
187-
func (r *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
188-
if isNilOrEmpty(r.TypeInstance) {
175+
func (cdr *collectDRecord) pointTypeInstance(attrs map[string]string, parts []byte) []byte {
176+
if isNilOrEmpty(cdr.TypeInstance) {
189177
return parts
190178
}
191179

192-
instanceName, extractedAttrs := LabelsFromName(r.TypeInstance)
180+
instanceName, extractedAttrs := LabelsFromName(cdr.TypeInstance)
193181
if instanceName != "" {
194182
if len(parts) > 0 {
195183
parts = append(parts, '.')
@@ -275,29 +263,3 @@ func parseNameForLabels(labels map[string]string, key string, val *string) {
275263
}
276264
addIfNotNullOrEmpty(labels, key, &instanceName)
277265
}
278-
279-
func labelKeysAndValues(labels map[string]string) ([]*metricspb.LabelKey, []*metricspb.LabelValue) {
280-
keys := make([]*metricspb.LabelKey, len(labels))
281-
values := make([]*metricspb.LabelValue, len(labels))
282-
i := 0
283-
for k, v := range labels {
284-
keys[i] = &metricspb.LabelKey{Key: k}
285-
values[i] = &metricspb.LabelValue{Value: v, HasValue: true}
286-
i++
287-
}
288-
return keys, values
289-
}
290-
291-
func metricCumulative(isDouble bool) metricspb.MetricDescriptor_Type {
292-
if isDouble {
293-
return metricspb.MetricDescriptor_CUMULATIVE_DOUBLE
294-
}
295-
return metricspb.MetricDescriptor_CUMULATIVE_INT64
296-
}
297-
298-
func metricGauge(isDouble bool) metricspb.MetricDescriptor_Type {
299-
if isDouble {
300-
return metricspb.MetricDescriptor_GAUGE_DOUBLE
301-
}
302-
return metricspb.MetricDescriptor_GAUGE_INT64
303-
}

0 commit comments

Comments
 (0)