Skip to content

Commit c3efe18

Browse files
authored
[exporter/loadbalancing] Fix panic on a sub-exporter shutdown (#31456)
Fix panic when a sub-exporter is shut down while still handling requests. This change wraps exporters with an additional working group to ensure that exporters are shut down only after they finish processing data. Fixes #31410 It has some small related refactoring changes. I can extract them in separate PRs if needed.
1 parent 89f2709 commit c3efe18

File tree

10 files changed

+358
-183
lines changed

10 files changed

+358
-183
lines changed

.chloggen/fix-load-balancing-exp.yaml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: exporter/loadbalancing
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fix panic when a sub-exporter is shut down while still handling requests.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [31410]
14+
15+
# If your change doesn't affect end users or the exported elements of any package,
16+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
17+
# Optional: The change log or logs in which this entry should be included.
18+
# e.g. '[user]' or '[user, api]'
19+
# Include 'user' if the change is relevant to end users.
20+
# Include 'api' if there is a change to a library API.
21+
# Default: '[user]'
22+
change_logs: [user]

exporter/loadbalancingexporter/loadbalancer.go

Lines changed: 19 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,32 +24,24 @@ var (
2424
errMultipleResolversProvided = errors.New("only one resolver should be specified")
2525
)
2626

27-
var _ loadBalancer = (*loadBalancerImp)(nil)
28-
2927
type componentFactory func(ctx context.Context, endpoint string) (component.Component, error)
3028

31-
type loadBalancer interface {
32-
component.Component
33-
Endpoint(identifier []byte) string
34-
Exporter(endpoint string) (component.Component, error)
35-
}
36-
37-
type loadBalancerImp struct {
29+
type loadBalancer struct {
3830
logger *zap.Logger
3931
host component.Host
4032

4133
res resolver
4234
ring *hashRing
4335

4436
componentFactory componentFactory
45-
exporters map[string]component.Component
37+
exporters map[string]*wrappedExporter
4638

4739
stopped bool
4840
updateLock sync.RWMutex
4941
}
5042

5143
// Create new load balancer
52-
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) {
44+
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancer, error) {
5345
oCfg := cfg.(*Config)
5446

5547
if oCfg.Resolver.DNS != nil && oCfg.Resolver.Static != nil {
@@ -90,21 +82,21 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
9082
return nil, errNoResolver
9183
}
9284

93-
return &loadBalancerImp{
85+
return &loadBalancer{
9486
logger: params.Logger,
9587
res: res,
9688
componentFactory: factory,
97-
exporters: map[string]component.Component{},
89+
exporters: map[string]*wrappedExporter{},
9890
}, nil
9991
}
10092

101-
func (lb *loadBalancerImp) Start(ctx context.Context, host component.Host) error {
93+
func (lb *loadBalancer) Start(ctx context.Context, host component.Host) error {
10294
lb.res.onChange(lb.onBackendChanges)
10395
lb.host = host
10496
return lb.res.start(ctx)
10597
}
10698

107-
func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
99+
func (lb *loadBalancer) onBackendChanges(resolved []string) {
108100
newRing := newHashRing(resolved)
109101

110102
if !newRing.equal(lb.ring) {
@@ -122,7 +114,7 @@ func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
122114
}
123115
}
124116

125-
func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []string) {
117+
func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []string) {
126118
for _, endpoint := range endpoints {
127119
endpoint = endpointWithPort(endpoint)
128120

@@ -132,12 +124,12 @@ func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []
132124
lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
133125
continue
134126
}
135-
136-
if err = exp.Start(ctx, lb.host); err != nil {
127+
we := newWrappedExporter(exp)
128+
if err = we.Start(ctx, lb.host); err != nil {
137129
lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
138130
continue
139131
}
140-
lb.exporters[endpoint] = exp
132+
lb.exporters[endpoint] = we
141133
}
142134
}
143135
}
@@ -149,7 +141,7 @@ func endpointWithPort(endpoint string) string {
149141
return endpoint
150142
}
151143

152-
func (lb *loadBalancerImp) removeExtraExporters(ctx context.Context, endpoints []string) {
144+
func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []string) {
153145
endpointsWithPort := make([]string, len(endpoints))
154146
for i, e := range endpoints {
155147
endpointsWithPort[i] = endpointWithPort(e)
@@ -172,29 +164,24 @@ func endpointFound(endpoint string, endpoints []string) bool {
172164
return false
173165
}
174166

175-
func (lb *loadBalancerImp) Shutdown(context.Context) error {
167+
func (lb *loadBalancer) Shutdown(context.Context) error {
176168
lb.stopped = true
177169
return nil
178170
}
179171

180-
func (lb *loadBalancerImp) Endpoint(identifier []byte) string {
181-
lb.updateLock.RLock()
182-
defer lb.updateLock.RUnlock()
183-
184-
return lb.ring.endpointFor(identifier)
185-
}
186-
187-
func (lb *loadBalancerImp) Exporter(endpoint string) (component.Component, error) {
172+
// exporterAndEndpoint returns the exporter and the endpoint for the given identifier.
173+
func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter, string, error) {
188174
// NOTE: make rolling updates of next tier of collectors work. currently, this may cause
189175
// data loss because the latest batches sent to outdated backend will never find their way out.
190176
// for details: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690
191177
lb.updateLock.RLock()
178+
defer lb.updateLock.RUnlock()
179+
endpoint := lb.ring.endpointFor(identifier)
192180
exp, found := lb.exporters[endpointWithPort(endpoint)]
193-
lb.updateLock.RUnlock()
194181
if !found {
195182
// something is really wrong... how come we couldn't find the exporter??
196-
return nil, fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
183+
return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
197184
}
198185

199-
return exp, nil
186+
return exp, endpoint, nil
200187
}

exporter/loadbalancingexporter/loadbalancer_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) {
136136
require.NoError(t, err)
137137

138138
// test
139-
e := p.Endpoint([]byte{128, 128, 0, 0})
139+
_, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0})
140140

141141
// verify
142142
assert.Equal(t, "", e)
@@ -376,19 +376,19 @@ func TestFailedExporterInRing(t *testing.T) {
376376

377377
// test
378378
// this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info
379-
_, err = p.Exporter(p.Endpoint([]byte{128, 128, 0, 0}))
379+
_, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0})
380380

381381
// verify
382382
assert.Error(t, err)
383383

384384
// test
385385
// this service name will reach the endpoint-2 -- see the consistent hashing tests for more info
386-
_, err = p.Exporter(p.Endpoint([]byte("get-recommendations-1")))
386+
_, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1"))
387387

388388
// verify
389389
assert.Error(t, err)
390390
}
391391

