Skip to content

Commit c5aa92a

Browse files
authored
(otelarrowreceiver): add admission control to otlp path (#35021)
1 parent a4cb201 commit c5aa92a

File tree

16 files changed

+490
-49
lines changed

16 files changed

+490
-49
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: otelarrowreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: |
11+
Add admission control in the otelarrow receiver's standard otlp data path.
12+
Also moves admission control config options to a separate block.
13+
arrow.admission_limit_mib -> admission.request_limit_mib
14+
arrow.waiter_limit -> admission.waiter_limit
15+
16+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
17+
issues: [35021]
18+
19+
# (Optional) One or more lines of additional information to render under the primary note.
20+
# These lines will be padded with 2 spaces and then inserted directly into the document.
21+
# Use pipe (|) for multiline entries.
22+
subtext:
23+
24+
# If your change doesn't affect end users or the exported elements of any package,
25+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
26+
# Optional: The change log or logs in which this entry should be included.
27+
# e.g. '[user]' or '[user, api]'
28+
# Include 'user' if the change is relevant to end users.
29+
# Include 'api' if there is a change to a library API.
30+
# Default: '[user]'
31+
change_logs: [user]

internal/otelarrow/go.mod

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,19 @@ require (
4444
github.com/fxamacker/cbor/v2 v2.4.0 // indirect
4545
github.com/go-logr/logr v1.4.2 // indirect
4646
github.com/go-logr/stdr v1.2.2 // indirect
47+
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
4748
github.com/goccy/go-json v0.10.3 // indirect
4849
github.com/gogo/protobuf v1.3.2 // indirect
4950
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
5051
github.com/google/flatbuffers v24.3.25+incompatible // indirect
5152
github.com/json-iterator/go v1.1.12 // indirect
5253
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
54+
github.com/knadh/koanf/maps v0.1.1 // indirect
55+
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
56+
github.com/knadh/koanf/v2 v2.1.1 // indirect
5357
github.com/mailru/easyjson v0.7.7 // indirect
58+
github.com/mitchellh/copystructure v1.2.0 // indirect
59+
github.com/mitchellh/reflectwalk v1.0.2 // indirect
5460
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
5561
github.com/modern-go/reflect2 v1.0.2 // indirect
5662
github.com/mostynb/go-grpc-compression v1.2.3 // indirect
@@ -69,6 +75,7 @@ require (
6975
go.opentelemetry.io/collector/config/configretry v1.16.0 // indirect
7076
go.opentelemetry.io/collector/config/configtls v1.16.0 // indirect
7177
go.opentelemetry.io/collector/config/internal v0.110.0 // indirect
78+
go.opentelemetry.io/collector/confmap v1.16.0 // indirect
7279
go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect
7380
go.opentelemetry.io/collector/extension v0.110.0 // indirect
7481
go.opentelemetry.io/collector/extension/auth v0.110.0 // indirect

receiver/otelarrowreceiver/README.md

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,16 @@ Several common configuration structures provide additional capabilities automati
7777
- [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md)
7878
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
7979

80+
### Admission Control Configuration
81+
82+
In the `admission` configuration block the following settings are available:
83+
84+
- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib`, which limits allocations made by the consumer when translating arrow records into pdata objects. That is, request size is used to control how much traffic we admit, but it does not control how much memory is used during request processing.
85+
86+
- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `request_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed.
87+
88+
`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline.
89+
8090
### Arrow-specific Configuration
8191

8292
In the `arrow` configuration block, the following settings are available:
@@ -87,13 +97,6 @@ When the limit is reached, the receiver will return RESOURCE_EXHAUSTED
8797
error codes to the receiver, which are [conditionally retryable, see
8898
exporter retry configuration](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).
8999

90-
- `admission_limit_mib` (default: 64): limits the number of requests that are received by the stream based on request size information available. This should not be confused with `memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing.
91-
92-
- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed.
93-
94-
`admission_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/otel-arrow/tree/main/collector/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline.
95-
96-
97100
### Compression Configuration
98101

99102
In the `arrow` configuration block, `zstd` sub-section applies to all

receiver/otelarrowreceiver/config.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"go.opentelemetry.io/collector/component"
1010
"go.opentelemetry.io/collector/config/configgrpc"
11+
"go.opentelemetry.io/collector/confmap"
1112

1213
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
1314
)
@@ -18,22 +19,30 @@ type Protocols struct {
1819
Arrow ArrowConfig `mapstructure:"arrow"`
1920
}
2021

22+
// AdmissionConfig holds the admission control settings of the receiver: a limit
// on admitted request bytes (in MiB) and a limit on the number of waiters.
type AdmissionConfig struct {
23+
// RequestLimitMiB limits the number of requests that are received by the stream based on
24+
// uncompressed request size. Request size is used to control how much traffic we admit
25+
// for processing.
26+
RequestLimitMiB uint64 `mapstructure:"request_limit_mib"`
27+
28+
// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
29+
// This is a dimension of memory limiting to ensure waiters are not consuming an
30+
// unexpectedly large amount of memory in the arrow receiver.
31+
WaiterLimit int64 `mapstructure:"waiter_limit"`
32+
}
33+
2134
// ArrowConfig support configuring the Arrow receiver.
2235
type ArrowConfig struct {
2336
// MemoryLimitMiB is the size of a shared memory region used
2437
// by all Arrow streams, in MiB. When too much load is
2538
// passing through, they will see ResourceExhausted errors.
2639
MemoryLimitMiB uint64 `mapstructure:"memory_limit_mib"`
2740

28-
// AdmissionLimitMiB limits the number of requests that are received by the stream based on
29-
// request size information available. Request size is used to control how much traffic we admit
30-
// for processing, but does not control how much memory is used during request processing.
31-
AdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"`
41+
// Deprecated: Use cfg.Admission.RequestLimitMiB instead. This field is still read by Config.Unmarshal as a migration fallback.
42+
DeprecatedAdmissionLimitMiB uint64 `mapstructure:"admission_limit_mib"`
3243

33-
// WaiterLimit is the limit on the number of waiters waiting to be processed and consumed.
34-
// This is a dimension of memory limiting to ensure waiters are not consuming an
35-
// unexpectedly large amount of memory in the arrow receiver.
36-
WaiterLimit int64 `mapstructure:"waiter_limit"`
44+
// Deprecated: Use cfg.Admission.WaiterLimit instead. This field is still read by Config.Unmarshal as a migration fallback.
45+
DeprecatedWaiterLimit int64 `mapstructure:"waiter_limit"`
3746

3847
// Zstd settings apply to OTel-Arrow use of gRPC specifically.
3948
Zstd zstd.DecoderConfig `mapstructure:"zstd"`
@@ -43,6 +52,8 @@ type ArrowConfig struct {
4352
type Config struct {
4453
// Protocols is the configuration for gRPC and Arrow.
4554
Protocols `mapstructure:"protocols"`
55+
// Admission is the configuration for controlling amount of request memory entering the receiver.
56+
Admission AdmissionConfig `mapstructure:"admission"`
4657
}
4758

4859
var _ component.Config = (*Config)(nil)
@@ -54,3 +65,27 @@ func (cfg *ArrowConfig) Validate() error {
5465
}
5566
return nil
5667
}
68+
69+
func (cfg *Config) Validate() error {
70+
if err := cfg.GRPC.Validate(); err != nil {
71+
return err
72+
}
73+
if err := cfg.Arrow.Validate(); err != nil {
74+
return err
75+
}
76+
return nil
77+
}
78+
79+
// Unmarshal will apply deprecated field values to assist the user with migration.
80+
func (cfg *Config) Unmarshal(conf *confmap.Conf) error {
81+
if err := conf.Unmarshal(cfg); err != nil {
82+
return err
83+
}
84+
if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 {
85+
cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB
86+
}
87+
if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 {
88+
cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit
89+
}
90+
return nil
91+
}

receiver/otelarrowreceiver/config_test.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,15 +77,41 @@ func TestUnmarshalConfig(t *testing.T) {
7777
},
7878
},
7979
Arrow: ArrowConfig{
80-
MemoryLimitMiB: 123,
81-
AdmissionLimitMiB: 80,
82-
WaiterLimit: 100,
80+
MemoryLimitMiB: 123,
8381
},
8482
},
83+
Admission: AdmissionConfig{
84+
RequestLimitMiB: 80,
85+
WaiterLimit: 100,
86+
},
8587
}, cfg)
8688

