Skip to content

Commit 8f2beb7

Browse files
djaglowskiXinRanZhAWS
authored andcommitted
[chore][pkg/stanza] Add file disambiguation tests (open-telemetry#31171)
Closes open-telemetry#20850 Closes open-telemetry#20851 Also enhances the `emittest` package, though it remains internal.
1 parent b38aca3 commit 8f2beb7

File tree

3 files changed

+171
-36
lines changed

3 files changed

+171
-36
lines changed

pkg/stanza/fileconsumer/file_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,3 +1318,95 @@ func TestWindowsFilesClosedImmediately(t *testing.T) {
13181318
// On Windows, poll should close the file after reading it. We can test this by trying to move it.
13191319
require.NoError(t, os.Rename(temp.Name(), temp.Name()+"_renamed"))
13201320
}
1321+
1322+
func TestDelayedDisambiguation(t *testing.T) {
1323+
t.Parallel()
1324+
tempDir := t.TempDir()
1325+
cfg := NewConfig().includeDir(tempDir)
1326+
cfg.FingerprintSize = 18
1327+
cfg.StartAt = "beginning"
1328+
operator, sink := testManager(t, cfg)
1329+
operator.persister = testutil.NewMockPersister("test")
1330+
1331+
// Two identical files, smaller than fingerprint size
1332+
file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
1333+
file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2")
1334+
1335+
sameContent := "aaaaaaaaaaa"
1336+
filetest.WriteString(t, file1, sameContent+"\n")
1337+
filetest.WriteString(t, file2, sameContent+"\n")
1338+
operator.poll(context.Background())
1339+
1340+
token, attributes := sink.NextCall(t)
1341+
require.Equal(t, []byte(sameContent), token)
1342+
sink.ExpectNoCallsUntil(t, 100*time.Millisecond)
1343+
operator.wg.Wait()
1344+
1345+
// Append different data
1346+
newContent1 := "more content in file 1 only"
1347+
newContent2 := "different content in file 2"
1348+
filetest.WriteString(t, file1, newContent1+"\n")
1349+
filetest.WriteString(t, file2, newContent2+"\n")
1350+
operator.poll(context.Background())
1351+
1352+
var sameTokenOtherFile emittest.Call
1353+
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
1354+
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
1355+
} else {
1356+
sameTokenOtherFile = emittest.Call{Token: []byte(sameContent), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
1357+
}
1358+
newFromFile1 := emittest.Call{Token: []byte(newContent1), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file1.Name())}}
1359+
newFromFile2 := emittest.Call{Token: []byte(newContent2), Attrs: map[string]any{attrs.LogFileName: filepath.Base(file2.Name())}}
1360+
sink.ExpectCalls(t, &sameTokenOtherFile, &newFromFile1, &newFromFile2)
1361+
}
1362+
1363+
func TestNoLostPartial(t *testing.T) {
1364+
t.Parallel()
1365+
tempDir := t.TempDir()
1366+
cfg := NewConfig().includeDir(tempDir)
1367+
cfg.FingerprintSize = 18
1368+
cfg.StartAt = "beginning"
1369+
operator, sink := testManager(t, cfg)
1370+
operator.persister = testutil.NewMockPersister("test")
1371+
1372+
// Two same fingerprint file , and smaller than config size
1373+
file1 := filetest.OpenTempWithPattern(t, tempDir, "*.log1")
1374+
file2 := filetest.OpenTempWithPattern(t, tempDir, "*.log2")
1375+
1376+
sameContent := "aaaaaaaaaaa"
1377+
filetest.WriteString(t, file1, sameContent+"\n")
1378+
filetest.WriteString(t, file2, sameContent+"\n")
1379+
operator.poll(context.Background())
1380+
1381+
token, attributes := sink.NextCall(t)
1382+
require.Equal(t, []byte(sameContent), token)
1383+
sink.ExpectNoCallsUntil(t, 100*time.Millisecond)
1384+
operator.wg.Wait()
1385+
1386+
newContent1 := "additional content in file 1 only"
1387+
filetest.WriteString(t, file1, newContent1+"\n")
1388+
1389+
var otherFileName string
1390+
if attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()) {
1391+
otherFileName = filepath.Base(file2.Name())
1392+
} else {
1393+
otherFileName = filepath.Base(file1.Name())
1394+
}
1395+
1396+
var foundSameFromOtherFile, foundNewFromFileOne bool
1397+
require.Eventually(t, func() bool {
1398+
operator.poll(context.Background())
1399+
defer operator.wg.Wait()
1400+
1401+
token, attributes = sink.NextCall(t)
1402+
switch {
1403+
case string(token) == sameContent && attributes[attrs.LogFileName].(string) == otherFileName:
1404+
foundSameFromOtherFile = true
1405+
case string(token) == newContent1 && attributes[attrs.LogFileName].(string) == filepath.Base(file1.Name()):
1406+
foundNewFromFileOne = true
1407+
default:
1408+
t.Errorf("unexpected token from file %q: %s", filepath.Base(attributes[attrs.LogFileName].(string)), token)
1409+
}
1410+
return foundSameFromOtherFile && foundNewFromFileOne
1411+
}, time.Second, 100*time.Millisecond)
1412+
}

