Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net"
"os"
"os/user"
"path/filepath"
"runtime"
"runtime/debug"
"strconv"
Expand Down Expand Up @@ -953,6 +954,7 @@ func (b *Beat) LoadMeta(metaPath string) error {
return fmt.Errorf("meta file failed to open: %w", err)
}

// file exists, read and load the meta data
if err == nil {
m := meta{}
if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF {
Expand All @@ -977,31 +979,29 @@ func (b *Beat) LoadMeta(metaPath string) error {

// file does not exist or ID is invalid or first start time is not defined, let's create a new one

// write temporary file first
tempFile := metaPath + ".new"
f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
// write temporary file first, use same dir as destination to avoid invalid cross-device links
tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), filepath.Base(metaPath)+".new")
if err != nil {
return fmt.Errorf("failed to create Beat meta file: %w", err)
return fmt.Errorf("failed to create temporary Beat meta file: %w", err)
}

encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
err = f.Sync()
encodeErr := json.NewEncoder(tmpFile).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
err = tmpFile.Sync()
if err != nil {
return fmt.Errorf("Beat meta file failed to write: %w", err)
return fmt.Errorf("beat meta file failed to sync data: %w", err)
}

err = f.Close()
err = tmpFile.Close()
if err != nil {
return fmt.Errorf("Beat meta file failed to write: %w", err)
return fmt.Errorf("beat meta file failed to close: %w", err)
}

if encodeErr != nil {
return fmt.Errorf("Beat meta file failed to write: %w", encodeErr)
return fmt.Errorf("beat meta file failed to encode vaules: %w", encodeErr)
}

// move temporary file into final location
err = file.SafeFileRotate(metaPath, tempFile)
return err
return file.SafeFileRotate(metaPath, tmpFile.Name())
}

func openRegular(filename string) (*os.File, error) {
Expand Down
27 changes: 23 additions & 4 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ package instance
import (
"bytes"
"flag"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestEmptyMetaJson(t *testing.T) {
}

// prepare empty meta file
metaFile, err := ioutil.TempFile("../test", "meta.json")
metaFile, err := os.CreateTemp("../test", "meta.json")
assert.Equal(t, nil, err, "Unable to create temporary meta file")

metaPath := metaFile.Name()
Expand All @@ -157,7 +158,7 @@ func TestMetaJsonWithTimestamp(t *testing.T) {
}
firstStart := firstBeat.Info.FirstStart

metaFile, err := ioutil.TempFile("../test", "meta.json")
metaFile, err := os.CreateTemp("../test", "meta.json")
assert.Equal(t, nil, err, "Unable to create temporary meta file")

metaPath := metaFile.Name()
Expand Down Expand Up @@ -504,7 +505,7 @@ func (m mockManager) Enabled() bool { return m.enabled }
func (m mockManager) RegisterAction(action client.Action) {}
func (m mockManager) RegisterDiagnosticHook(name, description, filename, contentType string, hook client.DiagnosticHook) {
}
func (m mockManager) SetPayload(payload map[string]interface{}) {}
func (m mockManager) SetPayload(payload map[string]any) {}
func (m mockManager) SetStopCallback(f func()) {}
func (m mockManager) Start() error { return nil }
func (m mockManager) Status() status.Status { return status.Status(-42) }
Expand Down Expand Up @@ -562,3 +563,21 @@ func TestManager(t *testing.T) {
require.False(t, management.UnderAgent())
})
}

func TestMultipleLoadMeta(t *testing.T) {
metaFile := filepath.Join(t.TempDir(), "meta.json")
var wg sync.WaitGroup
for range 64 {
wg.Add(1)
go func() {
defer wg.Done()
testBeat, err := NewBeat("filebeat", "testidx", "0.9", false, nil)
require.NoError(t, err)
logger := logptest.NewTestingLogger(t, "")
testBeat.Info.Logger = logger
err = testBeat.LoadMeta(metaFile)
require.NoError(t, err)
}()
}
wg.Wait()
}
22 changes: 11 additions & 11 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,38 +533,38 @@ func TestConsumeContract(t *testing.T) {
os.Setenv("OTELCONSUMER_RECEIVERTEST", "1")

cfg := &Config{
Beatconfig: map[string]interface{}{
Beatconfig: map[string]any{
"queue.mem.flush.timeout": "0s",
"filebeat": map[string]interface{}{
"inputs": []map[string]interface{}{
"filebeat": map[string]any{
"inputs": []map[string]any{
{
"type": "filestream",
"id": "filestream-test",
"enabled": true,
"paths": []string{
filepath.Join(tmpDir, "input-*.log"),
},
"file_identity.native": map[string]interface{}{},
"prospector": map[string]interface{}{
"scanner": map[string]interface{}{
"file_identity.native": map[string]any{},
"prospector": map[string]any{
"scanner": map[string]any{
"fingerprint.enabled": false,
"check_interval": "0.1s",
},
},
"parsers": []map[string]interface{}{
"parsers": []map[string]any{
{
"ndjson": map[string]interface{}{
"ndjson": map[string]any{
"document_id": "id",
},
},
},
},
},
},
"output": map[string]interface{}{
"otelconsumer": map[string]interface{}{},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]interface{}{
"logging": map[string]any{
"level": "debug",
"selectors": []string{
"*",
Expand Down