Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"net"
"os"
"os/user"
"path/filepath"
"runtime"
"runtime/debug"
"strconv"
Expand Down Expand Up @@ -248,7 +249,7 @@
ID: id,
FirstStart: time.Now(),
StartTime: time.Now(),
EphemeralID: metricreport.EphemeralID(),

Check failure on line 252 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: metricreport.EphemeralID is deprecated: generate your own EphemeralID (staticcheck)
FIPSDistribution: version.FIPSDistribution,
},
Fields: fields,
Expand Down Expand Up @@ -344,13 +345,13 @@

reg := b.Monitoring.StatsRegistry().GetOrCreateRegistry("libbeat")

err = metricreport.SetupMetrics(b.Info.Logger.Named("metrics"), b.Info.Beat, version.GetDefaultVersion())

Check failure on line 348 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: metricreport.SetupMetrics is deprecated: use SetupMetricsOptions (staticcheck)
if err != nil {
return nil, err
}

// Report central management state
mgmt := b.Monitoring.StateRegistry().NewRegistry("management")

Check failure on line 354 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: b.Monitoring.StateRegistry().NewRegistry is deprecated: Use GetOrCreateRegistry instead, which does not panic if the given name already exists. (staticcheck)
monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled())

log.Debug("Initializing output plugins")
Expand Down Expand Up @@ -951,6 +952,7 @@
return fmt.Errorf("meta file failed to open: %w", err)
}

