Skip to content

Commit 0880968

Browse files
[exporter/splunkhec] Implement maxEventSize on exporter side (#20752)
Added a new config to apply the maxEventSize on our data. It applies this limit on uncompressed data. Link to tracking issue: #18066 --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent 6f0cc7a commit 0880968

File tree

7 files changed

+103
-6
lines changed

7 files changed

+103
-6
lines changed

.chloggen/max_event_size.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: splunkhecexporter
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Apply a new config `maxEventSize` to our events; events are broken down in batches based on it. This implements HEC's `maxEventSize` on the exporter side. The limit is applied to uncompressed data.
9+
10+
# One or more tracking issues related to the change
11+
issues: [20290]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:

exporter/splunkhecexporter/buffer.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,19 +43,22 @@ type bufferState struct {
4343
compressionAvailable bool
4444
compressionEnabled bool
4545
bufferMaxLen uint
46+
maxEventLength uint
4647
writer io.Writer
4748
buf *bytes.Buffer
4849
bufFront *index
4950
resource int
5051
library int
5152
containsData bool
53+
rawLength int
5254
}
5355

5456
func (b *bufferState) reset() {
5557
b.buf.Reset()
5658
b.compressionEnabled = false
5759
b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen}
5860
b.containsData = false
61+
b.rawLength = 0
5962
}
6063

6164
func (b *bufferState) Read(p []byte) (n int, err error) {
@@ -71,6 +74,9 @@ func (b *bufferState) Close() error {
7174

7275
// accept returns true if data is accepted by the buffer
7376
func (b *bufferState) accept(data []byte) (bool, error) {
77+
if len(data)+b.rawLength > int(b.maxEventLength) {
78+
return false, nil
79+
}
7480
_, err := b.writer.Write(data)
7581
overCapacity := errors.Is(err, errOverCapacity)
7682
bufLen := b.buf.Len()
@@ -111,13 +117,15 @@ func (b *bufferState) accept(data []byte) (bool, error) {
111117
}
112118

113119
}
120+
b.rawLength += len(data)
114121
b.containsData = true
115122
return true, nil
116123
}
117124
if overCapacity {
118125
return false, nil
119126
}
120127
b.containsData = true
128+
b.rawLength += len(data)
121129
return true, err
122130
}
123131

@@ -180,7 +188,7 @@ func (c *cancellableGzipWriter) close() error {
180188
return c.innerWriter.Close()
181189
}
182190

183-
func makeBlankBufferState(bufCap uint, compressionAvailable bool) *bufferState {
191+
func makeBlankBufferState(bufCap uint, compressionAvailable bool, maxEventLength uint) *bufferState {
184192
// Buffer of JSON encoded Splunk events, last record is expected to overflow bufCap, hence the padding
185193
buf := bytes.NewBuffer(make([]byte, 0, bufCap+bufCapPadding))
186194

@@ -190,6 +198,7 @@ func makeBlankBufferState(bufCap uint, compressionAvailable bool) *bufferState {
190198
writer: &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap},
191199
buf: buf,
192200
bufferMaxLen: bufCap,
201+
maxEventLength: maxEventLength,
193202
bufFront: nil, // Index of the log record of the first unsent event in buffer.
194203
resource: 0, // Index of currently processed Resource
195204
library: 0, // Index of currently processed Library

exporter/splunkhecexporter/client.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ func (c *client) pushLogDataInBatches(ctx context.Context, ld plog.Logs, headers
124124
profilingLocalHeaders[k] = v
125125
}
126126

127-
var bufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression)
128-
var profilingBufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression)
127+
var bufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression, c.config.MaxEventSize)
128+
var profilingBufState = makeBlankBufferState(c.config.MaxContentLengthLogs, !c.config.DisableCompression, c.config.MaxEventSize)
129129
var permanentErrors []error
130130

