Skip to content

Commit e59b272

Browse files
authored
[chore][pkg/stanaza] Remove currentFps from fileconsumer.Manager (#28493)
1 parent 9685d4b commit e59b272

File tree

1 file changed

+26
-43
lines changed

1 file changed

+26
-43
lines changed

pkg/stanza/fileconsumer/file.go

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ type Manager struct {
3434

3535
previousPollFiles []*reader.Reader
3636
knownFiles []*reader.Metadata
37-
38-
currentFps []*fingerprint.Fingerprint
3937
}
4038

4139
func (m *Manager) Start(persister operator.Persister) error {
@@ -146,14 +144,8 @@ func (m *Manager) poll(ctx context.Context) {
146144
}
147145

148146
func (m *Manager) consume(ctx context.Context, paths []string) {
149-
m.Debug("Consuming files")
150-
readers := make([]*reader.Reader, 0, len(paths))
151-
for _, path := range paths {
152-
r := m.makeReader(path)
153-
if r != nil {
154-
readers = append(readers, r)
155-
}
156-
}
147+
m.Debug("Consuming files", zap.Strings("paths", paths))
148+
readers := m.makeReaders(paths)
157149

158150
// take care of files which disappeared from the pattern since the last poll cycle
159151
// this can mean either files which were removed, or rotated into a name not matching the pattern
@@ -173,7 +165,6 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
173165
wg.Wait()
174166

175167
m.previousPollFiles = readers
176-
m.clearCurrentFingerprints()
177168
}
178169

179170
func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
@@ -201,45 +192,37 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
201192
return fp, file
202193
}
203194

204-
func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
205-
for i := 0; i < len(m.currentFps); i++ {
206-
if fp.Equal(m.currentFps[i]) {
207-
return true
208-
}
209-
}
210-
return false
211-
}
212-
213195
// makeReader take a file path, then creates reader,
214196
// discarding any that have a duplicate fingerprint to other files that have already
215197
// been read this polling interval
216-
func (m *Manager) makeReader(path string) *reader.Reader {
217-
// Open the files first to minimize the time between listing and opening
218-
fp, file := m.makeFingerprint(path)
219-
if fp == nil {
220-
return nil
221-
}
222-
223-
// Exclude any empty fingerprints or duplicate fingerprints to avoid doubling up on copy-truncate files
224-
if m.checkDuplicates(fp) {
225-
if err := file.Close(); err != nil {
226-
m.Debugw("problem closing file", zap.Error(err))
198+
func (m *Manager) makeReaders(paths []string) []*reader.Reader {
199+
readers := make([]*reader.Reader, 0, len(paths))
200+
for _, path := range paths {
201+
fp, file := m.makeFingerprint(path)
202+
if fp == nil {
203+
continue
227204
}
228-
return nil
229-
}
230205

231-
m.currentFps = append(m.currentFps, fp)
232-
reader, err := m.newReader(file, fp)
233-
if err != nil {
234-
m.Errorw("Failed to create reader", zap.Error(err))
235-
return nil
236-
}
206+
// Exclude duplicate paths with the same content. This can happen when files are
207+
// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
208+
for _, r := range readers {
209+
if fp.Equal(r.Fingerprint) {
210+
if err := file.Close(); err != nil {
211+
m.Debugw("problem closing file", zap.Error(err))
212+
}
213+
continue
214+
}
215+
}
237216

238-
return reader
239-
}
217+
r, err := m.newReader(file, fp)
218+
if err != nil {
219+
m.Errorw("Failed to create reader", zap.Error(err))
220+
continue
221+
}
240222

241-
func (m *Manager) clearCurrentFingerprints() {
242-
m.currentFps = make([]*fingerprint.Fingerprint, 0)
223+
readers = append(readers, r)
224+
}
225+
return readers
243226
}
244227

245228
func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {

0 commit comments

Comments
 (0)