
Commit 4fd84f1

djaglowski authored and chengchuanpeng committed
[receiver/filelog] Fix issue where flushed tokens could be truncated (open-telemetry#37596)
Fixes open-telemetry#35042 (and open-telemetry#32100 again)

The issue affected unterminated logs of particular lengths: specifically, longer than our internal `scanner.DefaultBufferSize` (16kB) and shorter than `max_log_size`. The failure mode was described in open-telemetry#32100 but was apparently only fixed in some circumstances. I believe this is a more robust fix. I'll articulate the exact failure mode again here:

1. During a poll cycle, `reader.ReadToEnd` is called. Within this, a scanner is created which starts with a default buffer size. The buffer is filled, but no terminator is found. Therefore the scanner resizes the buffer to accommodate more data, hoping to find a terminator. Eventually, the buffer is large enough to contain all content until EOF, but still no terminator was found. At this time, the flush timer has not expired, so `reader.ReadToEnd` returns without emitting anything.
2. During the _next_ poll cycle, `reader.ReadToEnd` creates a new scanner, also with default buffer size. The first time it looks for a terminator, it of course doesn't find one, but at this time the flush timer has expired. Therefore, instead of resizing the buffer and continuing to look for a terminator, it just emits what it has.

What should happen instead is that the scanner continues to resize the buffer to find as much of the unterminated token as possible before emitting it. Therefore, this fix introduces a simple layer into the split func stack which allows us to reason about unterminated tokens more carefully. It captures the length of unterminated tokens and ensures that when we recreate a scanner, we start with a buffer size that is appropriate to read the same content as last time, plus one additional byte. The extra byte allows us to check whether new content has been added, in which case we resume resizing. If no new content is found, the flusher emits the entire unterminated token at once.
1 parent 8f043c1 commit 4fd84f1

File tree

7 files changed: +282 −4 lines

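As background for the diffs below: a `bufio.SplitFunc` asks for more data by returning `(0, nil, nil)`, the scanner responds by growing its buffer and reading again, and an `atEOF` call lets the func flush whatever remains. The fix's new layer keys off exactly that `(0, nil, nil)` case. A minimal sketch of the contract using only the standard library (not code from this commit):

```go
package main

import (
	"bufio"
	"fmt"
)

func main() {
	// No terminator and not at EOF: the split func requests more data
	// by returning (0, nil, nil).
	advance, token, err := bufio.ScanLines([]byte("unterminated"), false)
	fmt.Println(advance, token, err) // 0 [] <nil>

	// Same bytes at EOF: the remaining data is flushed as a final token.
	advance, token, err = bufio.ScanLines([]byte("unterminated"), true)
	fmt.Printf("%d %q %v\n", advance, token, err) // 12 "unterminated" <nil>
}
```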

.chloggen/fix-flush-short.yaml

Lines changed: 27 additions & 0 deletions

```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where flushed tokens could be truncated.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35042]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
```

pkg/stanza/fileconsumer/internal/reader/factory.go

Lines changed: 15 additions & 3 deletions

```diff
@@ -20,6 +20,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
 )

@@ -56,14 +57,23 @@ func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader
 	if err != nil {
 		return nil, err
 	}
-	m := &Metadata{Fingerprint: fp, FileAttributes: attributes}
+	m := &Metadata{
+		Fingerprint:    fp,
+		FileAttributes: attributes,
+		TokenLenState:  &tokenlen.State{},
+	}
 	if f.FlushTimeout > 0 {
 		m.FlushState = &flush.State{LastDataChange: time.Now()}
 	}
 	return f.NewReaderFromMetadata(file, m)
 }

 func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, err error) {
+	// Ensure TokenLenState is initialized
+	if m.TokenLenState == nil {
+		m.TokenLenState = &tokenlen.State{}
+	}
+
 	r = &Reader{
 		Metadata: m,
 		set:      f.TelemetrySettings,
@@ -77,6 +87,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		includeFileRecordNum: f.IncludeFileRecordNumber,
 		compression:          f.Compression,
 		acquireFSLock:        f.AcquireFSLock,
+		emitFunc:             f.EmitFunc,
 	}
 	r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

@@ -100,9 +111,10 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
 		r.Offset = info.Size()
 	}

-	flushFunc := m.FlushState.Func(f.SplitFunc, f.FlushTimeout)
+	tokenLenFunc := m.TokenLenState.Func(f.SplitFunc)
+	flushFunc := m.FlushState.Func(tokenLenFunc, f.FlushTimeout)
 	r.contentSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc)
-	r.emitFunc = f.EmitFunc
+
 	if f.HeaderConfig != nil && !m.HeaderFinalized {
 		r.headerSplitFunc = f.HeaderConfig.SplitFunc
 		r.headerReader, err = header.NewReader(f.TelemetrySettings, *f.HeaderConfig)
```
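Note the order of composition: `tokenlen` wraps the configured split func, `flush` wraps the `tokenlen` layer, and the `trim` wrappers are applied outermost. A sketch of the same wrapping pattern with a hand-rolled length-tracking layer (illustrative names only, standard library only; not the stanza API):

```go
package main

import (
	"bufio"
	"fmt"
)

// lengthTracking plays the role of tokenlen.State.Func: it records how much
// data the inner split func has seen whenever that func asks for more.
// The name and the *int state are illustrative, not the stanza API.
func lengthTracking(minLen *int, inner bufio.SplitFunc) bufio.SplitFunc {
	return func(data []byte, atEOF bool) (int, []byte, error) {
		advance, token, err := inner(data, atEOF)
		if advance == 0 && token == nil && err == nil {
			*minLen = len(data) // inner func wants more data; remember how much was visible
		} else {
			*minLen = 0 // a token was produced (or an error); the state no longer applies
		}
		return advance, token, err
	}
}

func main() {
	var minLen int
	// Innermost first, as in the factory: the tracking layer wraps the base
	// split func; flush and trim layers would wrap this result in turn.
	split := lengthTracking(&minLen, bufio.ScanLines)

	advance, token, err := split([]byte("partial line"), false)
	fmt.Println(advance, token, err, minLen) // 0 [] <nil> 12

	advance, token, _ = split([]byte("partial line\n"), false)
	fmt.Printf("%d %q %d\n", advance, token, minLen) // 13 "partial line" 0
}
```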

pkg/stanza/fileconsumer/internal/reader/reader.go

Lines changed: 10 additions & 1 deletion

```diff
@@ -21,6 +21,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
 )

 type Metadata struct {
@@ -30,6 +31,7 @@ type Metadata struct {
 	FileAttributes  map[string]any
 	HeaderFinalized bool
 	FlushState      *flush.State
+	TokenLenState   *tokenlen.State
 }

 // Reader manages a single file
@@ -177,7 +179,14 @@ func (r *Reader) readHeader(ctx context.Context) (doneReadingFile bool) {

 func (r *Reader) readContents(ctx context.Context) {
 	// Create the scanner to read the contents of the file.
-	s := scanner.New(r, r.maxLogSize, r.initialBufferSize, r.Offset, r.contentSplitFunc)
+	bufferSize := r.initialBufferSize
+	if r.TokenLenState.MinimumLength > bufferSize {
+		// If we previously saw a potential token larger than the default buffer,
+		// size the buffer to be at least one byte larger so we can see if there's more data
+		bufferSize = r.TokenLenState.MinimumLength + 1
+	}
+
+	s := scanner.New(r, r.maxLogSize, bufferSize, r.Offset, r.contentSplitFunc)

 	// Iterate over the contents of the file.
 	for {
```
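Why the starting size (and the `+1`) matters: a `bufio`-style scanner only shows the split func as much data as its current buffer holds, and grows the buffer only while the split func keeps asking for more. Assuming stanza's `scanner.New` behaves like `bufio.Scanner` configured with an initial size and a max (which its signature suggests), a standard-library illustration:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// A 32-byte "log line" with no terminator, read through a scanner whose
	// buffer starts at 16 bytes (standing in for scanner.DefaultBufferSize).
	input := strings.Repeat("a", 32)

	s := bufio.NewScanner(strings.NewReader(input))
	s.Buffer(make([]byte, 16), 64) // initial size 16, max 64 (the max_log_size analog)

	s.Split(func(data []byte, atEOF bool) (int, []byte, error) {
		// Each retry sees more data as the scanner grows its buffer.
		fmt.Printf("split saw %d bytes (atEOF=%v)\n", len(data), atEOF)
		return bufio.ScanLines(data, atEOF)
	})
	for s.Scan() {
		fmt.Printf("token: %d bytes\n", len(s.Bytes())) // the full 32 bytes, flushed at EOF
	}
}
```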

pkg/stanza/fileconsumer/internal/reader/reader_test.go

Lines changed: 99 additions & 0 deletions

```diff
@@ -189,6 +189,7 @@ func TestFingerprintChangeSize(t *testing.T) {
 func TestFlushPeriodEOF(t *testing.T) {
 	tempDir := t.TempDir()
 	temp := filetest.OpenTemp(t, tempDir)
+
 	// Create a long enough initial token, so the scanner can't read the whole file at once
 	aContentLength := 2 * 16 * 1024
 	content := []byte(strings.Repeat("a", aContentLength))
@@ -223,3 +224,101 @@ func TestFlushPeriodEOF(t *testing.T) {
 	r.ReadToEnd(context.Background())
 	sink.ExpectToken(t, []byte{'b'})
 }
+
+func TestUntermintedLongLogEntry(t *testing.T) {
+	tempDir := t.TempDir()
+	temp := filetest.OpenTemp(t, tempDir)
+
+	// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
+	content := filetest.TokenWithLength(20 * 1024) // 20KB
+	_, err := temp.WriteString(string(content))    // no newline
+	require.NoError(t, err)
+
+	// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
+	// a few times during a call to ReadToEnd.
+	clock := internaltime.NewAlwaysIncreasingClock()
+	internaltime.Now = clock.Now
+	internaltime.Since = clock.Since
+	defer func() {
+		internaltime.Now = time.Now
+		internaltime.Since = time.Since
+	}()
+
+	// Use a long flush period to ensure it does not expire DURING a ReadToEnd
+	flushPeriod := time.Second
+
+	f, sink := testFactory(t, withFlushPeriod(flushPeriod))
+	fp, err := f.NewFingerprint(temp)
+	require.NoError(t, err)
+	r, err := f.NewReader(temp, fp)
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), r.Offset)
+
+	// First ReadToEnd should not emit anything as flush period hasn't expired
+	r.ReadToEnd(context.Background())
+	sink.ExpectNoCalls(t)
+
+	// Advance time past the flush period to test behavior after timer is expired
+	clock.Advance(2 * flushPeriod)
+
+	// Second ReadToEnd should emit the full untruncated token
+	r.ReadToEnd(context.Background())
+	sink.ExpectToken(t, content)
+
+	sink.ExpectNoCalls(t)
+}
+
+func TestUntermintedLogEntryGrows(t *testing.T) {
+	tempDir := t.TempDir()
+	temp := filetest.OpenTemp(t, tempDir)
+
+	// Create a log entry longer than DefaultBufferSize (16KB) but shorter than maxLogSize
+	content := filetest.TokenWithLength(20 * 1024) // 20KB
+	_, err := temp.WriteString(string(content))    // no newline
+	require.NoError(t, err)
+
+	// Use a controlled clock. It advances by 1ns each time Now() is called, which may happen
+	// a few times during a call to ReadToEnd.
+	clock := internaltime.NewAlwaysIncreasingClock()
+	internaltime.Now = clock.Now
+	internaltime.Since = clock.Since
+	defer func() {
+		internaltime.Now = time.Now
+		internaltime.Since = time.Since
+	}()
+
+	// Use a long flush period to ensure it does not expire DURING a ReadToEnd
+	flushPeriod := time.Second
+
+	f, sink := testFactory(t, withFlushPeriod(flushPeriod))
+	fp, err := f.NewFingerprint(temp)
+	require.NoError(t, err)
+	r, err := f.NewReader(temp, fp)
+	require.NoError(t, err)
+	assert.Equal(t, int64(0), r.Offset)
+
+	// First ReadToEnd should not emit anything as flush period hasn't expired
+	r.ReadToEnd(context.Background())
+	sink.ExpectNoCalls(t)
+
+	// Advance time past the flush period to test behavior after timer is expired
+	clock.Advance(2 * flushPeriod)
+
+	// Write additional unterminated content to ensure all is picked up in the same token
+	// The flusher should notice new data and not return anything on the next call
+	additionalContext := filetest.TokenWithLength(1024)
+	_, err = temp.WriteString(string(additionalContext)) // no newline
+	require.NoError(t, err)
+
+	r.ReadToEnd(context.Background())
+	sink.ExpectNoCalls(t)
+
+	// Advance time past the flush period to test behavior after timer is expired
+	clock.Advance(2 * flushPeriod)
+
+	// Finally, since we haven't seen new data, flusher should emit the token
+	r.ReadToEnd(context.Background())
+	sink.ExpectToken(t, append(content, additionalContext...))
+
+	sink.ExpectNoCalls(t)
+}
```

pkg/stanza/internal/time/time.go

Lines changed: 4 additions & 0 deletions

```diff
@@ -35,3 +35,7 @@ func (c AlwaysIncreasingClock) Since(t time.Time) time.Duration {
 	c.FakeClock.Advance(time.Nanosecond)
 	return c.FakeClock.Since(t)
 }
+
+func (c AlwaysIncreasingClock) Advance(d time.Duration) {
+	c.FakeClock.Advance(d)
+}
```

pkg/stanza/tokenlen/tokenlen.go

Lines changed: 36 additions & 0 deletions

```go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"

import "bufio"

// State tracks the potential length of a token before any terminator checking
type State struct {
	MinimumLength int
}

// Func wraps a bufio.SplitFunc to track potential token lengths.
// It records the length of the data before delegating to the wrapped function.
func (s *State) Func(splitFunc bufio.SplitFunc) bufio.SplitFunc {
	if s == nil {
		return splitFunc
	}

	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		// Note the potential token length but don't update state until we know
		// whether or not a token is actually returned
		potentialLen := len(data)

		advance, token, err = splitFunc(data, atEOF)
		if advance == 0 && token == nil && err == nil {
			// The splitFunc is asking for more data. Remember how much
			// we saw previously so the buffer can be sized appropriately.
			s.MinimumLength = potentialLen
		} else {
			// A token was returned. This state represented that token, so clear it.
			s.MinimumLength = 0
		}
		return advance, token, err
	}
}
```
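A short usage sketch of the new package (a hypothetical standalone program; in the fileconsumer the state is wired up through `reader.Metadata` rather than used directly):

```go
package main

import (
	"bufio"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenlen"
)

func main() {
	state := &tokenlen.State{}
	split := state.Func(bufio.ScanLines)

	// A scanner pass sees 20 bytes with no terminator: the wrapped func
	// returns (0, nil, nil) and the state remembers the 20 visible bytes.
	advance, token, err := split(make([]byte, 20), false)
	fmt.Println(advance, token == nil, err, state.MinimumLength) // 0 true <nil> 20

	// Because Metadata carries the state across poll cycles, the next scanner
	// can start with a buffer of MinimumLength+1 bytes and re-read the whole
	// candidate token, plus one byte to detect whether it has grown.
	fmt.Println(state.MinimumLength + 1) // 21
}
```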

pkg/stanza/tokenlen/tokenlen_test.go

Lines changed: 91 additions & 0 deletions

```go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenlen

import (
	"bufio"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestTokenLenState_Func(t *testing.T) {
	cases := []struct {
		name          string
		input         []byte
		atEOF         bool
		expectedLen   int
		expectedToken []byte
		expectedAdv   int
		expectedErr   error
	}{
		{
			name:        "no token yet",
			input:       []byte("partial"),
			atEOF:       false,
			expectedLen: len("partial"),
		},
		{
			name:          "complete token",
			input:         []byte("complete\ntoken"),
			atEOF:         false,
			expectedLen:   0, // should clear state after finding token
			expectedToken: []byte("complete"),
			expectedAdv:   len("complete\n"),
		},
		{
			name:        "growing token",
			input:       []byte("growing"),
			atEOF:       false,
			expectedLen: len("growing"),
		},
		{
			name:          "flush at EOF",
			input:         []byte("flush"),
			atEOF:         true,
			expectedLen:   0, // should clear state after flushing
			expectedToken: []byte("flush"),
			expectedAdv:   len("flush"),
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			state := &State{}
			splitFunc := state.Func(bufio.ScanLines)

			adv, token, err := splitFunc(tc.input, tc.atEOF)
			require.Equal(t, tc.expectedErr, err)
			require.Equal(t, tc.expectedToken, token)
			require.Equal(t, tc.expectedAdv, adv)
			require.Equal(t, tc.expectedLen, state.MinimumLength)
		})
	}
}

func TestTokenLenState_GrowingToken(t *testing.T) {
	state := &State{}
	splitFunc := state.Func(bufio.ScanLines)

	// First call with partial token
	adv, token, err := splitFunc([]byte("part"), false)
	require.NoError(t, err)
	require.Nil(t, token)
	require.Equal(t, 0, adv)
	require.Equal(t, len("part"), state.MinimumLength)

	// Second call with longer partial token
	adv, token, err = splitFunc([]byte("partial"), false)
	require.NoError(t, err)
	require.Nil(t, token)
	require.Equal(t, 0, adv)
	require.Equal(t, len("partial"), state.MinimumLength)

	// Final call with complete token
	adv, token, err = splitFunc([]byte("partial\ntoken"), false)
	require.NoError(t, err)
	require.Equal(t, []byte("partial"), token)
	require.Equal(t, len("partial\n"), adv)
	require.Equal(t, 0, state.MinimumLength) // State should be cleared after emitting token
}
```
