Skip to content

Commit 4964cd8

Browse files
[extension/storage/filestorage] Add CleanupOnStart flag for compaction temporary files (#32863)
**Description:** This PR includes a new flag **cleanup_on_start** for the compaction section. During compaction a copy of the database is created, when the process is unexpectedly terminated that temporary file is not removed. That could lead to disk exhaustion given the following scenario: - Process is killed with a big database to be compacted - Compaction is enabled on start - Process will take longer to compact than the allotted time for the collector to reply health checks (see: #32456) - Process is killed while compacting - Big temporary file left This mitigates the potential risk of those temporary files left in a short period of time, by this scenario or similar ones. **Testing:** Included corner case where two instances of the extensions are spawned and one is compacting while the other would attempt to cleanup. **Documentation:** Included description in the README of the new configuration flag --------- Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent d66c0dc commit 4964cd8

File tree

9 files changed

+113
-2
lines changed

9 files changed

+113
-2
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: extension/storage/filestorage
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: New flag cleanup_on_start for the compaction section (default=false).
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [32863]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
It will remove all temporary files in the compaction directory (those which start with `tempdb`),
20+
temp files will be left if a previous run of the process is killed while compacting.
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

extension/storage/filestorage/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ The default timeout is `1s`.
3434
`compaction.max_transaction_size` (default: 65536): defines maximum size of the compaction transaction.
3535
A value of zero will ignore transaction sizes.
3636

37+
`compaction.cleanup_on_start` (default: false) - specifies if removal of compaction temporary files is performed on start.
38+
It will remove all temporary files in the compaction directory (those which start with `tempdb`),
39+
temp files will be left if a previous run of the process is killed while compacting.
40+
3741
### Rebound (online) compaction
3842

3943
For rebound compaction, there are two additional parameters available:

extension/storage/filestorage/client.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
var defaultBucket = []byte(`default`)
2121

2222
const (
23+
TempDbPrefix = "tempdb"
24+
2325
elapsedKey = "elapsed"
2426
directoryKey = "directory"
2527
tempDirectoryKey = "tempDirectory"
@@ -152,7 +154,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
152154
var compactedDb *bbolt.DB
153155

154156
// create temporary file in compactionDirectory
155-
file, err = os.CreateTemp(compactionDirectory, "tempdb")
157+
file, err = os.CreateTemp(compactionDirectory, TempDbPrefix)
156158
if err != nil {
157159
return err
158160
}

extension/storage/filestorage/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ type CompactionConfig struct {
4545
MaxTransactionSize int64 `mapstructure:"max_transaction_size,omitempty"`
4646
// CheckInterval specifies frequency of compaction check
4747
CheckInterval time.Duration `mapstructure:"check_interval,omitempty"`
48+
// CleanupOnStart specifies removal of temporary files is performed on start.
49+
// It will remove all the files in the compaction directory starting with tempdb,
50+
// temp files will be left if a previous run of the process is killed while compacting.
51+
CleanupOnStart bool `mapstructure:"cleanup_on_start,omitempty"`
4852
}
4953

5054
func (cfg *Config) Validate() error {

extension/storage/filestorage/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func TestLoadConfig(t *testing.T) {
4545
ReboundTriggerThresholdMiB: 16,
4646
ReboundNeededThresholdMiB: 128,
4747
CheckInterval: time.Second * 5,
48+
CleanupOnStart: true,
4849
},
4950
Timeout: 2 * time.Second,
5051
FSync: true,

extension/storage/filestorage/extension.go

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ package filestorage // import "github.com/open-telemetry/opentelemetry-collector
55

66
import (
77
"context"
8+
"errors"
89
"fmt"
10+
"os"
911
"path/filepath"
1012
"strings"
1113

@@ -40,8 +42,11 @@ func newLocalFileStorage(logger *zap.Logger, config *Config) (extension.Extensio
4042
}, nil
4143
}
4244

43-
// Start does nothing
45+
// Start runs cleanup if configured
4446
func (lfs *localFileStorage) Start(context.Context, component.Host) error {
47+
if lfs.cfg.Compaction.CleanupOnStart {
48+
return lfs.cleanup(lfs.cfg.Compaction.Directory)
49+
}
4550
return nil
4651
}
4752

@@ -135,3 +140,30 @@ func isSafe(character rune) bool {
135140
}
136141
return false
137142
}
143+
144+
// cleanup left compaction temporary files from previous killed process
145+
func (lfs *localFileStorage) cleanup(compactionDirectory string) error {
146+
pattern := filepath.Join(compactionDirectory, fmt.Sprintf("%s*", TempDbPrefix))
147+
contents, err := filepath.Glob(pattern)
148+
if err != nil {
149+
lfs.logger.Info("cleanup error listing temporary files",
150+
zap.Error(err))
151+
return err
152+
}
153+
154+
var errs []error
155+
for _, item := range contents {
156+
err = os.Remove(item)
157+
if err == nil {
158+
lfs.logger.Debug("cleanup",
159+
zap.String("deletedFile", item))
160+
} else {
161+
errs = append(errs, err)
162+
}
163+
}
164+
if errs != nil {
165+
lfs.logger.Info("cleanup errors",
166+
zap.Error(errors.Join(errs...)))
167+
}
168+
return nil
169+
}

extension/storage/filestorage/extension_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
1616
"go.opentelemetry.io/collector/component"
17+
"go.opentelemetry.io/collector/component/componenttest"
1718
"go.opentelemetry.io/collector/extension/experimental/storage"
1819
"go.opentelemetry.io/collector/extension/extensiontest"
1920
)
@@ -448,3 +449,39 @@ func TestCompactionRemoveTemp(t *testing.T) {
448449
require.NoError(t, err)
449450
require.Equal(t, 0, len(files))
450451
}
452+
453+
func TestCleanupOnStart(t *testing.T) {
454+
ctx := context.Background()
455+
456+
tempDir := t.TempDir()
457+
// simulate left temporary compaction file from killed process
458+
temp, _ := os.CreateTemp(tempDir, TempDbPrefix)
459+
temp.Close()
460+
461+
f := NewFactory()
462+
cfg := f.CreateDefaultConfig().(*Config)
463+
cfg.Directory = tempDir
464+
cfg.Compaction.Directory = tempDir
465+
cfg.Compaction.CleanupOnStart = true
466+
extension, err := f.CreateExtension(context.Background(), extensiontest.NewNopCreateSettings(), cfg)
467+
require.NoError(t, err)
468+
469+
se, ok := extension.(storage.Extension)
470+
require.True(t, ok)
471+
require.NoError(t, se.Start(ctx, componenttest.NewNopHost()))
472+
473+
client, err := se.GetClient(
474+
ctx,
475+
component.KindReceiver,
476+
newTestEntity("my_component"),
477+
"",
478+
)
479+
require.NoError(t, err)
480+
t.Cleanup(func() {
481+
require.NoError(t, client.Close(ctx))
482+
})
483+
484+
files, err := os.ReadDir(tempDir)
485+
require.NoError(t, err)
486+
require.Equal(t, 1, len(files))
487+
}

extension/storage/filestorage/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ func createDefaultConfig() component.Config {
4545
ReboundNeededThresholdMiB: defaultReboundNeededThresholdMib,
4646
ReboundTriggerThresholdMiB: defaultReboundTriggerThresholdMib,
4747
CheckInterval: defaultCompactionInterval,
48+
CleanupOnStart: false,
4849
},
4950
Timeout: time.Second,
5051
FSync: false,

extension/storage/filestorage/testdata/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ file_storage/all_settings:
1212
rebound_trigger_threshold_mib: 16
1313
rebound_needed_threshold_mib: 128
1414
max_transaction_size: 2048
15+
cleanup_on_start: true
1516
timeout: 2s
1617
fsync: true

0 commit comments

Comments
 (0)