Skip to content

Commit 12aedd1

Browse files
vihangvkVihang Kotharkar
authored and
Vihang Kotharkar
committed
[receiver:awsfirehosereceiver] merge main
1 parent 01665a0 commit 12aedd1

File tree

6 files changed

+260
-2
lines changed

6 files changed

+260
-2
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: added OTLP v1 support to Firehose receiver
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34982]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awsfirehosereceiver/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,6 @@ See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-desti
6262
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
6363
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.
6464

65+
### otlp_v1
66+
The OTLP v1 format as produced by CloudWatch metric streams.
67+
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.

receiver/awsfirehosereceiver/factory.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1818
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
1919
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
20+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
2021
)
2122

2223
const (
@@ -28,7 +29,8 @@ const (
2829
var (
2930
errUnrecognizedRecordType = errors.New("unrecognized record type")
3031
availableRecordTypes = map[string]bool{
31-
cwmetricstream.TypeStr: true,
32+
cwmetricstream.TypeStr: true,
33+
otlpmetricstream.TypeStr: true,
3234
}
3335
)
3436

@@ -54,8 +56,10 @@ func validateRecordType(recordType string) error {
5456
// unmarshalers.
5557
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
5658
cwmsu := cwmetricstream.NewUnmarshaler(logger)
59+
omsu := otlpmetricstream.NewUnmarshaler(logger)
5760
return map[string]unmarshaler.MetricsUnmarshaler{
5861
cwmsu.Type(): cwmsu,
62+
omsu.Type(): omsu,
5963
}
6064
}
6165

receiver/awsfirehosereceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfir
33
go 1.22.0
44

55
require (
6+
github.com/gogo/protobuf v1.3.2
67
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.109.0
78
github.com/stretchr/testify v1.9.0
89
go.opentelemetry.io/collector/component v0.109.0
@@ -29,7 +30,6 @@ require (
2930
github.com/go-logr/logr v1.4.2 // indirect
3031
github.com/go-logr/stdr v1.2.2 // indirect
3132
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
32-
github.com/gogo/protobuf v1.3.2 // indirect
3333
github.com/golang/snappy v0.0.4 // indirect
3434
github.com/google/uuid v1.6.0 // indirect
3535
github.com/hashicorp/go-version v1.7.0 // indirect
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otlpmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
5+
6+
import (
7+
"errors"
8+
9+
"github.com/gogo/protobuf/proto"
10+
"go.opentelemetry.io/collector/pdata/pmetric"
11+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
12+
"go.uber.org/zap"
13+
14+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
15+
)
16+
17+
const (
18+
// Supported version depends on version of go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp dependency
19+
TypeStr = "otlp_v1"
20+
)
21+
22+
var (
23+
errInvalidOTLPFormatStart = errors.New("unable to decode data length from message")
24+
)
25+
26+
// Unmarshaler for the CloudWatch Metric Stream OpenTelemetry record format.
27+
//
28+
// More details can be found at:
29+
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html
30+
type Unmarshaler struct {
31+
logger *zap.Logger
32+
}
33+
34+
var _ unmarshaler.MetricsUnmarshaler = (*Unmarshaler)(nil)
35+
36+
// NewUnmarshaler creates a new instance of the Unmarshaler.
37+
func NewUnmarshaler(logger *zap.Logger) *Unmarshaler {
38+
return &Unmarshaler{logger}
39+
}
40+
41+
// Unmarshal deserializes the records into pmetric.Metrics
42+
func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) {
43+
md := pmetric.NewMetrics()
44+
for recordIndex, record := range records {
45+
var dataLen, pos = len(record), 0
46+
for pos < dataLen {
47+
n, nLen := proto.DecodeVarint(record)
48+
if nLen == 0 && n == 0 {
49+
return md, errInvalidOTLPFormatStart
50+
}
51+
req := pmetricotlp.NewExportRequest()
52+
pos += nLen
53+
err := req.UnmarshalProto(record[pos : pos+int(n)])
54+
pos += int(n)
55+
if err != nil {
56+
u.logger.Error(
57+
"Unable to unmarshal input",
58+
zap.Error(err),
59+
zap.Int("record_index", recordIndex),
60+
)
61+
continue
62+
}
63+
if !u.isValid(req.Metrics()) {
64+
u.logger.Error(
65+
"Invalid metric",
66+
zap.Int("record_index", recordIndex),
67+
)
68+
continue
69+
}
70+
req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics())
71+
}
72+
}
73+
74+
return md, nil
75+
}
76+
77+
// isValid validates that the metric has been unmarshalled correctly.
78+
func (u Unmarshaler) isValid(metrics pmetric.Metrics) bool {
79+
for r := range metrics.ResourceMetrics().Len() {
80+
for s := range metrics.ResourceMetrics().At(r).ScopeMetrics().Len() {
81+
for m := range metrics.ResourceMetrics().At(r).ScopeMetrics().At(s).Metrics().Len() {
82+
if metrics.ResourceMetrics().At(r).ScopeMetrics().At(s).Metrics().At(m).Name() == "" {
83+
return false
84+
}
85+
if metrics.ResourceMetrics().At(r).ScopeMetrics().At(s).Metrics().At(m).Type() == pmetric.MetricTypeEmpty {
86+
return false
87+
}
88+
}
89+
}
90+
}
91+
92+
return true
93+
}
94+
95+
// Type of the serialized messages.
96+
func (u Unmarshaler) Type() string {
97+
return TypeStr
98+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package otlpmetricstream
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/gogo/protobuf/proto"
11+
"github.com/stretchr/testify/require"
12+
"go.opentelemetry.io/collector/pdata/pcommon"
13+
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
14+
"go.uber.org/zap"
15+
)
16+
17+
func TestType(t *testing.T) {
18+
unmarshaler := NewUnmarshaler(zap.NewNop())
19+
require.Equal(t, TypeStr, unmarshaler.Type())
20+
}
21+
22+
func createMetricRecord() []byte {
23+
var er = pmetricotlp.NewExportRequest()
24+
var rsm = er.Metrics().ResourceMetrics().AppendEmpty()
25+
var sm = rsm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
26+
sm.SetName("TestMetric")
27+
var dp = sm.SetEmptySummary().DataPoints().AppendEmpty()
28+
dp.SetCount(1)
29+
dp.SetSum(1)
30+
qv := dp.QuantileValues()
31+
min := qv.AppendEmpty()
32+
min.SetQuantile(0)
33+
min.SetValue(0)
34+
max := qv.AppendEmpty()
35+
max.SetQuantile(1)
36+
max.SetValue(1)
37+
dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now()))
38+
39+
temp, _ := er.MarshalProto()
40+
var record = proto.EncodeVarint(uint64(len(temp)))
41+
record = append(record, temp...)
42+
return record
43+
}
44+
45+
func TestUnmarshal(t *testing.T) {
46+
unmarshaler := NewUnmarshaler(zap.NewNop())
47+
testCases := map[string]struct {
48+
records [][]byte
49+
wantResourceCount int
50+
wantMetricCount int
51+
wantDatapointCount int
52+
wantErr error
53+
}{
54+
"WithSingleRecord": {
55+
records: [][]byte{
56+
createMetricRecord(),
57+
},
58+
wantResourceCount: 1,
59+
wantMetricCount: 1,
60+
wantDatapointCount: 1,
61+
},
62+
"WithMultipleRecords": {
63+
records: [][]byte{
64+
createMetricRecord(),
65+
createMetricRecord(),
66+
createMetricRecord(),
67+
createMetricRecord(),
68+
createMetricRecord(),
69+
createMetricRecord(),
70+
},
71+
wantResourceCount: 6,
72+
wantMetricCount: 6,
73+
wantDatapointCount: 6,
74+
},
75+
"WithEmptyRecord": {
76+
records: make([][]byte, 0),
77+
wantResourceCount: 0,
78+
wantMetricCount: 0,
79+
wantDatapointCount: 0,
80+
},
81+
"WithInvalidRecords": {
82+
records: [][]byte{{1, 2}},
83+
wantResourceCount: 0,
84+
wantMetricCount: 0,
85+
wantDatapointCount: 0,
86+
},
87+
"WithSomeInvalidRecords": {
88+
records: [][]byte{
89+
createMetricRecord(),
90+
{1, 2},
91+
createMetricRecord(),
92+
},
93+
wantResourceCount: 2,
94+
wantMetricCount: 2,
95+
wantDatapointCount: 2,
96+
},
97+
}
98+
for name, testCase := range testCases {
99+
t.Run(name, func(t *testing.T) {
100+
101+
got, err := unmarshaler.Unmarshal(testCase.records)
102+
if testCase.wantErr != nil {
103+
require.Error(t, err)
104+
require.Equal(t, testCase.wantErr, err)
105+
} else {
106+
require.NoError(t, err)
107+
require.NotNil(t, got)
108+
require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len())
109+
gotMetricCount := 0
110+
gotDatapointCount := 0
111+
for i := 0; i < got.ResourceMetrics().Len(); i++ {
112+
rm := got.ResourceMetrics().At(i)
113+
require.Equal(t, 1, rm.ScopeMetrics().Len())
114+
ilm := rm.ScopeMetrics().At(0)
115+
gotMetricCount += ilm.Metrics().Len()
116+
for j := 0; j < ilm.Metrics().Len(); j++ {
117+
metric := ilm.Metrics().At(j)
118+
gotDatapointCount += metric.Summary().DataPoints().Len()
119+
}
120+
}
121+
require.Equal(t, testCase.wantMetricCount, gotMetricCount)
122+
require.Equal(t, testCase.wantDatapointCount, gotDatapointCount)
123+
}
124+
})
125+
}
126+
}

0 commit comments

Comments
 (0)