Skip to content

Commit 6fa23c0

Browse files
[pkg/stanza] Remove batching in LogEmitter behind feature gate (#38428)
#### Description Removes batching in LogEmitter to prevent data loss during ungraceful shutdown of the collector. See #35456 for details. This is done behind a feature gate, as it may have a negative performance impact, depending on user configuration. See the added documentation for the feature gate. On implementation side, this was done by renaming the existing `LogEmitter` struct to `BatchingLogEmitter` and introducing a new `SynchronousLogEmitter`, see `pkg/stanza/adapter/emitter.go`. #### Link to tracking issue - Fixes #35456 #### Testing Added unit tests in `pkg/stanza/adapter/emitter_test.go`. Adapted the benchmarks `pkg/stanza/adapter/receiver_test.go` to run for both the existing BatchingLogEmitter and the new SynchronousLogEmitter. #### Documentation Added documentation for the feature gate.
1 parent 77de675 commit 6fa23c0

File tree

13 files changed

+239
-72
lines changed

13 files changed

+239
-72
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Prevent data loss in Stanza-based receivers on ungraceful shutdown of the collector
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35456]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Enable the `stanza.synchronousLogEmitter` feature gate to unlock this feature.
20+
See the [documentation](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/README.md) for more information.
21+
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: []

pkg/stanza/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,34 @@ Common functionality for all of these receivers is provided by the adapter packa
4949
- A special `emitter` operator, combined with a `converter` which together act as a bridge from the operator sequence to the
5050
OpenTelemetry Collector's pipelines.
5151

52+
### Feature Gates
53+
54+
#### `stanza.synchronousLogEmitter`
55+
56+
The `stanza.synchronousLogEmitter` feature gate prevents possible data loss during an ungraceful shutdown of the collector by emitting logs in LogEmitter synchronously,
57+
instead of batching the logs in LogEmitter's internal buffer. See related issue <https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35456>.
58+
59+
LogEmitter is a component in Stanza that passes logs from Stanza pipeline to the collector's pipeline.
60+
LogEmitter keeps an internal buffer of logs and only emits the logs as a single batch when the buffer is full (or when flush timeout elapses).
61+
This was done in order to increase performance, as processing data in batches is much more performant than processing each entry separately.
62+
However, this has the disadvantage of losing the data in the buffer in case of an ungraceful shutdown of the collector.
63+
To prevent this, enable this feature gate to make LogEmitter synchronous, eliminating the risk of data loss.
64+
65+
Note that enabling this feature gate may have negative performance impact in some situations, see below.
66+
67+
The performance impact does not occur when using receivers based on Stanza inputs that support batching. Currently these are: File Log receiver. See caveat below.
68+
69+
The performance impact may be observed when using receivers based on Stanza inputs that do not support batching. Currently these are: Journald receiver, Named Pipe receiver, Syslog receiver, TCP Log receiver, UDP Log receiver, Windows EventLog receiver.
70+
71+
The caveat is that even when using a receiver that supports batching (like the File Log receiver), the performance impact may still be observed when additional operators are configured (see `operators` configuration option).
72+
This is because Stanza transform operators currently don't support processing logs in batches, so even if the File Log receiver's File input operator creates a batch of logs,
73+
the next operator in Stanza pipeline will split every batch into single entries.
74+
75+
The planned schedule for this feature gate is the following:
76+
77+
- Introduce as `Alpha` (disabled by default) in v0.122.0
78+
- Move to `Beta` (enabled by default) after transform operators support batching and after all receivers that are selected to support batching support it
79+
5280
### FAQ
5381

5482
Q: Why don't we make every parser and transform operator into a distinct OpenTelemetry processor?

pkg/stanza/adapter/factory.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"go.opentelemetry.io/collector/component"
1010
"go.opentelemetry.io/collector/consumer"
11+
"go.opentelemetry.io/collector/featuregate"
1112
rcvr "go.opentelemetry.io/collector/receiver"
1213
"go.opentelemetry.io/collector/receiver/receiverhelper"
1314

@@ -17,6 +18,14 @@ import (
1718
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
1819
)
1920

