Skip to content

Commit 8dca484

Browse files
authored
fix(destination): prevent task leak in federated service watchers (#14693)
When unsubscribing a stream from a federated service, we: 1. For each cluster, send an update on the stream's channel to remove endpoints. 2. Call synchronizedGetStream.Stop() so that it stops processing updates. These steps can race and deadlock: if Stop() has been called before endoint updates are processed, the subsequent Send() calls block forever. This change improves test coverage to cover this case as well as that fixed in f4e6795.
1 parent 2ec2abc commit 8dca484

File tree

2 files changed

+117
-2
lines changed

2 files changed

+117
-2
lines changed

controller/api/destination/syncronized_get_stream.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package destination
22

33
import (
44
"context"
5+
"errors"
56

67
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
78
logging "github.com/sirupsen/logrus"
@@ -21,6 +22,8 @@ type synchronizedGetStream struct {
2122
log *logging.Entry
2223
}
2324

25+
var errStreamStopped = errors.New("synchronized stream stopped")
26+
2427
func newSyncronizedGetStream(stream pb.Destination_GetServer, log *logging.Entry) *synchronizedGetStream {
2528
return &synchronizedGetStream{
2629
done: make(chan struct{}),
@@ -50,8 +53,12 @@ func (s *synchronizedGetStream) RecvMsg(m any) error {
5053
}
5154

5255
func (s *synchronizedGetStream) Send(update *pb.Update) error {
53-
s.ch <- update
54-
return nil
56+
select {
57+
case <-s.done:
58+
return errStreamStopped
59+
case s.ch <- update:
60+
return nil
61+
}
5562
}
5663

5764
func (s *synchronizedGetStream) Start() {
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package destination
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
10+
"github.com/linkerd/linkerd2/controller/api/util"
11+
logging "github.com/sirupsen/logrus"
12+
)
13+
14+
type blockingDestinationGetServer struct {
15+
util.MockServerStream
16+
block chan struct{}
17+
sendCalled chan struct{}
18+
once sync.Once
19+
}
20+
21+
func newBlockingDestinationGetServer() *blockingDestinationGetServer {
22+
return &blockingDestinationGetServer{
23+
block: make(chan struct{}),
24+
sendCalled: make(chan struct{}),
25+
}
26+
}
27+
28+
func (b *blockingDestinationGetServer) Send(update *pb.Update) error {
29+
b.once.Do(func() {
30+
close(b.sendCalled)
31+
})
32+
<-b.block
33+
return nil
34+
}
35+
36+
func (b *blockingDestinationGetServer) unblock() {
37+
close(b.block)
38+
}
39+
40+
// TestSynchronizedGetStreamSendAfterStop ensures Send returns promptly once the
41+
// stream has been stopped so goroutines don't leak waiting on an unconsumed
42+
// channel send.
43+
func TestSynchronizedGetStreamSendAfterStop(t *testing.T) {
44+
mock := &mockDestinationGetServer{
45+
updatesReceived: make(chan *pb.Update, 1),
46+
}
47+
stream := newSyncronizedGetStream(mock, logging.WithField("test", t.Name()))
48+
stream.Start()
49+
stream.Stop()
50+
51+
errCh := make(chan error, 1)
52+
go func() {
53+
errCh <- stream.Send(&pb.Update{})
54+
}()
55+
56+
select {
57+
case err := <-errCh:
58+
if !errors.Is(err, errStreamStopped) {
59+
t.Fatalf("expected errStreamStopped, got %v", err)
60+
}
61+
case <-time.After(time.Second):
62+
t.Fatal("Send blocked after Stop")
63+
}
64+
}
65+
66+
func TestSynchronizedGetStreamStopWhileInnerSendBlocked(t *testing.T) {
67+
mock := newBlockingDestinationGetServer()
68+
stream := newSyncronizedGetStream(mock, logging.WithField("test", t.Name()))
69+
stream.Start()
70+
71+
firstSend := make(chan error, 1)
72+
go func() {
73+
firstSend <- stream.Send(&pb.Update{})
74+
}()
75+
76+
select {
77+
case err := <-firstSend:
78+
if err != nil {
79+
t.Fatalf("unexpected error from initial Send: %v", err)
80+
}
81+
case <-time.After(time.Second):
82+
t.Fatal("initial Send did not complete")
83+
}
84+
85+
select {
86+
case <-mock.sendCalled:
87+
case <-time.After(time.Second):
88+
t.Fatal("inner Send was not invoked")
89+
}
90+
91+
stream.Stop()
92+
93+
secondSend := make(chan error, 1)
94+
go func() {
95+
secondSend <- stream.Send(&pb.Update{})
96+
}()
97+
98+
select {
99+
case err := <-secondSend:
100+
if !errors.Is(err, errStreamStopped) {
101+
t.Fatalf("expected errStreamStopped, got %v", err)
102+
}
103+
case <-time.After(time.Second):
104+
t.Fatal("second Send blocked after Stop")
105+
}
106+
107+
mock.unblock()
108+
}

0 commit comments

Comments
 (0)