Skip to content

Commit a2ca2a2

Browse files
adcharrejriguera
authored andcommitted
[receiver/awss3receiver]: Add ingest progress notifications via OpAMP (open-telemetry#33980)
1 parent 14f4c8a commit a2ca2a2

File tree

12 files changed

+645
-14
lines changed

12 files changed

+645
-14
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awss3receiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: 'Add support for monitoring the progress of ingesting data from an S3 bucket via OpAMP custom messages.'
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [30750]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/awss3receiver/README.md

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ The following exporter configuration parameters are supported.
3232
| `encodings:` | An array of entries with the following properties: | | Optional |
3333
| `extension` | Extension to use for decoding a key with a matching suffix. | | Required |
3434
| `suffix` | Key suffix to match against. | | Required |
35+
| `notifications:` | | | |
36+
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |
3537

3638
### Time format for `starttime` and `endtime`
3739
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
@@ -67,4 +69,25 @@ receivers:
6769
encodings:
6870
- extension: text_encoding
6971
suffix: ".txt"
70-
```
72+
```
73+
74+
## Notifications
75+
The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of
76+
"org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus".
77+
The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the
78+
record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the
79+
following attributes:
80+
81+
| Attribute | Description |
82+
|:------------------|:--------------------------------------------------------------------------------|
83+
| `telemetry_type` | The type of telemetry being ingested. One of "traces", "metrics", or "logs". |
84+
| `ingest_status` | The status of the data ingestion. One of "ingesting", "failed", or "completed". |
85+
| `start_time` | The time to start retrieving data as an Int64, nanoseconds since Unix epoch. |
86+
| `end_time` | The time to stop retrieving data as an Int64, nanoseconds since Unix epoch. |
87+
| `failure_message` | Error message if `ingest_status` is "failed". |
88+
89+
The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time.
90+
If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with
91+
the time of the data being ingested when the failure occurred.
92+
If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent.
93+

receiver/awss3receiver/config.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,22 @@ type S3DownloaderConfig struct {
2626
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
2727
}
2828

29+
type Notifications struct {
30+
OpAMP *component.ID `mapstructure:"opampextension"`
31+
}
32+
2933
type Encoding struct {
3034
Extension component.ID `mapstructure:"extension"`
3135
Suffix string `mapstructure:"suffix"`
3236
}
3337

3438
// Config defines the configuration for the file receiver.
3539
type Config struct {
36-
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
37-
StartTime string `mapstructure:"starttime"`
38-
EndTime string `mapstructure:"endtime"`
39-
Encodings []Encoding `mapstructure:"encodings"`
40+
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
41+
StartTime string `mapstructure:"starttime"`
42+
EndTime string `mapstructure:"endtime"`
43+
Encodings []Encoding `mapstructure:"encodings"`
44+
Notifications Notifications `mapstructure:"notifications"`
4045
}
4146

4247
const (

receiver/awss3receiver/config_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
4141
func TestLoadConfig(t *testing.T) {
4242
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
4343
require.NoError(t, err)
44-
44+
opampExtension := component.NewIDWithName(component.MustNewType("opamp"), "bar")
4545
tests := []struct {
4646
id component.ID
4747
expected component.Config
@@ -89,6 +89,9 @@ func TestLoadConfig(t *testing.T) {
8989
Suffix: "nop",
9090
},
9191
},
92+
Notifications: Notifications{
93+
OpAMP: &opampExtension,
94+
},
9295
},
9396
},
9497
}

