Skip to content

Commit b6c56e4

Browse files
committed
include file record num in reader config
1 parent b58ec21 commit b6c56e4

File tree

5 files changed

+58
-66
lines changed

5 files changed

+58
-66
lines changed

pkg/stanza/fileconsumer/attrs/attrs.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ type Resolver struct {
2727
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"`
2828
IncludeFileOwnerName bool `mapstructure:"include_file_owner_name,omitempty"`
2929
IncludeFileOwnerGroupName bool `mapstructure:"include_file_owner_group_name,omitempty"`
30-
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
3130
}
3231

3332
func (r *Resolver) Resolve(file *os.File) (attributes map[string]any, err error) {
@@ -46,10 +45,6 @@ func (r *Resolver) Resolve(file *os.File) (attributes map[string]any, err error)
4645
return nil, err
4746
}
4847
}
49-
if r.IncludeFileRecordNumber {
50-
// non-zero value to flag for setting
51-
attributes[LogFileRecordNumber] = 1
52-
}
5348
if !r.IncludeFileNameResolved && !r.IncludeFilePathResolved {
5449
return attributes, nil
5550
}

pkg/stanza/fileconsumer/attrs/attrs_test.go

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ import (
1717
func TestResolver(t *testing.T) {
1818
t.Parallel()
1919

20-
for i := 0; i < 128; i++ {
20+
for i := 0; i < 64; i++ {
2121

22-
// Create a 7 bit string where each bit represents the value of a config option
23-
bitString := fmt.Sprintf("%07b", i)
22+
// Create a 6 bit string where each bit represents the value of a config option
23+
bitString := fmt.Sprintf("%06b", i)
2424

2525
// Create a resolver with a config that matches the bit pattern of i
2626
r := Resolver{
@@ -30,7 +30,6 @@ func TestResolver(t *testing.T) {
3030
IncludeFilePathResolved: bitString[3] == '1',
3131
IncludeFileOwnerName: bitString[4] == '1' && runtime.GOOS != "windows",
3232
IncludeFileOwnerGroupName: bitString[5] == '1' && runtime.GOOS != "windows",
33-
IncludeFileRecordNumber: bitString[6] == '1',
3433
}
3534

3635
t.Run(bitString, func(t *testing.T) {
@@ -54,12 +53,6 @@ func TestResolver(t *testing.T) {
5453
} else {
5554
assert.Empty(t, attributes[LogFilePath])
5655
}
57-
if r.IncludeFileRecordNumber {
58-
expectLen++
59-
assert.Equal(t, 1, attributes[LogFileRecordNumber])
60-
} else {
61-
assert.Empty(t, attributes[LogFileRecordNumber])
62-
}
6356

6457
// We don't have an independent way to resolve the path, so the only meaningful validate
6558
// is to ensure that the resolver returns nothing vs something based on the config.

pkg/stanza/fileconsumer/config.go

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,21 @@ func NewConfig() *Config {
7171

7272
// Config is the configuration of a file input operator
7373
type Config struct {
74-
matcher.Criteria `mapstructure:",squash"`
75-
attrs.Resolver `mapstructure:",squash"`
76-
PollInterval time.Duration `mapstructure:"poll_interval,omitempty"`
77-
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"`
78-
MaxBatches int `mapstructure:"max_batches,omitempty"`
79-
StartAt string `mapstructure:"start_at,omitempty"`
80-
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"`
81-
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
82-
Encoding string `mapstructure:"encoding,omitempty"`
83-
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
84-
TrimConfig trim.Config `mapstructure:",squash,omitempty"`
85-
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
86-
Header *HeaderConfig `mapstructure:"header,omitempty"`
87-
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
74+
matcher.Criteria `mapstructure:",squash"`
75+
attrs.Resolver `mapstructure:",squash"`
76+
PollInterval time.Duration `mapstructure:"poll_interval,omitempty"`
77+
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"`
78+
MaxBatches int `mapstructure:"max_batches,omitempty"`
79+
StartAt string `mapstructure:"start_at,omitempty"`
80+
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"`
81+
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
82+
Encoding string `mapstructure:"encoding,omitempty"`
83+
SplitConfig split.Config `mapstructure:"multiline,omitempty"`
84+
TrimConfig trim.Config `mapstructure:",squash,omitempty"`
85+
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
86+
Header *HeaderConfig `mapstructure:"header,omitempty"`
87+
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
88+
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
8889
}
8990

9091
type HeaderConfig struct {
@@ -153,19 +154,20 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
153154

154155
set.Logger = set.Logger.With(zap.String("component", "fileconsumer"))
155156
readerFactory := reader.Factory{
156-
TelemetrySettings: set,
157-
FromBeginning: startAtBeginning,
158-
FingerprintSize: int(c.FingerprintSize),
159-
InitialBufferSize: scanner.DefaultBufferSize,
160-
MaxLogSize: int(c.MaxLogSize),
161-
Encoding: enc,
162-
SplitFunc: splitFunc,
163-
TrimFunc: trimFunc,
164-
FlushTimeout: c.FlushPeriod,
165-
EmitFunc: emit,
166-
Attributes: c.Resolver,
167-
HeaderConfig: hCfg,
168-
DeleteAtEOF: c.DeleteAfterRead,
157+
TelemetrySettings: set,
158+
FromBeginning: startAtBeginning,
159+
FingerprintSize: int(c.FingerprintSize),
160+
InitialBufferSize: scanner.DefaultBufferSize,
161+
MaxLogSize: int(c.MaxLogSize),
162+
Encoding: enc,
163+
SplitFunc: splitFunc,
164+
TrimFunc: trimFunc,
165+
FlushTimeout: c.FlushPeriod,
166+
EmitFunc: emit,
167+
Attributes: c.Resolver,
168+
HeaderConfig: hCfg,
169+
DeleteAtEOF: c.DeleteAfterRead,
170+
IncludeFileRecordNumber: c.IncludeFileRecordNumber,
169171
}
170172

171173
var t tracker.Tracker

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,19 @@ const (
3030

3131
type Factory struct {
3232
component.TelemetrySettings
33-
HeaderConfig *header.Config
34-
FromBeginning bool
35-
FingerprintSize int
36-
InitialBufferSize int
37-
MaxLogSize int
38-
Encoding encoding.Encoding
39-
SplitFunc bufio.SplitFunc
40-
TrimFunc trim.Func
41-
FlushTimeout time.Duration
42-
EmitFunc emit.Callback
43-
Attributes attrs.Resolver
44-
DeleteAtEOF bool
33+
HeaderConfig *header.Config
34+
FromBeginning bool
35+
FingerprintSize int
36+
InitialBufferSize int
37+
MaxLogSize int
38+
Encoding encoding.Encoding
39+
SplitFunc bufio.SplitFunc
40+
TrimFunc trim.Func
41+
FlushTimeout time.Duration
42+
EmitFunc emit.Callback
43+
Attributes attrs.Resolver
44+
DeleteAtEOF bool
45+
IncludeFileRecordNumber bool
4546
}
4647

4748
func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
@@ -63,16 +64,17 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
6364
func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
6465

6566
r = &Reader{
66-
Metadata: m,
67-
set: f.TelemetrySettings,
68-
file: file,
69-
fileName: file.Name(),
70-
fingerprintSize: f.FingerprintSize,
71-
initialBufferSize: f.InitialBufferSize,
72-
maxLogSize: f.MaxLogSize,
73-
decoder: decode.New(f.Encoding),
74-
lineSplitFunc: f.SplitFunc,
75-
deleteAtEOF: f.DeleteAtEOF,
67+
Metadata: m,
68+
set: f.TelemetrySettings,
69+
file: file,
70+
fileName: file.Name(),
71+
fingerprintSize: f.FingerprintSize,
72+
initialBufferSize: f.InitialBufferSize,
73+
maxLogSize: f.MaxLogSize,
74+
decoder: decode.New(f.Encoding),
75+
lineSplitFunc: f.SplitFunc,
76+
deleteAtEOF: f.DeleteAtEOF,
77+
includeFileRecordNum: f.IncludeFileRecordNumber,
7678
}
7779
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))
7880

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type Reader struct {
4848
emitFunc emit.Callback
4949
deleteAtEOF bool
5050
needsUpdateFingerprint bool
51+
includeFileRecordNum bool
5152
}
5253

5354
// ReadToEnd will read until the end of the file
@@ -90,8 +91,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
9091
continue
9192
}
9293

93-
_, fileRecordNumEnabled := r.FileAttributes[attrs.LogFileRecordNumber]
94-
if fileRecordNumEnabled {
94+
if r.includeFileRecordNum {
9595
r.RecordNum++
9696
r.FileAttributes[attrs.LogFileRecordNumber] = r.RecordNum
9797
}

0 commit comments

Comments
 (0)