Skip to content

Commit 2e917b4

Browse files
committed
Try to remove pointers of state in metadata
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent b589bef commit 2e917b4

File tree

4 files changed

+14
-25
lines changed

4 files changed

+14
-25
lines changed

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -59,20 +59,16 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
5959
m := &Metadata{
6060
Fingerprint: fp,
6161
FileAttributes: attributes,
62-
TokenLenState: &tokenlen.State{},
63-
}
64-
if f.FlushTimeout > 0 {
65-
m.FlushState = &flush.State{LastDataChange: time.Now()}
62+
TokenLenState: tokenlen.State{},
63+
FlushState: flush.State{
64+
FlushPeriod: f.FlushTimeout,
65+
LastDataChange: time.Now(),
66+
},
6667
}
6768
return f.NewReaderFromMetadata(file, m)
6869
}
6970

7071
func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
71-
// Ensure TokenLenState is initialized
72-
if m.TokenLenState == nil {
73-
m.TokenLenState = &tokenlen.State{}
74-
}
75-
7672
r = &Reader{
7773
Metadata: m,
7874
set: f.TelemetrySettings,
@@ -111,7 +107,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
111107
}
112108

113109
tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
114-
flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
110+
flushFunc := m.FlushState.Func(tokenLenFunc)
115111
r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)
116112

117113
if f.HeaderConfig != nil && !m.HeaderFinalized {

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ type Metadata struct {
3131
RecordNum int64
3232
FileAttributes map[string]any
3333
HeaderFinalized bool
34-
FlushState *flush.State
35-
TokenLenState *tokenlen.State
34+
FlushState flush.State
35+
TokenLenState tokenlen.State
3636
}
3737

3838
// Reader manages a single file

pkg/stanza/flush/flush.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,16 @@ import (
1111
)
1212

1313
type State struct {
14+
FlushPeriod time.Duration
1415
LastDataChange time.Time
1516
LastDataLength int
1617
}
1718

1819
// Func wraps a bufio.SplitFunc with a timer.
1920
// When the timer expires, an incomplete token may be returned.
2021
// The timer will reset any time the data parameter changes.
21-
func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
22-
if s == nil || period <= 0 {
22+
func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
23+
if s == nil || s.FlushPeriod <= 0 {
2324
return splitFunc
2425
}
2526

@@ -51,7 +52,7 @@ func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.Spli
5152
}
5253

5354
// Flush timed out
54-
if internaltime.Since(s.LastDataChange) > period {
55+
if internaltime.Since(s.LastDataChange) > s.FlushPeriod {
5556
s.LastDataChange = internaltime.Now()
5657
s.LastDataLength = 0
5758
return len(data), data, nil
@@ -61,9 +62,3 @@ func (s *State) Func(splitFunc bufio.SplitFunc, period time.Duration) bufio.Spli
6162
return 0, nil, nil
6263
}
6364
}
64-
65-
// Deprecated: [v0.88.0] Use WithFunc instead.
66-
func WithPeriod(splitFunc bufio.SplitFunc, period time.Duration) bufio.SplitFunc {
67-
s := &State{LastDataChange: internaltime.Now()}
68-
return s.Func(splitFunc, period)
69-
}

pkg/stanza/flush/flush_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ func TestNewlineSplitFunc(t *testing.T) {
4141
}
4242

4343
for _, tc := range testCases {
44-
t.Run(tc.name+"/WithPeriod", splittest.New(WithPeriod(tc.baseFunc, tc.flushPeriod), tc.input, tc.steps...))
45-
46-
previousState := &State{LastDataChange: time.Now()}
47-
t.Run(tc.name+"/Func", splittest.New(previousState.Func(tc.baseFunc, tc.flushPeriod), tc.input, tc.steps...))
44+
previousState := &State{FlushPeriod: tc.flushPeriod, LastDataChange: time.Now()}
45+
t.Run(tc.name+"/Func", splittest.New(previousState.Func(tc.baseFunc), tc.input, tc.steps...))
4846
}
4947
}

0 commit comments

Comments
 (0)