Skip to content

Commit 9162152

Browse files
dehaansaZenoCC-Peng
authored andcommitted
[exporter/loadbalancing] Update k8sresolver handler to properly manage update events (open-telemetry#36505)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description The load balancing exporter's k8sresolver was not handling update events properly. The `callback` function was being executed after cleanup of old endpoints and also after adding new endpoints. This causes exporter churn in the case of an event in which the lists contain shared elements. See the [documentation](https://pkg.go.dev/k8s.io/client-go/tools/cache#ResourceEventHandler) for examples where the state might change but the IP Addresses would not, including the regular re-list events that might have zero changes. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#35658 May be related to open-telemetry#35810 as well. <!--Describe what testing was performed and which tests were added.--> #### Testing Added tests for no-change onChange call. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 76082cf commit 9162152

File tree

3 files changed

+91
-16
lines changed

3 files changed

+91
-16
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: loadbalancingexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: The k8sresolver in loadbalancingexporter was triggering exporter churn in the way the change event was handled.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [35658]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/loadbalancingexporter/resolver_k8s_handler.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type handler struct {
2525
}
2626

2727
func (h handler) OnAdd(obj any, _ bool) {
28-
var endpoints []string
28+
var endpoints map[string]bool
2929

3030
switch object := obj.(type) {
3131
case *corev1.Endpoints:
@@ -36,7 +36,7 @@ func (h handler) OnAdd(obj any, _ bool) {
3636
return
3737
}
3838
changed := false
39-
for _, ep := range endpoints {
39+
for ep := range endpoints {
4040
if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded {
4141
changed = true
4242
}
@@ -49,28 +49,36 @@ func (h handler) OnAdd(obj any, _ bool) {
4949
func (h handler) OnUpdate(oldObj, newObj any) {
5050
switch oldEps := oldObj.(type) {
5151
case *corev1.Endpoints:
52-
epRemove := convertToEndpoints(oldEps)
53-
for _, ep := range epRemove {
54-
h.endpoints.Delete(ep)
55-
}
56-
if len(epRemove) > 0 {
57-
_, _ = h.callback(context.Background())
58-
}
59-
6052
newEps, ok := newObj.(*corev1.Endpoints)
6153
if !ok {
6254
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", newObj))
6355
h.telemetry.LoadbalancerNumResolutions.Add(context.Background(), 1, metric.WithAttributeSet(k8sResolverFailureAttrSet))
6456
return
6557
}
58+
59+
oldEndpoints := convertToEndpoints(oldEps)
60+
newEndpoints := convertToEndpoints(newEps)
6661
changed := false
67-
for _, ep := range convertToEndpoints(newEps) {
62+
63+
// Iterate through old endpoints and remove those that are not in the new list.
64+
for ep := range oldEndpoints {
65+
if _, ok := newEndpoints[ep]; !ok {
66+
h.endpoints.Delete(ep)
67+
changed = true
68+
}
69+
}
70+
71+
// Iterate through new endpoints and add those that are not in the endpoints map already.
72+
for ep := range newEndpoints {
6873
if _, loaded := h.endpoints.LoadOrStore(ep, true); !loaded {
6974
changed = true
7075
}
7176
}
77+
7278
if changed {
7379
_, _ = h.callback(context.Background())
80+
} else {
81+
h.logger.Debug("No changes detected in the endpoints for the service", zap.Any("old", oldEps), zap.Any("new", newEps))
7482
}
7583
default: // unsupported
7684
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))
@@ -80,7 +88,7 @@ func (h handler) OnUpdate(oldObj, newObj any) {
8088
}
8189

8290
func (h handler) OnDelete(obj any) {
83-
var endpoints []string
91+
var endpoints map[string]bool
8492
switch object := obj.(type) {
8593
case *cache.DeletedFinalStateUnknown:
8694
h.OnDelete(object.Obj)
@@ -95,19 +103,19 @@ func (h handler) OnDelete(obj any) {
95103
return
96104
}
97105
if len(endpoints) != 0 {
98-
for _, endpoint := range endpoints {
106+
for endpoint := range endpoints {
99107
h.endpoints.Delete(endpoint)
100108
}
101109
_, _ = h.callback(context.Background())
102110
}
103111
}
104112

105-
func convertToEndpoints(eps ...*corev1.Endpoints) []string {
106-
var ipAddress []string
113+
func convertToEndpoints(eps ...*corev1.Endpoints) map[string]bool {
114+
ipAddress := map[string]bool{}
107115
for _, ep := range eps {
108116
for _, subsets := range ep.Subsets {
109117
for _, addr := range subsets.Addresses {
110-
ipAddress = append(ipAddress, addr.IP)
118+
ipAddress[addr.IP] = true
111119
}
112120
}
113121
}

exporter/loadbalancingexporter/resolver_k8s_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func TestK8sResolve(t *testing.T) {
7777
name string
7878
args args
7979
simulateFn func(*suiteContext, args) error
80+
onChangeFn func([]string)
8081
verifyFn func(*suiteContext, args) error
8182
}{
8283
{
@@ -116,6 +117,41 @@ func TestK8sResolve(t *testing.T) {
116117
return nil
117118
},
118119
},
120+
{
121+
name: "simulate re-list that does not change endpoints",
122+
args: args{
123+
logger: zap.NewNop(),
124+
service: "lb",
125+
namespace: "default",
126+
ports: []int32{8080, 9090},
127+
},
128+
simulateFn: func(suiteCtx *suiteContext, args args) error {
129+
exist := suiteCtx.endpoint.DeepCopy()
130+
patch := client.MergeFrom(exist)
131+
data, err := patch.Data(exist)
132+
if err != nil {
133+
return err
134+
}
135+
_, err = suiteCtx.clientset.CoreV1().Endpoints(args.namespace).
136+
Patch(context.TODO(), args.service, types.MergePatchType, data, metav1.PatchOptions{})
137+
return err
138+
},
139+
onChangeFn: func([]string) {
140+
assert.Fail(t, "should not call onChange")
141+
},
142+
verifyFn: func(ctx *suiteContext, _ args) error {
143+
if _, err := ctx.resolver.resolve(context.Background()); err != nil {
144+
return err
145+
}
146+
147+
assert.Equal(t, []string{
148+
"192.168.10.100:8080",
149+
"192.168.10.100:9090",
150+
}, ctx.resolver.Endpoints(), "resolver failed, endpoints not equal")
151+
152+
return nil
153+
},
154+
},
119155
{
120156
name: "simulate change the backend ip address",
121157
args: args{
@@ -177,6 +213,10 @@ func TestK8sResolve(t *testing.T) {
177213
suiteCtx, teardownSuite := setupSuite(t, tt.args)
178214
defer teardownSuite(t)
179215

216+
if tt.onChangeFn != nil {
217+
suiteCtx.resolver.onChange(tt.onChangeFn)
218+
}
219+
180220
err := tt.simulateFn(suiteCtx, tt.args)
181221
assert.NoError(t, err)
182222

0 commit comments

Comments
 (0)