Skip to content

Commit b7df5d4

Browse files
committed
[DRAFT] Option to apply memory_limiter extension on all receivers
1 parent d97e019 commit b7df5d4

File tree

13 files changed

+185
-57
lines changed

13 files changed

+185
-57
lines changed

.chloggen/timn_memorylimiterextension-HTTPServerConfig.yaml

Lines changed: 0 additions & 25 deletions
This file was deleted.

config/configgrpc/configgrpc.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -283,21 +283,56 @@ func (gss *ServerConfig) ToListenerContext(ctx context.Context) (net.Listener, e
283283
return gss.NetAddr.Listen(ctx)
284284
}
285285

286-
func (gss *ServerConfig) ToServer(host component.Host, settings component.TelemetrySettings, extraOpts ...grpc.ServerOption) (*grpc.Server, error) {
287-
opts, err := gss.toServerOption(host, settings)
286+
// ToListener returns the net.Listener constructed from the settings.
287+
// Deprecated: [v0.94.0] Call Listen directly on the NetAddr field.
288+
func (gss *ServerConfig) ToListener() (net.Listener, error) {
289+
return gss.ToListenerContext(context.Background())
290+
}
291+
292+
func (gss *ServerConfig) ToServer(host component.Host, settings component.TelemetrySettings, tso ...ToServerOption) (*grpc.Server, error) {
293+
opts, err := gss.toServerOption(host, settings, tso)
288294
if err != nil {
289295
return nil, err
290296
}
291-
opts = append(opts, extraOpts...)
292297
return grpc.NewServer(opts...), nil
293298
}
294299

295-
func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings) ([]grpc.ServerOption, error) {
300+
// toServerOptions has options that change the behavior of the HTTP server
301+
// returned by ServerConfig.ToServer().
302+
type toServerOptions struct {
303+
memoryLimiter *component.ID
304+
grpcOptions []grpc.ServerOption
305+
}
306+
307+
// ToServerOption is an option to change the behavior of the HTTP server
308+
// returned by ServerConfig.ToServer().
309+
type ToServerOption func(opts *toServerOptions)
310+
311+
// WithGRPCServerOption adds a grpc.ServerOption to the server.
312+
func WithGRPCServerOption(opt grpc.ServerOption) ToServerOption {
313+
return func(opts *toServerOptions) {
314+
opts.grpcOptions = append(opts.grpcOptions, opt)
315+
}
316+
}
317+
318+
// WithMemoryLimiter sets the memory limiter to be used by the server.
319+
func WithMemoryLimiter(memoryLimiter *component.ID) ToServerOption {
320+
return func(opts *toServerOptions) {
321+
opts.memoryLimiter = memoryLimiter
322+
}
323+
}
324+
325+
func (gss *ServerConfig) toServerOption(host component.Host, settings component.TelemetrySettings, tso []ToServerOption) ([]grpc.ServerOption, error) {
296326
switch gss.NetAddr.Transport {
297327
case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6":
298328
internal.WarnOnUnspecifiedHost(settings.Logger, gss.NetAddr.Endpoint)
299329
}
300330

331+
toServerOpts := &toServerOptions{}
332+
for _, o := range tso {
333+
o(toServerOpts)
334+
}
335+
301336
var opts []grpc.ServerOption
302337

303338
if gss.TLSSetting != nil {
@@ -369,6 +404,10 @@ func (gss *ServerConfig) toServerOption(host component.Host, settings component.
369404
})
370405
}
371406

407+
if toServerOpts.memoryLimiter != nil {
408+
// TODO: Add memory limiter interceptor.
409+
}
410+
372411
otelOpts := []otelgrpc.Option{
373412
otelgrpc.WithTracerProvider(settings.TracerProvider),
374413
otelgrpc.WithMeterProvider(settings.MeterProvider),
@@ -382,7 +421,7 @@ func (gss *ServerConfig) toServerOption(host component.Host, settings component.
382421

383422
opts = append(opts, grpc.StatsHandler(otelgrpc.NewServerHandler(otelOpts...)), grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))
384423

385-
return opts, nil
424+
return append(opts, toServerOpts.grpcOptions...), nil
386425
}
387426

388427
// getGRPCCompressionName returns compression name registered in grpc.

config/confighttp/confighttp.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -291,9 +291,6 @@ type ServerConfig struct {
291291
// Additional headers attached to each HTTP response sent to the client.
292292
// Header values are opaque since they may be sensitive.
293293
ResponseHeaders map[string]configopaque.String `mapstructure:"response_headers"`
294-
295-
// MemoryLimiter is memory limiter this receiver will use to restrict incoming requests
296-
MemoryLimiter *component.ID `mapstructure:"memory_limiter"`
297294
}
298295

299296
// ToListener creates a net.Listener.
@@ -318,8 +315,9 @@ func (hss *ServerConfig) ToListener() (net.Listener, error) {
318315
// toServerOptions has options that change the behavior of the HTTP server
319316
// returned by ServerConfig.ToServer().
320317
type toServerOptions struct {
321-
errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
322-
decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
318+
errHandler func(w http.ResponseWriter, r *http.Request, errorMsg string, statusCode int)
319+
decoders map[string]func(body io.ReadCloser) (io.ReadCloser, error)
320+
memoryLimiter *component.ID
323321
}
324322

325323
// ToServerOption is an option to change the behavior of the HTTP server
@@ -345,6 +343,12 @@ func WithDecoder(key string, dec func(body io.ReadCloser) (io.ReadCloser, error)
345343
}
346344
}
347345

346+
func WithMemoryLimiter(extID *component.ID) ToServerOption {
347+
return func(opts *toServerOptions) {
348+
opts.memoryLimiter = extID
349+
}
350+
}
351+
348352
// ToServer creates an http.Server from settings object.
349353
func (hss *ServerConfig) ToServer(host component.Host, settings component.TelemetrySettings, handler http.Handler, opts ...ToServerOption) (*http.Server, error) {
350354
internal.WarnOnUnspecifiedHost(settings.Logger, hss.Endpoint)
@@ -354,8 +358,8 @@ func (hss *ServerConfig) ToServer(host component.Host, settings component.Teleme
354358
o(serverOpts)
355359
}
356360

357-
if hss.MemoryLimiter != nil {
358-
ml, err := getMemoryLimiterExtension(hss.MemoryLimiter, host.GetExtensions())
361+
if serverOpts.memoryLimiter != nil {
362+
ml, err := getMemoryLimiterExtension(serverOpts.memoryLimiter, host.GetExtensions())
359363
if err != nil {
360364
return nil, err
361365
}

extension/extension.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ type ConfigWatcher interface {
5151
NotifyConfig(ctx context.Context, conf *confmap.Conf) error
5252
}
5353

54+
type MemoryLimiter interface {
55+
Extension
56+
MustRefuse() bool
57+
ApplyToAllReceivers() bool
58+
}
59+
5460
// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry
5561
// Collector that is to be implemented by extensions interested in changes to component
5662
// status.

extension/memorylimiterextension/config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33

44
package memorylimiterextension // import "go.opentelemetry.io/collector/extension/memorylimiterextension"
55

6-
import (
7-
"go.opentelemetry.io/collector/internal/memorylimiter"
8-
)
6+
import "go.opentelemetry.io/collector/internal/memorylimiter"
97

10-
type Config = memorylimiter.Config
8+
type Config struct {
9+
memorylimiter.Config `mapstructure:",squash"`
10+
ApplyToAllReceivers bool `mapstructure:"apply_to_all_receivers"`
11+
}

extension/memorylimiterextension/memorylimiter.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,21 @@ import (
1313
)
1414

1515
type memoryLimiterExtension struct {
16-
memLimiter *memorylimiter.MemoryLimiter
16+
memLimiter *memorylimiter.MemoryLimiter
17+
applyToAllReceivers bool
1718
}
1819

1920
// newMemoryLimiter returns a new memorylimiter extension.
2021
func newMemoryLimiter(cfg *Config, logger *zap.Logger) (*memoryLimiterExtension, error) {
21-
ml, err := memorylimiter.NewMemoryLimiter(cfg, logger)
22+
ml, err := memorylimiter.NewMemoryLimiter(&cfg.Config, logger)
2223
if err != nil {
2324
return nil, err
2425
}
2526

26-
return &memoryLimiterExtension{memLimiter: ml}, nil
27+
return &memoryLimiterExtension{
28+
memLimiter: ml,
29+
applyToAllReceivers: cfg.ApplyToAllReceivers,
30+
}, nil
2731
}
2832

2933
func (ml *memoryLimiterExtension) Start(ctx context.Context, host component.Host) error {
@@ -38,3 +42,8 @@ func (ml *memoryLimiterExtension) Shutdown(ctx context.Context) error {
3842
func (ml *memoryLimiterExtension) MustRefuse() bool {
3943
return ml.memLimiter.MustRefuse()
4044
}
45+
46+
// ApplyToAllReceivers returns if the memory limiter must be applied to all receivers.
47+
func (ml *memoryLimiterExtension) ApplyToAllReceivers() bool {
48+
return ml.applyToAllReceivers
49+
}

receiver/otlpreceiver/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ type Protocols struct {
4444
type Config struct {
4545
// Protocols is the configuration for the supported protocols, currently gRPC and HTTP (Proto and JSON).
4646
Protocols `mapstructure:"protocols"`
47+
48+
// MemoryLimiter is memory limiter this receiver will use to restrict incoming requests to avoid OOM kills.
49+
MemoryLimiter *component.ID `mapstructure:"memory_limiter"`
4750
}
4851

4952
var _ component.Config = (*Config)(nil)

receiver/otlpreceiver/factory.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,15 @@ func NewFactory() receiver.Factory {
3434
receiver.WithTraces(createTraces, metadata.TracesStability),
3535
receiver.WithMetrics(createMetrics, metadata.MetricsStability),
3636
receiver.WithLogs(createLog, metadata.LogsStability),
37+
receiver.WithMemoryLimiter(memoryLimiter),
3738
)
3839
}
3940

41+
// memoryLimiter sets the memory limit for the receiver.
42+
func memoryLimiter(_ context.Context, _ receiver.CreateSettings, cfg component.Config) (*component.ID, error) {
43+
return cfg.(*Config).MemoryLimiter, nil
44+
}
45+
4046
// createDefaultConfig creates the default configuration for receiver.
4147
func createDefaultConfig() component.Config {
4248
return &Config{

receiver/otlpreceiver/otlp.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ func (r *otlpReceiver) startHTTPServer(host component.Host) error {
145145
}
146146

147147
var err error
148-
if r.serverHTTP, err = r.cfg.HTTP.ToServer(host, r.settings.TelemetrySettings, httpMux, confighttp.WithErrorHandler(errorHandler)); err != nil {
148+
if r.serverHTTP, err = r.cfg.HTTP.ToServer(host, r.settings.TelemetrySettings, httpMux,
149+
confighttp.WithErrorHandler(errorHandler), confighttp.WithMemoryLimiter(r.settings.MemoryLimiter)); err != nil {
149150
return err
150151
}
151152

receiver/receiver.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package receiver // import "go.opentelemetry.io/collector/receiver"
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
910

1011
"go.uber.org/zap"
@@ -49,6 +50,10 @@ type CreateSettings struct {
4950

5051
// BuildInfo can be used by components for informational purposes.
5152
BuildInfo component.BuildInfo
53+
54+
// MemoryLimiter returns a memory limiter for the receiver.
55+
// If the receiver supports memory limiting, it must use this MemoryLimiter.
56+
MemoryLimiter *component.ID
5257
}
5358

5459
// Factory is factory interface for receivers.
@@ -82,6 +87,12 @@ type Factory interface {
8287
// LogsReceiverStability gets the stability level of the LogsReceiver.
8388
LogsReceiverStability() component.StabilityLevel
8489

90+
// MemoryLimiter returns a memory limiter for the receiver.
91+
// Implementing this method means that the receiver supports memory limiting.
92+
// Even if calling this method returns nil, memory limiter can be applied implicitly, and the receiver must use
93+
// MemoryLimiter provided in CreateSettings.
94+
MemoryLimiter(ctx context.Context, set CreateSettings, cfg component.Config) (*component.ID, error)
95+
8596
unexportedFactoryFunc()
8697
}
8798

@@ -154,6 +165,7 @@ type factory struct {
154165
metricsStabilityLevel component.StabilityLevel
155166
CreateLogsFunc
156167
logsStabilityLevel component.StabilityLevel
168+
MemoryLimiterFunc
157169
}
158170

159171
func (f *factory) Type() component.Type {
@@ -198,6 +210,23 @@ func WithLogs(createLogsReceiver CreateLogsFunc, sl component.StabilityLevel) Fa
198210
})
199211
}
200212

213+
type MemoryLimiterFunc func(context.Context, CreateSettings, component.Config) (*component.ID, error)
214+
215+
// MemoryLimiter implements Factory.MemoryLimiter.
216+
func (f MemoryLimiterFunc) MemoryLimiter(ctx context.Context, set CreateSettings, cfg component.Config) (*component.ID, error) {
217+
if f == nil {
218+
return nil, errors.New("the receiver doesn't support memory limiting")
219+
}
220+
return f(ctx, set, cfg)
221+
}
222+
223+
// WithMemoryLimiter overrides the default "error not supported" implementation for MemoryLimiter.
224+
func WithMemoryLimiter(memoryLimiterFunc MemoryLimiterFunc) FactoryOption {
225+
return factoryOptionFunc(func(o *factory) {
226+
o.MemoryLimiterFunc = memoryLimiterFunc
227+
})
228+
}
229+
201230
// NewFactory returns a Factory.
202231
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
203232
f := &factory{
@@ -246,8 +275,12 @@ func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next con
246275
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
247276
}
248277

278+
settings, err := updateSettings(ctx, set, f, cfg)
279+
if err != nil {
280+
return nil, err
281+
}
249282
logStabilityLevel(set.Logger, f.TracesReceiverStability())
250-
return f.CreateTracesReceiver(ctx, set, cfg, next)
283+
return f.CreateTracesReceiver(ctx, settings, cfg, next)
251284
}
252285

253286
// CreateMetrics creates a Metrics receiver based on the settings and config.
@@ -262,8 +295,12 @@ func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next co
262295
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
263296
}
264297

298+
settings, err := updateSettings(ctx, set, f, cfg)
299+
if err != nil {
300+
return nil, err
301+
}
265302
logStabilityLevel(set.Logger, f.MetricsReceiverStability())
266-
return f.CreateMetricsReceiver(ctx, set, cfg, next)
303+
return f.CreateMetricsReceiver(ctx, settings, cfg, next)
267304
}
268305

269306
// CreateLogs creates a Logs receiver based on the settings and config.
@@ -278,8 +315,12 @@ func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consu
278315
return nil, fmt.Errorf("receiver factory not available for: %q", set.ID)
279316
}
280317

318+
settings, err := updateSettings(ctx, set, f, cfg)
319+
if err != nil {
320+
return nil, err
321+
}
281322
logStabilityLevel(set.Logger, f.LogsReceiverStability())
282-
return f.CreateLogsReceiver(ctx, set, cfg, next)
323+
return f.CreateLogsReceiver(ctx, settings, cfg, next)
283324
}
284325

285326
func (b *Builder) Factory(componentType component.Type) component.Factory {
@@ -296,3 +337,17 @@ func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) {
296337
logger.Info(sl.LogMessage())
297338
}
298339
}
340+
341+
func updateSettings(ctx context.Context, set CreateSettings, f Factory, cfg component.Config) (CreateSettings, error) {
342+
ml, err := f.MemoryLimiter(ctx, set, cfg)
343+
344+
// Global memory limiter provided, but the receiver does not support it.
345+
if set.MemoryLimiter != nil && err != nil {
346+
return set, err
347+
}
348+
349+
if ml != nil {
350+
set.MemoryLimiter = ml
351+
}
352+
return set, nil
353+
}

0 commit comments

Comments
 (0)