Skip to content

Commit 977ffa7

Browse files
committed
[receiver/awsfirehosereceiver] Add support for CloudWatch logs
1 parent 6b3237a commit 977ffa7

29 files changed

+749
-30
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for CloudWatch logs
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35077]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/awsfirehosereceiver/README.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<!-- status autogenerated section -->
44
| Status | |
55
| ------------- |-----------|
6-
| Stability | [alpha]: metrics |
6+
| Stability | [alpha]: metrics, logs |
77
| Distributions | [contrib] |
88
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fawsfirehose%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fawsfirehose) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fawsfirehose%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fawsfirehose) |
99
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Aneurysm9](https://www.github.com/Aneurysm9) |
@@ -62,3 +62,24 @@ See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-desti
6262
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
6363
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.
6464

65+
### cwlogs
66+
The record type for the CloudWatch log stream. Expects the format for the records to be JSON.
67+
For example:
68+
69+
```json
70+
{
71+
"messageType": "DATA_MESSAGE",
72+
"owner": "111122223333",
73+
"logGroup": "my-log-group",
74+
"logStream": "my-log-stream",
75+
"subscriptionFilters": ["my-subscription-filter"],
76+
"logEvents": [
77+
{
78+
"id": "123",
79+
"timestamp": 1725544035523,
80+
"message": "My log message."
81+
}
82+
]
83+
}
84+
```
85+

receiver/awsfirehosereceiver/config_test.go

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package awsfirehosereceiver
55

66
import (
7+
"fmt"
78
"path/filepath"
89
"testing"
910

@@ -18,29 +19,40 @@ import (
1819
)
1920

2021
func TestLoadConfig(t *testing.T) {
21-
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
22-
require.NoError(t, err)
23-
24-
factory := NewFactory()
25-
cfg := factory.CreateDefaultConfig()
26-
27-
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
28-
require.NoError(t, err)
29-
require.NoError(t, sub.Unmarshal(cfg))
30-
31-
assert.NoError(t, component.ValidateConfig(cfg))
32-
33-
require.Equal(t, &Config{
34-
RecordType: "cwmetrics",
35-
AccessKey: "some_access_key",
36-
ServerConfig: confighttp.ServerConfig{
37-
Endpoint: "0.0.0.0:4433",
38-
TLSSetting: &configtls.ServerConfig{
39-
Config: configtls.Config{
40-
CertFile: "server.crt",
41-
KeyFile: "server.key",
42-
},
43-
},
44-
},
45-
}, cfg)
22+
for _, configType := range []string{
23+
"cwmetrics", "cwlogs", "invalid",
24+
} {
25+
t.Run(configType, func(t *testing.T) {
26+
fileName := fmt.Sprintf("%s_config.yaml", configType)
27+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", fileName))
28+
require.NoError(t, err)
29+
30+
factory := NewFactory()
31+
cfg := factory.CreateDefaultConfig()
32+
33+
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
34+
require.NoError(t, err)
35+
require.NoError(t, sub.Unmarshal(cfg))
36+
37+
err = component.ValidateConfig(cfg)
38+
if configType == "invalid" {
39+
assert.Error(t, err)
40+
} else {
41+
assert.NoError(t, err)
42+
require.Equal(t, &Config{
43+
RecordType: configType,
44+
AccessKey: "some_access_key",
45+
ServerConfig: confighttp.ServerConfig{
46+
Endpoint: "0.0.0.0:4433",
47+
TLSSetting: &configtls.ServerConfig{
48+
Config: configtls.Config{
49+
CertFile: "server.crt",
50+
KeyFile: "server.key",
51+
},
52+
},
53+
},
54+
}, cfg)
55+
}
56+
})
57+
}
4658
}

receiver/awsfirehosereceiver/factory.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/localhostgate"
1717
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
1818
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
19+
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
1920
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
2021
)
2122

@@ -29,6 +30,7 @@ var (
2930
errUnrecognizedRecordType = errors.New("unrecognized record type")
3031
availableRecordTypes = map[string]bool{
3132
cwmetricstream.TypeStr: true,
33+
cwlog.TypeStr: true,
3234
}
3335
)
3436

@@ -38,7 +40,8 @@ func NewFactory() receiver.Factory {
3840
return receiver.NewFactory(
3941
metadata.Type,
4042
createDefaultConfig,
41-
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
43+
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
44+
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
4245
}
4346

4447
// validateRecordType checks the available record types for the
@@ -59,6 +62,14 @@ func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.Metri
5962
}
6063
}
6164

