Skip to content

Commit b90d8ed

Browse files
taj-pjpkrohlingdmitryax
authored
Multi Receiver Testbed Support (#5962)
* [processor/resourcedetection] Fix system detector not setting attributes (open-telemetry#24670) (open-telemetry#24684) Cherry-pick the commit from the patch release v0.82.1 Co-authored-by: Dmitrii Anoshin <[email protected]> * feat: working * . * . * . * . * . --------- Co-authored-by: Juraci Paixão Kröhling <[email protected]> Co-authored-by: Dmitrii Anoshin <[email protected]>
1 parent e9acd9e commit b90d8ed

File tree

4 files changed

+443
-0
lines changed

4 files changed

+443
-0
lines changed
Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package testbed // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
5+
6+
import (
7+
"fmt"
8+
"log"
9+
"net"
10+
"os"
11+
"path"
12+
"path/filepath"
13+
"testing"
14+
"time"
15+
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
// MultiReceiverTestCase defines a running test case.
20+
type MultiReceiverTestCase struct {
21+
T *testing.T
22+
23+
// Directory where test case results and logs will be written.
24+
resultDir string
25+
26+
// Resource spec for agent.
27+
resourceSpec ResourceSpec
28+
29+
// Agent process.
30+
agentProc OtelcolRunner
31+
32+
Sender DataSender
33+
Receivers []DataReceiver
34+
35+
LoadGenerator *LoadGenerator
36+
MockBackends map[DataReceiver]*MockBackend
37+
validator *MultiReceiverTestCaseValidator
38+
39+
startTime time.Time
40+
41+
// errorSignal indicates an error in the test case execution, e.g. process execution
42+
// failure or exceeding resource consumption, etc. The actual error message is already
43+
// logged, this is only an indicator on which you can wait to be informed.
44+
errorSignal chan struct{}
45+
// Duration is the requested duration of the tests. Configured via TESTBED_DURATION
46+
// env variable and defaults to 15 seconds if env variable is unspecified.
47+
Duration time.Duration
48+
doneSignal chan struct{}
49+
errorCause string
50+
ResultsSummary TestResultsSummary
51+
}
52+
53+
// NewMultiReceiverTestCase creates a new MultiReceiverTestCase. It expects agent-config.yaml in the specified directory.
54+
func NewMultiReceiverTestCase(
55+
t *testing.T,
56+
dataProvider DataProvider,
57+
sender DataSender,
58+
receivers []DataReceiver,
59+
agentProc OtelcolRunner,
60+
validator *MultiReceiverTestCaseValidator,
61+
resultsSummary TestResultsSummary,
62+
opts ...MultiReceiverTestCaseOption,
63+
) *MultiReceiverTestCase {
64+
tc := MultiReceiverTestCase{
65+
T: t,
66+
errorSignal: make(chan struct{}),
67+
doneSignal: make(chan struct{}),
68+
startTime: time.Now(),
69+
Sender: sender,
70+
Receivers: receivers,
71+
agentProc: agentProc,
72+
validator: validator,
73+
ResultsSummary: resultsSummary,
74+
MockBackends: make(map[DataReceiver]*MockBackend),
75+
}
76+
77+
// Get requested test case duration from env variable.
78+
duration := os.Getenv(testcaseDurationVar)
79+
if duration == "" {
80+
duration = "15s"
81+
}
82+
var err error
83+
tc.Duration, err = time.ParseDuration(duration)
84+
if err != nil {
85+
log.Fatalf("Invalid "+testcaseDurationVar+": %v. Expecting a valid duration string.", duration)
86+
}
87+
88+
// Apply all provided options.
89+
for _, opt := range opts {
90+
opt(&tc)
91+
}
92+
93+
// Prepare directory for results.
94+
tc.resultDir, err = filepath.Abs(path.Join("results", t.Name()))
95+
require.NoErrorf(t, err, "Cannot resolve %s", t.Name())
96+
require.NoErrorf(t, os.MkdirAll(tc.resultDir, os.ModePerm), "Cannot create directory %s", tc.resultDir)
97+
98+
// Set default resource check period.
99+
tc.resourceSpec.ResourceCheckPeriod = 3 * time.Second
100+
if tc.Duration < tc.resourceSpec.ResourceCheckPeriod {
101+
// Resource check period should not be longer than entire test duration.
102+
tc.resourceSpec.ResourceCheckPeriod = tc.Duration
103+
}
104+
105+
tc.LoadGenerator, err = NewLoadGenerator(dataProvider, sender)
106+
require.NoError(t, err, "Cannot create generator")
107+
108+
for _, receiver := range receivers {
109+
tc.MockBackends[receiver] = NewMockBackend(tc.composeTestResultFileName("backend.log"), receiver)
110+
}
111+
112+
go tc.logStats()
113+
114+
return &tc
115+
}
116+
117+
func (tc *MultiReceiverTestCase) composeTestResultFileName(fileName string) string {
118+
fileName, err := filepath.Abs(path.Join(tc.resultDir, fileName))
119+
require.NoError(tc.T, err, "Cannot resolve %s", fileName)
120+
return fileName
121+
}
122+
123+
// StartAgent starts the agent and redirects its standard output and standard error
124+
// to "agent.log" file located in the test directory.
125+
func (tc *MultiReceiverTestCase) StartAgent(args ...string) {
126+
logFileName := tc.composeTestResultFileName("agent.log")
127+
128+
startParams := StartParams{
129+
Name: "Agent",
130+
LogFilePath: logFileName,
131+
CmdArgs: args,
132+
resourceSpec: &tc.resourceSpec,
133+
}
134+
if err := tc.agentProc.Start(startParams); err != nil {
135+
tc.indicateError(err)
136+
return
137+
}
138+
139+
// Start watching resource consumption.
140+
go func() {
141+
if err := tc.agentProc.WatchResourceConsumption(); err != nil {
142+
tc.indicateError(err)
143+
}
144+
}()
145+
146+
endpoint := tc.LoadGenerator.sender.GetEndpoint()
147+
if endpoint != nil {
148+
// Wait for agent to start. We consider the agent started when we can
149+
// connect to the port to which we intend to send load. We only do this
150+
// if the endpoint is not-empty, i.e. the sender does use network (some senders
151+
// like text log writers don't).
152+
tc.WaitFor(func() bool {
153+
conn, err := net.Dial(tc.LoadGenerator.sender.GetEndpoint().Network(), tc.LoadGenerator.sender.GetEndpoint().String())
154+
if err == nil && conn != nil {
155+
conn.Close()
156+
return true
157+
}
158+
return false
159+
}, fmt.Sprintf("connection to %s:%s", tc.LoadGenerator.sender.GetEndpoint().Network(), tc.LoadGenerator.sender.GetEndpoint().String()))
160+
}
161+
}
162+
163+
// StopAgent stops agent process.
164+
func (tc *MultiReceiverTestCase) StopAgent() {
165+
if _, err := tc.agentProc.Stop(); err != nil {
166+
tc.indicateError(err)
167+
}
168+
}
169+
170+
// StartLoad starts the load generator and redirects its standard output and standard error
171+
// to "load-generator.log" file located in the test directory.
172+
func (tc *MultiReceiverTestCase) StartLoad(options LoadOptions) {
173+
tc.LoadGenerator.Start(options)
174+
}
175+
176+
// StopLoad stops load generator.
177+
func (tc *MultiReceiverTestCase) StopLoad() {
178+
tc.LoadGenerator.Stop()
179+
}
180+
181+
// StartBackend starts the specified backend type.
182+
func (tc *MultiReceiverTestCase) StartBackends() {
183+
184+
for _, backend := range tc.MockBackends {
185+
require.NoError(tc.T, backend.Start(), "Cannot start backend for: "+backend.receiver.ProtocolName())
186+
}
187+
}
188+
189+
// StopBackend stops the backend.
190+
func (tc *MultiReceiverTestCase) StopBackend() {
191+
for _, backend := range tc.MockBackends {
192+
backend.Stop()
193+
}
194+
}
195+
196+
// EnableRecording enables recording of all data received by MockBackend.
197+
func (tc *MultiReceiverTestCase) EnableRecording() {
198+
for _, backend := range tc.MockBackends {
199+
backend.EnableRecording()
200+
}
201+
}
202+
203+
// AgentMemoryInfo returns raw memory info struct about the agent
204+
// as returned by github.com/shirou/gopsutil/process
205+
func (tc *MultiReceiverTestCase) AgentMemoryInfo() (uint32, uint32, error) {
206+
stat, err := tc.agentProc.GetProcessMon().MemoryInfo()
207+
if err != nil {
208+
return 0, 0, err
209+
}
210+
return uint32(stat.RSS / mibibyte), uint32(stat.VMS / mibibyte), nil
211+
}
212+
213+
// Stop stops the load generator, the agent and the backend.
214+
func (tc *MultiReceiverTestCase) Stop() {
215+
// Stop monitoring the agent
216+
close(tc.doneSignal)
217+
218+
// Stop all components
219+
tc.StopLoad()
220+
tc.StopAgent()
221+
tc.StopBackend()
222+
223+
// Report test results
224+
tc.validator.RecordResults(tc)
225+
}
226+
227+
// ValidateData validates data received by mock backend against what was generated and sent to the collector
228+
// instance(s) under test by the LoadGenerator.
229+
func (tc *MultiReceiverTestCase) ValidateData() {
230+
select {
231+
case <-tc.errorSignal:
232+
// Error is already signaled and recorded. Validating data is pointless.
233+
return
234+
default:
235+
}
236+
237+
tc.validator.Validate(tc)
238+
}
239+
240+
// Sleep for specified duration or until error is signaled.
241+
func (tc *MultiReceiverTestCase) Sleep(d time.Duration) {
242+
select {
243+
case <-time.After(d):
244+
case <-tc.errorSignal:
245+
}
246+
}
247+
248+
// WaitForN the specific condition for up to a specified duration. Records a test error
249+
// if time is out and condition does not become true. If error is signaled
250+
// while waiting the function will return false, but will not record additional
251+
// test error (we assume that signaled error is already recorded in indicateError()).
252+
func (tc *MultiReceiverTestCase) WaitForN(cond func() bool, duration time.Duration, errMsg interface{}) bool {
253+
startTime := time.Now()
254+
255+
// Start with 5 ms waiting interval between condition re-evaluation.
256+
waitInterval := time.Millisecond * 5
257+
258+
for {
259+
if cond() {
260+
return true
261+
}
262+
263+
select {
264+
case <-time.After(waitInterval):
265+
case <-tc.errorSignal:
266+
return false
267+
}
268+
269+
// Increase waiting interval exponentially up to 500 ms.
270+
if waitInterval < time.Millisecond*500 {
271+
waitInterval *= 2
272+
}
273+
274+
if time.Since(startTime) > duration {
275+
// Waited too long
276+
tc.T.Error("Time out waiting for", errMsg)
277+
return false
278+
}
279+
}
280+
}
281+
282+
// WaitFor is like WaitForN but with a fixed duration of 10 seconds
283+
func (tc *MultiReceiverTestCase) WaitFor(cond func() bool, errMsg interface{}) bool {
284+
return tc.WaitForN(cond, time.Second*60, errMsg)
285+
}
286+
287+
func (tc *MultiReceiverTestCase) indicateError(err error) {
288+
// Print to log for visibility
289+
log.Print(err.Error())
290+
291+
// Indicate error for the test
292+
tc.T.Error(err.Error())
293+
294+
tc.errorCause = err.Error()
295+
296+
// Signal the error via channel
297+
close(tc.errorSignal)
298+
}
299+
300+
func (tc *MultiReceiverTestCase) logStats() {
301+
t := time.NewTicker(tc.resourceSpec.ResourceCheckPeriod)
302+
defer t.Stop()
303+
304+
for {
305+
select {
306+
case <-t.C:
307+
tc.logStatsOnce()
308+
case <-tc.doneSignal:
309+
return
310+
}
311+
}
312+
}
313+
314+
func (tc *MultiReceiverTestCase) logStatsOnce() {
315+
316+
log.Printf("%s | %s | ",
317+
tc.agentProc.GetResourceConsumption(),
318+
tc.LoadGenerator.GetStats())
319+
320+
for _, backend := range tc.MockBackends {
321+
log.Printf("%s | ",
322+
backend.GetStats())
323+
}
324+
}
325+
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package testbed // import "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
5+
6+
// MultiReceiverValidator defines the interface for validating and reporting test results.
7+
type MultiReceiverTestCaseValidator struct {
8+
// Validate executes validation routines and test assertions.
9+
Validate func(tc *MultiReceiverTestCase)
10+
11+
dataProvider DataProvider
12+
assertionFailures []*TraceAssertionFailure
13+
}
14+
15+
16+
func NewMultiReceiverTestCaseValidator(
17+
senderName string,
18+
provider DataProvider,
19+
validator func(tc *MultiReceiverTestCase),
20+
) *MultiReceiverTestCaseValidator {
21+
return &MultiReceiverTestCaseValidator{
22+
dataProvider: provider,
23+
Validate: validator,
24+
}
25+
}
26+
27+
func (v *MultiReceiverTestCaseValidator) RecordResults(tc *MultiReceiverTestCase) {
28+
var result string
29+
if tc.T.Failed() {
30+
result = "FAIL"
31+
} else {
32+
result = "PASS"
33+
}
34+
35+
// Remove "Test" prefix from test name.
36+
testName := tc.T.Name()[4:]
37+
tc.ResultsSummary.Add(tc.T.Name(), &MultiReceiverTestResult{
38+
testName: testName,
39+
result: result,
40+
traceAssertionFailureCount: uint64(len(v.assertionFailures)),
41+
traceAssertionFailures: v.assertionFailures,
42+
})
43+
}

testbed/testbed/options.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ func (rs *ResourceSpec) isSpecified() bool {
4242
// TestCaseOption defines a TestCase option.
4343
type TestCaseOption func(t *TestCase)
4444

45+
// MultiReceiverTestCaseOption defines a MultiReceiverTestCase option.
46+
type MultiReceiverTestCaseOption func(t *MultiReceiverTestCase)
47+
4548
// WithSkipResults disables writing out results file for a TestCase.
4649
func WithSkipResults() TestCaseOption {
4750
return func(tc *TestCase) {

0 commit comments

Comments
 (0)