Skip to content

Commit 185aa3d

Browse files
committed
Better final inventory calculation during errors
1 parent dfab5b4 commit 185aa3d

File tree

8 files changed

+332
-56
lines changed

8 files changed

+332
-56
lines changed

pkg/apply/applier.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
104104
return nil, err
105105
}
106106
}
107+
// Retrieve previous inventory objects. Must happen before inventory client merge.
108+
prevInv, err := a.invClient.GetClusterObjs(localInv)
109+
if err != nil {
110+
return nil, err
111+
}
112+
klog.V(4).Infof("%d previous inventory objects in cluster", len(prevInv))
107113

108114
klog.V(4).Infof("applier merging %d objects into inventory", len(localObjs))
109115
currentObjs := object.UnstructuredsToObjMetas(localObjs)
@@ -122,6 +128,7 @@ func (a *Applier) prepareObjects(localInv inventory.InventoryInfo, localObjs []*
122128
LocalInv: localInv,
123129
Resources: localObjs,
124130
PruneIds: pruneIds,
131+
PrevInv: prevInv,
125132
}, nil
126133
}
127134

@@ -132,6 +139,7 @@ type ResourceObjects struct {
132139
LocalInv inventory.InventoryInfo
133140
Resources []*unstructured.Unstructured
134141
PruneIds []object.ObjMetadata
142+
PrevInv []object.ObjMetadata
135143
}
136144

137145
// ObjsForApply returns the unstructured representation for all the resources
@@ -162,6 +170,13 @@ func (r *ResourceObjects) IdsForPrune() []object.ObjMetadata {
162170
return r.PruneIds
163171
}
164172

173+
// IdsForPrevInv returns the Ids for the previous inventory. These
174+
// Ids reference the objects managed by the inventory object which
175+
// are already in the cluster.
176+
func (r *ResourceObjects) IdsForPrevInv() []object.ObjMetadata {
177+
return r.PrevInv
178+
}
179+
165180
// AllIds returns the Ids for all resources that are relevant. This
166181
// includes resources that will be applied or pruned.
167182
func (r *ResourceObjects) AllIds() []object.ObjMetadata {

pkg/apply/prune/prune.go

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -194,29 +194,9 @@ func (po *PruneOptions) Prune(localInv inventory.InventoryInfo,
194194
}
195195
taskContext.EventChannel() <- createPruneEvent(pruneObj, obj, event.Pruned)
196196
}
197-
// Calculate final inventory items, ensuring only successfully applied
198-
// objects are in the inventory along with prune failures.
199-
finalInventory := []object.ObjMetadata{}
200-
for _, localObj := range localIds {
201-
obj, err := po.getObject(localObj)
202-
if err != nil {
203-
if klog.V(4) {
204-
klog.Errorf("error retrieving object for inventory determination: %s", err)
205-
}
206-
continue
207-
}
208-
uid := string(obj.GetUID())
209-
if currentUIDs.Has(uid) {
210-
klog.V(5).Infof("adding final inventory object %s/%s", localObj.Namespace, localObj.Name)
211-
finalInventory = append(finalInventory, localObj)
212-
} else {
213-
klog.V(5).Infof("uid not found (%s); not adding final inventory obj %s/%s",
214-
uid, localObj.Namespace, localObj.Name)
215-
}
216-
}
217-
klog.V(4).Infof("final inventory %d successfully applied objects", len(finalInventory))
218-
finalInventory = append(finalInventory, pruneFailures...)
219-
klog.V(4).Infof("final inventory %d objects after appending prune failures", len(finalInventory))
197+
// Final inventory equals applied objects and prune failures.
198+
appliedResources := taskContext.AppliedResources()
199+
finalInventory := append(appliedResources, pruneFailures...)
220200
return po.InvClient.Replace(localInv, finalInventory)
221201
}
222202

