@@ -112,6 +112,31 @@ func New(cfg *config.C, log *logp.Logger) (beat.Processor, error) {
112
112
p .geoData = mapstr.M {"host" : mapstr.M {"geo" : geoFields }}
113
113
}
114
114
115
+ << << << < HEAD
116
+ == == == =
117
+ // create a unique ID for this instance of the processor
118
+ var cbIDStr string
119
+ cbID , err := uuid .NewV4 ()
120
+ // if we fail, fall back to the processor name, hope for the best.
121
+ if err != nil {
122
+ p .logger .Errorf ("error generating ID for FQDN callback, reverting to processor name: %v" , err )
123
+ cbIDStr = processorName
124
+ } else {
125
+ cbIDStr = cbID .String ()
126
+ }
127
+
128
+ // this is safe as New() returns a pointer, not the actual object.
129
+ // This matters as other pieces of code in libbeat, like libbeat/processors/processor.go,
130
+ // will do weird stuff like copy the entire list of global processors.
131
+ err = features .AddFQDNOnChangeCallback (p .handleFQDNReportingChange , cbIDStr )
132
+ if err != nil {
133
+ return nil , fmt .Errorf (
134
+ "could not register callback for FQDN reporting onChange from %s processor: %w" ,
135
+ processorName , err ,
136
+ )
137
+ }
138
+
139
+ >> >> >> > a5be2a856 (chore : fix formatting issues in logp printf - style calls (#45944 ))
115
140
return p , nil
116
141
}
117
142
@@ -245,6 +270,63 @@ func (p *addHostMetadata) String() string {
245
270
processorName , p .config .NetInfoEnabled , p .config .CacheTTL )
246
271
}
247
272
273
+ << << << < HEAD
274
+ == == == =
275
+ func (p * addHostMetadata ) handleFQDNReportingChange (new , old bool ) {
276
+ if new == old {
277
+ // Nothing to do
278
+ return
279
+ }
280
+
281
+ // update the data for the processor
282
+ p .updateOrExpire (new )
283
+ }
284
+
285
+ // updateOrExpire will attempt to update the data for the processor, or expire the cache
286
+ // if the config update fails, or times out
287
+ func (p * addHostMetadata ) updateOrExpire (useFQDN bool ) {
288
+ if p .config .CacheTTL <= 0 {
289
+ return
290
+ }
291
+
292
+ p .lastUpdate .Lock ()
293
+ defer p .lastUpdate .Unlock ()
294
+
295
+ // while holding the mutex, attempt to update loadData()
296
+ // doing this with the mutex means other events must wait until we have the correct host data, as we assume that
297
+ // a call to this function means something else wants to force an update, and thus all events must sync.
298
+
299
+ updateChanSuccess := make (chan bool )
300
+ timeout := time .After (p .config .ExpireUpdateTimeout )
301
+ go func () {
302
+ err := p .loadData (false , useFQDN )
303
+ if err != nil {
304
+ p .logger .Errorf ("error updating data for processor: %v" , err )
305
+ updateChanSuccess <- false
306
+ return
307
+ }
308
+ updateChanSuccess <- true
309
+ }()
310
+
311
+ // this additional timeout check is paranoid, but when it's method is called from handleFQDNReportingChange(),
312
+ // it's blocking, which means we can hold a mutex in features. In addition, we don't want to break the processor by
313
+ // having all the events wait for too long.
314
+ select {
315
+ case <- timeout :
316
+ p .logger .Errorf ("got timeout while trying to update metadata" )
317
+ p .lastUpdate .Time = time.Time {}
318
+ case success := <- updateChanSuccess :
319
+ // only expire the cache if update was failed
320
+ if ! success {
321
+ p .lastUpdate .Time = time.Time {}
322
+ } else {
323
+ p .lastUpdate .Time = time .Now ()
324
+ }
325
+ }
326
+
327
+ }
328
+
329
+ >> >> >> > a5be2a856 (chore : fix formatting issues in logp printf - style calls (#45944 ))
248
330
func skipAddingHostMetadata (event * beat.Event ) bool {
249
331
// If host fields exist(besides host.name added by libbeat) in event, skip add_host_metadata.
250
332
hostFields , err := event .Fields .GetValue ("host" )
0 commit comments