Skip to content

Commit 21a70d6

Browse files
Add a memory limiter processor (#498)
This adds a processor that drops data according to configured memory limits. The processor is important for high load situations when receiving rate exceeds exporting rate (and an extreme case of this is when the target of exporting is unavailable). Typical production run will need to have this processor included in every pipeline immediately after the batch processor.
1 parent 9778b16 commit 21a70d6

File tree

13 files changed

+922
-13
lines changed

13 files changed

+922
-13
lines changed

defaults/defaults.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/open-telemetry/opentelemetry-collector/processor"
3333
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
3434
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
35+
"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
3536
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
3637
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
3738
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
@@ -87,6 +88,7 @@ func Components() (
8788
&attributesprocessor.Factory{},
8889
&queuedprocessor.Factory{},
8990
&batchprocessor.Factory{},
91+
&memorylimiter.Factory{},
9092
&tailsamplingprocessor.Factory{},
9193
&probabilisticsamplerprocessor.Factory{},
9294
)

defaults/defaults_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/open-telemetry/opentelemetry-collector/processor"
3737
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
3838
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
39+
"github.com/open-telemetry/opentelemetry-collector/processor/memorylimiter"
3940
"github.com/open-telemetry/opentelemetry-collector/processor/queuedprocessor"
4041
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/probabilisticsamplerprocessor"
4142
"github.com/open-telemetry/opentelemetry-collector/processor/samplingprocessor/tailsamplingprocessor"
@@ -64,6 +65,7 @@ func TestDefaultComponents(t *testing.T) {
6465
"attributes": &attributesprocessor.Factory{},
6566
"queued_retry": &queuedprocessor.Factory{},
6667
"batch": &batchprocessor.Factory{},
68+
"memory_limiter": &memorylimiter.Factory{},
6769
"tail_sampling": &tailsamplingprocessor.Factory{},
6870
"probabilistic_sampler": &probabilisticsamplerprocessor.Factory{},
6971
}

processor/memorylimiter/config.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package memorylimiter provides a processor for OpenTelemetry Service pipeline
16+
// that drops data on the pipeline according to the current state of memory
17+
// usage.
18+
package memorylimiter
19+
20+
import (
21+
"time"
22+
23+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
24+
)
25+
26+
// Config defines configuration for memory memoryLimiter processor.
27+
type Config struct {
28+
configmodels.ProcessorSettings `mapstructure:",squash"`
29+
30+
// CheckInterval is the time between measurements of memory usage for the
31+
// purposes of avoiding going over the limits. Defaults to zero, so no
32+
// checks will be performed.
33+
CheckInterval time.Duration `mapstructure:"check_interval"`
34+
35+
// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
36+
// allocated by the process.
37+
MemoryLimitMiB uint32 `mapstructure:"limit_mib"`
38+
39+
// MemorySpikeLimitMiB is the maximum, in MiB, spike expected between the
40+
// measurements of memory usage.
41+
MemorySpikeLimitMiB uint32 `mapstructure:"spike_limit_mib"`
42+
43+
// BallastSizeMiB is the size, in MiB, of the ballast size being used by the
44+
// process.
45+
BallastSizeMiB uint32 `mapstructure:"ballast_size_mib"`
46+
}
47+
48+
// Name of BallastSizeMiB config option.
49+
const ballastSizeMibKey = "ballast_size_mib"
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package memorylimiter
16+
17+
import (
18+
"path"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"github.com/open-telemetry/opentelemetry-collector/config"
26+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
27+
)
28+
29+
func TestLoadConfig(t *testing.T) {
30+
factories, err := config.ExampleComponents()
31+
require.NoError(t, err)
32+
factory := &Factory{}
33+
factories.Processors[typeStr] = factory
34+
require.NoError(t, err)
35+
36+
config, err := config.LoadConfigFile(
37+
t,
38+
path.Join(".", "testdata", "config.yaml"),
39+
factories)
40+
41+
require.Nil(t, err)
42+
require.NotNil(t, config)
43+
44+
p0 := config.Processors["memory_limiter"]
45+
assert.Equal(t, p0,
46+
&Config{
47+
ProcessorSettings: configmodels.ProcessorSettings{
48+
TypeVal: "memory_limiter",
49+
NameVal: "memory_limiter",
50+
},
51+
})
52+
53+
p1 := config.Processors["memory_limiter/with-settings"]
54+
assert.Equal(t, p1,
55+
&Config{
56+
ProcessorSettings: configmodels.ProcessorSettings{
57+
TypeVal: "memory_limiter",
58+
NameVal: "memory_limiter/with-settings",
59+
},
60+
CheckInterval: 5 * time.Second,
61+
MemoryLimitMiB: 4000,
62+
MemorySpikeLimitMiB: 500,
63+
BallastSizeMiB: 2000,
64+
})
65+
}

processor/memorylimiter/factory.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package memorylimiter
16+
17+
import (
18+
"go.uber.org/zap"
19+
20+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
21+
"github.com/open-telemetry/opentelemetry-collector/consumer"
22+
"github.com/open-telemetry/opentelemetry-collector/processor"
23+
)
24+
25+
const (
26+
// The value of "type" Attribute Key in configuration.
27+
typeStr = "memory_limiter"
28+
)
29+
30+
// Factory is the factory for Attribute Key processor.
31+
type Factory struct {
32+
}
33+
34+
// Type gets the type of the config created by this factory.
35+
func (f *Factory) Type() string {
36+
return typeStr
37+
}
38+
39+
// CreateDefaultConfig creates the default configuration for processor. Notice
40+
// that the default configuration is expected to fail for this processor.
41+
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
42+
return &Config{
43+
ProcessorSettings: configmodels.ProcessorSettings{
44+
TypeVal: typeStr,
45+
NameVal: typeStr,
46+
},
47+
}
48+
}
49+
50+
// CreateTraceProcessor creates a trace processor based on this config.
51+
func (f *Factory) CreateTraceProcessor(
52+
logger *zap.Logger,
53+
nextConsumer consumer.TraceConsumer,
54+
cfg configmodels.Processor,
55+
) (processor.TraceProcessor, error) {
56+
return f.createProcessor(logger, nextConsumer, nil, cfg)
57+
}
58+
59+
// CreateMetricsProcessor creates a metrics processor based on this config.
60+
func (f *Factory) CreateMetricsProcessor(
61+
logger *zap.Logger,
62+
nextConsumer consumer.MetricsConsumer,
63+
cfg configmodels.Processor,
64+
) (processor.MetricsProcessor, error) {
65+
return f.createProcessor(logger, nil, nextConsumer, cfg)
66+
}
67+
68+
func (f *Factory) createProcessor(
69+
logger *zap.Logger,
70+
traceConsumer consumer.TraceConsumer,
71+
metricConsumer consumer.MetricsConsumer,
72+
cfg configmodels.Processor,
73+
) (processor.DualTypeProcessor, error) {
74+
const mibBytes = 1024 * 1024
75+
pCfg := cfg.(*Config)
76+
return New(
77+
cfg.Name(),
78+
traceConsumer,
79+
metricConsumer,
80+
pCfg.CheckInterval,
81+
uint64(pCfg.MemoryLimitMiB)*mibBytes,
82+
uint64(pCfg.MemorySpikeLimitMiB)*mibBytes,
83+
uint64(pCfg.BallastSizeMiB)*mibBytes,
84+
logger,
85+
)
86+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package memorylimiter
16+
17+
import (
18+
"testing"
19+
"time"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"go.uber.org/zap"
24+
25+
"github.com/open-telemetry/opentelemetry-collector/config/configcheck"
26+
"github.com/open-telemetry/opentelemetry-collector/exporter/exportertest"
27+
)
28+
29+
func TestCreateDefaultConfig(t *testing.T) {
30+
factory := &Factory{}
31+
require.NotNil(t, factory)
32+
33+
cfg := factory.CreateDefaultConfig()
34+
assert.NotNil(t, cfg, "failed to create default config")
35+
assert.NoError(t, configcheck.ValidateConfig(cfg))
36+
}
37+
38+
func TestCreateProcessor(t *testing.T) {
39+
factory := &Factory{}
40+
require.NotNil(t, factory)
41+
42+
cfg := factory.CreateDefaultConfig()
43+
44+
// This processor can't be created with the default config.
45+
tp, err := factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
46+
assert.Nil(t, tp)
47+
assert.Error(t, err, "created processor with invalid settings")
48+
49+
mp, err := factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
50+
assert.Nil(t, mp)
51+
assert.Error(t, err, "created processor with invalid settings")
52+
53+
// Create processor with a valid config.
54+
pCfg := cfg.(*Config)
55+
pCfg.MemoryLimitMiB = 5722
56+
pCfg.MemorySpikeLimitMiB = 1907
57+
pCfg.BallastSizeMiB = 2048
58+
pCfg.CheckInterval = 100 * time.Millisecond
59+
60+
tp, err = factory.CreateTraceProcessor(zap.NewNop(), exportertest.NewNopTraceExporter(), cfg)
61+
assert.NoError(t, err)
62+
assert.NotNil(t, tp)
63+
assert.NoError(t, tp.Shutdown())
64+
65+
mp, err = factory.CreateMetricsProcessor(zap.NewNop(), exportertest.NewNopMetricsExporter(), cfg)
66+
assert.NoError(t, err)
67+
assert.NotNil(t, mp)
68+
assert.NoError(t, mp.Shutdown())
69+
}

0 commit comments

Comments
 (0)