Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
kind: bug-fix
summary: fix defer usage for stopped status reporting
component: filebeat

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link the PR that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)

// setup status reporter
in.status = statusreporterhelper.New(inputContext.StatusReporter, log, "CloudWatch")

defer in.status.UpdateStatus(status.Stopped, "")
in.status.UpdateStatus(status.Starting, "Input starting")

handler, err := newStateHandler(log, in.config, in.store)
Expand Down Expand Up @@ -154,6 +152,8 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
log.Debugf("Config scan_frequency = %s", cwPoller.config.ScanFrequency)
log.Debugf("Config api_sleep = %s", cwPoller.config.APISleep)
cwPoller.receive(ctx, logGroupIDs, time.Now)
in.status.UpdateStatus(status.Stopped, "Input execution ended")

return nil
}

Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (in *s3PollerInput) Run(
in.log = inputContext.Logger.Named("s3")

in.status = statusreporterhelper.New(inputContext.StatusReporter, in.log, "S3")
defer in.status.UpdateStatus(status.Stopped, "")
in.status.UpdateStatus(status.Starting, "Input starting")

in.pipeline = pipeline
Expand Down Expand Up @@ -107,7 +106,7 @@ func (in *s3PollerInput) Run(
)

in.run(ctx)

in.status.UpdateStatus(status.Stopped, "Input execution ended")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/sqs_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (in *sqsReaderInput) Run(
pipeline beat.Pipeline,
) error {
in.status = statusreporterhelper.New(inputContext.StatusReporter, inputContext.Logger, "S3 via SQS")
defer in.status.UpdateStatus(status.Stopped, "")
in.status.UpdateStatus(status.Starting, "Input starting")

// Initialize everything for this run
Expand All @@ -90,6 +89,7 @@ func (in *sqsReaderInput) Run(
ctx := v2.GoContextFromCanceler(inputContext.Cancelation)
in.run(ctx)
in.cleanup()
in.status.UpdateStatus(status.Stopped, "Input execution ended")

return nil
}
Expand Down