Skip to content

Commit 81c5eee

Browse files
axwAneurysm9mx-psi
authored
[receiver/awsfirehose] Add support for encoding extensions (#37262)
#### Description Add support for using encoding extensions for unmarshalling records transmitted via Amazon Data Firehose. The "record_type" config is now deprecated and has been replaced by "encoding". This new config setting supports all of the existing encodings (cwlogs, cwmetrics otlp_v1) as well as support for loading additional encodings via extensions. #### Link to tracking issue Fixes #37113 #### Testing Should be a non-functional change, so mostly relying on existing unit tests to catch issues. Tests have been added for new extension functionality. Manually tested creating a Firehose delivery stream and using the `text_encoding` extension: 1. Ran collector with following config: ```yaml receivers: awsfirehose: endpoint: localhost:1234 encoding: text_encoding exporters: debug: verbosity: detailed extensions: text_encoding: service: extensions: [text_encoding] pipelines: logs: receivers: [awsfirehose] processors: [] exporters: [debug] ``` 2. Exposed to the internet with ngrok 3. Created a Firehose delivery stream pointed at ngrok HTTPS endpoint 4. Used AWS CLI to send a record: `aws firehose put-record --delivery-stream-name=axwtest --record Data=$(echo -n abc | base64)` 5. Observed log record being exported by the debug exporter: ``` 2025-02-11T14:09:17.090+0800 info Logs {"kind": "exporter", "data_type": "logs", "name": "debug", "resource logs": 1, "log records": 1} 2025-02-11T14:09:17.090+0800 info ResourceLog #0 Resource SchemaURL: ScopeLogs #0 ScopeLogs SchemaURL: InstrumentationScope LogRecord #0 ObservedTimestamp: 2025-02-11 06:09:17.090506322 +0000 UTC Timestamp: 1970-01-01 00:00:00 +0000 UTC SeverityText: SeverityNumber: Unspecified(0) Body: Str(abc) Trace ID: Span ID: Flags: 0 ``` #### Documentation Updated README. --------- Co-authored-by: Anthony Mirabella <[email protected]> Co-authored-by: Pablo Baeyens <[email protected]>
1 parent 453abeb commit 81c5eee

13 files changed

+329
-155
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awsfirehosereceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for encoding extensions
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37113]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: Adds `encoding` config setting, and deprecates the `record_type` setting.
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awsfirehosereceiver/README.md

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,25 +45,28 @@ See [documentation](https://github.com/open-telemetry/opentelemetry-collector/bl
4545

4646
A `cert_file` and `key_file` are required.
4747

48-
### record_type:
49-
The type of record being received from the delivery stream. Each unmarshaler handles a specific type, so the field allows the receiver to use the correct one.
48+
### encoding:
49+
50+
The ID of an [encoding extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/encoding) for decoding logs or metrics.
51+
This configuration also supports the built-in encodings listed in the [Encodings](#encodings) section.
52+
If no encoding is specified, then the receiver will default to a signal-specific encoding: `cwmetrics` for metrics, and `cwlogs` for logs.
5053

51-
default: `cwmetrics`
54+
### record_type:
5255

53-
See the [Record Types](#record-types) section for all available options.
56+
Deprecated, use `encoding` instead. `record_type` will be removed in a future release; it is an alias for `encoding`.
5457

5558
### access_key (Optional):
5659
The access key to be checked on each request received. This can be set when creating or updating the delivery stream.
5760
See [documentation](https://docs.aws.amazon.com/firehose/latest/dev/create-destination.html#create-destination-http) for details.
5861

59-
## Record Types
62+
## Encodings
6063

6164
### cwmetrics
62-
The record type for the CloudWatch metric stream. Expects the format for the records to be JSON.
65+
The encoding for the CloudWatch metric stream. Expects the format for the records to be JSON.
6366
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Metric-Streams.html) for details.
6467

6568
### cwlogs
66-
The record type for the CloudWatch log stream. Expects the format for the records to be JSON.
69+
The encoding for the CloudWatch log stream. Expects the format for the records to be JSON.
6770
For example:
6871

6972
```json
@@ -84,5 +87,5 @@ For example:
8487
```
8588

8689
### otlp_v1
87-
The OTLP v1 format as produced by CloudWatch metric streams.
90+
The OTLP v1 encoding as produced by CloudWatch metric streams.
8891
See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details.

receiver/awsfirehosereceiver/config.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,24 @@ import (
88

99
"go.opentelemetry.io/collector/config/confighttp"
1010
"go.opentelemetry.io/collector/config/configopaque"
11+
"go.uber.org/zap"
1112
)
1213

14+
var errRecordTypeEncodingSet = errors.New("record_type must not be set when encoding is set")
15+
1316
type Config struct {
1417
// ServerConfig is used to set up the Firehose delivery
1518
// endpoint. The Firehose delivery stream expects an HTTPS
1619
// endpoint, so TLSSettings must be used to enable that.
1720
confighttp.ServerConfig `mapstructure:",squash"`
18-
// RecordType is the key used to determine which unmarshaler to use
19-
// when receiving the requests.
21+
// Encoding identifies the encoding of records received from
22+
// Firehose. Defaults to telemetry-specific encodings: "cwlog"
23+
// for logs, and "cwmetrics" for metrics.
24+
Encoding string `mapstructure:"encoding"`
25+
// RecordType is an alias for Encoding for backwards compatibility.
26+
// It is an error to specify both encoding and record_type.
27+
//
28+
// Deprecated: [v0.121.0] use Encoding instead.
2029
RecordType string `mapstructure:"record_type"`
2130
// AccessKey is checked against the one received with each request.
2231
// This can be set when creating or updating the Firehose delivery
@@ -30,10 +39,14 @@ func (c *Config) Validate() error {
3039
if c.Endpoint == "" {
3140
return errors.New("must specify endpoint")
3241
}
33-
// If a record type is specified, it must be valid.
34-
// An empty string is acceptable, however, because it will use a telemetry-type-specific default.
35-
if c.RecordType != "" {
36-
return validateRecordType(c.RecordType)
42+
if c.RecordType != "" && c.Encoding != "" {
43+
return errRecordTypeEncodingSet
3744
}
3845
return nil
3946
}
47+
48+
func handleDeprecatedConfig(cfg *Config, logger *zap.Logger) {
49+
if cfg.RecordType != "" {
50+
logger.Warn("record_type is deprecated, and will be removed in a future version. Use encoding instead.")
51+
}
52+
}

receiver/awsfirehosereceiver/config_test.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
func TestLoadConfig(t *testing.T) {
2222
for _, configType := range []string{
23-
"cwmetrics", "cwlogs", "otlp_v1", "invalid",
23+
"cwmetrics", "cwlogs", "otlp_v1",
2424
} {
2525
t.Run(configType, func(t *testing.T) {
2626
fileName := configType + "_config.yaml"
@@ -35,24 +35,35 @@ func TestLoadConfig(t *testing.T) {
3535
require.NoError(t, sub.Unmarshal(cfg))
3636

3737
err = xconfmap.Validate(cfg)
38-
if configType == "invalid" {
39-
assert.Error(t, err)
40-
} else {
41-
assert.NoError(t, err)
42-
require.Equal(t, &Config{
43-
RecordType: configType,
44-
AccessKey: "some_access_key",
45-
ServerConfig: confighttp.ServerConfig{
46-
Endpoint: "0.0.0.0:4433",
47-
TLSSetting: &configtls.ServerConfig{
48-
Config: configtls.Config{
49-
CertFile: "server.crt",
50-
KeyFile: "server.key",
51-
},
38+
assert.NoError(t, err)
39+
require.Equal(t, &Config{
40+
RecordType: configType,
41+
AccessKey: "some_access_key",
42+
ServerConfig: confighttp.ServerConfig{
43+
Endpoint: "0.0.0.0:4433",
44+
TLSSetting: &configtls.ServerConfig{
45+
Config: configtls.Config{
46+
CertFile: "server.crt",
47+
KeyFile: "server.key",
5248
},
5349
},
54-
}, cfg)
55-
}
50+
},
51+
}, cfg)
5652
})
5753
}
5854
}
55+
56+
func TestLoadConfigInvalid(t *testing.T) {
57+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "invalid_config.yaml"))
58+
require.NoError(t, err)
59+
60+
factory := NewFactory()
61+
cfg := factory.CreateDefaultConfig()
62+
63+
sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String())
64+
require.NoError(t, err)
65+
require.NoError(t, sub.Unmarshal(cfg))
66+
67+
err = xconfmap.Validate(cfg)
68+
assert.ErrorIs(t, err, errRecordTypeEncodingSet)
69+
}

receiver/awsfirehosereceiver/factory.go

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,35 +5,19 @@ package awsfirehosereceiver // import "github.com/open-telemetry/opentelemetry-c
55

66
import (
77
"context"
8-
"errors"
98

109
"go.opentelemetry.io/collector/component"
1110
"go.opentelemetry.io/collector/config/confighttp"
1211
"go.opentelemetry.io/collector/consumer"
13-
"go.opentelemetry.io/collector/pdata/plog"
14-
"go.opentelemetry.io/collector/pdata/pmetric"
1512
"go.opentelemetry.io/collector/receiver"
16-
"go.uber.org/zap"
1713

1814
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
19-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
20-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
21-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
2215
)
2316

2417
const (
2518
defaultEndpoint = "localhost:4433"
2619
)
2720

28-
var (
29-
errUnrecognizedRecordType = errors.New("unrecognized record type")
30-
availableRecordTypes = map[string]bool{
31-
cwmetricstream.TypeStr: true,
32-
cwlog.TypeStr: true,
33-
otlpmetricstream.TypeStr: true,
34-
}
35-
)
36-
3721
// NewFactory creates a receiver factory for awsfirehose. Currently, only
3822
// available in metrics pipelines.
3923
func NewFactory() receiver.Factory {
@@ -44,34 +28,6 @@ func NewFactory() receiver.Factory {
4428
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
4529
}
4630

47-
// validateRecordType checks the available record types for the
48-
// passed in one and returns an error if not found.
49-
func validateRecordType(recordType string) error {
50-
if _, ok := availableRecordTypes[recordType]; !ok {
51-
return errUnrecognizedRecordType
52-
}
53-
return nil
54-
}
55-
56-
// defaultMetricsUnmarshalers creates a map of the available metrics
57-
// unmarshalers.
58-
func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]pmetric.Unmarshaler {
59-
cwmsu := cwmetricstream.NewUnmarshaler(logger)
60-
otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
61-
return map[string]pmetric.Unmarshaler{
62-
cwmsu.Type(): cwmsu,
63-
otlpv1msu.Type(): otlpv1msu,
64-
}
65-
}
66-
67-
// defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
68-
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]plog.Unmarshaler {
69-
u := cwlog.NewUnmarshaler(logger)
70-
return map[string]plog.Unmarshaler{
71-
u.Type(): u,
72-
}
73-
}
74-
7531
// createDefaultConfig creates a default config with the endpoint set
7632
// to port 8443 and the record type set to the CloudWatch metric stream.
7733
func createDefaultConfig() component.Config {
@@ -89,7 +45,9 @@ func createMetricsReceiver(
8945
cfg component.Config,
9046
nextConsumer consumer.Metrics,
9147
) (receiver.Metrics, error) {
92-
return newMetricsReceiver(cfg.(*Config), set, defaultMetricsUnmarshalers(set.Logger), nextConsumer)
48+
c := cfg.(*Config)
49+
handleDeprecatedConfig(c, set.Logger)
50+
return newMetricsReceiver(c, set, nextConsumer)
9351
}
9452

9553
// createMetricsReceiver implements the CreateMetricsReceiver function type.
@@ -99,5 +57,7 @@ func createLogsReceiver(
9957
cfg component.Config,
10058
nextConsumer consumer.Logs,
10159
) (receiver.Logs, error) {
102-
return newLogsReceiver(cfg.(*Config), set, defaultLogsUnmarshalers(set.Logger), nextConsumer)
60+
c := cfg.(*Config)
61+
handleDeprecatedConfig(c, set.Logger)
62+
return newLogsReceiver(c, set, nextConsumer)
10363
}

receiver/awsfirehosereceiver/factory_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"go.opentelemetry.io/collector/receiver/receivertest"
1414

1515
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
16-
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
1716
)
1817

1918
func TestValidConfig(t *testing.T) {
@@ -42,10 +41,3 @@ func TestCreateLogsReceiver(t *testing.T) {
4241
require.NoError(t, err)
4342
require.NotNil(t, r)
4443
}
45-
46-
func TestValidateRecordType(t *testing.T) {
47-
require.NoError(t, validateRecordType(defaultMetricsRecordType))
48-
require.NoError(t, validateRecordType(defaultLogsRecordType))
49-
require.NoError(t, validateRecordType(otlpmetricstream.TypeStr))
50-
require.Error(t, validateRecordType("nop"))
51-
}

receiver/awsfirehosereceiver/logs_receiver.go

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io"
1111
"net/http"
1212

13+
"go.opentelemetry.io/collector/component"
1314
"go.opentelemetry.io/collector/consumer"
1415
"go.opentelemetry.io/collector/consumer/consumererror"
1516
"go.opentelemetry.io/collector/pdata/plog"
@@ -18,11 +19,14 @@ import (
1819
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
1920
)
2021

21-
const defaultLogsRecordType = cwlog.TypeStr
22+
const defaultLogsEncoding = cwlog.TypeStr
2223

2324
// logsConsumer implements the firehoseConsumer
2425
// to use a logs consumer and unmarshaler.
2526
type logsConsumer struct {
27+
config *Config
28+
settings receiver.Settings
29+
2630
// consumer passes the translated logs on to the
2731
// next consumer.
2832
consumer consumer.Logs
@@ -38,21 +42,12 @@ var _ firehoseConsumer = (*logsConsumer)(nil)
3842
func newLogsReceiver(
3943
config *Config,
4044
set receiver.Settings,
41-
unmarshalers map[string]plog.Unmarshaler,
4245
nextConsumer consumer.Logs,
4346
) (receiver.Logs, error) {
44-
recordType := config.RecordType
45-
if recordType == "" {
46-
recordType = defaultLogsRecordType
47-
}
48-
configuredUnmarshaler := unmarshalers[recordType]
49-
if configuredUnmarshaler == nil {
50-
return nil, fmt.Errorf("%w: recordType = %s", errUnrecognizedRecordType, recordType)
51-
}
52-
5347
c := &logsConsumer{
54-
consumer: nextConsumer,
55-
unmarshaler: configuredUnmarshaler,
48+
config: config,
49+
settings: set,
50+
consumer: nextConsumer,
5651
}
5752
return &firehoseReceiver{
5853
settings: set,
@@ -61,8 +56,32 @@ func newLogsReceiver(
6156
}, nil
6257
}
6358

64-
// Consume uses the configured unmarshaler to unmarshal each record
65-
// into a plog.Logs and pass it to the next consumer, one record at a time.
59+
// Start sets the consumer's log unmarshaler to either a built-in
60+
// unmarshaler or one loaded from an encoding extension.
61+
func (c *logsConsumer) Start(_ context.Context, host component.Host) error {
62+
encoding := c.config.Encoding
63+
if encoding == "" {
64+
encoding = c.config.RecordType
65+
if encoding == "" {
66+
encoding = defaultLogsEncoding
67+
}
68+
}
69+
if encoding == cwlog.TypeStr {
70+
// TODO: make cwlogs an encoding extension
71+
c.unmarshaler = cwlog.NewUnmarshaler(c.settings.Logger)
72+
} else {
73+
unmarshaler, err := loadEncodingExtension[plog.Unmarshaler](host, encoding, "logs")
74+
if err != nil {
75+
return fmt.Errorf("failed to load encoding extension: %w", err)
76+
}
77+
c.unmarshaler = unmarshaler
78+
}
79+
return nil
80+
}
81+
82+
// Consume uses the configured unmarshaler to deserialize each record,
83+
// with each resulting plog.Logs being sent to the next consumer as
84+
// they are unmarshalled.
6685
func (c *logsConsumer) Consume(ctx context.Context, nextRecord nextRecordFunc, commonAttributes map[string]string) (int, error) {
6786
for {
6887
record, err := nextRecord()

0 commit comments

Comments
 (0)