Skip to content

Commit f0be0b5

Browse files
committed
[stanza] Use buffer pool for the read buffers to limit allocations
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 1fdf89d commit f0be0b5

File tree

5 files changed

+59
-13
lines changed

5 files changed

+59
-13
lines changed

.chloggen/pool-bufs-factory.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Use buffer pool for the read buffers to limit allocations
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39373]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"os"
11+
"sync"
1112
"time"
1213

1314
"go.opentelemetry.io/collector/component"
@@ -34,6 +35,7 @@ type Factory struct {
3435
HeaderConfig *header.Config
3536
FromBeginning bool
3637
FingerprintSize int
38+
BufPool sync.Pool
3739
InitialBufferSize int
3840
MaxLogSize int
3941
Encoding encoding.Encoding
@@ -75,6 +77,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
7577
file: file,
7678
fileName: file.Name(),
7779
fingerprintSize: f.FingerprintSize,
80+
bufPool: &f.BufPool,
7881
initialBufferSize: f.InitialBufferSize,
7982
maxLogSize: f.MaxLogSize,
8083
decoder: f.Encoding.NewDecoder(),

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"io"
1212
"os"
13+
"sync"
1314

1415
"go.opentelemetry.io/collector/component"
1516
"go.uber.org/zap"
@@ -42,6 +43,7 @@ type Reader struct {
4243
file *os.File
4344
reader io.Reader
4445
fingerprintSize int
46+
bufPool *sync.Pool
4547
initialBufferSize int
4648
maxLogSize int
4749
headerSplitFunc bufio.SplitFunc
@@ -116,7 +118,9 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
116118
}
117119

118120
func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
119-
s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.headerSplitFunc)
121+
bufPtr := r.getBufPtrFromPool()
122+
defer r.bufPool.Put(bufPtr)
123+
s := scanner.New(r, r.maxLogSize, *bufPtr, r.Offset, r.headerSplitFunc)
120124

121125
// Read the tokens from the file until no more header tokens are found or the end of file is reached.
122126
for {
@@ -176,15 +180,18 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {
176180
}
177181

178182
func (r *Reader) readContents(ctx context.Context) {
179-
// Create the scanner to read the contents of the file.
180-
bufferSize := r.initialBufferSize
181-
if r.TokenLenState.MinimumLength > bufferSize {
183+
var buf []byte
184+
if r.TokenLenState.MinimumLength <= r.initialBufferSize {
185+
bufPtr := r.getBufPtrFromPool()
186+
buf = *bufPtr
187+
defer r.bufPool.Put(bufPtr)
188+
} else {
182189
// If we previously saw a potential token larger than the default buffer,
183-
// size the buffer to be at least one byte larger so we can see if there's more data
184-
bufferSize = r.TokenLenState.MinimumLength + 1
190+
// size the buffer to be at least one byte larger so we can see if there's more data.
191+
// Usually, expect this to be a rare event so that we don't bother pooling this special buffer size.
192+
buf = make([]byte, 0, r.TokenLenState.MinimumLength+1)
185193
}
186-
187-
s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)
194+
s := scanner.New(r, r.maxLogSize, buf, r.Offset, r.contentSplitFunc)
188195

189196
tokenBodies := make([][]byte, r.maxBatchSize)
190197
numTokensBatched := 0
@@ -319,3 +326,12 @@ func (r *Reader) updateFingerprint() {
319326
}
320327
r.Fingerprint = refreshedFingerprint
321328
}
329+
330+
func (r *Reader) getBufPtrFromPool() *[]byte {
331+
bufP := r.bufPool.Get()
332+
if bufP == nil {
333+
buf := make([]byte, 0, r.initialBufferSize)
334+
return &buf
335+
}
336+
return bufP.(*[]byte)
337+
}

pkg/stanza/fileconsumer/internal/scanner/scanner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ type Scanner struct {
2020
}
2121

2222
// New creates a new positional scanner
23-
func New(r io.Reader, maxLogSize int, bufferSize int, startOffset int64, splitFunc bufio.SplitFunc) *Scanner {
23+
func New(r io.Reader, maxLogSize int, buf []byte, startOffset int64, splitFunc bufio.SplitFunc) *Scanner {
2424
s := &Scanner{Scanner: bufio.NewScanner(r), pos: startOffset}
25-
s.Buffer(make([]byte, 0, bufferSize), maxLogSize)
25+
s.Buffer(buf, maxLogSize)
2626
scanFunc := func(data []byte, atEOF bool) (advance int, token []byte, err error) {
2727
advance, token, err = splitFunc(data, atEOF)
2828
s.pos += int64(advance)

pkg/stanza/fileconsumer/internal/scanner/scanner_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestScanner(t *testing.T) {
7777

7878
for _, tc := range testCases {
7979
t.Run(tc.name, func(t *testing.T) {
80-
scanner := New(bytes.NewReader(tc.stream), tc.maxSize, tc.bufferSize, tc.startOffset, simpleSplit(tc.delimiter))
80+
scanner := New(bytes.NewReader(tc.stream), tc.maxSize, make([]byte, 0, tc.bufferSize), tc.startOffset, simpleSplit(tc.delimiter))
8181
for i, p := 0, 0; scanner.Scan(); i++ {
8282
assert.NoError(t, scanner.Error())
8383

@@ -117,12 +117,12 @@ func (r *errReader) Read([]byte) (n int, err error) {
117117

118118
func TestScannerError(t *testing.T) {
119119
reader := &errReader{err: bufio.ErrTooLong}
120-
scanner := New(reader, 100, 100, 0, simpleSplit([]byte("\n")))
120+
scanner := New(reader, 100, make([]byte, 0, 100), 0, simpleSplit([]byte("\n")))
121121
assert.False(t, scanner.Scan())
122122
assert.EqualError(t, scanner.Error(), "log entry too large")
123123

124124
reader = &errReader{err: errors.New("some err")}
125-
scanner = New(reader, 100, 100, 0, simpleSplit([]byte("\n")))
125+
scanner = New(reader, 100, make([]byte, 0, 100), 0, simpleSplit([]byte("\n")))
126126
assert.False(t, scanner.Scan())
127127
assert.EqualError(t, scanner.Error(), "scanner error: some err")
128128
}

0 commit comments

Comments
 (0)