Skip to content

Commit a6bb8c2

Browse files
committed
ingore the failed resources in wait task
1 parent b9ff655 commit a6bb8c2

File tree

8 files changed

+52
-39
lines changed

8 files changed

+52
-39
lines changed

pkg/apply/applier_test.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -265,20 +265,6 @@ func TestApplier(t *testing.T) {
265265
Status: status.CurrentStatus,
266266
},
267267
},
268-
{
269-
EventType: pollevent.ResourceUpdateEvent,
270-
Resource: &pollevent.ResourceStatus{
271-
Identifier: toIdentifier(t, resources["deployment"]),
272-
Status: status.InProgressStatus,
273-
},
274-
},
275-
{
276-
EventType: pollevent.ResourceUpdateEvent,
277-
Resource: &pollevent.ResourceStatus{
278-
Identifier: toIdentifier(t, resources["deployment"]),
279-
Status: status.CurrentStatus,
280-
},
281-
},
282268
},
283269
expectedEventTypes: []expectedEvent{
284270
{
@@ -297,19 +283,12 @@ func TestApplier(t *testing.T) {
297283
eventType: event.StatusType,
298284
statusEventType: event.StatusEventResourceUpdate,
299285
},
300-
{
301-
eventType: event.StatusType,
302-
statusEventType: event.StatusEventResourceUpdate,
303-
},
304-
{
305-
eventType: event.StatusType,
306-
statusEventType: event.StatusEventResourceUpdate,
307-
},
308286
{
309287
eventType: event.StatusType,
310288
statusEventType: event.StatusEventCompleted,
311289
},
312290
},
291+
prune: false,
313292
clusterObj: deploymentUnmatched,
314293
},
315294
"prune with inventory object annotation unmatched": {

pkg/apply/destroyer.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"k8s.io/apimachinery/pkg/util/sets"
1212
"sigs.k8s.io/cli-utils/pkg/apply/event"
1313
"sigs.k8s.io/cli-utils/pkg/apply/prune"
14+
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
1415
"sigs.k8s.io/cli-utils/pkg/common"
1516
"sigs.k8s.io/cli-utils/pkg/inventory"
1617
"sigs.k8s.io/cli-utils/pkg/provider"
@@ -70,7 +71,8 @@ func (d *Destroyer) Run(inv inventory.InventoryInfo) <-chan event.Event {
7071
// Events. That we use Prune to implement destroy is an
7172
// implementation detail and the events should not be Prune events.
7273
tempChannel, completedChannel := runPruneEventTransformer(ch)
73-
err := d.PruneOptions.Prune(inv, nil, sets.NewString(), tempChannel, prune.Options{
74+
taskContext := taskrunner.NewTaskContext(tempChannel)
75+
err := d.PruneOptions.Prune(inv, nil, sets.NewString(), taskContext, prune.Options{
7476
DryRunStrategy: d.DryRunStrategy,
7577
PropagationPolicy: metav1.DeletePropagationBackground,
7678
})

pkg/apply/prune/prune.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"k8s.io/klog"
2727
"k8s.io/kubectl/pkg/cmd/util"
2828
"sigs.k8s.io/cli-utils/pkg/apply/event"
29+
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
2930
"sigs.k8s.io/cli-utils/pkg/common"
3031
"sigs.k8s.io/cli-utils/pkg/inventory"
3132
"sigs.k8s.io/cli-utils/pkg/object"
@@ -86,7 +87,7 @@ type Options struct {
8687
// the current apply. Prune also delete all previous inventory
8788
// objects. Returns an error if there was a problem.
8889
func (po *PruneOptions) Prune(localInv inventory.InventoryInfo, localObjs []*unstructured.Unstructured, currentUIDs sets.String,
89-
eventChannel chan<- event.Event, o Options) error {
90+
taskContext *taskrunner.TaskContext, o Options) error {
9091
if localInv == nil {
9192
return fmt.Errorf("the local inventory object can't be nil")
9293
}
@@ -114,7 +115,8 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo, localObjs []*uns
114115
mapping, err := po.mapper.RESTMapping(clusterObj.GroupKind)
115116
if err != nil {
116117
localIds = append(localIds, clusterObj)
117-
eventChannel <- createPruneFailedEvent(clusterObj, err)
118+
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
119+
taskContext.CaptureResourceFailure(clusterObj)
118120
continue
119121
}
120122
namespacedClient := po.client.Resource(mapping.Resource).Namespace(clusterObj.Namespace)
@@ -125,13 +127,15 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo, localObjs []*uns
125127
continue
126128
}
127129
localIds = append(localIds, clusterObj)
128-
eventChannel <- createPruneFailedEvent(clusterObj, err)
130+
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
131+
taskContext.CaptureResourceFailure(clusterObj)
129132
continue
130133
}
131134
metadata, err := meta.Accessor(obj)
132135
if err != nil {
133136
localIds = append(localIds, clusterObj)
134-
eventChannel <- createPruneFailedEvent(clusterObj, err)
137+
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
138+
taskContext.CaptureResourceFailure(clusterObj)
135139
continue
136140
}
137141
// If this cluster object is not also a currently applied
@@ -145,7 +149,7 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo, localObjs []*uns
145149
}
146150
// Handle lifecycle directive preventing deletion.
147151
if !canPrune(localInv, obj, o.InventoryPolicy, uid) {
148-
eventChannel <- createPruneEvent(clusterObj, obj, event.PruneSkipped)
152+
taskContext.EventChannel() <- createPruneEvent(clusterObj, obj, event.PruneSkipped)
149153
localIds = append(localIds, clusterObj)
150154
continue
151155
}
@@ -155,21 +159,23 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo, localObjs []*uns
155159
if clusterObj.GroupKind == object.CoreV1Namespace.GroupKind() &&
156160
localNamespaces.Has(clusterObj.Name) {
157161
klog.V(4).Infof("skip pruning namespace: %s", clusterObj.Name)
158-
eventChannel <- createPruneEvent(clusterObj, obj, event.PruneSkipped)
162+
taskContext.EventChannel() <- createPruneEvent(clusterObj, obj, event.PruneSkipped)
159163
localIds = append(localIds, clusterObj)
164+
taskContext.CaptureResourceFailure(clusterObj)
160165
continue
161166
}
162167
}
163168
if !o.DryRunStrategy.ClientOrServerDryRun() {
164169
klog.V(4).Infof("prune object delete: %s/%s", clusterObj.Namespace, clusterObj.Name)
165170
err = namespacedClient.Delete(context.TODO(), clusterObj.Name, metav1.DeleteOptions{})
166171
if err != nil {
167-
eventChannel <- createPruneFailedEvent(clusterObj, err)
172+
taskContext.EventChannel() <- createPruneFailedEvent(clusterObj, err)
168173
localIds = append(localIds, clusterObj)
174+
taskContext.CaptureResourceFailure(clusterObj)
169175
continue
170176
}
171177
}
172-
eventChannel <- createPruneEvent(clusterObj, obj, event.Pruned)
178+
taskContext.EventChannel() <- createPruneEvent(clusterObj, obj, event.Pruned)
173179
}
174180
return po.InvClient.Replace(localInv, localIds)
175181
}

