Skip to content

Commit 9220c8a

Browse files
committed
Start using the new SizeConfig
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 5812712 commit 9220c8a

23 files changed

+247
-204
lines changed

.chloggen/finishsize.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Update MergeSplit function signature to use the new SizeConfig
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12486]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterbatcher/config.go

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ package exporterbatcher // import "go.opentelemetry.io/collector/exporter/export
66
import (
77
"errors"
88
"time"
9+
10+
"go.opentelemetry.io/collector/confmap"
911
)
1012

1113
// Config defines a configuration for batching requests based on a timeout and a minimum number of items.
12-
// MaxSizeItems defines batch splitting functionality if it's more than zero.
1314
// Experimental: This API is at the early stage of development and may change without backward compatibility
1415
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
1516
type Config struct {
@@ -19,66 +20,74 @@ type Config struct {
1920
// FlushTimeout sets the time after which a batch will be sent regardless of its size.
2021
FlushTimeout time.Duration `mapstructure:"flush_timeout"`
2122

23+
// SizeConfig sets the size limits for a batch.
2224
SizeConfig `mapstructure:",squash"`
2325

24-
// Deprecated. Ignored if SizeConfig is set.
26+
// Deprecated: [v0.121.0] Ignored if SizeConfig is set.
2527
MinSizeConfig `mapstructure:",squash"`
26-
// Deprecated. Ignored if SizeConfig is set.
28+
// Deprecated: [v0.121.0] Ignored if SizeConfig is set.
2729
MaxSizeConfig `mapstructure:",squash"`
2830
}
2931

32+
// SizeConfig sets the size limits for a batch.
3033
type SizeConfig struct {
3134
Sizer SizerType `mapstructure:"sizer"`
3235

33-
MinSize int `mapstructure:"mix_size"`
36+
// MinSize defines the configuration for the minimum size of a batch.
37+
MinSize int `mapstructure:"min_size"`
38+
// MaxSize defines the configuration for the maximum size of a batch.
3439
MaxSize int `mapstructure:"max_size"`
3540
}
3641

37-
// MinSizeConfig defines the configuration for the minimum number of items in a batch.
38-
// Experimental: This API is at the early stage of development and may change without backward compatibility
39-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
42+
// Deprecated: [v0.121.0] use SizeConfig.
4043
type MinSizeConfig struct {
4144
// MinSizeItems is the number of items (spans, data points or log records for OTLP) at which the batch should be
4245
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
4346
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
4447
MinSizeItems int `mapstructure:"min_size_items"`
4548
}
4649

47-
// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
48-
// Experimental: This API is at the early stage of development and may change without backward compatibility
49-
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
50+
// Deprecated: [v0.121.0] use SizeConfig.
5051
type MaxSizeConfig struct {
5152
// MaxSizeItems is the maximum number of the batch items, i.e. spans, data points or log records for OTLP.
5253
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
5354
// Setting this value to zero disables the maximum size limit.
5455
MaxSizeItems int `mapstructure:"max_size_items"`
5556
}
5657

57-
func (c Config) Validate() error {
58-
if c.MinSizeItems < 0 {
59-
return errors.New("min_size_items must be greater than or equal to zero")
58+
func (c *Config) Unmarshal(conf *confmap.Conf) error {
59+
if err := conf.Unmarshal(c); err != nil {
60+
return err
6061
}
61-
if c.MaxSizeItems < 0 {
62-
return errors.New("max_size_items must be greater than or equal to zero")
62+
63+
if conf.IsSet("min_size_items") {
64+
c.SizeConfig.MinSize = c.MinSizeItems
6365
}
64-
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
65-
return errors.New("max_size_items must be greater than or equal to min_size_items")
66+
67+
if conf.IsSet("max_size_items") {
68+
c.SizeConfig.MaxSize = c.MaxSizeItems
6669
}
70+
71+
return nil
72+
}
73+
74+
func (c *Config) Validate() error {
6775
if c.FlushTimeout <= 0 {
68-
return errors.New("timeout must be greater than zero")
76+
return errors.New("`flush_timeout` must be greater than zero")
6977
}
78+
7079
return nil
7180
}
7281

7382
func (c SizeConfig) Validate() error {
7483
if c.MinSize < 0 {
75-
return errors.New("min_size must be greater than or equal to zero")
84+
return errors.New("`min_size` must be greater than or equal to zero")
7685
}
7786
if c.MaxSize < 0 {
78-
return errors.New("max_size must be greater than or equal to zero")
87+
return errors.New("`max_size` must be greater than or equal to zero")
7988
}
8089
if c.MaxSize != 0 && c.MaxSize < c.MinSize {
81-
return errors.New("max_size must be greater than or equal to mix_size")
90+
return errors.New("`max_size` must be greater than or equal to mix_size")
8291
}
8392
return nil
8493
}
@@ -87,8 +96,9 @@ func NewDefaultConfig() Config {
8796
return Config{
8897
Enabled: true,
8998
FlushTimeout: 200 * time.Millisecond,
90-
MinSizeConfig: MinSizeConfig{
91-
MinSizeItems: 8192,
99+
SizeConfig: SizeConfig{
100+
Sizer: SizerTypeItems,
101+
MinSize: 8192,
92102
},
93103
}
94104
}

exporter/exporterbatcher/config_test.go

Lines changed: 6 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,16 @@ package exporterbatcher
66
import (
77
"testing"
88

9-
"github.com/stretchr/testify/assert"
109
"github.com/stretchr/testify/require"
11-
"gopkg.in/yaml.v2"
12-
13-
"go.opentelemetry.io/collector/confmap"
1410
)
1511

1612
func TestValidateConfig(t *testing.T) {
1713
cfg := NewDefaultConfig()
1814
require.NoError(t, cfg.Validate())
1915

20-
cfg.MinSizeItems = -1
21-
require.EqualError(t, cfg.Validate(), "min_size_items must be greater than or equal to zero")
22-
2316
cfg = NewDefaultConfig()
2417
cfg.FlushTimeout = 0
25-
require.EqualError(t, cfg.Validate(), "timeout must be greater than zero")
26-
27-
cfg.MaxSizeItems = -1
28-
require.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to zero")
29-
30-
cfg = NewDefaultConfig()
31-
cfg.MaxSizeItems = 20000
32-
cfg.MinSizeItems = 20001
33-
assert.EqualError(t, cfg.Validate(), "max_size_items must be greater than or equal to min_size_items")
18+
require.EqualError(t, cfg.Validate(), "`flush_timeout` must be greater than zero")
3419
}
3520

3621
func TestValidateSizeConfig(t *testing.T) {
@@ -39,45 +24,19 @@ func TestValidateSizeConfig(t *testing.T) {
3924
MaxSize: -100,
4025
MinSize: 100,
4126
}
42-
require.EqualError(t, cfg.Validate(), "max_size must be greater than or equal to zero")
27+
require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to zero")
4328

4429
cfg = SizeConfig{
45-
Sizer: SizerTypeBytes,
30+
Sizer: SizerTypeItems,
4631
MaxSize: 100,
4732
MinSize: -100,
4833
}
49-
require.EqualError(t, cfg.Validate(), "min_size must be greater than or equal to zero")
34+
require.EqualError(t, cfg.Validate(), "`min_size` must be greater than or equal to zero")
5035

5136
cfg = SizeConfig{
52-
Sizer: SizerTypeBytes,
37+
Sizer: SizerTypeItems,
5338
MaxSize: 100,
5439
MinSize: 200,
5540
}
56-
require.EqualError(t, cfg.Validate(), "max_size must be greater than or equal to mix_size")
57-
}
58-
59-
func TestSizeUnmarshaler(t *testing.T) {
60-
var rawConf map[string]any
61-
cfg := NewDefaultConfig()
62-
63-
require.NoError(t, yaml.Unmarshal([]byte(`sizer: bytes`), &rawConf))
64-
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
65-
require.NoError(t, cfg.Validate())
66-
67-
require.NoError(t, yaml.Unmarshal([]byte(`sizer: "bytes"`), &rawConf))
68-
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
69-
require.NoError(t, cfg.Validate())
70-
71-
require.NoError(t, yaml.Unmarshal([]byte(`sizer: items`), &rawConf))
72-
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
73-
require.NoError(t, cfg.Validate())
74-
75-
require.NoError(t, yaml.Unmarshal([]byte(`sizer: 'items'`), &rawConf))
76-
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
77-
require.NoError(t, cfg.Validate())
78-
79-
require.NoError(t, yaml.Unmarshal([]byte(`sizer: invalid`), &rawConf))
80-
require.EqualError(t,
81-
confmap.NewFromStringMap(rawConf).Unmarshal(&cfg),
82-
"decoding failed due to the following error(s):\n\nerror decoding 'sizer': invalid sizer: \"invalid\"")
41+
require.EqualError(t, cfg.Validate(), "`max_size` must be greater than or equal to mix_size")
8342
}

exporter/exporterbatcher/sizer_type.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,21 @@ import (
77
"fmt"
88
)
99

10-
type SizerType string
10+
type SizerType struct {
11+
val string
12+
}
1113

1214
const (
13-
SizerTypeItems SizerType = "items"
14-
SizerTypeBytes SizerType = "bytes"
15+
sizerTypeItems = "items"
1516
)
1617

18+
var SizerTypeItems = SizerType{val: sizerTypeItems}
19+
1720
// UnmarshalText implements TextUnmarshaler interface.
1821
func (s *SizerType) UnmarshalText(text []byte) error {
1922
switch str := string(text); str {
20-
case string(SizerTypeItems):
23+
case sizerTypeItems:
2124
*s = SizerTypeItems
22-
case string(SizerTypeBytes):
23-
*s = SizerTypeBytes
2425
default:
2526
return fmt.Errorf("invalid sizer: %q", str)
2627
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package exporterbatcher
5+
6+
import (
7+
"testing"
8+
9+
"github.com/stretchr/testify/require"
10+
"gopkg.in/yaml.v2"
11+
12+
"go.opentelemetry.io/collector/confmap"
13+
)
14+
15+
func TestSizeTypeUnmarshaler(t *testing.T) {
16+
var rawConf map[string]any
17+
cfg := NewDefaultConfig()
18+
19+
require.NoError(t, yaml.Unmarshal([]byte(`sizer: items`), &rawConf))
20+
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
21+
require.NoError(t, cfg.Validate())
22+
23+
require.NoError(t, yaml.Unmarshal([]byte(`sizer: 'items'`), &rawConf))
24+
require.NoError(t, confmap.NewFromStringMap(rawConf).Unmarshal(&cfg))
25+
require.NoError(t, cfg.Validate())
26+
27+
require.NoError(t, yaml.Unmarshal([]byte(`sizer: invalid`), &rawConf))
28+
require.ErrorContains(t,
29+
confmap.NewFromStringMap(rawConf).Unmarshal(&cfg),
30+
"decoding failed due to the following error(s):\n\nerror decoding 'sizer': invalid sizer: \"invalid\"")
31+
}

exporter/exporterhelper/internal/batcher/default_batcher.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
6767
var reqList []request.Request
6868
var mergeSplitErr error
6969
if qb.currentBatch == nil {
70-
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, nil)
70+
reqList, mergeSplitErr = req.MergeSplit(ctx, qb.batchCfg.SizeConfig, nil)
7171
if mergeSplitErr != nil || len(reqList) == 0 {
7272
done.OnDone(mergeSplitErr)
7373
qb.currentBatchMu.Unlock()
@@ -80,9 +80,9 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
8080
}
8181

8282
// We have at least one result in the reqList. Last in the list may not have enough data to be flushed.
83-
// Find if it has at least MinSizeItems, and if it does then move that as the current batch.
83+
// Find if it has at least MinSize, and if it does then move that as the current batch.
8484
lastReq := reqList[len(reqList)-1]
85-
if lastReq.ItemsCount() < qb.batchCfg.MinSizeItems {
85+
if lastReq.ItemsCount() < qb.batchCfg.MinSize {
8686
// Do not flush the last item and add it to the current batch.
8787
reqList = reqList[:len(reqList)-1]
8888
qb.currentBatch = &batch{
@@ -101,7 +101,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
101101
return
102102
}
103103

104-
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.MaxSizeConfig, req)
104+
reqList, mergeSplitErr = qb.currentBatch.req.MergeSplit(ctx, qb.batchCfg.SizeConfig, req)
105105
// If failed to merge signal all Done callbacks from current batch as well as the current request and reset the current batch.
106106
if mergeSplitErr != nil || len(reqList) == 0 {
107107
done.OnDone(mergeSplitErr)
@@ -127,7 +127,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
127127
// cannot unlock and re-lock because we are not done processing all the responses.
128128
var firstBatch *batch
129129
// Need to check the currentBatch if more than 1 result returned or if 1 result return but larger than MinSize.
130-
if len(reqList) > 1 || qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
130+
if len(reqList) > 1 || qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSize {
131131
firstBatch = qb.currentBatch
132132
qb.currentBatch = nil
133133
}
@@ -137,7 +137,7 @@ func (qb *defaultBatcher) Consume(ctx context.Context, req request.Request, done
137137
// If we still have results to process, then we need to check if the last result has enough data to flush, or we add it to the currentBatch.
138138
if len(reqList) > 0 {
139139
lastReq := reqList[len(reqList)-1]
140-
if lastReq.ItemsCount() < qb.batchCfg.MinSizeItems {
140+
if lastReq.ItemsCount() < qb.batchCfg.MinSize {
141141
// Do not flush the last item and add it to the current batch.
142142
reqList = reqList[:len(reqList)-1]
143143
qb.currentBatch = &batch{

0 commit comments

Comments
 (0)