diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index aa93db94b65..9811183d17e 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -27,6 +27,7 @@ import ( "net" "os" "os/user" + "path/filepath" "runtime" "runtime/debug" "strconv" @@ -953,6 +954,7 @@ func (b *Beat) LoadMeta(metaPath string) error { return fmt.Errorf("meta file failed to open: %w", err) } + // file exists, read and load the meta data if err == nil { m := meta{} if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF { @@ -977,31 +979,29 @@ func (b *Beat) LoadMeta(metaPath string) error { // file does not exist or ID is invalid or first start time is not defined, let's create a new one - // write temporary file first - tempFile := metaPath + ".new" - f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600) + // write temporary file first, use same dir as destination to avoid invalid cross-device links + tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), filepath.Base(metaPath)+".new") if err != nil { - return fmt.Errorf("failed to create Beat meta file: %w", err) + return fmt.Errorf("failed to create temporary Beat meta file: %w", err) } - encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart}) - err = f.Sync() + encodeErr := json.NewEncoder(tmpFile).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart}) + err = tmpFile.Sync() if err != nil { - return fmt.Errorf("Beat meta file failed to write: %w", err) + return fmt.Errorf("beat meta file failed to sync data: %w", err) } - err = f.Close() + err = tmpFile.Close() if err != nil { - return fmt.Errorf("Beat meta file failed to write: %w", err) + return fmt.Errorf("beat meta file failed to close: %w", err) } if encodeErr != nil { - return fmt.Errorf("Beat meta file failed to write: %w", encodeErr) + return fmt.Errorf("beat meta file failed to encode vaules: %w", encodeErr) } // move temporary file into final location - err = file.SafeFileRotate(metaPath, tempFile) - return err + return file.SafeFileRotate(metaPath, tmpFile.Name()) } func openRegular(filename string) (*os.File, error) { diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 8c85d70616f..b1318540157 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -22,8 +22,9 @@ package instance import ( "bytes" "flag" - "io/ioutil" "os" + "path/filepath" + "sync" "testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -134,7 +135,7 @@ func TestEmptyMetaJson(t *testing.T) { } // prepare empty meta file - metaFile, err := ioutil.TempFile("../test", "meta.json") + metaFile, err := os.CreateTemp("../test", "meta.json") assert.Equal(t, nil, err, "Unable to create temporary meta file") metaPath := metaFile.Name() @@ -157,7 +158,7 @@ func TestMetaJsonWithTimestamp(t *testing.T) { } firstStart := firstBeat.Info.FirstStart - metaFile, err := ioutil.TempFile("../test", "meta.json") + metaFile, err := os.CreateTemp("../test", "meta.json") assert.Equal(t, nil, err, "Unable to create temporary meta file") metaPath := metaFile.Name() @@ -504,7 +505,7 @@ func (m mockManager) Enabled() bool { return m.enabled } func (m mockManager) RegisterAction(action client.Action) {} func (m mockManager) RegisterDiagnosticHook(name, description, filename, contentType string, hook client.DiagnosticHook) { } -func (m mockManager) SetPayload(payload map[string]interface{}) {} +func (m mockManager) SetPayload(payload map[string]any) {} func (m mockManager) SetStopCallback(f func()) {} func (m mockManager) Start() error { return nil } func (m mockManager) Status() status.Status { return status.Status(-42) } @@ -562,3 +563,21 @@ func TestManager(t *testing.T) { require.False(t, management.UnderAgent()) }) } + +func TestMultipleLoadMeta(t *testing.T) { + metaFile := filepath.Join(t.TempDir(), "meta.json") + var wg sync.WaitGroup + for range 64 { + wg.Add(1) + go func() { + defer wg.Done() + testBeat, err := NewBeat("filebeat", "testidx", "0.9", false, nil) + require.NoError(t, err) + logger := logptest.NewTestingLogger(t, "") + testBeat.Info.Logger = logger + err = testBeat.LoadMeta(metaFile) + require.NoError(t, err) + }() + } + wg.Wait() +} diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index f2801d28a62..5b0f91fca49 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -533,10 +533,10 @@ func TestConsumeContract(t *testing.T) { os.Setenv("OTELCONSUMER_RECEIVERTEST", "1") cfg := &Config{ - Beatconfig: map[string]interface{}{ + Beatconfig: map[string]any{ "queue.mem.flush.timeout": "0s", - "filebeat": map[string]interface{}{ - "inputs": []map[string]interface{}{ + "filebeat": map[string]any{ + "inputs": []map[string]any{ { "type": "filestream", "id": "filestream-test", @@ -544,16 +544,16 @@ func TestConsumeContract(t *testing.T) { "paths": []string{ filepath.Join(tmpDir, "input-*.log"), }, - "file_identity.native": map[string]interface{}{}, - "prospector": map[string]interface{}{ - "scanner": map[string]interface{}{ + "file_identity.native": map[string]any{}, + "prospector": map[string]any{ + "scanner": map[string]any{ "fingerprint.enabled": false, "check_interval": "0.1s", }, }, - "parsers": []map[string]interface{}{ + "parsers": []map[string]any{ { - "ndjson": map[string]interface{}{ + "ndjson": map[string]any{ "document_id": "id", }, }, @@ -561,10 +561,10 @@ func TestConsumeContract(t *testing.T) { }, }, }, - "output": map[string]interface{}{ - "otelconsumer": map[string]interface{}{}, + "output": map[string]any{ + "otelconsumer": map[string]any{}, }, - "logging": map[string]interface{}{ + "logging": map[string]any{ "level": "debug", "selectors": []string{ "*",