Skip to content

Commit de0a195

Browse files
VihasMakwanadjaglowski
authored andcommitted
[chore][Testbed] - Try flushing the data before exiting in load generator (open-telemetry#35211)
**Description:** In the `LoadGenerator`, if we encounter a non-permanent error and are in the process of retrying, we should not immediately exit upon receiving a stop signal. Instead, we need to first flush the existing data and then proceed to exit the LoadGenerator. This is necessary for stress test cases and we need to validate the data received and data sent. --------- Co-authored-by: Daniel Jaglowski <[email protected]>
1 parent 650ffc5 commit de0a195

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

testbed/testbed/load_generator.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ type LoadOptions struct {
4646

4747
// Parallel specifies how many goroutines to send from.
4848
Parallel int
49+
50+
// MaxDelay defines the longest amount of time we can continue retrying for non-permanent errors.
51+
MaxDelay time.Duration
4952
}
5053

5154
var _ LoadGenerator = (*ProviderSender)(nil)
@@ -112,6 +115,11 @@ func (ps *ProviderSender) Start(options LoadOptions) {
112115
ps.options.ItemsPerBatch = 10
113116
}
114117

118+
if ps.options.MaxDelay == 0 {
119+
// retry for an additional 10 seconds by default
120+
ps.options.MaxDelay = time.Second * 10
121+
}
122+
115123
log.Printf("Starting load generator at %d items/sec.", ps.options.DataItemsPerSecond)
116124

117125
// Indicate that generation is in progress.
@@ -240,6 +248,7 @@ func (ps *ProviderSender) generateTrace() error {
240248
traceSender := ps.Sender.(TraceDataSender)
241249

242250
traceData, done := ps.Provider.GenerateTraces()
251+
timer := time.NewTimer(ps.options.MaxDelay)
243252
if done {
244253
return nil
245254
}
@@ -258,9 +267,8 @@ func (ps *ProviderSender) generateTrace() error {
258267
return fmt.Errorf("cannot send traces: %w", err)
259268
}
260269
ps.nonPermanentErrors.Add(uint64(traceData.SpanCount()))
261-
262270
select {
263-
case <-ps.stopSignal:
271+
case <-timer.C:
264272
return nil
265273
default:
266274
}
@@ -271,6 +279,7 @@ func (ps *ProviderSender) generateMetrics() error {
271279
metricSender := ps.Sender.(MetricDataSender)
272280

273281
metricData, done := ps.Provider.GenerateMetrics()
282+
timer := time.NewTimer(ps.options.MaxDelay)
274283
if done {
275284
return nil
276285
}
@@ -291,7 +300,7 @@ func (ps *ProviderSender) generateMetrics() error {
291300
ps.nonPermanentErrors.Add(uint64(metricData.DataPointCount()))
292301

293302
select {
294-
case <-ps.stopSignal:
303+
case <-timer.C:
295304
return nil
296305
default:
297306
}
@@ -302,6 +311,7 @@ func (ps *ProviderSender) generateLog() error {
302311
logSender := ps.Sender.(LogDataSender)
303312

304313
logData, done := ps.Provider.GenerateLogs()
314+
timer := time.NewTimer(ps.options.MaxDelay)
305315
if done {
306316
return nil
307317
}
@@ -322,7 +332,7 @@ func (ps *ProviderSender) generateLog() error {
322332
ps.nonPermanentErrors.Add(uint64(logData.LogRecordCount()))
323333

324334
select {
325-
case <-ps.stopSignal:
335+
case <-timer.C:
326336
return nil
327337
default:
328338
}

0 commit comments

Comments
 (0)