Skip to content

Commit 6816bc1

Browse files
authored
[fileconsumer] Generlize emit function (#24036)
1 parent 9a408c4 commit 6816bc1

File tree

15 files changed

+244
-287
lines changed

15 files changed

+244
-287
lines changed
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Use this changelog template to create an entry for release notes.
2+
# If your change doesn't affect end users, such as a test fix or a tooling change,
3+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
4+
5+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
6+
change_type: deprecation
7+
8+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
9+
component: pkg/stanza
10+
11+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
12+
note: Deprecate fileconsumer.EmitFunc in favor of fileconsumer.emit.Callback
13+
14+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
15+
issues: [24036]
16+
17+
# (Optional) One or more lines of additional information to render under the primary note.
18+
# These lines will be padded with 2 spaces and then inserted directly into the document.
19+
# Use pipe (|) for multiline entries.
20+
subtext:

pkg/stanza/fileconsumer/attributes.go

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,10 @@
44
package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
55

66
import (
7-
"path/filepath"
8-
"runtime"
9-
10-
"go.uber.org/multierr"
11-
127
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
138
)
149

10+
// Deprecated: [v0.82.0] This will be removed in a future release, tentatively v0.84.0.
1511
type FileAttributes struct {
1612
Name string `json:"-"`
1713
Path string `json:"-"`
@@ -21,28 +17,8 @@ type FileAttributes struct {
2117
}
2218

2319
// HeaderAttributesCopy gives a copy of the HeaderAttributes, in order to restrict mutation of the HeaderAttributes.
20+
//
21+
// Deprecated: [v0.82.0] This will be removed in a future release, tentatively v0.84.0.
2422
func (f *FileAttributes) HeaderAttributesCopy() map[string]any {
2523
return util.MapCopy(f.HeaderAttributes)
2624
}
27-
28-
// resolveFileAttributes resolves file attributes
29-
// and sets it to empty string in case of error
30-
func resolveFileAttributes(path string) (*FileAttributes, error) {
31-
resolved := ""
32-
var symErr error
33-
// Dirty solution, waiting for this permanent fix https://github.com/golang/go/issues/39786
34-
// EvalSymlinks on windows is partially working depending on the way you use Symlinks and Junctions
35-
if runtime.GOOS != "windows" {
36-
resolved, symErr = filepath.EvalSymlinks(path)
37-
} else {
38-
resolved = path
39-
}
40-
abs, absErr := filepath.Abs(resolved)
41-
42-
return &FileAttributes{
43-
Path: path,
44-
Name: filepath.Base(path),
45-
PathResolved: abs,
46-
NameResolved: filepath.Base(abs),
47-
}, multierr.Combine(symErr, absErr)
48-
}

pkg/stanza/fileconsumer/config.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.opentelemetry.io/collector/featuregate"
1414
"go.uber.org/zap"
1515

16+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
1617
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
1718
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
1819
)
@@ -72,7 +73,7 @@ type Config struct {
7273
}
7374

7475
// Build will build a file input operator from the supplied configuration
75-
func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error) {
76+
func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, error) {
7677
if err := c.validate(); err != nil {
7778
return nil, err
7879
}
@@ -87,7 +88,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit EmitFunc) (*Manager, error
8788
}
8889

8990
// BuildWithSplitFunc will build a file input operator with customized splitFunc function
90-
func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit EmitFunc, splitFunc bufio.SplitFunc) (*Manager, error) {
91+
func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback, splitFunc bufio.SplitFunc) (*Manager, error) {
9192
if err := c.validate(); err != nil {
9293
return nil, err
9394
}
@@ -105,7 +106,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit EmitFunc, spl
105106
return c.buildManager(logger, emit, factory)
106107
}
107108

