diff --git a/.chloggen/add-support-for-gc.yaml b/.chloggen/add-support-for-gc.yaml new file mode 100644 index 00000000000..646c60ce0c8 --- /dev/null +++ b/.chloggen/add-support-for-gc.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: memorylimiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support to configure min GC intervals for soft and hard limits. + +# One or more tracking issues or pull requests related to the change +issues: [12450] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/extension/memorylimiterextension/factory.go b/extension/memorylimiterextension/factory.go index 197b112c7a3..72f40dce67e 100644 --- a/extension/memorylimiterextension/factory.go +++ b/extension/memorylimiterextension/factory.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/memorylimiterextension/internal/metadata" + "go.opentelemetry.io/collector/internal/memorylimiter" ) // NewFactory returns a new factory for the Memory Limiter extension. @@ -25,7 +26,7 @@ func NewFactory() extension.Factory { // CreateDefaultConfig creates the default configuration for extension. Notice // that the default configuration is expected to fail for this extension. func createDefaultConfig() component.Config { - return &Config{} + return memorylimiter.NewDefaultConfig() } func create(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) { diff --git a/extension/memorylimiterextension/memorylimiter_test.go b/extension/memorylimiterextension/memorylimiter_test.go index da651bc492a..6edbc3bd2d4 100644 --- a/extension/memorylimiterextension/memorylimiter_test.go +++ b/extension/memorylimiterextension/memorylimiter_test.go @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/internal/memorylimiter" "go.opentelemetry.io/collector/internal/memorylimiter/iruntime" ) @@ -70,14 +70,20 @@ func TestMemoryPressureResponse(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - memorylimiter.GetMemoryFn = totalMemory + memorylimiter.GetMemoryFn = func() (uint64, error) { + return uint64(2048), nil + } memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) { ms.Alloc = tt.memAlloc } + t.Cleanup(func() { + memorylimiter.GetMemoryFn = iruntime.TotalMemory + memorylimiter.ReadMemStatsFn = runtime.ReadMemStats + }) ml, err := newMemoryLimiter(tt.mlCfg, zap.NewNop()) assert.NoError(t, err) - assert.NoError(t, ml.Start(ctx, &mockHost{})) + assert.NoError(t, ml.Start(ctx, componenttest.NewNopHost())) ml.memLimiter.CheckMemLimits() mustRefuse := ml.MustRefuse() if tt.expectError { @@ -88,20 +94,4 @@ func TestMemoryPressureResponse(t *testing.T) { assert.NoError(t, ml.Shutdown(ctx)) }) } - t.Cleanup(func() { - memorylimiter.GetMemoryFn = iruntime.TotalMemory - memorylimiter.ReadMemStatsFn = runtime.ReadMemStats - }) -} - -type mockHost struct { - component.Host -} - -func (h *mockHost) GetExtensions() map[component.ID]component.Component { - return make(map[component.ID]component.Component) -} - -func totalMemory() (uint64, error) { - return uint64(2048), nil } diff --git a/internal/memorylimiter/config.go b/internal/memorylimiter/config.go index bb20ad6565a..76610e3c73e 100644 --- a/internal/memorylimiter/config.go +++ b/internal/memorylimiter/config.go @@ -12,6 +12,7 @@ import ( var ( errCheckIntervalOutOfRange = errors.New("'check_interval' must be greater than zero") + errInconsistentGCMinInterval = errors.New("'min_gc_interval_when_soft_limited' should be larger than 'min_gc_interval_when_hard_limited'") errLimitOutOfRange = errors.New("'limit_mib' or 'limit_percentage' must be greater than zero") errSpikeLimitOutOfRange = errors.New("'spike_limit_mib' must be smaller than 'limit_mib'") errSpikeLimitPercentageOutOfRange = errors.New("'spike_limit_percentage' must be smaller than 'limit_percentage'") @@ -26,6 +27,16 @@ type Config struct { // checks will be performed. CheckInterval time.Duration `mapstructure:"check_interval"` + // MinGCIntervalWhenSoftLimited minimum interval between forced GC when in soft (=limit_mib - spike_limit_mib) limited mode. + // Zero value means no minimum interval. + // GCs is a CPU-heavy operation and executing it too frequently may affect the recovery capabilities of the collector. + MinGCIntervalWhenSoftLimited time.Duration `mapstructure:"min_gc_interval_when_soft_limited"` + + // MinGCIntervalWhenHardLimited minimum interval between forced GC when in hard (=limit_mib) limited mode. + // Zero value means no minimum interval. + // GCs is a CPU-heavy operation and executing it too frequently may affect the recovery capabilities of the collector. + MinGCIntervalWhenHardLimited time.Duration `mapstructure:"min_gc_interval_when_hard_limited"` + // MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be // allocated by the process. MemoryLimitMiB uint32 `mapstructure:"limit_mib"` @@ -45,11 +56,20 @@ type Config struct { var _ component.Config = (*Config)(nil) +func NewDefaultConfig() *Config { + return &Config{ + MinGCIntervalWhenSoftLimited: 10 * time.Second, + } +} + // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { if cfg.CheckInterval <= 0 { return errCheckIntervalOutOfRange } + if cfg.MinGCIntervalWhenSoftLimited < cfg.MinGCIntervalWhenHardLimited { + return errInconsistentGCMinInterval + } if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 { return errLimitOutOfRange } diff --git a/internal/memorylimiter/config_test.go b/internal/memorylimiter/config_test.go index 320384d344d..5861ad683bb 100644 --- a/internal/memorylimiter/config_test.go +++ b/internal/memorylimiter/config_test.go @@ -84,6 +84,17 @@ func TestConfigValidate(t *testing.T) { }, err: errSpikeLimitPercentageOutOfRange, }, + { + name: "invalid gc intervals", + cfg: &Config{ + CheckInterval: 100 * time.Millisecond, + MinGCIntervalWhenSoftLimited: 50 * time.Millisecond, + MinGCIntervalWhenHardLimited: 100 * time.Millisecond, + MemoryLimitMiB: 5722, + MemorySpikeLimitMiB: 1907, + }, + err: errInconsistentGCMinInterval, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/internal/memorylimiter/go.mod b/internal/memorylimiter/go.mod index 652e6152072..7ae32f4c1ce 100644 --- a/internal/memorylimiter/go.mod +++ b/internal/memorylimiter/go.mod @@ -25,16 +25,18 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opentelemetry.io/collector/pdata v1.26.0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect google.golang.org/grpc v1.70.0 // indirect diff --git a/internal/memorylimiter/go.sum b/internal/memorylimiter/go.sum index 157d8f74147..a2fbb81e69d 100644 --- a/internal/memorylimiter/go.sum +++ b/internal/memorylimiter/go.sum @@ -41,8 +41,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -61,10 +61,10 @@ go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= -go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4= -go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU= -go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= -go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -94,8 +94,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= diff --git a/internal/memorylimiter/memorylimiter.go b/internal/memorylimiter/memorylimiter.go index 53371b8b85c..906e6217c7b 100644 --- a/internal/memorylimiter/memorylimiter.go +++ b/internal/memorylimiter/memorylimiter.go @@ -20,10 +20,6 @@ import ( const ( mibBytes = 1024 * 1024 - - // Minimum interval between forced GC when in soft limited mode. We don't want to - // do GCs too frequently since it is a CPU-heavy operation. - minGCIntervalWhenSoftLimited = 10 * time.Second ) var ( @@ -50,11 +46,14 @@ type MemoryLimiter struct { ticker *time.Ticker - lastGCDone time.Time + minGCIntervalWhenSoftLimited time.Duration + minGCIntervalWhenHardLimited time.Duration + lastGCDone time.Time - // The function to read the mem values is set as a reference to help with + // The functions to read the mem values and run GC are set as a reference to help with // testing different values. readMemStatsFn func(m *runtime.MemStats) + runGCFn func() // Fields used for logging. logger *zap.Logger @@ -78,18 +77,20 @@ func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) { zap.Duration("check_interval", cfg.CheckInterval)) return &MemoryLimiter{ - usageChecker: *usageChecker, - memCheckWait: cfg.CheckInterval, - ticker: time.NewTicker(cfg.CheckInterval), - readMemStatsFn: ReadMemStatsFn, - logger: logger, - mustRefuse: &atomic.Bool{}, + usageChecker: *usageChecker, + memCheckWait: cfg.CheckInterval, + ticker: time.NewTicker(cfg.CheckInterval), + minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited, + minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited, + lastGCDone: time.Now(), + readMemStatsFn: ReadMemStatsFn, + runGCFn: runtime.GC, + logger: logger, + mustRefuse: &atomic.Bool{}, }, nil } -// startMonitoring starts a single ticker'd goroutine per instance -// that will check memory usage every checkInterval period. -func (ml *MemoryLimiter) startMonitoring() { +func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error { ml.refCounterLock.Lock() defer ml.refCounterLock.Unlock() @@ -110,10 +111,6 @@ func (ml *MemoryLimiter) startMonitoring() { } }() } -} - -func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error { - ml.startMonitoring() return nil } @@ -167,7 +164,7 @@ func memstatToZapField(ms *runtime.MemStats) zap.Field { } func (ml *MemoryLimiter) doGCandReadMemStats() *runtime.MemStats { - runtime.GC() + ml.runGCFn() ml.lastGCDone = time.Now() ms := ml.readMemStats() ml.logger.Info("Memory usage after GC.", memstatToZapField(ms)) @@ -180,38 +177,42 @@ func (ml *MemoryLimiter) CheckMemLimits() { ml.logger.Debug("Currently used memory.", memstatToZapField(ms)) - if ml.usageChecker.aboveHardLimit(ms) { - ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms)) - ms = ml.doGCandReadMemStats() - } - - // Remember current state. - wasRefusing := ml.mustRefuse.Load() - - // Check if the memory usage is above the soft limit. - mustRefuse := ml.usageChecker.aboveSoftLimit(ms) - - if wasRefusing && !mustRefuse { - // Was previously refusing but enough memory is available now, no need to limit. - ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms)) + // Check if we are below the soft limit. + aboveSoftLimit := ml.usageChecker.aboveSoftLimit(ms) + if !aboveSoftLimit { + if ml.mustRefuse.Load() { + // Was previously refusing but enough memory is available now, no need to limit. + ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms)) + } + ml.mustRefuse.Store(aboveSoftLimit) + return } - if !wasRefusing && mustRefuse { + if ml.usageChecker.aboveHardLimit(ms) { + // We are above hard limit, do a GC if it wasn't done recently and see if + // it brings memory usage below the soft limit. + if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenHardLimited { + ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms)) + ms = ml.doGCandReadMemStats() + // Check the limit again to see if GC helped. + aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms) + } + } else { // We are above soft limit, do a GC if it wasn't done recently and see if // it brings memory usage below the soft limit. - if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited { + if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenSoftLimited { ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms)) ms = ml.doGCandReadMemStats() // Check the limit again to see if GC helped. - mustRefuse = ml.usageChecker.aboveSoftLimit(ms) + aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms) } + } - if mustRefuse { - ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms)) - } + if !ml.mustRefuse.Load() && aboveSoftLimit { + ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms)) } - ml.mustRefuse.Store(mustRefuse) + ml.mustRefuse.Store(aboveSoftLimit) } type memUsageChecker struct { diff --git a/internal/memorylimiter/memorylimiter_test.go b/internal/memorylimiter/memorylimiter_test.go index 6096877ccdc..1f26d024cc8 100644 --- a/internal/memorylimiter/memorylimiter_test.go +++ b/internal/memorylimiter/memorylimiter_test.go @@ -5,8 +5,8 @@ package memorylimiter import ( "runtime" - "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,15 +19,15 @@ import ( // check expected side effects. func TestMemoryPressureResponse(t *testing.T) { var currentMemAlloc uint64 - ml := &MemoryLimiter{ - usageChecker: memUsageChecker{ - memAllocLimit: 1024, - }, - mustRefuse: &atomic.Bool{}, - readMemStatsFn: func(ms *runtime.MemStats) { - ms.Alloc = currentMemAlloc - }, - logger: zap.NewNop(), + cfg := &Config{ + CheckInterval: 1 * time.Minute, + MemoryLimitMiB: 1024, + MemorySpikeLimitMiB: 0, + } + ml, err := NewMemoryLimiter(cfg, zap.NewNop()) + require.NoError(t, err) + ml.readMemStatsFn = func(ms *runtime.MemStats) { + ms.Alloc = currentMemAlloc * mibBytes } // Below memAllocLimit. @@ -41,7 +41,7 @@ func TestMemoryPressureResponse(t *testing.T) { assert.True(t, ml.MustRefuse()) // Check spike limit - ml.usageChecker.memSpikeLimit = 512 + ml.usageChecker.memSpikeLimit = 512 * mibBytes // Below memSpikeLimit. currentMemAlloc = 500 @@ -132,3 +132,99 @@ func TestRefuseDecision(t *testing.T) { }) } } + +func TestCallGCWhenSoftLimit(t *testing.T) { + tests := []struct { + name string + mlCfg *Config + memAllocMiB [2]uint64 + numGCs int + }{ + { + name: "GC when first soft limit and not immediately", + mlCfg: &Config{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenSoftLimited: 10 * time.Second, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, + memAllocMiB: [2]uint64{45, 45}, + numGCs: 1, + }, + { + name: "GC always when soft limit min interval is 0", + mlCfg: &Config{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenSoftLimited: 0, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, + memAllocMiB: [2]uint64{45, 45}, + numGCs: 2, + }, + { + name: "GC when first hard limit and not immediately", + mlCfg: &Config{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenHardLimited: 10 * time.Second, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, + memAllocMiB: [2]uint64{55, 55}, + numGCs: 1, + }, + { + name: "GC always when hard limit min interval is 0", + mlCfg: &Config{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenHardLimited: 0, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, + memAllocMiB: [2]uint64{55, 55}, + numGCs: 2, + }, + { + name: "GC based on soft then based on hard limit", + mlCfg: &Config{ + CheckInterval: 1 * time.Minute, + MinGCIntervalWhenSoftLimited: 10 * time.Second, + MinGCIntervalWhenHardLimited: 0, + MemoryLimitMiB: 50, + MemorySpikeLimitMiB: 10, + }, + memAllocMiB: [2]uint64{45, 55}, + numGCs: 2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ml, err := NewMemoryLimiter(tt.mlCfg, zap.NewNop()) + require.NoError(t, err) + memAllocMiB := uint64(0) + ml.readMemStatsFn = func(ms *runtime.MemStats) { + ms.Alloc = memAllocMiB * mibBytes + } + // Mark last GC in the past so that even first call can trigger GC + // Not updating the initialization code, since at the beginning of the collector no need to GC. + ml.lastGCDone = ml.lastGCDone.Add(-time.Minute) + numGCs := 0 + ml.runGCFn = func() { + numGCs++ + } + + memAllocMiB = tt.memAllocMiB[0] + ml.CheckMemLimits() + assert.True(t, ml.MustRefuse()) + + // On windows, time has larger precision, and checking here again may return same time as "lastGCDone" + // which will not trigger a new GC for 0 duration, update last GC with -1 millis. + ml.lastGCDone = ml.lastGCDone.Add(-1 * time.Millisecond) + memAllocMiB = tt.memAllocMiB[1] + ml.CheckMemLimits() + assert.True(t, ml.MustRefuse()) + + assert.Equal(t, tt.numGCs, numGCs) + }) + } +} diff --git a/processor/memorylimiterprocessor/factory.go b/processor/memorylimiterprocessor/factory.go index d65246c3940..39ae9bf782a 100644 --- a/processor/memorylimiterprocessor/factory.go +++ b/processor/memorylimiterprocessor/factory.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/internal/memorylimiter" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/internal/telemetry/componentattribute" "go.opentelemetry.io/collector/processor" @@ -43,7 +44,7 @@ func NewFactory() processor.Factory { // CreateDefaultConfig creates the default configuration for processor. Notice // that the default configuration is expected to fail for this processor. func createDefaultConfig() component.Config { - return &Config{} + return memorylimiter.NewDefaultConfig() } func (f *factory) createTraces(