Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 3 additions & 36 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

// Config defines configuration for Kafka exporter.
Expand Down Expand Up @@ -63,7 +63,7 @@ type Config struct {
Producer Producer `mapstructure:"producer"`

// Authentication defines used authentication mechanism.
Authentication kafka.Authentication `mapstructure:"auth"`
Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`
}

// Metadata defines configuration for retrieving metadata from the broker.
Expand Down Expand Up @@ -121,41 +121,8 @@ func (cfg *Config) Validate() error {
if cfg.Producer.RequiredAcks < -1 || cfg.Producer.RequiredAcks > 1 {
return fmt.Errorf("producer.required_acks has to be between -1 and 1. configured value %v", cfg.Producer.RequiredAcks)
}

_, err := saramaProducerCompressionCodec(cfg.Producer.Compression)
if err != nil {
return err
}

return validateSASLConfig(cfg.Authentication.SASL)
}

func validateSASLConfig(c *kafka.SASLConfig) error {
if c == nil {
return nil
}

if c.Mechanism != "AWS_MSK_IAM" && c.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
if c.Username == "" {
return fmt.Errorf("auth.sasl.username is required")
}
if c.Password == "" {
return fmt.Errorf("auth.sasl.password is required")
}
}

switch c.Mechanism {
case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512":
// Do nothing, valid mechanism
default:
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
}

if c.Version < 0 || c.Version > 1 {
return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version)
}

return nil
return err
}

func saramaProducerCompressionCodec(compression string) (sarama.CompressionCodec, error) {
Expand Down
109 changes: 10 additions & 99 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func TestLoadConfig(t *testing.T) {
Expand Down Expand Up @@ -63,8 +63,8 @@ func TestLoadConfig(t *testing.T) {
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Authentication: configkafka.AuthenticationConfig{
PlainText: &configkafka.PlainTextConfig{
Username: "jdoe",
Password: "pass",
},
Expand All @@ -86,8 +86,8 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, ""),
option: func(conf *Config) {
conf.Authentication = kafka.Authentication{
SASL: &kafka.SASLConfig{
conf.Authentication = configkafka.AuthenticationConfig{
SASL: &configkafka.SASLConfig{
Username: "jdoe",
Password: "pass",
Mechanism: "PLAIN",
Expand Down Expand Up @@ -119,12 +119,12 @@ func TestLoadConfig(t *testing.T) {
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Authentication: configkafka.AuthenticationConfig{
PlainText: &configkafka.PlainTextConfig{
Username: "jdoe",
Password: "pass",
},
SASL: &kafka.SASLConfig{
SASL: &configkafka.SASLConfig{
Username: "jdoe",
Password: "pass",
Mechanism: "PLAIN",
Expand Down Expand Up @@ -175,8 +175,8 @@ func TestLoadConfig(t *testing.T) {
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Authentication: configkafka.AuthenticationConfig{
PlainText: &configkafka.PlainTextConfig{
Username: "jdoe",
Password: "pass",
},
Expand Down Expand Up @@ -222,95 +222,6 @@ func TestValidate_err_compression(t *testing.T) {
assert.EqualError(t, err, "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk")
}

func TestValidate_sasl_username(t *testing.T) {
config := &Config{
Producer: Producer{
Compression: "none",
},
Authentication: kafka.Authentication{
SASL: &kafka.SASLConfig{
Username: "",
Password: "pass",
Mechanism: "PLAIN",
},
},
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.username is required")
}

func TestValidate_sasl_password(t *testing.T) {
config := &Config{
Producer: Producer{
Compression: "none",
},
Authentication: kafka.Authentication{
SASL: &kafka.SASLConfig{
Username: "jdoe",
Password: "",
Mechanism: "PLAIN",
},
},
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.password is required")
}

func TestValidate_sasl_mechanism(t *testing.T) {
config := &Config{
Producer: Producer{
Compression: "none",
},
Authentication: kafka.Authentication{
SASL: &kafka.SASLConfig{
Username: "jdoe",
Password: "pass",
Mechanism: "FAKE",
},
},
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
}

func TestValidate_sasl_version(t *testing.T) {
config := &Config{
Producer: Producer{
Compression: "none",
},
Authentication: kafka.Authentication{
SASL: &kafka.SASLConfig{
Username: "jdoe",
Password: "pass",
Mechanism: "PLAIN",
Version: 42,
},
},
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.version has to be either 0 or 1. configured value 42")
}

func TestValidate_sasl_iam(t *testing.T) {
config := &Config{
Producer: Producer{
Compression: "none",
},
Authentication: kafka.Authentication{
SASL: &kafka.SASLConfig{
Mechanism: "AWS_MSK_IAM",
},
},
}

err := config.Validate()
assert.NoError(t, err)
}

func Test_saramaProducerCompressionCodec(t *testing.T) {
tests := map[string]struct {
compression string
Expand Down
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
go.opentelemetry.io/collector/component v1.27.1-0.20250307194215-7d3e03e500b0
go.opentelemetry.io/collector/component/componenttest v0.121.1-0.20250307194215-7d3e03e500b0
go.opentelemetry.io/collector/config/configretry v1.27.1-0.20250307194215-7d3e03e500b0
go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307194215-7d3e03e500b0
go.opentelemetry.io/collector/confmap v1.27.1-0.20250307194215-7d3e03e500b0
go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307164521-7c787571daa5
go.opentelemetry.io/collector/consumer v1.27.1-0.20250307194215-7d3e03e500b0
Expand Down Expand Up @@ -88,6 +87,7 @@ require (
github.com/xdg-go/stringprep v1.0.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250307194215-7d3e03e500b0 // indirect
go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307194215-7d3e03e500b0 // indirect
go.opentelemetry.io/collector/consumer/consumertest v0.121.1-0.20250307194215-7d3e03e500b0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.121.1-0.20250307194215-7d3e03e500b0 // indirect
go.opentelemetry.io/collector/exporter/xexporter v0.121.1-0.20250307194215-7d3e03e500b0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func newSaramaProducer(ctx context.Context, config Config) (sarama.SyncProducer,
c.Version = version
}

if err := kafka.ConfigureAuthentication(ctx, config.Authentication, c); err != nil {
if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, c); err != nil {
return nil, err
}

Expand Down
34 changes: 0 additions & 34 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -23,7 +22,6 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
)

Expand Down Expand Up @@ -111,38 +109,6 @@ func TestLogsExporter_encoding_extension(t *testing.T) {
assert.NotContains(t, err.Error(), errUnrecognizedEncoding.Error())
}

func TestNewExporter_err_auth_type(t *testing.T) {
c := Config{
ProtocolVersion: "2.0.0",
Authentication: kafka.Authentication{
TLS: &configtls.ClientConfig{
Config: configtls.Config{
CAFile: "/nonexistent",
},
},
},
Encoding: defaultEncoding,
Metadata: Metadata{
Full: false,
},
Producer: Producer{
Compression: "none",
},
}
texp := newTracesExporter(c, exportertest.NewNopSettings(metadata.Type))
require.NotNil(t, texp)
err := texp.start(context.Background(), componenttest.NewNopHost())
assert.ErrorContains(t, err, "failed to load TLS config")
mexp := newMetricsExporter(c, exportertest.NewNopSettings(metadata.Type))
require.NotNil(t, mexp)
err = mexp.start(context.Background(), componenttest.NewNopHost())
assert.ErrorContains(t, err, "failed to load TLS config")
lexp := newLogsExporter(c, exportertest.NewNopSettings(metadata.Type))
require.NotNil(t, lexp)
err = lexp.start(context.Background(), componenttest.NewNopHost())
assert.ErrorContains(t, err, "failed to load TLS config")
}

func TestNewExporter_err_compression(t *testing.T) {
c := Config{
Encoding: defaultEncoding,
Expand Down
10 changes: 5 additions & 5 deletions extension/observer/kafkatopicsobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

// Config defines configuration for docker observer
Expand All @@ -22,10 +22,10 @@ type Config struct {
// required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`
Authentication kafka.Authentication `mapstructure:"auth"`
TopicRegex string `mapstructure:"topic_regex"`
TopicsSyncInterval time.Duration `mapstructure:"topics_sync_interval"`
ProtocolVersion string `mapstructure:"protocol_version"`
Authentication configkafka.AuthenticationConfig `mapstructure:"auth"`
TopicRegex string `mapstructure:"topic_regex"`
TopicsSyncInterval time.Duration `mapstructure:"topics_sync_interval"`
}

func (config *Config) Validate() (errs error) {
Expand Down
6 changes: 3 additions & 3 deletions extension/observer/kafkatopicsobserver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.opentelemetry.io/collector/confmap/xconfmap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func TestLoadConfig(t *testing.T) {
Expand All @@ -40,8 +40,8 @@ func TestLoadConfig(t *testing.T) {
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 100 * time.Millisecond,
ResolveCanonicalBootstrapServersOnly: false,
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Authentication: configkafka.AuthenticationConfig{
PlainText: &configkafka.PlainTextConfig{
Username: "fooUser",
Password: "fooPassword",
},
Expand Down
2 changes: 1 addition & 1 deletion extension/observer/kafkatopicsobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.C
return nil, err
}
}
if err := kafka.ConfigureAuthentication(ctx, config.Authentication, saramaConfig); err != nil {
if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, saramaConfig); err != nil {
return nil, err
}
return sarama.NewClusterAdmin(config.Brokers, saramaConfig)
Expand Down
Loading