Support topology aware in the preempt action #4279
ad7f931 to
90731b3
Compare
Please attach the doc.
Please fix the CI.
7adfca5 to
0816c63
Compare
Pull Request Overview
This PR enhances the preempt action with topology awareness by adding simulation functions and new cycle state management to various plugins and session functions. Key changes include:
- Enhancements in proportion, predicates, and capacity plugins with new simulation and parallel functions.
- Updates to session framework for managing the new cycle state functions.
- Addition of new topology-aware configuration constants and design documentation.
Reviewed Changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pkg/scheduler/plugins/proportion/proportion.go | Added capability fields, new state key, simulation, and parallel allocatable functions. |
| pkg/scheduler/plugins/predicates/predicates.go | Introduced predicate state management and simulation functions. |
| pkg/scheduler/plugins/capacity/capacity.go | Enhanced simulation functions and parallel allocatable checks with hierarchical support. |
| pkg/scheduler/framework/session_plugins*.go & session.go | Added new session functions for cycle state initialization and simulation operations. |
| pkg/scheduler/conf/constants.go | Introduced new topology-aware preemption configuration constants. |
| pkg/scheduler/api/types.go | Updated Status methods (changed String() receiver to pointer) and added helper functions. |
| pkg/scheduler/actions/reclaim/reclaim.go | Updated retrieval of candidate nodes using new filtering methods. |
| docs/design/preempt-action-support-topology.md | Added design documentation for topology-aware preemption. |
Comments suppressed due to low confidence (1)
pkg/scheduler/api/types.go:170
- Changing the receiver from value to pointer in Status.String() may affect existing usages; ensure compatibility or update the API documentation accordingly.
func (s *Status) String() string {
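To make the compatibility concern concrete, here is a minimal sketch of how the receiver kind changes what `fmt` prints. The types `valueStatus` and `pointerStatus` and the helper `render` are hypothetical stand-ins, not the real `api.Status`; only the Go method-set rule itself is taken as given.

```go
package main

import "fmt"

// valueStatus and pointerStatus are hypothetical stand-ins for api.Status.
type valueStatus struct{ Code int }

// String has a value receiver, so both valueStatus and *valueStatus
// satisfy fmt.Stringer.
func (s valueStatus) String() string { return fmt.Sprintf("code=%d", s.Code) }

type pointerStatus struct{ Code int }

// String has a pointer receiver, so only *pointerStatus satisfies fmt.Stringer.
func (s *pointerStatus) String() string { return fmt.Sprintf("code=%d", s.Code) }

// render formats v with fmt's default formatting.
func render(v any) string { return fmt.Sprint(v) }

func main() {
	fmt.Println(render(valueStatus{Code: 1}))    // String() is used
	fmt.Println(render(&valueStatus{Code: 1}))   // String() is used
	fmt.Println(render(pointerStatus{Code: 1}))  // String() is not used: default struct formatting
	fmt.Println(render(&pointerStatus{Code: 1})) // String() is used
}
```

If callers log or format `Status` values (not pointers), a pointer receiver silently falls back to the default struct formatting, which is the kind of behavior change worth checking for.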
|
|
||
| s, ok := c.(*proportionState) | ||
| if !ok { | ||
| return nil, fmt.Errorf("%+v convert to capacity.state error", c) |
Copilot
AI
May 13, 2025
Error message in getProportionState incorrectly refers to 'capacity.state' instead of 'proportionState'; update the error text for clarity.
| return nil, fmt.Errorf("%+v convert to capacity.state error", c) | |
| return nil, fmt.Errorf("%+v convert to proportionState error", c) |
29d6408 to
436e1a3
Compare
| return &Action{ | ||
| enablePredicateErrorCache: true, | ||
| enablePredicateErrorCache: true, | ||
| enableTopologyAwarePreemption: true, |
As this is a newly introduced feature, I think it should default to false in this version to avoid affecting users who upgrade, and then be switched to true once it has been stable for a few releases.
the default value is set to false
| return true, nil | ||
| } | ||
|
|
||
| func (pmpt *Action) taskEligibleToPreemptOthers(preemptor *api.TaskInfo) (bool, string) { |
There is already a taskEligibleToPreempt function; the two can be unified.
Updated.
| return true, "" | ||
| } | ||
|
|
||
| podPriority := PodPriority(preemptor.Pod) |
Better to rename this to preemptorPodPriority for clarity.
done
| filter func(*api.TaskInfo) bool, | ||
| predicateHelper util.PredicateHelper, | ||
| ) (bool, error) { | ||
| if err := ssn.PrePredicateFn(preemptor); err != nil { |
L397-L410 is the same as L277-L285 in normalPreempt (swapping the order of PrePredicate and taskEligibleToPreempt should not matter, right?). I think we can extract a common func.
done
| } | ||
|
|
||
| func (pmpt *Action) findCandidates(preemptor *api.TaskInfo, filter func(*api.TaskInfo) bool, predicateHelper util.PredicateHelper, stmt *framework.Statement) ([]Candidate, map[string]api.Status, error) { | ||
| // we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action |
L486-L499 also duplicates L287-L290.
done
| // GetOffsetAndNumCandidates chooses a random offset and calculates the number | ||
| // of candidates that should be shortlisted for dry running preemption. | ||
| func (pmpt *Action) GetOffsetAndNumCandidates(numNodes int) (int, int) { | ||
| return rand.Intn(numNodes), pmpt.calculateNumCandidates(numNodes) |
What is the purpose of offset here?
offset is used to randomly select a starting point in the potentialNodes array. This helps distribute the preemption checks across different nodes and avoid always starting from the beginning of the node list, which could lead to uneven distribution of preemption attempts.
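A minimal sketch of the idea, assuming the shortlist is simply the first `numCandidates` nodes visited in a circular walk from `offset`. The function `shortlist` and the node names are hypothetical; the actual dry run in the PR may stop on other conditions.

```go
package main

import (
	"fmt"
	"math/rand"
)

// shortlist walks nodes circularly starting at offset and returns up to
// numCandidates node names. It is a hypothetical helper for illustration.
func shortlist(nodes []string, offset, numCandidates int) []string {
	if len(nodes) == 0 || numCandidates <= 0 {
		return nil
	}
	if numCandidates > len(nodes) {
		numCandidates = len(nodes)
	}
	picked := make([]string, 0, numCandidates)
	for i := 0; i < numCandidates; i++ {
		// The modulo wraps the walk around to the start of the list.
		picked = append(picked, nodes[(offset+i)%len(nodes)])
	}
	return picked
}

func main() {
	nodes := []string{"node-0", "node-1", "node-2", "node-3", "node-4"}
	// A random starting point spreads preemption attempts across the list
	// instead of always favoring the first nodes.
	offset := rand.Intn(len(nodes))
	fmt.Println(shortlist(nodes, offset, 3))
}
```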
| return nil | ||
| } | ||
|
|
||
| // InitCycleStateFn 调用插件的初始化CycleState函数 [English: InitCycleStateFn calls the plugins' CycleState initialization functions]
Please translate all the Chinese comments into English
done
| statusesLock.Unlock() | ||
| } | ||
| klog.Infof("the worker number is %d, the potentialNodes number is %d", 160, len(potentialNodes)) | ||
| workqueue.ParallelizeUntil(ctx, 160, len(potentialNodes), checkNode) |
Default worker size is 160 or 16?
The default value should be 16; it has been updated.
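For context on what the worker count controls, here is a standard-library sketch of bounded parallelism over a fixed number of pieces. It only approximates the role `workqueue.ParallelizeUntil` plays in the snippet; `parallelize`, `countChecked`, and `defaultWorkers` are hypothetical names, and the real helper's internals may differ.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// parallelize runs doPiece for every index in [0, pieces) using at most
// `workers` goroutines, and stops handing out work once ctx is cancelled.
func parallelize(ctx context.Context, workers, pieces int, doPiece func(i int)) {
	if pieces <= 0 {
		return
	}
	if workers < 1 {
		workers = 1
	}
	if workers > pieces {
		workers = pieces
	}
	indexes := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		indexes <- i
	}
	close(indexes)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := range indexes {
				select {
				case <-ctx.Done():
					return
				default:
				}
				doPiece(i)
			}
		}()
	}
	wg.Wait()
}

// countChecked simulates checking `nodes` nodes and returns how many were visited.
func countChecked(nodes, workers int) int {
	var checked int64
	parallelize(context.Background(), workers, nodes, func(int) {
		atomic.AddInt64(&checked, 1)
	})
	return int(checked)
}

func main() {
	const defaultWorkers = 16 // assumed default, per the discussion above
	fmt.Println(countChecked(100, defaultWorkers))
}
```

The worker count only bounds concurrency; every node is still checked exactly once, so a value such as 160 changes resource usage rather than results.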
| return newState | ||
| } | ||
|
|
||
| func updateQueueAttrShare(attr *queueAttr) { |
This duplicates cp.updateShare. The only difference is that capacity's updateShare method takes one extra step to update metrics: metrics.UpdateQueueShare(attr.name, attr.share). We can unify the logic.
done
| k8sNodeInfo := k8sframework.NewNodeInfo(nodeInfo.Pods()...) | ||
| k8sNodeInfo.SetNode(nodeInfo.Node) | ||
|
|
||
| if predicate.podAffinityEnable { |
Is it enough to include only podAffinity?
Other plugins can be added in the future.
| return !overused | ||
| }) | ||
|
|
||
| // queueAllocatable := func(queue *api.QueueInfo, candidate *api.TaskInfo) bool { |
useless comment
deleted
| // return allocatable | ||
| // } | ||
|
|
||
| queueConcurrentAllocatable := func(state *capacityState, queue *api.QueueInfo, candidate *api.TaskInfo) bool { |
I don't see how this differs from capacity's own queueAllocatable, and it doesn't reflect anything concurrent. I don't think it's necessary to add a new function just to pass one more cycleState:
volcano/pkg/scheduler/plugins/capacity/capacity.go
Lines 724 to 734 in d2c9211
| func (cp *capacityPlugin) queueAllocatable(queue *api.QueueInfo, candidate *api.TaskInfo) bool { | |
| attr := cp.queueOpts[queue.UID] | |
| futureUsed := attr.allocated.Clone().Add(candidate.Resreq) | |
| allocatable := futureUsed.LessEqualWithDimension(attr.realCapability, candidate.Resreq) | |
| if !allocatable { | |
| klog.V(3).Infof("Queue <%v>: realCapability <%v>, allocated <%v>; Candidate <%v>: resource request <%v>", | |
| queue.Name, attr.realCapability, attr.allocated, candidate.Name, candidate.Resreq) | |
| } | |
| return allocatable | |
| } |
done
This function is still retained. Can't the existing one be reused? The only difference is the state parameter. Also, can the name Concurrent be changed to something like Simulate?
| } | ||
|
|
||
| // ConcurrentAllocatableFn 调用插件的并发分配函数 [English: ConcurrentAllocatableFn calls the plugins' concurrent allocation functions] | ||
| func (ssn *Session) ConcurrentAllocatableFn(ctx context.Context, state *k8sframework.CycleState, queue *api.QueueInfo, task *api.TaskInfo) bool { |
Where does ConcurrentAllocatableFn reflect concurrency? I didn't see it. The name will also easily confuse users, since there would be both AllocatableFn and ConcurrentAllocatableFn.
updated
| return true | ||
| } | ||
|
|
||
| // ParallelPredicateFn 调用插件的并发断言函数 [English: ParallelPredicateFn calls the plugins' concurrent predicate functions]
Same for ParallelPredicateFn: I didn't see any code that reflects the concept of Parallel.
updated.
| ssn.jobStarvingFns[name] = fn | ||
| } | ||
|
|
||
| func (ssn *Session) AddInitCycleStateFn(name string, fn api.InitCycleStateFn) { |
In #4152, I added a cycleStateMap to the session to store the cycleState of different tasks, since cycleState may not be used only in predicates plugins. I don't think there is a need to add a new extension point called InitCycleStateFn; it would very likely be used only for simulation in the preempt action. We could do it this way instead: before simulation starts, clear the cycleStateMap again, and then simulateXXXFn can get the cycleState directly from this cycleStateMap. That avoids adding so many new extension points, which would confuse users. What do you think?
I have removed this function and now use the cache in the session.
| queueAttrs map[api.QueueID]*queueAttr | ||
| } | ||
|
|
||
| // Clone 深拷贝 queueAttr 对象,包括其所有子节点 [English: Clone deep-copies the queueAttr object, including all of its child nodes]
Remove the useless Chinese comments or translate them into English.
updated.
| return s, nil | ||
| } | ||
|
|
||
| func updateQueueAttrShare(attr *queueAttr) { |
This also duplicates code in proportion's own updateShare method.
updated.
Where was it changed? Isn't proportion's updateShare function still there on line 400?
|
|
||
| ### Key Function Modifications | ||
|
|
||
| - `GetBestNodeByPreemptCost`: A function that finds the best node for preemption by calculating and comparing preemption costs. It takes a list of candidate nodes and their corresponding victim pods, iterates through them to compute the cost of preempting victims on each node using the provided cost function, and returns the node with the minimum preemption cost. This helps select the most suitable node that minimizes the impact of preemption. |
I didn't see GetBestNodeByPreemptCost or PreemptCostNodeOrderFn in the code.
updated.
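For readers of the design doc, the minimum-cost selection it describes can be sketched as follows. This is not code from the PR (the review above notes the function was not found there); `candidate`, `victim`, `bestNodeByCost`, and the priority-sum cost function are all hypothetical.

```go
package main

import "fmt"

// victim and candidate are hypothetical types for illustration only.
type victim struct {
	name     string
	priority int
}

type candidate struct {
	node    string
	victims []victim
}

// bestNodeByCost returns the node whose victims have the lowest cost
// according to costFn. Ties keep the first candidate encountered.
// The boolean is false when there are no candidates.
func bestNodeByCost(cands []candidate, costFn func(candidate) int) (string, bool) {
	best, bestCost, found := "", 0, false
	for _, c := range cands {
		cost := costFn(c)
		if !found || cost < bestCost {
			best, bestCost, found = c.node, cost, true
		}
	}
	return best, found
}

// sumOfPriorities is an assumed cost function: evicting higher-priority
// victims is treated as more expensive.
func sumOfPriorities(c candidate) int {
	total := 0
	for _, v := range c.victims {
		total += v.priority
	}
	return total
}

func main() {
	cands := []candidate{
		{node: "node-a", victims: []victim{{"p1", 5}, {"p2", 5}}},
		{node: "node-b", victims: []victim{{"p3", 3}}},
	}
	node, ok := bestNodeByCost(cands, sumOfPriorities)
	fmt.Println(node, ok)
}
```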
pkg/scheduler/api/types.go
Outdated
| // NodeOrderReduceFn is the func declaration used to reduce priority score of all nodes for a plugin for a particular task. | ||
| type NodeOrderReduceFn func(*TaskInfo, map[string]k8sframework.NodeScoreList) (map[string]float64, error) | ||
|
|
||
| // PreemptOrderMapFn is the func declaration used to calculate the score of a node for evicting victims |
I didn't see PreemptOrderMapFn being used in the code.
updated.
pkg/scheduler/api/types.go
Outdated
| // ParallelAllocatableFn is the function declaration used to determine if a task can run on a node in concurrent scenarios. | ||
| type ParallelAllocatableFn func(ctx context.Context, state *k8sframework.CycleState, queue *QueueInfo, task *TaskInfo) bool | ||
|
|
||
| type EvictCostFn func(ctx context.Context, state *k8sframework.CycleState, task *TaskInfo, nodeInfo *NodeInfo) (int, error) |
Didn't see EvictCostFn being used in the code
updated.
|
|
||
| // Candidate represents a nominated node on which the preemptor can be scheduled, | ||
| // along with the list of victims that should be evicted for the preemptor to fit the node. | ||
| type Candidate interface { |
Is it necessary to define an interface? It seems that it only returns its victims list and name.
Yes, it's for candidateList to manage candidates.
| } | ||
|
|
||
| if !fits { | ||
| if err := removeTask(pi); err != nil { |
Is it necessary to do removeTask again?
If we don't remove it again, it will affect the next predicate check.
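A simplified sketch of why the second removal matters, assuming a single CPU dimension and a "remove all lower-priority tasks, then try to add them back one by one" loop. The `task` type and `selectVictims` are hypothetical; the real code works on `api.TaskInfo` and runs the simulated predicates instead of a plain capacity check.

```go
package main

import (
	"fmt"
	"sort"
)

// task is a hypothetical, single-resource stand-in for api.TaskInfo.
type task struct {
	name     string
	priority int
	cpu      int
}

// selectVictims returns the tasks that must be evicted from a node with the
// given CPU capacity so that the preemptor fits, or false if it cannot fit.
func selectVictims(capacity int, running []task, preemptorCPU, preemptorPriority int) ([]task, bool) {
	used := 0
	var potential []task
	for _, t := range running {
		if t.priority < preemptorPriority {
			potential = append(potential, t) // simulated removal
		} else {
			used += t.cpu
		}
	}
	if used+preemptorCPU > capacity {
		return nil, false // does not fit even with all candidates removed
	}
	// Try to reprieve higher-priority tasks first.
	sort.SliceStable(potential, func(i, j int) bool {
		return potential[i].priority > potential[j].priority
	})
	var victims []task
	for _, t := range potential {
		used += t.cpu // simulated add-back
		if used+preemptorCPU > capacity {
			// The preemptor no longer fits: remove the task again so the
			// simulated state stays correct for the next check.
			used -= t.cpu
			victims = append(victims, t)
		}
	}
	return victims, true
}

func main() {
	running := []task{{"a", 1, 4}, {"b", 2, 4}, {"c", 10, 2}}
	victims, ok := selectVictims(10, running, 4, 5)
	fmt.Println(victims, ok)
}
```

Without the `used -= t.cpu` step, the task that failed the check would still count against the node, and every later task would be judged against an inflated usage.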
|
|
||
| // pickOneNodeForPreemption chooses one node among the given nodes. | ||
| // It assumes pods in each map entry are ordered by decreasing priority. | ||
| // If the scoreFuns is not empty, It picks a node based on score scoreFuns returns. |
typo scoreFuns ----> scoreFuncs
done
| } | ||
|
|
||
| func OrderedScoreFuncs(nodesToVictims map[string][]*api.TaskInfo) []func(node string) int64 { | ||
| return nil |
return nil? Does it need to be modified later?
add todo
I think this PR should add e2e test cases, and it would be better to make the test scenarios clear in the docs.
66872d1 to
38579b2
Compare
38579b2 to
0f5150c
Compare
0f5150c to
563f3b6
Compare
563f3b6 to
9aa38be
Compare
7b6a9bb to
c909caa
Compare
Signed-off-by: Box Zhang <[email protected]>
c909caa to
9a18058
Compare
I have added the predicate e2e test case.
/lgtm
pkg/scheduler/api/types.go
Outdated
| // SimulateAddTaskFn is the func declaration used to simulate the result of adding a task to a node. | ||
| type SimulateAddTaskFn func(ctx context.Context, state *k8sframework.CycleState, taskToSchedule *TaskInfo, taskInfoToAdd *TaskInfo, nodeInfo *NodeInfo) error | ||
|
|
||
| // SimulatePredicateFn is the function declaration used to determine if a task can run on a node in parallel scenarios. |
The comments here need to be consistent with the documentation: Simulate the predicate check for a task on a node, plugins implement this function to verify if the task can be scheduled to the node while maintaining topology constraints
updated
pkg/scheduler/api/types.go
Outdated
| // It ensures thread safety and state consistency in parallel environments. | ||
| type SimulatePredicateFn func(ctx context.Context, state *k8sframework.CycleState, task *TaskInfo, nodeInfo *NodeInfo) error | ||
|
|
||
| // SimulateAllocatableFn is the function declaration used to determine if a task can run on a node in concurrent scenarios. |
Same for SimulateAllocatableFn: the comment here is inconsistent with the doc.
updated
| } | ||
|
|
||
| attr.share = res | ||
| metrics.UpdateQueueShare(attr.name, attr.share) |
I think we should not update metrics when simulating.
updated
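One way to share the share computation while keeping metrics out of the simulation path is sketched below. It assumes the share is the largest allocated/deserved ratio across resources and that a zero deserved value yields 1 when anything is allocated; `queueAttr` here is a simplified, hypothetical version of the plugin's type, and `record` stands in for `metrics.UpdateQueueShare`.

```go
package main

import "fmt"

// queueAttr is a simplified, hypothetical version of the plugin's queueAttr.
type queueAttr struct {
	name      string
	allocated map[string]float64
	deserved  map[string]float64
	share     float64
}

// computeShare returns the largest allocated/deserved ratio over all
// deserved resources (assumed definition of a queue's share).
func computeShare(attr *queueAttr) float64 {
	res := 0.0
	for name, deserved := range attr.deserved {
		allocated := attr.allocated[name]
		ratio := 0.0
		switch {
		case deserved > 0:
			ratio = allocated / deserved
		case allocated > 0:
			ratio = 1 // assumption: nothing deserved but something allocated
		}
		if ratio > res {
			res = ratio
		}
	}
	return res
}

// updateShare stores the share and reports it only when record is non-nil,
// so simulation callers can pass nil and leave metrics untouched.
func updateShare(attr *queueAttr, record func(name string, share float64)) {
	attr.share = computeShare(attr)
	if record != nil {
		record(attr.name, attr.share)
	}
}

// simulateAndCount runs a simulated update followed by a real one and
// returns the share plus how many metric updates each path produced.
func simulateAndCount() (share float64, simulated, real int) {
	attr := &queueAttr{
		name:      "q1",
		allocated: map[string]float64{"cpu": 2, "memory": 1},
		deserved:  map[string]float64{"cpu": 4, "memory": 8},
	}
	updateShare(attr, nil) // simulation: no metrics
	updateShare(attr, func(string, float64) { real++ })
	return attr.share, simulated, real
}

func main() {
	fmt.Println(simulateAndCount())
}
```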
Signed-off-by: Box Zhang <[email protected]>
9a18058 to
e9040d3
Compare
/approve
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: wangyang0616
The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
/lgtm
What type of PR is this?
/kind feature
Improve the preempt action to support topology awareness.
What this PR does / why we need it:
For more details, please refer to the proposal content.