Skip to content

Commit c910564

Browse files
vihangvkVihang Kotharkar
authored and
Vihang Kotharkar
committed
[receiver:awsfirehosereceiver] added support for Otel format
1 parent 71ce97e commit c910564

File tree

6 files changed

+257
-2
lines changed

6 files changed

+257
-2
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: added OTLP v1 support to Firehose receiver
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34982]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awsfirehosereceiver/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,6 @@ See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-desti
6262
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
6363
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.
6464

65+
### otlp_v1
66+
The OTLP v1 format as produced by CloudWatch metric streams.
67+
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.

receiver/awsfirehosereceiver/factory.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1818
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
1919
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
20+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
2021
)
2122

2223
const (
@@ -28,7 +29,8 @@ const (
2829
var (
2930
errUnrecognizedRecordType = errors.New("unrecognized record type")
3031
availableRecordTypes = map[string]bool{
31-
cwmetricstream.TypeStr: true,
32+
cwmetricstream.TypeStr: true,
33+
otlpmetricstream.TypeStr: true,
3234
}
3335
)
3436

@@ -54,8 +56,10 @@ func validateRecordType(recordType string) error {
5456
// unmarshalers.
5557
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
5658
cwmsu := cwmetricstream.NewUnmarshaler(logger)
59+
omsu := otlpmetricstream.NewUnmarshaler(logger)
5760
return map[string]unmarshaler.MetricsUnmarshaler{
5861
cwmsu.Type(): cwmsu,
62+
omsu.Type(): omsu,
5963
}
6064
}
6165

receiver/awsfirehosereceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfir
33
go 1.22.0
44

55
require (
6+
github.com/gogo/protobuf v1.3.2
67
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.108.0
78
github.com/stretchr/testify v1.9.0
89
go.opentelemetry.io/collector/component v0.108.2-0.20240909182537-32cff9f7f331
@@ -29,7 +30,6 @@ require (
2930
github.com/go-logr/logr v1.4.2 // indirect
3031
github.com/go-logr/stdr v1.2.2 // indirect
3132
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
32-
github.com/gogo/protobuf v1.3.2 // indirect
3333
github.com/golang/snappy v0.0.4 // indirect
3434
github.com/google/uuid v1.6.0 // indirect
3535
github.com/hashicorp/go-version v1.7.0 // indirect
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otlpmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
5+
6+
import (
7+
"errors"
8+
9+
"github.com/gogo/protobuf/proto"
10+
"go.opentelemetry.io/collector/pdata/pmetric"
11+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
12+
"go.uber.org/zap"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
15+
)
16+
17+
const (
18+
// Supported version depends on version of go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp dependency
19+
TypeStr = "otlp_v1"
20+
)
21+
22+
var (
23+
errInvalidRecords = errors.New("record format invalid")
24+
errInvalidOTLPFormatStart = errors.New("unable to decode data length from message")
25+
)
26+
27+
// Unmarshaler for the CloudWatch Metric Stream OpenTelemetry record format.
28+
//
29+
// More details can be found at:
30+
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html
31+
type Unmarshaler struct {
32+
logger *zap.Logger
33+
}
34+
35+
var _ unmarshaler.MetricsUnmarshaler = (*Unmarshaler)(nil)
36+
37+
// NewUnmarshaler creates a new instance of the Unmarshaler.
38+
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
39+
return &Unmarshaler{logger}
40+
}
41+
42+
// Unmarshal deserializes the records into pmetric.Metrics
43+
func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) {
44+
md := pmetric.NewMetrics()
45+
for recordIndex, record := range records {
46+
var dataLen, pos = len(record), 0
47+
for pos < dataLen {
48+
n, nLen := proto.DecodeVarint(record)
49+
if nLen == 0 && n == 0 {
50+
return md, errInvalidOTLPFormatStart
51+
}
52+
req := pmetricotlp.NewExportRequest()
53+
pos += nLen
54+
err := req.UnmarshalProto(record[pos : pos+int(n)])
55+
pos += int(n)
56+
if err != nil {
57+
u.logger.Error(
58+
"Unable to unmarshal input",
59+
zap.Error(err),
60+
zap.Int("record_index", recordIndex),
61+
)
62+
continue
63+
}
64+
if !u.isValid(req.Metrics()) {
65+
u.logger.Error(
66+
"Invalid metric",
67+
zap.Int("record_index", recordIndex),
68+
)
69+
continue
70+
}
71+
req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
72+
}
73+
}
74+
75+
return md, nil
76+
}
77+
78+
// isValid validates that the metric has been unmarshalled correctly.
79+
func (u Unmarshaler) isValid(metrics pmetric.Metrics) bool {
80+
for r := range metrics.ResourceMetrics().Len() {
81+
for s := range metrics.ResourceMetrics().At(r).ScopeMetrics().Len() {
82+
for m := range metrics.ResourceMetrics().At(r).ScopeMetrics().At(s).Metrics().Len() {
83+
if metrics.ResourceMetrics().At(r).ScopeMetrics().At(s).Metrics().At(m).Name() == "" {
84+
return false
85+
}
86+
if metrics.ResourceMetrics().At(r).ScopeMetrics().At(s).Metrics().At(m).Type() == pmetric.MetricTypeEmpty {
87+
return false
88+
}
89+
}
90+
}
91+
}
92+
93+
return true
94+
}
95+
96+
// Type of the serialized messages.
97+
func (u Unmarshaler) Type() string {
98+
return TypeStr
99+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otlpmetricstream
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/gogo/protobuf/proto"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/pdata/pcommon"
13+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
14+
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
15+
"go.uber.org/zap"
16+
)
17+
18+
func TestType(t *testing.T) {
19+
unmarshaler := NewUnmarshaler(zap.NewNop())
20+
require.Equal(t, TypeStr, unmarshaler.Type())
21+
}
22+
23+
func createMetricRecord() []byte {
24+
var er = pmetricotlp.NewExportRequest()
25+
var rsm = er.Metrics().ResourceMetrics().AppendEmpty()
26+
rsm.Resource().Attributes().PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS)
27+
var dp = rsm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySummary().DataPoints().AppendEmpty()
28+
dp.SetCount(1)
29+
dp.SetSum(1)
30+
qv := dp.QuantileValues()
31+
min := qv.AppendEmpty()
32+
min.SetQuantile(0)
33+
min.SetValue(0)
34+
max := qv.AppendEmpty()
35+
max.SetQuantile(1)
36+
max.SetValue(1)
37+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
38+
39+
temp, _ := er.MarshalProto()
40+
var record = proto.EncodeVarint(uint64(len(temp)))
41+
record = append(record, temp...)
42+
return record
43+
}
44+
45+
func TestUnmarshal(t *testing.T) {
46+
unmarshaler := NewUnmarshaler(zap.NewNop())
47+
testCases := map[string]struct {
48+
records [][]byte
49+
wantResourceCount int
50+
wantMetricCount int
51+
wantDatapointCount int
52+
wantErr error
53+
}{
54+
"WithSingleRecord": {
55+
records: [][]byte{
56+
createMetricRecord(),
57+
},
58+
wantResourceCount: 1,
59+
wantMetricCount: 1,
60+
wantDatapointCount: 1,
61+
},
62+
"WithMultipleRecords": {
63+
records: [][]byte{
64+
createMetricRecord(),
65+
createMetricRecord(),
66+
createMetricRecord(),
67+
createMetricRecord(),
68+
createMetricRecord(),
69+
createMetricRecord(),
70+
},
71+
wantResourceCount: 6,
72+
wantMetricCount: 6,
73+
wantDatapointCount: 6,
74+
},
75+
"WithEmptyRecord": {
76+
records: make([][]byte, 0),
77+
wantErr: errInvalidRecords,
78+
},
79+
"WithInvalidRecords": {
80+
records: [][]byte{{1, 2}},
81+
wantErr: errInvalidRecords,
82+
},
83+
"WithSomeInvalidRecords": {
84+
records: [][]byte{
85+
createMetricRecord(),
86+
{1, 2},
87+
createMetricRecord(),
88+
},
89+
wantResourceCount: 2,
90+
wantMetricCount: 2,
91+
wantDatapointCount: 2,
92+
},
93+
}
94+
for name, testCase := range testCases {
95+
t.Run(name, func(t *testing.T) {
96+
97+
got, err := unmarshaler.Unmarshal(testCase.records)
98+
if testCase.wantErr != nil {
99+
require.Error(t, err)
100+
require.Equal(t, testCase.wantErr, err)
101+
} else {
102+
require.NoError(t, err)
103+
require.NotNil(t, got)
104+
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
105+
gotMetricCount := 0
106+
gotDatapointCount := 0
107+
for i := 0; i < got.ResourceMetrics().Len(); i++ {
108+
rm := got.ResourceMetrics().At(i)
109+
require.Equal(t, 1, rm.ScopeMetrics().Len())
110+
ilm := rm.ScopeMetrics().At(0)
111+
gotMetricCount += ilm.Metrics().Len()
112+
for j := 0; j < ilm.Metrics().Len(); j++ {
113+
metric := ilm.Metrics().At(j)
114+
gotDatapointCount += metric.Summary().DataPoints().Len()
115+
}
116+
}
117+
require.Equal(t, testCase.wantMetricCount, gotMetricCount)
118+
require.Equal(t, testCase.wantDatapointCount, gotDatapointCount)
119+
}
120+
})
121+
}
122+
}

0 commit comments

Comments
 (0)