21+
var synchronousLogEmitterFeatureGate = featuregate.GlobalRegistry().MustRegister(
22+
"stanza.synchronousLogEmitter",
23+
featuregate.StageAlpha,
24+
featuregate.WithRegisterDescription("Prevents possible data loss in Stanza-based receivers by emitting logs synchronously."),
25+
featuregate.WithRegisterFromVersion("v0.122.0"),
26+
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35456"),
27+
)
28+
2029
// LogReceiverType is the interface used by stanza-based log receivers
2130
type LogReceiverType interface {
2231
Type() component.Type
@@ -69,7 +78,13 @@ func createLogsReceiver(logReceiverType LogReceiverType) rcvr.CreateLogsFunc {
6978
emitterOpts = append(emitterOpts, helper.WithFlushInterval(baseCfg.flushInterval))
7079
}
7180

72-
emitter := helper.NewLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
81+
var emitter helper.LogEmitter
82+
if synchronousLogEmitterFeatureGate.IsEnabled() {
83+
emitter = helper.NewSynchronousLogEmitter(params.TelemetrySettings, rcv.consumeEntries)
84+
} else {
85+
emitter = helper.NewBatchingLogEmitter(params.TelemetrySettings, rcv.consumeEntries, emitterOpts...)
86+
}
87+
7388
pipe, err := pipeline.Config{
7489
Operators: operators,
7590
DefaultOutput: emitter,

pkg/stanza/adapter/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func createNoopReceiver(nextConsumer consumer.Logs) (*receiver, error) {
5656
obsrecv: obsrecv,
5757
}
5858

59-
emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
59+
emitter := helper.NewBatchingLogEmitter(set, rcv.consumeEntries)
6060

6161
rcv.emitter = emitter
6262
return rcv, nil

pkg/stanza/adapter/receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type receiver struct {
2525
id component.ID
2626

2727
pipe pipeline.Pipeline
28-
emitter *helper.LogEmitter
28+
emitter helper.LogEmitter
2929
consumer consumer.Logs
3030
obsrecv *receiverhelper.ObsReport
3131

pkg/stanza/adapter/receiver_test.go

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package adapter
66
import (
77
"context"
88
"fmt"
9+
"math"
910
"os"
1011
"path/filepath"
1112
"sync/atomic"
@@ -164,46 +165,41 @@ func TestShutdownFlush(t *testing.T) {
164165
)
165166
}
166167

167-
func BenchmarkReceiver(b *testing.B) {
168-
b.Run(
169-
"1 Log entry per iteration",
170-
func(b *testing.B) {
171-
benchmarkReceiver(b, 1)
172-
},
173-
)
174-
b.Run(
175-
"10 Log entries per iteration",
176-
func(b *testing.B) {
177-
benchmarkReceiver(b, 10)
178-
},
179-
)
180-
b.Run(
181-
"100 Log entries per iteration",
182-
func(b *testing.B) {
183-
benchmarkReceiver(b, 100)
184-
},
185-
)
186-
b.Run(
187-
"1_000 Log entries per iteration",
188-
func(b *testing.B) {
189-
benchmarkReceiver(b, 1_000)
190-
},
191-
)
192-
b.Run(
193-
"10_000 Log entries per iteration",
194-
func(b *testing.B) {
195-
benchmarkReceiver(b, 10_000)
196-
},
197-
)
168+
func BenchmarkReceiverWithBatchingLogEmitter(b *testing.B) {
169+
for n := range 6 {
170+
logEntries := int(math.Pow(10, float64(n)))
171+
b.Run(fmt.Sprintf("%d logs", logEntries), func(b *testing.B) {
172+
benchmarkReceiver(b, logEntries, false, true)
173+
})
174+
}
198175
}
199176

200-
func benchmarkReceiver(b *testing.B, logsPerIteration int) {
177+
func BenchmarkReceiverWithSynchronousLogEmitter(b *testing.B) {
178+
for n := range 6 {
179+
logEntries := int(math.Pow(10, float64(n)))
180+
b.Run(fmt.Sprintf("%d logs", logEntries), func(b *testing.B) {
181+
benchmarkReceiver(b, logEntries, false, false)
182+
})
183+
}
184+
}
185+
186+
func BenchmarkReceiverWithSynchronousLogEmitterAndBatchingInput(b *testing.B) {
187+
for n := range 6 {
188+
logEntries := int(math.Pow(10, float64(n)))
189+
b.Run(fmt.Sprintf("%d logs", logEntries), func(b *testing.B) {
190+
benchmarkReceiver(b, logEntries, true, false)
191+
})
192+
}
193+
}
194+
195+
func benchmarkReceiver(b *testing.B, logsPerIteration int, batchingInput, batchingLogEmitter bool) {
201196
iterationComplete := make(chan struct{})
202197
nextIteration := make(chan struct{})
203198

204199
inputBuilder := &testInputBuilder{
205200
numberOfLogEntries: logsPerIteration,
206201
nextIteration: nextIteration,
202+
produceBatches: batchingInput,
207203
}
208204
inputCfg := operator.Config{
209205
Builder: inputBuilder,
@@ -230,7 +226,12 @@ func benchmarkReceiver(b *testing.B, logsPerIteration int) {
230226
}
231227

232228
set := componenttest.NewNopTelemetrySettings()
233-
emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
229+
var emitter helper.LogEmitter
230+
if batchingLogEmitter {
231+
emitter = helper.NewBatchingLogEmitter(set, rcv.consumeEntries)
232+
} else {
233+
emitter = helper.NewSynchronousLogEmitter(set, rcv.consumeEntries)
234+
}
234235
defer func() {
235236
require.NoError(b, emitter.Stop())
236237
}()
@@ -306,7 +307,7 @@ pipeline:
306307
}
307308

308309
set := componenttest.NewNopTelemetrySettings()
309-
emitter := helper.NewLogEmitter(set, rcv.consumeEntries)
310+
emitter := helper.NewBatchingLogEmitter(set, rcv.consumeEntries)
310311
defer func() {
311312
require.NoError(b, emitter.Stop())
312313
}()
@@ -368,7 +369,7 @@ func BenchmarkParseAndMap(b *testing.B) {
368369
require.NoError(b, yaml.Unmarshal([]byte(pipelineYaml), &operatorCfgs))
369370

370371
set := componenttest.NewNopTelemetrySettings()
371-
emitter := helper.NewLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
372+
emitter := helper.NewBatchingLogEmitter(set, func(_ context.Context, entries []*entry.Entry) {
372373
for _, e := range entries {
373374
convert(e)
374375
}
@@ -407,6 +408,7 @@ const testInputOperatorTypeStr = "test_input"
407408
type testInputBuilder struct {
408409
numberOfLogEntries int
409410
nextIteration chan struct{}
411+
produceBatches bool
410412
}
411413

412414
func (t *testInputBuilder) ID() string {
@@ -427,6 +429,7 @@ func (t *testInputBuilder) Build(settings component.TelemetrySettings) (operator
427429
return &testInputOperator{
428430
InputOperator: inputOperator,
429431
numberOfLogEntries: t.numberOfLogEntries,
432+
produceBatches: t.produceBatches,
430433
nextIteration: t.nextIteration,
431434
}, nil
432435
}
@@ -438,6 +441,7 @@ var _ operator.Operator = &testInputOperator{}
438441
type testInputOperator struct {
439442
helper.InputOperator
440443
numberOfLogEntries int
444+
produceBatches bool
441445
nextIteration chan struct{}
442446
cancelFunc context.CancelFunc
443447
}
@@ -455,21 +459,29 @@ func (t *testInputOperator) Start(_ operator.Persister) error {
455459
t.cancelFunc = cancelFunc
456460

457461
e := complexEntry()
458-
go func() {
462+
go func(writeBatches bool) {
459463
for {
460464
select {
461465
case <-t.nextIteration:
462-
for i := 0; i < t.numberOfLogEntries; i++ {
463-
_ = t.Write(context.Background(), e)
466+
if writeBatches {
467+
for i := 0; i < t.numberOfLogEntries; i += len(entries) {
468+
_ = t.WriteBatch(context.Background(), entries)
469+
}
470+
} else {
471+
for i := 0; i < t.numberOfLogEntries; i++ {
472+
_ = t.Write(context.Background(), e)
473+
}
464474
}
465475
case <-ctx.Done():
466476
return
467477
}
468478
}
469-
}()
479+
}(t.produceBatches)
470480
return nil
471481
}
472482

483+
var entries = complexEntriesForNDifferentHosts(100, 4)
484+
473485
func (t *testInputOperator) Stop() error {
474486
t.cancelFunc()
475487
return nil

0 commit comments

Comments
 (0)