131131
var rls = ld.ResourceLogs()
@@ -395,7 +395,7 @@ func (c *client) pushTracesData(ctx context.Context, tds ptrace.ResourceSpansSli
395395
// The batch content length is restricted to MaxContentLengthMetrics.
396396
// md metrics are parsed to Splunk events.
397397
func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
398-
var bufState = makeBlankBufferState(c.config.MaxContentLengthMetrics, !c.config.DisableCompression)
398+
var bufState = makeBlankBufferState(c.config.MaxContentLengthMetrics, !c.config.DisableCompression, c.config.MaxEventSize)
399399
var permanentErrors []error
400400

401401
var rms = md.ResourceMetrics()
@@ -430,7 +430,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
430430
// The batch content length is restricted to MaxContentLengthMetrics.
431431
// td traces are parsed to Splunk events.
432432
func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces, headers map[string]string) error {
433-
bufState := makeBlankBufferState(c.config.MaxContentLengthTraces, !c.config.DisableCompression)
433+
bufState := makeBlankBufferState(c.config.MaxContentLengthTraces, !c.config.DisableCompression, c.config.MaxEventSize)
434434
var permanentErrors []error
435435

436436
var rts = td.ResourceSpans()

exporter/splunkhecexporter/client_test.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,54 @@ func TestReceiveLogs(t *testing.T) {
694694
wantErr: "timeout", // our server will time out waiting for the data.
695695
},
696696
},
697+
{
698+
name: "two events with 2000 bytes, one with 2000 bytes, then one with 20000 bytes",
699+
logs: func() plog.Logs {
700+
firstLog := createLogData(1, 1, 3)
701+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(repeatableString(2000))
702+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Body().SetStr(repeatableString(2000))
703+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(2).Body().SetStr(repeatableString(20000))
704+
return firstLog
705+
}(),
706+
conf: func() *Config {
707+
cfg := NewFactory().CreateDefaultConfig().(*Config)
708+
cfg.MaxEventSize = 20000 // small so we can reproduce without allocating big logs.
709+
cfg.DisableCompression = true
710+
return cfg
711+
}(),
712+
want: wantType{
713+
batches: [][]string{
714+
{`"otel.log.name":"0_0_0"`, `"otel.log.name":"0_0_1"`},
715+
},
716+
numBatches: 1,
717+
},
718+
},
719+
{
720+
name: "two events with 2000 bytes, one with 1000 bytes, then one with 4200 bytes",
721+
logs: func() plog.Logs {
722+
firstLog := createLogData(1, 1, 5)
723+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(repeatableString(2000))
724+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Body().SetStr(repeatableString(2000))
725+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(2).Body().SetStr(repeatableString(1000))
726+
firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(3).Body().SetStr(repeatableString(4200))
727+
return firstLog
728+
}(),
729+
conf: func() *Config {
730+
cfg := NewFactory().CreateDefaultConfig().(*Config)
731+
cfg.MaxEventSize = 10000 // small so we can reproduce without allocating big logs.
732+
cfg.MaxContentLengthLogs = 5000
733+
cfg.DisableCompression = true
734+
return cfg
735+
}(),
736+
want: wantType{
737+
batches: [][]string{
738+
{`"otel.log.name":"0_0_0"`, `"otel.log.name":"0_0_1"`},
739+
{`"otel.log.name":"0_0_2"`},
740+
{`"otel.log.name":"0_0_3"`, `"otel.log.name":"0_0_4"`},
741+
},
742+
numBatches: 3,
743+
},
744+
},
697745
}
698746