pkg/stanza/fileconsumer/internal/emittest/sink.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ type sinkCfg struct {
2222

2323
type SinkOpt func(*sinkCfg)
2424

25-
type call struct {
26-
token []byte
27-
attrs map[string]any
25+
type Call struct {
26+
Token []byte
27+
Attrs map[string]any
2828
}
2929

3030
type Sink struct {
31-
emitChan chan *call
31+
emitChan chan *Call
3232
timeout time.Duration
3333
emit.Callback
3434
}
@@ -53,14 +53,14 @@ func NewSink(opts ...SinkOpt) *Sink {
5353
for _, opt := range opts {
5454
opt(cfg)
5555
}
56-
emitChan := make(chan *call, cfg.emitChanLen)
56+
emitChan := make(chan *Call, cfg.emitChanLen)
5757
return &Sink{
5858
emitChan: emitChan,
5959
timeout: cfg.timeout,
6060
Callback: func(_ context.Context, token []byte, attrs map[string]any) error {
6161
copied := make([]byte, len(token))
6262
copy(copied, token)
63-
emitChan <- &call{copied, attrs}
63+
emitChan <- &Call{copied, attrs}
6464
return nil
6565
},
6666
}
@@ -76,7 +76,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
7676
for i := 0; i < n; i++ {
7777
select {
7878
case call := <-s.emitChan:
79-
emitChan = append(emitChan, call.token)
79+
emitChan = append(emitChan, call.Token)
8080
case <-time.After(s.timeout):
8181
assert.Fail(t, "Timed out waiting for message")
8282
return nil
@@ -88,7 +88,7 @@ func (s *Sink) NextTokens(t *testing.T, n int) [][]byte {
8888
func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
8989
select {
9090
case c := <-s.emitChan:
91-
return c.token, c.attrs
91+
return c.Token, c.Attrs
9292
case <-time.After(s.timeout):
9393
assert.Fail(t, "Timed out waiting for message")
9494
return nil, nil
@@ -98,7 +98,7 @@ func (s *Sink) NextCall(t *testing.T) ([]byte, map[string]any) {
9898
func (s *Sink) ExpectToken(t *testing.T, expected []byte) {
9999
select {
100100
case call := <-s.emitChan:
101-
assert.Equal(t, expected, call.token)
101+
assert.Equal(t, expected, call.Token)
102102
case <-time.After(s.timeout):
103103
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
104104
}
@@ -109,7 +109,7 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
109109
for i := 0; i < len(expected); i++ {
110110
select {
111111
case call := <-s.emitChan:
112-
actual = append(actual, call.token)
112+
actual = append(actual, call.Token)
113113
case <-time.After(s.timeout):
114114
assert.Fail(t, "Timed out waiting for message")
115115
return
@@ -121,21 +121,35 @@ func (s *Sink) ExpectTokens(t *testing.T, expected ...[]byte) {
121121
func (s *Sink) ExpectCall(t *testing.T, expected []byte, attrs map[string]any) {
122122
select {
123123
case c := <-s.emitChan:
124-
assert.Equal(t, expected, c.token)
125-
assert.Equal(t, attrs, c.attrs)
124+
assert.Equal(t, expected, c.Token)
125+
assert.Equal(t, attrs, c.Attrs)
126126
case <-time.After(s.timeout):
127127
assert.Fail(t, fmt.Sprintf("Timed out waiting for token: %s", expected))
128128
}
129129
}
130130

131+
func (s *Sink) ExpectCalls(t *testing.T, expected ...*Call) {
132+
actual := make([]*Call, 0, len(expected))
133+
for i := 0; i < len(expected); i++ {
134+
select {
135+
case call := <-s.emitChan:
136+
actual = append(actual, call)
137+
case <-time.After(s.timeout):
138+
assert.Fail(t, "Timed out waiting for message")
139+
return
140+
}
141+
}
142+
require.ElementsMatch(t, expected, actual)
143+
}
144+
131145
func (s *Sink) ExpectNoCalls(t *testing.T) {
132146
s.ExpectNoCallsUntil(t, 200*time.Millisecond)
133147
}
134148

135149
func (s *Sink) ExpectNoCallsUntil(t *testing.T, d time.Duration) {
136150
select {
137151
case c := <-s.emitChan:
138-
assert.Fail(t, "Received unexpected message", "Message: %s", c.token)
152+
assert.Fail(t, "Received unexpected message", "Message: %s", c.Token)
139153
case <-time.After(d):
140154
}
141155
}

pkg/stanza/fileconsumer/internal/emittest/sink_test.go

Lines changed: 52 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ func TestNextToken(t *testing.T) {
1717
s, testCalls := sinkTest(t)
1818
for _, c := range testCalls {
1919
token := s.NextToken(t)
20-
assert.Equal(t, c.token, token)
20+
assert.Equal(t, c.Token, token)
2121
}
2222
}
2323

2424
func TestNextTokenTimeout(t *testing.T) {
2525
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
2626
for _, c := range testCalls {
2727
token := s.NextToken(t)
28-
assert.Equal(t, c.token, token)
28+
assert.Equal(t, c.Token, token)
2929
}
3030

3131
// Create a new T so we can expect it to fail without failing the overall test.
@@ -38,17 +38,17 @@ func TestNextTokens(t *testing.T) {
3838
s, testCalls := sinkTest(t)
3939
for i := 0; i < 5; i++ {
4040
tokens := s.NextTokens(t, 2)
41-
assert.Equal(t, testCalls[2*i].token, tokens[0])
42-
assert.Equal(t, testCalls[2*i+1].token, tokens[1])
41+
assert.Equal(t, testCalls[2*i].Token, tokens[0])
42+
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
4343
}
4444
}
4545

4646
func TestNextTokensTimeout(t *testing.T) {
4747
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
4848
for i := 0; i < 5; i++ {
4949
tokens := s.NextTokens(t, 2)
50-
assert.Equal(t, testCalls[2*i].token, tokens[0])
51-
assert.Equal(t, testCalls[2*i+1].token, tokens[1])
50+
assert.Equal(t, testCalls[2*i].Token, tokens[0])
51+
assert.Equal(t, testCalls[2*i+1].Token, tokens[1])
5252
}
5353

5454
// Create a new T so we can expect it to fail without failing the overall test.
@@ -61,17 +61,17 @@ func TestNextCall(t *testing.T) {
6161
s, testCalls := sinkTest(t)
6262
for _, c := range testCalls {
6363
token, attributes := s.NextCall(t)
64-
require.Equal(t, c.token, token)
65-
require.Equal(t, c.attrs, attributes)
64+
require.Equal(t, c.Token, token)
65+
require.Equal(t, c.Attrs, attributes)
6666
}
6767
}
6868

6969
func TestNextCallTimeout(t *testing.T) {
7070
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
7171
for _, c := range testCalls {
7272
token, attributes := s.NextCall(t)
73-
require.Equal(t, c.token, token)
74-
require.Equal(t, c.attrs, attributes)
73+
require.Equal(t, c.Token, token)
74+
require.Equal(t, c.Attrs, attributes)
7575
}
7676

7777
// Create a new T so we can expect it to fail without failing the overall test.
@@ -83,14 +83,14 @@ func TestNextCallTimeout(t *testing.T) {
8383
func TestExpectToken(t *testing.T) {
8484
s, testCalls := sinkTest(t)
8585
for _, c := range testCalls {
86-
s.ExpectToken(t, c.token)
86+
s.ExpectToken(t, c.Token)
8787
}
8888
}
8989

9090
func TestExpectTokenTimeout(t *testing.T) {
9191
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
9292
for _, c := range testCalls {
93-
s.ExpectToken(t, c.token)
93+
s.ExpectToken(t, c.Token)
9494
}
9595

9696
// Create a new T so we can expect it to fail without failing the overall test.
@@ -102,14 +102,14 @@ func TestExpectTokenTimeout(t *testing.T) {
102102
func TestExpectTokens(t *testing.T) {
103103
s, testCalls := sinkTest(t)
104104
for i := 0; i < 5; i++ {
105-
s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token)
105+
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
106106
}
107107
}
108108

109109
func TestExpectTokensTimeout(t *testing.T) {
110110
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
111111
for i := 0; i < 5; i++ {
112-
s.ExpectTokens(t, testCalls[2*i].token, testCalls[2*i+1].token)
112+
s.ExpectTokens(t, testCalls[2*i].Token, testCalls[2*i+1].Token)
113113
}
114114

115115
// Create a new T so we can expect it to fail without failing the overall test.
@@ -121,14 +121,14 @@ func TestExpectTokensTimeout(t *testing.T) {
121121
func TestExpectCall(t *testing.T) {
122122
s, testCalls := sinkTest(t)
123123
for _, c := range testCalls {
124-
s.ExpectCall(t, c.token, c.attrs)
124+
s.ExpectCall(t, c.Token, c.Attrs)
125125
}
126126
}
127127

128128
func TestExpectCallTimeout(t *testing.T) {
129129
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
130130
for _, c := range testCalls {
131-
s.ExpectCall(t, c.token, c.attrs)
131+
s.ExpectCall(t, c.Token, c.Attrs)
132132
}
133133

134134
// Create a new T so we can expect it to fail without failing the overall test.
@@ -137,6 +137,35 @@ func TestExpectCallTimeout(t *testing.T) {
137137
assert.True(t, tt.Failed())
138138
}
139139

140+
func TestExpectCalls(t *testing.T) {
141+
s, testCalls := sinkTest(t)
142+
testCallsOutOfOrder := make([]*Call, 0, 10)
143+
for i := 0; i < len(testCalls); i += 2 {
144+
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
145+
}
146+
for i := 1; i < len(testCalls); i += 2 {
147+
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
148+
}
149+
s.ExpectCalls(t, testCallsOutOfOrder...)
150+
}
151+
152+
func TestExpectCallsTimeout(t *testing.T) {
153+
s, testCalls := sinkTest(t, WithTimeout(10*time.Millisecond))
154+
testCallsOutOfOrder := make([]*Call, 0, 10)
155+
for i := 0; i < len(testCalls); i += 2 {
156+
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
157+
}
158+
for i := 1; i < len(testCalls); i += 2 {
159+
testCallsOutOfOrder = append(testCallsOutOfOrder, testCalls[i])
160+
}
161+
s.ExpectCalls(t, testCallsOutOfOrder...)
162+
163+
// Create a new T so we can expect it to fail without failing the overall test.
164+
tt := new(testing.T)
165+
s.ExpectCalls(tt, new(Call))
166+
assert.True(t, tt.Failed())
167+
}
168+
140169
func TestExpectNoCalls(t *testing.T) {
141170
s, _ := sinkTest(t)
142171
s.NextTokens(t, 10) // drain the channel
@@ -156,24 +185,24 @@ func TestExpectNoCallsFailure(t *testing.T) {
156185
func TestWithCallBuffer(t *testing.T) {
157186
s, testCalls := sinkTest(t, WithCallBuffer(5))
158187
for i := 0; i < 10; i++ {
159-
s.ExpectCall(t, testCalls[i].token, testCalls[i].attrs)
188+
s.ExpectCall(t, testCalls[i].Token, testCalls[i].Attrs)
160189
}
161190
}
162191

163-
func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*call) {
192+
func sinkTest(t *testing.T, opts ...SinkOpt) (*Sink, []*Call) {
164193
s := NewSink(opts...)
165-
testCalls := make([]*call, 0, 10)
194+
testCalls := make([]*Call, 0, 10)
166195
for i := 0; i < 10; i++ {
167-
testCalls = append(testCalls, &call{
168-
token: []byte(fmt.Sprintf("token-%d", i)),
169-
attrs: map[string]any{
196+
testCalls = append(testCalls, &Call{
197+
Token: []byte(fmt.Sprintf("token-%d", i)),
198+
Attrs: map[string]any{
170199
"key": fmt.Sprintf("value-%d", i),
171200
},
172201
})
173202
}
174203
go func() {
175204
for _, c := range testCalls {
176-
require.NoError(t, s.Callback(context.Background(), c.token, c.attrs))
205+
require.NoError(t, s.Callback(context.Background(), c.Token, c.Attrs))
177206
}
178207
}()
179208
return s, testCalls

0 commit comments

Comments
 (0)