
Commit 4c84447

[stanza] Use buffer pool for the read buffers to limit allocations
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 1fdf89d commit 4c84447
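
At its core, this commit applies a standard Go technique: rather than allocating a fresh read buffer each time a file is scanned, buffers are recycled through a sync.Pool shared by all readers. A minimal, self-contained sketch of the pattern (the names below are illustrative, not the commit's identifiers):

package main

import (
	"fmt"
	"sync"
)

// The pool stores *[]byte rather than []byte: putting a pointer into the
// pool avoids the extra allocation caused by boxing a slice header into
// an interface value (the pattern staticcheck flags as SA6002).
var bufPool = sync.Pool{
	New: func() any {
		buf := make([]byte, 0, 16*1024) // initial capacity, tuned per workload
		return &buf
	},
}

func process(data []byte) int {
	bufPtr := bufPool.Get().(*[]byte)
	defer bufPool.Put(bufPtr) // recycle the buffer once we are done

	buf := append((*bufPtr)[:0], data...) // reuse capacity, reset length
	return len(buf)
}

func main() {
	fmt.Println(process([]byte("hello world")))
}

The commit itself leaves Pool.New unset and handles the nil case by hand; see getBufPtrFromPool in reader.go below.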

File tree

8 files changed: +170 -15 lines changed

.chloggen/pool-bufs-factory.yaml

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: pkg/stanza
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Use buffer pool for the read buffers to limit allocations
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [39373]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

pkg/stanza/fileconsumer/benchmark_test.go

Lines changed: 109 additions & 0 deletions
@@ -9,6 +9,7 @@ import (
 	"os"
 	"path/filepath"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -161,6 +162,7 @@ func BenchmarkFileInput(b *testing.B) {
 
 	for _, bench := range cases {
 		b.Run(bench.name, func(b *testing.B) {
+			b.ReportAllocs()
 			rootDir := b.TempDir()
 
 			var files []*os.File
@@ -228,3 +230,110 @@ func BenchmarkFileInput(b *testing.B) {
 		})
 	}
 }
+
+func BenchmarkConsumeFiles(b *testing.B) {
+	cases := []fileInputBenchmark{
+		{
+			name: "Single",
+			paths: []string{
+				"file0.log",
+			},
+			config: func() *Config {
+				cfg := NewConfig()
+				cfg.Include = []string{
+					"file0.log",
+				}
+				cfg.MaxLogSize = 1 * 1024 * 1024
+				cfg.InitialBufferSize = 1 * 1024 * 1024
+				cfg.FingerprintSize = fingerprint.DefaultSize / 10
+				return cfg
+			},
+		},
+		{
+			name: "Multiple",
+			paths: func() []string {
+				paths := make([]string, 100)
+				for i := range paths {
+					paths[i] = fmt.Sprintf("file%d.log", i)
+				}
+				return paths
+			}(),
+			config: func() *Config {
+				cfg := NewConfig()
+				cfg.Include = []string{"file*.log"}
+				cfg.Encoding = ""
+				cfg.FingerprintSize = fingerprint.DefaultSize / 10
+				cfg.MaxLogSize = 1 * 1024 * 1024
+				cfg.InitialBufferSize = 1 * 1024 * 1024
+				cfg.MaxConcurrentFiles = 10
+				return cfg
+			},
+		},
+	}
+
+	// Pregenerate some lines which we can write to the files
+	// to avoid measuring the time it takes to generate them
+	// and to reduce the amount of syscalls in the benchmark.
+	uniqueLines := 10
+	severalLines := ""
+	for i := 0; i < uniqueLines; i++ {
+		severalLines += string(filetest.TokenWithLength(999)) + "\n"
+	}
+
+	for _, bench := range cases {
+		b.Run(bench.name, func(b *testing.B) {
+			rootDir := b.TempDir()
+
+			var consumePaths []string
+			var files []*os.File
+			for _, path := range bench.paths {
+				consumePath := filepath.Join(rootDir, path)
+				consumePaths = append(consumePaths, consumePath)
+				f := filetest.OpenFile(b, consumePath)
+				files = append(files, f)
+				// Initialize the file to ensure a unique fingerprint
+				_, err := f.WriteString(f.Name() + "\n")
+				require.NoError(b, err)
+				for i := 0; i < b.N; i++ {
+					_, err := f.WriteString(severalLines)
+					require.NoError(b, err)
+				}
+				require.NoError(b, f.Sync())
+			}
+
+			cfg := bench.config()
+			for i, inc := range cfg.Include {
+				cfg.Include[i] = filepath.Join(rootDir, inc)
+			}
+			cfg.StartAt = "beginning"
+			// Use a long poll so that we don't trigger it.
+			cfg.PollInterval = 1 * time.Hour
+
+			doneChan := make(chan bool, len(files))
+			numTokens := &atomic.Int64{}
+			callback := func(_ context.Context, tokens [][]byte, _ map[string]any, _ int64) error {
+				if numTokens.Add(int64(len(tokens))) == int64(len(files)*(b.N*uniqueLines+1)) {
+					close(doneChan)
+				}
+				return nil
+			}
+			set := componenttest.NewNopTelemetrySettings()
+			op, err := cfg.Build(set, callback)
+			require.NoError(b, err)
+
+			require.NoError(b, op.Start(testutil.NewUnscopedMockPersister()))
+			defer func() {
+				require.NoError(b, op.Stop())
+			}()
+
+			b.ReportAllocs()
+			b.ResetTimer()
+			for len(consumePaths) > op.maxBatchFiles {
+				op.consume(context.Background(), consumePaths[:op.maxBatchFiles])
+				consumePaths = consumePaths[op.maxBatchFiles:]
+			}
+			op.consume(context.Background(), consumePaths)
+			<-doneChan
+		})
+	}
+}
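
BenchmarkConsumeFiles drives op.consume directly instead of waiting on the poller, so the timed section measures only reading and tokenization. To reproduce the numbers locally, something like go test -run=^$ -bench=BenchmarkConsumeFiles -benchmem ./pkg/stanza/fileconsumer/ should work; with b.ReportAllocs() in place, the allocs/op column is the figure this change is meant to drive down.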

pkg/stanza/fileconsumer/config.go

Lines changed: 1 addition & 1 deletion
@@ -152,7 +152,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 	}
 
 	set.Logger = set.Logger.With(zap.String("component", "fileconsumer"))
-	readerFactory := reader.Factory{
+	readerFactory := &reader.Factory{
 		TelemetrySettings: set,
 		FromBeginning:     startAtBeginning,
 		FingerprintSize:   int(c.FingerprintSize),

pkg/stanza/fileconsumer/file.go

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ type Manager struct {
 	wg     sync.WaitGroup
 	cancel context.CancelFunc
 
-	readerFactory reader.Factory
+	readerFactory *reader.Factory
 	fileMatcher   *matcher.Matcher
 	tracker       tracker.Tracker
 	noTracking    bool

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 3 additions & 0 deletions
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
 	"go.opentelemetry.io/collector/component"
@@ -34,6 +35,7 @@ type Factory struct {
 	HeaderConfig      *header.Config
 	FromBeginning     bool
 	FingerprintSize   int
+	BufPool           sync.Pool
 	InitialBufferSize int
 	MaxLogSize        int
 	Encoding          encoding.Encoding
@@ -75,6 +77,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		file:              file,
 		fileName:          file.Name(),
 		fingerprintSize:   f.FingerprintSize,
+		bufPool:           &f.BufPool,
 		initialBufferSize: f.InitialBufferSize,
 		maxLogSize:        f.MaxLogSize,
 		decoder:           f.Encoding.NewDecoder(),
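
The new BufPool field also explains the pointer changes in config.go and file.go above: a sync.Pool must not be copied after first use (it embeds a noCopy field, so go vet's copylocks check flags any copy), and a copied Factory would start with its own empty pool, defeating reuse. Passing *reader.Factory around keeps a single shared pool, and each Reader receives &f.BufPool. A small sketch of the pitfall, using hypothetical names:

package main

import "sync"

type factory struct {
	bufPool sync.Pool // pooled read buffers, shared by every reader
}

// OK: every caller shares the one pool behind the pointer.
func newBuf(f *factory) *[]byte {
	if p := f.bufPool.Get(); p != nil {
		return p.(*[]byte)
	}
	buf := make([]byte, 0, 1024)
	return &buf
}

// Not OK: a signature like func newBuf(f factory) would copy the pool;
// go vet reports "passes lock by value", and each copy would Get from
// an always-empty pool.

func main() {
	f := &factory{}
	bufPtr := newBuf(f)
	f.bufPool.Put(bufPtr) // returned buffers are visible to later calls
	_ = newBuf(f)
}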

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 24 additions & 8 deletions

@@ -8,8 +8,9 @@ import (
 	"compress/gzip"
 	"context"
 	"errors"
 	"io"
 	"os"
+	"sync"
 
 	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"
@@ -42,6 +43,7 @@ type Reader struct {
 	file              *os.File
 	reader            io.Reader
 	fingerprintSize   int
+	bufPool           *sync.Pool
 	initialBufferSize int
 	maxLogSize        int
 	headerSplitFunc   bufio.SplitFunc
@@ -116,7 +118,9 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
 }
 
 func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
-	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.headerSplitFunc)
+	bufPtr := r.getBufPtrFromPool()
+	defer r.bufPool.Put(bufPtr)
+	s := scanner.New(r, r.maxLogSize, *bufPtr, r.Offset, r.headerSplitFunc)
 
 	// Read the tokens from the file until no more header tokens are found or the end of file is reached.
 	for {
@@ -176,15 +180,18 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
 }
 
 func (r *Reader) readContents(ctx context.Context) {
-	// Create the scanner to read the contents of the file.
-	bufferSize := r.initialBufferSize
-	if r.TokenLenState.MinimumLength > bufferSize {
+	var buf []byte
+	if r.TokenLenState.MinimumLength <= r.initialBufferSize {
+		bufPtr := r.getBufPtrFromPool()
+		buf = *bufPtr
+		defer r.bufPool.Put(bufPtr)
+	} else {
 		// If we previously saw a potential token larger than the default buffer,
-		// size the buffer to be at least one byte larger so we can see if there's more data
-		bufferSize = r.TokenLenState.MinimumLength + 1
+		// size the buffer to be at least one byte larger so we can see if there's more data.
+		// Usually, expect this to be a rare event so that we don't bother pooling this special buffer size.
+		buf = make([]byte, 0, r.TokenLenState.MinimumLength+1)
 	}
-
-	s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
+	s := scanner.New(r, r.maxLogSize, buf, r.Offset, r.contentSplitFunc)
 
 	tokenBodies := make([][]byte, r.maxBatchSize)
 	numTokensBatched := 0
@@ -319,3 +326,12 @@ func (r *Reader) updateFingerprint() {
 	}
 	r.Fingerprint = refreshedFingerprint
 }
+
+func (r *Reader) getBufPtrFromPool() *[]byte {
+	bufP := r.bufPool.Get()
+	if bufP == nil {
+		buf := make([]byte, 0, r.initialBufferSize)
+		return &buf
+	}
+	return bufP.(*[]byte)
+}
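
Two details in this file are easy to miss. First, the Factory's BufPool has no New function, so Get returns nil on a cold pool and getBufPtrFromPool falls back to allocating a buffer with initialBufferSize capacity. Second, readContents only draws from the pool when the standard initialBufferSize is enough; a buffer sized for an oversized token is allocated ad hoc and never returned with Put, which keeps the pooled buffers uniformly sized. Note also that scanner.New hands the buffer to bufio.Scanner, which grows its buffer internally if a token outgrows it; that larger slice never reaches the pool, so the pool only ever recycles initialBufferSize-capacity slices. Condensed, the lifecycle inside a read pass looks like this (a sketch of the flow above, not additional code in the commit):

bufPtr := r.getBufPtrFromPool() // pooled buffer, or a fresh one on a cold pool
defer r.bufPool.Put(bufPtr)     // make the buffer available to the next reader
s := scanner.New(r, r.maxLogSize, *bufPtr, r.Offset, r.contentSplitFunc)
for s.Scan() {
	// tokens are emitted in batches; the buffer is reused across Scan calls
}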

pkg/stanza/fileconsumer/internal/scanner/scanner.go

Lines changed: 2 additions & 2 deletions
@@ -20,9 +20,9 @@ type Scanner struct {
 }
 
 // New creates a new positional scanner
-func New(r io.Reader, maxLogSize int, bufferSize int, startOffset int64, splitFunc bufio.SplitFunc) *Scanner {
+func New(r io.Reader, maxLogSize int, buf []byte, startOffset int64, splitFunc bufio.SplitFunc) *Scanner {
 	s := &Scanner{Scanner: bufio.NewScanner(r), pos: startOffset}
-	s.Buffer(make([]byte, 0, bufferSize), maxLogSize)
+	s.Buffer(buf, maxLogSize)
 	scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
 		advance, token, err = splitFunc(data, atEOF)
 		s.pos += int64(advance)
pkg/stanza/fileconsumer/internal/scanner/scanner_test.go

Lines changed: 3 additions & 3 deletions
@@ -77,7 +77,7 @@ func TestScanner(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			scanner := New(bytes.NewReader(tc.stream), tc.maxSize, tc.bufferSize, tc.startOffset, simpleSplit(tc.delimiter))
+			scanner := New(bytes.NewReader(tc.stream), tc.maxSize, make([]byte, 0, tc.bufferSize), tc.startOffset, simpleSplit(tc.delimiter))
 			for i, p := 0, 0; scanner.Scan(); i++ {
 				assert.NoError(t, scanner.Error())
 
@@ -117,12 +117,12 @@ func (r *errReader) Read([]byte) (n int, err error) {
 
 func TestScannerError(t *testing.T) {
 	reader := &errReader{err: bufio.ErrTooLong}
-	scanner := New(reader, 100, 100, 0, simpleSplit([]byte("\n")))
+	scanner := New(reader, 100, make([]byte, 0, 100), 0, simpleSplit([]byte("\n")))
 	assert.False(t, scanner.Scan())
 	assert.EqualError(t, scanner.Error(), "log entry too large")
 
 	reader = &errReader{err: errors.New("some err")}
-	scanner = New(reader, 100, 100, 0, simpleSplit([]byte("\n")))
+	scanner = New(reader, 100, make([]byte, 0, 100), 0, simpleSplit([]byte("\n")))
 	assert.False(t, scanner.Scan())
 	assert.EqualError(t, scanner.Error(), "scanner error: some err")
 }
