Skip to content

Commit d85958a

Browse files
atoulmeevan-bradleycrobert-1
authored
[extension/storage/redis_storage] Add a new Redis extension to store data in transit (#33224)
**Description:** Add a new extension allowing to store data in transit in a Redis cluster. **Link to tracking Issue:** Fixes #31682 **Testing:** Unit tests **Documentation:** README --------- Co-authored-by: Evan Bradley <[email protected]> Co-authored-by: Curtis Robert <[email protected]>
1 parent 5ed0eb5 commit d85958a

23 files changed

+870
-0
lines changed

.chloggen/redis-storage.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: new_component
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: redis_storage
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Adds a new storage extension using Redis to store data in transit
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31682]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ extension/solarwindsapmsettingsextension/ @open-teleme
119119
extension/storage/ @open-telemetry/collector-contrib-approvers @dmitryax @atoulme @djaglowski
120120
extension/storage/dbstorage/ @open-telemetry/collector-contrib-approvers @dmitryax @atoulme
121121
extension/storage/filestorage/ @open-telemetry/collector-contrib-approvers @djaglowski
122+
extension/storage/redisstorageextension/ @open-telemetry/collector-contrib-approvers @atoulme
122123
extension/sumologicextension/ @open-telemetry/collector-contrib-approvers @aboguszewski-sumo @kkujawa-sumo @mat-rumian @rnishtala-sumo @sumo-drosiek
123124

124125
internal/aws/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @mxiamxia

.github/ISSUE_TEMPLATE/bug_report.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ body:
117117
- extension/storage
118118
- extension/storage/dbstorage
119119
- extension/storage/filestorage
120+
- extension/storage/redisstorage
120121
- extension/sumologic
121122
- internal/aws
122123
- internal/collectd

.github/ISSUE_TEMPLATE/feature_request.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ body:
111111
- extension/storage
112112
- extension/storage/dbstorage
113113
- extension/storage/filestorage
114+
- extension/storage/redisstorage
114115
- extension/sumologic
115116
- internal/aws
116117
- internal/collectd

.github/ISSUE_TEMPLATE/other.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ body:
111111
- extension/storage
112112
- extension/storage/dbstorage
113113
- extension/storage/filestorage
114+
- extension/storage/redisstorage
114115
- extension/sumologic
115116
- internal/aws
116117
- internal/collectd

