Skip to content

Commit 4ed6328

Browse files
authored
[chore] [receiver/filelog] Add test to verify receiver consume contract (#22833)
Verify that `retry_on_failure.enabled` config option makes the receiver correctly re-send all the logs temporarily rejected by the next consumer
1 parent a8c03d0 commit 4ed6328

File tree

1 file changed

+64
-0
lines changed

1 file changed

+64
-0
lines changed

receiver/filelogreceiver/filelog_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"path/filepath"
1212
"runtime"
1313
"sync"
14+
"sync/atomic"
1415
"testing"
1516
"time"
1617

@@ -31,6 +32,7 @@ import (
3132
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
3233
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
3334
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/file"
35+
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/json"
3436
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
3537
)
3638

@@ -313,3 +315,65 @@ func rotationTestConfig(tempDir string) *FileLogConfig {
313315
}(),
314316
}
315317
}
318+
319+
// TestConsumeContract tests the contract between the filelog receiver and the next consumer with enabled retry.
320+
func TestConsumeContract(t *testing.T) {
321+
tmpDir := t.TempDir()
322+
filePattern := "test-*.log"
323+
flg := &fileLogGenerator{t: t, tmpDir: tmpDir, filePattern: filePattern}
324+
325+
cfg := createDefaultConfig()
326+
cfg.RetryOnFailure.Enabled = true
327+
cfg.RetryOnFailure.InitialInterval = 1 * time.Millisecond
328+
cfg.RetryOnFailure.MaxInterval = 10 * time.Millisecond
329+
cfg.InputConfig.Include = []string{filepath.Join(tmpDir, filePattern)}
330+
cfg.InputConfig.StartAt = "beginning"
331+
jsonParser := json.NewConfig()
332+
tsField := entry.NewAttributeField("ts")
333+
jsonParser.TimeParser = &helper.TimeParser{
334+
ParseFrom: &tsField,
335+
Layout: time.RFC3339,
336+
LayoutType: "gotime",
337+
}
338+
jsonParser.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
339+
logField := entry.NewAttributeField("log")
340+
jsonParser.BodyField = &logField
341+
cfg.Operators = []operator.Config{{Builder: jsonParser}}
342+
343+
receivertest.CheckConsumeContract(receivertest.CheckConsumeContractParams{
344+
T: t,
345+
Factory: NewFactory(),
346+
DataType: component.DataTypeLogs,
347+
Config: cfg,
348+
Generator: flg,
349+
GenerateCount: 10000,
350+
})
351+
}
352+
353+
type fileLogGenerator struct {
354+
t *testing.T
355+
tmpDir string
356+
filePattern string
357+
tmpFile *os.File
358+
sequenceNum int64
359+
}
360+
361+
func (g *fileLogGenerator) Start() {
362+
tmpFile, err := os.CreateTemp(g.tmpDir, g.filePattern)
363+
require.NoError(g.t, err)
364+
g.tmpFile = tmpFile
365+
}
366+
367+
func (g *fileLogGenerator) Stop() {
368+
require.NoError(g.t, g.tmpFile.Close())
369+
require.NoError(g.t, os.Remove(g.tmpFile.Name()))
370+
}
371+
372+
func (g *fileLogGenerator) Generate() []receivertest.UniqueIDAttrVal {
373+
id := receivertest.UniqueIDAttrVal(fmt.Sprintf("%d", atomic.AddInt64(&g.sequenceNum, 1)))
374+
logLine := fmt.Sprintf(`{"ts": "%s", "log": "log-%s", "%s": "%s"}`, time.Now().Format(time.RFC3339), id,
375+
receivertest.UniqueIDAttrName, id)
376+
_, err := g.tmpFile.WriteString(logLine + "\n")
377+
require.NoError(g.t, err)
378+
return []receivertest.UniqueIDAttrVal{id}
379+
}

0 commit comments

Comments
 (0)