Skip to content

libbeat: panic send on closed channel in the Logstash output #46889

@mauri870

Description

@mauri870

This came from an SDH. Initially, this was reported to be affecting 8.17, but the reproducer is enough to trigger a panic in main as well.

The following test triggers the panic:

diff --git a/libbeat/outputs/logstash/async_test.go b/libbeat/outputs/logstash/async_test.go
index 483e110994..1167a45f13 100644
--- a/libbeat/outputs/logstash/async_test.go
+++ b/libbeat/outputs/logstash/async_test.go
@@ -25,9 +25,12 @@ import (
        "testing"
        "time"

+       "github.com/elastic/beats/v7/libbeat/beat"
+       "github.com/elastic/beats/v7/libbeat/common/transport/transptest"
        "github.com/elastic/beats/v7/libbeat/outputs"
        "github.com/elastic/beats/v7/libbeat/outputs/outest"
        "github.com/elastic/elastic-agent-libs/logp"
+       "github.com/elastic/elastic-agent-libs/mapstr"
        "github.com/elastic/elastic-agent-libs/transport"
 )

@@ -126,3 +129,46 @@ func (t *testAsyncDriver) Publish(batch *outest.Batch) {
 func (t *testAsyncDriver) Returns() []testClientReturn {
        return t.returns
 }
+
+func TestSendCloseDoesNotPanic(t *testing.T) {
+       for i := 0; i < 10; i++ {
+               testSendCloseDoesNotPanic(t)
+       }
+}
+
+func testSendCloseDoesNotPanic(t *testing.T) {
+       server := transptest.NewMockServerTCP(t, 50*time.Millisecond, "", nil)
+       defer server.Close()
+
+       transp, err := server.Connect()
+       if err != nil {
+               t.Fatalf("Failed to connect: %v", err)
+       }
+       defer transp.Close()
+
+       config := DefaultConfig()
+
+       logger, err := logp.NewDevelopmentLogger("")
+       if err != nil {
+               t.Fatalf("Failed to create logger: %v", err)
+       }
+
+       asyncClient, err := newAsyncClient(logger, "beat_version", transp, outputs.NewNilObserver(), &config)
+       if err != nil {
+               t.Fatalf("Failed to create async client: %v", err)
+       }
+
+       event := beat.Event{
+               Fields: mapstr.M{
+                       "message": "test event",
+               },
+       }
+
+       batch := outest.NewBatch(event)
+
+       go func() {
+               _ = asyncClient.Close()
+       }()
+
+       _ = asyncClient.Publish(t.Context(), batch)
+}
--- FAIL: TestSendCloseDoesNotPanic (0.00s)
panic: send on closed channel [recovered]
        panic: send on closed channel

goroutine 77 [running]:
testing.tRunner.func1.2({0x13e1080, 0x16fcc80})
        /home/mauri870/gopath/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1734 +0x21c
testing.tRunner.func1()
        /home/mauri870/gopath/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1737 +0x35e
panic({0x13e1080?, 0x16fcc80?})
        /home/mauri870/gopath/pkg/mod/golang.org/[email protected]/src/runtime/panic.go:792 +0x132
github.com/elastic/go-lumber/client/v2.(*AsyncClient).Send(0xc0001cf9b0, 0xc0001cfa10, {0xc0002b8370?, 0x1, 0x1254ed1?})
        /home/mauri870/gopath/pkg/mod/github.com/elastic/[email protected]/client/v2/async.go:121 +0x86
github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).sendEvents(0x9881fe?, 0xc0005e81e0, {0xc0005e8120, 0x1, 0xc0001afbb8?})
        /home/mauri870/git/elastic/beats/libbeat/outputs/logstash/async.go:223 +0x16e
github.com/elastic/beats/v7/libbeat/outputs/logstash.(*asyncClient).Publish(0xc000526480, {0xc0001cf9e0?, 0x1543e2c?}, {0x1712b30, 0xc000526580})
        /home/mauri870/git/elastic/beats/libbeat/outputs/logstash/async.go:167 +0x256
github.com/elastic/beats/v7/libbeat/outputs/logstash.testSendCloseDoesNotPanic(0xc000502380)
        /home/mauri870/git/elastic/beats/libbeat/outputs/logstash/async_test.go:173 +0x50c
github.com/elastic/beats/v7/libbeat/outputs/logstash.TestSendCloseDoesNotPanic(0xc000502380)
        /home/mauri870/git/elastic/beats/libbeat/outputs/logstash/async_test.go:135 +0x25
testing.tRunner(0xc000502380, 0x15f3730)
        /home/mauri870/gopath/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1792 +0xf4
created by testing.(*T).Run in goroutine 1
        /home/mauri870/gopath/pkg/mod/golang.org/[email protected]/src/testing/testing.go:1851 +0x413
FAIL    github.com/elastic/beats/v7/libbeat/outputs/logstash    0.015s
FAIL

Metadata

Metadata

Assignees

Type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions