From 9d93d88814118a664360a3357f37bce09f32d09a Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 29 Sep 2025 12:13:18 -0500 Subject: [PATCH 1/4] fix race condition in beat LoadMeta --- libbeat/cmd/instance/beat.go | 21 ++++++++++----------- libbeat/cmd/instance/beat_test.go | 27 +++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index aa93db94b652..4999f327c8be 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -953,6 +953,7 @@ func (b *Beat) LoadMeta(metaPath string) error { return fmt.Errorf("meta file failed to open: %w", err) } + // file exists, read and load the meta data if err == nil { m := meta{} if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { @@ -978,30 +979,28 @@ func (b *Beat) LoadMeta(metaPath string) error { // file does not exist or ID is invalid or first start time is not defined, let's create a new one // write temporary file first - tempFile := metaPath + ".new" - f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) + tmpFile, err := os.CreateTemp("", "beat_meta") if err != nil { - return fmt.Errorf("failed to create Beat meta file: %w", err) + return fmt.Errorf("failed to create temporary Beat meta file: %w", err) } - encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart}) - err = f.Sync() + encodeErr := json.NewEncoder(tmpFile).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart}) + err = tmpFile.Sync() if err != nil { - return fmt.Errorf("Beat meta file failed to write: %w", err) + return fmt.Errorf("beat meta file failed to sync data: %w", err) } - err = f.Close() + err = tmpFile.Close() if err != nil { - return fmt.Errorf("Beat meta file failed to write: %w", err) + return fmt.Errorf("beat meta file failed to close: %w", err) } if encodeErr != nil { - return fmt.Errorf("Beat meta file failed to write: %w", encodeErr) + return fmt.Errorf("beat meta file failed to encode vaules: %w", encodeErr) } // move temporary file into final location - err = file.SafeFileRotate(metaPath, tempFile) - return err + return file.SafeFileRotate(metaPath, tmpFile.Name()) } func openRegular(filename string) (*os.File, error) { diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 8c85d70616f9..b1318540157b 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -22,8 +22,9 @@ package instance import ( "bytes" "flag" - "io/ioutil" "os" + "path/filepath" + "sync" "testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -134,7 +135,7 @@ func TestEmptyMetaJson(t *testing.T) { } // prepare empty meta file - metaFile, err := ioutil.TempFile("../test", "meta.json") + metaFile, err := os.CreateTemp("../test", "meta.json") assert.Equal(t, nil, err, "Unable to create temporary meta file") metaPath := metaFile.Name() @@ -157,7 +158,7 @@ func TestMetaJsonWithTimestamp(t *testing.T) { } firstStart := firstBeat.Info.FirstStart - metaFile, err := ioutil.TempFile("../test", "meta.json") + metaFile, err := os.CreateTemp("../test", "meta.json") assert.Equal(t, nil, err, "Unable to create temporary meta file") metaPath := metaFile.Name() @@ -504,7 +505,7 @@ func (m mockManager) Enabled() bool { return m.enabled } func (m mockManager) RegisterAction(action client.Action) {} func (m mockManager) RegisterDiagnosticHook(name, description, filename, contentType string, hook client.DiagnosticHook) { } -func (m mockManager) SetPayload(payload map[string]interface{}) {} +func (m mockManager) SetPayload(payload map[string]any) {} func (m mockManager) SetStopCallback(f func()) {} func (m mockManager) Start() error { return nil } func (m mockManager) Status() status.Status { return status.Status(-42) } @@ -562,3 +563,21 @@ func TestManager(t *testing.T) { require.False(t, management.UnderAgent()) }) } + +func TestMultipleLoadMeta(t *testing.T) { + metaFile := filepath.Join(t.TempDir(), "meta.json") + var wg sync.WaitGroup + for range 64 { + wg.Add(1) + go func() { + defer wg.Done() + testBeat, err := NewBeat("filebeat", "testidx", "0.9", false, nil) + require.NoError(t, err) + logger := logptest.NewTestingLogger(t, "") + testBeat.Info.Logger = logger + err = testBeat.LoadMeta(metaFile) + require.NoError(t, err) + }() + } + wg.Wait() +} From 0e322852405b3d04ba126710ae46c5c7f1419667 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 29 Sep 2025 14:15:44 -0500 Subject: [PATCH 2/4] skip TestMultipleReceivers on Windows until API start is fixed --- x-pack/filebeat/fbreceiver/receiver_test.go | 22 ++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index f2801d28a62f..5b0f91fca49b 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -533,10 +533,10 @@ func TestConsumeContract(t *testing.T) { os.Setenv("OTELCONSUMER_RECEIVERTEST", "1") cfg := &Config{ - Beatconfig: map[string]interface{}{ + Beatconfig: map[string]any{ "queue.mem.flush.timeout": "0s", - "filebeat": map[string]interface{}{ - "inputs": []map[string]interface{}{ + "filebeat": map[string]any{ + "inputs": []map[string]any{ { "type": "filestream", "id": "filestream-test", @@ -544,16 +544,16 @@ func TestConsumeContract(t *testing.T) { "paths": []string{ filepath.Join(tmpDir, "input-*.log"), }, - "file_identity.native": map[string]interface{}{}, - "prospector": map[string]interface{}{ - "scanner": map[string]interface{}{ + "file_identity.native": map[string]any{}, + "prospector": map[string]any{ + "scanner": map[string]any{ "fingerprint.enabled": false, "check_interval": "0.1s", }, }, - "parsers": []map[string]interface{}{ + "parsers": []map[string]any{ { - "ndjson": map[string]interface{}{ + "ndjson": map[string]any{ "document_id": "id", }, }, @@ -561,10 +561,10 @@ func TestConsumeContract(t *testing.T) { }, }, }, - "output": map[string]interface{}{ - "otelconsumer": map[string]interface{}{}, + "output": map[string]any{ + "otelconsumer": map[string]any{}, }, - "logging": map[string]interface{}{ + "logging": map[string]any{ "level": "debug", "selectors": []string{ "*", From be310b9d6064778e0aa21ba031ea729aa771d611 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 29 Sep 2025 16:10:00 -0500 Subject: [PATCH 3/4] use same dir as dst to avoid invalid cross-device links --- libbeat/cmd/instance/beat.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 4999f327c8be..afe0dc6c5998 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -27,6 +27,7 @@ import ( "net" "os" "os/user" + "path/filepath" "runtime" "runtime/debug" "strconv" @@ -978,8 +979,8 @@ func (b *Beat) LoadMeta(metaPath string) error { // file does not exist or ID is invalid or first start time is not defined, let's create a new one - // write temporary file first - tmpFile, err := os.CreateTemp("", "beat_meta") + // write temporary file first, use same dir as destination to avoid invalid cross-device links + tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), "new") if err != nil { return fmt.Errorf("failed to create temporary Beat meta file: %w", err) } From b84f9352bbacca2f15c3390adb2cadf7ada42d72 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 1 Oct 2025 17:09:01 -0500 Subject: [PATCH 4/4] make temp filename more descriptive --- libbeat/cmd/instance/beat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index afe0dc6c5998..9811183d17e2 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -980,7 +980,7 @@ func (b *Beat) LoadMeta(metaPath string) error { // file does not exist or ID is invalid or first start time is not defined, let's create a new one // write temporary file first, use same dir as destination to avoid invalid cross-device links - tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), "new") + tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), filepath.Base(metaPath)+".new") if err != nil { return fmt.Errorf("failed to create temporary Beat meta file: %w", err) }