392-
func newNopMockExporter() component.Component {
393-
return mockComponent{}
392+
func newNopMockExporter() *wrappedExporter {
393+
return newWrappedExporter(mockComponent{})
394394
}

exporter/loadbalancingexporter/log_exporter.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry
55

66
import (
77
"context"
8-
"fmt"
98
"math/rand"
109
"sync"
1110
"time"
@@ -26,7 +25,7 @@ import (
2625
var _ exporter.Logs = (*logExporterImp)(nil)
2726

2827
type logExporterImp struct {
29-
loadBalancer loadBalancer
28+
loadBalancer *loadBalancer
3029

3130
started bool
3231
shutdownWg sync.WaitGroup
@@ -87,16 +86,13 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
8786
balancingKey = random()
8887
}
8988

90-
endpoint := e.loadBalancer.Endpoint(balancingKey[:])
91-
exp, err := e.loadBalancer.Exporter(endpoint)
89+
le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:])
9290
if err != nil {
9391
return err
9492
}
9593

96-
le, ok := exp.(exporter.Logs)
97-
if !ok {
98-
return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", exp)
99-
}
94+
le.consumeWG.Add(1)
95+
defer le.consumeWG.Done()
10096

10197
start := time.Now()
10298
err = le.ConsumeLogs(ctx, ld)

exporter/loadbalancingexporter/log_exporter_test.go

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) {
287287
assert.Len(t, sink.AllLogs(), 1)
288288
}
289289

