Commit 931aac9
deschedule/balance order (continuation) (#1177)
* generalise RunDeschedulerLoop and RunProfiles and stabilize the deschedule/balance order
* assign nodes outside RunDeschedulerLoop and use instanced profiles
* stop exporting internal profile bits
* refactor RunProfiles and add methods to Descheduler
* move types outside the function
* shut down eventBroadcaster outside NewDescheduler
* keep all new methods inside descheduler.go
* avoid exporting all Descheduler fields
* address review comments

Co-authored-by: Lucas Severo Alves <[email protected]>
1 parent a497541 commit 931aac9

File tree

1 file changed: +169 -105 lines

pkg/descheduler/descheduler.go

Lines changed: 169 additions & 105 deletions
@@ -26,35 +26,185 @@ import (
 	"time"
 
 	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/tools/events"
 	componentbaseconfig "k8s.io/component-base/config"
 	"k8s.io/klog/v2"
 
+	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	fakeclientset "k8s.io/client-go/kubernetes/fake"
 	listersv1 "k8s.io/client-go/listers/core/v1"
 	schedulingv1 "k8s.io/client-go/listers/scheduling/v1"
 	core "k8s.io/client-go/testing"
 
+	"sigs.k8s.io/descheduler/pkg/descheduler/client"
+	eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
+	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
+	"sigs.k8s.io/descheduler/pkg/utils"
+	"sigs.k8s.io/descheduler/pkg/version"
+
 	"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
 	"sigs.k8s.io/descheduler/metrics"
 	"sigs.k8s.io/descheduler/pkg/api"
-	"sigs.k8s.io/descheduler/pkg/descheduler/client"
 	"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
-	eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
-	nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
 	podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
 	"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
 	frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
-	"sigs.k8s.io/descheduler/pkg/utils"
-	"sigs.k8s.io/descheduler/pkg/version"
+	frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
 )
 
+type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
+
+type profileRunner struct {
+	name                      string
+	descheduleEPs, balanceEPs eprunner
+}
+
+type descheduler struct {
+	rs                         *options.DeschedulerServer
+	podLister                  listersv1.PodLister
+	nodeLister                 listersv1.NodeLister
+	namespaceLister            listersv1.NamespaceLister
+	priorityClassLister        schedulingv1.PriorityClassLister
+	getPodsAssignedToNode      podutil.GetPodsAssignedToNodeFunc
+	sharedInformerFactory      informers.SharedInformerFactory
+	evictionPolicyGroupVersion string
+	deschedulerPolicy          *api.DeschedulerPolicy
+	eventRecorder              events.EventRecorder
+}
+
+func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string, eventRecorder events.EventRecorder, sharedInformerFactory informers.SharedInformerFactory) (*descheduler, error) {
+	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
+	podLister := sharedInformerFactory.Core().V1().Pods().Lister()
+	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
+	namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
+	priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
+
+	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
+	if err != nil {
+		return nil, fmt.Errorf("build get pods assigned to node function error: %v", err)
+	}
+
+	return &descheduler{
+		rs:                         rs,
+		podLister:                  podLister,
+		nodeLister:                 nodeLister,
+		namespaceLister:            namespaceLister,
+		priorityClassLister:        priorityClassLister,
+		getPodsAssignedToNode:      getPodsAssignedToNode,
+		sharedInformerFactory:      sharedInformerFactory,
+		evictionPolicyGroupVersion: evictionPolicyGroupVersion,
+		deschedulerPolicy:          deschedulerPolicy,
+		eventRecorder:              eventRecorder,
+	}, nil
+}
+
+func (d *descheduler) runDeschedulerLoop(ctx context.Context, nodes []*v1.Node) error {
+	loopStartDuration := time.Now()
+	defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
+
+	// if len is still <= 1 error out
+	if len(nodes) <= 1 {
+		klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
+		return fmt.Errorf("the cluster size is 0 or 1")
+	}
+
+	var client clientset.Interface
+	// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
+	// So when evicting pods while running multiple strategies in a row have the cummulative effect
+	// as is when evicting pods for real.
+	if d.rs.DryRun {
+		klog.V(3).Infof("Building a cached client from the cluster for the dry run")
+		// Create a new cache so we start from scratch without any leftovers
+		fakeClient, err := cachedClient(d.rs.Client, d.podLister, d.nodeLister, d.namespaceLister, d.priorityClassLister)
+		if err != nil {
+			return err
+		}
+
+		// create a new instance of the shared informer factor from the cached client
+		fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
+		// register the pod informer, otherwise it will not get running
+		d.getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
+		if err != nil {
+			return fmt.Errorf("build get pods assigned to node function error: %v", err)
+		}
+
+		fakeCtx, cncl := context.WithCancel(context.TODO())
+		defer cncl()
+		fakeSharedInformerFactory.Start(fakeCtx.Done())
+		fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
+
+		client = fakeClient
+		d.sharedInformerFactory = fakeSharedInformerFactory
+	} else {
+		client = d.rs.Client
+	}
+
+	klog.V(3).Infof("Building a pod evictor")
+	podEvictor := evictions.NewPodEvictor(
+		client,
+		d.evictionPolicyGroupVersion,
+		d.rs.DryRun,
+		d.deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
+		d.deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
+		nodes,
+		!d.rs.DisableMetrics,
+		d.eventRecorder,
+	)
+
+	d.runProfiles(ctx, client, nodes, podEvictor)
+
+	klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())
+
+	return nil
+}
+
+// runProfiles runs all the deschedule plugins of all profiles and
+// later runs through all balance plugins of all profiles. (All Balance plugins should come after all Deschedule plugins)
+// see https://github.com/kubernetes-sigs/descheduler/issues/979
+func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interface, nodes []*v1.Node, podEvictor *evictions.PodEvictor) {
+	var profileRunners []profileRunner
+	for _, profile := range d.deschedulerPolicy.Profiles {
+		currProfile, err := frameworkprofile.NewProfile(
+			profile,
+			pluginregistry.PluginRegistry,
+			frameworkprofile.WithClientSet(client),
+			frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
+			frameworkprofile.WithPodEvictor(podEvictor),
+			frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
+		)
+		if err != nil {
+			klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
+			continue
+		}
+		profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
+	}
+
+	for _, profileR := range profileRunners {
+		// First deschedule
+		status := profileR.descheduleEPs(ctx, nodes)
+		if status != nil && status.Err != nil {
+			klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profileR.name)
+			continue
+		}
+	}
+
+	for _, profileR := range profileRunners {
+		// Balance Later
+		status := profileR.balanceEPs(ctx, nodes)
+		if status != nil && status.Err != nil {
+			klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profileR.name)
+			continue
+		}
+	}
+}
+
 func Run(ctx context.Context, rs *options.DeschedulerServer) error {
 	metrics.Register()

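The ordering guarantee introduced in this hunk is the core of the change: runProfiles first instantiates every profile, then runs all Deschedule extension points across all profiles before any Balance extension point (see the issue #979 reference in the comment above). A minimal, self-contained Go sketch of that two-pass ordering; the profile names are hypothetical and the runner type is only a stand-in for the real frameworkprofile machinery:

// ordering_sketch.go: illustrates the "all deschedule passes before any balance pass"
// order that runProfiles enforces; not the descheduler's real types.
package main

import "fmt"

// runner mirrors the eprunner/profileRunner idea: each profile exposes one
// deschedule pass and one balance pass.
type runner struct {
	name       string
	deschedule func()
	balance    func()
}

func main() {
	// Hypothetical profiles; the real code builds these from deschedulerPolicy.Profiles.
	profiles := []runner{
		{"profile-a", func() { fmt.Println("deschedule a") }, func() { fmt.Println("balance a") }},
		{"profile-b", func() { fmt.Println("deschedule b") }, func() { fmt.Println("balance b") }},
	}

	// First pass: every profile's deschedule extension points.
	for _, p := range profiles {
		p.deschedule()
	}
	// Second pass: only afterwards, every profile's balance extension points.
	for _, p := range profiles {
		p.balance()
	}
}

The printed order is deschedule a, deschedule b, balance a, balance b, and stays that way regardless of how many profiles are configured.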

@@ -217,22 +367,7 @@ func cachedClient(
 
 func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
 	sharedInformerFactory := informers.NewSharedInformerFactory(rs.Client, 0)
-	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
-	podLister := sharedInformerFactory.Core().V1().Pods().Lister()
 	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
-	namespaceLister := sharedInformerFactory.Core().V1().Namespaces().Lister()
-	priorityClassLister := sharedInformerFactory.Scheduling().V1().PriorityClasses().Lister()
-
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	getPodsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
-	if err != nil {
-		return fmt.Errorf("build get pods assigned to node function error: %v", err)
-	}
-
-	sharedInformerFactory.Start(ctx.Done())
-	sharedInformerFactory.WaitForCacheSync(ctx.Done())
 
 	var nodeSelector string
 	if deschedulerPolicy.NodeSelector != nil {
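A side note on the dry-run branch that runDeschedulerLoop (first hunk) carries over from the old loop body: building a cached, fake client means simulated evictions from one strategy stay visible to the next strategy within the same cycle. A rough illustration of that idea using client-go's fake clientset; the pod names are hypothetical, and the real code snapshots pods, nodes, namespaces and priority classes via cachedClient and evicts through the eviction API rather than deleting directly:

package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Seed a fake clientset with a snapshot of the "real" cluster state.
	fakeClient := fake.NewSimpleClientset(
		&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-a", Namespace: "default"}},
		&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-b", Namespace: "default"}},
	)

	// Removing a pod only mutates the in-memory cache, so later strategies in the
	// same dry-run cycle observe the cumulative effect of earlier "evictions".
	_ = fakeClient.CoreV1().Pods("default").Delete(context.TODO(), "pod-a", metav1.DeleteOptions{})

	pods, _ := fakeClient.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	fmt.Println("remaining pods:", len(pods.Items)) // prints: remaining pods: 1
}
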
@@ -245,103 +380,32 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
 	} else {
 		eventClient = rs.Client
 	}
-
 	eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, eventClient)
 	defer eventBroadcaster.Shutdown()
 
-	cycleSharedInformerFactory := sharedInformerFactory
+	descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, sharedInformerFactory)
+	if err != nil {
+		return err
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	sharedInformerFactory.Start(ctx.Done())
+	sharedInformerFactory.WaitForCacheSync(ctx.Done())
 
 	wait.NonSlidingUntil(func() {
-		loopStartDuration := time.Now()
-		defer metrics.DeschedulerLoopDuration.With(map[string]string{}).Observe(time.Since(loopStartDuration).Seconds())
 		nodes, err := nodeutil.ReadyNodes(ctx, rs.Client, nodeLister, nodeSelector)
 		if err != nil {
-			klog.V(1).InfoS("Unable to get ready nodes", "err", err)
+			klog.Error(err)
 			cancel()
 			return
 		}
-
-		if len(nodes) <= 1 {
-			klog.V(1).InfoS("The cluster size is 0 or 1 meaning eviction causes service disruption or degradation. So aborting..")
+		err = descheduler.runDeschedulerLoop(ctx, nodes)
+		if err != nil {
+			klog.Error(err)
 			cancel()
 			return
 		}
-
-		var client clientset.Interface
-		// When the dry mode is enable, collect all the relevant objects (mostly pods) under a fake client.
-		// So when evicting pods while running multiple strategies in a row have the cummulative effect
-		// as is when evicting pods for real.
-		if rs.DryRun {
-			klog.V(3).Infof("Building a cached client from the cluster for the dry run")
-			// Create a new cache so we start from scratch without any leftovers
-			fakeClient, err := cachedClient(rs.Client, podLister, nodeLister, namespaceLister, priorityClassLister)
-			if err != nil {
-				klog.Error(err)
-				return
-			}
-
-			// create a new instance of the shared informer factor from the cached client
-			fakeSharedInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
-			// register the pod informer, otherwise it will not get running
-			getPodsAssignedToNode, err = podutil.BuildGetPodsAssignedToNodeFunc(fakeSharedInformerFactory.Core().V1().Pods().Informer())
-			if err != nil {
-				klog.Errorf("build get pods assigned to node function error: %v", err)
-				return
-			}
-
-			fakeCtx, cncl := context.WithCancel(context.TODO())
-			defer cncl()
-			fakeSharedInformerFactory.Start(fakeCtx.Done())
-			fakeSharedInformerFactory.WaitForCacheSync(fakeCtx.Done())
-
-			client = fakeClient
-			cycleSharedInformerFactory = fakeSharedInformerFactory
-		} else {
-			client = rs.Client
-		}
-
-		klog.V(3).Infof("Building a pod evictor")
-		podEvictor := evictions.NewPodEvictor(
-			client,
-			evictionPolicyGroupVersion,
-			rs.DryRun,
-			deschedulerPolicy.MaxNoOfPodsToEvictPerNode,
-			deschedulerPolicy.MaxNoOfPodsToEvictPerNamespace,
-			nodes,
-			!rs.DisableMetrics,
-			eventRecorder,
-		)
-
-		for _, profile := range deschedulerPolicy.Profiles {
-			currProfile, err := frameworkprofile.NewProfile(
-				profile,
-				pluginregistry.PluginRegistry,
-				frameworkprofile.WithClientSet(client),
-				frameworkprofile.WithSharedInformerFactory(cycleSharedInformerFactory),
-				frameworkprofile.WithPodEvictor(podEvictor),
-				frameworkprofile.WithGetPodsAssignedToNodeFnc(getPodsAssignedToNode),
-			)
-			if err != nil {
-				klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
-				continue
-			}
-
-			// First deschedule
-			status := currProfile.RunDeschedulePlugins(ctx, nodes)
-			if status != nil && status.Err != nil {
-				klog.ErrorS(status.Err, "running deschedule extension point failed with error", "profile", profile.Name)
-				continue
-			}
-			// Then balance
-			status = currProfile.RunBalancePlugins(ctx, nodes)
-			if status != nil && status.Err != nil {
-				klog.ErrorS(status.Err, "running balance extension point failed with error", "profile", profile.Name)
-				continue
-			}
-		}
-
-		klog.V(1).InfoS("Number of evicted pods", "totalEvicted", podEvictor.TotalEvicted())
-
 		// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration
 		if rs.DeschedulingInterval.Seconds() == 0 {
 			cancel()
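
After this hunk, RunDeschedulerStrategies only wires up the event recorder, the shared informer factory, and the new descheduler value; each tick of wait.NonSlidingUntil fetches ready nodes and hands them to runDeschedulerLoop. A simplified, self-contained sketch of that control flow, with stand-in types, a plain loop instead of wait.NonSlidingUntil, and most of the error handling and setup shown in the diff trimmed away:

package main

import (
	"context"
	"fmt"
	"time"
)

// fakeDescheduler stands in for the unexported descheduler struct from the diff.
type fakeDescheduler struct{}

func (d *fakeDescheduler) runDeschedulerLoop(ctx context.Context, nodes []string) error {
	// Same guard as the real loop: a 0- or 1-node cluster is an error.
	if len(nodes) <= 1 {
		return fmt.Errorf("the cluster size is 0 or 1")
	}
	fmt.Println("descheduling across", len(nodes), "nodes")
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	d := &fakeDescheduler{}     // real code: newDescheduler(ctx, rs, deschedulerPolicy, ...)
	interval := 0 * time.Second // real code: rs.DeschedulingInterval

	for {
		nodes := []string{"node-1", "node-2"} // real code: nodeutil.ReadyNodes(...)
		if err := d.runDeschedulerLoop(ctx, nodes); err != nil {
			fmt.Println(err)
			return
		}
		// With no interval configured, the real loop cancels after one iteration.
		if interval.Seconds() == 0 {
			return
		}
		time.Sleep(interval)
	}
}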
