@@ -73,6 +73,7 @@ type Filebeat struct {
73
73
done chan struct {}
74
74
stopOnce sync.Once // wraps the Stop() method
75
75
pipeline beat.PipelineConnector
76
+ logger * logp.Logger
76
77
}
77
78
78
79
type PluginFactory func (beat.Info , * logp.Logger , statestore.States ) []v2.Plugin
@@ -87,7 +88,7 @@ func New(plugins PluginFactory) beat.Creator {
87
88
func newBeater (b * beat.Beat , plugins PluginFactory , rawConfig * conf.C ) (beat.Beater , error ) {
88
89
config := cfg .DefaultConfig
89
90
if err := rawConfig .Unpack (& config ); err != nil {
90
- return nil , fmt .Errorf ("Error reading config file: %w" , err )
91
+ return nil , fmt .Errorf ("Error reading config file: %w" , err ) //nolint:staticcheck //Keep old behavior
91
92
}
92
93
93
94
if err := cfgwarn .CheckRemoved6xSettings (
@@ -132,7 +133,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
132
133
"input_metrics.json" , "application/json" , func () []byte {
133
134
data , err := inputmon .MetricSnapshotJSON ()
134
135
if err != nil {
135
- logp . L () .Warnw ("Failed to collect input metric snapshot for Agent diagnostics." , "error" , err )
136
+ b . Info . Logger .Warnw ("Failed to collect input metric snapshot for Agent diagnostics." , "error" , err )
136
137
return []byte (err .Error ())
137
138
}
138
139
return data
@@ -161,7 +162,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
161
162
}
162
163
163
164
// in the `setup` command, log this only as a warning
164
- logp .Warn ("Setup called, but no modules enabled." )
165
+ b . Info . Logger .Warn ("Setup called, but no modules enabled." )
165
166
}
166
167
167
168
if * once && config .ConfigInput .Enabled () && config .ConfigModules .Enabled () {
@@ -177,6 +178,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
177
178
config : & config ,
178
179
moduleRegistry : moduleRegistry ,
179
180
pluginFactory : plugins ,
181
+ logger : b .Info .Logger ,
180
182
}
181
183
182
184
err = fb .setupPipelineLoaderCallback (b )
@@ -190,7 +192,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
190
192
// setupPipelineLoaderCallback sets the callback function for loading pipelines during setup.
191
193
func (fb * Filebeat ) setupPipelineLoaderCallback (b * beat.Beat ) error {
192
194
if b .Config .Output .Name () != "elasticsearch" && ! b .Manager .Enabled () {
193
- logp .Warn (pipelinesWarning )
195
+ fb . logger .Warn (pipelinesWarning )
194
196
return nil
195
197
}
196
198
@@ -223,7 +225,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
223
225
newPath := strings .TrimSuffix (origPath , ".yml" )
224
226
_ = fb .config .ConfigModules .SetString ("path" , - 1 , newPath )
225
227
}
226
- modulesLoader := cfgfile .NewReloader (logp . L () .Named ("module.reloader" ), fb .pipeline , fb .config .ConfigModules )
228
+ modulesLoader := cfgfile .NewReloader (fb . logger .Named ("module.reloader" ), fb .pipeline , fb .config .ConfigModules )
227
229
modulesLoader .Load (modulesFactory )
228
230
}
229
231
@@ -236,7 +238,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
236
238
// setup.
237
239
func (fb * Filebeat ) loadModulesPipelines (b * beat.Beat ) error {
238
240
if b .Config .Output .Name () != "elasticsearch" {
239
- logp .Warn (pipelinesWarning )
241
+ fb . logger .Warn (pipelinesWarning )
240
242
return nil
241
243
}
242
244
@@ -288,7 +290,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
288
290
289
291
registryMigrator := registrar .NewMigrator (config .Registry )
290
292
if err := registryMigrator .Run (); err != nil {
291
- logp . Err ("Failed to migrate registry file: %+v" , err )
293
+ fb . logger . Errorf ("Failed to migrate registry file: %+v" , err )
292
294
return err
293
295
}
294
296
@@ -299,9 +301,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
299
301
cn ()
300
302
}()
301
303
302
- stateStore , err := openStateStore (ctx , b .Info , logp . NewLogger ("filebeat" ), config .Registry )
304
+ stateStore , err := openStateStore (ctx , b .Info , fb . logger . Named ("filebeat" ), config .Registry )
303
305
if err != nil {
304
- logp . Err ("Failed to open state store: %+v" , err )
306
+ fb . logger . Errorf ("Failed to open state store: %+v" , err )
305
307
return err
306
308
}
307
309
defer stateStore .Close ()
@@ -313,7 +315,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
313
315
b .OutputConfigReloader = reload .ReloadableFunc (func (r * reload.ConfigWithMeta ) error {
314
316
outCfg := conf.Namespace {}
315
317
if err := r .Config .Unpack (& outCfg ); err != nil || outCfg .Name () != "elasticsearch" {
316
- logp . Err ("Failed to unpack the output config: %v" , err )
318
+ fb . logger . Errorf ("Failed to unpack the output config: %v" , err )
317
319
return nil
318
320
}
319
321
@@ -322,24 +324,24 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
322
324
// See https://github.com/elastic/beats/issues/42815
323
325
configCopy , err := conf .NewConfigFrom (outCfg .Config ())
324
326
if err != nil {
325
- logp . Err ("Failed to create a new config from the output config: %v" , err )
327
+ fb . logger . Errorf ("Failed to create a new config from the output config: %v" , err )
326
328
return nil
327
329
}
328
330
stateStore .notifier .Notify (configCopy )
329
331
return nil
330
332
})
331
333
}
332
334
333
- err = filestream .ValidateInputIDs (config .Inputs , logp . NewLogger ("input.filestream" ))
335
+ err = filestream .ValidateInputIDs (config .Inputs , fb . logger . Named ("input.filestream" ))
334
336
if err != nil {
335
- logp . Err ("invalid filestream configuration: %+v" , err )
337
+ fb . logger . Errorf ("invalid filestream configuration: %+v" , err )
336
338
return err
337
339
}
338
340
339
341
// Setup registrar to persist state
340
342
registrar , err := registrar .New (stateStore , finishedLogger , config .Registry .FlushTimeout )
341
343
if err != nil {
342
- logp . Err ("Could not init registrar: %v" , err )
344
+ fb . logger . Errorf ("Could not init registrar: %v" , err )
343
345
return err
344
346
}
345
347
@@ -364,7 +366,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
364
366
outDone := make (chan struct {}) // outDone closes down all active pipeline connections
365
367
pipelineConnector := channel .NewOutletFactory (outDone ).Create
366
368
367
- inputsLogger := logp . NewLogger ("input" )
369
+ inputsLogger := fb . logger . Named ("input" )
368
370
v2Inputs := fb .pluginFactory (b .Info , inputsLogger , stateStore )
369
371
v2InputLoader , err := v2 .NewLoader (inputsLogger , v2Inputs , "type" , cfg .DefaultType )
370
372
if err != nil {
@@ -378,7 +380,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
378
380
379
381
// Store needs to be fully configured at this point
380
382
if err := v2InputLoader .Init (& inputTaskGroup ); err != nil {
381
- logp . Err ("Failed to initialize the input managers: %v" , err )
383
+ fb . logger . Errorf ("Failed to initialize the input managers: %v" , err )
382
384
return err
383
385
}
384
386
@@ -399,12 +401,12 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
399
401
if b .Config .Output .Name () == "elasticsearch" {
400
402
pipelineLoaderFactory = newPipelineLoaderFactory (pipelineFactoryCtx , b .Config .Output .Config ())
401
403
} else {
402
- logp .Warn (pipelinesWarning )
404
+ fb . logger .Warn (pipelinesWarning )
403
405
}
404
406
moduleLoader := fileset .NewFactory (inputLoader , b .Info , pipelineLoaderFactory , config .OverwritePipelines )
405
- crawler , err := newCrawler (inputLoader , moduleLoader , config .Inputs , fb .done , * once )
407
+ crawler , err := newCrawler (inputLoader , moduleLoader , config .Inputs , fb .done , * once , fb . logger )
406
408
if err != nil {
407
- logp . Err ("Could not init crawler: %v" , err )
409
+ fb . logger . Errorf ("Could not init crawler: %v" , err )
408
410
return err
409
411
}
410
412
@@ -415,7 +417,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
415
417
// Start the registrar
416
418
err = registrar .Start ()
417
419
if err != nil {
418
- return fmt .Errorf ("Could not start registrar: %w" , err )
420
+ return fmt .Errorf ("Could not start registrar: %w" , err ) //nolint:staticcheck //Keep old behavior
419
421
}
420
422
421
423
// Stopping registrar will write last state
@@ -433,31 +435,31 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
433
435
defer waitEvents .Wait ()
434
436
435
437
if config .OverwritePipelines {
436
- logp .Debug ("modules" , "Existing Ingest pipelines will be updated" )
438
+ fb . logger .Debug ("modules" , "Existing Ingest pipelines will be updated" )
437
439
}
438
440
439
441
err = crawler .Start (fb .pipeline , config .ConfigInput , config .ConfigModules )
440
442
if err != nil {
441
443
crawler .Stop ()
442
444
cancelPipelineFactoryCtx ()
443
- return fmt .Errorf ("Failed to start crawler: %w" , err )
445
+ return fmt .Errorf ("Failed to start crawler: %w" , err ) //nolint:staticcheck //Keep old behavior
444
446
}
445
447
446
448
// If run once, add crawler completion check as alternative to done signal
447
449
if * once {
448
450
runOnce := func () {
449
- logp .Info ("Running filebeat once. Waiting for completion ..." )
451
+ fb . logger .Info ("Running filebeat once. Waiting for completion ..." )
450
452
crawler .WaitForCompletion ()
451
- logp .Info ("All data collection completed. Shutting down." )
453
+ fb . logger .Info ("All data collection completed. Shutting down." )
452
454
}
453
455
waitFinished .Add (runOnce )
454
456
}
455
457
456
458
// Register reloadable list of inputs and modules
457
- inputs := cfgfile .NewRunnerList (management .DebugK , inputLoader , fb .pipeline )
459
+ inputs := cfgfile .NewRunnerList (management .DebugK , inputLoader , fb .pipeline , fb . logger )
458
460
b .Registry .MustRegisterInput (inputs )
459
461
460
- modules := cfgfile .NewRunnerList (management .DebugK , moduleLoader , fb .pipeline )
462
+ modules := cfgfile .NewRunnerList (management .DebugK , moduleLoader , fb .pipeline , fb . logger )
461
463
462
464
var adiscover * autodiscover.Autodiscover
463
465
if fb .config .Autodiscover != nil {
@@ -471,6 +473,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
471
473
autodiscover .QueryConfig (),
472
474
config .Autodiscover ,
473
475
b .Keystore ,
476
+ fb .logger ,
474
477
)
475
478
if err != nil {
476
479
return err
@@ -506,7 +509,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
506
509
"Continue shutdown: All enqueued events being published." ))
507
510
// Wait for either timeout or all events having been ACKed by outputs.
508
511
if fb .config .ShutdownTimeout > 0 {
509
- logp .Info ("Shutdown output timer started. Waiting for max %v." , timeout )
512
+ fb . logger .Info ("Shutdown output timer started. Waiting for max %v." , timeout )
510
513
waitEvents .Add (withLog (waitDuration (timeout ),
511
514
"Continue shutdown: Time out waiting for events being published." ))
512
515
} else {
@@ -532,7 +535,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
532
535
533
536
// Stop is called on exit to stop the crawling, spooling and registration processes.
534
537
func (fb * Filebeat ) Stop () {
535
- logp .Info ("Stopping filebeat" )
538
+ fb . logger .Info ("Stopping filebeat" )
536
539
537
540
// Stop Filebeat
538
541
fb .stopOnce .Do (func () { close (fb .done ) })
@@ -543,7 +546,7 @@ func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.Pip
543
546
pipelineLoaderFactory := func () (fileset.PipelineLoader , error ) {
544
547
esClient , err := eslegclient .NewConnectedClient (ctx , esConfig , "Filebeat" )
545
548
if err != nil {
546
- return nil , fmt .Errorf ("Error creating Elasticsearch client: %w" , err )
549
+ return nil , fmt .Errorf ("Error creating Elasticsearch client: %w" , err ) //nolint:staticcheck //Keep old behavior
547
550
}
548
551
return esClient , nil
549
552
}
0 commit comments