699747
for _, test := range tests {
@@ -1559,7 +1607,7 @@ func BenchmarkPushLogRecords(b *testing.B) {
15591607
hecWorker: &mockHecWorker{},
15601608
}
15611609

1562-
state := makeBlankBufferState(4096, true)
1610+
state := makeBlankBufferState(4096, true, 4096)
15631611
for n := 0; n < b.N; n++ {
15641612
permanentErrs, sendingErr := c.pushLogRecords(context.Background(), logs.ResourceLogs(), state, map[string]string{})
15651613
assert.NoError(b, sendingErr)

exporter/splunkhecexporter/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ const (
3333
defaultContentLengthLogsLimit = 2 * 1024 * 1024
3434
defaultContentLengthMetricsLimit = 2 * 1024 * 1024
3535
defaultContentLengthTracesLimit = 2 * 1024 * 1024
36+
defaultMaxEventSize = 5 * 1024 * 1024
3637
maxContentLengthLogsLimit = 800 * 1024 * 1024
3738
maxContentLengthMetricsLimit = 800 * 1024 * 1024
3839
maxContentLengthTracesLimit = 800 * 1024 * 1024
40+
maxMaxEventSize = 800 * 1024 * 1024
3941
)
4042

4143
// OtelToHecFields defines the mapping of attributes to HEC fields
@@ -90,6 +92,10 @@ type Config struct {
9092
// Maximum allowed value is 838860800 (~ 800 MB).
9193
MaxContentLengthTraces uint `mapstructure:"max_content_length_traces"`
9294

95+
// Maximum payload size, raw uncompressed. Default value is 5242880 bytes (5MiB).
96+
// Maximum allowed value is 838860800 (~ 800 MB).
97+
MaxEventSize uint `mapstructure:"max_event_size"`
98+
9399
// App name is used to track telemetry information for Splunk App's using HEC by App name. Defaults to "OpenTelemetry Collector Contrib".
94100
SplunkAppName string `mapstructure:"splunk_app_name"`
95101

@@ -153,6 +159,11 @@ func (cfg *Config) Validate() error {
153159
if cfg.MaxContentLengthTraces > maxContentLengthTracesLimit {
154160
return fmt.Errorf(`requires "max_content_length_traces" <= %d`, maxContentLengthTracesLimit)
155161
}
162+
163+
if cfg.MaxEventSize > maxMaxEventSize {
164+
return fmt.Errorf(`requires "max_event_size" <= %d`, maxMaxEventSize)
165+
}
166+
156167
if err := cfg.QueueSettings.Validate(); err != nil {
157168
return fmt.Errorf("sending_queue settings has invalid configuration: %w", err)
158169
}

exporter/splunkhecexporter/config_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func TestLoadConfig(t *testing.T) {
6565
LogDataEnabled: true,
6666
ProfilingDataEnabled: true,
6767
ExportRaw: true,
68+
MaxEventSize: 5 * 1024 * 1024,
6869
MaxContentLengthLogs: 2 * 1024 * 1024,
6970
MaxContentLengthMetrics: 2 * 1024 * 1024,
7071
MaxContentLengthTraces: 2 * 1024 * 1024,
@@ -190,6 +191,17 @@ func TestConfig_Validate(t *testing.T) {
190191
}(),
191192
wantErr: "requires \"max_content_length_traces\" <= 838860800",
192193
},
194+
{
195+
name: "max default event-size",
196+
cfg: func() *Config {
197+
cfg := createDefaultConfig().(*Config)
198+
cfg.HTTPClientSettings.Endpoint = "http://foo_bar.com"
199+
cfg.MaxEventSize = maxMaxEventSize + 1
200+
cfg.Token = "foo"
201+
return cfg
202+
}(),
203+
wantErr: "requires \"max_event_size\" <= 838860800",
204+
},
193205
}
194206

195207
for _, tt := range tests {

exporter/splunkhecexporter/factory.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func createDefaultConfig() component.Config {
8080
MaxContentLengthLogs: defaultContentLengthLogsLimit,
8181
MaxContentLengthMetrics: defaultContentLengthMetricsLimit,
8282
MaxContentLengthTraces: defaultContentLengthTracesLimit,
83+
MaxEventSize: defaultMaxEventSize,
8384
HecToOtelAttrs: splunk.HecToOtelAttrs{
8485
Source: splunk.DefaultSourceLabel,
8586
SourceType: splunk.DefaultSourceTypeLabel,

0 commit comments

Comments
 (0)