Skip to content

Commit 6c94334

Browse files
[pkg/stanza] Improve error logs produced by transformer processors (#37285)
#### Description This improves the error messages of the log transformer processors to include the log file path and the original log record, effectively allowing users to quickly debug and investigate the cause of the problem more deeply **without** having to use the debug exporter or the highest verbosity level. Here's an example of one of these failures: > ```2025-01-17T15:38:13.637Z error helper/transformer.go:114 Failed to process entry {"kind": "receiver", "name": "filelog", "data_type": "logs", "operator_id": "move5", "operator_type": "move", "log.file.path": "/var/log/pods/kube-system_kindnet-jxpz6_0784c9f9-ec2b-4829-aeb3-263ec66ef953/kindnet-cni/19.log", "entry.timestamp": "2025-01-17T15:38:07.645938111Z", "error": "move: field does not exist: attributes.uid", "action": "send"}``` This log line could be even more helpful if it included the regex pattern that was matched against the entry. I'm not sure about adding it now and it could be added in the future if desired. A small note on the names of the log keys: they could have shorter names, like `body` and `file_path`, but I think we could also use the attribute names from the semantic conventions for logs. I have no preference though and I'm happy to change it if needed. #### Testing - Ran it locally in a kind cluster. - Some unit tests were updated to ensure the `log.record.original` key is present in the logs.
1 parent e8a0def commit 6c94334

File tree

4 files changed

+84
-6
lines changed

4 files changed

+84
-6
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add entry's timestamp and attributes to error logs from log transformer processors
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37285]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
When a log transformer processor fails to process a log entry, it will include the entry's timestamp and attributes in its own logs.
20+
With this information the user can more easily identify the log file and find the entry that's having issues.
21+
22+
# If your change doesn't affect end users or the exported elements of any package,
23+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
24+
# Optional: The change log or logs in which this entry should be included.
25+
# e.g. '[user]' or '[user, api]'
26+
# Include 'user' if the change is relevant to end users.
27+
# Include 'api' if there is a change to a library API.
28+
# Default: '[user]'
29+
change_logs: [user]

pkg/stanza/operator/helper/transformer.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,23 @@ func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entr
9494

