Skip to content

Commit 4e7f0ef

Browse files
authored
[awss3receiver] Add SQS notifications support (open-telemetry#40054)
#### Description Add the ability to load telemetry from an S3 bucket when a notification of a new object is received via an SQS queue. #### Link to tracking issue Resolves open-telemetry#36315 #### Testing Additional unit tests added for the SQS notifications. Tested against localstack, an S3 bucket was setup to send notifications of new objects to an SQS queue. #### Documentation Details of new config parameters added to the receivers documentation.
1 parent 88ed4f1 commit 4e7f0ef

15 files changed

+1185
-68
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: "enhancement"
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: "awss3receiver"
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Add SQS support to the awss3receiver component"
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36315]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awss3receiver/README.md

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ The following exporter configuration parameters are supported.
2020

2121
| Name | Description | Default | Required |
2222
|:------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|-------------|----------|
23-
| `starttime` | The time at which to start retrieving data. | | Required |
24-
| `endtime` | The time at which to stop retrieving data. | | Required |
23+
| `starttime` | The time at which to start retrieving data. | | Required if fetching by time |
24+
| `endtime` | The time at which to stop retrieving data. | | Required if fetching by time |
2525
| `s3downloader:` | | | |
2626
| `region` | AWS region. | "us-east-1" | Optional |
2727
| `s3_bucket` | S3 bucket | | Required |
@@ -31,11 +31,37 @@ The following exporter configuration parameters are supported.
3131
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | Optional |
3232
| `endpoint_partition_id` | partition id to use if `endpoint` is specified. | "aws" | Optional |
3333
| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | Optional |
34+
| `sqs:` | | | |
35+
| `queue_url` | The URL of the SQS queue that receives S3 bucket notifications | | Required if fetching by SQS notification |
36+
| `region` | AWS region of the SQS queue | | Required if fetching by SQS notification |
37+
| `endpoint` | Custom endpoint for the SQS service | | Optional |
38+
| `max_messages` | Maximum number of messages to retrieve in a single SQS request | 10 | Optional |
39+
| `wait_time` | Wait time in seconds for long polling SQS requests | 20 | Optional |
3440
| `encodings:` | An array of entries with the following properties: | | Optional |
3541
| `extension` | Extension to use for decoding a key with a matching suffix. | | Required |
3642
| `suffix` | Key suffix to match against. | | Required |
3743
| `notifications:` | | | |
38-
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |
44+
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |
45+
46+
There are two modes of operation:
47+
48+
1. **Time Range Mode** - Specify `starttime` and `endtime` to fetch data from a specific time range.
49+
2. **SQS Message Mode** - Subscribe to SQS messages to process new objects as they arrive.
50+
51+
### SQS Message Configuration
52+
53+
The receiver can subscribe to an SQS queue that receives S3 event notifications:
54+
55+
```yaml
56+
sqs:
57+
# Required: The ARN of the SQS queue that receives S3 bucket notifications
58+
queue_url: "https:https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
59+
# Required: The AWS region of the SQS queue
60+
region: "us-east-1"
61+
```
62+
63+
**Note:** You must configure your S3 bucket to send event notifications to the SQS queue.
64+
Time-based configuration (`starttime`/`endtime`) and SQS configuration cannot be used together.
3965

