Commit 3ec4a35

feat: add status reporting to S3 (#45748)
* Add health status reporting
* Move status reporter helper from CloudWatch status reporting to libbeat
* Enhance status reporter helper
* Fix linting issues, especially related to logging
1 parent 9dc2a0c commit 3ec4a35

15 files changed: +313 -144 lines
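Note: the new x-pack/libbeat/statusreporterhelper package that both inputs now call is added elsewhere in this commit and does not appear in the diffs below. Inferring only from how it is used here (statusreporterhelper.New(reporter, log, name) returns a status.StatusReporter, and the test mock below notes that repeated statuses are not re-reported), a minimal hypothetical sketch of such a wrapper could look like the following; the field names and nil-reporter handling are assumptions, not the actual implementation:

// Hypothetical sketch only: the real statusreporterhelper package is not
// shown in this commit's visible diffs. It wraps a status.StatusReporter,
// logs each transition, and drops updates whose status has not changed.
package statusreporterhelper

import (
	"sync"

	"github.com/elastic/beats/v7/libbeat/management/status"
	"github.com/elastic/elastic-agent-libs/logp"
)

type reporter struct {
	mu       sync.Mutex
	inner    status.StatusReporter // assumption: may be nil when the Beat is not managed
	log      *logp.Logger
	name     string // e.g. "S3" or "CloudWatch", used to prefix log messages
	last     status.Status
	reported bool
}

// New wraps inner so an input can report health under a readable name.
func New(inner status.StatusReporter, log *logp.Logger, name string) status.StatusReporter {
	return &reporter{inner: inner, log: log, name: name}
}

func (r *reporter) UpdateStatus(s status.Status, msg string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.reported && s == r.last {
		return // suppress repeated updates with an unchanged status
	}
	r.last, r.reported = s, true
	r.log.Infof("%s input status changed to %v: %s", r.name, s, msg)
	if r.inner != nil {
		r.inner.UpdateStatus(s, msg)
	}
}

Because every transition funnels through one deduplicating wrapper, the S3 loops below can call UpdateStatus(status.Running, ...) after each successful object without flooding the reporter.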

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
@@ -510,6 +510,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
 - Add status reporting support for AWS CloudWatch input. {pull}45679[45679]
 - Add mechanism to allow HTTP JSON templates to terminate without logging an error. {issue}45664[45664] {pull}45810[45810]
 - Improve error reporting for schemeless URLs in HTTP JSON input. {pull}45953[45953]
+- Add status reporting support for AWS S3 input. {pull}45748[45748]
 
 *Auditbeat*
 
x-pack/filebeat/input/awscloudwatch/input.go

Lines changed: 2 additions & 1 deletion
@@ -21,6 +21,7 @@ import (
 	"github.com/elastic/beats/v7/libbeat/management/status"
 	"github.com/elastic/beats/v7/libbeat/statestore"
 	awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
+	"github.com/elastic/beats/v7/x-pack/libbeat/statusreporterhelper"
 	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/go-concert/unison"
@@ -94,7 +95,7 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
 	log := inputContext.Logger
 
 	// setup status reporter
-	in.status = newCWStateReporter(inputContext, log)
+	in.status = statusreporterhelper.New(inputContext.StatusReporter, log, "CloudWatch")
 
 	defer in.status.UpdateStatus(status.Stopped, "")
 	in.status.UpdateStatus(status.Starting, "Input starting")

x-pack/filebeat/input/awscloudwatch/state_reporter.go

Lines changed: 0 additions & 54 deletions
This file was deleted.

x-pack/filebeat/input/awscloudwatch/state_reporter_test.go

Lines changed: 0 additions & 59 deletions
This file was deleted.

x-pack/filebeat/input/awss3/input_benchmark_test.go

Lines changed: 3 additions & 4 deletions
@@ -223,6 +223,7 @@ func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult {
 	config.NumberOfWorkers = workerCount
 	sqsReader := newSQSReaderInput(config, aws.Config{})
 	sqsReader.log = log.Named("sqs")
+	sqsReader.status = &statusReporterHelperMock{}
 	sqsReader.pipeline = newFakePipeline()
 	sqsReader.metrics = newInputMetrics(monitoring.NewRegistry(), workerCount)
 	sqsReader.sqs, err = newConstantSQS()
@@ -235,6 +236,7 @@ func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult {
 	b.Cleanup(cancel)
 
 	go func() {
+		//nolint:gosec // not going to have anywhere near uint64 overflow number of received messages
 		for sqsReader.metrics.sqsMessagesReceivedTotal.Get() < uint64(b.N) {
 			time.Sleep(5 * time.Millisecond)
 		}
@@ -262,8 +264,6 @@ func benchmarkInputSQS(t *testing.T, workerCount int) testing.BenchmarkResult {
 }
 
 func TestBenchmarkInputSQS(t *testing.T) {
-	err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
-	require.NoError(t, err)
 
 	results := []testing.BenchmarkResult{
 		benchmarkInputSQS(t, 1),
@@ -352,6 +352,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
 		states:         states,
 		provider:       "provider",
 		filterProvider: newFilterProvider(&config),
+		status:         &statusReporterHelperMock{},
 	}
 
 	s3Poller.run(ctx)
@@ -392,8 +393,6 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
 }
 
 func TestBenchmarkInputS3(t *testing.T) {
-	err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel))
-	require.NoError(t, err)
 
 	results := []testing.BenchmarkResult{
 		benchmarkInputS3(t, 1),

x-pack/filebeat/input/awss3/input_test.go

Lines changed: 34 additions & 0 deletions
@@ -6,17 +6,50 @@ package awss3
 
 import (
 	"errors"
+	"sync"
 	"testing"
 
 	awssdk "github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/stretchr/testify/assert"
 
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
+	"github.com/elastic/beats/v7/libbeat/management/status"
 	awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 )
 
+// statusReporterHelperMock is a thread-safe mock of a status reporter that
+// behaves like StatusReporterHelper
+type statusReporterHelperMock struct {
+	mu       sync.Mutex
+	statuses []mgmtStatusUpdate
+	current  status.Status
+}
+
+type mgmtStatusUpdate struct {
+	status status.Status
+	msg    string
+}
+
+func (r *statusReporterHelperMock) getStatuses() []mgmtStatusUpdate {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	s := make([]mgmtStatusUpdate, len(r.statuses))
+	copy(s, r.statuses)
+	return s
+}
+
+func (r *statusReporterHelperMock) UpdateStatus(s status.Status, msg string) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	// Imitate behavior of statusReporterHelper. Only record if the new status is different.
+	if s != r.current {
+		r.current = s
+		r.statuses = append(r.statuses, mgmtStatusUpdate{status: s, msg: msg})
+	}
+}
+
 func TestGetProviderFromDomain(t *testing.T) {
 	tests := []struct {
 		endpoint string
@@ -155,6 +188,7 @@ func TestRegionSelection(t *testing.T) {
 		MetricsRegistry: monitoring.NewRegistry(),
 	}
 
+	in.status = &statusReporterHelperMock{}
 	// Run setup and verify that it put the correct region in awsConfig.Region
 	err := in.setup(inputCtx, &fakePipeline{})
 	in.cleanup()
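The mock records one entry per distinct status, so tests could assert the sequence of transitions an input reports. A hypothetical usage sketch (not part of this change; it only drives the mock directly):

// Hypothetical example: exercise the mock directly and check that
// consecutive duplicate statuses are collapsed into a single entry.
func TestStatusReporterHelperMock(t *testing.T) {
	mock := &statusReporterHelperMock{}

	mock.UpdateStatus(status.Starting, "Input starting")
	mock.UpdateStatus(status.Running, "Input is running")
	mock.UpdateStatus(status.Running, "Input is running") // same status, not recorded again
	mock.UpdateStatus(status.Degraded, "S3 pagination error")

	got := mock.getStatuses()
	// The two Running updates collapse into one entry, so the last two
	// recorded transitions are Running followed by Degraded.
	assert.GreaterOrEqual(t, len(got), 2)
	assert.Equal(t, status.Running, got[len(got)-2].status)
	assert.Equal(t, status.Degraded, got[len(got)-1].status)
}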

x-pack/filebeat/input/awss3/s3_input.go

Lines changed: 33 additions & 4 deletions
@@ -16,7 +16,9 @@ import (
 	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common/backoff"
+	"github.com/elastic/beats/v7/libbeat/management/status"
 	"github.com/elastic/beats/v7/libbeat/statestore"
+	"github.com/elastic/beats/v7/x-pack/libbeat/statusreporterhelper"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/go-concert/timed"
 )
@@ -37,6 +39,9 @@ type s3PollerInput struct {
 	s3ObjectHandler s3ObjectHandlerFactory
 	states          *states
 	filterProvider  *filterProvider
+
+	// health status reporting
+	status status.StatusReporter
 }
 
 func newS3PollerInput(
@@ -62,21 +67,32 @@ func (in *s3PollerInput) Run(
 	inputContext v2.Context,
 	pipeline beat.Pipeline,
 ) error {
+
 	in.log = inputContext.Logger.Named("s3")
+
+	in.status = statusreporterhelper.New(inputContext.StatusReporter, in.log, "S3")
+	defer in.status.UpdateStatus(status.Stopped, "")
+	in.status.UpdateStatus(status.Starting, "Input starting")
+
 	in.pipeline = pipeline
 	var err error
 
 	// Load the persistent S3 polling state.
 	in.states, err = newStates(in.log, in.store, in.config.BucketListPrefix)
 	if err != nil {
-		return fmt.Errorf("can not start persistent store: %w", err)
+		err = fmt.Errorf("can not start persistent store: %w", err)
+		in.status.UpdateStatus(status.Failed, fmt.Sprintf("Setup failure: %s", err.Error()))
+		return err
 	}
 	defer in.states.Close()
 
 	ctx := v2.GoContextFromCanceler(inputContext.Cancelation)
+	in.status.UpdateStatus(status.Configuring, "Configuring input")
 	in.s3, err = in.createS3API(ctx)
 	if err != nil {
-		return fmt.Errorf("failed to create S3 API: %w", err)
+		err = fmt.Errorf("failed to create S3 API for bucket ARN '%s': Error: %w", in.config.getBucketARN(), err)
+		in.status.UpdateStatus(status.Failed, fmt.Sprintf("Setup failure: %s", err.Error()))
+		return err
 	}
 
 	in.metrics = newInputMetrics(inputContext.MetricsRegistry, in.config.NumberOfWorkers)
@@ -97,6 +113,7 @@ func (in *s3PollerInput) Run(
 func (in *s3PollerInput) run(ctx context.Context) {
 	// Scan the bucket in a loop, delaying by the configured interval each
 	// iteration.
+	in.status.UpdateStatus(status.Running, "Input is running")
 	for ctx.Err() == nil {
 		in.runPoll(ctx)
 		_ = timed.Wait(ctx, in.config.BucketListInterval)
@@ -129,6 +146,7 @@ func (in *s3PollerInput) runPoll(ctx context.Context) {
 	err := in.states.CleanUp(ids)
 	if err != nil {
 		in.log.Errorf("failed to cleanup states: %v", err.Error())
+		in.status.UpdateStatus(status.Degraded, fmt.Sprintf("Input state cleanup failure: %s", err.Error()))
 	}
 }
 
@@ -138,13 +156,13 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
 	client, err := createPipelineClient(in.pipeline, acks)
 	if err != nil {
 		in.log.Errorf("failed to create pipeline client: %v", err.Error())
+		in.status.UpdateStatus(status.Degraded, fmt.Sprintf("A worker's pipeline client setup failed, error: %s", err.Error()))
 		return
 	}
 	defer client.Close()
 	defer acks.Close()
 
 	rateLimitWaiter := backoff.NewEqualJitterBackoff(ctx.Done(), 1, 120)
-
 	for _state := range workChan {
 		state := _state
 		event := in.s3EventForState(state)
@@ -166,6 +184,9 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
 		if errors.Is(err, errS3DownloadFailed) {
 			// Download errors are ephemeral. Add a backoff delay, then skip to the
 			// next iteration so we don't mark the object as permanently failed.
+			in.status.UpdateStatus(status.Degraded,
+				fmt.Sprintf("S3 download failure for object key '%s' in bucket '%s': %s",
+					state.Key, state.Bucket, err.Error()))
 			rateLimitWaiter.Wait()
 			continue
 		}
@@ -176,7 +197,10 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
 		if err != nil {
 			in.log.Errorf("failed processing S3 event for object key %q in bucket %q: %v",
 				state.Key, state.Bucket, err.Error())
-
+			in.status.UpdateStatus(status.Degraded,
+				fmt.Sprintf(
+					"S3 object processing failure for object key '%s' in bucket '%s': %s",
+					state.Key, state.Bucket, err.Error()))
 			// Non-retryable error.
 			state.Failed = true
 		} else {
@@ -188,6 +212,9 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
 		err := in.states.AddState(state)
 		if err != nil {
 			in.log.Errorf("saving completed object state: %v", err.Error())
+			in.status.UpdateStatus(status.Degraded, fmt.Sprintf("Failure checkpointing (saving completed object state): %s", err.Error()))
+		} else {
+			in.status.UpdateStatus(status.Running, "Input is running")
 		}
 
 		// Metrics
@@ -213,12 +240,14 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
 
 		if err != nil {
 			in.log.Warnw("Error when paginating listing.", "error", err)
+			in.status.UpdateStatus(status.Degraded, fmt.Sprintf("S3 pagination error: %s", err.Error()))
 			// QuotaExceededError is client-side rate limiting in the AWS sdk,
 			// don't include it in the circuit breaker count
 			if !errors.As(err, &ratelimit.QuotaExceededError{}) {
 				circuitBreaker++
 				if circuitBreaker >= readerLoopMaxCircuitBreaker {
 					in.log.Warnw(fmt.Sprintf("%d consecutive error when paginating listing, breaking the circuit.", circuitBreaker), "error", err)
+					in.status.UpdateStatus(status.Degraded, fmt.Sprintf("Too many consecutive errors (%d) in S3 pagination. Error: %s", circuitBreaker, err.Error()))
 					return nil, false
 				}
 			}
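Taken together, the reporting added above gives the S3 poller a simple health lifecycle: Starting when Run begins, Configuring before the S3 API client is created, Running once the poll loop starts and again after each successfully checkpointed object, Degraded on transient problems (download, processing, state cleanup, checkpoint, and pagination errors), Failed on unrecoverable setup errors, and Stopped via the deferred update when the input shuts down.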
