Skip to content

Commit 6d3543c

Browse files
[beatreceivers] fix race condition in beat LoadMeta (#46844) (#46897)
* fix race condition in beat LoadMeta (cherry picked from commit 89721e6) Co-authored-by: Lee E Hinman <[email protected]>
1 parent 4e51113 commit 6d3543c

File tree

3 files changed

+46
-27
lines changed

3 files changed

+46
-27
lines changed

libbeat/cmd/instance/beat.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"net"
2828
"os"
2929
"os/user"
30+
"path/filepath"
3031
"runtime"
3132
"runtime/debug"
3233
"strconv"
@@ -953,6 +954,7 @@ func (b *Beat) LoadMeta(metaPath string) error {
953954
return fmt.Errorf("meta file failed to open: %w", err)
954955
}
955956

957+
// file exists, read and load the meta data
956958
if err == nil {
957959
m := meta{}
958960
if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF {
@@ -977,31 +979,29 @@ func (b *Beat) LoadMeta(metaPath string) error {
977979

978980
// file does not exist or ID is invalid or first start time is not defined, let's create a new one
979981

980-
// write temporary file first
981-
tempFile := metaPath + ".new"
982-
f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
982+
// write temporary file first, use same dir as destination to avoid invalid cross-device links
983+
tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), filepath.Base(metaPath)+".new")
983984
if err != nil {
984-
return fmt.Errorf("failed to create Beat meta file: %w", err)
985+
return fmt.Errorf("failed to create temporary Beat meta file: %w", err)
985986
}
986987

987-
encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
988-
err = f.Sync()
988+
encodeErr := json.NewEncoder(tmpFile).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
989+
err = tmpFile.Sync()
989990
if err != nil {
990-
return fmt.Errorf("Beat meta file failed to write: %w", err)
991+
return fmt.Errorf("beat meta file failed to sync data: %w", err)
991992
}
992993

993-
err = f.Close()
994+
err = tmpFile.Close()
994995
if err != nil {
995-
return fmt.Errorf("Beat meta file failed to write: %w", err)
996+
return fmt.Errorf("beat meta file failed to close: %w", err)
996997
}
997998

998999
if encodeErr != nil {
999-
return fmt.Errorf("Beat meta file failed to write: %w", encodeErr)
1000+
return fmt.Errorf("beat meta file failed to encode vaules: %w", encodeErr)
10001001
}
10011002

10021003
// move temporary file into final location
1003-
err = file.SafeFileRotate(metaPath, tempFile)
1004-
return err
1004+
return file.SafeFileRotate(metaPath, tmpFile.Name())
10051005
}
10061006

10071007
func openRegular(filename string) (*os.File, error) {

libbeat/cmd/instance/beat_test.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ package instance
2222
import (
2323
"bytes"
2424
"flag"
25-
"io/ioutil"
2625
"os"
26+
"path/filepath"
27+
"sync"
2728
"testing"
2829

2930
"github.com/elastic/beats/v7/libbeat/beat"
@@ -134,7 +135,7 @@ func TestEmptyMetaJson(t *testing.T) {
134135
}
135136

136137
// prepare empty meta file
137-
metaFile, err := ioutil.TempFile("../test", "meta.json")
138+
metaFile, err := os.CreateTemp("../test", "meta.json")
138139
assert.Equal(t, nil, err, "Unable to create temporary meta file")
139140

140141
metaPath := metaFile.Name()
@@ -157,7 +158,7 @@ func TestMetaJsonWithTimestamp(t *testing.T) {
157158
}
158159
firstStart := firstBeat.Info.FirstStart
159160

160-
metaFile, err := ioutil.TempFile("../test", "meta.json")
161+
metaFile, err := os.CreateTemp("../test", "meta.json")
161162
assert.Equal(t, nil, err, "Unable to create temporary meta file")
162163

163164
metaPath := metaFile.Name()
@@ -504,7 +505,7 @@ func (m mockManager) Enabled() bool { return m.enabled }
504505
func (m mockManager) RegisterAction(action client.Action) {}
505506
func (m mockManager) RegisterDiagnosticHook(name, description, filename, contentType string, hook client.DiagnosticHook) {
506507
}
507-
func (m mockManager) SetPayload(payload map[string]interface{}) {}
508+
func (m mockManager) SetPayload(payload map[string]any) {}
508509
func (m mockManager) SetStopCallback(f func()) {}
509510
func (m mockManager) Start() error { return nil }
510511
func (m mockManager) Status() status.Status { return status.Status(-42) }
@@ -562,3 +563,21 @@ func TestManager(t *testing.T) {
562563
require.False(t, management.UnderAgent())
563564
})
564565
}
566+
567+
func TestMultipleLoadMeta(t *testing.T) {
568+
metaFile := filepath.Join(t.TempDir(), "meta.json")
569+
var wg sync.WaitGroup
570+
for range 64 {
571+
wg.Add(1)
572+
go func() {
573+
defer wg.Done()
574+
testBeat, err := NewBeat("filebeat", "testidx", "0.9", false, nil)
575+
require.NoError(t, err)
576+
logger := logptest.NewTestingLogger(t, "")
577+
testBeat.Info.Logger = logger
578+
err = testBeat.LoadMeta(metaFile)
579+
require.NoError(t, err)
580+
}()
581+
}
582+
wg.Wait()
583+
}

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -533,38 +533,38 @@ func TestConsumeContract(t *testing.T) {
533533
os.Setenv("OTELCONSUMER_RECEIVERTEST", "1")
534534

535535
cfg := &Config{
536-
Beatconfig: map[string]interface{}{
536+
Beatconfig: map[string]any{
537537
"queue.mem.flush.timeout": "0s",
538-
"filebeat": map[string]interface{}{
539-
"inputs": []map[string]interface{}{
538+
"filebeat": map[string]any{
539+
"inputs": []map[string]any{
540540
{
541541
"type": "filestream",
542542
"id": "filestream-test",
543543
"enabled": true,
544544
"paths": []string{
545545
filepath.Join(tmpDir, "input-*.log"),
546546
},
547-
"file_identity.native": map[string]interface{}{},
548-
"prospector": map[string]interface{}{
549-
"scanner": map[string]interface{}{
547+
"file_identity.native": map[string]any{},
548+
"prospector": map[string]any{
549+
"scanner": map[string]any{
550550
"fingerprint.enabled": false,
551551
"check_interval": "0.1s",
552552
},
553553
},
554-
"parsers": []map[string]interface{}{
554+
"parsers": []map[string]any{
555555
{
556-
"ndjson": map[string]interface{}{
556+
"ndjson": map[string]any{
557557
"document_id": "id",
558558
},
559559
},
560560
},
561561
},
562562
},
563563
},
564-
"output": map[string]interface{}{
565-
"otelconsumer": map[string]interface{}{},
564+
"output": map[string]any{
565+
"otelconsumer": map[string]any{},
566566
},
567-
"logging": map[string]interface{}{
567+
"logging": map[string]any{
568568
"level": "debug",
569569
"selectors": []string{
570570
"*",

0 commit comments

Comments
 (0)