Skip to content

Commit 7c0b0d9

Browse files
Yongjun Zhangwilfred-s
authored andcommitted
[YUNIKORN-1848] Report resource used by preempted pods in the app summary (#649)
Closes: #649 Signed-off-by: Peter Bacsko <[email protected]>
1 parent 0a13235 commit 7c0b0d9

File tree

6 files changed

+100
-44
lines changed

6 files changed

+100
-44
lines changed

pkg/common/resources/resources.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,43 +47,43 @@ func (q Quantity) string() string {
4747
}
4848

4949
// Util struct to keep track of application resource usage
50-
type UsedResource struct {
50+
type TrackedResource struct {
5151
// Two level map for aggregated resource usage
5252
// With instance type being the top level key, the mapped value is a map:
5353
// resource type (CPU, memory etc) -> the aggregated used time (in seconds) of the resource type
5454
//
55-
UsedResourceMap map[string]map[string]int64
55+
TrackedResourceMap map[string]map[string]int64
5656

5757
sync.RWMutex
5858
}
5959

60-
func (ur *UsedResource) Clone() *UsedResource {
60+
func (ur *TrackedResource) Clone() *TrackedResource {
6161
if ur == nil {
6262
return nil
6363
}
64-
ret := NewUsedResource()
64+
ret := NewTrackedResource()
6565
ur.RLock()
6666
defer ur.RUnlock()
67-
for k, v := range ur.UsedResourceMap {
67+
for k, v := range ur.TrackedResourceMap {
6868
dest := make(map[string]int64)
6969
for key, element := range v {
7070
dest[key] = element
7171
}
72-
ret.UsedResourceMap[k] = dest
72+
ret.TrackedResourceMap[k] = dest
7373
}
7474
return ret
7575
}
7676

7777
// Aggregate the resource usage to UsedResourceMap[instType]
7878
// The time the given resource used is the delta between the resource createTime and currentTime
79-
func (ur *UsedResource) AggregateUsedResource(instType string,
79+
func (ur *TrackedResource) AggregateTrackedResource(instType string,
8080
resource *Resource, bindTime time.Time) {
8181
ur.Lock()
8282
defer ur.Unlock()
8383

8484
releaseTime := time.Now()
8585
timeDiff := int64(releaseTime.Sub(bindTime).Seconds())
86-
aggregatedResourceTime, ok := ur.UsedResourceMap[instType]
86+
aggregatedResourceTime, ok := ur.TrackedResourceMap[instType]
8787
if !ok {
8888
aggregatedResourceTime = map[string]int64{}
8989
}
@@ -95,7 +95,7 @@ func (ur *UsedResource) AggregateUsedResource(instType string,
9595
curUsage += int64(element) * timeDiff // resource size times timeDiff
9696
aggregatedResourceTime[key] = curUsage
9797
}
98-
ur.UsedResourceMap[instType] = aggregatedResourceTime
98+
ur.TrackedResourceMap[instType] = aggregatedResourceTime
9999
}
100100

101101
// Never update value of Zero
@@ -157,8 +157,8 @@ func NewResourceFromConf(configMap map[string]string) (*Resource, error) {
157157
return res, nil
158158
}
159159

160-
func NewUsedResource() *UsedResource {
161-
return &UsedResource{UsedResourceMap: make(map[string]map[string]int64)}
160+
func NewTrackedResource() *TrackedResource {
161+
return &TrackedResource{TrackedResourceMap: make(map[string]map[string]int64)}
162162
}
163163

164164
func (r *Resource) String() string {

pkg/scheduler/objects/application.go

Lines changed: 58 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ type Application struct {
8787
user security.UserGroup // owner of the application
8888
allocatedResource *resources.Resource // total allocated resources
8989

90-
usedResource *resources.UsedResource // keep track of resource usage of the application
90+
usedResource *resources.TrackedResource // keep track of resource usage of the application
91+
preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
9192

9293
maxAllocatedResource *resources.Resource // max allocated resources
9394
allocatedPlaceholder *resources.Resource // total allocated placeholder resources
@@ -117,15 +118,16 @@ type Application struct {
117118
}
118119

119120
type ApplicationSummary struct {
120-
ApplicationID string
121-
SubmissionTime time.Time
122-
StartTime time.Time
123-
FinishTime time.Time
124-
User string
125-
Queue string
126-
State string
127-
RmID string
128-
ResourceUsage *resources.UsedResource
121+
ApplicationID string
122+
SubmissionTime time.Time
123+
StartTime time.Time
124+
FinishTime time.Time
125+
User string
126+
Queue string
127+
State string
128+
RmID string
129+
ResourceUsage *resources.TrackedResource
130+
PreemptedResource *resources.TrackedResource
129131
}
130132

131133
func (as *ApplicationSummary) DoLogging() {
@@ -138,24 +140,28 @@ func (as *ApplicationSummary) DoLogging() {
138140
zap.String("queue", as.Queue),
139141
zap.String("state", as.State),
140142
zap.String("rmID", as.RmID),
141-
zap.Any("resourceUsage", as.ResourceUsage.UsedResourceMap))
143+
zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
144+
zap.Any("preemptedResource", as.PreemptedResource.TrackedResourceMap),
145+
)
142146
}
143147

144148
func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
145-
state := sa.stateMachine.Current()
146-
ru := sa.usedResource.Clone()
147149
sa.RLock()
148150
defer sa.RUnlock()
151+
state := sa.stateMachine.Current()
152+
ru := sa.usedResource.Clone()
153+
pu := sa.preemptedResource.Clone()
149154
appSummary := &ApplicationSummary{
150-
ApplicationID: sa.ApplicationID,
151-
SubmissionTime: sa.SubmissionTime,
152-
StartTime: sa.startTime,
153-
FinishTime: sa.finishedTime,
154-
User: sa.user.User,
155-
Queue: sa.queuePath,
156-
State: state,
157-
RmID: rmID,
158-
ResourceUsage: ru,
155+
ApplicationID: sa.ApplicationID,
156+
SubmissionTime: sa.SubmissionTime,
157+
StartTime: sa.startTime,
158+
FinishTime: sa.finishedTime,
159+
User: sa.user.User,
160+
Queue: sa.queuePath,
161+
State: state,
162+
RmID: rmID,
163+
ResourceUsage: ru,
164+
PreemptedResource: pu,
159165
}
160166
return appSummary
161167
}
@@ -169,7 +175,8 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve
169175
tags: siApp.Tags,
170176
pending: resources.NewResource(),
171177
allocatedResource: resources.NewResource(),
172-
usedResource: resources.NewUsedResource(),
178+
usedResource: resources.NewTrackedResource(),
179+
preemptedResource: resources.NewTrackedResource(),
173180
maxAllocatedResource: resources.NewResource(),
174181
allocatedPlaceholder: resources.NewResource(),
175182
requests: make(map[string]*AllocationAsk),
@@ -1673,10 +1680,26 @@ func (sa *Application) decUserResourceUsage(resource *resources.Resource, remove
16731680
ugm.GetUserManager().DecreaseTrackedResource(sa.queuePath, sa.ApplicationID, resource, sa.user, removeApp)
16741681
}
16751682

1683+
// Track used and preempted resources
1684+
func (sa *Application) trackCompletedResource(info *Allocation) {
1685+
if info.IsPreempted() {
1686+
sa.updatePreemptedResource(info)
1687+
} else {
1688+
sa.updateUsedResource(info)
1689+
}
1690+
}
1691+
16761692
// When the resource allocated with this allocation is to be removed,
16771693
// have the usedResource to aggregate the resource used by this allocation
16781694
func (sa *Application) updateUsedResource(info *Allocation) {
1679-
sa.usedResource.AggregateUsedResource(info.GetInstanceType(),
1695+
sa.usedResource.AggregateTrackedResource(info.GetInstanceType(),
1696+
info.GetAllocatedResource(), info.GetBindTime())
1697+
}
1698+
1699+
// When the resource allocated with this allocation is to be preempted,
1700+
// have the preemptedResource to aggregate the resource used by this allocation
1701+
func (sa *Application) updatePreemptedResource(info *Allocation) {
1702+
sa.preemptedResource.AggregateTrackedResource(info.GetInstanceType(),
16801703
info.GetAllocatedResource(), info.GetBindTime())
16811704
}
16821705

@@ -1772,8 +1795,8 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term
17721795
} else {
17731796
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())
17741797

1775-
// Aggregate the resources used by this alloc to the application's user resource tracker
1776-
sa.updateUsedResource(alloc)
1798+
// Aggregate the resources used by this alloc to the application's resource tracker
1799+
sa.trackCompletedResource(alloc)
17771800

17781801
// When the resource trackers are zero we should not expect anything to come in later.
17791802
if sa.hasZeroAllocations() {
@@ -1825,7 +1848,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
18251848
for _, alloc := range sa.allocations {
18261849
allocationsToRelease = append(allocationsToRelease, alloc)
18271850
// Aggregate the resources used by this alloc to the application's user resource tracker
1828-
sa.updateUsedResource(alloc)
1851+
sa.trackCompletedResource(alloc)
18291852
sa.appEvents.sendRemoveAllocationEvent(alloc, si.TerminationType_STOPPED_BY_RM)
18301853
}
18311854

@@ -2008,8 +2031,15 @@ func (sa *Application) cleanupAsks() {
20082031
sa.sortedRequests = nil
20092032
}
20102033

2011-
func (sa *Application) CleanupUsedResource() {
2034+
func (sa *Application) cleanupTrackedResource() {
20122035
sa.usedResource = nil
2036+
sa.preemptedResource = nil
2037+
}
2038+
2039+
func (sa *Application) CleanupTrackedResource() {
2040+
sa.Lock()
2041+
defer sa.Unlock()
2042+
sa.cleanupTrackedResource()
20132043
}
20142044

20152045
func (sa *Application) LogAppSummary(rmID string) {

pkg/scheduler/objects/application_state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func NewAppState() *fsm.FSM {
199199
metrics.GetSchedulerMetrics().IncTotalApplicationsRejected()
200200
app.setStateTimer(terminatedTimeout, app.stateMachine.Current(), ExpireApplication)
201201
app.finishedTime = time.Now()
202-
app.CleanupUsedResource()
202+
app.cleanupTrackedResource()
203203
// No rejected message when use app.HandleApplicationEvent(RejectApplication)
204204
if len(event.Args) == 2 {
205205
app.rejectedMessage = event.Args[1].(string) //nolint:errcheck

pkg/scheduler/objects/application_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,7 @@ func TestCompleted(t *testing.T) {
10621062
}
10631063

10641064
func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, vcoresSecconds int64) {
1065-
detailedResource := appSummary.ResourceUsage.UsedResourceMap[instType1]
1065+
detailedResource := appSummary.ResourceUsage.TrackedResourceMap[instType1]
10661066
assert.Equal(t, memorySeconds, detailedResource["memory"])
10671067
assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
10681068
}

pkg/scheduler/partition.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1462,7 +1462,7 @@ func (pc *PartitionContext) moveTerminatedApp(appID string) {
14621462
zap.String("appID", appID),
14631463
zap.String("app status", app.CurrentState()))
14641464
app.LogAppSummary(pc.RmID)
1465-
app.CleanupUsedResource()
1465+
app.CleanupTrackedResource()
14661466
pc.Lock()
14671467
defer pc.Unlock()
14681468
delete(pc.applications, appID)

pkg/scheduler/partition_test.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,9 +1815,28 @@ func TestRequiredNodeAllocation(t *testing.T) {
18151815
assertLimits(t, getTestUserGroup(), resources.Multiply(res, 2))
18161816
}
18171817

1818+
func assertPreemptedResource(t *testing.T, appSummary *objects.ApplicationSummary, memorySeconds int64,
1819+
vcoresSecconds int64) {
1820+
detailedResource := appSummary.PreemptedResource.TrackedResourceMap["UNKNOWN"]
1821+
memValue, memPresent := detailedResource["memory"]
1822+
vcoreValue, vcorePresent := detailedResource["vcore"]
1823+
1824+
if memorySeconds != -1 {
1825+
assert.Equal(t, memorySeconds, memValue)
1826+
} else {
1827+
assert.Equal(t, memPresent, false)
1828+
}
1829+
1830+
if vcoresSecconds != -1 {
1831+
assert.Equal(t, vcoresSecconds, vcoreValue)
1832+
} else {
1833+
assert.Equal(t, vcorePresent, false)
1834+
}
1835+
}
1836+
18181837
func TestPreemption(t *testing.T) {
18191838
setupUGM()
1820-
partition, _, app2, alloc1, alloc2 := setupPreemption(t)
1839+
partition, app1, app2, alloc1, alloc2 := setupPreemption(t)
18211840

18221841
res, err := resources.NewResourceFromConf(map[string]string{"vcore": "5"})
18231842
assert.NilError(t, err, "failed to create resource")
@@ -1828,7 +1847,8 @@ func TestPreemption(t *testing.T) {
18281847
assert.NilError(t, err, "failed to add ask alloc-3 to app-2")
18291848

18301849
// delay so that preemption delay passes
1831-
time.Sleep(100 * time.Millisecond)
1850+
// also make the delay 1 second to have a minimum non-zero resource*seconds measurement for preempted resources
1851+
time.Sleep(time.Second)
18321852

18331853
// third allocation should not succeed, as we are currently above capacity
18341854
alloc := partition.tryAllocate()
@@ -1873,6 +1893,12 @@ func TestPreemption(t *testing.T) {
18731893
assert.Equal(t, alloc.GetResult(), objects.Allocated, "result should be allocated")
18741894
assert.Equal(t, alloc.GetAllocationKey(), allocID3, "expected ask alloc-3 to be allocated")
18751895
assertUserGroupResourceMaxLimits(t, getTestUserGroup(), resources.NewResourceFromMap(map[string]resources.Quantity{"vcore": 10000}), getExpectedQueuesLimitsForPreemption())
1896+
1897+
appSummary := app1.GetApplicationSummary("default")
1898+
assertPreemptedResource(t, appSummary, -1, 5000)
1899+
1900+
appSummary = app2.GetApplicationSummary("default")
1901+
assertPreemptedResource(t, appSummary, -1, 0)
18761902
}
18771903

18781904
// Preemption followed by a normal allocation

0 commit comments

Comments
 (0)