
[chore] kafkareceiver: embed configkafka types #38817


Merged (1 commit, Mar 24, 2025)
26 changes: 25 additions & 1 deletion internal/kafka/client.go
@@ -20,6 +20,11 @@ var saramaCompressionCodecs = map[string]sarama.CompressionCodec{
"zstd": sarama.CompressionZSTD,
}

var saramaInitialOffsets = map[string]int64{
configkafka.EarliestOffset: sarama.OffsetOldest,
configkafka.LatestOffset: sarama.OffsetNewest,
}

// NewSaramaClient returns a new Kafka client with the given configuration.
func NewSaramaClient(ctx context.Context, config configkafka.ClientConfig) (sarama.Client, error) {
saramaConfig, err := NewSaramaClientConfig(ctx, config)
@@ -38,7 +43,26 @@ func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientC
return sarama.NewClusterAdmin(config.Brokers, saramaConfig)
}

// TODO add NewSaramaConsumerGroup, extracted from receiver/kafkareceiver
// NewSaramaConsumerGroup returns a new Kafka consumer group with the given configuration.
func NewSaramaConsumerGroup(
ctx context.Context,
clientConfig configkafka.ClientConfig,
consumerConfig configkafka.ConsumerConfig,
) (sarama.ConsumerGroup, error) {
saramaConfig, err := NewSaramaClientConfig(ctx, clientConfig)
if err != nil {
return nil, err
}
saramaConfig.Consumer.Group.Session.Timeout = consumerConfig.SessionTimeout
saramaConfig.Consumer.Group.Heartbeat.Interval = consumerConfig.HeartbeatInterval
saramaConfig.Consumer.Fetch.Min = consumerConfig.MinFetchSize
saramaConfig.Consumer.Fetch.Default = consumerConfig.DefaultFetchSize
saramaConfig.Consumer.Fetch.Max = consumerConfig.MaxFetchSize
saramaConfig.Consumer.Offsets.AutoCommit.Enable = consumerConfig.AutoCommit.Enable
saramaConfig.Consumer.Offsets.AutoCommit.Interval = consumerConfig.AutoCommit.Interval
saramaConfig.Consumer.Offsets.Initial = saramaInitialOffsets[consumerConfig.InitialOffset]
return sarama.NewConsumerGroup(clientConfig.Brokers, consumerConfig.GroupID, saramaConfig)
}

// NewSaramaSyncProducer returns a new synchronous Kafka producer with the given configuration.
//
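The new `NewSaramaConsumerGroup` helper moves the consumer-group wiring out of the receiver and into the shared `internal/kafka` package, translating both `configkafka` structs into a single `sarama.Config`. A minimal usage sketch built from the defaults shown in this diff; note that `internal/kafka` and `internal/kafka/configkafka` are internal packages, so this only compiles inside the opentelemetry-collector-contrib module:

```go
package main

import (
	"context"
	"log"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	clientCfg := configkafka.NewDefaultClientConfig()
	clientCfg.Brokers = []string{"localhost:9092"}

	consumerCfg := configkafka.NewDefaultConsumerConfig()
	consumerCfg.GroupID = "otel-collector"
	consumerCfg.InitialOffset = configkafka.EarliestOffset

	// Translates session timeout, heartbeat interval, fetch sizes, auto-commit,
	// and initial offset into sarama's Consumer settings, then dials the brokers.
	group, err := kafka.NewSaramaConsumerGroup(context.Background(), clientCfg, consumerCfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()
}
```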
2 changes: 1 addition & 1 deletion internal/kafka/configkafka/config.go
@@ -99,7 +99,7 @@ func NewDefaultConsumerConfig() ConsumerConfig {
SessionTimeout: 10 * time.Second,
HeartbeatInterval: 3 * time.Second,
GroupID: "otel-collector",
InitialOffset: "latest",
InitialOffset: LatestOffset,
AutoCommit: AutoCommitConfig{
Enable: true,
Interval: time.Second,
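Switching the default from the `"latest"` literal to the exported `LatestOffset` constant keeps `NewDefaultConsumerConfig` aligned with the `saramaInitialOffsets` lookup table above. A short sketch of overriding it, assuming the constants are plain exported strings:

```go
cfg := configkafka.NewDefaultConsumerConfig()  // InitialOffset == configkafka.LatestOffset
cfg.InitialOffset = configkafka.EarliestOffset // replay from the oldest retained offset
```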
7 changes: 3 additions & 4 deletions receiver/kafkareceiver/README.md
@@ -19,13 +19,12 @@ Note that metrics and logs only support OTLP.

## Getting Started

The following settings are required:

- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0
There are no required settings.

The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of kafka brokers
- `brokers` (default = localhost:9092): The list of kafka brokers.
- `protocol_version` (default = 2.1.0): Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to read from.
Only one telemetry type may be used for a given topic.
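Per the updated README, the receiver now has no required settings, since `protocol_version` gained a default. A hypothetical minimal configuration spelling out two optional keys with their documented defaults:

```yaml
receivers:
  kafka:
    # Both keys are optional; the values shown are the documented defaults.
    brokers: ["localhost:9092"]
    protocol_version: "2.1.0"
```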
86 changes: 17 additions & 69 deletions receiver/kafkareceiver/config.go
@@ -4,99 +4,47 @@
package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"

import (
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

type AutoCommit struct {
// Whether or not to auto-commit updated offsets back to the broker.
// (default enabled).
Enable bool `mapstructure:"enable"`
// How frequently to commit updated offsets. Ineffective unless
// auto-commit is enabled (default 1s)
Interval time.Duration `mapstructure:"interval"`
}

type MessageMarking struct {
// If true, the messages are marked after the pipeline execution
After bool `mapstructure:"after"`

// If false, only the successfully processed messages are marked, it has no impact if
// After is set to false.
// Note: this can block the entire partition in case a message processing returns
// a permanent error.
OnError bool `mapstructure:"on_error"`
}

type HeaderExtraction struct {
ExtractHeaders bool `mapstructure:"extract_headers"`
Headers []string `mapstructure:"headers"`
}
var _ component.Config = (*Config)(nil)

// Config defines configuration for Kafka receiver.
type Config struct {
// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`
// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
// each of the provided brokers. It will then do a PTR lookup for each
// returned IP, and that set of names becomes the broker list. This can be
// required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`
// Session interval for the Kafka consumer
SessionTimeout time.Duration `mapstructure:"session_timeout"`
// Heartbeat interval for the Kafka consumer
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
configkafka.ClientConfig `mapstructure:",squash"`
configkafka.ConsumerConfig `mapstructure:",squash"`

// The name of the kafka topic to consume from (default "otlp_spans" for traces, "otlp_metrics" for metrics, "otlp_logs" for logs)
Topic string `mapstructure:"topic"`

// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`
// The consumer group that receiver will be consuming messages from (default "otel-collector")
GroupID string `mapstructure:"group_id"`
// The consumer client ID that receiver will use (default "otel-collector")
ClientID string `mapstructure:"client_id"`
// The initial offset to use if no offset was previously committed.
// Must be `latest` or `earliest` (default "latest").
InitialOffset string `mapstructure:"initial_offset"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata configkafka.MetadataConfig `mapstructure:"metadata"`

Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`

// Controls the auto-commit functionality
AutoCommit AutoCommit `mapstructure:"autocommit"`

// Controls the way the messages are marked as consumed
MessageMarking MessageMarking `mapstructure:"message_marking"`

// Extract headers from kafka records
HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`

// The minimum bytes per fetch from Kafka (default "1")
MinFetchSize int32 `mapstructure:"min_fetch_size"`
// The default bytes per fetch from Kafka (default "1048576")
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
// The maximum bytes per fetch from Kafka (default "0", no limit)
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
// In case of some errors returned by the next consumer, the receiver will wait and retry the failed message
ErrorBackOff configretry.BackOffConfig `mapstructure:"error_backoff"`
}

const (
offsetLatest string = "latest"
offsetEarliest string = "earliest"
)
type MessageMarking struct {
// If true, the messages are marked after the pipeline execution
After bool `mapstructure:"after"`

var _ component.Config = (*Config)(nil)
// If false, only the successfully processed messages are marked, it has no impact if
// After is set to false.
// Note: this can block the entire partition in case a message processing returns
// a permanent error.
OnError bool `mapstructure:"on_error"`
}

// Validate checks the receiver configuration is valid
func (cfg *Config) Validate() error {
return nil
type HeaderExtraction struct {
ExtractHeaders bool `mapstructure:"extract_headers"`
Headers []string `mapstructure:"headers"`
}
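The `mapstructure:",squash"` tags are what keep the receiver's YAML layout flat: fields of the embedded `configkafka.ClientConfig` and `configkafka.ConsumerConfig` (brokers, group_id, initial_offset, and so on) still decode at the top level, so existing configurations are unaffected. A self-contained sketch of the squash semantics using hypothetical `Inner`/`Outer` types and a direct `mapstructure` call; the collector actually decodes through `confmap`, so this is only an approximation:

```go
package main

import (
	"fmt"

	"github.com/go-viper/mapstructure/v2"
)

type Inner struct {
	Brokers []string `mapstructure:"brokers"`
	GroupID string   `mapstructure:"group_id"`
}

type Outer struct {
	Inner `mapstructure:",squash"` // embedded fields decode at the top level
	Topic string `mapstructure:"topic"`
}

func main() {
	input := map[string]any{
		"brokers":  []string{"localhost:9092"},
		"group_id": "otel-collector",
		"topic":    "otlp_spans",
	}
	var cfg Outer
	if err := mapstructure.Decode(input, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
	// Output: {Inner:{Brokers:[localhost:9092] GroupID:otel-collector} Topic:otlp_spans}
}
```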
92 changes: 32 additions & 60 deletions receiver/kafkareceiver/config_test.go
@@ -34,39 +34,20 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ResolveCanonicalBootstrapServersOnly: true,
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "latest",
SessionTimeout: 10 * time.Second,
HeartbeatInterval: 3 * time.Second,
Authentication: configkafka.AuthenticationConfig{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
Metadata: configkafka.MetadataConfig{
Full: true,
RefreshInterval: 10 * time.Minute,
Retry: configkafka.MetadataRetryConfig{
Max: 10,
Backoff: time.Second * 5,
},
},
AutoCommit: AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
ClientConfig: func() configkafka.ClientConfig {
config := configkafka.NewDefaultClientConfig()
config.Brokers = []string{"foo:123", "bar:456"}
config.ResolveCanonicalBootstrapServersOnly = true
config.ClientID = "the_client_id"
return config
}(),
ConsumerConfig: func() configkafka.ConsumerConfig {
config := configkafka.NewDefaultConsumerConfig()
config.GroupID = "the_group_id"
return config
}(),
Topic: "spans",
Encoding: "otlp_proto",
ErrorBackOff: configretry.BackOffConfig{
Enabled: false,
},
@@ -75,38 +56,29 @@
{
id: component.NewIDWithName(metadata.Type, "logs"),
expected: &Config{
Topic: "logs",
Encoding: "direct",
Brokers: []string{"coffee:123", "foobar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "earliest",
SessionTimeout: 45 * time.Second,
HeartbeatInterval: 15 * time.Second,
Authentication: configkafka.AuthenticationConfig{
TLS: &configtls.ClientConfig{
ClientConfig: func() configkafka.ClientConfig {
config := configkafka.NewDefaultClientConfig()
config.Brokers = []string{"coffee:123", "foobar:456"}
config.Metadata.Retry.Max = 10
config.Metadata.Retry.Backoff = 5 * time.Second
config.Authentication.TLS = &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "ca.pem",
CertFile: "cert.pem",
KeyFile: "key.pem",
},
},
},
Metadata: configkafka.MetadataConfig{
Full: true,
RefreshInterval: 10 * time.Minute,
Retry: configkafka.MetadataRetryConfig{
Max: 10,
Backoff: time.Second * 5,
},
},
AutoCommit: AutoCommit{
Enable: true,
Interval: 1 * time.Second,
},
MinFetchSize: 1,
DefaultFetchSize: 1048576,
MaxFetchSize: 0,
}
return config
}(),
ConsumerConfig: func() configkafka.ConsumerConfig {
config := configkafka.NewDefaultConsumerConfig()
config.InitialOffset = configkafka.EarliestOffset
config.SessionTimeout = 45 * time.Second
config.HeartbeatInterval = 15 * time.Second
return config
}(),
Topic: "logs",
Encoding: "direct",
ErrorBackOff: configretry.BackOffConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
45 changes: 7 additions & 38 deletions receiver/kafkareceiver/factory.go
@@ -7,7 +7,6 @@ import (
"context"
"errors"
"strings"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
@@ -18,28 +17,10 @@
)

const (
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
defaultClientID = "otel-collector"
defaultGroupID = defaultClientID
defaultInitialOffset = offsetLatest
defaultSessionTimeout = 10 * time.Second
defaultHeartbeatInterval = 3 * time.Second

// default from sarama.NewConfig()
defaultAutoCommitEnable = true
// default from sarama.NewConfig()
defaultAutoCommitInterval = 1 * time.Second

// default from sarama.NewConfig()
defaultMinFetchSize = int32(1)
// default from sarama.NewConfig()
defaultDefaultFetchSize = int32(1048576)
// default from sarama.NewConfig()
defaultMaxFetchSize = int32(0)
defaultTracesTopic = "otlp_spans"
defaultMetricsTopic = "otlp_metrics"
defaultLogsTopic = "otlp_logs"
defaultEncoding = "otlp_proto"
)

var errUnrecognizedEncoding = errors.New("unrecognized encoding")
@@ -64,28 +45,16 @@

func createDefaultConfig() component.Config {
return &Config{
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
InitialOffset: defaultInitialOffset,
SessionTimeout: defaultSessionTimeout,
HeartbeatInterval: defaultHeartbeatInterval,
Metadata: configkafka.NewDefaultMetadataConfig(),
AutoCommit: AutoCommit{
Enable: defaultAutoCommitEnable,
Interval: defaultAutoCommitInterval,
},
ClientConfig: configkafka.NewDefaultClientConfig(),
ConsumerConfig: configkafka.NewDefaultConsumerConfig(),
Encoding: defaultEncoding,
MessageMarking: MessageMarking{
After: false,
OnError: false,
},
HeaderExtraction: HeaderExtraction{
ExtractHeaders: false,
},
MinFetchSize: defaultMinFetchSize,
DefaultFetchSize: defaultDefaultFetchSize,
MaxFetchSize: defaultMaxFetchSize,
}
}

12 changes: 3 additions & 9 deletions receiver/kafkareceiver/factory_test.go
@@ -17,22 +17,16 @@ import (
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata"
)

func TestCreateDefaultConfig(t *testing.T) {
cfg := createDefaultConfig().(*Config)
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
assert.Equal(t, []string{defaultBroker}, cfg.Brokers)
assert.Equal(t, defaultGroupID, cfg.GroupID)
assert.Equal(t, defaultClientID, cfg.ClientID)
assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout)
assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval)
assert.Equal(t, defaultMinFetchSize, cfg.MinFetchSize)
assert.Equal(t, defaultDefaultFetchSize, cfg.DefaultFetchSize)
assert.Equal(t, defaultMaxFetchSize, cfg.MaxFetchSize)
assert.Equal(t, configkafka.NewDefaultClientConfig(), cfg.ClientConfig)
assert.Equal(t, configkafka.NewDefaultConsumerConfig(), cfg.ConsumerConfig)
}

func TestCreateTraces(t *testing.T) {