Skip to content

Commit dd822af

Browse files
lopes-felipechengchuanpeng
authored andcommitted
[exporter/awss3] Implement sending queue (open-telemetry#37274)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Implements the common queueing feature into the `awss3exporter`, as suggested [here](open-telemetry#36264).
1 parent 74db4ec commit dd822af

File tree

6 files changed

+123
-20
lines changed

6 files changed

+123
-20
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awss3exporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Implement sending queue for S3 exporter
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37274, 36264]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/awss3exporter/README.md

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,22 @@ This exporter targets to support proto/json format.
1919

2020
The following exporter configuration parameters are supported.
2121

22-
| Name | Description | Default |
23-
|:----------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|
24-
| `region` | AWS region. | "us-east-1" |
25-
| `s3_bucket` | S3 bucket | |
26-
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
27-
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
28-
| `role_arn` | the Role ARN to be assumed | |
29-
| `file_prefix` | file prefix defined by user | |
30-
| `marshaler` | marshaler used to produce output data | `otlp_json` |
31-
| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | |
22+
| Name | Description | Default |
23+
|:--------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|
24+
| `region` | AWS region. | "us-east-1" |
25+
| `s3_bucket` | S3 bucket | |
26+
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
27+
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
28+
| `role_arn` | the Role ARN to be assumed | |
29+
| `file_prefix` | file prefix defined by user | |
30+
| `marshaler` | marshaler used to produce output data | `otlp_json` |
31+
| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | |
3232
| `encoding_file_extension` | file format extension suffix when using the `encoding` configuration option. May be left empty for no suffix to be appended. | |
33-
| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
34-
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
35-
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
36-
| `compression` | should the file be compressed | none |
33+
| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |
34+
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false |
35+
| `disable_ssl` | set this to `true` to disable SSL when sending requests | false |
36+
| `compression` | should the file be compressed | none |
37+
| `sending_queue` | [exporters common queuing](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | disabled |
3738

3839
### Marshaler
3940

@@ -68,6 +69,12 @@ exporters:
6869
s3_bucket: 'databucket'
6970
s3_prefix: 'metric'
7071
s3_partition: 'minute'
72+
73+
# Optional (disabled by default)
74+
sending_queue:
75+
enabled: true
76+
num_consumers: 10
77+
queue_size: 100
7178
```
7279
7380
Logs and traces will be stored inside 'databucket' in the following path format.

exporter/awss3exporter/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"go.opentelemetry.io/collector/component"
1010
"go.opentelemetry.io/collector/config/configcompression"
11+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1112
"go.uber.org/multierr"
1213
)
1314

@@ -49,6 +50,8 @@ const (
4950

5051
// Config contains the main configuration options for the s3 exporter
5152
type Config struct {
53+
QueueSettings exporterhelper.QueueConfig `mapstructure:"sending_queue"`
54+
5255
S3Uploader S3UploaderConfig `mapstructure:"s3uploader"`
5356
MarshalerName MarshalerType `mapstructure:"marshaler"`
5457

exporter/awss3exporter/config_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
1313
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1415
"go.opentelemetry.io/collector/otelcol/otelcoltest"
1516
"go.uber.org/multierr"
1617

@@ -32,7 +33,12 @@ func TestLoadConfig(t *testing.T) {
3233

3334
e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
3435
encoding := component.MustNewIDWithName("foo", "bar")
36+
37+
queueCfg := exporterhelper.NewDefaultQueueConfig()
38+
queueCfg.Enabled = false
39+
3540
assert.Equal(t, &Config{
41+
QueueSettings: queueCfg,
3642
Encoding: &encoding,
3743
EncodingFileExtension: "baz",
3844
S3Uploader: S3UploaderConfig{
@@ -59,9 +65,16 @@ func TestConfig(t *testing.T) {
5965
require.NoError(t, err)
6066
require.NotNil(t, cfg)
6167

68+
queueCfg := exporterhelper.QueueConfig{
69+
Enabled: true,
70+
NumConsumers: 23,
71+
QueueSize: 42,
72+
}
73+
6274
e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
6375

6476
assert.Equal(t, &Config{
77+
QueueSettings: queueCfg,
6578
S3Uploader: S3UploaderConfig{
6679
Region: "us-east-1",
6780
S3Bucket: "foo",
@@ -88,9 +101,13 @@ func TestConfigForS3CompatibleSystems(t *testing.T) {
88101
require.NoError(t, err)
89102
require.NotNil(t, cfg)
90103

104+
queueCfg := exporterhelper.NewDefaultQueueConfig()
105+
queueCfg.Enabled = false
106+
91107
e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
92108

93109
assert.Equal(t, &Config{
110+
QueueSettings: queueCfg,
94111
S3Uploader: S3UploaderConfig{
95112
Region: "us-east-1",
96113
S3Bucket: "foo",
@@ -200,9 +217,13 @@ func TestMarshallerName(t *testing.T) {
200217
require.NoError(t, err)
201218
require.NotNil(t, cfg)
202219

220+
queueCfg := exporterhelper.NewDefaultQueueConfig()
221+
queueCfg.Enabled = false
222+
203223
e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
204224

205225
assert.Equal(t, &Config{
226+
QueueSettings: queueCfg,
206227
S3Uploader: S3UploaderConfig{
207228
Region: "us-east-1",
208229
S3Bucket: "foo",
@@ -215,6 +236,7 @@ func TestMarshallerName(t *testing.T) {
215236
e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config)
216237

217238
assert.Equal(t, &Config{
239+
QueueSettings: queueCfg,
218240
S3Uploader: S3UploaderConfig{
219241
Region: "us-east-1",
220242
S3Bucket: "bar",
@@ -239,9 +261,13 @@ func TestCompressionName(t *testing.T) {
239261
require.NoError(t, err)
240262
require.NotNil(t, cfg)
241263

264+
queueCfg := exporterhelper.NewDefaultQueueConfig()
265+
queueCfg.Enabled = false
266+
242267
e := cfg.Exporters[component.MustNewID("awss3")].(*Config)
243268

244269
assert.Equal(t, &Config{
270+
QueueSettings: queueCfg,
245271
S3Uploader: S3UploaderConfig{
246272
Region: "us-east-1",
247273
S3Bucket: "foo",
@@ -255,6 +281,7 @@ func TestCompressionName(t *testing.T) {
255281
e = cfg.Exporters[component.MustNewIDWithName("awss3", "proto")].(*Config)
256282

257283
assert.Equal(t, &Config{
284+
QueueSettings: queueCfg,
258285
S3Uploader: S3UploaderConfig{
259286
Region: "us-east-1",
260287
S3Bucket: "bar",

exporter/awss3exporter/factory.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ func NewFactory() exporter.Factory {
2626
}
2727

2828
func createDefaultConfig() component.Config {
29+
queueCfg := exporterhelper.NewDefaultQueueConfig()
30+
queueCfg.Enabled = false
31+
2932
return &Config{
33+
QueueSettings: queueCfg,
3034
S3Uploader: S3UploaderConfig{
3135
Region: "us-east-1",
3236
S3Partition: "minute",
@@ -39,19 +43,31 @@ func createLogsExporter(ctx context.Context,
3943
params exporter.Settings,
4044
config component.Config,
4145
) (exporter.Logs, error) {
42-
s3Exporter := newS3Exporter(config.(*Config), "logs", params)
46+
cfg, err := checkAndCastConfig(config)
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
s3Exporter := newS3Exporter(cfg, "logs", params)
4352

4453
return exporterhelper.NewLogs(ctx, params,
4554
config,
4655
s3Exporter.ConsumeLogs,
47-
exporterhelper.WithStart(s3Exporter.start))
56+
exporterhelper.WithStart(s3Exporter.start),
57+
exporterhelper.WithQueue(cfg.QueueSettings),
58+
)
4859
}
4960

5061
func createMetricsExporter(ctx context.Context,
5162
params exporter.Settings,
5263
config component.Config,
5364
) (exporter.Metrics, error) {
54-
s3Exporter := newS3Exporter(config.(*Config), "metrics", params)
65+
cfg, err := checkAndCastConfig(config)
66+
if err != nil {
67+
return nil, err
68+
}
69+
70+
s3Exporter := newS3Exporter(cfg, "metrics", params)
5571

5672
if config.(*Config).MarshalerName == SumoIC {
5773
return nil, fmt.Errorf("metrics are not supported by sumo_ic output format")
@@ -60,14 +76,21 @@ func createMetricsExporter(ctx context.Context,
6076
return exporterhelper.NewMetrics(ctx, params,
6177
config,
6278
s3Exporter.ConsumeMetrics,
63-
exporterhelper.WithStart(s3Exporter.start))
79+
exporterhelper.WithStart(s3Exporter.start),
80+
exporterhelper.WithQueue(cfg.QueueSettings),
81+
)
6482
}
6583

6684
func createTracesExporter(ctx context.Context,
6785
params exporter.Settings,
6886
config component.Config,
6987
) (exporter.Traces, error) {
70-
s3Exporter := newS3Exporter(config.(*Config), "traces", params)
88+
cfg, err := checkAndCastConfig(config)
89+
if err != nil {
90+
return nil, err
91+
}
92+
93+
s3Exporter := newS3Exporter(cfg, "traces", params)
7194

7295
if config.(*Config).MarshalerName == SumoIC {
7396
return nil, fmt.Errorf("traces are not supported by sumo_ic output format")
@@ -77,5 +100,16 @@ func createTracesExporter(ctx context.Context,
77100
params,
78101
config,
79102
s3Exporter.ConsumeTraces,
80-
exporterhelper.WithStart(s3Exporter.start))
103+
exporterhelper.WithStart(s3Exporter.start),
104+
exporterhelper.WithQueue(cfg.QueueSettings),
105+
)
106+
}
107+
108+
// checkAndCastConfig checks the configuration type and casts it to the S3 exporter Config struct.
109+
func checkAndCastConfig(c component.Config) (*Config, error) {
110+
cfg, ok := c.(*Config)
111+
if !ok {
112+
return nil, fmt.Errorf("config structure is not of type *awss3exporter.Config")
113+
}
114+
return cfg, nil
81115
}

exporter/awss3exporter/testdata/config.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,11 @@ receivers:
33

44
exporters:
55
awss3:
6+
sending_queue:
7+
enabled: true
8+
num_consumers: 23
9+
queue_size: 42
10+
611
s3uploader:
712
region: 'us-east-1'
813
s3_bucket: 'foo'

0 commit comments

Comments
 (0)