Skip to content

Commit 86fb2c4

Browse files
leehinmanmergify[bot]
authored andcommitted
[beatreceivers] fix race condition in beat LoadMeta (#46844)
* fix race condition in beat LoadMeta (cherry picked from commit 89721e6)
1 parent b2d1609 commit 86fb2c4

File tree

3 files changed

+46
-27
lines changed

3 files changed

+46
-27
lines changed

libbeat/cmd/instance/beat.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net"
2828
"os"
2929
"os/user"
30+
"path/filepath"
3031
"runtime"
3132
"runtime/debug"
3233
"strconv"
@@ -951,6 +952,7 @@ func (b *Beat) LoadMeta(metaPath string) error {
951952
return fmt.Errorf("meta file failed to open: %w", err)
952953
}
953954

955+
// file exists, read and load the meta data
954956
if err == nil {
955957
m := meta{}
956958
if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF {
@@ -975,31 +977,29 @@ func (b *Beat) LoadMeta(metaPath string) error {
975977

976978
// file does not exist or ID is invalid or first start time is not defined, let's create a new one
977979

978-
// write temporary file first
979-
tempFile := metaPath + ".new"
980-
f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
980+
// write temporary file first, use same dir as destination to avoid invalid cross-device links
981+
tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), filepath.Base(metaPath)+".new")
981982
if err != nil {
982-
return fmt.Errorf("failed to create Beat meta file: %w", err)
983+
return fmt.Errorf("failed to create temporary Beat meta file: %w", err)
983984
}
984985

985-
encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
986-
err = f.Sync()
986+
encodeErr := json.NewEncoder(tmpFile).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
987+
err = tmpFile.Sync()
987988
if err != nil {
988-
return fmt.Errorf("Beat meta file failed to write: %w", err)
989+
return fmt.Errorf("beat meta file failed to sync data: %w", err)
989990
}
990991

991-
err = f.Close()
992+
err = tmpFile.Close()
992993
if err != nil {
993-
return fmt.Errorf("Beat meta file failed to write: %w", err)
994+
return fmt.Errorf("beat meta file failed to close: %w", err)
994995
}
995996

996997
if encodeErr != nil {
997-
return fmt.Errorf("Beat meta file failed to write: %w", encodeErr)
998+
return fmt.Errorf("beat meta file failed to encode vaules: %w", encodeErr)
998999
}
9991000

10001001
// move temporary file into final location
1001-
err = file.SafeFileRotate(metaPath, tempFile)
1002-
return err
1002+
return file.SafeFileRotate(metaPath, tmpFile.Name())
10031003
}
10041004

10051005
func openRegular(filename string) (*os.File, error) {

libbeat/cmd/instance/beat_test.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ package instance
2222
import (
2323
"bytes"
2424
"flag"
25-
"io/ioutil"
2625
"os"
26+
"path/filepath"
27+
"sync"
2728
"testing"
2829

2930
"github.com/elastic/beats/v7/libbeat/beat"
@@ -134,7 +135,7 @@ func TestEmptyMetaJson(t *testing.T) {
134135
}
135136

136137
// prepare empty meta file
137-
metaFile, err := ioutil.TempFile("../test", "meta.json")
138+
metaFile, err := os.CreateTemp("../test", "meta.json")
138139
assert.Equal(t, nil, err, "Unable to create temporary meta file")
139140

140141
metaPath := metaFile.Name()
@@ -157,7 +158,7 @@ func TestMetaJsonWithTimestamp(t *testing.T) {
157158
}
158159
firstStart := firstBeat.Info.FirstStart
159160

160-
metaFile, err := ioutil.TempFile("../test", "meta.json")
161+
metaFile, err := os.CreateTemp("../test", "meta.json")
161162
assert.Equal(t, nil, err, "Unable to create temporary meta file")
162163

163164
metaPath := metaFile.Name()
@@ -504,7 +505,7 @@ func (m mockManager) Enabled() bool { return m.enabled }
504505
func (m mockManager) RegisterAction(action client.Action) {}
505506
func (m mockManager) RegisterDiagnosticHook(name, description, filename, contentType string, hook client.DiagnosticHook) {
506507
}
507-
func (m mockManager) SetPayload(payload map[string]interface{}) {}
508+
func (m mockManager) SetPayload(payload map[string]any) {}
508509
func (m mockManager) SetStopCallback(f func()) {}
509510
func (m mockManager) Start() error { return nil }
510511
func (m mockManager) Status() status.Status { return status.Status(-42) }
@@ -562,3 +563,21 @@ func TestManager(t *testing.T) {
562563
require.False(t, management.UnderAgent())
563564
})
564565
}
566+
567+
func TestMultipleLoadMeta(t *testing.T) {
568+
metaFile := filepath.Join(t.TempDir(), "meta.json")
569+
var wg sync.WaitGroup
570+
for range 64 {
571+
wg.Add(1)
572+
go func() {
573+
defer wg.Done()
574+
testBeat, err := NewBeat("filebeat", "testidx", "0.9", false, nil)
575+
require.NoError(t, err)
576+
logger := logptest.NewTestingLogger(t, "")
577+
testBeat.Info.Logger = logger
578+
err = testBeat.LoadMeta(metaFile)
579+
require.NoError(t, err)
580+
}()
581+
}
582+
wg.Wait()
583+
}

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -483,38 +483,38 @@ func TestConsumeContract(t *testing.T) {
483483
os.Setenv("OTELCONSUMER_RECEIVERTEST", "1")
484484

485485
cfg := &Config{
486-
Beatconfig: map[string]interface{}{
486+
Beatconfig: map[string]any{
487487
"queue.mem.flush.timeout": "0s",
488-
"filebeat": map[string]interface{}{
489-
"inputs": []map[string]interface{}{
488+
"filebeat": map[string]any{
489+
"inputs": []map[string]any{
490490
{
491491
"type": "filestream",
492492
"id": "filestream-test",
493493
"enabled": true,
494494
"paths": []string{
495495
filepath.Join(tmpDir, "input-*.log"),
496496
},
497-
"file_identity.native": map[string]interface{}{},
498-
"prospector": map[string]interface{}{
499-
"scanner": map[string]interface{}{
497+
"file_identity.native": map[string]any{},
498+
"prospector": map[string]any{
499+
"scanner": map[string]any{
500500
"fingerprint.enabled": false,
501501
"check_interval": "0.1s",
502502
},
503503
},
504-
"parsers": []map[string]interface{}{
504+
"parsers": []map[string]any{
505505
{
506-
"ndjson": map[string]interface{}{
506+
"ndjson": map[string]any{
507507
"document_id": "id",
508508
},
509509
},
510510
},
511511
},
512512
},
513513
},
514-
"output": map[string]interface{}{
515-
"otelconsumer": map[string]interface{}{},
514+
"output": map[string]any{
515+
"otelconsumer": map[string]any{},
516516
},
517-
"logging": map[string]interface{}{
517+
"logging": map[string]any{
518518
"level": "debug",
519519
"selectors": []string{
520520
"*",

0 commit comments

Comments
 (0)