receiver/awss3receiver/go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ require (
77
github.com/aws/aws-sdk-go-v2/config v1.27.39
88
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25
99
github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3
10+
github.com/open-telemetry/opamp-go v0.15.0
11+
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.110.1-0.20241004063257-d6cd5935eefc
1012
github.com/stretchr/testify v1.9.0
1113
go.opentelemetry.io/collector/component v0.111.0
1214
go.opentelemetry.io/collector/confmap v1.17.0
@@ -71,3 +73,5 @@ require (
7173
google.golang.org/protobuf v1.34.2 // indirect
7274
gopkg.in/yaml.v3 v3.0.1 // indirect
7375
)
76+
77+
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages

receiver/awss3receiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"fmt"
10+
"time"
11+
12+
"github.com/open-telemetry/opamp-go/client/types"
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/pdata/pcommon"
15+
"go.opentelemetry.io/collector/pdata/plog"
16+
"go.uber.org/zap"
17+
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages"
19+
)
20+
21+
const (
22+
IngestStatusCompleted = "completed"
23+
IngestStatusFailed = "failed"
24+
IngestStatusIngesting = "ingesting"
25+
CustomCapability = "org.opentelemetry.collector.receiver.awss3"
26+
maxNotificationAttempts = 3
27+
)
28+
29+
type statusNotification struct {
30+
TelemetryType string
31+
IngestStatus string
32+
StartTime time.Time
33+
EndTime time.Time
34+
IngestTime time.Time
35+
FailureMessage string
36+
}
37+
38+
type statusNotifier interface {
39+
Start(ctx context.Context, host component.Host) error
40+
Shutdown(ctx context.Context) error
41+
SendStatus(ctx context.Context, message statusNotification)
42+
}
43+
44+
type opampNotifier struct {
45+
logger *zap.Logger
46+
opampExtensionID component.ID
47+
handler opampcustommessages.CustomCapabilityHandler
48+
}
49+
50+
func newNotifier(config *Config, logger *zap.Logger) statusNotifier {
51+
if config.Notifications.OpAMP != nil {
52+
return &opampNotifier{opampExtensionID: *config.Notifications.OpAMP, logger: logger}
53+
}
54+
return nil
55+
}
56+
57+
func (n *opampNotifier) Start(_ context.Context, host component.Host) error {
58+
ext, ok := host.GetExtensions()[n.opampExtensionID]
59+
if !ok {
60+
return fmt.Errorf("extension %q does not exist", n.opampExtensionID)
61+
}
62+
63+
registry, ok := ext.(opampcustommessages.CustomCapabilityRegistry)
64+
if !ok {
65+
return fmt.Errorf("extension %q is not a custom message registry", n.opampExtensionID)
66+
}
67+
68+
handler, err := registry.Register(CustomCapability)
69+
if err != nil {
70+
return fmt.Errorf("failed to register custom capability: %w", err)
71+
}
72+
if handler == nil {
73+
return errors.New("custom capability handler is nil")
74+
}
75+
n.handler = handler
76+
return nil
77+
}
78+
79+
func (n *opampNotifier) Shutdown(_ context.Context) error {
80+
if n.handler != nil {
81+
n.handler.Unregister()
82+
}
83+
return nil
84+
}
85+
86+
func (n *opampNotifier) SendStatus(_ context.Context, message statusNotification) {
87+
logs := plog.NewLogs()
88+
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
89+
log.Body().SetStr("status")
90+
attributes := log.Attributes()
91+
attributes.PutStr("telemetry_type", message.TelemetryType)
92+
attributes.PutStr("ingest_status", message.IngestStatus)
93+
attributes.PutInt("start_time", int64(pcommon.NewTimestampFromTime(message.StartTime)))
94+
attributes.PutInt("end_time", int64(pcommon.NewTimestampFromTime(message.EndTime)))
95+
log.SetTimestamp(pcommon.NewTimestampFromTime(message.IngestTime))
96+
97+
if message.FailureMessage != "" {
98+
attributes.PutStr("failure_message", message.FailureMessage)
99+
}
100+
101+
marshaler := plog.ProtoMarshaler{}
102+
bytes, err := marshaler.MarshalLogs(logs)
103+
if err != nil {
104+
return
105+
}
106+
for attempt := 0; attempt < maxNotificationAttempts; attempt++ {
107+
sendingChan, sendingErr := n.handler.SendMessage("TimeBasedIngestStatus", bytes)
108+
switch {
109+
case sendingErr == nil:
110+
return
111+
case errors.Is(sendingErr, types.ErrCustomMessagePending):
112+
<-sendingChan
113+
default:
114+
// The only other errors returned by the OpAmp extension are unrecoverable, ie ErrCustomCapabilityNotSupported
115+
// so just log an error and return.
116+
n.logger.Error("Failed to send notification", zap.Error(sendingErr), zap.Int("attempt", attempt))
117+
return
118+
}
119+
}
120+
n.logger.Error("Failed to send notification after multiple attempts", zap.Int("max_attempts", maxNotificationAttempts))
121+
}

0 commit comments

Comments
 (0)