Skip to content

Commit ac31de1

Browse files
committed
Add local file storage extension
1 parent 80eb4fc commit ac31de1

38 files changed

+1397
-93
lines changed

cmd/otelcontribcol/components.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/httpforwarder"
4747
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
4848
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
49+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage"
4950
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor"
5051
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbytraceprocessor"
5152
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
@@ -91,6 +92,7 @@ func components() (component.Factories, error) {
9192
hostobserver.NewFactory(),
9293
httpforwarder.NewFactory(),
9394
k8sobserver.NewFactory(),
95+
filestorage.NewFactory(),
9496
}
9597

9698
for _, ext := range factories.Extensions {

extension/storage/README.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,3 @@ Set(string, []byte) error
1414
Delete(string) error
1515
```
1616
Note: All methods should return error only if a problem occurred. (For example, if a file is no longer accessible, or if a remote service is unavailable.)
17-
18-
# TODO Sample code
19-
- Document component expectations
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../Makefile.Common
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# File Storage
2+
3+
The File Storage extension can persist state to the local file system.
4+
5+
The extension requires read and write access to a directory. A default directory can be used, but it must already exist in order for the extension to operate.
6+
7+
`directory` is the relative or absolute path to the dedicated data storage directory.
8+
9+
`timeout` is the maximum time to wait for a file lock. This value does not need to be modified in most circumstances.
10+
11+
12+
```
13+
extensions:
14+
file_storage:
15+
file_storage/all_settings:
16+
directory: /var/lib/otelcol/mydir
17+
timeout: 1s
18+
19+
service:
20+
extensions: [file_storage, file_storage/all_settings]
21+
pipelines:
22+
traces:
23+
receivers: [nop]
24+
processors: [nop]
25+
exporters: [nop]
26+
27+
# Data pipeline is required to load the config.
28+
receivers:
29+
nop:
30+
processors:
31+
nop:
32+
exporters:
33+
nop:
34+
```
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package filestorage
16+
17+
import (
18+
"context"
19+
"errors"
20+
"time"
21+
22+
"go.etcd.io/bbolt"
23+
)
24+
25+
var defaultBucket = []byte(`default`)
26+
27+
type fileStorageClient struct {
28+
db *bbolt.DB
29+
}
30+
31+
func newClient(filePath string, timeout time.Duration) (*fileStorageClient, error) {
32+
options := &bbolt.Options{
33+
Timeout: timeout,
34+
NoSync: true,
35+
}
36+
db, err := bbolt.Open(filePath, 0600, options)
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
initBucket := func(tx *bbolt.Tx) error {
42+
_, err := tx.CreateBucketIfNotExists(defaultBucket)
43+
return err
44+
}
45+
if err := db.Update(initBucket); err != nil {
46+
return nil, err
47+
}
48+
49+
return &fileStorageClient{db}, nil
50+
}
51+
52+
// Get will retrieve data from storage that corresponds to the specified key
53+
func (c *fileStorageClient) Get(_ context.Context, key string) ([]byte, error) {
54+
var result []byte
55+
get := func(tx *bbolt.Tx) error {
56+
bucket := tx.Bucket(defaultBucket)
57+
if bucket == nil {
58+
return errors.New("storage not initialized")
59+
}
60+
result = bucket.Get([]byte(key))
61+
return nil // no error
62+
}
63+
64+
if err := c.db.Update(get); err != nil {
65+
return nil, err
66+
}
67+
return result, nil
68+
}
69+
70+
// Set will store data. The data can be retrieved using the same key
71+
func (c *fileStorageClient) Set(_ context.Context, key string, value []byte) error {
72+
set := func(tx *bbolt.Tx) error {
73+
bucket := tx.Bucket(defaultBucket)
74+
if bucket == nil {
75+
return errors.New("storage not initialized")
76+
}
77+
return bucket.Put([]byte(key), value)
78+
}
79+
80+
return c.db.Update(set)
81+
}
82+
83+
// Delete will delete data associated with the specified key
84+
func (c *fileStorageClient) Delete(_ context.Context, key string) error {
85+
delete := func(tx *bbolt.Tx) error {
86+
bucket := tx.Bucket(defaultBucket)
87+
if bucket == nil {
88+
return errors.New("storage not initialized")
89+
}
90+
return bucket.Delete([]byte(key))
91+
}
92+
93+
return c.db.Update(delete)
94+
}
95+
96+
// Close will close the database
97+
func (c *fileStorageClient) close() error {
98+
return c.db.Close()
99+
}
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package filestorage
16+
17+
import (
18+
"context"
19+
"io/ioutil"
20+
"os"
21+
"path/filepath"
22+
"testing"
23+
"time"
24+
25+
"github.com/stretchr/testify/require"
26+
"go.etcd.io/bbolt"
27+
)
28+
29+
func TestClientOperations(t *testing.T) {
30+
tempDir := newTempDir(t)
31+
dbFile := filepath.Join(tempDir, "my_db")
32+
33+
client, err := newClient(dbFile, time.Second)
34+
require.NoError(t, err)
35+
36+
ctx := context.Background()
37+
testKey := "testKey"
38+
testValue := []byte("testValue")
39+
40+
// Make sure nothing is there
41+
value, err := client.Get(ctx, testKey)
42+
require.NoError(t, err)
43+
require.Nil(t, value)
44+
45+
// Set it
46+
err = client.Set(ctx, testKey, testValue)
47+
require.NoError(t, err)
48+
49+
// Get it back out, make sure it's right
50+
value, err = client.Get(ctx, testKey)
51+
require.NoError(t, err)
52+
require.Equal(t, testValue, value)
53+
54+
// Delete it
55+
err = client.Delete(ctx, testKey)
56+
require.NoError(t, err)
57+
58+
// Make sure it's gone
59+
value, err = client.Get(ctx, testKey)
60+
require.NoError(t, err)
61+
require.Nil(t, value)
62+
}
63+
64+
func TestNewClientTransactionErrors(t *testing.T) {
65+
timeout := 100 * time.Millisecond
66+
67+
testKey := "testKey"
68+
testValue := []byte("testValue")
69+
70+
testCases := []struct {
71+
name string
72+
setup func(*bbolt.Tx) error
73+
validate func(*testing.T, *fileStorageClient)
74+
}{
75+
{
76+
name: "get",
77+
setup: func(tx *bbolt.Tx) error {
78+
return tx.DeleteBucket(defaultBucket)
79+
},
80+
validate: func(t *testing.T, c *fileStorageClient) {
81+
value, err := c.Get(context.Background(), testKey)
82+
require.Error(t, err)
83+
require.Equal(t, "storage not initialized", err.Error())
84+
require.Nil(t, value)
85+
},
86+
},
87+
{
88+
name: "set",
89+
setup: func(tx *bbolt.Tx) error {
90+
return tx.DeleteBucket(defaultBucket)
91+
},
92+
validate: func(t *testing.T, c *fileStorageClient) {
93+
err := c.Set(context.Background(), testKey, testValue)
94+
require.Error(t, err)
95+
require.Equal(t, "storage not initialized", err.Error())
96+
},
97+
},
98+
{
99+
name: "delete",
100+
setup: func(tx *bbolt.Tx) error {
101+
return tx.DeleteBucket(defaultBucket)
102+
},
103+
validate: func(t *testing.T, c *fileStorageClient) {
104+
err := c.Delete(context.Background(), testKey)
105+
require.Error(t, err)
106+
require.Equal(t, "storage not initialized", err.Error())
107+
},
108+
},
109+
}
110+
111+
for _, tc := range testCases {
112+
t.Run(tc.name, func(t *testing.T) {
113+
114+
tempDir := newTempDir(t)
115+
dbFile := filepath.Join(tempDir, "my_db")
116+
117+
client, err := newClient(dbFile, timeout)
118+
require.NoError(t, err)
119+
120+
// Create a problem
121+
client.db.Update(tc.setup)
122+
123+
// Validate expected behavior
124+
tc.validate(t, client)
125+
})
126+
}
127+
}
128+
129+
func TestNewClientErrorsOnInvalidBucket(t *testing.T) {
130+
temp := defaultBucket
131+
defaultBucket = nil
132+
133+
tempDir := newTempDir(t)
134+
dbFile := filepath.Join(tempDir, "my_db")
135+
136+
client, err := newClient(dbFile, time.Second)
137+
require.Error(t, err)
138+
require.Nil(t, client)
139+
140+
defaultBucket = temp
141+
}
142+
143+
func BenchmarkClientGet(b *testing.B) {
144+
tempDir := newTempDir(b)
145+
dbFile := filepath.Join(tempDir, "my_db")
146+
147+
client, err := newClient(dbFile, time.Second)
148+
require.NoError(b, err)
149+
150+
ctx := context.Background()
151+
testKey := "testKey"
152+
153+
for n := 0; n < b.N; n++ {
154+
client.Get(ctx, testKey)
155+
}
156+
}
157+
158+
func BenchmarkClientSet(b *testing.B) {
159+
tempDir := newTempDir(b)
160+
dbFile := filepath.Join(tempDir, "my_db")
161+
162+
client, err := newClient(dbFile, time.Second)
163+
require.NoError(b, err)
164+
165+
ctx := context.Background()
166+
testKey := "testKey"
167+
testValue := []byte("testValue")
168+
169+
for n := 0; n < b.N; n++ {
170+
client.Set(ctx, testKey, testValue)
171+
}
172+
}
173+
174+
func BenchmarkClientDelete(b *testing.B) {
175+
tempDir := newTempDir(b)
176+
dbFile := filepath.Join(tempDir, "my_db")
177+
178+
client, err := newClient(dbFile, time.Second)
179+
require.NoError(b, err)
180+
181+
ctx := context.Background()
182+
testKey := "testKey"
183+
184+
for n := 0; n < b.N; n++ {
185+
client.Delete(ctx, testKey)
186+
}
187+
}
188+
189+
func newTempDir(tb testing.TB) string {
190+
tempDir, err := ioutil.TempDir("", "")
191+
require.NoError(tb, err)
192+
tb.Cleanup(func() { os.RemoveAll(tempDir) })
193+
return tempDir
194+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package filestorage
16+
17+
import (
18+
"time"
19+
20+
"go.opentelemetry.io/collector/config"
21+
)
22+
23+
// Config defines configuration for http forwarder extension.
24+
type Config struct {
25+
config.ExtensionSettings `mapstructure:",squash"`
26+
27+
Directory string `mapstructure:"directory,omitempty"`
28+
Timeout time.Duration `mapstructure:"timeout,omitempty"`
29+
}

0 commit comments

Comments
 (0)