Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,9 @@ func (lb *LoadBalancer) OnDestinationRuleUpdate(oldDr, dr *istioapi.DestinationR
delete(lb.policyMap, svcPort)
}
lb.setLoadBalancerPolicy(dr, policyName, svcPort, state.endpoints)
lb.policyMap[svcPort].Update(oldDr, dr)
if _, exists := lb.policyMap[svcPort]; exists {
lb.policyMap[svcPort].Update(oldDr, dr)
}
lb.policyMutex.Unlock()
}
}
Expand Down Expand Up @@ -815,8 +817,10 @@ func (lb *LoadBalancer) setLoadBalancerPolicy(dr *istioapi.DestinationRule, poli
}

// TryPickEndpoint try to pick a service endpoint from load-balance strategy.
func (lb *LoadBalancer) tryPickEndpoint(svcPort proxy.ServicePortName, sessionAffinityEnabled bool, endpoints []string,
srcAddr net.Addr, netConn net.Conn, cliReq *http.Request) (string, *http.Request, bool) {
func (lb *LoadBalancer) tryPickEndpoint(
svcPort proxy.ServicePortName, sessionAffinityEnabled bool, endpoints []string,
srcAddr net.Addr, netConn net.Conn, cliReq *http.Request,
) (string, *http.Request, bool) {
lb.policyMutex.Lock()
defer lb.policyMutex.Unlock()

Expand All @@ -835,8 +839,10 @@ func (lb *LoadBalancer) tryPickEndpoint(svcPort proxy.ServicePortName, sessionAf
return endpoint, req, true
}

func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool,
netConn net.Conn, cliReq *http.Request) (string, *http.Request, error) {
func (lb *LoadBalancer) nextEndpointWithConn(
svcPort proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool,
netConn net.Conn, cliReq *http.Request,
) (string, *http.Request, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
Expand Down Expand Up @@ -901,8 +907,10 @@ func (lb *LoadBalancer) nextEndpointWithConn(svcPort proxy.ServicePortName, srcA

// TryConnectEndpoints attempts to connect to the next available endpoint for the given service, cycling
// through until it is able to successfully connect, or it has tried with all timeouts in EndpointDialTimeouts.
func (lb *LoadBalancer) TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protocol string,
netConn net.Conn, cliReq *http.Request) (out net.Conn, err error) {
func (lb *LoadBalancer) TryConnectEndpoints(
service proxy.ServicePortName, srcAddr net.Addr, protocol string,
netConn net.Conn, cliReq *http.Request,
) (out net.Conn, err error) {
sessionAffinityReset := false
for _, dialTimeout := range userspace.EndpointDialTimeouts {
endpoint, req, err := lb.nextEndpointWithConn(service, srcAddr, sessionAffinityReset, netConn, cliReq)
Expand Down
Loading