Skip to content

Commit 5454d7d

Browse files
committed
[YUNIKORN-2703] Core: Fallback to default queue if no placement rules match
1 parent 4834b19 commit 5454d7d

File tree

5 files changed

+357
-10
lines changed

5 files changed

+357
-10
lines changed

pkg/common/constants.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,12 @@ package common
2121
const (
2222
Empty = ""
2323

24-
Wildcard = "*"
25-
Separator = ","
26-
Space = " "
27-
AnonymousUser = "nobody"
28-
AnonymousGroup = "nogroup"
29-
RecoveryQueue = "@recovery@"
30-
RecoveryQueueFull = "root." + RecoveryQueue
24+
Wildcard = "*"
25+
Separator = ","
26+
Space = " "
27+
AnonymousUser = "nobody"
28+
AnonymousGroup = "nogroup"
29+
RecoveryQueue = "@recovery@"
30+
RecoveryQueueFull = "root." + RecoveryQueue
31+
DefaultPlacementQueue = "root.default"
3132
)

pkg/scheduler/partition_test.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,7 @@ func TestAddApp(t *testing.T) {
923923
}
924924

925925
func TestAddAppForced(t *testing.T) {
926-
partition, err := newBasePartition()
926+
partition, err := newBasePartitionNoRootDefault()
927927
assert.NilError(t, err, "partition create failed")
928928

929929
// add a new app to an invalid queue
@@ -1465,9 +1465,9 @@ func TestUpdateQueues(t *testing.T) {
14651465
}
14661466

14671467
func TestGetApplication(t *testing.T) {
1468-
partition, err := newBasePartition()
1468+
partition, err := newBasePartitionNoRootDefault()
14691469
assert.NilError(t, err, "partition create failed")
1470-
app := newApplication(appID1, "default", defQueue)
1470+
app := newApplication(appID1, "default", "root.custom")
14711471
err = partition.AddApplication(app)
14721472
assert.NilError(t, err, "no error expected while adding the application")
14731473
assert.Equal(t, partition.GetApplication(appID1), app, "partition failed to add app incorrect app returned")
@@ -1479,6 +1479,12 @@ func TestGetApplication(t *testing.T) {
14791479
if partition.GetApplication(appID2) != nil {
14801480
t.Fatal("partition added app incorrectly should have failed")
14811481
}
1482+
1483+
partition, err = newBasePartition()
1484+
assert.NilError(t, err, "partition create failed")
1485+
err = partition.AddApplication(app2)
1486+
assert.NilError(t, err, "no error expected while adding the application")
1487+
assert.Equal(t, partition.GetApplication(appID2), app2, "partition failed to add app incorrect app returned")
14821488
}
14831489

14841490
func TestGetQueue(t *testing.T) {

pkg/scheduler/placement/placement.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"go.uber.org/zap"
2626

27+
"github.com/apache/yunikorn-core/pkg/common"
2728
"github.com/apache/yunikorn-core/pkg/common/configs"
2829
"github.com/apache/yunikorn-core/pkg/locking"
2930
"github.com/apache/yunikorn-core/pkg/log"
@@ -115,7 +116,9 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
115116
var queueName string
116117
var aclCheck bool
117118
var err error
119+
var remainingRules = len(m.rules)
118120
for _, checkRule := range m.rules {
121+
remainingRules--
119122
log.Log(log.Config).Debug("Executing rule for placing application",
120123
zap.String("ruleName", checkRule.getName()),
121124
zap.String("application", app.ApplicationID))
@@ -127,6 +130,18 @@ func (m *AppPlacementManager) PlaceApplication(app *objects.Application) error {
127130
app.SetQueuePath("")
128131
return err
129132
}
133+
// if no queue found even after the last rule, try to place in the default queue
134+
if remainingRules == 0 && queueName == "" {
135+
log.Log(log.Config).Info("No rule matched, placing application in default queue",
136+
zap.String("application", app.ApplicationID),
137+
zap.String("defaultQueue", common.DefaultPlacementQueue))
138+
// get the queue object
139+
queue := m.queueFn(common.DefaultPlacementQueue)
140+
if queue != nil {
141+
// default queue exist
142+
queueName = common.DefaultPlacementQueue
143+
}
144+
}
130145
// queueName returned make sure ACL allows access and create the queueName if not exist
131146
if queueName != "" {
132147
// get the queue object

pkg/scheduler/placement/placement_test.go

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import (
2323

2424
"gotest.tools/v3/assert"
2525

26+
"github.com/apache/yunikorn-core/pkg/common"
2627
"github.com/apache/yunikorn-core/pkg/common/configs"
2728
"github.com/apache/yunikorn-core/pkg/common/security"
2829
"github.com/apache/yunikorn-core/pkg/scheduler/placement/types"
30+
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
2931
)
3032

3133
// basic test to check if no rules leave the manager unusable
@@ -256,3 +258,269 @@ partitions:
256258
t.Errorf("parent queue: app should not have been placed, queue: '%s', error: %v", queueName, err)
257259
}
258260
}
261+
262+
func TestForcePlaceApp(t *testing.T) {
263+
const (
264+
provided = "provided"
265+
providedQ = "root.provided"
266+
defQ = "root.default"
267+
customDefaultQ = "root.custom"
268+
)
269+
270+
// Create the structure for the test
271+
// specifically no acl to allow on root
272+
// root.default - undefined
273+
data := `
274+
partitions:
275+
- name: default
276+
queues:
277+
- name: root
278+
submitacl: "any-user"
279+
queues:
280+
- name: provided
281+
submitacl: "*"
282+
- name: acldeny
283+
submitacl: " "
284+
- name: parent
285+
parent: true
286+
submitacl: "*"
287+
`
288+
err := initQueueStructure([]byte(data))
289+
assert.NilError(t, err, "setting up the queue config failed")
290+
// update the manager
291+
rules := []configs.PlacementRule{
292+
{Name: "provided",
293+
Create: false},
294+
{Name: "tag",
295+
Value: "namespace",
296+
Create: true},
297+
}
298+
man := NewPlacementManager(rules, queueFunc)
299+
if man == nil {
300+
t.Fatal("placement manager create failed")
301+
}
302+
303+
tags := make(map[string]string)
304+
user := security.UserGroup{
305+
User: "any-user",
306+
Groups: []string{},
307+
}
308+
deny := security.UserGroup{
309+
User: "deny-user",
310+
Groups: []string{},
311+
}
312+
var tests = []struct {
313+
name string
314+
queue string
315+
placed string
316+
tags map[string]string
317+
user security.UserGroup
318+
}{
319+
{"empty", "", "", tags, user},
320+
{"provided unqualified", provided, providedQ, tags, user},
321+
{"provided qualified", providedQ, providedQ, tags, user},
322+
{"provided not exist", "unknown", "", tags, user},
323+
{"provided parent", "root.parent", "", tags, user},
324+
{"acl deny", "root.acldeny", "", tags, deny},
325+
{"create", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
326+
{"deny create", "unknown", "", map[string]string{"namespace": "namespace"}, deny},
327+
{"forced exist", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
328+
{"forced and create", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
329+
{"forced and deny create", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny},
330+
{"forced parent", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
331+
{"forced acl deny", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
332+
{"forced not exist", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
333+
{"forced not exist acl deny", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
334+
}
335+
for _, tt := range tests {
336+
t.Run(tt.name, func(t *testing.T) {
337+
app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
338+
err = man.PlaceApplication(app)
339+
if tt.placed == "" {
340+
assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
341+
} else {
342+
assert.NilError(t, err, "unexpected placement failure")
343+
assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
344+
}
345+
})
346+
}
347+
348+
// Update Queue structure
349+
// root.default - defined
350+
data = `
351+
partitions:
352+
- name: default
353+
queues:
354+
- name: root
355+
submitacl: "any-user"
356+
queues:
357+
- name: default
358+
submitacl: "*"
359+
- name: provided
360+
submitacl: "*"
361+
- name: acldeny
362+
submitacl: " "
363+
- name: parent
364+
parent: true
365+
submitacl: "*"
366+
`
367+
err = initQueueStructure([]byte(data))
368+
assert.NilError(t, err, "setting up the queue config failed")
369+
370+
tests = []struct {
371+
name string
372+
queue string
373+
placed string
374+
tags map[string]string
375+
user security.UserGroup
376+
}{
377+
{"empty | defaulQ defined", "", defQ, tags, user},
378+
{"provided unqualified | defaulQ defined", provided, providedQ, tags, user},
379+
{"provided qualified | defaulQ defined", defQ, defQ, tags, user},
380+
{"provided not exist | defaulQ defined", "unknown", defQ, tags, user},
381+
{"provided parent | defaulQ defined", "root.parent", defQ, tags, user},
382+
{"acl deny | defaulQ defined", "root.acldeny", defQ, tags, deny},
383+
{"create | defaulQ defined", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
384+
{"deny create | defaulQ defined", "unknown", defQ, map[string]string{"namespace": "namespace"}, deny},
385+
{"forced exist | defaulQ defined", defQ, defQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
386+
{"forced and create | defaulQ defined", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
387+
{"forced and deny create | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, deny},
388+
{"forced parent | defaulQ defined", "root.parent", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
389+
{"forced acl deny | defaulQ defined", "root.acldeny", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
390+
{"forced not exist | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
391+
{"forced not exist acl deny | defaulQ defined", "unknown", common.RecoveryQueueFull, map[string]string{siCommon.AppTagCreateForce: "true"}, deny},
392+
}
393+
for _, tt := range tests {
394+
t.Run(tt.name, func(t *testing.T) {
395+
app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
396+
err = man.PlaceApplication(app)
397+
if tt.placed == "" {
398+
assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
399+
} else {
400+
assert.NilError(t, err, "unexpected placement failure")
401+
assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
402+
}
403+
})
404+
}
405+
406+
// initialize queues with no root.default
407+
// Add fixed placement rule to define custom default queue
408+
data = `
409+
partitions:
410+
- name: default
411+
queues:
412+
- name: root
413+
submitacl: "any-user"
414+
queues:
415+
- name: custom
416+
submitacl: "*"
417+
- name: provided
418+
submitacl: "*"
419+
- name: acldeny
420+
submitacl: " "
421+
- name: parent
422+
parent: true
423+
submitacl: "*"
424+
`
425+
err = initQueueStructure([]byte(data))
426+
assert.NilError(t, err, "setting up the queue config failed")
427+
428+
// update the manager
429+
rules = []configs.PlacementRule{
430+
{Name: "provided",
431+
Create: false},
432+
{Name: "tag",
433+
Value: "namespace",
434+
Create: true},
435+
{Name: "fixed",
436+
Value: "root.custom",
437+
Create: true},
438+
}
439+
man1 := NewPlacementManager(rules, queueFunc)
440+
if man1 == nil {
441+
t.Fatal("placement manager create failed")
442+
}
443+
444+
tests = []struct {
445+
name string
446+
queue string
447+
placed string
448+
tags map[string]string
449+
user security.UserGroup
450+
}{
451+
{"empty | custom defaulQ", "", customDefaultQ, tags, user},
452+
{"provided unqualified | custom defaulQ", provided, providedQ, tags, user},
453+
{"provided qualified | custom defaulQ", providedQ, providedQ, tags, user},
454+
{"provided not exist | custom defaulQ", "unknown", customDefaultQ, tags, user},
455+
{"provided parent | custom defaulQ", "root.parent", customDefaultQ, tags, user},
456+
{"acl deny | custom defaulQ", "root.acldeny", customDefaultQ, tags, deny},
457+
{"create | custom defaulQ", "unknown", "root.namespace", map[string]string{"namespace": "namespace"}, user},
458+
{"deny create | custom defaulQ", "unknown", customDefaultQ, map[string]string{"namespace": "namespace"}, deny},
459+
{"forced exist | custom defaulQ", providedQ, providedQ, map[string]string{siCommon.AppTagCreateForce: "true"}, user},
460+
{"forced and create | custom defaulQ", "unknown", "root.namespace", map[string]string{siCommon.AppTagCreateForce: "true", "namespace": "namespace"}, user},
461+
}
462+
for _, tt := range tests {
463+
t.Run(tt.name, func(t *testing.T) {
464+
app := newApplication("app1", "default", tt.queue, tt.user, tt.tags, nil, "")
465+
err = man1.PlaceApplication(app)
466+
if tt.placed == "" {
467+
assert.ErrorContains(t, err, "rejected", "unexpected error or no error returned")
468+
} else {
469+
assert.NilError(t, err, "unexpected placement failure")
470+
assert.Equal(t, tt.placed, app.GetQueuePath(), "incorrect queue set")
471+
}
472+
})
473+
}
474+
}
475+
476+
func TestManagerPlaceApp_Error(t *testing.T) {
477+
// Create the structure for the test
478+
data := `
479+
partitions:
480+
- name: default
481+
queues:
482+
- name: root
483+
queues:
484+
- name: testparent
485+
submitacl: "*"
486+
queues:
487+
- name: testchild
488+
- name: fixed
489+
submitacl: "other-user "
490+
parent: true
491+
`
492+
err := initQueueStructure([]byte(data))
493+
assert.NilError(t, err, "setting up the queue config failed")
494+
// basic info without rules, manager should init
495+
man := NewPlacementManager(nil, queueFunc)
496+
if man == nil {
497+
t.Fatal("placement manager create failed")
498+
}
499+
rules := []configs.PlacementRule{
500+
{
501+
Name: "user",
502+
Create: false,
503+
Parent: &configs.PlacementRule{
504+
Name: "user",
505+
Create: false,
506+
Parent: &configs.PlacementRule{
507+
Name: "fixed",
508+
Value: "testparent",
509+
},
510+
},
511+
},
512+
}
513+
user := security.UserGroup{
514+
User: "testchild",
515+
Groups: []string{},
516+
}
517+
tags := make(map[string]string)
518+
err = man.UpdateRules(rules)
519+
assert.NilError(t, err, "failed to update existing manager")
520+
app := newApplication("app1", "default", "", user, tags, nil, "")
521+
err = man.PlaceApplication(app)
522+
queueName := app.GetQueuePath()
523+
if err == nil || queueName != "" {
524+
t.Errorf("failed placed app, queue: '%s', error: %v", queueName, err)
525+
}
526+
}

0 commit comments

Comments
 (0)