108-
func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory splitterFactory) (*Manager, error) {
109+
func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, factory splitterFactory) (*Manager, error) {
109110
if emit == nil {
110111
return nil, fmt.Errorf("must provide emit function")
111112
}
@@ -138,9 +139,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit EmitFunc, factory s
138139
readerFactory: readerFactory{
139140
SugaredLogger: logger.With("component", "fileconsumer"),
140141
readerConfig: &readerConfig{
141-
fingerprintSize: int(c.FingerprintSize),
142-
maxLogSize: int(c.MaxLogSize),
143-
emit: emit,
142+
fingerprintSize: int(c.FingerprintSize),
143+
maxLogSize: int(c.MaxLogSize),
144+
emit: emit,
145+
includeFileName: c.IncludeFileName,
146+
includeFilePath: c.IncludeFilePath,
147+
includeFileNameResolved: c.IncludeFileNameResolved,
148+
includeFilePathResolved: c.IncludeFilePathResolved,
144149
},
145150
fromBeginning: startAtBeginning,
146151
splitterFactory: factory,

pkg/stanza/fileconsumer/config_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package fileconsumer
55

66
import (
7-
"context"
87
"path/filepath"
98
"testing"
109
"time"
@@ -636,9 +635,7 @@ func TestBuild(t *testing.T) {
636635
cfg := basicConfig()
637636
tc.modifyBaseConfig(cfg)
638637

639-
nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}
640-
641-
input, err := cfg.Build(testutil.Logger(t), nopEmit)
638+
input, err := cfg.Build(testutil.Logger(t), nopEmitFunc)
642639
tc.errorRequirement(t, err)
643640
if err != nil {
644641
return
@@ -708,7 +705,6 @@ func TestBuildWithSplitFunc(t *testing.T) {
708705
cfg := basicConfig()
709706
tc.modifyBaseConfig(cfg)
710707

711-
nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}
712708
splitNone := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
713709
if !atEOF {
714710
return 0, nil, nil
@@ -719,7 +715,7 @@ func TestBuildWithSplitFunc(t *testing.T) {
719715
return len(data), data, nil
720716
}
721717

722-
input, err := cfg.BuildWithSplitFunc(testutil.Logger(t), nopEmit, splitNone)
718+
input, err := cfg.BuildWithSplitFunc(testutil.Logger(t), nopEmitFunc, splitNone)
723719
tc.errorRequirement(t, err)
724720
if err != nil {
725721
return
@@ -809,9 +805,7 @@ func TestBuildWithHeader(t *testing.T) {
809805
cfg := basicConfig()
810806
tc.modifyBaseConfig(cfg)
811807

812-
nopEmit := func(_ context.Context, _ *FileAttributes, _ []byte) {}
813-
814-
input, err := cfg.Build(testutil.Logger(t), nopEmit)
808+
input, err := cfg.Build(testutil.Logger(t), nopEmitFunc)
815809
tc.errorRequirement(t, err)
816810
if err != nil {
817811
return

pkg/stanza/fileconsumer/emit/emit.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package emit // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
5+
6+
import (
7+
"context"
8+
)
9+
10+
type Callback func(ctx context.Context, token []byte, attrs map[string]any) error

pkg/stanza/fileconsumer/file.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ import (
1818
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
1919
)
2020

21+
const (
22+
logFileName = "log.file.name"
23+
logFilePath = "log.file.path"
24+
logFileNameResolved = "log.file.name_resolved"
25+
logFilePathResolved = "log.file.path_resolved"
26+
)
27+
28+
// Deprecated: [v0.82.0] Use emit.Callback instead. This will be removed in a future release, tentatively v0.84.0.
2129
type EmitFunc func(ctx context.Context, attrs *FileAttributes, token []byte)
2230

2331
type Manager struct {
@@ -369,6 +377,21 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
369377
if err = dec.Decode(unsafeReader); err != nil {
370378
return err
371379
}
380+
381+
// Migrate readers that used FileAttributes.HeaderAttributes
382+
// This block can be removed in a future release, tentatively v0.90.0
383+
if ha, ok := unsafeReader.FileAttributes["HeaderAttributes"]; ok {
384+
switch hat := ha.(type) {
385+
case map[string]any:
386+
for k, v := range hat {
387+
unsafeReader.FileAttributes[k] = v
388+
}
389+
delete(unsafeReader.FileAttributes, "HeaderAttributes")
390+
default:
391+
m.Errorw("migrate header attributes: unexpected format")
392+
}
393+
}
394+
372395
m.knownFiles = append(m.knownFiles, unsafeReader)
373396
}
374397

pkg/stanza/fileconsumer/file_test.go

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ func TestAddFileFields(t *testing.T) {
5454
cfg.StartAt = "beginning"
5555
cfg.IncludeFileName = true
5656
cfg.IncludeFilePath = true
57+
cfg.IncludeFileNameResolved = false
58+
cfg.IncludeFilePathResolved = false
5759
operator, emitCalls := buildTestManager(t, cfg)
5860

5961
// Create a file, then start
@@ -66,8 +68,10 @@ func TestAddFileFields(t *testing.T) {
6668
}()
6769

6870
emitCall := waitForEmit(t, emitCalls)
69-
require.Equal(t, filepath.Base(temp.Name()), emitCall.attrs.Name)
70-
require.Equal(t, temp.Name(), emitCall.attrs.Path)
71+
require.Equal(t, filepath.Base(temp.Name()), emitCall.attrs[logFileName])
72+
require.Equal(t, temp.Name(), emitCall.attrs[logFilePath])
73+
require.Nil(t, emitCall.attrs[logFileNameResolved])
74+
require.Nil(t, emitCall.attrs[logFilePathResolved])
7175
}
7276

7377
// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
@@ -116,10 +120,10 @@ func TestAddFileResolvedFields(t *testing.T) {
116120
}()
117121

118122
emitCall := waitForEmit(t, emitCalls)
119-
require.Equal(t, filepath.Base(symLinkPath), emitCall.attrs.Name)
120-
require.Equal(t, symLinkPath, emitCall.attrs.Path)
121-
require.Equal(t, filepath.Base(resolved), emitCall.attrs.NameResolved)
122-
require.Equal(t, resolved, emitCall.attrs.PathResolved)
123+
require.Equal(t, filepath.Base(symLinkPath), emitCall.attrs[logFileName])
124+
require.Equal(t, symLinkPath, emitCall.attrs[logFilePath])
125+
require.Equal(t, filepath.Base(resolved), emitCall.attrs[logFileNameResolved])
126+
require.Equal(t, resolved, emitCall.attrs[logFilePathResolved])
123127
}
124128

125129
// AddFileResolvedFields tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included
@@ -186,10 +190,10 @@ func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) {
186190
}()
187191

188192
emitCall := waitForEmit(t, emitCalls)
189-
require.Equal(t, filepath.Base(symLinkPath), emitCall.attrs.Name)
190-
require.Equal(t, symLinkPath, emitCall.attrs.Path)
191-
require.Equal(t, filepath.Base(resolved1), emitCall.attrs.NameResolved)
192-
require.Equal(t, resolved1, emitCall.attrs.PathResolved)
193+
require.Equal(t, filepath.Base(symLinkPath), emitCall.attrs[logFileName])
194+
require.Equal(t, symLinkPath, emitCall.attrs[logFilePath])
195+
require.Equal(t, filepath.Base(resolved1), emitCall.attrs[logFileNameResolved])
196+
require.Equal(t, resolved1, emitCall.attrs[logFilePathResolved])
193197

194198
// Change middleSymLink to point to file2
195199
err = os.Remove(middleSymLinkPath)
@@ -201,10 +205,10 @@ func TestAddFileResolvedFieldsWithChangeOfSymlinkTarget(t *testing.T) {
201205
writeString(t, file2, "testlog2\n")
202206

203207
emitCall = waitForEmit(t, emitCalls)
204-
require.Equal(t, filepath.Base(symLinkPath), emitCall.attrs.Name)
205-
require.Equal(t, symLinkPath, emitCall.attrs.Path)
206-
require.Equal(t, filepath.Base(resolved2), emitCall.attrs.NameResolved)
207-
require.Equal(t, resolved2, emitCall.attrs.PathResolved)
208+
require.Equal(t, filepath.Base(symLinkPath), emitCall.attrs[logFileName])
209+
require.Equal(t, symLinkPath, emitCall.attrs[logFilePath])
210+
require.Equal(t, filepath.Base(resolved2), emitCall.attrs[logFileNameResolved])
211+
require.Equal(t, resolved2, emitCall.attrs[logFilePathResolved])
208212
}
209213

210214
func TestFileFieldsUpdatedAfterRestart(t *testing.T) {
@@ -227,8 +231,10 @@ func TestFileFieldsUpdatedAfterRestart(t *testing.T) {
227231

228232
emitCall1 := waitForEmit(t, emitCalls1)
229233
assert.Equal(t, []byte("testlog1"), emitCall1.token)
230-
assert.Equal(t, filepath.Base(temp.Name()), emitCall1.attrs.Name)
231-
assert.Equal(t, temp.Name(), emitCall1.attrs.Path)
234+
assert.Equal(t, filepath.Base(temp.Name()), emitCall1.attrs[logFileName])
235+
assert.Equal(t, temp.Name(), emitCall1.attrs[logFilePath])
236+
assert.Nil(t, emitCall1.attrs[logFileNameResolved])
237+
assert.Nil(t, emitCall1.attrs[logFilePathResolved])
232238

233239
require.NoError(t, op1.Stop())
234240
temp.Close() // On windows, we must close the file before renaming it
@@ -245,8 +251,10 @@ func TestFileFieldsUpdatedAfterRestart(t *testing.T) {
245251

246252
emitCall2 := waitForEmit(t, emitCalls2)
247253
assert.Equal(t, []byte("testlog2"), emitCall2.token)
248-
assert.Equal(t, filepath.Base(newPath), emitCall2.attrs.Name)
249-
assert.Equal(t, newPath, emitCall2.attrs.Path)
254+
assert.Equal(t, filepath.Base(newPath), emitCall2.attrs[logFileName])
255+
assert.Equal(t, newPath, emitCall2.attrs[logFilePath])
256+
assert.Nil(t, emitCall2.attrs[logFileNameResolved])
257+
assert.Nil(t, emitCall2.attrs[logFilePathResolved])
250258

251259
require.NoError(t, op2.Stop())
252260
}
@@ -1457,9 +1465,10 @@ func TestReadExistingLogsWithHeader(t *testing.T) {
14571465
require.NoError(t, operator.Stop())
14581466
}()
14591467

1460-
waitForTokenHeaderAttributes(t, emitCalls, []byte("testlog"), map[string]any{
1468+
waitForTokenWithAttributes(t, emitCalls, []byte("testlog"), map[string]any{
14611469
"header_key": "headerField",
14621470
"header_value": "headerValue",
1471+
logFileName: filepath.Base(temp.Name()),
14631472
})
14641473
}
14651474

@@ -1555,9 +1564,10 @@ func TestHeaderPersistance(t *testing.T) {
15551564
persister := testutil.NewUnscopedMockPersister()
15561565
require.NoError(t, op1.Start(persister))
15571566

1558-
waitForTokenHeaderAttributes(t, emitCalls1, []byte("log line"), map[string]any{
1567+
waitForTokenWithAttributes(t, emitCalls1, []byte("log line"), map[string]any{
15591568
"header_key": "headerField",
15601569
"header_value": "headerValue",
1570+
logFileName: filepath.Base(temp.Name()),
15611571
})
15621572

15631573
require.NoError(t, op1.Stop())
@@ -1568,13 +1578,13 @@ func TestHeaderPersistance(t *testing.T) {
15681578

15691579
require.NoError(t, op2.Start(persister))
15701580

1571-
waitForTokenHeaderAttributes(t, emitCalls2, []byte("log line 2"), map[string]any{
1581+
waitForTokenWithAttributes(t, emitCalls2, []byte("log line 2"), map[string]any{
15721582
"header_key": "headerField",
15731583
"header_value": "headerValue",
1584+
logFileName: filepath.Base(temp.Name()),
15741585
})
15751586

15761587
require.NoError(t, op2.Stop())
1577-
15781588
}
15791589

15801590
func TestHeaderPersistanceInHeader(t *testing.T) {
@@ -1613,9 +1623,10 @@ func TestHeaderPersistanceInHeader(t *testing.T) {
16131623

16141624
require.NoError(t, op2.Start(persister))
16151625

1616-
waitForTokenHeaderAttributes(t, emitCalls, []byte("log line"), map[string]any{
1626+
waitForTokenWithAttributes(t, emitCalls, []byte("log line"), map[string]any{
16171627
"header_value_1": "headerValue1",
16181628
"header_value_2": "headerValue2",
1629+
logFileName: filepath.Base(temp.Name()),
16191630
})
16201631

16211632
require.NoError(t, op2.Stop())

0 commit comments

Comments
 (0)