Skip to content

Commit 6b0a9b5

Browse files
atoulmesbylica-splunk
authored andcommitted
[receiver/splunkhec] fix memory leak (open-telemetry#36146)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Fix memory leak by changing how we run obsreports for metrics and logs. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#35294
1 parent 0c3d3ea commit 6b0a9b5

File tree

2 files changed

+62
-55
lines changed

2 files changed

+62
-55
lines changed

.chloggen/receiver_ops2.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: splunkhecreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Avoid a memory leak by changing how we record obsreports for logs and metrics.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35294]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/splunkhecreceiver/receiver.go

Lines changed: 35 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -212,27 +212,26 @@ func (r *splunkReceiver) processSuccessResponse(resp http.ResponseWriter, bodyCo
212212
}
213213

214214
func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request) {
215-
ctx := req.Context()
216215
if req.Method != http.MethodPost {
217-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, 0, errInvalidMethod)
216+
r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod)
218217
return
219218
}
220219

221220
// shouldn't run into this case since we only enable this handler IF ackExt exists. But we have this check just in case
222221
if r.ackExt == nil {
223-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, 0, errExtensionMissing)
222+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, errExtensionMissing)
224223
return
225224
}
226225

227226
var channelID string
228227
var extracted bool
229228
if channelID, extracted = r.extractChannel(req); extracted {
230229
if channelErr := r.validateChannelHeader(channelID); channelErr != nil {
231-
r.failRequest(ctx, resp, http.StatusBadRequest, []byte(channelErr.Error()), 0, channelErr)
230+
r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr)
232231
return
233232
}
234233
} else {
235-
r.failRequest(ctx, resp, http.StatusBadRequest, requiredDataChannelHeader, 0, nil)
234+
r.failRequest(resp, http.StatusBadRequest, requiredDataChannelHeader, nil)
236235
return
237236
}
238237

@@ -241,19 +240,19 @@ func (r *splunkReceiver) handleAck(resp http.ResponseWriter, req *http.Request)
241240

242241
err := dec.Decode(&ackRequest)
243242
if err != nil {
244-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, err)
243+
r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err)
245244
return
246245
}
247246

248247
if len(ackRequest.Acks) == 0 {
249-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, errors.New("request body must include at least one ackID to be queried"))
248+
r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, errors.New("request body must include at least one ackID to be queried"))
250249
return
251250
}
252251

253252
queriedAcks := r.ackExt.QueryAcks(channelID, ackRequest.Acks)
254253
ackString, _ := json.Marshal(queriedAcks)
255254
if err := r.processSuccessResponse(resp, []byte(fmt.Sprintf(ackResponse, ackString))); err != nil {
256-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, 0, err)
255+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, err)
257256
}
258257
}
259258

@@ -262,28 +261,28 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
262261
ctx = r.obsrecv.StartLogsOp(ctx)
263262

264263
if req.Method != http.MethodPost {
265-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, 0, errInvalidMethod)
264+
r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod)
266265
return
267266
}
268267

269268
encoding := req.Header.Get(httpContentEncodingHeader)
270269
if encoding != "" && encoding != gzipEncoding {
271-
r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, 0, errInvalidEncoding)
270+
r.failRequest(resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, errInvalidEncoding)
272271
return
273272
}
274273

275274
var channelID string
276275
var extracted bool
277276
if channelID, extracted = r.extractChannel(req); extracted {
278277
if channelErr := r.validateChannelHeader(channelID); channelErr != nil {
279-
r.failRequest(ctx, resp, http.StatusBadRequest, []byte(channelErr.Error()), 0, channelErr)
278+
r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr)
280279
return
281280
}
282281
}
283282

284283
if req.ContentLength == 0 {
285284
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), 0, nil)
286-
r.failRequest(ctx, resp, http.StatusBadRequest, noDataRespBody, 0, nil)
285+
r.failRequest(resp, http.StatusBadRequest, noDataRespBody, nil)
287286
return
288287
}
289288

@@ -293,7 +292,7 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
293292
err := reader.Reset(bodyReader)
294293

295294
if err != nil {
296-
r.failRequest(ctx, resp, http.StatusBadRequest, errGzipReaderRespBody, 0, err)
295+
r.failRequest(resp, http.StatusBadRequest, errGzipReaderRespBody, err)
297296
_, _ = io.ReadAll(req.Body)
298297
_ = req.Body.Close()
299298
return
@@ -311,23 +310,23 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
311310
err = errors.New("time cannot be less than 0")
312311
}
313312
if err != nil {
314-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, err)
313+
r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err)
315314
return
316315
}
317316
timestamp = pcommon.NewTimestampFromTime(time.Unix(t, 0))
318317
}
319318

