[pkg/stanza] Remove batching in LogEmitter behind feature gate #38428

Merged
30 changes: 30 additions & 0 deletions .chloggen/synchronous-logemitter-behind-feature-gate.yaml
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Prevent data loss in Stanza-based receivers on ungraceful shutdown of the collector

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35456]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Enable the `stanza.synchronousLogEmitter` feature gate to unlock this feature.
See the [documentation](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/README.md) for more information.


# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
28 changes: 28 additions & 0 deletions pkg/stanza/README.md
@@ -49,6 +49,34 @@ Common functionality for all of these receivers is provided by the adapter package
- A special `emitter` operator, combined with a `converter` which together act as a bridge from the operator sequence to the
OpenTelemetry Collector's pipelines.

### Feature Gates

#### `stanza.synchronousLogEmitter`

The `stanza.synchronousLogEmitter` feature gate prevents possible data loss during an ungraceful shutdown of the collector by making the LogEmitter emit logs synchronously,
instead of batching them in its internal buffer. See the related issue <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35456>.

The LogEmitter is the Stanza component that passes logs from the Stanza pipeline to the collector's pipeline.
The LogEmitter keeps an internal buffer of logs and only emits them as a single batch when the buffer is full (or when the flush timeout elapses).
This is done to increase performance, as processing data in batches is considerably faster than processing each entry separately.
The drawback is that any logs still sitting in the buffer are lost if the collector shuts down ungracefully.
To prevent this, enable this feature gate to make the LogEmitter synchronous, eliminating the risk of data loss.
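
The gate is typically enabled by passing `--feature-gates=stanza.synchronousLogEmitter` on the collector command line. As a minimal sketch for custom distributions or tests, the gate can also be flipped programmatically; the `enableSynchronousLogEmitter` helper below is illustrative only and not part of this package.

```go
package main

import (
	"log"

	"go.opentelemetry.io/collector/featuregate"
)

// enableSynchronousLogEmitter is a hypothetical helper that enables the gate
// through the global feature-gate registry, which is roughly what the
// --feature-gates command-line flag does.
func enableSynchronousLogEmitter() {
	// Set returns an error if the gate ID has not been registered, for example
	// when the stanza adapter package is not linked into the build.
	if err := featuregate.GlobalRegistry().Set("stanza.synchronousLogEmitter", true); err != nil {
		log.Fatalf("failed to enable stanza.synchronousLogEmitter: %v", err)
	}
}
```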

Note that enabling this feature gate may have a negative performance impact in some situations; see below.

There is no performance impact when using receivers based on Stanza inputs that support batching. Currently, the only such receiver is the File Log receiver, but see the caveat below.

A performance impact may be observed when using receivers based on Stanza inputs that do not support batching. Currently, these are the Journald, Named Pipe, Syslog, TCP Log, UDP Log, and Windows EventLog receivers.

The caveat is that even with a receiver that supports batching (like the File Log receiver), the performance impact may still be observed when additional operators are configured (see the `operators` configuration option).
This is because Stanza transform operators currently don't support processing logs in batches, so even if the File Log receiver's file input operator creates a batch of logs,
the next operator in the Stanza pipeline splits every batch back into single entries.
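
As a rough illustration (not the actual Stanza operator API), a per-entry transform stage effectively fans each batch out entry by entry, which is why the batching done by the file input is lost downstream:

```go
package main

import (
	"context"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

// fanOutBatch is an illustrative stand-in for what happens when a batch of
// entries reaches an operator that only processes single entries: the batch
// is split and each entry is handed downstream on its own.
func fanOutBatch(ctx context.Context, batch []*entry.Entry, processEntry func(context.Context, *entry.Entry) error) error {
	for _, e := range batch {
		// From this point on, downstream components (including a synchronous
		// LogEmitter) see individual entries and pay the per-entry cost.
		if err := processEntry(ctx, e); err != nil {
			return err
		}
	}
	return nil
}
```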

The planned schedule for this feature gate is the following:

- Introduce as `Alpha` (disabled by default) in v0.122.0
- Move to `Beta` (enabled by default) once transform operators support batching and all receivers selected for batching support have implemented it

### FAQ

Q: Why don't we make every parser and transform operator into a distinct OpenTelemetry processor?
17 changes: 16 additions & 1 deletion pkg/stanza/adapter/factory.go
@@ -8,6 +8,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
rcvr "go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"

@@ -17,6 +18,14 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
)

var synchronousLogEmitterFeatureGate = featuregate.GlobalRegistry().MustRegister(
"stanza.synchronousLogEmitter",
featuregate.StageAlpha,
featuregate.WithRegisterDescription("Prevents possible data loss in Stanza-based receivers by emitting logs synchronously."),
featuregate.WithRegisterFromVersion("v0.122.0"),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35456"),
)

// LogReceiverType is the interface used by stanza-based log receivers
type LogReceiverType interface {
Type() component.Type
@@ -69,7 +78,13 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
}

emitter := helper.NewLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
var emitter helper.LogEmitter
if synchronousLogEmitterFeatureGate.IsEnabled() {
emitter = helper.NewSynchronousLogEmitter(params.TelemetrySettings, rcv.consumeEntries)
} else {
emitter = helper.NewBatchingLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
}

pipe, err := pipeline.Config{
Operators: operators,
DefaultOutput: emitter,
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/integration_test.go
@@ -56,7 +56,7 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
obsrecv: obsrecv,
}

emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
emitter := helper.NewBatchingLogEmitter(set, rcv.consumeEntries)

rcv.emitter = emitter
return rcv, nil
2 changes: 1 addition & 1 deletion pkg/stanza/adapter/receiver.go
@@ -25,7 +25,7 @@ type receiver struct {
id component.ID

pipe pipeline.Pipeline
emitter *helper.LogEmitter
emitter helper.LogEmitter
consumer consumer.Logs
obsrecv *receiverhelper.ObsReport

90 changes: 51 additions & 39 deletions pkg/stanza/adapter/receiver_test.go
@@ -6,6 +6,7 @@ package adapter
import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"sync/atomic"
@@ -164,46 +165,41 @@ func TestShutdownFlush(t *testing.T) {
)
}

func BenchmarkReceiver(b *testing.B) {
b.Run(
"1 Log entry per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 1)
},
)
b.Run(
"10 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 10)
},
)
b.Run(
"100 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 100)
},
)
b.Run(
"1_000 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 1_000)
},
)
b.Run(
"10_000 Log entries per iteration",
func(b *testing.B) {
benchmarkReceiver(b, 10_000)
},
)
func BenchmarkReceiverWithBatchingLogEmitter(b *testing.B) {
for n := range 6 {
logEntries := int(math.Pow(10, float64(n)))
b.Run(fmt.Sprintf("%d logs", logEntries), func(b *testing.B) {
benchmarkReceiver(b, logEntries, false, true)
})
}
}

func benchmarkReceiver(b *testing.B, logsPerIteration int) {
func BenchmarkReceiverWithSynchronousLogEmitter(b *testing.B) {
for n := range 6 {
logEntries := int(math.Pow(10, float64(n)))
b.Run(fmt.Sprintf("%d logs", logEntries), func(b *testing.B) {
benchmarkReceiver(b, logEntries, false, false)
})
}
}

func BenchmarkReceiverWithSynchronousLogEmitterAndBatchingInput(b *testing.B) {
for n := range 6 {
logEntries := int(math.Pow(10, float64(n)))
b.Run(fmt.Sprintf("%d logs", logEntries), func(b *testing.B) {
benchmarkReceiver(b, logEntries, true, false)
})
}
}

func benchmarkReceiver(b *testing.B, logsPerIteration int, batchingInput, batchingLogEmitter bool) {
iterationComplete := make(chan struct{})
nextIteration := make(chan struct{})

inputBuilder := &testInputBuilder{
numberOfLogEntries: logsPerIteration,
nextIteration: nextIteration,
produceBatches: batchingInput,
}
inputCfg := operator.Config{
Builder: inputBuilder,
@@ -230,7 +226,12 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int) {
}

set := componenttest.NewNopTelemetrySettings()
emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
var emitter helper.LogEmitter
if batchingLogEmitter {
emitter = helper.NewBatchingLogEmitter(set, rcv.consumeEntries)
} else {
emitter = helper.NewSynchronousLogEmitter(set, rcv.consumeEntries)
}
defer func() {
require.NoError(b, emitter.Stop())
}()
@@ -306,7 +307,7 @@ pipeline:
}

set := componenttest.NewNopTelemetrySettings()
emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
emitter := helper.NewBatchingLogEmitter(set, rcv.consumeEntries)
defer func() {
require.NoError(b, emitter.Stop())
}()
@@ -368,7 +369,7 @@ func BenchmarkParseAndMap(b *testing.B) {
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))

set := componenttest.NewNopTelemetrySettings()
emitter := helper.NewLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
emitter := helper.NewBatchingLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
for _, e := range entries {
convert(e)
}
@@ -407,6 +408,7 @@ const testInputOperatorTypeStr = "test_input"
type testInputBuilder struct {
numberOfLogEntries int
nextIteration chan struct{}
produceBatches bool
}

func (t *testInputBuilder) ID() string {
@@ -427,6 +429,7 @@ func (t *testInputBuilder) Build(settings component.TelemetrySettings) (operator
return &testInputOperator{
InputOperator: inputOperator,
numberOfLogEntries: t.numberOfLogEntries,
produceBatches: t.produceBatches,
nextIteration: t.nextIteration,
}, nil
}
@@ -438,6 +441,7 @@ var _ operator.Operator = &testInputOperator{}
type testInputOperator struct {
helper.InputOperator
numberOfLogEntries int
produceBatches bool
nextIteration chan struct{}
cancelFunc context.CancelFunc
}
@@ -455,21 +459,29 @@ func (t *testInputOperator) Start(_ operator.Persister) error {
t.cancelFunc = cancelFunc

e := complexEntry()
go func() {
go func(writeBatches bool) {
for {
select {
case <-t.nextIteration:
for i := 0; i < t.numberOfLogEntries; i++ {
_ = t.Write(context.Background(), e)
if writeBatches {
for i := 0; i < t.numberOfLogEntries; i += len(entries) {
_ = t.WriteBatch(context.Background(), entries)
}
} else {
for i := 0; i < t.numberOfLogEntries; i++ {
_ = t.Write(context.Background(), e)
}
}
case <-ctx.Done():
return
}
}
}()
}(t.produceBatches)
return nil
}

var entries = complexEntriesForNDifferentHosts(100, 4)

func (t *testInputOperator) Stop() error {
t.cancelFunc()
return nil