Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions controller/api/destination/watcher/workload_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type (
k8sAPI *k8s.API
metadataAPI *k8s.MetadataAPI
publishers map[IPPort]*workloadPublisher
metrics metrics
log *logging.Entry
enableEndpointSlices bool

Expand Down Expand Up @@ -63,18 +64,25 @@ type (
var workloadVecs = newMetricsVecs("workload", []string{})

func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error) {
// Omit high-cardinality IP:port labels.
metrics, err := workloadVecs.newMetrics(prometheus.Labels{})
if err != nil {
return nil, err
}

ww := &WorkloadWatcher{
defaultOpaquePorts: defaultOpaquePorts,
k8sAPI: k8sAPI,
metadataAPI: metadataAPI,
publishers: make(map[IPPort]*workloadPublisher),
metrics: metrics,
log: log.WithFields(logging.Fields{
"component": "workload-watcher",
}),
enableEndpointSlices: enableEndpointSlices,
}

_, err := k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
_, err = k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ww.addPod,
DeleteFunc: ww.deletePod,
UpdateFunc: ww.updatePod,
Expand Down Expand Up @@ -125,6 +133,8 @@ func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, po
return "", err
}

ww.updateSubscriberCount()

return wp.addr.IP, nil
}

Expand All @@ -145,6 +155,16 @@ func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUp
if len(wp.listeners) == 0 {
delete(ww.publishers, IPPort{wp.addr.IP, wp.addr.Port})
}

ww.updateSubscriberCount()
}

func (ww *WorkloadWatcher) updateSubscriberCount() {
totalSubscribers := 0
for _, wp := range ww.publishers {
totalSubscribers += len(wp.listeners)
}
ww.metrics.setSubscribers(totalSubscribers)
}

// addPod is an event handler so it cannot block
Expand Down Expand Up @@ -454,11 +474,6 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam
ipPort := IPPort{ip, port}
wp, ok := ww.publishers[ipPort]
if !ok {
// Omit high-cardinality IP:port labels.
metrics, err := workloadVecs.newMetrics(prometheus.Labels{})
if err != nil {
return nil, err
}
wp = &workloadPublisher{
defaultOpaquePorts: ww.defaultOpaquePorts,
k8sAPI: ww.k8sAPI,
Expand All @@ -467,7 +482,7 @@ func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostnam
IP: ip,
Port: port,
},
metrics: metrics,
metrics: ww.metrics,
log: ww.log.WithFields(logging.Fields{
"component": "workload-publisher",
"ip": ip,
Expand Down Expand Up @@ -630,7 +645,6 @@ func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) error {
defer wp.mu.Unlock()

wp.listeners = append(wp.listeners, listener)
wp.metrics.setSubscribers(len(wp.listeners))

if err := listener.Update(&wp.addr); err != nil {
return fmt.Errorf("failed to send initial update: %w", err)
Expand All @@ -652,8 +666,6 @@ func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) {
break
}
}

wp.metrics.setSubscribers(len(wp.listeners))
}

// updatePod creates an Address instance for the given pod, that is passed to
Expand Down