Skip to content

Commit c640c50

Browse files
djaglowski, jmsnll
authored and committed
[chore][pkg/stanza] Adjust length of knownFiles based on number of matches (open-telemetry#28646)
Follows open-telemetry#28493. This adjusts the length of `knownFiles` to be roughly 4x the number of matches per poll cycle. In other words, we will remember files for up to 4 poll cycles. Resolves open-telemetry#28567.
1 parent 492be4e commit c640c50

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

pkg/stanza/fileconsumer/config.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -182,7 +182,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli
182182
maxBatchFiles: c.MaxConcurrentFiles / 2,
183183
maxBatches: c.MaxBatches,
184184
previousPollFiles: make([]*reader.Reader, 0, c.MaxConcurrentFiles/2),
185-
knownFiles: make([]*reader.Metadata, 0, 10*c.MaxConcurrentFiles),
185+
knownFiles: []*reader.Metadata{},
186186
}, nil
187187
}
188188

pkg/stanza/fileconsumer/file.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,26 @@ type Manager struct {
3434

3535
previousPollFiles []*reader.Reader
3636
knownFiles []*reader.Metadata
37+
38+
// This value approximates the expected number of files which we will find in a single poll cycle.
39+
// It is updated each poll cycle using an exponential moving average calculation which assigns 25% weight
40+
// to the most recent poll cycle.
41+
// It is used to regulate the size of knownFiles. The goal is to allow knownFiles
42+
// to contain checkpoints from a few previous poll cycles, but not grow unbounded.
43+
movingAverageMatches int
3744
}
3845

3946
func (m *Manager) Start(persister operator.Persister) error {
4047
ctx, cancel := context.WithCancel(context.Background())
4148
m.cancel = cancel
4249

50+
if matches, err := m.fileMatcher.MatchFiles(); err != nil {
51+
m.Warnf("finding files: %v", err)
52+
} else {
53+
m.movingAverageMatches = len(matches)
54+
m.knownFiles = make([]*reader.Metadata, 0, 4*len(matches))
55+
}
56+
4357
if persister != nil {
4458
m.persister = persister
4559
offsets, err := checkpoint.Load(ctx, m.persister)
@@ -53,19 +67,15 @@ func (m *Manager) Start(persister operator.Persister) error {
5367
}
5468
}
5569

56-
if _, err := m.fileMatcher.MatchFiles(); err != nil {
57-
m.Warnf("finding files: %v", err)
58-
}
59-
6070
// Start polling goroutine
6171
m.startPoller(ctx)
6272

6373
return nil
6474
}
6575

6676
func (m *Manager) closePreviousFiles() {
67-
if forgetNum := len(m.previousPollFiles) + len(m.knownFiles) - cap(m.knownFiles); forgetNum > 0 {
68-
m.knownFiles = m.knownFiles[forgetNum:]
77+
if len(m.knownFiles) > 4*m.movingAverageMatches {
78+
m.knownFiles = m.knownFiles[m.movingAverageMatches:]
6979
}
7080
for _, r := range m.previousPollFiles {
7181
m.knownFiles = append(m.knownFiles, r.Close())
@@ -116,6 +126,8 @@ func (m *Manager) poll(ctx context.Context) {
116126
matches, err := m.fileMatcher.MatchFiles()
117127
if err != nil {
118128
m.Debugf("finding files: %v", err)
129+
} else {
130+
m.movingAverageMatches = (m.movingAverageMatches*3 + len(matches)) / 4
119131
}
120132
m.Debugf("matched files", zap.Strings("paths", matches))
121133

0 commit comments

Comments
 (0)