// file exists, read and load the meta data
if err == nil {
m := meta{}
if err := json.NewDecoder(f).Decode(&m); err != nil && err != io.EOF {
Expand All @@ -975,31 +977,29 @@

// file does not exist or ID is invalid or first start time is not defined, let's create a new one

// write temporary file first
tempFile := metaPath + ".new"
f, err = os.OpenFile(tempFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
// write temporary file first, use same dir as destination to avoid invalid cross-device links
tmpFile, err := os.CreateTemp(filepath.Dir(metaPath), filepath.Base(metaPath)+".new")
if err != nil {
return fmt.Errorf("failed to create Beat meta file: %w", err)
return fmt.Errorf("failed to create temporary Beat meta file: %w", err)
}

encodeErr := json.NewEncoder(f).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
err = f.Sync()
encodeErr := json.NewEncoder(tmpFile).Encode(meta{UUID: b.Info.ID, FirstStart: b.Info.FirstStart})
err = tmpFile.Sync()
if err != nil {
return fmt.Errorf("Beat meta file failed to write: %w", err)
return fmt.Errorf("beat meta file failed to sync data: %w", err)
}

err = f.Close()
err = tmpFile.Close()
if err != nil {
return fmt.Errorf("Beat meta file failed to write: %w", err)
return fmt.Errorf("beat meta file failed to close: %w", err)
}

if encodeErr != nil {
return fmt.Errorf("Beat meta file failed to write: %w", encodeErr)
return fmt.Errorf("beat meta file failed to encode vaules: %w", encodeErr)
}

// move temporary file into final location
err = file.SafeFileRotate(metaPath, tempFile)
return err
return file.SafeFileRotate(metaPath, tmpFile.Name())
}

func openRegular(filename string) (*os.File, error) {
Expand Down Expand Up @@ -1280,7 +1280,7 @@

// Build and return a callback to fetch the Elasticsearch cluster_uuid for monitoring
func (b *Beat) clusterUUIDFetchingCallback() elasticsearch.ConnectCallback {
elasticsearchRegistry := b.Monitoring.StateRegistry().NewRegistry("outputs.elasticsearch")

Check failure on line 1283 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: b.Monitoring.StateRegistry().NewRegistry is deprecated: Use GetOrCreateRegistry instead, which does not panic if the given name already exists. (staticcheck)
clusterUUIDRegVar := monitoring.NewString(elasticsearchRegistry, "cluster_uuid")

callback := func(esClient *eslegclient.Connection) error {
Expand Down Expand Up @@ -1317,7 +1317,7 @@

// Expose monitoring.cluster_uuid in state API
if monitoringClusterUUID != "" {
monitoringRegistry := b.Monitoring.StateRegistry().NewRegistry("monitoring")

Check failure on line 1320 in libbeat/cmd/instance/beat.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: b.Monitoring.StateRegistry().NewRegistry is deprecated: Use GetOrCreateRegistry instead, which does not panic if the given name already exists. (staticcheck)
clusterUUIDRegVar := monitoring.NewString(monitoringRegistry, "cluster_uuid")
clusterUUIDRegVar.Set(monitoringClusterUUID)
}
Expand Down
27 changes: 23 additions & 4 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ package instance
import (
"bytes"
"flag"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestEmptyMetaJson(t *testing.T) {
}

// prepare empty meta file
metaFile, err := ioutil.TempFile("../test", "meta.json")
metaFile, err := os.CreateTemp("../test", "meta.json")
assert.Equal(t, nil, err, "Unable to create temporary meta file")

metaPath := metaFile.Name()
Expand All @@ -157,7 +158,7 @@ func TestMetaJsonWithTimestamp(t *testing.T) {
}
firstStart := firstBeat.Info.FirstStart

metaFile, err := ioutil.TempFile("../test", "meta.json")
metaFile, err := os.CreateTemp("../test", "meta.json")
assert.Equal(t, nil, err, "Unable to create temporary meta file")

metaPath := metaFile.Name()
Expand Down Expand Up @@ -504,7 +505,7 @@ func (m mockManager) Enabled() bool { return m.enabled }
func (m mockManager) RegisterAction(action client.Action) {}
func (m mockManager) RegisterDiagnosticHook(name, description, filename, contentType string, hook client.DiagnosticHook) {
}
func (m mockManager) SetPayload(payload map[string]interface{}) {}
func (m mockManager) SetPayload(payload map[string]any) {}
func (m mockManager) SetStopCallback(f func()) {}
func (m mockManager) Start() error { return nil }
func (m mockManager) Status() status.Status { return status.Status(-42) }
Expand Down Expand Up @@ -562,3 +563,21 @@ func TestManager(t *testing.T) {
require.False(t, management.UnderAgent())
})
}

func TestMultipleLoadMeta(t *testing.T) {
metaFile := filepath.Join(t.TempDir(), "meta.json")
var wg sync.WaitGroup
for range 64 {
wg.Add(1)
go func() {
defer wg.Done()
testBeat, err := NewBeat("filebeat", "testidx", "0.9", false, nil)
require.NoError(t, err)
logger := logptest.NewTestingLogger(t, "")
testBeat.Info.Logger = logger
err = testBeat.LoadMeta(metaFile)
require.NoError(t, err)
}()
}
wg.Wait()
}
22 changes: 11 additions & 11 deletions x-pack/filebeat/fbreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,38 +483,38 @@ func TestConsumeContract(t *testing.T) {
os.Setenv("OTELCONSUMER_RECEIVERTEST", "1")

cfg := &Config{
Beatconfig: map[string]interface{}{
Beatconfig: map[string]any{
"queue.mem.flush.timeout": "0s",
"filebeat": map[string]interface{}{
"inputs": []map[string]interface{}{
"filebeat": map[string]any{
"inputs": []map[string]any{
{
"type": "filestream",
"id": "filestream-test",
"enabled": true,
"paths": []string{
filepath.Join(tmpDir, "input-*.log"),
},
"file_identity.native": map[string]interface{}{},
"prospector": map[string]interface{}{
"scanner": map[string]interface{}{
"file_identity.native": map[string]any{},
"prospector": map[string]any{
"scanner": map[string]any{
"fingerprint.enabled": false,
"check_interval": "0.1s",
},
},
"parsers": []map[string]interface{}{
"parsers": []map[string]any{
{
"ndjson": map[string]interface{}{
"ndjson": map[string]any{
"document_id": "id",
},
},
},
},
},
},
"output": map[string]interface{}{
"otelconsumer": map[string]interface{}{},
"output": map[string]any{
"otelconsumer": map[string]any{},
},
"logging": map[string]interface{}{
"logging": map[string]any{
"level": "debug",
"selectors": []string{
"*",
Expand Down
Loading