From 31b42b8215571422a4f81a38ad18e50e37e15b79 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Tue, 7 Nov 2023 12:50:38 -0800 Subject: [PATCH] Limit concurrency for certain tests --- .../internal/idbatcher/id_batcher_test.go | 6 ++++++ processor/tailsamplingprocessor/processor_test.go | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go index bb0cda216fade..f9b221a2afd93 100644 --- a/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go +++ b/processor/tailsamplingprocessor/internal/idbatcher/id_batcher_test.go @@ -103,11 +103,17 @@ func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchC ids := generateSequentialIds(10000) wg := &sync.WaitGroup{} + // Limit the concurrency here to avoid creating too many goroutines and hit + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126 + concurrencyLimiter := make(chan struct{}, 128) + defer close(concurrencyLimiter) for i := 0; i < len(ids); i++ { wg.Add(1) + concurrencyLimiter <- struct{}{} go func(id pcommon.TraceID) { batcher.AddToCurrentBatch(id) wg.Done() + <-concurrencyLimiter }(ids[i]) } diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index f5da628dc503b..94d574c90009f 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -208,16 +208,24 @@ func TestConcurrentTraceArrival(t *testing.T) { require.NoError(t, tsp.Shutdown(context.Background())) }() + // Limit the concurrency here to avoid creating too many goroutines and hit + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/9126 + concurrencyLimiter := make(chan struct{}, 128) + defer close(concurrencyLimiter) for _, batch := range batches { // Add the same traceId twice. wg.Add(2) + concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) wg.Done() + <-concurrencyLimiter }(batch) + concurrencyLimiter <- struct{}{} go func(td ptrace.Traces) { require.NoError(t, tsp.ConsumeTraces(context.Background(), td)) wg.Done() + <-concurrencyLimiter }(batch) }