290+
// this test validates that the endpoints can be changed concurrently while the exporter is consuming logs.
291+
func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) {
292+
consumeStarted := make(chan struct{})
293+
consumeDone := make(chan struct{})
294+
295+
// imitate a slow exporter
296+
te := &mockLogsExporter{Component: mockComponent{}}
297+
te.consumelogsfn = func(ctx context.Context, td plog.Logs) error {
298+
close(consumeStarted)
299+
time.Sleep(50 * time.Millisecond)
300+
return te.consumeErr
301+
}
302+
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
303+
return te, nil
304+
}
305+
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
306+
require.NotNil(t, lb)
307+
require.NoError(t, err)
308+
309+
p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig())
310+
require.NotNil(t, p)
311+
require.NoError(t, err)
312+
313+
endpoints := []string{"endpoint-1"}
314+
lb.res = &mockResolver{
315+
triggerCallbacks: true,
316+
onResolve: func(ctx context.Context) ([]string, error) {
317+
return endpoints, nil
318+
},
319+
}
320+
p.loadBalancer = lb
321+
322+
err = p.Start(context.Background(), componenttest.NewNopHost())
323+
require.NoError(t, err)
324+
defer func() {
325+
require.NoError(t, p.Shutdown(context.Background()))
326+
}()
327+
328+
go func() {
329+
assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs()))
330+
close(consumeDone)
331+
}()
332+
333+
// update endpoint while consuming logs
334+
<-consumeStarted
335+
endpoints = []string{"endpoint-2"}
336+
endpoint, err := lb.res.resolve(context.Background())
337+
require.NoError(t, err)
338+
require.Equal(t, endpoints, endpoint)
339+
<-consumeDone
340+
}
341+
290342
func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
291343
t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331")
292344

@@ -360,19 +412,17 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
360412

361413
counter1 := &atomic.Int64{}
362414
counter2 := &atomic.Int64{}
363-
defaultExporters := map[string]component.Component{
364-
"127.0.0.1:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
415+
defaultExporters := map[string]*wrappedExporter{
416+
"127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
365417
counter1.Add(1)
366418
// simulate an unreachable backend
367419
time.Sleep(10 * time.Second)
368420
return nil
369-
},
370-
),
371-
"127.0.0.2:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
421+
})),
422+
"127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
372423
counter2.Add(1)
373424
return nil
374-
},
375-
),
425+
})),
376426
}
377427

378428
// test
@@ -458,15 +508,21 @@ func simpleLogWithoutID() plog.Logs {
458508
type mockLogsExporter struct {
459509
component.Component
460510
consumelogsfn func(ctx context.Context, ld plog.Logs) error
511+
consumeErr error
461512
}
462513

463514
func (e *mockLogsExporter) Capabilities() consumer.Capabilities {
464515
return consumer.Capabilities{MutatesData: false}
465516
}
466517

518+
func (e *mockLogsExporter) Shutdown(context.Context) error {
519+
e.consumeErr = errors.New("exporter is shut down")
520+
return nil
521+
}
522+
467523
func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
468524
if e.consumelogsfn == nil {
469-
return nil
525+
return e.consumeErr
470526
}
471527
return e.consumelogsfn(ctx, ld)
472528
}
@@ -484,10 +540,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e
484540
}
485541

486542
func newNopMockLogsExporter() exporter.Logs {
487-
return &mockLogsExporter{
488-
Component: mockComponent{},
489-
consumelogsfn: func(ctx context.Context, ld plog.Logs) error {
490-
return nil
491-
},
492-
}
543+
return &mockLogsExporter{Component: mockComponent{}}
493544
}

0 commit comments

Comments
 (0)