Skip to content

Commit 68d1a7a

Browse files
kingamartonyangwwei
authored andcommitted
[YUNIKORN-574] Wait for placeholder cleanup (#260)
With this fix a new state is introduced: Failing state. Also, Waiting state is renamed to Completing. When the scheduler cleans up the placeholders, the core notifies the shim to release allocations/asks wait and wait for the confirmation before moving the application into the terminating state (Completed or Failed).
1 parent 84472e1 commit 68d1a7a

File tree

7 files changed

+170
-179
lines changed

7 files changed

+170
-179
lines changed

pkg/scheduler/objects/application.go

Lines changed: 50 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ import (
4040
)
4141

4242
var (
43-
reservationDelay = 2 * time.Second
44-
startingTimeout = 5 * time.Minute
45-
waitingTimeout = 30 * time.Second
46-
completedTimeout = 3 * 24 * time.Hour
43+
reservationDelay = 2 * time.Second
44+
startingTimeout = 5 * time.Minute
45+
completingTimeout = 30 * time.Second
46+
terminatedTimeout = 3 * 24 * time.Hour
4747
defaultPlaceholderTimeout = 15 * time.Minute
4848
)
4949

@@ -146,8 +146,8 @@ func (sa *Application) IsRunning() bool {
146146
return sa.stateMachine.Is(Running.String())
147147
}
148148

149-
func (sa *Application) IsWaiting() bool {
150-
return sa.stateMachine.Is(Waiting.String())
149+
func (sa *Application) IsCompleting() bool {
150+
return sa.stateMachine.Is(Completing.String())
151151
}
152152

153153
func (sa *Application) IsCompleted() bool {
@@ -158,6 +158,10 @@ func (sa *Application) IsExpired() bool {
158158
return sa.stateMachine.Is(Expired.String())
159159
}
160160

161+
func (sa *Application) IsFailing() bool {
162+
return sa.stateMachine.Is(Failing.String())
163+
}
164+
161165
func (sa *Application) IsFailed() bool {
162166
return sa.stateMachine.Is(Failed.String())
163167
}
@@ -208,13 +212,13 @@ func (sa *Application) setStateTimer(timeout time.Duration, currentState string,
208212
func (sa *Application) timeoutStateTimer(expectedState string, event applicationEvent) func() {
209213
return func() {
210214
// make sure we are still in the right state
211-
// we could have been killed or something might have happened while waiting for a lock
215+
// we could have been failed or something might have happened while waiting for a lock
212216
if expectedState == sa.stateMachine.Current() {
213217
log.Logger().Debug("Application state: auto progress",
214218
zap.String("applicationID", sa.ApplicationID),
215219
zap.String("state", sa.stateMachine.Current()))
216-
// if the app is waiting, but there are placeholders left, first do the cleanup
217-
if sa.IsWaiting() && !resources.IsZero(sa.GetPlaceholderResource()) {
220+
// if the app is completing, but there are placeholders left, first do the cleanup
221+
if sa.IsCompleting() && !resources.IsZero(sa.GetPlaceholderResource()) {
218222
sa.notifyRMAllocationReleased(sa.rmID, sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
219223
sa.clearStateTimer()
220224
} else {
@@ -239,9 +243,6 @@ func (sa *Application) clearStateTimer() {
239243
zap.String("state", sa.stateMachine.Current()))
240244
}
241245

242-
func (sa *Application) isWaitingStateTimedOut() bool {
243-
return sa.IsWaiting() && sa.stateTimer == nil
244-
}
245246
func (sa *Application) initPlaceholderTimer() {
246247
if sa.placeholderTimer != nil || !sa.IsAccepted() || sa.execTimeout <= 0 {
247248
return
@@ -271,16 +272,17 @@ func (sa *Application) timeoutPlaceholderProcessing() {
271272
}
272273
// Case 2: in every other case fail the application, and notify the context about the expired placeholder asks
273274
default:
274-
sa.notifyRMAllocationAskReleased(sa.rmID, sa.getAllRequests(), si.TerminationType_TIMEOUT, "releasing placeholders on placeholder timeout")
275-
sa.removeAsksInternal("")
275+
// change the status of the app to Failing. Once all the placeholders are cleaned up, if will be changed to Failed
276276
if err := sa.HandleApplicationEvent(FailApplication); err != nil {
277277
log.Logger().Debug("Application state change failed when placeholder timed out",
278278
zap.String("AppID", sa.ApplicationID),
279279
zap.String("currentState", sa.CurrentState()),
280280
zap.Error(err))
281281
}
282+
sa.notifyRMAllocationAskReleased(sa.rmID, sa.getAllRequests(), si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder timeout")
283+
sa.removeAsksInternal("")
282284
}
283-
sa.notifyRMAllocationReleased(sa.rmID, sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing placeholders on placeholder timeout")
285+
sa.notifyRMAllocationReleased(sa.rmID, sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
284286
sa.clearPlaceholderTimer()
285287
}
286288

@@ -397,12 +399,11 @@ func (sa *Application) removeAsksInternal(allocKey string) int {
397399
// Check if we need to change state based on the ask removal:
398400
// 1) if pending is zero (no more asks left)
399401
// 2) if confirmed allocations is zero (no real tasks running)
400-
// 3) if placeholder allocations is zero (no placeholders running)
401-
// Change the state to waiting.
402+
// Change the state to completing.
402403
// When the resource trackers are zero we should not expect anything to come in later.
403-
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
404-
if err := sa.HandleApplicationEvent(WaitApplication); err != nil {
405-
log.Logger().Warn("Application state not changed to Waiting while updating ask(s)",
404+
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) && !sa.IsFailing() {
405+
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
406+
log.Logger().Warn("Application state not changed to Completing while updating ask(s)",
406407
zap.String("currentState", sa.CurrentState()),
407408
zap.Error(err))
408409
}
@@ -437,9 +438,9 @@ func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
437438

438439
// Check if we need to change state based on the ask added, there are two cases:
439440
// 1) first ask added on a new app: state is New
440-
// 2) all asks and allocation have been removed: state is Waiting
441+
// 2) all asks and allocation have been removed: state is Completing
441442
// Move the state and get it scheduling (again)
442-
if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Waiting.String()) {
443+
if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Completing.String()) {
443444
if err := sa.HandleApplicationEvent(RunApplication); err != nil {
444445
log.Logger().Debug("Application state change failed while adding new ask",
445446
zap.String("currentState", sa.CurrentState()),
@@ -1185,21 +1186,25 @@ func (sa *Application) removeAllocationInternal(uuid string) *Allocation {
11851186
// if all the placeholders are replaced, clear the placeholder timer
11861187
if resources.IsZero(sa.allocatedPlaceholder) {
11871188
sa.clearPlaceholderTimer()
1189+
if (sa.IsCompleting() && sa.stateTimer == nil) || sa.IsFailing() {
1190+
event := CompleteApplication
1191+
if sa.IsFailing() {
1192+
event = FailApplication
1193+
}
1194+
if err := sa.HandleApplicationEvent(event); err != nil {
1195+
log.Logger().Warn("Application state not changed while removing a placeholder allocation",
1196+
zap.String("currentState", sa.CurrentState()),
1197+
zap.String("event", event.String()),
1198+
zap.Error(err))
1199+
}
1200+
}
11881201
}
11891202
} else {
11901203
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.AllocatedResource)
1191-
}
1192-
// When the resource trackers are zero we should not expect anything to come in later.
1193-
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
1194-
if sa.isWaitingStateTimedOut() && resources.IsZero(sa.allocatedPlaceholder) {
1204+
// When the resource trackers are zero we should not expect anything to come in later.
1205+
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) {
11951206
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
1196-
log.Logger().Warn("Application state not changed to Completed while removing some allocation(s)",
1197-
zap.String("currentState", sa.CurrentState()),
1198-
zap.Error(err))
1199-
}
1200-
} else {
1201-
if err := sa.HandleApplicationEvent(WaitApplication); err != nil {
1202-
log.Logger().Warn("Application state not changed to Waiting while removing some allocation(s)",
1207+
log.Logger().Warn("Application state not changed to Waiting while removing an allocation",
12031208
zap.String("currentState", sa.CurrentState()),
12041209
zap.Error(err))
12051210
}
@@ -1225,7 +1230,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
12251230
sa.allocations = make(map[string]*Allocation)
12261231
// When the resource trackers are zero we should not expect anything to come in later.
12271232
if resources.IsZero(sa.pending) {
1228-
if err := sa.HandleApplicationEvent(WaitApplication); err != nil {
1233+
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
12291234
log.Logger().Warn("Application state not changed to Waiting while removing all allocations",
12301235
zap.String("currentState", sa.CurrentState()),
12311236
zap.Error(err))
@@ -1303,3 +1308,14 @@ func (sa *Application) notifyRMAllocationAskReleased(rmID string, released []*Al
13031308
}
13041309
sa.rmEventHandler.HandleEvent(releaseEvent)
13051310
}
1311+
1312+
// Auto progress the application when it enters the Failing state if there is nothing to clean up.
1313+
// Since this is called by the _locked_ state machine while processing an event we cannot call back
1314+
// into the statemachine directly and we need a go routine to avoid a deadlock.
1315+
func (sa *Application) failAppIfPossible() {
1316+
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) && resources.IsZero(sa.allocatedPlaceholder) {
1317+
// The event handling cannot fail
1318+
//nolint: errcheck
1319+
go sa.HandleApplicationEvent(FailApplication)
1320+
}
1321+
}

pkg/scheduler/objects/application_state.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@ type applicationEvent int
3838

3939
const (
4040
RunApplication applicationEvent = iota
41-
WaitApplication
4241
RejectApplication
4342
CompleteApplication
4443
FailApplication
4544
ExpireApplication
4645
)
4746

4847
func (ae applicationEvent) String() string {
49-
return [...]string{"runApplication", "waitApplication", "rejectApplication", "completeApplication", "KillApplication", "expireApplication"}[ae]
48+
return [...]string{"runApplication", "rejectApplication", "completeApplication", "failApplication", "expireApplication"}[ae]
5049
}
5150

5251
// ----------------------------------
@@ -59,15 +58,16 @@ const (
5958
Accepted
6059
Starting
6160
Running
62-
Waiting
6361
Rejected
62+
Completing
6463
Completed
64+
Failing
6565
Failed
6666
Expired
6767
)
6868

6969
func (as applicationState) String() string {
70-
return [...]string{"New", "Accepted", "Starting", "Running", "Waiting", "Rejected", "Completed", "Failed", "Expired"}[as]
70+
return [...]string{"New", "Accepted", "Starting", "Running", "Rejected", "Completing", "Completed", "Failing", "Failed", "Expired"}[as]
7171
}
7272

7373
func NewAppState() *fsm.FSM {
@@ -87,23 +87,27 @@ func NewAppState() *fsm.FSM {
8787
Dst: Starting.String(),
8888
}, {
8989
Name: RunApplication.String(),
90-
Src: []string{Running.String(), Starting.String(), Waiting.String()},
90+
Src: []string{Running.String(), Starting.String(), Completing.String()},
9191
Dst: Running.String(),
9292
}, {
9393
Name: CompleteApplication.String(),
94-
Src: []string{Running.String(), Starting.String(), Waiting.String()},
94+
Src: []string{Accepted.String(), Running.String(), Starting.String()},
95+
Dst: Completing.String(),
96+
}, {
97+
Name: CompleteApplication.String(),
98+
Src: []string{Completing.String()},
9599
Dst: Completed.String(),
96100
}, {
97-
Name: WaitApplication.String(),
98-
Src: []string{Accepted.String(), Running.String(), Starting.String()},
99-
Dst: Waiting.String(),
101+
Name: FailApplication.String(),
102+
Src: []string{Accepted.String(), New.String(), Running.String(), Starting.String(), Completing.String()},
103+
Dst: Failing.String(),
100104
}, {
101105
Name: FailApplication.String(),
102-
Src: []string{Accepted.String(), Failed.String(), New.String(), Running.String(), Starting.String(), Waiting.String()},
106+
Src: []string{Failing.String()},
103107
Dst: Failed.String(),
104108
}, {
105109
Name: ExpireApplication.String(),
106-
Src: []string{Completed.String()},
110+
Src: []string{Completed.String(), Failed.String()},
107111
Dst: Expired.String(),
108112
},
109113
},
@@ -127,8 +131,8 @@ func NewAppState() *fsm.FSM {
127131
fmt.Sprintf("enter_%s", Starting.String()): func(event *fsm.Event) {
128132
setTimer(startingTimeout, event, RunApplication)
129133
},
130-
fmt.Sprintf("enter_%s", Waiting.String()): func(event *fsm.Event) {
131-
setTimer(waitingTimeout, event, CompleteApplication)
134+
fmt.Sprintf("enter_%s", Completing.String()): func(event *fsm.Event) {
135+
setTimer(completingTimeout, event, CompleteApplication)
132136
},
133137
fmt.Sprintf("leave_%s", New.String()): func(event *fsm.Event) {
134138
metrics.GetSchedulerMetrics().IncTotalApplicationsAdded()
@@ -144,12 +148,15 @@ func NewAppState() *fsm.FSM {
144148
},
145149
fmt.Sprintf("enter_%s", Completed.String()): func(event *fsm.Event) {
146150
metrics.GetSchedulerMetrics().IncTotalApplicationsCompleted()
147-
app := setTimer(completedTimeout, event, ExpireApplication)
151+
app := setTimer(terminatedTimeout, event, ExpireApplication)
148152
app.executeTerminatedCallback()
149153
app.clearPlaceholderTimer()
150154
},
155+
fmt.Sprintf("enter_%s", Failing.String()): func(event *fsm.Event) {
156+
event.Args[0].(*Application).failAppIfPossible()
157+
},
151158
fmt.Sprintf("enter_%s", Failed.String()): func(event *fsm.Event) {
152-
app := setTimer(completedTimeout, event, ExpireApplication)
159+
app := setTimer(terminatedTimeout, event, ExpireApplication)
153160
app.executeTerminatedCallback()
154161
},
155162
},

0 commit comments

Comments
 (0)