9595
// HandleEntryError will handle an entry error using the on_error strategy.
9696
func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry.Entry, err error) error {
97+
if entry == nil {
98+
return fmt.Errorf("got a nil entry, this should not happen and is potentially a bug")
99+
}
100+
101+
logFields := []zap.Field{
102+
zap.Any("error", err),
103+
zap.Any("action", t.OnError),
104+
zap.Any("entry.timestamp", entry.Timestamp),
105+
}
106+
for attrName, attrValue := range entry.Attributes {
107+
logFields = append(logFields, zap.Any(attrName, attrValue))
108+
}
109+
97110
if t.OnError == SendOnErrorQuiet || t.OnError == DropOnErrorQuiet {
98-
t.Logger().Debug("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError))
111+
t.Logger().Debug("Failed to process entry", logFields...)
99112
} else {
100-
t.Logger().Error("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError))
113+
t.Logger().Error("Failed to process entry", logFields...)
101114
}
102115
if t.OnError == SendOnError || t.OnError == SendOnErrorQuiet {
103116
writeErr := t.Write(ctx, entry)

pkg/stanza/operator/helper/transformer_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"testing"
10+
"time"
1011

1112
"github.com/stretchr/testify/mock"
1213
"github.com/stretchr/testify/require"
@@ -17,6 +18,7 @@ import (
1718
"go.uber.org/zap/zaptest/observer"
1819

1920
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
21+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs"
2022
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
2123
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
2224
)
@@ -91,6 +93,9 @@ func TestTransformerDropOnError(t *testing.T) {
9193
}
9294
ctx := context.Background()
9395
testEntry := entry.New()
96+
now := time.Now()
97+
testEntry.Timestamp = now
98+
testEntry.AddAttribute(attrs.LogFilePath, "/test/file")
9499
transform := func(_ *entry.Entry) error {
95100
return fmt.Errorf("Failure")
96101
}
@@ -104,8 +109,10 @@ func TestTransformerDropOnError(t *testing.T) {
104109
{
105110
Entry: zapcore.Entry{Level: zap.ErrorLevel, Message: "Failed to process entry"},
106111
Context: []zapcore.Field{
107-
{Key: "error", Type: 26, Interface: fmt.Errorf("Failure")},
112+
{Key: "error", Type: zapcore.ErrorType, Interface: fmt.Errorf("Failure")},
108113
zap.Any("action", "drop"),
114+
zap.Any("entry.timestamp", now),
115+
zap.Any(attrs.LogFilePath, "/test/file"),
109116
},
110117
},
111118
}
@@ -136,6 +143,9 @@ func TestTransformerDropOnErrorQuiet(t *testing.T) {
136143
}
137144
ctx := context.Background()
138145
testEntry := entry.New()
146+
now := time.Now()
147+
testEntry.Timestamp = now
148+
testEntry.AddAttribute(attrs.LogFilePath, "/test/file")
139149
transform := func(_ *entry.Entry) error {
140150
return fmt.Errorf("Failure")
141151
}
@@ -151,6 +161,8 @@ func TestTransformerDropOnErrorQuiet(t *testing.T) {
151161
Context: []zapcore.Field{
152162
{Key: "error", Type: 26, Interface: fmt.Errorf("Failure")},
153163
zap.Any("action", "drop_quiet"),
164+
zap.Any("entry.timestamp", now),
165+
zap.Any(attrs.LogFilePath, "/test/file"),
154166
},
155167
},
156168
}
@@ -181,6 +193,9 @@ func TestTransformerSendOnError(t *testing.T) {
181193
}
182194
ctx := context.Background()
183195
testEntry := entry.New()
196+
now := time.Now()
197+
testEntry.Timestamp = now
198+
testEntry.AddAttribute(attrs.LogFilePath, "/test/file")
184199
transform := func(_ *entry.Entry) error {
185200
return fmt.Errorf("Failure")
186201
}
@@ -196,6 +211,8 @@ func TestTransformerSendOnError(t *testing.T) {
196211
Context: []zapcore.Field{
197212
{Key: "error", Type: 26, Interface: fmt.Errorf("Failure")},
198213
zap.Any("action", "send"),
214+
zap.Any("entry.timestamp", now),
215+
zap.Any(attrs.LogFilePath, "/test/file"),
199216
},
200217
},
201218
}
@@ -226,6 +243,9 @@ func TestTransformerSendOnErrorQuiet(t *testing.T) {
226243
}
227244
ctx := context.Background()
228245
testEntry := entry.New()
246+
now := time.Now()
247+
testEntry.Timestamp = now
248+
testEntry.AddAttribute(attrs.LogFilePath, "/test/file")
229249
transform := func(_ *entry.Entry) error {
230250
return fmt.Errorf("Failure")
231251
}
@@ -241,6 +261,8 @@ func TestTransformerSendOnErrorQuiet(t *testing.T) {
241261
Context: []zapcore.Field{
242262
{Key: "error", Type: 26, Interface: fmt.Errorf("Failure")},
243263
zap.Any("action", "send_quiet"),
264+
zap.Any("entry.timestamp", now),
265+
zap.Any(attrs.LogFilePath, "/test/file"),
244266
},
245267
},
246268
}

pkg/stanza/operator/transformer/router/transformer.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,27 +36,41 @@ func (t *Transformer) CanProcess() bool {
3636

3737
// Process will route incoming entries based on matching expressions
3838
func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
39+
if entry == nil {
40+
return fmt.Errorf("got a nil entry, this should not happen and is potentially a bug")
41+
}
42+
3943
env := helper.GetExprEnv(entry)
4044
defer helper.PutExprEnv(env)
4145

46+
logFields := []zap.Field{
47+
zap.Any("entry.timestamp", entry.Timestamp),
48+
}
49+
for attrName, attrValue := range entry.Attributes {
50+
logFields = append(logFields, zap.Any(attrName, attrValue))
51+
}
52+
4253
for _, route := range t.routes {
4354
matches, err := vm.Run(route.Expression, env)
4455
if err != nil {
45-
t.Logger().Warn("Running expression returned an error", zap.Error(err))
56+
logFields = append(logFields, zap.Any("error", err))
57+
t.Logger().Warn("Running expression returned an error", logFields...)
4658
continue
4759
}
4860

4961
// we compile the expression with "AsBool", so this should be safe
5062
if matches.(bool) {
5163
if err = route.Attribute(entry); err != nil {
52-
t.Logger().Error("Failed to label entry", zap.Error(err))
64+
logFields = append(logFields, zap.Any("error", err))
65+
t.Logger().Error("Failed to label entry", logFields...)
5366
return err
5467
}
5568

5669
for _, output := range route.OutputOperators {
5770
err = output.Process(ctx, entry)
71+
logFields = append(logFields, zap.Any("error", err))
5872
if err != nil {
59-
t.Logger().Error("Failed to process entry", zap.Error(err))
73+
t.Logger().Error("Failed to process entry", logFields...)
6074
}
6175
}
6276
break

0 commit comments

Comments
 (0)