65+
// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
66+
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler {
67+
u := cwlog.NewUnmarshaler(logger)
68+
return map[string]unmarshaler.LogsUnmarshaler{
69+
u.Type(): u,
70+
}
71+
}
72+
6273
// createDefaultConfig creates a default config with the endpoint set
6374
// to port 8443 and the record type set to the CloudWatch metric stream.
6475
func createDefaultConfig() component.Config {
@@ -79,3 +90,13 @@ func createMetricsReceiver(
7990
) (receiver.Metrics, error) {
8091
return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer)
8192
}
93+
94+
// createMetricsReceiver implements the CreateMetricsReceiver function type.
95+
func createLogsReceiver(
96+
_ context.Context,
97+
set receiver.Settings,
98+
cfg component.Config,
99+
nextConsumer consumer.Logs,
100+
) (receiver.Logs, error) {
101+
return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer)
102+
}

receiver/awsfirehosereceiver/factory_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@ func TestCreateMetricsReceiver(t *testing.T) {
2929
require.NotNil(t, r)
3030
}
3131

32+
func TestCreateLogsReceiver(t *testing.T) {
33+
r, err := createLogsReceiver(
34+
context.Background(),
35+
receivertest.NewNopSettings(),
36+
createDefaultConfig(),
37+
consumertest.NewNop(),
38+
)
39+
require.NoError(t, err)
40+
require.NotNil(t, r)
41+
}
42+
3243
func TestValidateRecordType(t *testing.T) {
3344
require.NoError(t, validateRecordType(defaultRecordType))
3445
require.Error(t, validateRecordType("nop"))

receiver/awsfirehosereceiver/generated_component_test.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/awsfirehosereceiver/generated_package_test.go

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/awsfirehosereceiver/internal/metadata/generated_status.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package compression
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
)
7+
8+
func Zip(data []byte) ([]byte, error) {
9+
var b bytes.Buffer
10+
w := gzip.NewWriter(&b)
11+
12+
_, err := w.Write(data)
13+
if err != nil {
14+
return nil, err
15+
}
16+
17+
if err = w.Flush(); err != nil {
18+
return nil, err
19+
}
20+
21+
if err = w.Close(); err != nil {
22+
return nil, err
23+
}
24+
25+
return b.Bytes(), nil
26+
}
27+
28+
func Unzip(data []byte) ([]byte, error) {
29+
b := bytes.NewBuffer(data)
30+
31+
r, err := gzip.NewReader(b)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
var rv bytes.Buffer
37+
_, err = rv.ReadFrom(r)
38+
if err != nil {
39+
return nil, err
40+
}
41+
42+
return rv.Bytes(), nil
43+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package cwlog
2+
3+
type cwLog struct {
4+
MessageType string `json:"messageType"`
5+
Owner string `json:"owner"`
6+
LogGroup string `json:"logGroup"`
7+
LogStream string `json:"logStream"`
8+
SubscriptionFilters []string `json:"subscriptionFilters"`
9+
LogEvents []struct {
10+
Id string `json:"id"`
11+
Timestamp int64 `json:"timestamp"`
12+
Message string `json:"message"`
13+
} `json:"logEvents"`
14+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package cwlog
2+
3+
import (
4+
"go.opentelemetry.io/collector/pdata/pcommon"
5+
"go.opentelemetry.io/collector/pdata/plog"
6+
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
7+
)
8+
9+
const (
10+
attributeCloudwatchLogGroupName = "cloudwatch.log.group.name"
11+
attributeCloudwatchLogStreamName = "cloudwatch.log.stream.name"
12+
)
13+
14+
// resourceAttributes are the CloudWatch log attributes that define a unique resource.
15+
type resourceAttributes struct {
16+
owner, logGroup, logStream string
17+
}
18+
19+
// resourceLogsBuilder provides convenient access to the a Resource's LogRecordSlice.
20+
type resourceLogsBuilder struct {
21+
rls plog.LogRecordSlice
22+
}
23+
24+
// setAttributes applies the resourceAttributes to the provided Resource.
25+
func (ra *resourceAttributes) setAttributes(resource pcommon.Resource) {
26+
attrs := resource.Attributes()
27+
attrs.PutStr(conventions.AttributeCloudAccountID, ra.owner)
28+
attrs.PutStr("cloudwatch.log.group.name", ra.logStream)
29+
attrs.PutStr("cloudwatch.log.stream", ra.logGroup)
30+
}
31+
32+
// newResourceLogsBuilder to capture logs for the Resource defined by the provided attributes.
33+
func newResourceLogsBuilder(logs plog.Logs, attrs resourceAttributes) *resourceLogsBuilder {
34+
rls := logs.ResourceLogs().AppendEmpty()
35+
attrs.setAttributes(rls.Resource())
36+
return &resourceLogsBuilder{rls.ScopeLogs().AppendEmpty().LogRecords()}
37+
}
38+
39+
// AddLog events to the LogRecordSlice. Resource attributes are captured when creating
40+
// the resourceLogsBuilder, so we only need to consider the LogEvents themselves.
41+
func (rlb *resourceLogsBuilder) AddLog(log cwLog) {
42+
for _, event := range log.LogEvents {
43+
logLine := rlb.rls.AppendEmpty()
44+
logLine.SetTimestamp(pcommon.Timestamp(event.Timestamp))
45+
logLine.Body().SetStr(event.Message)
46+
}
47+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{"CHANGE":-0.09,"PRICE":4.96,"TICKER_SYMBOL":"KIN","SECTOR":"ENERGY"}
2+
{"CHANGE":-1.47,"PRICE":134.74,"TICKER_SYMBOL":"DFG","SECTOR":"TECHNOLOGY"}
3+
{"CHANGE":1.96,"PRICE":57.53,"TICKER_SYMBOL":"SAC","SECTOR":"ENERGY"}
4+
{"CHANGE":0.04,"PRICE":32.84,"TICKER_SYMBOL":"PJN","SECTOR":"RETAIL"}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]}
2+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695233","timestamp":1725554035523,"message":"Hello world, here is our second log message!"}]}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]}
2+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695233","timestamp":1725554035523,"message":"Hello world, here is our second log message!"}]}
3+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test1","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695234","timestamp":1725564035523,"message":"Hello world, here is our third log message!"}]}
4+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test2","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695235","timestamp":1725574035523,"message":"Hello world, here is our fourth log message!"}]}
5+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test1","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695236","timestamp":1725584035523,"message":"Hello world, here is our fifth log message!"}]}
6+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test2","logStream":"test2","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695237","timestamp":1725594035523,"message":"Hello world, here is our sixth log message!"}]}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695232","timestamp":1725544035523,"message":"Hello world, here is our first log message!"}]}
2+
{"CHANGE":1.96,"PRICE":57.53,"TICKER_SYMBOL":"SAC","SECTOR":"ENERGY"}
3+
{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","subscriptionFilters":["test"],"logEvents":[{"id":"38480917865042697267627490045603633139480491071049695233","timestamp":1725554035523,"message":"Hello world, here is our second log message!"}]}

0 commit comments

Comments
 (0)