Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .chloggen/feat_fsync_option_filestorage.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: 'entension/storage/filestorage'

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Add support for setting bbolt fsync option'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [20266]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
10 changes: 5 additions & 5 deletions extension/storage/filestorage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ type fileStorageClient struct {
closed bool
}

func bboltOptions(timeout time.Duration) *bbolt.Options {
func bboltOptions(timeout time.Duration, noSync bool) *bbolt.Options {
return &bbolt.Options{
Timeout: timeout,
NoSync: true,
NoSync: noSync,
NoFreelistSync: true,
FreelistType: bbolt.FreelistMapType,
}
}

func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig) (*fileStorageClient, error) {
options := bboltOptions(timeout)
func newClient(logger *zap.Logger, filePath string, timeout time.Duration, compactionCfg *CompactionConfig, fSync bool) (*fileStorageClient, error) {
options := bboltOptions(timeout, !fSync)
db, err := bbolt.Open(filePath, 0600, options)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *fileStorageClient) Compact(compactionDirectory string, timeout time.Dur
}()

// use temporary file as compaction target
options := bboltOptions(timeout)
options := bboltOptions(timeout, true)

c.compactionMutex.Lock()
defer c.compactionMutex.Unlock()
Expand Down
36 changes: 18 additions & 18 deletions extension/storage/filestorage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func TestClientOperations(t *testing.T) {
dbFile := filepath.Join(t.TempDir(), "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestClientBatchOperations(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestNewClientTransactionErrors(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, timeout, &CompactionConfig{}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand All @@ -204,7 +204,7 @@ func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
tempDir := t.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.Error(t, err)
require.Nil(t, client)

Expand Down Expand Up @@ -259,7 +259,7 @@ func TestClientReboundCompaction(t *testing.T) {
CheckInterval: checkInterval,
ReboundNeededThresholdMiB: testCase.reboundNeededThresholdMiB,
ReboundTriggerThresholdMiB: testCase.reboundTriggerThresholdMiB,
})
}, false)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close(context.TODO()))
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestClientConcurrentCompaction(t *testing.T) {
CheckInterval: stepInterval * 2,
ReboundNeededThresholdMiB: 1,
ReboundTriggerThresholdMiB: 5,
})
}, false)
require.NoError(t, err)

t.Cleanup(func() {
Expand Down Expand Up @@ -408,7 +408,7 @@ func BenchmarkClientGet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -428,7 +428,7 @@ func BenchmarkClientGet100(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -451,7 +451,7 @@ func BenchmarkClientSet(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -471,7 +471,7 @@ func BenchmarkClientSet100(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -493,7 +493,7 @@ func BenchmarkClientDelete(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -519,7 +519,7 @@ func BenchmarkClientSetLargeDB(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand Down Expand Up @@ -556,7 +556,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -575,7 +575,7 @@ func BenchmarkClientInitLargeDB(b *testing.B) {
var tempClient *fileStorageClient
b.ResetTimer()
for n := 0; n < b.N; n++ {
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
tempClient, err = newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StopTimer()
err = tempClient.Close(ctx)
Expand All @@ -593,7 +593,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -620,7 +620,7 @@ func BenchmarkClientCompactLargeDBFile(b *testing.B) {
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
err = os.Link(dbFile, testDbFile)
require.NoError(b, err)
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StartTimer()
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
Expand All @@ -637,7 +637,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
tempDir := b.TempDir()
dbFile := filepath.Join(tempDir, "my_db")

client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{})
client, err := newClient(zap.NewNop(), dbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.Cleanup(func() {
require.NoError(b, client.Close(context.TODO()))
Expand All @@ -664,7 +664,7 @@ func BenchmarkClientCompactDb(b *testing.B) {
testDbFile := filepath.Join(tempDir, fmt.Sprintf("my_db%d", n))
err = os.Link(dbFile, testDbFile)
require.NoError(b, err)
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{})
client, err = newClient(zap.NewNop(), testDbFile, time.Second, &CompactionConfig{}, false)
require.NoError(b, err)
b.StartTimer()
require.NoError(b, client.Compact(tempDir, time.Second, 65536))
Expand Down
3 changes: 3 additions & 0 deletions extension/storage/filestorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Config struct {
Timeout time.Duration `mapstructure:"timeout,omitempty"`

Compaction *CompactionConfig `mapstructure:"compaction,omitempty"`

// FSync specifies that fsync should be called after each database write
FSync bool `mapstructure:"fsync,omitempty"`
}

// CompactionConfig defines configuration for optional file storage compaction.
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestLoadConfig(t *testing.T) {
CheckInterval: time.Second * 5,
},
Timeout: 2 * time.Second,
FSync: true,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion extension/storage/filestorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (lfs *localFileStorage) GetClient(_ context.Context, kind component.Kind, e
rawName = sanitize(rawName)
}
absoluteName := filepath.Join(lfs.cfg.Directory, rawName)
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction)
client, err := newClient(lfs.logger, absoluteName, lfs.cfg.Timeout, lfs.cfg.Compaction, lfs.cfg.FSync)

if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func createDefaultConfig() component.Config {
CheckInterval: defaultCompactionInterval,
},
Timeout: time.Second,
FSync: false,
}
}

Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestFactory(t *testing.T) {
require.Equal(t, expected, cfg.Directory)
}
require.Equal(t, time.Second, cfg.Timeout)
require.Equal(t, false, cfg.FSync)

tests := []struct {
name string
Expand Down
1 change: 1 addition & 0 deletions extension/storage/filestorage/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ file_storage/all_settings:
rebound_needed_threshold_mib: 128
max_transaction_size: 2048
timeout: 2s
fsync: true