pkg/apply/prune/prune_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"k8s.io/client-go/dynamic/fake"
2222
"k8s.io/kubectl/pkg/scheme"
2323
"sigs.k8s.io/cli-utils/pkg/apply/event"
24+
"sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
2425
"sigs.k8s.io/cli-utils/pkg/common"
2526
"sigs.k8s.io/cli-utils/pkg/inventory"
2627
"sigs.k8s.io/cli-utils/pkg/object"
@@ -250,10 +251,11 @@ func TestPrune(t *testing.T) {
250251
// The event channel can not block; make sure its bigger than all
251252
// the events that can be put on it.
252253
eventChannel := make(chan event.Event, len(tc.pastObjs)+1) // Add one for inventory object
254+
taskContext := taskrunner.NewTaskContext(eventChannel)
253255
err := func() error {
254256
defer close(eventChannel)
255257
// Run the prune and validate.
256-
return po.Prune(currentInventory, tc.currentObjs, populateObjectIds(tc.currentObjs, t), eventChannel, Options{
258+
return po.Prune(currentInventory, tc.currentObjs, populateObjectIds(tc.currentObjs, t), taskContext, Options{
257259
DryRunStrategy: drs,
258260
})
259261
}()
@@ -453,10 +455,11 @@ func TestPruneWithError(t *testing.T) {
453455
// The event channel can not block; make sure its bigger than all
454456
// the events that can be put on it.
455457
eventChannel := make(chan event.Event, len(tc.pastObjs)+1) // Add one for inventory object
458+
taskContext := taskrunner.NewTaskContext(eventChannel)
456459
err := func() error {
457460
defer close(eventChannel)
458461
// Run the prune and validate.
459-
return po.Prune(currentInventory, tc.currentObjs, populateObjectIds(tc.currentObjs, t), eventChannel, Options{
462+
return po.Prune(currentInventory, tc.currentObjs, populateObjectIds(tc.currentObjs, t), taskContext, Options{
460463
DryRunStrategy: drs,
461464
})
462465
}()

pkg/apply/task/apply_task.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,28 +121,32 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
121121
// Set the client and mapping fields on the provided
122122
// info so they can be applied to the cluster.
123123
info, err := a.InfoHelper.BuildInfo(obj)
124+
id := object.UnstructuredToObjMeta(obj)
124125
if err != nil {
125126
taskContext.EventChannel() <- createApplyEvent(
126-
object.UnstructuredToObjMeta(obj), event.Failed, applyerror.NewUnknownTypeError(err))
127+
id, event.Failed, applyerror.NewUnknownTypeError(err))
128+
taskContext.CaptureResourceFailure(id)
127129
continue
128130
}
129131

130132
clusterObj, err := getClusterObj(dynamic, info)
131133
if err != nil {
132134
if !apierrors.IsNotFound(err) {
133135
taskContext.EventChannel() <- createApplyEvent(
134-
object.UnstructuredToObjMeta(obj),
136+
id,
135137
event.Unchanged,
136138
err)
139+
taskContext.CaptureResourceFailure(id)
137140
continue
138141
}
139142
}
140143
canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
141144
if !canApply {
142145
taskContext.EventChannel() <- createApplyEvent(
143-
object.UnstructuredToObjMeta(obj),
146+
id,
144147
event.Unchanged,
145148
err)
149+
taskContext.CaptureResourceFailure(id)
146150
continue
147151
}
148152
// add the inventory annotation to the resource being applied.
@@ -152,7 +156,8 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
152156
err = ao.Run()
153157
if err != nil {
154158
taskContext.EventChannel() <- createApplyEvent(
155-
object.UnstructuredToObjMeta(obj), event.Failed, applyerror.NewApplyRunError(err))
159+
id, event.Failed, applyerror.NewApplyRunError(err))
160+
taskContext.CaptureResourceFailure(id)
156161
}
157162
}
158163

@@ -331,7 +336,9 @@ func createApplyEvent(id object.ObjMetadata, operation event.ApplyEventOperation
331336
// a list of resources when failed to initialize the apply process.
332337
func sendBatchApplyEvents(taskContext *taskrunner.TaskContext, objects []*unstructured.Unstructured, err error) {
333338
for _, obj := range objects {
339+
id := object.UnstructuredToObjMeta(obj)
334340
taskContext.EventChannel() <- createApplyEvent(
335-
object.UnstructuredToObjMeta(obj), event.Failed, applyerror.NewInitializeApplyOptionError(err))
341+
id, event.Failed, applyerror.NewInitializeApplyOptionError(err))
342+
taskContext.CaptureResourceFailure(id)
336343
}
337344
}

pkg/apply/task/prune_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (p *PruneTask) Start(taskContext *taskrunner.TaskContext) {
3232
go func() {
3333
currentUIDs := taskContext.AllResourceUIDs()
3434
err := p.PruneOptions.Prune(p.InventoryObject, p.Objects,
35-
currentUIDs, taskContext.EventChannel(), prune.Options{
35+
currentUIDs, taskContext, prune.Options{
3636
DryRunStrategy: p.DryRunStrategy,
3737
PropagationPolicy: p.PropagationPolicy,
3838
InventoryPolicy: p.InventoryPolicy,

pkg/apply/taskrunner/context.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func NewTaskContext(eventChannel chan event.Event) *TaskContext {
1616
taskChannel: make(chan TaskResult),
1717
eventChannel: eventChannel,
1818
appliedResources: make(map[object.ObjMetadata]applyInfo),
19+
failedResources: make(map[object.ObjMetadata]struct{}),
1920
}
2021
}
2122

@@ -27,6 +28,9 @@ type TaskContext struct {
2728
eventChannel chan event.Event
2829

2930
appliedResources map[object.ObjMetadata]applyInfo
31+
32+
// failedResources records the IDs of resources that are failed during applying and pruning.
33+
failedResources map[object.ObjMetadata]struct{}
3034
}
3135

3236
func (tc *TaskContext) TaskChannel() chan TaskResult {
@@ -76,6 +80,15 @@ func (tc *TaskContext) ResourceGeneration(id object.ObjMetadata) (int64, bool) {
7680
return ai.generation, true
7781
}
7882

83+
func (tc *TaskContext) ResourceFailed(id object.ObjMetadata) bool {
84+
_, found := tc.failedResources[id]
85+
return found
86+
}
87+
88+
func (tc *TaskContext) CaptureResourceFailure(id object.ObjMetadata) {
89+
tc.failedResources[id] = struct{}{}
90+
}
91+
7992
// applyInfo captures information about resources that have been
8093
// applied. This is captured in the TaskContext so other tasks
8194
// running later might use this information.

pkg/apply/taskrunner/task.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ func (w *WaitTask) checkCondition(taskContext *TaskContext, coll *resourceStatus
112112
func (w *WaitTask) computeResourceWaitData(taskContext *TaskContext) []resourceWaitData {
113113
var rwd []resourceWaitData
114114
for _, id := range w.Identifiers {
115+
if taskContext.ResourceFailed(id) {
116+
continue
117+
}
115118
gen, _ := taskContext.ResourceGeneration(id)
116119
rwd = append(rwd, resourceWaitData{
117120
identifier: id,

0 commit comments

Comments
 (0)