320319
ld, slLen, err := splunkHecRawToLogData(bodyReader, query, resourceCustomizer, r.config, timestamp)
321320
if err != nil {
322-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, err)
321+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, err)
323322
return
324323
}
325324
consumerErr := r.logsConsumer.ConsumeLogs(ctx, ld)
326325

327326
_ = bodyReader.Close()
328327

329328
if consumerErr != nil {
330-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, consumerErr)
329+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, consumerErr)
331330
} else {
332331
var ackErr error
333332
if len(channelID) > 0 && r.ackExt != nil {
@@ -336,7 +335,7 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
336335
ackErr = r.processSuccessResponse(resp, okRespBody)
337336
}
338337
if ackErr != nil {
339-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, ld.LogRecordCount(), err)
338+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, err)
340339
} else {
341340
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), slLen, nil)
342341
}
@@ -377,28 +376,22 @@ func (r *splunkReceiver) validateChannelHeader(channelID string) error {
377376

378377
func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request) {
379378
ctx := req.Context()
380-
if r.logsConsumer != nil {
381-
ctx = r.obsrecv.StartLogsOp(ctx)
382-
}
383-
if r.metricsConsumer != nil {
384-
ctx = r.obsrecv.StartMetricsOp(ctx)
385-
}
386379

387380
if req.Method != http.MethodPost {
388-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, 0, errInvalidMethod)
381+
r.failRequest(resp, http.StatusBadRequest, invalidMethodRespBodyPostOnly, errInvalidMethod)
389382
return
390383
}
391384

392385
encoding := req.Header.Get(httpContentEncodingHeader)
393386
if encoding != "" && encoding != gzipEncoding {
394-
r.failRequest(ctx, resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, 0, errInvalidEncoding)
387+
r.failRequest(resp, http.StatusUnsupportedMediaType, invalidEncodingRespBody, errInvalidEncoding)
395388
return
396389
}
397390

398391
channelID, extracted := r.extractChannel(req)
399392
if extracted {
400393
if channelErr := r.validateChannelHeader(channelID); channelErr != nil {
401-
r.failRequest(ctx, resp, http.StatusBadRequest, []byte(channelErr.Error()), 0, channelErr)
394+
r.failRequest(resp, http.StatusBadRequest, []byte(channelErr.Error()), channelErr)
402395
return
403396
}
404397
}
@@ -408,15 +401,15 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
408401
reader := r.gzipReaderPool.Get().(*gzip.Reader)
409402
err := reader.Reset(bodyReader)
410403
if err != nil {
411-
r.failRequest(ctx, resp, http.StatusBadRequest, errGzipReaderRespBody, 0, err)
404+
r.failRequest(resp, http.StatusBadRequest, errGzipReaderRespBody, err)
412405
return
413406
}
414407
bodyReader = reader
415408
defer r.gzipReaderPool.Put(reader)
416409
}
417410

418411
if req.ContentLength == 0 {
419-
r.failRequest(ctx, resp, http.StatusBadRequest, noDataRespBody, 0, nil)
412+
r.failRequest(resp, http.StatusBadRequest, noDataRespBody, nil)
420413
return
421414
}
422415

@@ -429,35 +422,35 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
429422
var msg splunk.Event
430423
err := dec.Decode(&msg)
431424
if err != nil {
432-
r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, len(events)+len(metricEvents), err)
425+
r.failRequest(resp, http.StatusBadRequest, invalidFormatRespBody, err)
433426
return
434427
}
435428

436429
if msg.Event == nil {
437-
r.failRequest(ctx, resp, http.StatusBadRequest, eventRequiredRespBody, len(events)+len(metricEvents), nil)
430+
r.failRequest(resp, http.StatusBadRequest, eventRequiredRespBody, nil)
438431
return
439432
}
440433

441434
if msg.Event == "" {
442-
r.failRequest(ctx, resp, http.StatusBadRequest, eventBlankRespBody, len(events)+len(metricEvents), nil)
435+
r.failRequest(resp, http.StatusBadRequest, eventBlankRespBody, nil)
443436
return
444437
}
445438