8789
}
8890

91+
// TestValidateDeprecatedConfig checks that unmarshaling a config which uses the deprecated
// arrow settings populates RequestLimitMiB and WaiterLimit in the admission config block.
92+
func TestValidateDeprecatedConfig(t *testing.T) {
93+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "deprecated.yaml"))
94+
require.NoError(t, err)
95+
cfg := &Config{}
96+
assert.NoError(t, cm.Unmarshal(cfg))
97+
assert.NoError(t, cfg.Validate())
98+
assert.Equal(t,
99+
&Config{
100+
Protocols: Protocols{
101+
Arrow: ArrowConfig{
102+
MemoryLimitMiB: 123,
103+
DeprecatedAdmissionLimitMiB: 80,
104+
DeprecatedWaiterLimit: 100,
105+
},
106+
},
107+
Admission: AdmissionConfig{
108+
// Config.Unmarshal copies the deprecated values into these fields.
109+
RequestLimitMiB: 80,
110+
WaiterLimit: 100,
111+
},
112+
}, cfg)
113+
}
114+
89115
func TestUnmarshalConfigUnix(t *testing.T) {
90116
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "uds.yaml"))
91117
require.NoError(t, err)
@@ -103,11 +129,13 @@ func TestUnmarshalConfigUnix(t *testing.T) {
103129
ReadBufferSize: 512 * 1024,
104130
},
105131
Arrow: ArrowConfig{
106-
MemoryLimitMiB: defaultMemoryLimitMiB,
107-
AdmissionLimitMiB: defaultAdmissionLimitMiB,
108-
WaiterLimit: defaultWaiterLimit,
132+
MemoryLimitMiB: defaultMemoryLimitMiB,
109133
},
110134
},
135+
Admission: AdmissionConfig{
136+
RequestLimitMiB: defaultRequestLimitMiB,
137+
WaiterLimit: defaultWaiterLimit,
138+
},
111139
}, cfg)
112140
}
113141

