From 6102e3ef3e19b811dd05f2842bf86e18597adcd7 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 8 Mar 2025 16:47:38 +0800 Subject: [PATCH 1/8] internal/kafka: add common config & clients --- internal/kafka/authentication.go | 117 ++----- internal/kafka/authentication_test.go | 96 ++--- internal/kafka/client.go | 45 +++ internal/kafka/client_test.go | 110 ++++++ internal/kafka/configkafka/config.go | 327 ++++++++++++++++++ internal/kafka/configkafka/config_test.go | 182 ++++++++++ .../configkafka/testdata/client_config.yaml | 74 ++++ .../configkafka/testdata/consumer_config.yaml | 16 + .../configkafka/testdata/producer_config.yaml | 14 + internal/kafka/go.mod | 20 ++ internal/kafka/go.sum | 76 ++++ 11 files changed, 946 insertions(+), 131 deletions(-) create mode 100644 internal/kafka/client.go create mode 100644 internal/kafka/client_test.go create mode 100644 internal/kafka/configkafka/config.go create mode 100644 internal/kafka/configkafka/config_test.go create mode 100644 internal/kafka/configkafka/testdata/client_config.yaml create mode 100644 internal/kafka/configkafka/testdata/consumer_config.yaml create mode 100644 internal/kafka/configkafka/testdata/producer_config.yaml diff --git a/internal/kafka/authentication.go b/internal/kafka/authentication.go index ca6ce4c3519e..19b466fe7bcf 100644 --- a/internal/kafka/authentication.go +++ b/internal/kafka/authentication.go @@ -15,73 +15,22 @@ import ( "go.opentelemetry.io/collector/config/configtls" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) -// Authentication defines authentication. -type Authentication struct { - PlainText *PlainTextConfig `mapstructure:"plain_text"` - SASL *SASLConfig `mapstructure:"sasl"` - TLS *configtls.ClientConfig `mapstructure:"tls"` - Kerberos *KerberosConfig `mapstructure:"kerberos"` -} - -// PlainTextConfig defines plaintext authentication. -type PlainTextConfig struct { - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` -} - -// SASLConfig defines the configuration for the SASL authentication. -type SASLConfig struct { - // Username to be used on authentication - Username string `mapstructure:"username"` - // Password to be used on authentication - Password string `mapstructure:"password"` - // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512). - Mechanism string `mapstructure:"mechanism"` - // SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0. - Version int `mapstructure:"version"` - - AWSMSK AWSMSKConfig `mapstructure:"aws_msk"` -} - -// AWSMSKConfig defines the additional SASL authentication -// measures needed to use AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanism -type AWSMSKConfig struct { - // Region is the AWS region the MSK cluster is based in - Region string `mapstructure:"region"` - // BrokerAddr is the client is connecting to in order to perform the auth required - BrokerAddr string `mapstructure:"broker_addr"` - // Context - ctx context.Context -} - -// Token return the AWS session token for the AWS_MSK_IAM_OAUTHBEARER mechanism -func (c *AWSMSKConfig) Token() (*sarama.AccessToken, error) { - token, _, err := signer.GenerateAuthToken(c.ctx, c.Region) - - return &sarama.AccessToken{Token: token}, err -} - -// KerberosConfig defines kerberos configuration. 
-type KerberosConfig struct { - ServiceName string `mapstructure:"service_name"` - Realm string `mapstructure:"realm"` - UseKeyTab bool `mapstructure:"use_keytab"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password" json:"-"` - ConfigPath string `mapstructure:"config_file"` - KeyTabPath string `mapstructure:"keytab_file"` - DisablePAFXFAST bool `mapstructure:"disable_fast_negotiation"` -} - -// ConfigureAuthentication configures authentication in sarama.Config. -func ConfigureAuthentication(ctx context.Context, config Authentication, saramaConfig *sarama.Config) error { +// ConfigureSaramaAuthentication configures authentication in sarama.Config. +// +// The provided config is assumed to have been validated. +func ConfigureSaramaAuthentication( + ctx context.Context, + config configkafka.AuthenticationConfig, + saramaConfig *sarama.Config, +) error { if config.PlainText != nil { configurePlaintext(*config.PlainText, saramaConfig) } if config.TLS != nil { - if err := configureTLS(*config.TLS, saramaConfig); err != nil { + if err := configureTLS(ctx, *config.TLS, saramaConfig); err != nil { return err } } @@ -90,31 +39,23 @@ func ConfigureAuthentication(ctx context.Context, config Authentication, saramaC return err } } - if config.Kerberos != nil { configureKerberos(*config.Kerberos, saramaConfig) } return nil } -func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) { +func configurePlaintext(config configkafka.PlainTextConfig, saramaConfig *sarama.Config) { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = config.Username saramaConfig.Net.SASL.Password = config.Password } -func configureSASL(ctx context.Context, config SASLConfig, saramaConfig *sarama.Config) error { - if config.Username == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" { - return fmt.Errorf("username have to be provided") - } - - if config.Password == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" { - return fmt.Errorf("password have to be provided") - } - +func configureSASL(ctx context.Context, config configkafka.SASLConfig, saramaConfig *sarama.Config) error { saramaConfig.Net.SASL.Enable = true saramaConfig.Net.SASL.User = config.Username saramaConfig.Net.SASL.Password = config.Password + saramaConfig.Net.SASL.Version = int16(config.Version) switch config.Mechanism { case "SCRAM-SHA-512": @@ -131,30 +72,17 @@ func configureSASL(ctx context.Context, config SASLConfig, saramaConfig *sarama. 
} saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism case "AWS_MSK_IAM_OAUTHBEARER": - config.AWSMSK.ctx = ctx saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth - saramaConfig.Net.SASL.TokenProvider = &config.AWSMSK + saramaConfig.Net.SASL.TokenProvider = &awsMSKTokenProvider{ctx: ctx, region: config.AWSMSK.Region} tlsConfig := tls.Config{} saramaConfig.Net.TLS.Enable = true saramaConfig.Net.TLS.Config = &tlsConfig - default: - return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism) - } - - switch config.Version { - case 0: - saramaConfig.Net.SASL.Version = sarama.SASLHandshakeV0 - case 1: - saramaConfig.Net.SASL.Version = sarama.SASLHandshakeV1 - default: - return fmt.Errorf(`invalid SASL Protocol Version %d: can be either 0 or 1`, config.Version) } - return nil } -func configureTLS(config configtls.ClientConfig, saramaConfig *sarama.Config) error { - tlsConfig, err := config.LoadTLSConfig(context.Background()) +func configureTLS(ctx context.Context, config configtls.ClientConfig, saramaConfig *sarama.Config) error { + tlsConfig, err := config.LoadTLSConfig(ctx) if err != nil { return fmt.Errorf("error loading tls config: %w", err) } @@ -163,7 +91,7 @@ func configureTLS(config configtls.ClientConfig, saramaConfig *sarama.Config) er return nil } -func configureKerberos(config KerberosConfig, saramaConfig *sarama.Config) { +func configureKerberos(config configkafka.KerberosConfig, saramaConfig *sarama.Config) { saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI saramaConfig.Net.SASL.Enable = true if config.UseKeyTab { @@ -179,3 +107,14 @@ func configureKerberos(config KerberosConfig, saramaConfig *sarama.Config) { saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName saramaConfig.Net.SASL.GSSAPI.DisablePAFXFAST = config.DisablePAFXFAST } + +type awsMSKTokenProvider struct { + ctx context.Context + region string +} + +// Token return the AWS session token for the AWS_MSK_IAM_OAUTHBEARER mechanism +func (c *awsMSKTokenProvider) Token() (*sarama.AccessToken, error) { + token, _, err := signer.GenerateAuthToken(c.ctx, c.region) + return &sarama.AccessToken{Token: token}, err +} diff --git a/internal/kafka/authentication_test.go b/internal/kafka/authentication_test.go index 47cbc0989684..b59c5c097552 100644 --- a/internal/kafka/authentication_test.go +++ b/internal/kafka/authentication_test.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config/configtls" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) func TestAuthentication(t *testing.T) { @@ -52,15 +54,17 @@ func TestAuthentication(t *testing.T) { require.NoError(t, err) saramaTLSCfg.Net.TLS.Config = tlscfg - ctx := context.Background() - saramaSASLAWSIAMOATUHConfig := &sarama.Config{} - saramaSASLAWSIAMOATUHConfig.Net.SASL.Enable = true - saramaSASLAWSIAMOATUHConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth - saramaSASLAWSIAMOATUHConfig.Net.SASL.TokenProvider = &AWSMSKConfig{Region: "region", ctx: ctx} + saramaSASLAWSIAMOAUTHConfig := &sarama.Config{} + saramaSASLAWSIAMOAUTHConfig.Net.SASL.Enable = true + saramaSASLAWSIAMOAUTHConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + saramaSASLAWSIAMOAUTHConfig.Net.SASL.TokenProvider = &awsMSKTokenProvider{ + ctx: context.Background(), + region: "region", + } tlsConfig := tls.Config{} - 
saramaSASLAWSIAMOATUHConfig.Net.TLS.Enable = true - saramaSASLAWSIAMOATUHConfig.Net.TLS.Config = &tlsConfig + saramaSASLAWSIAMOAUTHConfig.Net.TLS.Enable = true + saramaSASLAWSIAMOAUTHConfig.Net.TLS.Config = &tlsConfig saramaKerberosCfg := &sarama.Config{} saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI @@ -89,86 +93,82 @@ func TestAuthentication(t *testing.T) { saramaKerberosDisablePAFXFASTFalseCfg.Net.SASL.GSSAPI.DisablePAFXFAST = false tests := []struct { - auth Authentication + auth configkafka.AuthenticationConfig saramaConfig *sarama.Config err string }{ { - auth: Authentication{PlainText: &PlainTextConfig{Username: "jdoe", Password: "pass"}}, + auth: configkafka.AuthenticationConfig{ + PlainText: &configkafka.PlainTextConfig{Username: "jdoe", Password: "pass"}, + }, saramaConfig: saramaPlaintext, }, { - auth: Authentication{TLS: &configtls.ClientConfig{}}, + auth: configkafka.AuthenticationConfig{TLS: &configtls.ClientConfig{}}, saramaConfig: saramaTLSCfg, }, { - auth: Authentication{TLS: &configtls.ClientConfig{ + auth: configkafka.AuthenticationConfig{TLS: &configtls.ClientConfig{ Config: configtls.Config{CAFile: "/doesnotexists"}, }}, saramaConfig: saramaTLSCfg, err: "failed to load TLS config", }, { - auth: Authentication{Kerberos: &KerberosConfig{ServiceName: "foobar"}}, + auth: configkafka.AuthenticationConfig{ + Kerberos: &configkafka.KerberosConfig{ServiceName: "foobar"}, + }, saramaConfig: saramaKerberosCfg, }, { - auth: Authentication{Kerberos: &KerberosConfig{UseKeyTab: true, KeyTabPath: "/path"}}, + auth: configkafka.AuthenticationConfig{ + Kerberos: &configkafka.KerberosConfig{UseKeyTab: true, KeyTabPath: "/path"}, + }, saramaConfig: saramaKerberosKeyTabCfg, }, { - auth: Authentication{Kerberos: &KerberosConfig{ServiceName: "foobar", DisablePAFXFAST: true}}, + auth: configkafka.AuthenticationConfig{ + Kerberos: &configkafka.KerberosConfig{ServiceName: "foobar", DisablePAFXFAST: true}, + }, saramaConfig: saramaKerberosDisablePAFXFASTTrueCfg, }, { - auth: Authentication{Kerberos: &KerberosConfig{ServiceName: "foobar", DisablePAFXFAST: false}}, + auth: configkafka.AuthenticationConfig{Kerberos: &configkafka.KerberosConfig{ServiceName: "foobar", DisablePAFXFAST: false}}, saramaConfig: saramaKerberosDisablePAFXFASTFalseCfg, }, { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-256"}}, + auth: configkafka.AuthenticationConfig{SASL: &configkafka.SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-256"}}, saramaConfig: saramaSASLSCRAM256Config, }, { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-512"}}, + auth: configkafka.AuthenticationConfig{SASL: &configkafka.SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-512"}}, saramaConfig: saramaSASLSCRAM512Config, }, { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-512", Version: 1}}, + auth: configkafka.AuthenticationConfig{ + SASL: &configkafka.SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-512", Version: 1}, + }, saramaConfig: saramaSASLHandshakeV1Config, }, { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}}, + auth: configkafka.AuthenticationConfig{ + SASL: &configkafka.SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}, + }, saramaConfig: saramaSASLPLAINConfig, }, { - auth: Authentication{SASL: &SASLConfig{Username: "", Password: "", Mechanism: 
"AWS_MSK_IAM_OAUTHBEARER", AWSMSK: AWSMSKConfig{Region: "region"}}}, - saramaConfig: saramaSASLAWSIAMOATUHConfig, - }, - { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-222"}}, - saramaConfig: saramaSASLSCRAM512Config, - err: "invalid SASL Mechanism", - }, - { - auth: Authentication{SASL: &SASLConfig{Username: "", Password: "pass", Mechanism: "SCRAM-SHA-512"}}, - saramaConfig: saramaSASLSCRAM512Config, - err: "username have to be provided", - }, - { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "", Mechanism: "SCRAM-SHA-512"}}, - saramaConfig: saramaSASLSCRAM512Config, - err: "password have to be provided", - }, - { - auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-512", Version: 2}}, - saramaConfig: saramaSASLSCRAM512Config, - err: "invalid SASL Protocol Version", + auth: configkafka.AuthenticationConfig{ + SASL: &configkafka.SASLConfig{ + Mechanism: "AWS_MSK_IAM_OAUTHBEARER", AWSMSK: configkafka.AWSMSKConfig{Region: "region"}, + }, + }, + saramaConfig: saramaSASLAWSIAMOAUTHConfig, }, } for _, test := range tests { t.Run("", func(t *testing.T) { config := &sarama.Config{} - err := ConfigureAuthentication(context.Background(), test.auth, config) + err := ConfigureSaramaAuthentication(context.Background(), test.auth, config) if test.err != "" { assert.ErrorContains(t, err, test.err) } else { @@ -179,3 +179,15 @@ func TestAuthentication(t *testing.T) { }) } } + +func TestConfigureSaramaAuthentication_TLS(t *testing.T) { + auth := configkafka.AuthenticationConfig{ + TLS: &configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "/nonexistent", + }, + }, + } + err := ConfigureSaramaAuthentication(context.Background(), auth, &sarama.Config{}) + require.ErrorContains(t, err, "failed to load TLS config") +} diff --git a/internal/kafka/client.go b/internal/kafka/client.go new file mode 100644 index 000000000000..a8d7e9015278 --- /dev/null +++ b/internal/kafka/client.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + +import ( + "context" + + "github.com/IBM/sarama" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" +) + +// NewSaramaClusterAdminClient returns a new Kafka cluster admin client with the given configuration. +func NewSaramaClusterAdminClient(ctx context.Context, config configkafka.ClientConfig) (sarama.ClusterAdmin, error) { + saramaConfig, err := NewSaramaClientConfig(ctx, config) + if err != nil { + return nil, err + } + return sarama.NewClusterAdmin(config.Brokers, saramaConfig) +} + +// TODO add NewSaramaConsumerGroup, extracted from receiver/kafkareceiver +// TODO add NewSaramaSyncProducer, extracted from exporter/kafkaexporter + +// NewSaramaClientConfig returns a Sarama client config, based on the given config. 
+func NewSaramaClientConfig(ctx context.Context, config configkafka.ClientConfig) (*sarama.Config, error) { + saramaConfig := sarama.NewConfig() + saramaConfig.Metadata.Full = config.Metadata.Full + saramaConfig.Metadata.Retry.Max = config.Metadata.Retry.Max + saramaConfig.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff + if config.ResolveCanonicalBootstrapServersOnly { + saramaConfig.Net.ResolveCanonicalBootstrapServers = true + } + if config.ProtocolVersion != "" { + var err error + if saramaConfig.Version, err = sarama.ParseKafkaVersion(config.ProtocolVersion); err != nil { + return nil, err + } + } + if err := ConfigureSaramaAuthentication(ctx, config.Authentication, saramaConfig); err != nil { + return nil, err + } + return saramaConfig, nil +} diff --git a/internal/kafka/client_test.go b/internal/kafka/client_test.go new file mode 100644 index 000000000000..aa3d44520e65 --- /dev/null +++ b/internal/kafka/client_test.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + +import ( + "context" + "testing" + "time" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/configtls" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" +) + +func TestNewSaramaClientConfig(t *testing.T) { + for name, tt := range map[string]struct { + input configkafka.ClientConfig + check func(*testing.T, *sarama.Config) + expectedErr string + }{ + "default": { + input: configkafka.NewDefaultClientConfig(), + check: func(t *testing.T, cfg *sarama.Config) { + expected := sarama.NewConfig() + + // Ignore uncomparable fields, which happen to be irrelevant + // for the base client anyway. + expected.Consumer.Group.Rebalance.GroupStrategies = nil + expected.MetricRegistry = nil + expected.Producer.Partitioner = nil + cfg.Consumer.Group.Rebalance.GroupStrategies = nil + cfg.MetricRegistry = nil + cfg.Producer.Partitioner = nil + + // Our metadata defaults differ from those of Sarama's. 
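+ // Copy those defaults into the expected config so the comparison below holds.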
+ defaultMetadataConfig := configkafka.NewDefaultMetadataConfig() + expected.Metadata.Full = defaultMetadataConfig.Full + expected.Metadata.Retry.Max = defaultMetadataConfig.Retry.Max + expected.Metadata.Retry.Backoff = defaultMetadataConfig.Retry.Backoff + + assert.Equal(t, expected, cfg) + }, + }, + "metadata": { + input: func() configkafka.ClientConfig { + cfg := configkafka.NewDefaultClientConfig() + cfg.Metadata = configkafka.MetadataConfig{ + Full: false, + Retry: configkafka.MetadataRetryConfig{ + Max: 123, + Backoff: time.Minute, + }, + } + return cfg + }(), + check: func(t *testing.T, cfg *sarama.Config) { + assert.False(t, cfg.Metadata.Full) + assert.Equal(t, 123, cfg.Metadata.Retry.Max) + assert.Equal(t, time.Minute, cfg.Metadata.Retry.Backoff) + assert.Nil(t, cfg.Metadata.Retry.BackoffFunc) + }, + }, + "resolve_canonical_bootstrap_servers_only": { + input: func() configkafka.ClientConfig { + cfg := configkafka.NewDefaultClientConfig() + cfg.ResolveCanonicalBootstrapServersOnly = true + return cfg + }(), + check: func(t *testing.T, cfg *sarama.Config) { + assert.True(t, cfg.Net.ResolveCanonicalBootstrapServers) + }, + }, + "protocol_version": { + input: func() configkafka.ClientConfig { + cfg := configkafka.NewDefaultClientConfig() + cfg.ProtocolVersion = "3.1.2" + return cfg + }(), + check: func(t *testing.T, cfg *sarama.Config) { + assert.Equal(t, sarama.V3_1_2_0, cfg.Version) + }, + }, + "auth": { + input: func() configkafka.ClientConfig { + cfg := configkafka.NewDefaultClientConfig() + cfg.Authentication.TLS = &configtls.ClientConfig{ + Config: configtls.Config{CAFile: "/nonexistent"}, + } + return cfg + }(), + expectedErr: "failed to load TLS config", + }, + } { + t.Run(name, func(t *testing.T) { + output, err := NewSaramaClientConfig(context.Background(), tt.input) + if tt.expectedErr != "" { + require.Error(t, err) + require.ErrorContains(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + require.NotNil(t, output) + tt.check(t, output) + } + }) + } +} diff --git a/internal/kafka/configkafka/config.go b/internal/kafka/configkafka/config.go new file mode 100644 index 000000000000..f1baa20814b6 --- /dev/null +++ b/internal/kafka/configkafka/config.go @@ -0,0 +1,327 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package configkafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" + +import ( + "errors" + "fmt" + "time" + + "github.com/IBM/sarama" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/confmap" +) + +const ( + LatestOffset = "latest" + EarliestOffset = "earliest" +) + +type ClientConfig struct { + // Brokers holds the list of Kafka bootstrap servers (default localhost:9092). + Brokers []string `mapstructure:"brokers"` + + // ResolveCanonicalBootstrapServersOnly configures the Kafka client to perform + // a DNS lookup on each of the provided brokers, and then perform a reverse + // lookup on the resulting IPs to obtain the canonical hostnames to use as the + // bootstrap servers. This can be required in SASL environments. + ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"` + + // ProtocolVersion defines the Kafka protocol version that the client will + // assume it is running against. 
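+ // If empty, the Sarama client's default version is used.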
+ ProtocolVersion string `mapstructure:"protocol_version"` + + // ClientID holds the client ID advertised to Kafka, which can be used for + // enforcing ACLs, throttling quotas, and more (default "otel-collector") + ClientID string `mapstructure:"client_id"` + + // Authentication holds Kafka authentication details. + Authentication AuthenticationConfig `mapstructure:"auth"` + + // Metadata holds metadata-related configuration for producers and consumers. + Metadata MetadataConfig `mapstructure:"metadata"` +} + +func NewDefaultClientConfig() ClientConfig { + return ClientConfig{ + Brokers: []string{"localhost:9092"}, + ClientID: "otel-collector", + Metadata: NewDefaultMetadataConfig(), + } +} + +func (c ClientConfig) Validate() error { + if len(c.Brokers) == 0 { + return errors.New("brokers must be specified") + } + if c.ProtocolVersion != "" { + if _, err := sarama.ParseKafkaVersion(c.ProtocolVersion); err != nil { + return fmt.Errorf("invalid protocol version: %w", err) + } + } + return nil +} + +type ConsumerConfig struct { + // SessionTimeout controls the Kafka consumer group session timeout. + // The session timeout is used to detect the consumer's liveness. + SessionTimeout time.Duration `mapstructure:"session_timeout"` + + // HeartbeatInterval controls the Kafka consumer group coordination + // heartbeat interval. Heartbeats ensure the consumer's session remains + // active. + HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"` + + // GroupID specifies the ID of the consumer group that will be + // consuming messages from (default "otel-collector"). + GroupID string `mapstructure:"group_id"` + + // InitialOffset specifies the initial offset to use if no offset was + // previously committed. Must be `latest` or `earliest` (default "latest"). + InitialOffset string `mapstructure:"initial_offset"` + + // AutoCommit controls the auto-commit functionality of the consumer. + AutoCommit AutoCommitConfig `mapstructure:"autocommit"` + + // The minimum bytes per fetch from Kafka (default "1") + MinFetchSize int32 `mapstructure:"min_fetch_size"` + + // The default bytes per fetch from Kafka (default "1048576") + DefaultFetchSize int32 `mapstructure:"default_fetch_size"` + + // The maximum bytes per fetch from Kafka (default "0", no limit) + MaxFetchSize int32 `mapstructure:"max_fetch_size"` +} + +func NewDefaultConsumerConfig() ConsumerConfig { + return ConsumerConfig{ + SessionTimeout: 10 * time.Second, + HeartbeatInterval: 3 * time.Second, + GroupID: "otel-collector", + InitialOffset: "latest", + AutoCommit: AutoCommitConfig{ + Enable: true, + Interval: time.Second, + }, + MinFetchSize: 1, + MaxFetchSize: 0, + DefaultFetchSize: 1048576, + } +} + +func (c ConsumerConfig) Validate() error { + switch c.InitialOffset { + case LatestOffset, EarliestOffset: + // Valid + default: + return fmt.Errorf( + "initial_offset should be one of 'latest' or 'earliest'. configured value %v", + c.InitialOffset, + ) + } + return nil +} + +type AutoCommitConfig struct { + // Whether or not to auto-commit updated offsets back to the broker. + // (default enabled). + Enable bool `mapstructure:"enable"` + + // How frequently to commit updated offsets. 
Ineffective unless + // auto-commit is enabled (default 1s) + Interval time.Duration `mapstructure:"interval"` +} + +type ProducerConfig struct { + // Maximum message bytes the producer will accept to produce (default 1000000) + MaxMessageBytes int `mapstructure:"max_message_bytes"` + + // RequiredAcks holds the number of acknowledgements required before producing + // returns successfully. See: + // https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks + // + // Acceptable values are: + // 0 (NoResponse) Does not wait for any acknowledgements. + // 1 (WaitForLocal) Waits for only the leader to write the record to its local log, + // but does not wait for followers to acknowledge. (default) + // -1 (WaitForAll) Waits for all in-sync replicas to acknowledge. + // In YAML configuration, "all" is accepted as an alias for -1. + RequiredAcks RequiredAcks `mapstructure:"required_acks"` + + // Compression Codec used to produce messages + // https://pkg.go.dev/github.com/IBM/sarama@v1.30.0#CompressionCodec + // The options are: 'none' (default), 'gzip', 'snappy', 'lz4', and 'zstd' + Compression string `mapstructure:"compression"` + + // The maximum number of messages the producer will send in a single + // broker request. Defaults to 0 for unlimited. Similar to + // `queue.buffering.max.messages` in the JVM producer. + FlushMaxMessages int `mapstructure:"flush_max_messages"` +} + +func NewDefaultProducerConfig() ProducerConfig { + return ProducerConfig{ + MaxMessageBytes: 1000000, + RequiredAcks: WaitForLocal, + Compression: "none", + FlushMaxMessages: 0, + } +} + +func (c ProducerConfig) Validate() error { + switch c.Compression { + case "none", "gzip", "snappy", "lz4", "zstd": + // Valid compression + default: + return fmt.Errorf( + "compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value is %q", + c.Compression, + ) + } + return nil +} + +// Unmarshal unmarshals into ProducerConfig, allowing the user to specify any of ["all", -1, 0, 1] +// for required_acks. This is in line with standard Kafka producer configuration as described at +// https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks +// +// Note that confmap.Unmarshaler may only be implemented by structs, so we cannot define this method +// on RequiredAcks itself. +func (c *ProducerConfig) Unmarshal(conf *confmap.Conf) error { + if conf.Get("required_acks") == "all" { + if err := conf.Merge(confmap.NewFromStringMap( + map[string]any{"required_acks": WaitForAll}, + )); err != nil { + return err + } + } + return conf.Unmarshal(c) +} + +// RequiredAcks defines record acknowledgement behavior for producers. +type RequiredAcks int + +const ( + // NoResponse doesn't send any response, the TCP ACK is all you get. + NoResponse RequiredAcks = 0 + // WaitForLocal waits for only the local commit to succeed before responding. + WaitForLocal RequiredAcks = 1 + // WaitForAll waits for all in-sync replicas to commit before responding. + // The minimum number of in-sync replicas is configured on the broker via + // the `min.insync.replicas` configuration key. + WaitForAll RequiredAcks = -1 +) + +func (r RequiredAcks) Validate() error { + if r < -1 || r > 1 { + return fmt.Errorf("expected 'all' (-1), 0, or 1; configured value is %v", r) + } + return nil +} + +type MetadataConfig struct { + // Whether to maintain a full set of metadata for all topics, or just + // the minimal set that has been necessary so far.
The full set is simpler + // and usually more convenient, but can take up a substantial amount of + // memory if you have many topics and partitions. Defaults to true. + Full bool `mapstructure:"full"` + + // Retry configuration for metadata. + // This configuration is useful to avoid race conditions when the broker + // is starting at the same time as the collector. + Retry MetadataRetryConfig `mapstructure:"retry"` +} + +// MetadataRetryConfig defines retry configuration for Metadata. +type MetadataRetryConfig struct { + // The total number of times to retry a metadata request when the + // cluster is in the middle of a leader election or at startup (default 3). + Max int `mapstructure:"max"` + // How long to wait for leader election to occur before retrying + // (default 250ms). Similar to the JVM's `retry.backoff.ms`. + Backoff time.Duration `mapstructure:"backoff"` +} + +func NewDefaultMetadataConfig() MetadataConfig { + return MetadataConfig{ + Full: true, + Retry: MetadataRetryConfig{ + Max: 3, + Backoff: time.Millisecond * 250, + }, + } +} + +// AuthenticationConfig defines authentication-related configuration. +type AuthenticationConfig struct { + PlainText *PlainTextConfig `mapstructure:"plain_text"` + SASL *SASLConfig `mapstructure:"sasl"` + TLS *configtls.ClientConfig `mapstructure:"tls"` + Kerberos *KerberosConfig `mapstructure:"kerberos"` +} + +// PlainTextConfig defines plaintext authentication. +type PlainTextConfig struct { + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` +} + +// SASLConfig defines the configuration for SASL authentication. +type SASLConfig struct { + // Username to be used on authentication + Username string `mapstructure:"username"` + // Password to be used on authentication + Password string `mapstructure:"password"` + // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512). + Mechanism string `mapstructure:"mechanism"` + // SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0. + Version int `mapstructure:"version"` + // AWSMSK holds configuration specific to AWS MSK. + AWSMSK AWSMSKConfig `mapstructure:"aws_msk"` +} + +func (c SASLConfig) Validate() error { + switch c.Mechanism { + case "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER": + // TODO validate c.AWSMSK + case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512": + // Valid mechanism; these require a username and password. + if c.Username == "" { + return fmt.Errorf("username is required") + } + if c.Password == "" { + return fmt.Errorf("password is required") + } + default: + return fmt.Errorf( + "mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", + c.Mechanism, + ) + } + if c.Version < 0 || c.Version > 1 { + return fmt.Errorf("version has to be either 0 or 1. configured value %v", c.Version) + } + return nil +} + +// AWSMSKConfig defines the additional SASL authentication +// measures needed to use the AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanisms. +type AWSMSKConfig struct { + // Region is the AWS region the MSK cluster is based in + Region string `mapstructure:"region"` + // BrokerAddr is the broker address that the client connects to in order to perform authentication + BrokerAddr string `mapstructure:"broker_addr"` +} + +// KerberosConfig defines Kerberos configuration.
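+// Credentials are taken either from a keytab file (when use_keytab is set) or from the configured username and password.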
+type KerberosConfig struct { + ServiceName string `mapstructure:"service_name"` + Realm string `mapstructure:"realm"` + UseKeyTab bool `mapstructure:"use_keytab"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password" json:"-"` + ConfigPath string `mapstructure:"config_file"` + KeyTabPath string `mapstructure:"keytab_file"` + DisablePAFXFAST bool `mapstructure:"disable_fast_negotiation"` +} diff --git a/internal/kafka/configkafka/config_test.go b/internal/kafka/configkafka/config_test.go new file mode 100644 index 000000000000..4fbe56469241 --- /dev/null +++ b/internal/kafka/configkafka/config_test.go @@ -0,0 +1,182 @@ +package configkafka + +import ( + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtls" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/confmap/xconfmap" +) + +func TestClientConfig(t *testing.T) { + testConfig(t, "client_config.yaml", NewDefaultClientConfig, map[string]struct { + expected ClientConfig + expectedErr string + }{ + "": { + expected: NewDefaultClientConfig(), + }, + "full": { + expected: ClientConfig{ + Brokers: []string{"foo:123", "bar:456"}, + ResolveCanonicalBootstrapServersOnly: true, + ClientID: "vip", + ProtocolVersion: "1.2.3", + Authentication: AuthenticationConfig{ + TLS: &configtls.ClientConfig{ + Config: configtls.Config{ + CAFile: "ca.pem", + CertFile: "cert.pem", + KeyFile: "key.pem", + }, + }, + }, + Metadata: MetadataConfig{ + Full: false, + Retry: MetadataRetryConfig{ + Max: 10, + Backoff: 5 * time.Second, + }, + }, + }, + }, + "sasl_aws_msk_iam": { + expected: func() ClientConfig { + cfg := NewDefaultClientConfig() + cfg.Authentication.SASL = &SASLConfig{ + Mechanism: "AWS_MSK_IAM", + } + return cfg + }(), + }, + "sasl_plain": { + expected: func() ClientConfig { + cfg := NewDefaultClientConfig() + cfg.Authentication.SASL = &SASLConfig{ + Mechanism: "PLAIN", + Username: "abc", + Password: "def", + } + return cfg + }(), + }, + + // Invalid configurations + "brokers_required": { + expectedErr: "brokers must be specified", + }, + "invalid_protocol_version": { + expectedErr: "invalid protocol version: invalid version `none`", + }, + "sasl_invalid_mechanism": { + expectedErr: "auth::sasl: mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FANCY", + }, + "sasl_invalid_version": { + expectedErr: "auth::sasl: version has to be either 0 or 1. configured value -1", + }, + "sasl_plain_username_required": { + expectedErr: "auth::sasl: username is required", + }, + "sasl_plain_password_required": { + expectedErr: "auth::sasl: password is required", + }, + }) +} + +func TestConsumerConfig(t *testing.T) { + testConfig(t, "consumer_config.yaml", NewDefaultConsumerConfig, map[string]struct { + expected ConsumerConfig + expectedErr string + }{ + "": { + expected: NewDefaultConsumerConfig(), + }, + "full": { + expected: ConsumerConfig{ + SessionTimeout: 5 * time.Second, + HeartbeatInterval: 2 * time.Second, + GroupID: "throng", + InitialOffset: "earliest", + AutoCommit: AutoCommitConfig{ + Enable: false, + Interval: 10 * time.Minute, + }, + MinFetchSize: 10, + DefaultFetchSize: 1024, + MaxFetchSize: 4096, + }, + }, + + // Invalid configurations + "invalid_initial_offset": { + expectedErr: "initial_offset should be one of 'latest' or 'earliest'. 
configured value middle", + }, + }) +} + +func TestProducerConfig(t *testing.T) { + testConfig(t, "producer_config.yaml", NewDefaultProducerConfig, map[string]struct { + expected ProducerConfig + expectedErr string + }{ + "": { + expected: NewDefaultProducerConfig(), + }, + "full": { + expected: ProducerConfig{ + MaxMessageBytes: 1, + RequiredAcks: 0, + Compression: "gzip", + FlushMaxMessages: 2, + }, + }, + "required_acks_all": { + expected: func() ProducerConfig { + cfg := NewDefaultProducerConfig() + cfg.RequiredAcks = WaitForAll + return cfg + }(), + }, + + // Invalid configurations + "invalid_compression": { + expectedErr: `compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value is "brotli"`, + }, + "invalid_required_acks": { + expectedErr: "required_acks: expected 'all' (-1), 0, or 1; configured value is 3", + }, + }) +} + +func testConfig[ConfigStruct any](t *testing.T, filename string, defaultConfig func() ConfigStruct, testcases map[string]struct { + expected ConfigStruct + expectedErr string +}, +) { + t.Parallel() + + cm, err := confmaptest.LoadConf(filepath.Join("testdata", filename)) + require.NoError(t, err) + + for name, tt := range testcases { + t.Run(name, func(t *testing.T) { + cfg := defaultConfig() + + sub, err := cm.Sub(component.NewIDWithName(component.MustNewType("kafka"), name).String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + + err = xconfmap.Validate(cfg) + if tt.expectedErr != "" { + require.EqualError(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, tt.expected, cfg) + } + }) + } +} diff --git a/internal/kafka/configkafka/testdata/client_config.yaml b/internal/kafka/configkafka/testdata/client_config.yaml new file mode 100644 index 000000000000..a6e784e7d2c5 --- /dev/null +++ b/internal/kafka/configkafka/testdata/client_config.yaml @@ -0,0 +1,74 @@ +kafka: {} +kafka/full: + brokers: ["foo:123", "bar:456"] + resolve_canonical_bootstrap_servers_only: true + client_id: vip + protocol_version: 1.2.3 + auth: + tls: + ca_file: ca.pem + cert_file: cert.pem + key_file: key.pem + metadata: + full: false + retry: + max: 10 + backoff: 5s +kafka/sasl_aws_msk_iam: + auth: + sasl: + mechanism: AWS_MSK_IAM +kafka/sasl_plain: + auth: + sasl: + mechanism: PLAIN + username: abc + password: def + +# Invalid configurations +kafka/brokers_required: + brokers: [] + +kafka/invalid_protocol_version: + protocol_version: none + +kafka/sasl_invalid_mechanism: + auth: + sasl: + mechanism: FANCY + +kafka/sasl_invalid_version: + auth: + sasl: + mechanism: PLAIN + username: abc + password: def + version: -1 + +kafka/sasl_plain_username_required: + auth: + sasl: + mechanism: PLAIN + password: xyz + +kafka/sasl_plain_password_required: + auth: + sasl: + mechanism: PLAIN + username: xyz + +kafka/foo: + brokers: + - "foo:123" + - "bar:456" + resolve_canonical_bootstrap_servers_only: true + client_id: otel-collector + auth: + tls: + ca_file: ca.pem + cert_file: cert.pem + key_file: key.pem + metadata: + retry: + max: 10 + backoff: 5s diff --git a/internal/kafka/configkafka/testdata/consumer_config.yaml b/internal/kafka/configkafka/testdata/consumer_config.yaml new file mode 100644 index 000000000000..ea585f949624 --- /dev/null +++ b/internal/kafka/configkafka/testdata/consumer_config.yaml @@ -0,0 +1,16 @@ +kafka: {} +kafka/full: + session_timeout: 5s + heartbeat_interval: 2s + group_id: throng + initial_offset: earliest + autocommit: + enable: false + interval: 10m + min_fetch_size: 10 + 
default_fetch_size: 1024 + max_fetch_size: 4096 + +# Invalid configurations +kafka/invalid_initial_offset: + initial_offset: middle diff --git a/internal/kafka/configkafka/testdata/producer_config.yaml b/internal/kafka/configkafka/testdata/producer_config.yaml new file mode 100644 index 000000000000..2375e7abd439 --- /dev/null +++ b/internal/kafka/configkafka/testdata/producer_config.yaml @@ -0,0 +1,14 @@ +kafka: {} +kafka/full: + max_message_bytes: 1 + required_acks: 0 + compression: gzip + flush_max_messages: 2 +kafka/required_acks_all: + required_acks: all + +# Invalid configurations +kafka/invalid_compression: + compression: brotli +kafka/invalid_required_acks: + required_acks: 3 diff --git a/internal/kafka/go.mod b/internal/kafka/go.mod index 849bbcd1fd81..805ec2a4c621 100644 --- a/internal/kafka/go.mod +++ b/internal/kafka/go.mod @@ -8,7 +8,10 @@ require ( github.com/aws/aws-sdk-go v1.55.6 github.com/stretchr/testify v1.10.0 github.com/xdg-go/scram v1.1.2 + go.opentelemetry.io/collector/component v1.27.1-0.20250307164521-7c787571daa5 go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571daa5 + go.opentelemetry.io/collector/confmap v1.27.1-0.20250307164521-7c787571daa5 + go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307145831-dc9250a6c150 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 ) @@ -32,10 +35,13 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/hashicorp/go-version v1.7.0 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect github.com/jcmturner/gofork v1.7.6 // indirect @@ -43,15 +49,29 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/klauspost/compress v1.17.11 // indirect + github.com/knadh/koanf/maps v0.1.1 // indirect + github.com/knadh/koanf/providers/confmap v0.1.0 // indirect + github.com/knadh/koanf/v2 v2.1.2 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250307164521-7c787571daa5 // indirect + go.opentelemetry.io/collector/featuregate v1.27.1-0.20250307164521-7c787571daa5 // indirect + go.opentelemetry.io/collector/pdata v1.27.1-0.20250307164521-7c787571daa5 // 
indirect + go.opentelemetry.io/otel v1.34.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/trace v1.34.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.33.0 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/grpc v1.71.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/internal/kafka/go.sum b/internal/kafka/go.sum index 0705e2ccaf98..5c24604c6858 100644 --- a/internal/kafka/go.sum +++ b/internal/kafka/go.sum @@ -44,8 +44,22 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= +github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -56,6 +70,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= 
+github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= +github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= @@ -72,12 +88,24 @@ github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9Y github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= +github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= +github.com/knadh/koanf/providers/confmap v0.1.0 h1:gOkxhHkemwG4LezxxN8DMOFopOPghxRVp7JbIvdvqzU= +github.com/knadh/koanf/providers/confmap v0.1.0/go.mod h1:2uLhxQzJnyHKfxG927awZC7+fyHFdQkd697K4MdLnIU= +github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= +github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -102,23 +130,56 @@ github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= 
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/collector/component v1.27.1-0.20250307164521-7c787571daa5 h1:t+Fe2/UIP77585G3fs/8q7EhklYVXgvIvYA6FMZD4qg= +go.opentelemetry.io/collector/component v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:ziknjoG2/kGMIq9vBUpzMQKIQDL53jxgj5bwMoO/zC0= go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250307164521-7c787571daa5 h1:cqm0I1LAwQDIBklNZZ3yNwdg/QaFPf/p+MoVxeGA8xw= go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:GYQiC8IejBcwE8z0O4DwbBR/Hf6U7d8DTf+cszyqwFs= go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571daa5 h1:PERaUEfslTVIto8p0BtAinn2VHdZvWcEkf5l+wm0fUc= go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:i6kX7oboR1sO+J+hDImtKH4GnNCFiwcTAr2fzGRP0kI= +go.opentelemetry.io/collector/confmap v1.27.1-0.20250307164521-7c787571daa5 h1:ahX6KiN/zA1tyHpA52RY8wcM3yTdLmxkJ5jackf6S5M= +go.opentelemetry.io/collector/confmap v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:6VV+Zoc+4tUpViZLFxo4ra/YNiyISwmJIgCchy1TJa0= +go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307145831-dc9250a6c150 h1:bkOPeoVuzip3r1SfvN9PvBJN6kujGgu6HDFID0X1m1g= +go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307145831-dc9250a6c150/go.mod h1:npXgwAEcNHOf04WT3DLTxsErOdMbzClzu1ul7YetuX8= +go.opentelemetry.io/collector/featuregate v1.27.1-0.20250307164521-7c787571daa5 h1:oIg7O8rLfULCG4lUJdfB5SyJvGpLge71H9tc4T9G8uQ= +go.opentelemetry.io/collector/featuregate v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:Y/KsHbvREENKvvN9RlpiWk/IGBK+CATBYzIIpU7nccc= +go.opentelemetry.io/collector/pdata v1.27.1-0.20250307164521-7c787571daa5 h1:oYWAIyyo690KlLokRfpXz4MsUVGVi/zX23bv9R+ualk= +go.opentelemetry.io/collector/pdata v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:nFXOEpZx43ykMZJd87AHWIJKqDP+UMMKydIy59m5SEs= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr 
v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= @@ -126,10 +187,14 @@ golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -149,8 +214,19 @@ golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From 1965928f8a25f8a65c16b7f8ee2261349b54b016 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 8 Mar 2025 16:50:28 +0800 Subject: [PATCH 2/8] Update extension/observer/kafkatopicsobserver --- extension/observer/kafkatopicsobserver/config.go | 10 +++++----- extension/observer/kafkatopicsobserver/config_test.go | 6 +++--- extension/observer/kafkatopicsobserver/extension.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/extension/observer/kafkatopicsobserver/config.go b/extension/observer/kafkatopicsobserver/config.go index 3723364efa5f..6be8f9d913d0 100644 --- a/extension/observer/kafkatopicsobserver/config.go +++ b/extension/observer/kafkatopicsobserver/config.go @@ -9,7 +9,7 @@ import ( "go.uber.org/multierr" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) // Config defines configuration for docker observer @@ -22,10 +22,10 @@ type Config struct { // required in SASL environments. 
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"` // Kafka protocol version - ProtocolVersion string `mapstructure:"protocol_version"` - Authentication kafka.Authentication `mapstructure:"auth"` - TopicRegex string `mapstructure:"topic_regex"` - TopicsSyncInterval time.Duration `mapstructure:"topics_sync_interval"` + ProtocolVersion string `mapstructure:"protocol_version"` + Authentication configkafka.AuthenticationConfig `mapstructure:"auth"` + TopicRegex string `mapstructure:"topic_regex"` + TopicsSyncInterval time.Duration `mapstructure:"topics_sync_interval"` } func (config *Config) Validate() (errs error) { diff --git a/extension/observer/kafkatopicsobserver/config_test.go b/extension/observer/kafkatopicsobserver/config_test.go index af6ff3b919dc..219ed31f19f9 100644 --- a/extension/observer/kafkatopicsobserver/config_test.go +++ b/extension/observer/kafkatopicsobserver/config_test.go @@ -16,7 +16,7 @@ import ( "go.opentelemetry.io/collector/confmap/xconfmap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) func TestLoadConfig(t *testing.T) { @@ -40,8 +40,8 @@ func TestLoadConfig(t *testing.T) { TopicRegex: "^topic[0-9]$", TopicsSyncInterval: 5 * time.Second, ResolveCanonicalBootstrapServersOnly: false, - Authentication: kafka.Authentication{ - PlainText: &kafka.PlainTextConfig{ + Authentication: configkafka.AuthenticationConfig{ + PlainText: &configkafka.PlainTextConfig{ Username: "fooUser", Password: "fooPassword", }, diff --git a/extension/observer/kafkatopicsobserver/extension.go b/extension/observer/kafkatopicsobserver/extension.go index 7fb0601b5b8b..6511aff12b41 100644 --- a/extension/observer/kafkatopicsobserver/extension.go +++ b/extension/observer/kafkatopicsobserver/extension.go @@ -140,7 +140,7 @@ var createKafkaClusterAdmin = func(ctx context.Context, config Config) (sarama.C return nil, err } } - if err := kafka.ConfigureAuthentication(ctx, config.Authentication, saramaConfig); err != nil { + if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, saramaConfig); err != nil { return nil, err } return sarama.NewClusterAdmin(config.Brokers, saramaConfig) From 14bbb96784be53bd6318cf27375bd3f347e67d8e Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 8 Mar 2025 16:56:41 +0800 Subject: [PATCH 3/8] Update exporter/kafkaexporter --- exporter/kafkaexporter/config.go | 39 +------ exporter/kafkaexporter/config_test.go | 109 ++---------------- exporter/kafkaexporter/go.mod | 2 +- exporter/kafkaexporter/kafka_exporter.go | 2 +- exporter/kafkaexporter/kafka_exporter_test.go | 34 ------ 5 files changed, 15 insertions(+), 171 deletions(-) diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 6c66cc121cad..06885f4b1791 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) // Config defines configuration for Kafka exporter. 
@@ -63,7 +63,7 @@ type Config struct { Producer Producer `mapstructure:"producer"` // Authentication defines used authentication mechanism. - Authentication kafka.Authentication `mapstructure:"auth"` + Authentication configkafka.AuthenticationConfig `mapstructure:"auth"` } // Metadata defines configuration for retrieving metadata from the broker. @@ -121,41 +121,8 @@ func (cfg *Config) Validate() error { if cfg.Producer.RequiredAcks < -1 || cfg.Producer.RequiredAcks > 1 { return fmt.Errorf("producer.required_acks has to be between -1 and 1. configured value %v", cfg.Producer.RequiredAcks) } - _, err := saramaProducerCompressionCodec(cfg.Producer.Compression) - if err != nil { - return err - } - - return validateSASLConfig(cfg.Authentication.SASL) -} - -func validateSASLConfig(c *kafka.SASLConfig) error { - if c == nil { - return nil - } - - if c.Mechanism != "AWS_MSK_IAM" && c.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" { - if c.Username == "" { - return fmt.Errorf("auth.sasl.username is required") - } - if c.Password == "" { - return fmt.Errorf("auth.sasl.password is required") - } - } - - switch c.Mechanism { - case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512": - // Do nothing, valid mechanism - default: - return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism) - } - - if c.Version < 0 || c.Version > 1 { - return fmt.Errorf("auth.sasl.version has to be either 0 or 1. configured value %v", c.Version) - } - - return nil + return err } func saramaProducerCompressionCodec(compression string) (sarama.CompressionCodec, error) { diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index 43f4ac94966e..2b1719a3e07a 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -20,7 +20,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) func TestLoadConfig(t *testing.T) { @@ -63,8 +63,8 @@ func TestLoadConfig(t *testing.T) { PartitionLogsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", - Authentication: kafka.Authentication{ - PlainText: &kafka.PlainTextConfig{ + Authentication: configkafka.AuthenticationConfig{ + PlainText: &configkafka.PlainTextConfig{ Username: "jdoe", Password: "pass", }, @@ -86,8 +86,8 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, ""), option: func(conf *Config) { - conf.Authentication = kafka.Authentication{ - SASL: &kafka.SASLConfig{ + conf.Authentication = configkafka.AuthenticationConfig{ + SASL: &configkafka.SASLConfig{ Username: "jdoe", Password: "pass", Mechanism: "PLAIN", @@ -119,12 +119,12 @@ func TestLoadConfig(t *testing.T) { PartitionLogsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", - Authentication: kafka.Authentication{ - PlainText: &kafka.PlainTextConfig{ + Authentication: configkafka.AuthenticationConfig{ + PlainText: &configkafka.PlainTextConfig{ Username: "jdoe", Password: "pass", }, - SASL: &kafka.SASLConfig{ + SASL: &configkafka.SASLConfig{ Username: "jdoe", 
Password: "pass", Mechanism: "PLAIN", @@ -175,8 +175,8 @@ func TestLoadConfig(t *testing.T) { Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, - Authentication: kafka.Authentication{ - PlainText: &kafka.PlainTextConfig{ + Authentication: configkafka.AuthenticationConfig{ + PlainText: &configkafka.PlainTextConfig{ Username: "jdoe", Password: "pass", }, @@ -222,95 +222,6 @@ func TestValidate_err_compression(t *testing.T) { assert.EqualError(t, err, "producer.compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value idk") } -func TestValidate_sasl_username(t *testing.T) { - config := &Config{ - Producer: Producer{ - Compression: "none", - }, - Authentication: kafka.Authentication{ - SASL: &kafka.SASLConfig{ - Username: "", - Password: "pass", - Mechanism: "PLAIN", - }, - }, - } - - err := config.Validate() - assert.EqualError(t, err, "auth.sasl.username is required") -} - -func TestValidate_sasl_password(t *testing.T) { - config := &Config{ - Producer: Producer{ - Compression: "none", - }, - Authentication: kafka.Authentication{ - SASL: &kafka.SASLConfig{ - Username: "jdoe", - Password: "", - Mechanism: "PLAIN", - }, - }, - } - - err := config.Validate() - assert.EqualError(t, err, "auth.sasl.password is required") -} - -func TestValidate_sasl_mechanism(t *testing.T) { - config := &Config{ - Producer: Producer{ - Compression: "none", - }, - Authentication: kafka.Authentication{ - SASL: &kafka.SASLConfig{ - Username: "jdoe", - Password: "pass", - Mechanism: "FAKE", - }, - }, - } - - err := config.Validate() - assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE") -} - -func TestValidate_sasl_version(t *testing.T) { - config := &Config{ - Producer: Producer{ - Compression: "none", - }, - Authentication: kafka.Authentication{ - SASL: &kafka.SASLConfig{ - Username: "jdoe", - Password: "pass", - Mechanism: "PLAIN", - Version: 42, - }, - }, - } - - err := config.Validate() - assert.EqualError(t, err, "auth.sasl.version has to be either 0 or 1. 
configured value 42") -} - -func TestValidate_sasl_iam(t *testing.T) { - config := &Config{ - Producer: Producer{ - Compression: "none", - }, - Authentication: kafka.Authentication{ - SASL: &kafka.SASLConfig{ - Mechanism: "AWS_MSK_IAM", - }, - }, - } - - err := config.Validate() - assert.NoError(t, err) -} - func Test_saramaProducerCompressionCodec(t *testing.T) { tests := map[string]struct { compression string diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index d50c7d94179b..bcb35d085327 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -19,7 +19,6 @@ require ( go.opentelemetry.io/collector/component v1.27.1-0.20250307164521-7c787571daa5 go.opentelemetry.io/collector/component/componenttest v0.121.1-0.20250307145831-dc9250a6c150 go.opentelemetry.io/collector/config/configretry v1.27.1-0.20250307164521-7c787571daa5 - go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571daa5 go.opentelemetry.io/collector/confmap v1.27.1-0.20250307164521-7c787571daa5 go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307164521-7c787571daa5 go.opentelemetry.io/collector/consumer v1.27.1-0.20250307164521-7c787571daa5 @@ -90,6 +89,7 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250307164521-7c787571daa5 // indirect + go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571daa5 // indirect go.opentelemetry.io/collector/consumer/consumertest v0.121.1-0.20250307145831-dc9250a6c150 // indirect go.opentelemetry.io/collector/consumer/xconsumer v0.121.1-0.20250307145831-dc9250a6c150 // indirect go.opentelemetry.io/collector/exporter/xexporter v0.121.1-0.20250307145831-dc9250a6c150 // indirect diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 6720237fabe2..b63769e8464c 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -236,7 +236,7 @@ func newSaramaProducer(ctx context.Context, config Config) (sarama.SyncProducer, c.Version = version } - if err := kafka.ConfigureAuthentication(ctx, config.Authentication, c); err != nil { + if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, c); err != nil { return nil, err } diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index be0fb1c64c09..db9f477d2b82 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -14,7 +14,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -23,7 +22,6 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic" ) @@ -111,38 +109,6 @@ func TestLogsExporter_encoding_extension(t *testing.T) { assert.NotContains(t, err.Error(), errUnrecognizedEncoding.Error()) } -func 
TestNewExporter_err_auth_type(t *testing.T) { - c := Config{ - ProtocolVersion: "2.0.0", - Authentication: kafka.Authentication{ - TLS: &configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "/nonexistent", - }, - }, - }, - Encoding: defaultEncoding, - Metadata: Metadata{ - Full: false, - }, - Producer: Producer{ - Compression: "none", - }, - } - texp := newTracesExporter(c, exportertest.NewNopSettings(metadata.Type)) - require.NotNil(t, texp) - err := texp.start(context.Background(), componenttest.NewNopHost()) - assert.ErrorContains(t, err, "failed to load TLS config") - mexp := newMetricsExporter(c, exportertest.NewNopSettings(metadata.Type)) - require.NotNil(t, mexp) - err = mexp.start(context.Background(), componenttest.NewNopHost()) - assert.ErrorContains(t, err, "failed to load TLS config") - lexp := newLogsExporter(c, exportertest.NewNopSettings(metadata.Type)) - require.NotNil(t, lexp) - err = lexp.start(context.Background(), componenttest.NewNopHost()) - assert.ErrorContains(t, err, "failed to load TLS config") -} - func TestNewExporter_err_compression(t *testing.T) { c := Config{ Encoding: defaultEncoding, From c4a196b3d12a5e819926e3997f5717acdd1cf4b1 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sat, 8 Mar 2025 16:57:51 +0800 Subject: [PATCH 4/8] Update receiver/kafkametricsreceiver --- receiver/kafkametricsreceiver/config.go | 4 ++-- receiver/kafkametricsreceiver/config_test.go | 4 ++-- receiver/kafkametricsreceiver/go.sum | 2 ++ receiver/kafkametricsreceiver/receiver.go | 2 +- receiver/kafkametricsreceiver/receiver_test.go | 16 ---------------- 5 files changed, 7 insertions(+), 21 deletions(-) diff --git a/receiver/kafkametricsreceiver/config.go b/receiver/kafkametricsreceiver/config.go index 8a6b003c64c9..3174979e0842 100644 --- a/receiver/kafkametricsreceiver/config.go +++ b/receiver/kafkametricsreceiver/config.go @@ -8,7 +8,7 @@ import ( "go.opentelemetry.io/collector/scraper/scraperhelper" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) @@ -38,7 +38,7 @@ type Config struct { GroupMatch string `mapstructure:"group_match"` // Authentication data - Authentication kafka.Authentication `mapstructure:"auth"` + Authentication configkafka.AuthenticationConfig `mapstructure:"auth"` // Cluster metadata refresh frequency // Configures the refresh frequency to update cached cluster metadata diff --git a/receiver/kafkametricsreceiver/config_test.go b/receiver/kafkametricsreceiver/config_test.go index d65eebb9c898..b24875fb83e7 100644 --- a/receiver/kafkametricsreceiver/config_test.go +++ b/receiver/kafkametricsreceiver/config_test.go @@ -14,7 +14,7 @@ import ( "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/scraper/scraperhelper" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) { ProtocolVersion: "2.0.0", TopicMatch: "test_\\w+", GroupMatch: "test_\\w+", - Authentication: kafka.Authentication{ + Authentication: configkafka.AuthenticationConfig{ 
TLS: &configtls.ClientConfig{ Config: configtls.Config{ CAFile: "ca.pem", diff --git a/receiver/kafkametricsreceiver/go.sum b/receiver/kafkametricsreceiver/go.sum index 3f591d0fd5ac..96ad97db1d0c 100644 --- a/receiver/kafkametricsreceiver/go.sum +++ b/receiver/kafkametricsreceiver/go.sum @@ -157,6 +157,8 @@ go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571 go.opentelemetry.io/collector/config/configtls v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:i6kX7oboR1sO+J+hDImtKH4GnNCFiwcTAr2fzGRP0kI= go.opentelemetry.io/collector/confmap v1.27.1-0.20250307164521-7c787571daa5 h1:ahX6KiN/zA1tyHpA52RY8wcM3yTdLmxkJ5jackf6S5M= go.opentelemetry.io/collector/confmap v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:6VV+Zoc+4tUpViZLFxo4ra/YNiyISwmJIgCchy1TJa0= +go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307145831-dc9250a6c150 h1:bkOPeoVuzip3r1SfvN9PvBJN6kujGgu6HDFID0X1m1g= +go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307145831-dc9250a6c150/go.mod h1:npXgwAEcNHOf04WT3DLTxsErOdMbzClzu1ul7YetuX8= go.opentelemetry.io/collector/consumer v1.27.1-0.20250307164521-7c787571daa5 h1:7nstyK9IqwNojPurS3epKZn7tEjZaMWFI24rVZpcQCM= go.opentelemetry.io/collector/consumer v1.27.1-0.20250307164521-7c787571daa5/go.mod h1:FfEUMYyi/fj0nZQSLQSLnbGMiw/B5cuKbLkD0LJ2iAs= go.opentelemetry.io/collector/consumer/consumererror v0.121.1-0.20250307145831-dc9250a6c150 h1:qW9stu/9TkgBGhEF8i9kUBokCxlNK284EfV7OjuZhgc= diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go index c54dbdad9a8e..b8b2995b01b4 100644 --- a/receiver/kafkametricsreceiver/receiver.go +++ b/receiver/kafkametricsreceiver/receiver.go @@ -54,7 +54,7 @@ var newMetricsReceiver = func( sc.Metadata.RefreshFrequency = config.RefreshFrequency } - if err := kafka.ConfigureAuthentication(ctx, config.Authentication, sc); err != nil { + if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, sc); err != nil { return nil, err } scraperControllerOptions := make([]scraperhelper.ControllerOption, 0, len(config.Scrapers)) diff --git a/receiver/kafkametricsreceiver/receiver_test.go b/receiver/kafkametricsreceiver/receiver_test.go index 0599464adae9..de716bfd7da2 100644 --- a/receiver/kafkametricsreceiver/receiver_test.go +++ b/receiver/kafkametricsreceiver/receiver_test.go @@ -10,14 +10,12 @@ import ( "github.com/IBM/sarama" "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.opentelemetry.io/collector/scraper" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkametricsreceiver/internal/metadata" ) @@ -46,20 +44,6 @@ func TestNewReceiver_invalid_scraper_error(t *testing.T) { } } -func TestNewReceiver_invalid_auth_error(t *testing.T) { - c := createDefaultConfig().(*Config) - c.Authentication = kafka.Authentication{ - TLS: &configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "/invalid", - }, - }, - } - r, err := newMetricsReceiver(context.Background(), *c, receivertest.NewNopSettings(metadata.Type), nil) - assert.ErrorContains(t, err, "failed to load TLS config") - assert.Nil(t, r) -} - func TestNewReceiver_refresh_frequency(t *testing.T) { c := 
createDefaultConfig().(*Config) c.RefreshFrequency = 1 From e5501e15ebaaeb2bcd6ccfc93bd965f2aadf027d Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Sun, 9 Mar 2025 14:09:55 +0800 Subject: [PATCH 5/8] Update receiver/kafkareceiver --- receiver/kafkareceiver/config.go | 4 +- receiver/kafkareceiver/config_test.go | 6 +- receiver/kafkareceiver/kafka_receiver.go | 2 +- receiver/kafkareceiver/kafka_receiver_test.go | 69 ------------------- 4 files changed, 6 insertions(+), 75 deletions(-) diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 50796c065693..e8f5decacc58 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -10,7 +10,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" ) type AutoCommit struct { @@ -69,7 +69,7 @@ type Config struct { // Client, and shared by the Producer/Consumer. Metadata kafkaexporter.Metadata `mapstructure:"metadata"` - Authentication kafka.Authentication `mapstructure:"auth"` + Authentication configkafka.AuthenticationConfig `mapstructure:"auth"` // Controls the auto-commit functionality AutoCommit AutoCommit `mapstructure:"autocommit"` diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index c00220c80578..c8049509b50d 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -17,7 +17,7 @@ import ( "go.opentelemetry.io/collector/confmap/xconfmap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" ) @@ -44,7 +44,7 @@ func TestLoadConfig(t *testing.T) { InitialOffset: "latest", SessionTimeout: 10 * time.Second, HeartbeatInterval: 3 * time.Second, - Authentication: kafka.Authentication{ + Authentication: configkafka.AuthenticationConfig{ TLS: &configtls.ClientConfig{ Config: configtls.Config{ CAFile: "ca.pem", @@ -83,7 +83,7 @@ func TestLoadConfig(t *testing.T) { InitialOffset: "earliest", SessionTimeout: 45 * time.Second, HeartbeatInterval: 15 * time.Second, - Authentication: kafka.Authentication{ + Authentication: configkafka.AuthenticationConfig{ TLS: &configtls.ClientConfig{ Config: configtls.Config{ CAFile: "ca.pem", diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 3bc09797bd61..f8183e783e3c 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -161,7 +161,7 @@ func createKafkaClient(ctx context.Context, config Config) (sarama.ConsumerGroup return nil, err } } - if err := kafka.ConfigureAuthentication(ctx, config.Authentication, saramaConfig); err != nil { + if err := kafka.ConfigureSaramaAuthentication(ctx, config.Authentication, saramaConfig); err != nil { return nil, err } return sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go 
b/receiver/kafkareceiver/kafka_receiver_test.go index 0abf2a9fb499..d7144d6ca56c 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -31,9 +30,7 @@ import ( "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver/internal/metadatatest" ) @@ -60,28 +57,6 @@ func TestNewTracesReceiver_encoding_err(t *testing.T) { assert.EqualError(t, err, errUnrecognizedEncoding.Error()) } -func TestNewTracesReceiver_err_auth_type(t *testing.T) { - c := Config{ - ProtocolVersion: "2.0.0", - Authentication: kafka.Authentication{ - TLS: &configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "/nonexistent", - }, - }, - }, - Encoding: defaultEncoding, - Metadata: kafkaexporter.Metadata{ - Full: false, - }, - } - r, err := newTracesReceiver(c, receivertest.NewNopSettings(metadata.Type), consumertest.NewNop()) - require.NoError(t, err) - require.NotNil(t, r) - err = r.Start(context.Background(), componenttest.NewNopHost()) - assert.ErrorContains(t, err, "failed to load TLS config") -} - func TestNewTracesReceiver_initial_offset_err(t *testing.T) { c := Config{ InitialOffset: "foo", @@ -419,28 +394,6 @@ func TestNewMetricsReceiver_encoding_err(t *testing.T) { assert.EqualError(t, err, errUnrecognizedEncoding.Error()) } -func TestNewMetricsExporter_err_auth_type(t *testing.T) { - c := Config{ - ProtocolVersion: "2.0.0", - Authentication: kafka.Authentication{ - TLS: &configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "/nonexistent", - }, - }, - }, - Encoding: defaultEncoding, - Metadata: kafkaexporter.Metadata{ - Full: false, - }, - } - r, err := newMetricsReceiver(c, receivertest.NewNopSettings(metadata.Type), consumertest.NewNop()) - require.NoError(t, err) - require.NotNil(t, r) - err = r.Start(context.Background(), componenttest.NewNopHost()) - assert.ErrorContains(t, err, "failed to load TLS config") -} - func TestNewMetricsReceiver_initial_offset_err(t *testing.T) { c := Config{ InitialOffset: "foo", @@ -764,28 +717,6 @@ func TestNewLogsReceiver_encoding_err(t *testing.T) { assert.EqualError(t, err, errUnrecognizedEncoding.Error()) } -func TestNewLogsExporter_err_auth_type(t *testing.T) { - c := Config{ - ProtocolVersion: "2.0.0", - Authentication: kafka.Authentication{ - TLS: &configtls.ClientConfig{ - Config: configtls.Config{ - CAFile: "/nonexistent", - }, - }, - }, - Encoding: defaultEncoding, - Metadata: kafkaexporter.Metadata{ - Full: false, - }, - } - r, err := newLogsReceiver(c, receivertest.NewNopSettings(metadata.Type), consumertest.NewNop()) - require.NoError(t, err) - require.NotNil(t, r) - err = r.Start(context.Background(), 
componenttest.NewNopHost()) - assert.ErrorContains(t, err, "failed to load TLS config") -} - func TestNewLogsReceiver_initial_offset_err(t *testing.T) { c := Config{ InitialOffset: "foo", From c72c43e7ddcd82443ee5495015b3ce90dc9055ed Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 11 Mar 2025 11:07:59 +0800 Subject: [PATCH 6/8] Fix check-collector-module-version --- internal/kafka/go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/kafka/go.mod b/internal/kafka/go.mod index b4a443109de2..dcfe9c5b7e52 100644 --- a/internal/kafka/go.mod +++ b/internal/kafka/go.mod @@ -60,8 +60,8 @@ require ( github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/collector/config/configopaque v1.27.1-0.20250307194215-7d3e03e500b0 // indirect - go.opentelemetry.io/collector/featuregate v1.27.0 // indirect - go.opentelemetry.io/collector/pdata v1.27.0 // indirect + go.opentelemetry.io/collector/featuregate v1.27.1-0.20250307194215-7d3e03e500b0 // indirect + go.opentelemetry.io/collector/pdata v1.27.1-0.20250307194215-7d3e03e500b0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect From 97a563674e4203107be28de8e1f9b40af92da542 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Tue, 11 Mar 2025 11:12:03 +0800 Subject: [PATCH 7/8] Update go.sum too :facepalm: --- internal/kafka/go.sum | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/kafka/go.sum b/internal/kafka/go.sum index 8bb3f15dce4c..198d6b24e0c7 100644 --- a/internal/kafka/go.sum +++ b/internal/kafka/go.sum @@ -145,10 +145,10 @@ go.opentelemetry.io/collector/confmap v1.27.1-0.20250307194215-7d3e03e500b0 h1:w go.opentelemetry.io/collector/confmap v1.27.1-0.20250307194215-7d3e03e500b0/go.mod h1:6VV+Zoc+4tUpViZLFxo4ra/YNiyISwmJIgCchy1TJa0= go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307164521-7c787571daa5 h1:0Vcgm88Eh4vUyBILGYt35rxPaFAUyPTsXa+MtfprnVo= go.opentelemetry.io/collector/confmap/xconfmap v0.121.1-0.20250307164521-7c787571daa5/go.mod h1:npXgwAEcNHOf04WT3DLTxsErOdMbzClzu1ul7YetuX8= -go.opentelemetry.io/collector/featuregate v1.27.0 h1:4LLrccoMz/gJT5uym8ojBlMzY5tr4RzUUXzwlBuiRz0= -go.opentelemetry.io/collector/featuregate v1.27.0/go.mod h1:Y/KsHbvREENKvvN9RlpiWk/IGBK+CATBYzIIpU7nccc= -go.opentelemetry.io/collector/pdata v1.27.0 h1:66yI7FYkUDia74h48Fd2/KG2Vk8DxZnGw54wRXykCEU= -go.opentelemetry.io/collector/pdata v1.27.0/go.mod h1:18e8/xDZsqyj00h/5HM5GLdJgBzzG9Ei8g9SpNoiMtI= +go.opentelemetry.io/collector/featuregate v1.27.1-0.20250307194215-7d3e03e500b0 h1:FeNz6xofOs3oUno+llbotuCEu8Ah4N4uGGKqUBYfK9U= +go.opentelemetry.io/collector/featuregate v1.27.1-0.20250307194215-7d3e03e500b0/go.mod h1:Y/KsHbvREENKvvN9RlpiWk/IGBK+CATBYzIIpU7nccc= +go.opentelemetry.io/collector/pdata v1.27.1-0.20250307194215-7d3e03e500b0 h1:85gWk6wREFVE51XJVRhc7+bEwoVwLViGk7Y0CokNA1A= +go.opentelemetry.io/collector/pdata v1.27.1-0.20250307194215-7d3e03e500b0/go.mod h1:nFXOEpZx43ykMZJd87AHWIJKqDP+UMMKydIy59m5SEs= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= From a6cb7055912fb8a1c37c303b51213b767a51446e Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Wed, 12 Mar 2025 10:31:02 +0800 
Subject: [PATCH 8/8] Add missing license header --- internal/kafka/configkafka/config_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/kafka/configkafka/config_test.go b/internal/kafka/configkafka/config_test.go index 4fbe56469241..0785e1e92841 100644 --- a/internal/kafka/configkafka/config_test.go +++ b/internal/kafka/configkafka/config_test.go @@ -1,4 +1,7 @@ -package configkafka +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package configkafka // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka" import ( "path/filepath"
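
Reviewer note (not part of the patches): the series moves the shared auth types into `configkafka` and renames the sarama helper to `ConfigureSaramaAuthentication`. The sketch below is a hypothetical, minimal illustration of how an in-repo component would wire these together after the change, based only on the call sites and struct literals visible in patches 2-5; the `main` wrapper, credentials, and broker handling are illustrative, and the packages are `internal/`, so this only applies to components inside the contrib module.

```go
package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
)

func main() {
	// Build the relocated auth config type (field names as used in the
	// updated config_test.go files in this series).
	auth := configkafka.AuthenticationConfig{
		PlainText: &configkafka.PlainTextConfig{
			Username: "jdoe", // illustrative credentials only
			Password: "pass",
		},
	}

	// Apply it to a sarama.Config via the renamed helper, mirroring the
	// updated call sites in the observer, exporter, and receivers.
	saramaConfig := sarama.NewConfig()
	if err := kafka.ConfigureSaramaAuthentication(context.Background(), auth, saramaConfig); err != nil {
		log.Fatal(err)
	}

	// saramaConfig is now ready to construct clients, e.g.
	// sarama.NewClusterAdmin(brokers, saramaConfig) as in the
	// kafkatopicsobserver diff above.
	_ = saramaConfig
}
```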