
Commit 9fe267f

[chore] kafkareceiver: embed configkafka types (#38817)
#### Description

Use configkafka.ClientConfig and configkafka.ConsumerConfig, and the newly extracted consumer client. Redundant validation tests have been removed, as they are now covered by configkafka. This is a non-functional change, as the config defaults do not change for this component. Nevertheless, I have manually tested it -- see the Testing section below.

#### Link to tracking issue

Fixes #38411

#### Testing

Updated unit tests. Manually tested against Redpanda:

```shell
$ docker run --publish=9093:9093 \
    --health-cmd "rpk cluster health | grep 'Healthy:.*true'" \
    docker.redpanda.com/redpandadata/redpanda:v23.1.11 \
    redpanda start --kafka-addr=internal://0.0.0.0:9092,external://0.0.0.0:9093 \
    --smp=1 --memory=1G --mode=dev-container
```

Ran the collector with `kafkametrics -> kafkaexporter -> Redpanda -> kafkareceiver -> debugexporter`:

```yaml
receivers:
  kafka:
    brokers: [localhost:9093]
  kafkametrics:
    brokers: [localhost:9093]
    collection_interval: 10s
    scrapers: [topics, consumers]
exporters:
  debug:
    verbosity: detailed
  kafka:
    brokers: [localhost:9093]
service:
  pipelines:
    metrics/kafka:
      receivers: [kafka]
      exporters: [debug]
    metrics:
      receivers: [kafkametrics]
      exporters: [kafka]
```

#### Documentation

Updated the README to clarify that `protocol_version` is not required.
1 parent 67df5d6 commit 9fe267f

File tree

10 files changed, +92 -338 lines changed

internal/kafka/client.go

Lines changed: 25 additions & 1 deletion

```diff
@@ -20,6 +20,11 @@ var saramaCompressionCodecs = map[string]sarama.CompressionCodec{
     "zstd": sarama.CompressionZSTD,
 }
 
+var saramaInitialOffsets = map[string]int64{
+    configkafka.EarliestOffset: sarama.OffsetOldest,
+    configkafka.LatestOffset:   sarama.OffsetNewest,
+}
+
 // NewSaramaClient returns a new Kafka client with the given configuration.
 func NewSaramaClient(ctx context.Context, config configkafka.ClientConfig) (sarama.Client, error) {
     saramaConfig, err := NewSaramaClientConfig(ctx, config)
@@ -38,7 +43,26 @@ func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientC
     return sarama.NewClusterAdmin(config.Brokers, saramaConfig)
 }
 
-// TODO add NewSaramaConsumerGroup, extracted from receiver/kafkareceiver
+// NewSaramaConsumerGroup returns a new Kafka consumer group with the given configuration.
+func NewSaramaConsumerGroup(
+    ctx context.Context,
+    clientConfig configkafka.ClientConfig,
+    consumerConfig configkafka.ConsumerConfig,
+) (sarama.ConsumerGroup, error) {
+    saramaConfig, err := NewSaramaClientConfig(ctx, clientConfig)
+    if err != nil {
+        return nil, err
+    }
+    saramaConfig.Consumer.Group.Session.Timeout = consumerConfig.SessionTimeout
+    saramaConfig.Consumer.Group.Heartbeat.Interval = consumerConfig.HeartbeatInterval
+    saramaConfig.Consumer.Fetch.Min = consumerConfig.MinFetchSize
+    saramaConfig.Consumer.Fetch.Default = consumerConfig.DefaultFetchSize
+    saramaConfig.Consumer.Fetch.Max = consumerConfig.MaxFetchSize
+    saramaConfig.Consumer.Offsets.AutoCommit.Enable = consumerConfig.AutoCommit.Enable
+    saramaConfig.Consumer.Offsets.AutoCommit.Interval = consumerConfig.AutoCommit.Interval
+    saramaConfig.Consumer.Offsets.Initial = saramaInitialOffsets[consumerConfig.InitialOffset]
+    return sarama.NewConsumerGroup(clientConfig.Brokers, consumerConfig.GroupID, saramaConfig)
+}
 
 // NewSaramaSyncProducer returns a new synchronous Kafka producer with the given configuration.
 //
```
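The new helper is a thin adapter from the two config structs onto sarama's consumer knobs. A minimal usage sketch follows, assuming the import paths below; note these are `internal/` packages, so this only compiles inside the contrib module:

```go
package main

import (
	"context"
	"log"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	// Start from the shared defaults (brokers [localhost:9092], group
	// "otel-collector") and override only what this consumer needs.
	clientCfg := configkafka.NewDefaultClientConfig()
	consumerCfg := configkafka.NewDefaultConsumerConfig()
	consumerCfg.InitialOffset = configkafka.EarliestOffset

	group, err := kafka.NewSaramaConsumerGroup(context.Background(), clientCfg, consumerCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()
	// group.Consume(ctx, topics, handler) would then drive the receive loop.
}
```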

internal/kafka/configkafka/config.go

Lines changed: 1 addition & 1 deletion

```diff
@@ -99,7 +99,7 @@ func NewDefaultConsumerConfig() ConsumerConfig {
         SessionTimeout:    10 * time.Second,
         HeartbeatInterval: 3 * time.Second,
         GroupID:           "otel-collector",
-        InitialOffset:     "latest",
+        InitialOffset:     LatestOffset,
         AutoCommit: AutoCommitConfig{
             Enable:   true,
             Interval: time.Second,
```
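The string literal gives way to a named constant. The constant declarations themselves are outside the hunks shown here; inferred from the 1:1 replacement of `"latest"` and the `saramaInitialOffsets` map above, they plausibly read as follows (a sketch, not the verbatim source):

```go
package configkafka

// Plausible declarations, inferred from this commit's hunks; the actual
// source of these constants is not part of the visible diff.
const (
	EarliestOffset = "earliest" // mapped to sarama.OffsetOldest
	LatestOffset   = "latest"   // mapped to sarama.OffsetNewest
)
```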

receiver/kafkareceiver/README.md

Lines changed: 3 additions & 4 deletions

```diff
@@ -19,13 +19,12 @@ Note that metrics and logs only support OTLP.
 
 ## Getting Started
 
-The following settings are required:
-
-- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0
+There are no required settings.
 
 The following settings can be optionally configured:
 
-- `brokers` (default = localhost:9092): The list of kafka brokers
+- `brokers` (default = localhost:9092): The list of kafka brokers.
+- `protocol_version` (default = 2.1.0): Kafka protocol version.
 - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
 - `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
   Only one telemetry type may be used for a given topic.
```

receiver/kafkareceiver/config.go

Lines changed: 17 additions & 69 deletions

```diff
@@ -4,99 +4,47 @@
 package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"
 
 import (
-    "time"
-
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/config/configretry"
 
     "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
 )
 
-type AutoCommit struct {
-    // Whether or not to auto-commit updated offsets back to the broker.
-    // (default enabled).
-    Enable bool `mapstructure:"enable"`
-    // How frequently to commit updated offsets. Ineffective unless
-    // auto-commit is enabled (default 1s)
-    Interval time.Duration `mapstructure:"interval"`
-}
-
-type MessageMarking struct {
-    // If true, the messages are marked after the pipeline execution
-    After bool `mapstructure:"after"`
-
-    // If false, only the successfully processed messages are marked, it has no impact if
-    // After is set to false.
-    // Note: this can block the entire partition in case a message processing returns
-    // a permanent error.
-    OnError bool `mapstructure:"on_error"`
-}
-
-type HeaderExtraction struct {
-    ExtractHeaders bool     `mapstructure:"extract_headers"`
-    Headers        []string `mapstructure:"headers"`
-}
+var _ component.Config = (*Config)(nil)
 
 // Config defines configuration for Kafka receiver.
 type Config struct {
-    // The list of kafka brokers (default localhost:9092)
-    Brokers []string `mapstructure:"brokers"`
-    // ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
-    // each of the provided brokers. It will then do a PTR lookup for each
-    // returned IP, and that set of names becomes the broker list. This can be
-    // required in SASL environments.
-    ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
-    // Kafka protocol version
-    ProtocolVersion string `mapstructure:"protocol_version"`
-    // Session interval for the Kafka consumer
-    SessionTimeout time.Duration `mapstructure:"session_timeout"`
-    // Heartbeat interval for the Kafka consumer
-    HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
+    configkafka.ClientConfig   `mapstructure:",squash"`
+    configkafka.ConsumerConfig `mapstructure:",squash"`
+
     // The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
     Topic string `mapstructure:"topic"`
+
     // Encoding of the messages (default "otlp_proto")
     Encoding string `mapstructure:"encoding"`
-    // The consumer group that receiver will be consuming messages from (default "otel-collector")
-    GroupID string `mapstructure:"group_id"`
-    // The consumer client ID that receiver will use (default "otel-collector")
-    ClientID string `mapstructure:"client_id"`
-    // The initial offset to use if no offset was previously committed.
-    // Must be `latest` or `earliest` (default "latest").
-    InitialOffset string `mapstructure:"initial_offset"`
-
-    // Metadata is the namespace for metadata management properties used by the
-    // Client, and shared by the Producer/Consumer.
-    Metadata configkafka.MetadataConfig `mapstructure:"metadata"`
-
-    Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`
-
-    // Controls the auto-commit functionality
-    AutoCommit AutoCommit `mapstructure:"autocommit"`
 
     // Controls the way the messages are marked as consumed
     MessageMarking MessageMarking `mapstructure:"message_marking"`
 
     // Extract headers from kafka records
     HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`
 
-    // The minimum bytes per fetch from Kafka (default "1")
-    MinFetchSize int32 `mapstructure:"min_fetch_size"`
-    // The default bytes per fetch from Kafka (default "1048576")
-    DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
-    // The maximum bytes per fetch from Kafka (default "0", no limit)
-    MaxFetchSize int32 `mapstructure:"max_fetch_size"`
     // In case of some errors returned by the next consumer, the receiver will wait and retry the failed message
     ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
 }
 
-const (
-    offsetLatest   string = "latest"
-    offsetEarliest string = "earliest"
-)
+type MessageMarking struct {
+    // If true, the messages are marked after the pipeline execution
+    After bool `mapstructure:"after"`
 
-var _ component.Config = (*Config)(nil)
+    // If false, only the successfully processed messages are marked, it has no impact if
+    // After is set to false.
+    // Note: this can block the entire partition in case a message processing returns
+    // a permanent error.
+    OnError bool `mapstructure:"on_error"`
+}
 
-// Validate checks the receiver configuration is valid
-func (cfg *Config) Validate() error {
-    return nil
+type HeaderExtraction struct {
+    ExtractHeaders bool     `mapstructure:"extract_headers"`
+    Headers        []string `mapstructure:"headers"`
 }
```
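The `mapstructure:",squash"` tags are what keep this change non-functional for users: keys such as `brokers` and `group_id` still appear at the top level of the receiver's YAML even though they now live on embedded structs. A minimal sketch of the squash behavior, with simplified stand-in types (not the real configkafka structs) and the go-viper/mapstructure fork, which underlies the collector's confmap:

```go
package main

import (
	"fmt"

	"github.com/go-viper/mapstructure/v2"
)

// Simplified stand-ins for the embedded configkafka types.
type ClientConfig struct {
	Brokers []string `mapstructure:"brokers"`
}

type ConsumerConfig struct {
	GroupID string `mapstructure:"group_id"`
}

// Config mirrors the receiver's new shape: embedded structs tagged
// ",squash" expose their keys at the top level of the config map.
type Config struct {
	ClientConfig   `mapstructure:",squash"`
	ConsumerConfig `mapstructure:",squash"`
	Topic          string `mapstructure:"topic"`
}

func main() {
	raw := map[string]any{
		"brokers":  []string{"localhost:9093"},
		"group_id": "otel-collector",
		"topic":    "otlp_spans",
	}
	var cfg Config
	if err := mapstructure.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	// The flat keys land on the embedded structs.
	fmt.Println(cfg.Brokers, cfg.GroupID, cfg.Topic)
}
```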

receiver/kafkareceiver/config_test.go

Lines changed: 32 additions & 60 deletions

```diff
@@ -34,39 +34,20 @@ func TestLoadConfig(t *testing.T) {
         {
             id: component.NewIDWithName(metadata.Type, ""),
             expected: &Config{
-                Topic:                                "spans",
-                Encoding:                             "otlp_proto",
-                Brokers:                              []string{"foo:123", "bar:456"},
-                ResolveCanonicalBootstrapServersOnly: true,
-                ClientID:                             "otel-collector",
-                GroupID:                              "otel-collector",
-                InitialOffset:                        "latest",
-                SessionTimeout:                       10 * time.Second,
-                HeartbeatInterval:                    3 * time.Second,
-                Authentication: configkafka.AuthenticationConfig{
-                    TLS: &configtls.ClientConfig{
-                        Config: configtls.Config{
-                            CAFile:   "ca.pem",
-                            CertFile: "cert.pem",
-                            KeyFile:  "key.pem",
-                        },
-                    },
-                },
-                Metadata: configkafka.MetadataConfig{
-                    Full:            true,
-                    RefreshInterval: 10 * time.Minute,
-                    Retry: configkafka.MetadataRetryConfig{
-                        Max:     10,
-                        Backoff: time.Second * 5,
-                    },
-                },
-                AutoCommit: AutoCommit{
-                    Enable:   true,
-                    Interval: 1 * time.Second,
-                },
-                MinFetchSize:     1,
-                DefaultFetchSize: 1048576,
-                MaxFetchSize:     0,
+                ClientConfig: func() configkafka.ClientConfig {
+                    config := configkafka.NewDefaultClientConfig()
+                    config.Brokers = []string{"foo:123", "bar:456"}
+                    config.ResolveCanonicalBootstrapServersOnly = true
+                    config.ClientID = "the_client_id"
+                    return config
+                }(),
+                ConsumerConfig: func() configkafka.ConsumerConfig {
+                    config := configkafka.NewDefaultConsumerConfig()
+                    config.GroupID = "the_group_id"
+                    return config
+                }(),
+                Topic:    "spans",
+                Encoding: "otlp_proto",
                 ErrorBackOff: configretry.BackOffConfig{
                     Enabled: false,
                 },
@@ -75,38 +56,29 @@
         {
             id: component.NewIDWithName(metadata.Type, "logs"),
             expected: &Config{
-                Topic:             "logs",
-                Encoding:          "direct",
-                Brokers:           []string{"coffee:123", "foobar:456"},
-                ClientID:          "otel-collector",
-                GroupID:           "otel-collector",
-                InitialOffset:     "earliest",
-                SessionTimeout:    45 * time.Second,
-                HeartbeatInterval: 15 * time.Second,
-                Authentication: configkafka.AuthenticationConfig{
-                    TLS: &configtls.ClientConfig{
+                ClientConfig: func() configkafka.ClientConfig {
+                    config := configkafka.NewDefaultClientConfig()
+                    config.Brokers = []string{"coffee:123", "foobar:456"}
+                    config.Metadata.Retry.Max = 10
+                    config.Metadata.Retry.Backoff = 5 * time.Second
+                    config.Authentication.TLS = &configtls.ClientConfig{
                         Config: configtls.Config{
                             CAFile:   "ca.pem",
                             CertFile: "cert.pem",
                             KeyFile:  "key.pem",
                         },
-                    },
-                },
-                Metadata: configkafka.MetadataConfig{
-                    Full:            true,
-                    RefreshInterval: 10 * time.Minute,
-                    Retry: configkafka.MetadataRetryConfig{
-                        Max:     10,
-                        Backoff: time.Second * 5,
-                    },
-                },
-                AutoCommit: AutoCommit{
-                    Enable:   true,
-                    Interval: 1 * time.Second,
-                },
-                MinFetchSize:     1,
-                DefaultFetchSize: 1048576,
-                MaxFetchSize:     0,
+                    }
+                    return config
+                }(),
+                ConsumerConfig: func() configkafka.ConsumerConfig {
+                    config := configkafka.NewDefaultConsumerConfig()
+                    config.InitialOffset = configkafka.EarliestOffset
+                    config.SessionTimeout = 45 * time.Second
+                    config.HeartbeatInterval = 15 * time.Second
+                    return config
+                }(),
+                Topic:    "logs",
+                Encoding: "direct",
                 ErrorBackOff: configretry.BackOffConfig{
                     Enabled:         true,
                     InitialInterval: 1 * time.Second,
```

receiver/kafkareceiver/factory.go

Lines changed: 7 additions & 38 deletions

```diff
@@ -7,7 +7,6 @@ import (
     "context"
     "errors"
     "strings"
-    "time"
 
     "go.opentelemetry.io/collector/component"
     "go.opentelemetry.io/collector/consumer"
@@ -18,28 +17,10 @@
 )
 
 const (
-    defaultTracesTopic       = "otlp_spans"
-    defaultMetricsTopic      = "otlp_metrics"
-    defaultLogsTopic         = "otlp_logs"
-    defaultEncoding          = "otlp_proto"
-    defaultBroker            = "localhost:9092"
-    defaultClientID          = "otel-collector"
-    defaultGroupID           = defaultClientID
-    defaultInitialOffset     = offsetLatest
-    defaultSessionTimeout    = 10 * time.Second
-    defaultHeartbeatInterval = 3 * time.Second
-
-    // default from sarama.NewConfig()
-    defaultAutoCommitEnable = true
-    // default from sarama.NewConfig()
-    defaultAutoCommitInterval = 1 * time.Second
-
-    // default from sarama.NewConfig()
-    defaultMinFetchSize = int32(1)
-    // default from sarama.NewConfig()
-    defaultDefaultFetchSize = int32(1048576)
-    // default from sarama.NewConfig()
-    defaultMaxFetchSize = int32(0)
+    defaultTracesTopic  = "otlp_spans"
+    defaultMetricsTopic = "otlp_metrics"
+    defaultLogsTopic    = "otlp_logs"
+    defaultEncoding     = "otlp_proto"
 )
 
 var errUnrecognizedEncoding = errors.New("unrecognized encoding")
@@ -57,28 +38,16 @@ func NewFactory() receiver.Factory {
 
 func createDefaultConfig() component.Config {
     return &Config{
-        Encoding:          defaultEncoding,
-        Brokers:           []string{defaultBroker},
-        ClientID:          defaultClientID,
-        GroupID:           defaultGroupID,
-        InitialOffset:     defaultInitialOffset,
-        SessionTimeout:    defaultSessionTimeout,
-        HeartbeatInterval: defaultHeartbeatInterval,
-        Metadata:          configkafka.NewDefaultMetadataConfig(),
-        AutoCommit: AutoCommit{
-            Enable:   defaultAutoCommitEnable,
-            Interval: defaultAutoCommitInterval,
-        },
+        ClientConfig:   configkafka.NewDefaultClientConfig(),
+        ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
+        Encoding:       defaultEncoding,
         MessageMarking: MessageMarking{
             After:   false,
             OnError: false,
         },
         HeaderExtraction: HeaderExtraction{
             ExtractHeaders: false,
         },
-        MinFetchSize:     defaultMinFetchSize,
-        DefaultFetchSize: defaultDefaultFetchSize,
-        MaxFetchSize:     defaultMaxFetchSize,
     }
 }
 
```

receiver/kafkareceiver/factory_test.go

Lines changed: 3 additions & 9 deletions

```diff
@@ -17,22 +17,16 @@ import (
     "go.opentelemetry.io/collector/receiver/receivertest"
     "go.uber.org/zap"
 
+    "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
     "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
 )
 
 func TestCreateDefaultConfig(t *testing.T) {
     cfg := createDefaultConfig().(*Config)
     assert.NotNil(t, cfg, "failed to create default config")
     assert.NoError(t, componenttest.CheckConfigStruct(cfg))
-    assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
-    assert.Equal(t, defaultGroupID, cfg.GroupID)
-    assert.Equal(t, defaultClientID, cfg.ClientID)
-    assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
-    assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout)
-    assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval)
-    assert.Equal(t, defaultMinFetchSize, cfg.MinFetchSize)
-    assert.Equal(t, defaultDefaultFetchSize, cfg.DefaultFetchSize)
-    assert.Equal(t, defaultMaxFetchSize, cfg.MaxFetchSize)
+    assert.Equal(t, configkafka.NewDefaultClientConfig(), cfg.ClientConfig)
+    assert.Equal(t, configkafka.NewDefaultConsumerConfig(), cfg.ConsumerConfig)
 }
 
 func TestCreateTraces(t *testing.T) {
```
