Skip to content

Commit 3434299

Browse files
axwatoulme
authored andcommitted
exporter/kafkaexporter: add signal-specific config (open-telemetry#39204)
#### Description Deprecate `topic` and `encoding`, and introduce signal-specific equivalents: - `logs::topic`, `metrics::topic`, and `traces::topic` - `logs::encoding`, `metrics::encoding`, and `traces::encoding` This enables users to explicitly define a configuration equivalent to the default configuration, or some variation thereof. It also enables specifying different encodings for each signal type, which may be important due to the fact that some encodings only support a subset of signals. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#35432 #### Testing Unit tests added. #### Documentation Updated README. --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent adf9dc1 commit 3434299

File tree

9 files changed

+233
-42
lines changed

9 files changed

+233
-42
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: deprecation
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: deprecate `topic` and `encoding`, introduce signal-specific configuration
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35432]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,28 @@ Kafka exporter exports logs, metrics, and traces to Kafka. This exporter uses a
1717
that blocks and does not batch messages, therefore it should be used with batch and queued retry
1818
processors for higher throughput and resiliency. Message payload encoding is configurable.
1919

20+
## Configuration settings
21+
2022
There are no required settings.
2123

2224
The following settings can be optionally configured:
2325
- `brokers` (default = localhost:9092): The list of kafka brokers.
2426
- `protocol_version` (default = 2.1.0): Kafka protocol version.
2527
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
2628
- `client_id` (default = "otel-collector"): The client ID to configure the Kafka client with. The client ID will be used for all produce requests.
27-
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details.
29+
- `logs`
30+
- `topic` (default = otlp\_logs): The name of the Kafka topic to which logs will be exported.
31+
- `encoding` (default = otlp\_proto): The encoding for logs. See [Supported encodings](#supported-encodings).
32+
- `metrics`
33+
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
34+
- `encoding` (default = otlp\_proto): The encoding for metrics. See [Supported encodings](#supported-encodings).
35+
- `traces`
36+
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
37+
- `encoding` (default = otlp\_proto): The encoding for traces. See [Supported encodings](#supported-encodings).
38+
- `topic` (Deprecated in v0.124.0: use `logs::topic`, `metrics::topic`, and `traces::topic`) If specified, this is used as the default topic, but will be overridden by signal-specific configuration. See [Destination Topic](#destination-topic) below for more details.
2839
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
40+
- `encoding` (Deprecated in v0.124.0: use `logs::encoding`, `metrics::encoding`, and `traces::encoding`) If specified, this is used as the default encoding, but will be overridden by signal-specific configuration. See [Supported encodings](#supported-encodings) below for more details.
2941
- `include_metadata_keys` (default = []): Specifies a list of metadata keys to propagate as Kafka message headers. If one or more keys aren't found in the metadata, they are ignored.
30-
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
31-
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
32-
- `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
33-
- The following encodings are valid *only* for **traces**.
34-
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
35-
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
36-
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
37-
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
38-
- The following encodings are valid *only* for **logs**.
39-
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
4042
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
4143
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
4244
- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
@@ -88,6 +90,25 @@ The following settings can be optionally configured:
8890
- `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type
8991
- `flush_max_messages` (default = 0) The maximum number of messages the producer will send in a single broker request.
9092

93+
### Supported encodings
94+
95+
The Kafka exporter supports encoding extensions, as well as the following built-in encodings.
96+
97+
Available for all signals:
98+
- `otlp_proto`: data is encoded as OTLP Protobuf
99+
- `otlp_json`: data is encoded as OTLP JSON
100+
101+
Available only for traces:
102+
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`, and keyed by TraceID.
103+
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`, and keyed by TraceID.
104+
- `zipkin_proto`: the payload is serialized to Zipkin v2 proto Span.
105+
- `zipkin_json`: the payload is serialized to Zipkin v2 JSON Span.
106+
107+
Available only for logs:
108+
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
109+
110+
### Example configuration
111+
91112
Example configuration:
92113

93114
```yaml
@@ -98,7 +119,9 @@ exporters:
98119
```
99120
100121
## Destination Topic
122+
101123
The destination topic can be defined in a few different ways and takes priority in the following order:
124+
102125
1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
103126
2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
104-
3. Finally, the `topic` configuration is used as a default/fallback destination.
127+
3. Finally, the `<signal>::topic` configuration is used for the signal-specific destination topic. If this is not explicitly configured, the `topic` configuration (deprecated in v0.124.0) is used as a fallback for all signals.

exporter/kafkaexporter/config.go

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package kafkaexporter // import "github.com/open-telemetry/opentelemetry-collect
66
import (
77
"go.opentelemetry.io/collector/component"
88
"go.opentelemetry.io/collector/config/configretry"
9+
"go.opentelemetry.io/collector/confmap"
910
"go.opentelemetry.io/collector/exporter/exporterhelper"
1011

1112
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
@@ -21,7 +22,21 @@ type Config struct {
2122
configkafka.ClientConfig `mapstructure:",squash"`
2223
Producer configkafka.ProducerConfig `mapstructure:"producer"`
2324

24-
// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
25+
// Logs holds configuration about how logs should be sent to Kafka.
26+
Logs SignalConfig `mapstructure:"logs"`
27+
28+
// Metrics holds configuration about how metrics should be sent to Kafka.
29+
Metrics SignalConfig `mapstructure:"metrics"`
30+
31+
// Traces holds configuration about how traces should be sent to Kafka.
32+
Traces SignalConfig `mapstructure:"traces"`
33+
34+
// Topic holds the name of the Kafka topic to which data should be exported.
35+
//
36+
// Topic has no default. If explicitly specified, it will take precedence over
37+
// the default values of logs::topic, metrics::topic, and traces::topic.
38+
//
39+
// Deprecated [v0.124.0]: use logs::topic, metrics::topic, and traces::topic instead.
2540
Topic string `mapstructure:"topic"`
2641

2742
// IncludeMetadataKeys indicates the receiver's client metadata keys to propagate as Kafka message headers.
@@ -30,7 +45,12 @@ type Config struct {
3045
// TopicFromAttribute is the name of the attribute to use as the topic name.
3146
TopicFromAttribute string `mapstructure:"topic_from_attribute"`
3247

33-
// Encoding of messages (default "otlp_proto")
48+
// Encoding holds the encoding of Kafka message values.
49+
//
50+
// Encoding has no default. If explicitly specified, it will take precedence over
51+
// the default values of logs::encoding, metrics::encoding, and traces::encoding.
52+
//
53+
// Deprecated [v0.124.0]: use logs::encoding, metrics::encoding, and traces::encoding instead.
3454
Encoding string `mapstructure:"encoding"`
3555

3656
// PartitionTracesByID sets the message key of outgoing trace messages to the trace ID.
@@ -49,3 +69,56 @@ type Config struct {
4969
// attributes.
5070
PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`
5171
}
72+
73+
func (c *Config) Unmarshal(conf *confmap.Conf) error {
74+
if err := conf.Unmarshal(c); err != nil {
75+
return err
76+
}
77+
// Check if deprecated fields have been explicitly set,
78+
// in which case they should be used instead of signal-
79+
// specific defaults.
80+
var zeroConfig Config
81+
if err := conf.Unmarshal(&zeroConfig); err != nil {
82+
return err
83+
}
84+
if c.Topic != "" {
85+
if zeroConfig.Logs.Topic == "" {
86+
c.Logs.Topic = c.Topic
87+
}
88+
if zeroConfig.Metrics.Topic == "" {
89+
c.Metrics.Topic = c.Topic
90+
}
91+
if zeroConfig.Traces.Topic == "" {
92+
c.Traces.Topic = c.Topic
93+
}
94+
}
95+
if c.Encoding != "" {
96+
if zeroConfig.Logs.Encoding == "" {
97+
c.Logs.Encoding = c.Encoding
98+
}
99+
if zeroConfig.Metrics.Encoding == "" {
100+
c.Metrics.Encoding = c.Encoding
101+
}
102+
if zeroConfig.Traces.Encoding == "" {
103+
c.Traces.Encoding = c.Encoding
104+
}
105+
}
106+
return conf.Unmarshal(c)
107+
}
108+
109+
// SignalConfig holds signal-specific configuration for the Kafka exporter.
110+
type SignalConfig struct {
111+
// Topic holds the name of the Kafka topic to which messages of the
112+
// signal type should be produced.
113+
//
114+
// The default depends on the signal type:
115+
// - "otlp_spans" for traces
116+
// - "otlp_metrics" for metrics
117+
// - "otlp_logs" for logs
118+
Topic string `mapstructure:"topic"`
119+
120+
// Encoding holds the encoding of messages for the signal type.
121+
//
122+
// Defaults to "otlp_proto".
123+
Encoding string `mapstructure:"encoding"`
124+
}

exporter/kafkaexporter/config_test.go

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,70 @@ func TestLoadConfig(t *testing.T) {
6060
config.RequiredAcks = configkafka.WaitForAll
6161
return config
6262
}(),
63+
Logs: SignalConfig{
64+
Topic: "spans",
65+
Encoding: "otlp_proto",
66+
},
67+
Metrics: SignalConfig{
68+
Topic: "spans",
69+
Encoding: "otlp_proto",
70+
},
71+
Traces: SignalConfig{
72+
Topic: "spans",
73+
Encoding: "otlp_proto",
74+
},
6375
Topic: "spans",
64-
Encoding: "otlp_proto",
6576
PartitionTracesByID: true,
6677
PartitionMetricsByResourceAttributes: true,
6778
PartitionLogsByResourceAttributes: true,
6879
},
6980
},
81+
{
82+
id: component.NewIDWithName(metadata.Type, "legacy_topic"),
83+
expected: &Config{
84+
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
85+
BackOffConfig: configretry.NewDefaultBackOffConfig(),
86+
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
87+
ClientConfig: configkafka.NewDefaultClientConfig(),
88+
Producer: configkafka.NewDefaultProducerConfig(),
89+
Logs: SignalConfig{
90+
Topic: "legacy_topic",
91+
Encoding: "otlp_proto",
92+
},
93+
Metrics: SignalConfig{
94+
Topic: "metrics_topic",
95+
Encoding: "otlp_proto",
96+
},
97+
Traces: SignalConfig{
98+
Topic: "legacy_topic",
99+
Encoding: "otlp_proto",
100+
},
101+
Topic: "legacy_topic",
102+
},
103+
},
104+
{
105+
id: component.NewIDWithName(metadata.Type, "legacy_encoding"),
106+
expected: &Config{
107+
TimeoutSettings: exporterhelper.NewDefaultTimeoutConfig(),
108+
BackOffConfig: configretry.NewDefaultBackOffConfig(),
109+
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
110+
ClientConfig: configkafka.NewDefaultClientConfig(),
111+
Producer: configkafka.NewDefaultProducerConfig(),
112+
Logs: SignalConfig{
113+
Topic: "otlp_logs",
114+
Encoding: "legacy_encoding",
115+
},
116+
Metrics: SignalConfig{
117+
Topic: "otlp_metrics",
118+
Encoding: "metrics_encoding",
119+
},
120+
Traces: SignalConfig{
121+
Topic: "otlp_spans",
122+
Encoding: "legacy_encoding",
123+
},
124+
Encoding: "legacy_encoding",
125+
},
126+
},
70127
}
71128

72129
for _, tt := range tests {

exporter/kafkaexporter/factory.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ import (
1717
)
1818

1919
const (
20-
defaultTracesTopic = "otlp_spans"
21-
defaultMetricsTopic = "otlp_metrics"
22-
defaultLogsTopic = "otlp_logs"
23-
defaultEncoding = "otlp_proto"
20+
defaultLogsTopic = "otlp_logs"
21+
defaultLogsEncoding = "otlp_proto"
22+
defaultMetricsTopic = "otlp_metrics"
23+
defaultMetricsEncoding = "otlp_proto"
24+
defaultTracesTopic = "otlp_spans"
25+
defaultTracesEncoding = "otlp_proto"
26+
2427
// partitioning metrics by resource attributes is disabled by default
2528
defaultPartitionMetricsByResourceAttributesEnabled = false
2629
// partitioning logs by resource attributes is disabled by default
@@ -45,9 +48,18 @@ func createDefaultConfig() component.Config {
4548
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
4649
ClientConfig: configkafka.NewDefaultClientConfig(),
4750
Producer: configkafka.NewDefaultProducerConfig(),
48-
// using an empty topic to track when it has not been set by user, default is based on traces or metrics.
49-
Topic: "",
50-
Encoding: defaultEncoding,
51+
Logs: SignalConfig{
52+
Topic: defaultLogsTopic,
53+
Encoding: defaultLogsEncoding,
54+
},
55+
Metrics: SignalConfig{
56+
Topic: defaultMetricsTopic,
57+
Encoding: defaultMetricsEncoding,
58+
},
59+
Traces: SignalConfig{
60+
Topic: defaultTracesTopic,
61+
Encoding: defaultTracesEncoding,
62+
},
5163
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
5264
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
5365
}
@@ -59,9 +71,6 @@ func createTracesExporter(
5971
cfg component.Config,
6072
) (exporter.Traces, error) {
6173
oCfg := *(cfg.(*Config)) // Clone the config
62-
if oCfg.Topic == "" {
63-
oCfg.Topic = defaultTracesTopic
64-
}
6574
exp := newTracesExporter(oCfg, set)
6675
return exporterhelper.NewTraces(
6776
ctx,
@@ -85,9 +94,6 @@ func createMetricsExporter(
8594
cfg component.Config,
8695
) (exporter.Metrics, error) {
8796
oCfg := *(cfg.(*Config)) // Clone the config
88-
if oCfg.Topic == "" {
89-
oCfg.Topic = defaultMetricsTopic
90-
}
9197
exp := newMetricsExporter(oCfg, set)
9298
return exporterhelper.NewMetrics(
9399
ctx,
@@ -111,9 +117,6 @@ func createLogsExporter(
111117
cfg component.Config,
112118
) (exporter.Logs, error) {
113119
oCfg := *(cfg.(*Config)) // Clone the config
114-
if oCfg.Topic == "" {
115-
oCfg.Topic = defaultLogsTopic
116-
}
117120
exp := newLogsExporter(oCfg, set)
118121
return exporterhelper.NewLogs(
119122
ctx,

exporter/kafkaexporter/factory_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func TestCreateMetricExporter(t *testing.T) {
6666
conf: applyConfigOption(func(conf *Config) {
6767
// Disabling broker check to ensure encoding work
6868
conf.Metadata.Full = false
69-
conf.Encoding = defaultEncoding
69+
conf.Encoding = "otlp_proto"
7070
}),
7171
err: nil,
7272
},
@@ -128,7 +128,7 @@ func TestCreateLogExporter(t *testing.T) {
128128
conf: applyConfigOption(func(conf *Config) {
129129
// Disabling broker check to ensure encoding work
130130
conf.Metadata.Full = false
131-
conf.Encoding = defaultEncoding
131+
conf.Encoding = "otlp_proto"
132132
}),
133133
err: nil,
134134
},
@@ -188,7 +188,7 @@ func TestCreateTraceExporter(t *testing.T) {
188188
conf: applyConfigOption(func(conf *Config) {
189189
// Disabling broker check to ensure encoding work
190190
conf.Metadata.Full = false
191-
conf.Encoding = defaultEncoding
191+
conf.Encoding = "otlp_proto"
192192
}),
193193
err: nil,
194194
},

0 commit comments

Comments
 (0)