Skip to content

Commit 8c7e4d8

Browse files
author
Cody Kaczynski
committed
[receiver/kafkaexporter]: allow tunable fetch sizes
This commit adds the ability to tune the minimum, default and maximum fetch sizes for Kafka in the OpenTelemetry configuration file.
1 parent b4b07d5 commit 8c7e4d8

File tree

6 files changed

+50
-0
lines changed

6 files changed

+50
-0
lines changed

receiver/kafkareceiver/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ The following settings can be optionally configured:
4545
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
4646
- `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities.
4747
- `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
48+
- `min_fetch_size` (default = `1`): The minimum number of message bytes to fetch in a request, defaults to 1 byte.
49+
- `default_fetch_size` (default = `1048576`): The default number of message bytes to fetch in a request, defaults to 1MB.
50+
- `max_fetch_size` (default = `0`): The maximum number of message bytes to fetch in a request, defaults to unlimited.
4851
- `auth`
4952
- `plain_text`
5053
- `username`: The username to use.

receiver/kafkareceiver/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ type Config struct {
7878

7979
// Extract headers from kafka records
8080
HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"`
81+
82+
// The minimum bytes per fetch from Kafka (default "1")
83+
MinFetchSize int32 `mapstructure:"min_fetch_size"`
84+
// The default bytes per fetch from Kafka (default "1048576")
85+
DefaultFetchSize int32 `mapstructure:"default_fetch_size"`
86+
// The maximum bytes per fetch from Kafka (default "0")
87+
MaxFetchSize int32 `mapstructure:"max_fetch_size"`
8188
}
8289

8390
const (

receiver/kafkareceiver/config_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ func TestLoadConfig(t *testing.T) {
6262
Enable: true,
6363
Interval: 1 * time.Second,
6464
},
65+
MinFetchSize: 1,
66+
DefaultFetchSize: 1048576,
67+
MaxFetchSize: 0,
6568
},
6669
},
6770
{
@@ -96,6 +99,9 @@ func TestLoadConfig(t *testing.T) {
9699
Enable: true,
97100
Interval: 1 * time.Second,
98101
},
102+
MinFetchSize: 1,
103+
DefaultFetchSize: 1048576,
104+
MaxFetchSize: 0,
99105
},
100106
},
101107
}

receiver/kafkareceiver/factory.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ const (
4040
defaultAutoCommitEnable = true
4141
// default from sarama.NewConfig()
4242
defaultAutoCommitInterval = 1 * time.Second
43+
44+
// default from sarama.NewConfig()
45+
defaultMinFetchSize = 1
46+
// default from sarama.NewConfig()
47+
defaultDefaultFetchSize = 1048576
48+
// default from sarama.NewConfig()
49+
defaultMaxFetchSize = 0
4350
)
4451

4552
var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")
@@ -120,6 +127,9 @@ func createDefaultConfig() component.Config {
120127
HeaderExtraction: HeaderExtraction{
121128
ExtractHeaders: false,
122129
},
130+
MinFetchSize: defaultMinFetchSize,
131+
DefaultFetchSize: defaultDefaultFetchSize,
132+
MaxFetchSize: defaultMaxFetchSize,
123133
}
124134
}
125135

receiver/kafkareceiver/factory_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ func TestCreateDefaultConfig(t *testing.T) {
2828
assert.Equal(t, defaultInitialOffset, cfg.InitialOffset)
2929
assert.Equal(t, defaultSessionTimeout, cfg.SessionTimeout)
3030
assert.Equal(t, defaultHeartbeatInterval, cfg.HeartbeatInterval)
31+
assert.Equal(t, defaultMinFetchSize, cfg.MinFetchSize)
32+
assert.Equal(t, defaultDefaultFetchSize, cfg.DefaultFetchSize)
33+
assert.Equal(t, defaultMaxFetchSize, cfg.MaxFetchSize)
3134
}
3235

3336
func TestCreateTracesReceiver(t *testing.T) {

receiver/kafkareceiver/kafka_receiver.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ type kafkaTracesConsumer struct {
4848
messageMarking MessageMarking
4949
headerExtraction bool
5050
headers []string
51+
minFetchSize int32
52+
defaultFetchSize int32
53+
maxFetchSize int32
5154
}
5255

5356
// kafkaMetricsConsumer uses sarama to consume and handle messages from kafka.
@@ -66,6 +69,9 @@ type kafkaMetricsConsumer struct {
6669
messageMarking MessageMarking
6770
headerExtraction bool
6871
headers []string
72+
minFetchSize int32
73+
defaultFetchSize int32
74+
maxFetchSize int32
6975
}
7076

7177
// kafkaLogsConsumer uses sarama to consume and handle messages from kafka.
@@ -84,6 +90,9 @@ type kafkaLogsConsumer struct {
8490
messageMarking MessageMarking
8591
headerExtraction bool
8692
headers []string
93+
minFetchSize int32
94+
defaultFetchSize int32
95+
maxFetchSize int32
8796
}
8897

8998
var _ receiver.Traces = (*kafkaTracesConsumer)(nil)
@@ -111,6 +120,9 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshaler TracesU
111120
headerExtraction: config.HeaderExtraction.ExtractHeaders,
112121
headers: config.HeaderExtraction.Headers,
113122
telemetryBuilder: telemetryBuilder,
123+
minFetchSize: config.MinFetchSize,
124+
defaultFetchSize: config.DefaultFetchSize,
125+
maxFetchSize: config.MaxFetchSize,
114126
}, nil
115127
}
116128

@@ -124,6 +136,9 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) {
124136
saramaConfig.Consumer.Offsets.AutoCommit.Interval = config.AutoCommit.Interval
125137
saramaConfig.Consumer.Group.Session.Timeout = config.SessionTimeout
126138
saramaConfig.Consumer.Group.Heartbeat.Interval = config.HeartbeatInterval
139+
saramaConfig.Consumer.Fetch.Min = config.MinFetchSize
140+
saramaConfig.Consumer.Fetch.Default = config.DefaultFetchSize
141+
saramaConfig.Consumer.Fetch.Max = config.MaxFetchSize
127142

128143
var err error
129144
if saramaConfig.Consumer.Offsets.Initial, err = toSaramaInitialOffset(config.InitialOffset); err != nil {
@@ -234,6 +249,9 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshaler Metric
234249
headerExtraction: config.HeaderExtraction.ExtractHeaders,
235250
headers: config.HeaderExtraction.Headers,
236251
telemetryBuilder: telemetryBuilder,
252+
minFetchSize: config.MinFetchSize,
253+
defaultFetchSize: config.DefaultFetchSize,
254+
maxFetchSize: config.MaxFetchSize,
237255
}, nil
238256
}
239257

@@ -328,6 +346,9 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshaler LogsUnmar
328346
headerExtraction: config.HeaderExtraction.ExtractHeaders,
329347
headers: config.HeaderExtraction.Headers,
330348
telemetryBuilder: telemetryBuilder,
349+
minFetchSize: config.MinFetchSize,
350+
defaultFetchSize: config.DefaultFetchSize,
351+
maxFetchSize: config.MaxFetchSize,
331352
}, nil
332353
}
333354

0 commit comments

Comments
 (0)