4066
### Time format for `starttime` and `endtime`
4167
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
@@ -71,6 +97,30 @@ receivers:
7197
encodings:
7298
- extension: text_encoding
7399
suffix: ".txt"
100+
101+
receivers:
102+
awss3/sqs_traces:
103+
s3downloader:
104+
region: us-east-1
105+
s3_bucket: mybucket
106+
s3_prefix: mytrace
107+
sqs:
108+
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
109+
region: "us-east-1"
110+
111+
exporters:
112+
otlp:
113+
endpoint: otelcol:4317
114+
115+
service:
116+
pipelines:
117+
traces:
118+
receivers: [awss3/traces]
119+
exporters: [otlp]
120+
121+
traces/sqs:
122+
receivers: [awss3/sqs_traces]
123+
exporters: [otlp]
74124
```
75125

76126
## Notifications

receiver/awss3receiver/config.go

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,28 @@ type S3DownloaderConfig struct {
2626
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
2727
}
2828

29+
// SQSConfig holds SQS queue configuration for receiving object change notifications.
30+
type SQSConfig struct {
31+
// QueueURL is the URL of the SQS queue to receive S3 notifications.
32+
QueueURL string `mapstructure:"queue_url"`
33+
// Region specifies the AWS region of the SQS queue.
34+
Region string `mapstructure:"region"`
35+
// Endpoint is the optional custom endpoint for SQS (useful for testing).
36+
Endpoint string `mapstructure:"endpoint"`
37+
// WaitTimeSeconds specifies the duration (in seconds) for long polling SQS messages.
38+
// Maximum is 20 seconds. Default is 20 seconds.
39+
WaitTimeSeconds int64 `mapstructure:"wait_time_seconds"`
40+
// MaxNumberOfMessages specifies the maximum number of messages to receive in a single poll.
41+
// Valid values: 1-10. Default is 10.
42+
MaxNumberOfMessages int64 `mapstructure:"max_number_of_messages"`
43+
}
44+
45+
// Notifications groups optional notification sources.
2946
type Notifications struct {
3047
OpAMP *component.ID `mapstructure:"opampextension"`
3148
}
3249

50+
// Encoding defines the encoding configuration for the file receiver.
3351
type Encoding struct {
3452
Extension component.ID `mapstructure:"extension"`
3553
Suffix string `mapstructure:"suffix"`
@@ -42,6 +60,8 @@ type Config struct {
4260
EndTime string `mapstructure:"endtime"`
4361
Encodings []Encoding `mapstructure:"encodings"`
4462
Notifications Notifications `mapstructure:"notifications"`
63+
// SQS configures receiving S3 object change notifications via an SQS queue.
64+
SQS *SQSConfig `mapstructure:"sqs"`
4565
}
4666

4767
const (
@@ -67,20 +87,59 @@ func (c Config) Validate() error {
6787
if c.S3Downloader.S3Partition != S3PartitionHour && c.S3Downloader.S3Partition != S3PartitionMinute {
6888
errs = multierr.Append(errs, errors.New("s3_partition must be either 'hour' or 'minute'"))
6989
}
70-
if c.StartTime == "" {
71-
errs = multierr.Append(errs, errors.New("starttime is required"))
72-
} else {
90+
91+
// Check for valid time-based configuration
92+
hasStartTime := c.StartTime != ""
93+
hasEndTime := c.EndTime != ""
94+
hasSQS := c.SQS != nil
95+
96+
if !hasStartTime && !hasEndTime && !hasSQS {
97+
errs = multierr.Append(errs, errors.New("either starttime/endtime or sqs configuration must be provided"))
98+
}
99+
100+
// If one of StartTime/EndTime is specified, the other must also be specified
101+
if hasStartTime && !hasEndTime {
102+
errs = multierr.Append(errs, errors.New("when starttime is specified, endtime is required"))
103+
}
104+
if !hasStartTime && hasEndTime {
105+
errs = multierr.Append(errs, errors.New("when endtime is specified, starttime is required"))
106+
}
107+
108+
// StartTime and SQS cannot be specified together
109+
if hasStartTime && hasSQS {
110+
errs = multierr.Append(errs, errors.New("starttime/endtime and sqs configuration cannot be used together"))
111+
}
112+
113+
// Validate StartTime format if specified
114+
if hasStartTime {
73115
if _, err := parseTime(c.StartTime, "starttime"); err != nil {
74116
errs = multierr.Append(errs, err)
75117
}
76118
}
77-
if c.EndTime == "" {
78-
errs = multierr.Append(errs, errors.New("endtime is required"))
79-
} else {
119+
// Validate EndTime format if specified
120+
if hasEndTime {
80121
if _, err := parseTime(c.EndTime, "endtime"); err != nil {
81122
errs = multierr.Append(errs, err)
82123
}
83124
}
125+
126+
// Validate SQS notifications if configured
127+
if c.SQS != nil {
128+
if c.SQS.QueueURL == "" {
129+
errs = multierr.Append(errs, errors.New("sqs.queue_url is required"))
130+
}
131+
if c.SQS.Region == "" {
132+
errs = multierr.Append(errs, errors.New("sqs.region is required"))
133+
}
134+
// Validate wait time seconds
135+
if c.SQS.WaitTimeSeconds < 0 || c.SQS.WaitTimeSeconds > 20 {
136+
errs = multierr.Append(errs, errors.New("sqs.wait_time_seconds must be between 0 and 20"))
137+
}
138+
// Validate max number of messages
139+
if c.SQS.MaxNumberOfMessages < 0 || c.SQS.MaxNumberOfMessages > 10 {
140+
errs = multierr.Append(errs, errors.New("sqs.max_number_of_messages must be between 1 and 10"))
141+
}
142+
}
84143
return errs
85144
}
86145

receiver/awss3receiver/config_test.go

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,45 @@ func TestLoadConfig_Validate_Invalid(t *testing.T) {
2222
}
2323

2424
func TestConfig_Validate_Valid(t *testing.T) {
25-
cfg := Config{
26-
S3Downloader: S3DownloaderConfig{
27-
Region: "",
28-
S3Bucket: "abucket",
29-
S3Prefix: "",
30-
S3Partition: "minute",
31-
FilePrefix: "",
32-
Endpoint: "",
33-
EndpointPartitionID: "aws",
34-
S3ForcePathStyle: false,
35-
},
36-
StartTime: "2024-01-01",
37-
EndTime: "2024-01-01",
38-
}
39-
assert.NoError(t, cfg.Validate())
25+
// Valid config with StartTime/EndTime
26+
t.Run("with time range", func(t *testing.T) {
27+
cfg := Config{
28+
S3Downloader: S3DownloaderConfig{
29+
Region: "",
30+
S3Bucket: "abucket",
31+
S3Prefix: "",
32+
S3Partition: "minute",
33+
FilePrefix: "",
34+
Endpoint: "",
35+
EndpointPartitionID: "aws",
36+
S3ForcePathStyle: false,
37+
},
38+
StartTime: "2024-01-01",
39+
EndTime: "2024-01-01",
40+
}
41+
assert.NoError(t, cfg.Validate())
42+
})
43+
44+
// Valid config with SQS
45+
t.Run("with sqs", func(t *testing.T) {
46+
cfg := Config{
47+
S3Downloader: S3DownloaderConfig{
48+
Region: "",
49+
S3Bucket: "abucket",
50+
S3Prefix: "",
51+
S3Partition: "minute",
52+
FilePrefix: "",
53+
Endpoint: "",
54+
EndpointPartitionID: "aws",
55+
S3ForcePathStyle: false,
56+
},
57+
SQS: &SQSConfig{
58+
QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue",
59+
Region: "us-east-1",
60+
},
61+
}
62+
assert.NoError(t, cfg.Validate())
63+
})
4064
}
4165

4266
func TestLoadConfig(t *testing.T) {
@@ -50,7 +74,7 @@ func TestLoadConfig(t *testing.T) {
5074
}{
5175
{
5276
id: component.NewIDWithName(metadata.Type, ""),
53-
errorMessage: "bucket is required; starttime is required; endtime is required",
77+
errorMessage: "bucket is required; either starttime/endtime or sqs configuration must be provided",
5478
},
5579
{
5680
id: component.NewIDWithName(metadata.Type, "1"),
@@ -108,6 +132,22 @@ func TestLoadConfig(t *testing.T) {
108132
EndTime: "2024-02-03T00:00:00Z",
109133
},
110134
},
135+
{
136+
id: component.NewIDWithName(metadata.Type, "5"),
137+
expected: &Config{
138+
S3Downloader: S3DownloaderConfig{
139+
Region: "us-east-1",
140+
S3Bucket: "abucket",
141+
S3Partition: "minute",
142+
EndpointPartitionID: "aws",
143+
},
144+
SQS: &SQSConfig{
145+
QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue",
146+
Region: "us-east-1",
147+
Endpoint: "http://localhost:4575",
148+
},
149+
},
150+
},
111151
}
112152

113153
for _, tt := range tests {

receiver/awss3receiver/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/aws/aws-sdk-go-v2/config v1.29.14
88
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.75
99
github.com/aws/aws-sdk-go-v2/service/s3 v1.79.3
10+
github.com/aws/aws-sdk-go-v2/service/sqs v1.38.5
1011
github.com/open-telemetry/opamp-go v0.19.0
1112
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.126.0
1213
github.com/stretchr/testify v1.10.0
@@ -59,6 +60,7 @@ require (
5960
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
6061
github.com/modern-go/reflect2 v1.0.2 // indirect
6162
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
63+
github.com/stretchr/objx v0.5.2 // indirect
6264
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
6365
go.opentelemetry.io/collector/consumer/consumererror v0.126.1-0.20250515040533-97a6accbc082 // indirect
6466
go.opentelemetry.io/collector/consumer/xconsumer v0.126.1-0.20250515040533-97a6accbc082 // indirect

receiver/awss3receiver/go.sum

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)