.github/ISSUE_TEMPLATE/unmaintained.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ body:
116116
- extension/storage
117117
- extension/storage/dbstorage
118118
- extension/storage/filestorage
119+
- extension/storage/redisstorage
119120
- extension/sumologic
120121
- internal/aws
121122
- internal/collectd
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
include ../../../Makefile.Common
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# File Storage
2+
3+
<!-- status autogenerated section -->
4+
| Status | |
5+
| ------------- |-----------|
6+
| Stability | [development] |
7+
| Distributions | [] |
8+
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Fredisstorage%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Fredisstorage) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Fredisstorage%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Fredisstorage) |
9+
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme) \| Seeking more code owners! |
10+
11+
[development]: https://github.com/open-telemetry/opentelemetry-collector#development
12+
<!-- end autogenerated section -->
13+
14+
The Redis Storage extension can persist state to a Redis cluster.
15+
16+
The extension requires read and write access to a Redis cluster.
17+
18+
## Config
19+
- `endpoint` (required): The endpoint of the redis instance to connect to. Default: `localhost:6379`
20+
- `password` (optional): the password to connect to the redis instance. Default: ``
21+
- `db` (optional): Database to be selected after connecting to the server. Default: 0
22+
- `expiration` (optional): TTL for all storage entries. Default TTL means the key has no expiration time. Default: 0
23+
24+
## Example
25+
26+
```
27+
extensions:
28+
redis_storage:
29+
redis_storage/all_settings:
30+
endpoint: localhost:6379
31+
password: ""
32+
db: 0
33+
expiration: 5m
34+
35+
service:
36+
extensions: [redis_storage, redis_storage/all_settings]
37+
pipelines:
38+
traces:
39+
receivers: [nop]
40+
processors: [nop]
41+
exporters: [nop]
42+
43+
# Data pipeline is required to load the config.
44+
receivers:
45+
nop:
46+
exporters:
47+
nop:
48+
```
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package redisstorageextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorageextension"
5+
6+
import (
7+
"time"
8+
9+
"go.opentelemetry.io/collector/config/configopaque"
10+
)
11+
12+
// Config defines configuration for the Redis storage extension.
13+
type Config struct {
14+
Endpoint string `mapstructure:"endpoint"`
15+
Password configopaque.String `mapstructure:"password"`
16+
DB int `mapstructure:"db"`
17+
Expiration time.Duration `mapstructure:"expiration"`
18+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package redisstorageextension
5+
6+
import (
7+
"path/filepath"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/confmap/confmaptest"
15+
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorageextension/internal/metadata"
17+
)
18+
19+
func TestLoadConfig(t *testing.T) {
20+
t.Parallel()
21+
22+
tests := []struct {
23+
id component.ID
24+
expected component.Config
25+
}{
26+
{
27+
id: component.NewID(metadata.Type),
28+
expected: func() component.Config {
29+
ret := NewFactory().CreateDefaultConfig()
30+
ret.(*Config).Endpoint = "localhost:1234"
31+
return ret
32+
}(),
33+
},
34+
{
35+
id: component.NewIDWithName(metadata.Type, "all_settings"),
36+
expected: &Config{
37+
Endpoint: "localhost:1234",
38+
Password: "passwd",
39+
DB: 1,
40+
Expiration: 3 * time.Hour,
41+
},
42+
},
43+
}
44+
for _, tt := range tests {
45+
t.Run(tt.id.String(), func(t *testing.T) {
46+
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
47+
require.NoError(t, err)
48+
factory := NewFactory()
49+
cfg := factory.CreateDefaultConfig()
50+
sub, err := cm.Sub(tt.id.String())
51+
require.NoError(t, err)
52+
require.NoError(t, sub.Unmarshal(&cfg))
53+
54+
assert.NoError(t, component.ValidateConfig(cfg))
55+
assert.Equal(t, tt.expected, cfg)
56+
})
57+
}
58+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//go:generate mdatagen metadata.yaml
5+
package redisstorageextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorageextension"
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package redisstorageextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/redisstorageextension"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"time"
11+
12+
"github.com/redis/go-redis/v9"
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/extension"
15+
"go.opentelemetry.io/collector/extension/experimental/storage"
16+
"go.uber.org/zap"
17+
)
18+
19+
type redisStorage struct {
20+
cfg *Config
21+
logger *zap.Logger
22+
client *redis.Client
23+
}
24+
25+
// Ensure this storage extension implements the appropriate interface
26+
var _ storage.Extension = (*redisStorage)(nil)
27+
28+
func newRedisStorage(logger *zap.Logger, config *Config) (extension.Extension, error) {
29+
return &redisStorage{
30+
cfg: config,
31+
logger: logger,
32+
}, nil
33+
}
34+
35+
// Start runs cleanup if configured
36+
func (rs *redisStorage) Start(context.Context, component.Host) error {
37+
c := redis.NewClient(&redis.Options{
38+
Addr: rs.cfg.Endpoint,
39+
Password: string(rs.cfg.Password),
40+
DB: rs.cfg.DB,
41+
})
42+
rs.client = c
43+
return nil
44+
}
45+
46+
// Shutdown will close any open databases
47+
func (rs *redisStorage) Shutdown(context.Context) error {
48+
if rs.client == nil {
49+
return nil
50+
}
51+
return rs.client.Close()
52+
}
53+
54+
type redisClient struct {
55+
client *redis.Client
56+
prefix string
57+
expiration time.Duration
58+
}
59+
60+
var _ storage.Client = redisClient{}
61+
62+
func (rc redisClient) Get(ctx context.Context, key string) ([]byte, error) {
63+
b, err := rc.client.Get(ctx, rc.prefix+key).Bytes()
64+
if errors.Is(err, redis.Nil) {
65+
return nil, nil
66+
}
67+
return b, err
68+
}
69+
70+
func (rc redisClient) Set(ctx context.Context, key string, value []byte) error {
71+
_, err := rc.client.Set(ctx, rc.prefix+key, value, rc.expiration).Result()
72+
return err
73+
}
74+
75+
func (rc redisClient) Delete(ctx context.Context, key string) error {
76+
_, err := rc.client.Del(ctx, rc.prefix+key).Result()
77+
return err
78+
}
79+
80+
func (rc redisClient) Batch(ctx context.Context, ops ...storage.Operation) error {
81+
p := rc.client.Pipeline()
82+
for _, op := range ops {
83+
switch op.Type {
84+
case storage.Delete:
85+
p.Del(ctx, op.Key)
86+
case storage.Get:
87+
p.Get(ctx, op.Key)
88+
case storage.Set:
89+
p.Set(ctx, op.Key, op.Value, rc.expiration)
90+
}
91+
}
92+
_, err := p.Exec(ctx)
93+
return err
94+
}
95+
96+
func (rc redisClient) Close(_ context.Context) error {
97+
return nil
98+
}
99+
100+
// GetClient returns a storage client for an individual component
101+
func (rs *redisStorage) GetClient(_ context.Context, kind component.Kind, ent component.ID, name string) (storage.Client, error) {
102+
var rawName string
103+
if name == "" {
104+
rawName = fmt.Sprintf("%s_%s_%s", kindString(kind), ent.Type(), ent.Name())
105+
} else {
106+
rawName = fmt.Sprintf("%s_%s_%s_%s", kindString(kind), ent.Type(), ent.Name(), name)
107+
}
108+
109+
return redisClient{
110+
client: rs.client,
111+
prefix: rawName,
112+
expiration: rs.cfg.Expiration,
113+
}, nil
114+
}
115+
116+
func kindString(k component.Kind) string {
117+
switch k {
118+
case component.KindReceiver:
119+
return "receiver"
120+
case component.KindProcessor:
121+
return "processor"
122+
case component.KindExporter:
123+
return "exporter"
124+
case component.KindExtension:
125+
return "extension"
126+
case component.KindConnector:
127+
return "connector"
128+
default:
129+
return "other" // not expected
130+
}
131+
}

0 commit comments

Comments
 (0)