receiver/otelarrowreceiver/factory.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ import (
1919
const (
2020
defaultGRPCEndpoint = "0.0.0.0:4317"
2121

22-
defaultMemoryLimitMiB = 128
23-
defaultAdmissionLimitMiB = defaultMemoryLimitMiB / 2
24-
defaultWaiterLimit = 1000
22+
defaultMemoryLimitMiB = 128
23+
defaultRequestLimitMiB = 128
24+
defaultWaiterLimit = 1000
2525
)
2626

2727
// NewFactory creates a new OTel-Arrow receiver factory.
@@ -47,11 +47,13 @@ func createDefaultConfig() component.Config {
4747
ReadBufferSize: 512 * 1024,
4848
},
4949
Arrow: ArrowConfig{
50-
MemoryLimitMiB: defaultMemoryLimitMiB,
51-
AdmissionLimitMiB: defaultAdmissionLimitMiB,
52-
WaiterLimit: defaultWaiterLimit,
50+
MemoryLimitMiB: defaultMemoryLimitMiB,
5351
},
5452
},
53+
Admission: AdmissionConfig{
54+
RequestLimitMiB: defaultRequestLimitMiB,
55+
WaiterLimit: defaultWaiterLimit,
56+
},
5557
}
5658
}
5759

receiver/otelarrowreceiver/internal/logs/otlp.go

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,12 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/consumer"
10+
"go.opentelemetry.io/collector/pdata/plog"
1011
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
1112
"go.opentelemetry.io/collector/receiver/receiverhelper"
13+
"go.uber.org/zap"
14+
15+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"
1216
)
1317

1418
const dataFormatProtobuf = "protobuf"
@@ -18,13 +22,19 @@ type Receiver struct {
1822
plogotlp.UnimplementedGRPCServer
1923
nextConsumer consumer.Logs
2024
obsrecv *receiverhelper.ObsReport
25+
boundedQueue *admission.BoundedQueue
26+
sizer *plog.ProtoMarshaler
27+
logger *zap.Logger
2128
}
2229

2330
// New creates a new Receiver reference.
24-
func New(nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport) *Receiver {
31+
func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq *admission.BoundedQueue) *Receiver {
2532
return &Receiver{
2633
nextConsumer: nextConsumer,
2734
obsrecv: obsrecv,
35+
boundedQueue: bq,
36+
sizer: &plog.ProtoMarshaler{},
37+
logger: logger,
2838
}
2939
}
3040

@@ -37,7 +47,19 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog
3747
}
3848

3949
ctx = r.obsrecv.StartLogsOp(ctx)
40-
err := r.nextConsumer.ConsumeLogs(ctx, ld)
50+
51+
sizeBytes := int64(r.sizer.LogsSize(req.Logs()))
52+
err := r.boundedQueue.Acquire(ctx, sizeBytes)
53+
if err != nil {
54+
return plogotlp.NewExportResponse(), err
55+
}
56+
defer func() {
57+
if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil {
58+
r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr))
59+
}
60+
}()
61+
62+
err = r.nextConsumer.ConsumeLogs(ctx, ld)
4163
r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err)
4264

4365
return plogotlp.NewExportResponse(), err

0 commit comments

Comments
 (0)