Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion controller/api/destination/endpoint_profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func newEndpointProfileTranslator(
stream pb.Destination_GetProfileServer,
endStream chan struct{},
log *logging.Entry,
queueCapacity int,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
forceOpaqueTransport: forceOpaqueTransport,
Expand All @@ -66,7 +67,7 @@ func newEndpointProfileTranslator(

stream: stream,
endStream: endStream,
updates: make(chan *watcher.Address, updateQueueCapacity),
updates: make(chan *watcher.Address, queueCapacity),
stop: make(chan struct{}),

log: log.WithField("component", "endpoint-profile-translator"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestEndpointProfileTranslator(t *testing.T) {
mockGetProfileServer,
nil,
log,
DefaultStreamQueueCapacity,
)
translator.Start()
defer translator.Stop()
Expand Down Expand Up @@ -83,17 +84,19 @@ func TestEndpointProfileTranslator(t *testing.T) {
}
log := logging.WithField("test", t.Name())
endStream := make(chan struct{})
queueCapacity := DefaultStreamQueueCapacity
translator := newEndpointProfileTranslator(
true, true, "cluster", "identity", make(map[uint32]struct{}), nil,
mockGetProfileServer,
endStream,
log,
queueCapacity,
)

// We avoid starting the translator so that it doesn't drain its update
// queue and we can test the overflow behavior.

for i := 0; i < updateQueueCapacity/2; i++ {
for i := 0; i < queueCapacity/2; i++ {
if err := translator.Update(podAddr); err != nil {
t.Fatal("Expected update")
}
Expand All @@ -114,10 +117,10 @@ func TestEndpointProfileTranslator(t *testing.T) {
}

// The queue should be full and the next update should fail.
t.Logf("Queue length=%d capacity=%d", translator.queueLen(), updateQueueCapacity)
t.Logf("Queue length=%d capacity=%d", translator.queueLen(), queueCapacity)
if err := translator.Update(podAddr); err == nil {
if !errors.Is(err, http.ErrServerClosed) {
t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), updateQueueCapacity)
t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), queueCapacity)
}
}

Expand Down
5 changes: 2 additions & 3 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ const (
envAdminListenAddr = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR"
envControlListenAddr = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR"

updateQueueCapacity = 100

defaultProxyInboundPort = 4143
)

Expand Down Expand Up @@ -101,6 +99,7 @@ func newEndpointTranslator(
stream pb.Destination_GetServer,
endStream chan struct{},
log *logging.Entry,
queueCapacity int,
) (*endpointTranslator, error) {
log = log.WithFields(logging.Fields{
"component": "endpoint-translator",
Expand Down Expand Up @@ -139,7 +138,7 @@ func newEndpointTranslator(
endStream,
log,
counter,
make(chan interface{}, updateQueueCapacity),
make(chan interface{}, queueCapacity),
make(chan struct{}),
}, nil
}
Expand Down
2 changes: 2 additions & 0 deletions controller/api/destination/federated_service_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ func (fs *federatedService) remoteDiscoverySubscribe(
subscriber.stream,
subscriber.endStream,
fs.log,
fs.config.StreamQueueCapacity,
)
if err != nil {
fs.log.Errorf("Failed to create endpoint translator for remote discovery service %q in cluster %s: %s", id.service.Name, id.cluster, err)
Expand Down Expand Up @@ -418,6 +419,7 @@ func (fs *federatedService) localDiscoverySubscribe(
subscriber.stream,
subscriber.endStream,
fs.log,
fs.config.StreamQueueCapacity,
)
if err != nil {
fs.log.Errorf("Failed to create endpoint translator for %s: %s", localDiscovery, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func mockFederatedServiceWatcher(t *testing.T) (*federatedServiceWatcher, error)
if err != nil {
return nil, fmt.Errorf("NewClusterStoreWithDecoder returned an error: %w", err)
}
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
fsw, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{StreamQueueCapacity: DefaultStreamQueueCapacity}, clusterStore, localEndpoints, logging.WithField("test", t.Name()))
if err != nil {
return nil, fmt.Errorf("newFederatedServiceWatcher returned an error: %w", err)
}
Expand Down
6 changes: 5 additions & 1 deletion controller/api/destination/profile_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
)

func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) (*profileTranslator, error) {
return newProfileTranslatorWithCapacity(serviceID, stream, log, fqn, port, endStream, DefaultStreamQueueCapacity)
}

func newProfileTranslatorWithCapacity(serviceID watcher.ServiceID, stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}, queueCapacity int) (*profileTranslator, error) {
parentRef := &meta.Metadata{
Kind: &meta.Metadata_Resource{
Resource: &meta.Resource{
Expand All @@ -73,7 +77,7 @@ func newProfileTranslator(serviceID watcher.ServiceID, stream pb.Destination_Get
endStream: endStream,
log: log.WithField("component", "profile-translator"),
overflowCounter: overflowCounter,
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
updates: make(chan *sp.ServiceProfile, queueCapacity),
stop: make(chan struct{}),
}, nil
}
Expand Down
11 changes: 10 additions & 1 deletion controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
kerrors "k8s.io/apimachinery/pkg/api/errors"
)

// DefaultStreamQueueCapacity defines the default maximum number of pending
// updates buffered per stream before the stream is closed.
const DefaultStreamQueueCapacity = 100

type (
Config struct {
ControllerNS,
Expand All @@ -39,6 +43,8 @@ type (
MeshedHttp2ClientParams *pb.Http2ClientParams

DefaultOpaquePorts map[uint32]struct{}

StreamQueueCapacity int
}

server struct {
Expand Down Expand Up @@ -219,6 +225,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
stream,
streamEnd,
log,
s.config.StreamQueueCapacity,
)
if err != nil {
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
Expand Down Expand Up @@ -257,6 +264,7 @@ func (s *server) Get(dest *pb.GetDestination, stream pb.Destination_GetServer) e
stream,
streamEnd,
log,
s.config.StreamQueueCapacity,
)
if err != nil {
return status.Errorf(codes.Internal, "Failed to create endpoint translator: %s", err)
Expand Down Expand Up @@ -403,7 +411,7 @@ func (s *server) subscribeToServiceProfile(
// We build up the pipeline of profile updaters backwards, starting from
// the translator which takes profile updates, translates them to protobuf
// and pushes them onto the gRPC stream.
translator, err := newProfileTranslator(service, stream, log, fqn, port, streamEnd)
translator, err := newProfileTranslatorWithCapacity(service, stream, log, fqn, port, streamEnd, s.config.StreamQueueCapacity)
if err != nil {
return status.Errorf(codes.Internal, "Failed to create profile translator: %s", err)
}
Expand Down Expand Up @@ -552,6 +560,7 @@ func (s *server) subscribeToEndpointProfile(
stream,
streamEnd,
log,
s.config.StreamQueueCapacity,
)
translator.Start()
defer translator.Stop()
Expand Down
4 changes: 3 additions & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ spec:
t.Fatalf("can't create cluster store: %s", err)
}

federatedServices, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{}, clusterStore, endpoints, log)
federatedServices, err := newFederatedServiceWatcher(k8sAPI, metadataAPI, &Config{StreamQueueCapacity: DefaultStreamQueueCapacity}, clusterStore, endpoints, log)
if err != nil {
t.Fatalf("can't create federated service watcher: %s", err)
}
Expand All @@ -1082,6 +1082,7 @@ spec:
ClusterDomain: "mycluster.local",
IdentityTrustDomain: "trust.domain",
DefaultOpaquePorts: defaultOpaquePorts,
StreamQueueCapacity: DefaultStreamQueueCapacity,
},
workloads,
endpoints,
Expand Down Expand Up @@ -1191,6 +1192,7 @@ metadata:
mockGetServer,
nil,
logging.WithField("test", t.Name()),
DefaultStreamQueueCapacity,
)
if err != nil {
t.Fatalf("failed to create endpoint translator: %s", err)
Expand Down
6 changes: 6 additions & 0 deletions controller/cmd/destination/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func Main(args []string) {
// This will default to true. It can be overridden with experimental CLI
// flags. Currently not exposed as a configuration value through Helm.
exportControllerQueueMetrics := cmd.Bool("export-queue-metrics", true, "Exports queue metrics for the external workload controller")
streamQueueCapacity := cmd.Int("stream-queue-capacity", destination.DefaultStreamQueueCapacity, "Maximum number of updates buffered per stream before the stream is closed")

traceCollector := flags.AddTraceFlags(cmd)

Expand All @@ -66,6 +67,10 @@ func Main(args []string) {

flags.ConfigureAndParse(cmd, args)

if *streamQueueCapacity <= 0 {
log.Fatalf("--stream-queue-capacity must be greater than 0")
}

if *enableIPv6 && !*enableEndpointSlices {
log.Fatal("If --enable-ipv6=true then --enable-endpoint-slices needs to be true")
}
Expand Down Expand Up @@ -190,6 +195,7 @@ func Main(args []string) {
EnableIPv6: *enableIPv6,
ExtEndpointZoneWeights: *extEndpointZoneWeights,
MeshedHttp2ClientParams: meshedHTTP2ClientParams,
StreamQueueCapacity: *streamQueueCapacity,
}
server, err := destination.NewServer(
*addr,
Expand Down