Skip to content

Commit 6e02e73

Browse files
natburdjaglowski
andauthored
[extension/storage/filestorage] Add bbolt FSync as a config option (#27459)
Description: Exposes bbolt fsync as a configuration option Link to tracking Issue: [20266](#20266) Testing: Manual Testing, Updated unit tests for factory and client Documentation: Added change-log and documentation comments in config.go --------- Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent c9546e2 commit 6e02e73

File tree

10 files changed

+52
-24
lines changed

10 files changed

+52
-24
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'entension/storage/filestorage'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: 'Add support for setting bbolt fsync option'
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [20266]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:

extension/storage/filestorage/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ The default directory is `%ProgramData%\Otelcol\FileStorage` on Windows and `/va
2525
`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.
2626
The default timeout is `1s`.
2727

28+
`fsync` when set, will force the database to perform an fsync after each write. This helps to ensure database integretity if there is an interruption to the database process, but at the cost of performance. See [DB.NoSync](https://pkg.go.dev/go.etcd.io/bbolt#DB) for more information.
29+
2830
## Compaction
2931
`compaction` defines how and when files should be compacted. There are two modes of compaction available (both of which can be set concurrently):
3032
- `compaction.on_start` (default: false), which happens when collector starts
@@ -78,6 +80,7 @@ extensions:
7880
on_start: true
7981
directory: /tmp/
8082
max_transaction_size: 65_536
83+
fsync: false
8184
8285
service:
8386
extensions: [file_storage, file_storage/all_settings]

extension/storage/filestorage/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ type fileStorageClient struct {
3737
closed bool
3838
}
3939

40-
func bboltOptions(timeout time.Duration) *bbolt.Options {
40+
func bboltOptions(timeout time.Duration, fSync bool) *bbolt.Options {
4141
return &bbolt.Options{
4242
Timeout: timeout,
43-
NoSync: true,
43+
NoSync: !fSync,
4444
NoFreelistSync: true,
4545
FreelistType: bbolt.FreelistMapType,
4646
}
4747
}
4848

49-
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) {
50-
options := bboltOptions(timeout)
49+
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, fSync bool) (*fileStorageClient, error) {
50+
options := bboltOptions(timeout, fSync)
5151
db, err := bbolt.Open(filePath, 0600, options)
5252
if err != nil {
5353
return nil, err
@@ -172,7 +172,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
172172
}()
173173

174174
// use temporary file as compaction target
175-
options := bboltOptions(timeout)
175+
options := bboltOptions(timeout, false)
176176

177177
c.compactionMutex.Lock()
178178
defer c.compactionMutex.Unlock()

extension/storage/filestorage/client_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
func TestClientOperations(t *testing.T) {
2222
dbFile := filepath.Join(t.TempDir(), "my_db")
2323

24-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
24+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
2525
require.NoError(t, err)
2626
t.Cleanup(func() {
2727
require.NoError(t, client.Close(context.TODO()))
@@ -59,7 +59,7 @@ func TestClientBatchOperations(t *testing.T) {
5959
tempDir := t.TempDir()
6060
dbFile := filepath.Join(tempDir, "my_db")
6161

62-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
62+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
6363
require.NoError(t, err)
6464
t.Cleanup(func() {
6565
require.NoError(t, client.Close(context.TODO()))
@@ -180,7 +180,7 @@ func TestNewClientTransactionErrors(t *testing.T) {
180180
tempDir := t.TempDir()
181181
dbFile := filepath.Join(tempDir, "my_db")
182182

183-
client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{})
183+
client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}, false)
184184
require.NoError(t, err)
185185
t.Cleanup(func() {
186186
require.NoError(t, client.Close(context.TODO()))
@@ -204,7 +204,7 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
204204
tempDir := t.TempDir()
205205
dbFile := filepath.Join(tempDir, "my_db")
206206

207-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
207+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
208208
require.Error(t, err)
209209
require.Nil(t, client)
210210

@@ -259,7 +259,7 @@ func TestClientReboundCompaction(t *testing.T) {
259259
CheckInterval: checkInterval,
260260
ReboundNeededThresholdMiB: testCase.reboundNeededThresholdMiB,
261261
ReboundTriggerThresholdMiB: testCase.reboundTriggerThresholdMiB,
262-
})
262+
}, false)
263263
require.NoError(t, err)
264264
t.Cleanup(func() {
265265
require.NoError(t, client.Close(context.TODO()))
@@ -348,7 +348,7 @@ func TestClientConcurrentCompaction(t *testing.T) {
348348
CheckInterval: stepInterval * 2,
349349
ReboundNeededThresholdMiB: 1,
350350
ReboundTriggerThresholdMiB: 5,
351-
})
351+
}, false)
352352
require.NoError(t, err)
353353

354354
t.Cleanup(func() {
@@ -408,7 +408,7 @@ func BenchmarkClientGet(b *testing.B) {
408408
tempDir := b.TempDir()
409409
dbFile := filepath.Join(tempDir, "my_db")
410410

411-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
411+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
412412
require.NoError(b, err)
413413
b.Cleanup(func() {
414414
require.NoError(b, client.Close(context.TODO()))
@@ -428,7 +428,7 @@ func BenchmarkClientGet100(b *testing.B) {
428428
tempDir := b.TempDir()
429429
dbFile := filepath.Join(tempDir, "my_db")
430430

431-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
431+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
432432
require.NoError(b, err)
433433
b.Cleanup(func() {
434434
require.NoError(b, client.Close(context.TODO()))
@@ -451,7 +451,7 @@ func BenchmarkClientSet(b *testing.B) {
451451
tempDir := b.TempDir()
452452
dbFile := filepath.Join(tempDir, "my_db")
453453

454-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
454+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
455455
require.NoError(b, err)
456456
b.Cleanup(func() {
457457
require.NoError(b, client.Close(context.TODO()))
@@ -471,7 +471,7 @@ func BenchmarkClientSet100(b *testing.B) {
471471
tempDir := b.TempDir()
472472
dbFile := filepath.Join(tempDir, "my_db")
473473

474-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
474+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
475475
require.NoError(b, err)
476476
b.Cleanup(func() {
477477
require.NoError(b, client.Close(context.TODO()))
@@ -493,7 +493,7 @@ func BenchmarkClientDelete(b *testing.B) {
493493
tempDir := b.TempDir()
494494
dbFile := filepath.Join(tempDir, "my_db")
495495

496-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
496+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
497497
require.NoError(b, err)
498498
b.Cleanup(func() {
499499
require.NoError(b, client.Close(context.TODO()))
@@ -519,7 +519,7 @@ func BenchmarkClientSetLargeDB(b *testing.B) {
519519
tempDir := b.TempDir()
520520
dbFile := filepath.Join(tempDir, "my_db")
521521

522-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
522+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
523523
require.NoError(b, err)
524524
b.Cleanup(func() {
525525
require.NoError(b, client.Close(context.TODO()))
@@ -556,7 +556,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
556556
tempDir := b.TempDir()
557557
dbFile := filepath.Join(tempDir, "my_db")
558558

559-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
559+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
560560
require.NoError(b, err)
561561
b.Cleanup(func() {
562562
require.NoError(b, client.Close(context.TODO()))
@@ -575,7 +575,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
575575
var tempClient *fileStorageClient
576576
b.ResetTimer()
577577
for n := 0; n < b.N; n++ {
578-
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
578+
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
579579
require.NoError(b, err)
580580
b.StopTimer()
581581
err = tempClient.Close(ctx)
@@ -593,7 +593,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
593593
tempDir := b.TempDir()
594594
dbFile := filepath.Join(tempDir, "my_db")
595595

596-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
596+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
597597
require.NoError(b, err)
598598
b.Cleanup(func() {
599599
require.NoError(b, client.Close(context.TODO()))
@@ -620,7 +620,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
620620
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
621621
err = os.Link(dbFile, testDbFile)
622622
require.NoError(b, err)
623-
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
623+
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
624624
require.NoError(b, err)
625625
b.StartTimer()
626626
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
@@ -637,7 +637,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
637637
tempDir := b.TempDir()
638638
dbFile := filepath.Join(tempDir, "my_db")
639639

640-
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
640+
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
641641
require.NoError(b, err)
642642
b.Cleanup(func() {
643643
require.NoError(b, client.Close(context.TODO()))
@@ -664,7 +664,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
664664
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
665665
err = os.Link(dbFile, testDbFile)
666666
require.NoError(b, err)
667-
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
667+
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
668668
require.NoError(b, err)
669669
b.StartTimer()
670670
require.NoError(b, client.Compact(tempDir, time.Second, 65536))

extension/storage/filestorage/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Config struct {
1717
Timeout time.Duration `mapstructure:"timeout,omitempty"`
1818

1919
Compaction *CompactionConfig `mapstructure:"compaction,omitempty"`
20+
21+
// FSync specifies that fsync should be called after each database write
22+
FSync bool `mapstructure:"fsync,omitempty"`
2023
}
2124

2225
// CompactionConfig defines configuration for optional file storage compaction.

extension/storage/filestorage/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func TestLoadConfig(t *testing.T) {
4747
CheckInterval: time.Second * 5,
4848
},
4949
Timeout: 2 * time.Second,
50+
FSync: true,
5051
},
5152
},
5253
}

extension/storage/filestorage/extension.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e
6464
rawName = sanitize(rawName)
6565
}
6666
absoluteName := filepath.Join(lfs.cfg.Directory, rawName)
67-
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction)
67+
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, lfs.cfg.FSync)
6868

6969
if err != nil {
7070
return nil, err

extension/storage/filestorage/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func createDefaultConfig() component.Config {
4747
CheckInterval: defaultCompactionInterval,
4848
},
4949
Timeout: time.Second,
50+
FSync: false,
5051
}
5152
}
5253

extension/storage/filestorage/factory_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func TestFactory(t *testing.T) {
2929
require.Equal(t, expected, cfg.Directory)
3030
}
3131
require.Equal(t, time.Second, cfg.Timeout)
32+
require.Equal(t, false, cfg.FSync)
3233

3334
tests := []struct {
3435
name string

extension/storage/filestorage/testdata/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ file_storage/all_settings:
1313
rebound_needed_threshold_mib: 128
1414
max_transaction_size: 2048
1515
timeout: 2s
16+
fsync: true

0 commit comments

Comments
 (0)