@@ -39,7 +39,7 @@ type Input struct {
39
39
remote RemoteConfig
40
40
remoteSessionHandle windows.Handle
41
41
startRemoteSession func () error
42
- processEvent func (context.Context , Event )
42
+ processEvent func (context.Context , Event ) error
43
43
}
44
44
45
45
// newInput creates a new Input operator.
@@ -219,7 +219,9 @@ func (i *Input) read(ctx context.Context) {
219
219
}
220
220
221
221
for n , event := range events {
222
- i .processEvent (ctx , event )
222
+ if err := i .processEvent (ctx , event ); err != nil {
223
+ i .Logger ().Error ("process event" , zap .Error (err ))
224
+ }
223
225
if len (events ) == n + 1 {
224
226
i .updateBookmarkOffset (ctx , event )
225
227
}
@@ -240,70 +242,66 @@ func (i *Input) getPublisherName(event Event) (name string, excluded bool) {
240
242
return providerName , false
241
243
}
242
244
243
- func (i * Input ) renderSimpleAndSend (ctx context.Context , event Event ) {
245
+ func (i * Input ) renderSimpleAndSend (ctx context.Context , event Event ) error {
244
246
simpleEvent , err := event .RenderSimple (i .buffer )
245
247
if err != nil {
246
- i .Logger ().Error ("Failed to render simple event" , zap .Error (err ))
247
- return
248
+ return fmt .Errorf ("render simple event: %w" , err )
248
249
}
249
- i .sendEvent (ctx , simpleEvent )
250
+ return i .sendEvent (ctx , simpleEvent )
250
251
}
251
252
252
- func (i * Input ) renderDeepAndSend (ctx context.Context , event Event , publisher Publisher ) {
253
+ func (i * Input ) renderDeepAndSend (ctx context.Context , event Event , publisher Publisher ) error {
253
254
deepEvent , err := event .RenderDeep (i .buffer , publisher )
254
255
if err == nil {
255
- i .sendEvent (ctx , deepEvent )
256
- return
256
+ return i .sendEvent (ctx , deepEvent )
257
257
}
258
- i .Logger ().Error ("Failed to render formatted event" , zap .Error (err ))
259
- i .renderSimpleAndSend (ctx , event )
258
+ return errors .Join (
259
+ fmt .Errorf ("render deep event: %w" , err ),
260
+ i .renderSimpleAndSend (ctx , event ),
261
+ )
260
262
}
261
263
262
264
// processEvent will process and send an event retrieved from windows event log.
263
- func (i * Input ) processEventWithoutRenderingInfo (ctx context.Context , event Event ) {
265
+ func (i * Input ) processEventWithoutRenderingInfo (ctx context.Context , event Event ) error {
264
266
if len (i .excludeProviders ) == 0 {
265
- i .renderSimpleAndSend (ctx , event )
266
- return
267
+ return i .renderSimpleAndSend (ctx , event )
267
268
}
268
269
if _ , exclude := i .getPublisherName (event ); exclude {
269
- return
270
+ return nil
270
271
}
271
- i .renderSimpleAndSend (ctx , event )
272
+ return i .renderSimpleAndSend (ctx , event )
272
273
}
273
274
274
- func (i * Input ) processEventWithRenderingInfo (ctx context.Context , event Event ) {
275
+ func (i * Input ) processEventWithRenderingInfo (ctx context.Context , event Event ) error {
275
276
providerName , exclude := i .getPublisherName (event )
276
277
if exclude {
277
- return
278
+ return nil
278
279
}
279
280
280
281
publisher , err := i .publisherCache .get (providerName )
281
282
if err != nil {
282
- i .Logger ().Warn (
283
- "Failed to open event source, respective log entries cannot be formatted" ,
284
- zap .String ("provider" , providerName ), zap .Error (err ))
285
- i .renderSimpleAndSend (ctx , event )
286
- return
283
+ return errors .Join (
284
+ fmt .Errorf ("open event source for provider %q: %w" , providerName , err ),
285
+ i .renderSimpleAndSend (ctx , event ),
286
+ )
287
287
}
288
288
289
289
if publisher .Valid () {
290
- i .renderDeepAndSend (ctx , event , publisher )
291
- return
290
+ return i .renderDeepAndSend (ctx , event , publisher )
292
291
}
293
- i .renderSimpleAndSend (ctx , event )
292
+ return i .renderSimpleAndSend (ctx , event )
294
293
}
295
294
296
295
// sendEvent will send EventXML as an entry to the operator's output.
297
- func (i * Input ) sendEvent (ctx context.Context , eventXML * EventXML ) {
296
+ func (i * Input ) sendEvent (ctx context.Context , eventXML * EventXML ) error {
298
297
var body any = eventXML .Original
299
298
if ! i .raw {
300
299
body = formattedBody (eventXML )
301
300
}
302
301
303
302
e , err := i .NewEntry (body )
304
303
if err != nil {
305
- i .Logger ().Error ("Failed to create entry" , zap .Error (err ))
306
- return
304
+ return fmt .Errorf ("create entry: %w" , err )
307
305
}
308
306
309
307
e .Timestamp = parseTimestamp (eventXML .TimeCreated .SystemTime )
@@ -313,7 +311,7 @@ func (i *Input) sendEvent(ctx context.Context, eventXML *EventXML) {
313
311
e .Attributes ["server.address" ] = i .remote .Server
314
312
}
315
313
316
- _ = i .Write (ctx , e )
314
+ return i .Write (ctx , e )
317
315
}
318
316
319
317
// getBookmarkXML will get the bookmark xml from the offsets database.
0 commit comments