pkg/apply/prune/prune_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,11 @@ func TestPrune(t *testing.T) {
258258
// the events that can be put on it.
259259
eventChannel := make(chan event.Event, len(tc.pastObjs)+1) // Add one for inventory object
260260
taskContext := taskrunner.NewTaskContext(eventChannel)
261+
for _, u := range tc.currentObjs {
262+
o := object.UnstructuredToObjMeta(u)
263+
uid := u.GetUID()
264+
taskContext.ResourceApplied(o, uid, 0)
265+
}
261266
err := func() error {
262267
defer close(eventChannel)
263268
// Run the prune and validate.

pkg/apply/solver/solver.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type resourceObjects interface {
5656
Inventory() inventory.InventoryInfo
5757
IdsForApply() []object.ObjMetadata
5858
IdsForPrune() []object.ObjMetadata
59+
IdsForPrevInv() []object.ObjMetadata
5960
}
6061

6162
// BuildTaskQueue takes a set of resources in the form of info objects
@@ -65,12 +66,19 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
6566
o Options) chan taskrunner.Task {
6667
var tasks []taskrunner.Task
6768
remainingInfos := ro.ObjsForApply()
69+
// Convert slice of previous inventory objects into a map.
70+
prevInvSlice := ro.IdsForPrevInv()
71+
prevInventory := make(map[object.ObjMetadata]bool, len(prevInvSlice))
72+
for _, prevInvObj := range prevInvSlice {
73+
prevInventory[prevInvObj] = true
74+
}
6875

6976
crdSplitRes, hasCRDs := splitAfterCRDs(remainingInfos)
7077
if hasCRDs {
7178
tasks = append(tasks, &task.ApplyTask{
7279
Objects: append(crdSplitRes.before, crdSplitRes.crds...),
7380
CRDs: crdSplitRes.crds,
81+
PrevInventory: prevInventory,
7482
ServerSideOptions: o.ServerSideOptions,
7583
DryRunStrategy: o.DryRunStrategy,
7684
InfoHelper: t.InfoHelper,
@@ -96,6 +104,7 @@ func (t *TaskQueueSolver) BuildTaskQueue(ro resourceObjects,
96104
&task.ApplyTask{
97105
Objects: remainingInfos,
98106
CRDs: crdSplitRes.crds,
107+
PrevInventory: prevInventory,
99108
ServerSideOptions: o.ServerSideOptions,
100109
DryRunStrategy: o.DryRunStrategy,
101110
InfoHelper: t.InfoHelper,

pkg/apply/solver/solver_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -309,10 +309,11 @@ func getType(task taskrunner.Task) reflect.Type {
309309
}
310310

311311
type fakeResourceObjects struct {
312-
objsForApply []*unstructured.Unstructured
313-
inventory inventory.InventoryInfo
314-
idsForApply []object.ObjMetadata
315-
idsForPrune []object.ObjMetadata
312+
objsForApply []*unstructured.Unstructured
313+
inventory inventory.InventoryInfo
314+
idsForApply []object.ObjMetadata
315+
idsForPrune []object.ObjMetadata
316+
idsForPrevInv []object.ObjMetadata
316317
}
317318

318319
func (f *fakeResourceObjects) ObjsForApply() []*unstructured.Unstructured {
@@ -331,6 +332,10 @@ func (f *fakeResourceObjects) IdsForPrune() []object.ObjMetadata {
331332
return f.idsForPrune
332333
}
333334

335+
func (f *fakeResourceObjects) IdsForPrevInv() []object.ObjMetadata {
336+
return f.idsForPrevInv
337+
}
338+
334339
func ignoreErrInfoToObjMeta(info *unstructured.Unstructured) object.ObjMetadata {
335340
objMeta := object.UnstructuredToObjMeta(info)
336341
return objMeta

pkg/apply/task/apply_task.go

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"k8s.io/client-go/dynamic"
1818
"k8s.io/klog"
1919
"k8s.io/kubectl/pkg/cmd/apply"
20-
"k8s.io/kubectl/pkg/cmd/delete"
20+
cmddelete "k8s.io/kubectl/pkg/cmd/delete"
2121
"k8s.io/kubectl/pkg/cmd/util"
2222
"k8s.io/kubectl/pkg/util/slice"
2323
applyerror "sigs.k8s.io/cli-utils/pkg/apply/error"
@@ -45,11 +45,13 @@ type applyOptions interface {
4545
// ApplyTask applies the given Objects to the cluster
4646
// by using the ApplyOptions.
4747
type ApplyTask struct {
48-
Factory util.Factory
49-
InfoHelper info.InfoHelper
50-
Mapper meta.RESTMapper
51-
Objects []*unstructured.Unstructured
52-
CRDs []*unstructured.Unstructured
48+
Factory util.Factory
49+
InfoHelper info.InfoHelper
50+
Mapper meta.RESTMapper
51+
Objects []*unstructured.Unstructured
52+
CRDs []*unstructured.Unstructured
53+
// Used for determining inventory during errors
54+
PrevInventory map[object.ObjMetadata]bool
5355
DryRunStrategy common.DryRunStrategy
5456
ServerSideOptions common.ServerSideOptions
5557
InventoryPolicy inventory.InventoryPolicy
@@ -80,6 +82,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
8082
// we have a CRD and a CR in the same resource set, but the CRD
8183
// will not actually have been applied when we reach the CR.
8284
if a.DryRunStrategy.ClientOrServerDryRun() {
85+
klog.V(4).Infof("dry-run filtering custom resources...")
8386
// Find all resources in the set that doesn't exist in the
8487
// RESTMapper, but where we do have the CRD for the type in
8588
// the resource set.
@@ -97,6 +100,7 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
97100
taskContext.EventChannel() <- createApplyEvent(object.UnstructuredToObjMeta(obj), event.Created, nil)
98101
}
99102
// Update the resource set to no longer include the CRs.
103+
klog.V(4).Infof("after dry-run filtering custom resources, %d objects left", len(objs))
100104
objects = objs
101105
}
102106

@@ -123,7 +127,8 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
123127
}
124128

125129
klog.V(4).Infof("attempting to apply %d remaining objects", len(objects))
126-
var infos []*resource.Info
130+
// invInfos stores the objects which should be stored in the final inventory.
131+
invInfos := make(map[object.ObjMetadata]*resource.Info, len(objects))
127132
for _, obj := range objects {
128133
// Set the client and mapping fields on the provided
129134
// info so they can be applied to the cluster.
@@ -144,18 +149,25 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
144149
if err != nil {
145150
if !apierrors.IsNotFound(err) {
146151
if klog.V(4) {
147-
klog.Errorf("error retrieving %s/%s from cluster--continue",
152+
klog.Errorf("error (%s) retrieving %s/%s from cluster--continue",
153+
err, info.Namespace, info.Name)
154+
}
155+
op := event.Failed
156+
if a.objInCluster(id) {
157+
// Object in cluster stays in the inventory.
158+
klog.V(4).Infof("%s/%s apply retrieval failure, but in cluster--keep in inventory",
148159
info.Namespace, info.Name)
160+
invInfos[id] = info
161+
op = event.Unchanged
149162
}
150-
taskContext.EventChannel() <- createApplyEvent(
151-
id,
152-
event.Unchanged,
153-
err)
163+
taskContext.EventChannel() <- createApplyEvent(id, op, err)
154164
taskContext.CaptureResourceFailure(id)
155165
continue
156166
}
157167
}
158-
infos = append(infos, info)
168+
// At this point the object was either 1) successfully retrieved from the cluster, or
169+
// 2) returned "Not Found" error (meaning first-time creation). Add to final inventory.
170+
invInfos[id] = info
159171
canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
160172
if !canApply {
161173
klog.V(5).Infof("can not apply %s/%s--continue",
@@ -176,30 +188,30 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
176188
if klog.V(4) {
177189
klog.Errorf("error applying (%s/%s) %s", info.Namespace, info.Name, err)
178190
}
191+
// If apply failed and the object is not in the cluster, remove
192+
// it from the final inventory.
193+
if !a.objInCluster(id) {
194+
klog.V(5).Infof("not in cluster; removing apply fail object %s/%s from inventory",
195+
info.Namespace, info.Name)
196+
delete(invInfos, id)
197+
}
179198
taskContext.EventChannel() <- createApplyEvent(
180199
id, event.Failed, applyerror.NewApplyRunError(err))
181200
taskContext.CaptureResourceFailure(id)
182201
}
183202
}
184203

185-
// Fetch the Generation from all Infos after they have been
186-
// applied.
187-
for _, inf := range infos {
188-
id, err := object.InfoToObjMeta(inf)
189-
if err != nil {
190-
continue
191-
}
192-
if inf.Object != nil {
193-
acc, err := meta.Accessor(inf.Object)
204+
// Store objects (and some obj metadata) in the task context
205+
// for the final inventory.
206+
for id, info := range invInfos {
207+
if info.Object != nil {
208+
acc, err := meta.Accessor(info.Object)
194209
if err != nil {
195210
continue
196211
}
197-
// Only add a resource if it successfully applied.
198212
uid := acc.GetUID()
199-
if string(uid) != "" {
200-
gen := acc.GetGeneration()
201-
taskContext.ResourceApplied(id, uid, gen)
202-
}
213+
gen := acc.GetGeneration()
214+
taskContext.ResourceApplied(id, uid, gen)
203215
}
204216
}
205217
a.sendTaskResult(taskContext)
@@ -232,7 +244,7 @@ func newApplyOptions(eventChannel chan event.Event, serverSideOptions common.Ser
232244
},
233245
// FilenameOptions are not needed since we don't use the ApplyOptions
234246
// to read manifests.
235-
DeleteOptions: &delete.DeleteOptions{},
247+
DeleteOptions: &cmddelete.DeleteOptions{},
236248
PrintFlags: &genericclioptions.PrintFlags{
237249
OutputFormat: &emptyString,
238250
},
@@ -292,6 +304,16 @@ func (a *ApplyTask) filterCRsWithCRDInSet(objects []*unstructured.Unstructured)
292304
return objs, objsWithCRD, nil
293305
}
294306

307+
// objInCluster returns true if the passed object is in the slice of
308+
// previous inventory, because an object in the previous inventory
309+
// exists in the cluster.
310+
func (a *ApplyTask) objInCluster(obj object.ObjMetadata) bool {
311+
if _, found := a.PrevInventory[obj]; found {
312+
return true
313+
}
314+
return false
315+
}
316+
295317
type crdsInfo struct {
296318
crds []crdInfo
297319
}

0 commit comments

Comments
 (0)