446439
for _, v := range msg.Fields {
447440
if !isFlatJSONField(v) {
448-
r.failRequest(ctx, resp, http.StatusBadRequest, []byte(fmt.Sprintf(responseErrHandlingIndexedFields, len(events)+len(metricEvents))), len(events)+len(metricEvents), nil)
441+
r.failRequest(resp, http.StatusBadRequest, []byte(fmt.Sprintf(responseErrHandlingIndexedFields, len(events)+len(metricEvents))), nil)
449442
return
450443
}
451444
}
452445
if msg.IsMetric() {
453446
if r.metricsConsumer == nil {
454-
r.failRequest(ctx, resp, http.StatusBadRequest, errUnsupportedMetricEvent, len(metricEvents), err)
447+
r.failRequest(resp, http.StatusBadRequest, errUnsupportedMetricEvent, err)
455448
return
456449
}
457450
metricEvents = append(metricEvents, &msg)
458451
} else {
459452
if r.logsConsumer == nil {
460-
r.failRequest(ctx, resp, http.StatusBadRequest, errUnsupportedLogEvent, len(events), err)
453+
r.failRequest(resp, http.StatusBadRequest, errUnsupportedLogEvent, err)
461454
return
462455
}
463456
events = append(events, &msg)
@@ -468,21 +461,24 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
468461
if r.logsConsumer != nil && len(events) > 0 {
469462
ld, err := splunkHecToLogData(r.settings.Logger, events, resourceCustomizer, r.config)
470463
if err != nil {
471-
r.failRequest(ctx, resp, http.StatusBadRequest, errUnmarshalBodyRespBody, len(events), err)
464+
r.failRequest(resp, http.StatusBadRequest, errUnmarshalBodyRespBody, err)
472465
return
473466
}
467+
ctx = r.obsrecv.StartLogsOp(ctx)
474468
decodeErr := r.logsConsumer.ConsumeLogs(ctx, ld)
469+
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), nil)
475470
if decodeErr != nil {
476-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events), decodeErr)
471+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, decodeErr)
477472
return
478473
}
479474
}
480475
if r.metricsConsumer != nil && len(metricEvents) > 0 {
481476
md, _ := splunkHecToMetricsData(r.settings.Logger, metricEvents, resourceCustomizer, r.config)
482-
477+
ctx = r.obsrecv.StartMetricsOp(ctx)
483478
decodeErr := r.metricsConsumer.ConsumeMetrics(ctx, md)
479+
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), nil)
484480
if decodeErr != nil {
485-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(metricEvents), decodeErr)
481+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, decodeErr)
486482
return
487483
}
488484
}
@@ -494,14 +490,7 @@ func (r *splunkReceiver) handleReq(resp http.ResponseWriter, req *http.Request)
494490
ackErr = r.processSuccessResponse(resp, okRespBody)
495491
}
496492
if ackErr != nil {
497-
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, len(events)+len(metricEvents), ackErr)
498-
} else {
499-
if r.logsConsumer != nil {
500-
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), len(events), nil)
501-
}
502-
if r.metricsConsumer != nil {
503-
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), len(metricEvents), nil)
504-
}
493+
r.failRequest(resp, http.StatusInternalServerError, errInternalServerError, ackErr)
505494
}
506495
}
507496

@@ -519,11 +508,9 @@ func (r *splunkReceiver) createResourceCustomizer(req *http.Request) func(resour
519508
}
520509

521510
func (r *splunkReceiver) failRequest(
522-
ctx context.Context,
523511
resp http.ResponseWriter,
524512
httpStatusCode int,
525513
jsonResponse []byte,
526-
numRecordsReceived int,
527514
err error,
528515
) {
529516
resp.WriteHeader(httpStatusCode)
@@ -536,13 +523,6 @@ func (r *splunkReceiver) failRequest(
536523
}
537524
}
538525

539-
if r.logsConsumer != nil {
540-
r.obsrecv.EndLogsOp(ctx, metadata.Type.String(), numRecordsReceived, err)
541-
}
542-
if r.metricsConsumer != nil {
543-
r.obsrecv.EndMetricsOp(ctx, metadata.Type.String(), numRecordsReceived, err)
544-
}
545-
546526
if r.settings.Logger.Core().Enabled(zap.DebugLevel) {
547527
msg := string(jsonResponse)
548528
r.settings.Logger.Debug(

0 commit comments

Comments
 (0)