Skip to content

Commit 1d85725

Browse files
authored
Delete unnecessary FingerprintUpdatingReader (#202)
* Merged FingerprintUpdatingReader into Reader
* Add a robust test for fingerprint updating
* lint
* Don't save fingerprintSize on reader
1 parent d485c45 commit 1d85725

File tree

2 files changed

+67
-31
lines changed

2 files changed

+67
-31
lines changed

operator/builtin/input/file/file_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,65 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
635635
require.Equal(t, []byte("testlog1\n"), reader.Fingerprint.FirstBytes)
636636
}
637637

638+
// Test that a fingerprint:
639+
// - Starts empty
640+
// - Updates as a file is read
641+
// - Stops updating when the max fingerprint size is reached
642+
// - Stops exactly at max fingerprint size, regardless of content
643+
func TestFingerprintGrowsAndStops(t *testing.T) {
644+
t.Parallel()
645+
646+
// Use a number with many factors.
647+
// Sometimes fingerprint length will align with
648+
// the end of a line, sometimes not. Test both.
649+
maxFP := 360
650+
651+
// Use mostly prime numbers (27 is the exception) to ensure variation in
652+
// whether or not they are factors of maxFP
653+
lineLens := []int{3, 5, 7, 11, 13, 17, 19, 23, 27}
654+
655+
for _, lineLen := range lineLens {
656+
t.Run(fmt.Sprintf("%d", lineLen), func(t *testing.T) {
657+
t.Parallel()
658+
operator, _, tempDir := newTestFileOperator(t, func(cfg *InputConfig) {
659+
cfg.FingerprintSize = helper.ByteSize(maxFP)
660+
}, nil)
661+
defer operator.Stop()
662+
663+
temp := openTemp(t, tempDir)
664+
tempCopy := openFile(t, temp.Name())
665+
fp, err := operator.NewFingerprint(temp)
666+
require.NoError(t, err)
667+
require.Equal(t, []byte(""), fp.FirstBytes)
668+
669+
reader, err := operator.NewReader(temp.Name(), tempCopy, fp)
670+
require.NoError(t, err)
671+
defer reader.Close()
672+
673+
// keep track of what has been written to the file
674+
fileContent := []byte{}
675+
676+
// keep track of expected fingerprint size
677+
expectedFP := 0
678+
679+
// Write lines until file is much larger than the length of the fingerprint
680+
for len(fileContent) < 2*maxFP {
681+
expectedFP += lineLen
682+
if expectedFP > maxFP {
683+
expectedFP = maxFP
684+
}
685+
686+
line := stringWithLength(lineLen-1) + "\n"
687+
fileContent = append(fileContent, []byte(line)...)
688+
689+
writeString(t, temp, line)
690+
reader.ReadToEnd(context.Background())
691+
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)
692+
}
693+
})
694+
}
695+
}
696+
638697
func TestEncodings(t *testing.T) {
639698
t.Parallel()
640699
cases := []struct {

operator/builtin/input/file/reader.go

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"bufio"
1919
"context"
2020
"fmt"
21-
"io"
2221
"os"
2322
"path/filepath"
2423

@@ -107,7 +106,6 @@ func (f *Reader) InitializeOffset(startAtBeginning bool) error {
107106
}
108107
f.Offset = info.Size()
109108
}
110-
111109
return nil
112110
}
113111

@@ -118,8 +116,7 @@ func (f *Reader) ReadToEnd(ctx context.Context) {
118116
return
119117
}
120118

121-
fr := NewFingerprintUpdatingReader(f.file, f.Offset, f.Fingerprint, f.fileInput.fingerprintSize)
122-
scanner := NewPositionalScanner(fr, f.fileInput.MaxLogSize, f.Offset, f.fileInput.SplitFunc)
119+
scanner := NewPositionalScanner(f, f.fileInput.MaxLogSize, f.Offset, f.fileInput.SplitFunc)
123120

124121
// Iterate over the tokenized file, emitting entries as we go
125122
for {
@@ -215,34 +212,14 @@ func getScannerError(scanner *PositionalScanner) error {
215212
return nil
216213
}
217214

218-
// NewFingerprintUpdatingReader creates a new FingerprintUpdatingReader starting at the given offset
219-
func NewFingerprintUpdatingReader(r io.Reader, offset int64, f *Fingerprint, fingerprintSize int) *FingerprintUpdatingReader {
220-
return &FingerprintUpdatingReader{
221-
fingerprint: f,
222-
fingerprintSize: fingerprintSize,
223-
reader: r,
224-
offset: offset,
225-
}
226-
}
227-
228-
// FingerprintUpdatingReader wraps another reader, and updates the fingerprint
229-
// with each read in the first fingerprintSize bytes
230-
type FingerprintUpdatingReader struct {
231-
fingerprint *Fingerprint
232-
fingerprintSize int
233-
reader io.Reader
234-
offset int64
235-
}
236-
237-
// Read reads from the wrapped reader, saving the read bytes to the fingerprint
238-
func (f *FingerprintUpdatingReader) Read(dst []byte) (int, error) {
239-
if len(f.fingerprint.FirstBytes) == f.fingerprintSize {
240-
return f.reader.Read(dst)
215+
// Read from the file and update the fingerprint if necessary
216+
func (f *Reader) Read(dst []byte) (int, error) {
217+
if len(f.Fingerprint.FirstBytes) == f.fileInput.fingerprintSize {
218+
return f.file.Read(dst)
241219
}
242-
n, err := f.reader.Read(dst)
243-
appendCount := min0(n, f.fingerprintSize-int(f.offset))
244-
f.fingerprint.FirstBytes = append(f.fingerprint.FirstBytes[:f.offset], dst[:appendCount]...)
245-
f.offset += int64(n)
220+
n, err := f.file.Read(dst)
221+
appendCount := min0(n, f.fileInput.fingerprintSize-int(f.Offset))
222+
f.Fingerprint.FirstBytes = append(f.Fingerprint.FirstBytes[:f.Offset], dst[:appendCount]...)
246223
return n, err
247224
}
248225

0 commit comments

Comments
 (0)