Skip to content

Commit 325f3b8

Browse files
authored
kafkaexporter: Allow topics from metadata key (#40154)
#### Description Introduces a new per-signal config option `topic_from_metadata_key` to allow the user to produce to topics which which can be set using parts of the request metadata. This is useful for multi-tenant use-cases. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Closes #39208 <!--Describe what testing was performed and which tests were added.--> #### Testing Unit tests <!--Describe the documentation added.--> #### Documentation Updated the readme with the new option. --------- Signed-off-by: Marc Lopez Rubio <[email protected]>
1 parent c37a07f commit 325f3b8

File tree

7 files changed

+176
-80
lines changed

7 files changed

+176
-80
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'kafkaexporter'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Allow kafka exporter to produce to topics based on metadata key values"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39208]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Allows the Kafka exporter to dynamically use a signal's export target topic based
20+
on the value of the pipeline's metadata, allowing dynamic signal routing.
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,15 @@ The following settings can be optionally configured:
3030
- `logs`
3131
- `topic` (default = otlp\_logs): The name of the Kafka topic to which logs will be exported.
3232
- `encoding` (default = otlp\_proto): The encoding for logs. See [Supported encodings](#supported-encodings).
33+
- `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over `topic_from_attribute` and `topic` settings.
3334
- `metrics`
3435
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
3536
- `encoding` (default = otlp\_proto): The encoding for metrics. See [Supported encodings](#supported-encodings).
37+
- `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over `topic_from_attribute` and `topic` settings.
3638
- `traces`
3739
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
3840
- `encoding` (default = otlp\_proto): The encoding for traces. See [Supported encodings](#supported-encodings).
41+
- `topic_from_metadata_key` (default = ""): The name of the metadata key whose value should be used as the message's topic. Useful to dynamically produce to topics based on request inputs. It takes precedence over `topic_from_attribute` and `topic` settings.
3942
- `topic` (Deprecated in v0.124.0: use `logs::topic`, `metrics::topic`, and `traces::topic`) If specified, this is used as the default topic, but will be overridden by signal-specific configuration. See [Destination Topic](#destination-topic) below for more details.
4043
- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details.
4144
- `encoding` (Deprecated in v0.124.0: use `logs::encoding`, `metrics::encoding`, and `traces::encoding`) If specified, this is used as the default encoding, but will be overridden by signal-specific configuration. See [Supported encodings](#supported-encodings) below for more details.
@@ -139,6 +142,7 @@ exporters:
139142
140143
The destination topic can be defined in a few different ways and takes priority in the following order:
141144
142-
1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
143-
2. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
144-
3. Finally, the `<signal>::topic` configuration is used for the signal-specific destination topic. If this is not explicitly configured, the `topic` configuration (deprecated in v0.124.0) is used as a fallback for all signals.
145+
1. When `<signal>.topic_from_metadata_key` is set to use a key from the request metadata, the value of this key is used as the signal specific topic.
146+
2. Otherwise, if `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used.
147+
3. If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used.
148+
4. Finally, the `<signal>::topic` configuration is used for the signal-specific destination topic. If this is not explicitly configured, the `topic` configuration (deprecated in v0.124.0) is used as a fallback for all signals.

exporter/kafkaexporter/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@ type SignalConfig struct {
117117
// - "otlp_logs" for logs
118118
Topic string `mapstructure:"topic"`
119119

120+
// TopicFromMetadataKey holds the name of the metadata key to use as the
121+
// topic name for this signal type. If this is set, it takes precedence
122+
// over the topic name set in the topic field.
123+
TopicFromMetadataKey string `mapstructure:"topic_from_metadata_key"`
124+
120125
// Encoding holds the encoding of messages for the signal type.
121126
//
122127
// Defaults to "otlp_proto".

exporter/kafkaexporter/config_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,9 @@ func TestLoadConfig(t *testing.T) {
8787
ClientConfig: configkafka.NewDefaultClientConfig(),
8888
Producer: configkafka.NewDefaultProducerConfig(),
8989
Logs: SignalConfig{
90-
Topic: "legacy_topic",
91-
Encoding: "otlp_proto",
90+
Topic: "legacy_topic",
91+
Encoding: "otlp_proto",
92+
TopicFromMetadataKey: "metadata_key",
9293
},
9394
Metrics: SignalConfig{
9495
Topic: "metrics_topic",

exporter/kafkaexporter/kafka_exporter.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (e *kafkaTracesMessager) marshalData(td ptrace.Traces) ([]marshaler.Message
151151
}
152152

153153
func (e *kafkaTracesMessager) getTopic(ctx context.Context, td ptrace.Traces) string {
154-
return getTopic(ctx, &e.config, e.config.Traces.Topic, td.ResourceSpans())
154+
return getTopic(ctx, e.config.Traces, e.config.TopicFromAttribute, td.ResourceSpans())
155155
}
156156

157157
func (e *kafkaTracesMessager) partitionData(td ptrace.Traces) iter.Seq2[[]byte, ptrace.Traces] {
@@ -196,7 +196,7 @@ func (e *kafkaLogsMessager) marshalData(ld plog.Logs) ([]marshaler.Message, erro
196196
}
197197

198198
func (e *kafkaLogsMessager) getTopic(ctx context.Context, ld plog.Logs) string {
199-
return getTopic(ctx, &e.config, e.config.Logs.Topic, ld.ResourceLogs())
199+
return getTopic(ctx, e.config.Logs, e.config.TopicFromAttribute, ld.ResourceLogs())
200200
}
201201

202202
func (e *kafkaLogsMessager) partitionData(ld plog.Logs) iter.Seq2[[]byte, plog.Logs] {
@@ -239,7 +239,7 @@ func (e *kafkaMetricsMessager) marshalData(md pmetric.Metrics) ([]marshaler.Mess
239239
}
240240

241241
func (e *kafkaMetricsMessager) getTopic(ctx context.Context, md pmetric.Metrics) string {
242-
return getTopic(ctx, &e.config, e.config.Metrics.Topic, md.ResourceMetrics())
242+
return getTopic(ctx, e.config.Metrics, e.config.TopicFromAttribute, md.ResourceMetrics())
243243
}
244244

245245
func (e *kafkaMetricsMessager) partitionData(md pmetric.Metrics) iter.Seq2[[]byte, pmetric.Metrics] {
@@ -270,23 +270,27 @@ type resource interface {
270270

271271
func getTopic[T resource](
272272
ctx context.Context,
273-
cfg *Config,
274-
defaultTopic string,
273+
signalCfg SignalConfig,
274+
topicFromAttribute string,
275275
resources resourceSlice[T],
276276
) string {
277-
if cfg.TopicFromAttribute != "" {
277+
if k := signalCfg.TopicFromMetadataKey; k != "" {
278+
if topic := client.FromContext(ctx).Metadata.Get(k); len(topic) > 0 {
279+
return topic[0]
280+
}
281+
}
282+
if topicFromAttribute != "" {
278283
for i := 0; i < resources.Len(); i++ {
279-
rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute)
284+
rv, ok := resources.At(i).Resource().Attributes().Get(topicFromAttribute)
280285
if ok && rv.Str() != "" {
281286
return rv.Str()
282287
}
283288
}
284289
}
285-
contextTopic, ok := topic.FromContext(ctx)
286-
if ok {
287-
return contextTopic
290+
if topic, ok := topic.FromContext(ctx); ok {
291+
return topic
288292
}
289-
return defaultTopic
293+
return signalCfg.Topic
290294
}
291295

292296
func makeSaramaMessages(messages []marshaler.Message, topic string) []*sarama.ProducerMessage {

exporter/kafkaexporter/kafka_exporter_test.go

Lines changed: 115 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -666,91 +666,142 @@ func TestLogsPusher_partitioning(t *testing.T) {
666666

667667
func Test_GetTopic(t *testing.T) {
668668
tests := []struct {
669-
name string
670-
cfg Config
671-
ctx context.Context
672-
resource any
673-
wantTopic string
669+
name string
670+
topicFromAttribute string
671+
signalCfg SignalConfig
672+
ctx context.Context
673+
resource any
674+
wantTopic string
674675
}{
676+
// topicFromAttribute tests.
675677
{
676-
name: "Valid metric attribute, return topic name",
677-
cfg: Config{
678-
TopicFromAttribute: "resource-attr",
679-
},
680-
ctx: topic.WithTopic(context.Background(), "context-topic"),
681-
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
682-
wantTopic: "resource-attr-val-1",
678+
name: "Valid metric attribute, return topic name",
679+
topicFromAttribute: "resource-attr",
680+
signalCfg: SignalConfig{Topic: "defaultTopic"},
681+
ctx: topic.WithTopic(context.Background(), "context-topic"),
682+
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
683+
wantTopic: "resource-attr-val-1",
683684
},
684685
{
685-
name: "Valid trace attribute, return topic name",
686-
cfg: Config{
687-
TopicFromAttribute: "resource-attr",
688-
},
689-
ctx: topic.WithTopic(context.Background(), "context-topic"),
690-
resource: testdata.GenerateTraces(1).ResourceSpans(),
691-
wantTopic: "resource-attr-val-1",
686+
name: "Valid trace attribute, return topic name",
687+
topicFromAttribute: "resource-attr",
688+
signalCfg: SignalConfig{Topic: "defaultTopic"},
689+
ctx: topic.WithTopic(context.Background(), "context-topic"),
690+
resource: testdata.GenerateTraces(1).ResourceSpans(),
691+
wantTopic: "resource-attr-val-1",
692692
},
693693
{
694-
name: "Valid log attribute, return topic name",
695-
cfg: Config{
696-
TopicFromAttribute: "resource-attr",
697-
},
698-
ctx: topic.WithTopic(context.Background(), "context-topic"),
699-
resource: testdata.GenerateLogs(1).ResourceLogs(),
700-
wantTopic: "resource-attr-val-1",
694+
name: "Valid log attribute, return topic name",
695+
topicFromAttribute: "resource-attr",
696+
signalCfg: SignalConfig{Topic: "defaultTopic"},
697+
ctx: topic.WithTopic(context.Background(), "context-topic"),
698+
resource: testdata.GenerateLogs(1).ResourceLogs(),
699+
wantTopic: "resource-attr-val-1",
701700
},
702701
{
703-
name: "Attribute not found",
704-
cfg: Config{
705-
TopicFromAttribute: "nonexistent_attribute",
706-
},
702+
name: "Attribute not found",
703+
topicFromAttribute: "nonexistent_attribute",
704+
signalCfg: SignalConfig{Topic: "defaultTopic"},
705+
ctx: context.Background(),
706+
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
707+
wantTopic: "defaultTopic",
708+
},
709+
// Nonexistent attribute tests.
710+
{
711+
name: "Valid metric context, return topic name",
712+
topicFromAttribute: "nonexistent_attribute",
713+
signalCfg: SignalConfig{Topic: "defaultTopic"},
714+
ctx: topic.WithTopic(context.Background(), "context-topic"),
715+
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
716+
wantTopic: "context-topic",
717+
},
718+
{
719+
name: "Valid trace context, return topic name",
720+
topicFromAttribute: "nonexistent_attribute",
721+
signalCfg: SignalConfig{Topic: "defaultTopic"},
722+
ctx: topic.WithTopic(context.Background(), "context-topic"),
723+
resource: testdata.GenerateTraces(1).ResourceSpans(),
724+
wantTopic: "context-topic",
725+
},
726+
{
727+
name: "Valid log context, return topic name",
728+
topicFromAttribute: "nonexistent_attribute",
729+
signalCfg: SignalConfig{Topic: "defaultTopic"},
730+
ctx: topic.WithTopic(context.Background(), "context-topic"),
731+
resource: testdata.GenerateLogs(1).ResourceLogs(),
732+
wantTopic: "context-topic",
733+
},
734+
// Generic known failure modes.
735+
{
736+
name: "Attribute not found",
737+
topicFromAttribute: "nonexistent_attribute",
738+
signalCfg: SignalConfig{Topic: "defaultTopic"},
739+
ctx: context.Background(),
740+
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
741+
wantTopic: "defaultTopic",
742+
},
743+
{
744+
name: "TopicFromAttribute, return default topic",
707745
ctx: context.Background(),
746+
signalCfg: SignalConfig{Topic: "defaultTopic"},
708747
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
709748
wantTopic: "defaultTopic",
710749
},
711-
750+
// topicFromMetadata tests.
712751
{
713-
name: "Valid metric context, return topic name",
714-
cfg: Config{
715-
TopicFromAttribute: "nonexistent_attribute",
752+
name: "Metrics topic from metadata",
753+
signalCfg: SignalConfig{
754+
Topic: "defaultTopic",
755+
TopicFromMetadataKey: "metrics_topic_metadata",
716756
},
717-
ctx: topic.WithTopic(context.Background(), "context-topic"),
757+
ctx: client.NewContext(context.Background(),
758+
client.Info{Metadata: client.NewMetadata(map[string][]string{
759+
"metrics_topic_metadata": {"my_metrics_topic"},
760+
})},
761+
),
718762
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
719-
wantTopic: "context-topic",
720-
},
721-
{
722-
name: "Valid trace context, return topic name",
723-
cfg: Config{
724-
TopicFromAttribute: "nonexistent_attribute",
725-
},
726-
ctx: topic.WithTopic(context.Background(), "context-topic"),
727-
resource: testdata.GenerateTraces(1).ResourceSpans(),
728-
wantTopic: "context-topic",
763+
wantTopic: "my_metrics_topic",
729764
},
730765
{
731-
name: "Valid log context, return topic name",
732-
cfg: Config{
733-
TopicFromAttribute: "nonexistent_attribute",
766+
name: "Logs topic from metadata",
767+
signalCfg: SignalConfig{
768+
Topic: "defaultTopic",
769+
TopicFromMetadataKey: "logs_topic_metadata",
734770
},
735-
ctx: topic.WithTopic(context.Background(), "context-topic"),
771+
ctx: client.NewContext(context.Background(),
772+
client.Info{Metadata: client.NewMetadata(map[string][]string{
773+
"logs_topic_metadata": {"my_logs_topic"},
774+
})},
775+
),
736776
resource: testdata.GenerateLogs(1).ResourceLogs(),
737-
wantTopic: "context-topic",
777+
wantTopic: "my_logs_topic",
738778
},
739-
740779
{
741-
name: "Attribute not found",
742-
cfg: Config{
743-
TopicFromAttribute: "nonexistent_attribute",
780+
name: "Traces topic from metadata",
781+
signalCfg: SignalConfig{
782+
Topic: "defaultTopic",
783+
TopicFromMetadataKey: "traces_topic_metadata",
744784
},
745-
ctx: context.Background(),
746-
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
747-
wantTopic: "defaultTopic",
785+
ctx: client.NewContext(context.Background(),
786+
client.Info{Metadata: client.NewMetadata(map[string][]string{
787+
"traces_topic_metadata": {"my_traces_topic"},
788+
})},
789+
),
790+
resource: testdata.GenerateTraces(1).ResourceSpans(),
791+
wantTopic: "my_traces_topic",
748792
},
749793
{
750-
name: "TopicFromAttribute, return default topic",
751-
cfg: Config{},
752-
ctx: context.Background(),
753-
resource: testdata.GenerateMetrics(1).ResourceMetrics(),
794+
name: "metadata key not found uses default topic",
795+
signalCfg: SignalConfig{
796+
Topic: "defaultTopic",
797+
TopicFromMetadataKey: "key not found",
798+
},
799+
ctx: client.NewContext(context.Background(),
800+
client.Info{Metadata: client.NewMetadata(map[string][]string{
801+
"traces_topic_metadata": {"my_traces_topic"},
802+
})},
803+
),
804+
resource: testdata.GenerateTraces(1).ResourceSpans(),
754805
wantTopic: "defaultTopic",
755806
},
756807
}
@@ -760,11 +811,11 @@ func Test_GetTopic(t *testing.T) {
760811
topic := ""
761812
switch r := tests[i].resource.(type) {
762813
case pmetric.ResourceMetricsSlice:
763-
topic = getTopic(tests[i].ctx, &tests[i].cfg, "defaultTopic", r)
814+
topic = getTopic(tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
764815
case ptrace.ResourceSpansSlice:
765-
topic = getTopic(tests[i].ctx, &tests[i].cfg, "defaultTopic", r)
816+
topic = getTopic(tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
766817
case plog.ResourceLogsSlice:
767-
topic = getTopic(tests[i].ctx, &tests[i].cfg, "defaultTopic", r)
818+
topic = getTopic(tests[i].ctx, tests[i].signalCfg, tests[i].topicFromAttribute, r)
768819
}
769820
assert.Equal(t, tests[i].wantTopic, topic)
770821
})

exporter/kafkaexporter/testdata/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ kafka/legacy_topic:
2323
topic: legacy_topic
2424
metrics:
2525
topic: metrics_topic
26+
logs:
27+
topic_from_metadata_key: metadata_key
2628
kafka/legacy_encoding:
2729
encoding: legacy_encoding
2830
metrics:

0 commit comments

Comments
 (0)