@@ -22,25 +22,29 @@ import (
22
22
"time"
23
23
24
24
"github.com/GoogleContainerTools/kpt/pkg/status"
25
+ "github.com/GoogleContainerTools/kpt/thirdparty/cli-utils/pkg/apply/taskrunner"
25
26
apierrors "k8s.io/apimachinery/pkg/api/errors"
26
27
"k8s.io/apimachinery/pkg/api/meta"
27
28
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28
29
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
30
+ "k8s.io/apimachinery/pkg/runtime"
29
31
"k8s.io/apimachinery/pkg/runtime/schema"
32
+ "k8s.io/cli-runtime/pkg/resource"
30
33
"k8s.io/klog/v2"
31
34
cmdutil "k8s.io/kubectl/pkg/cmd/util"
32
- "sigs.k8s.io/cli-utils/pkg/apply/cache"
33
- "sigs.k8s.io/cli-utils/pkg/apply/event"
34
- "sigs.k8s.io/cli-utils/pkg/apply/taskrunner"
35
+ "k8s.io/kubectl/pkg/util"
35
36
"sigs.k8s.io/cli-utils/pkg/common"
36
37
"sigs.k8s.io/cli-utils/pkg/inventory"
38
+ "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
39
+ pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
40
+ kstatus "sigs.k8s.io/cli-utils/pkg/kstatus/status"
37
41
"sigs.k8s.io/cli-utils/pkg/object"
38
42
"sigs.k8s.io/kustomize/kyaml/yaml"
39
43
)
40
44
41
45
const (
42
- applyCRDTimeout = 10 * time .Second
43
- applyCRDPollInterval = 2 * time .Second
46
+ applyRGTimeout = 10 * time .Second
47
+ applyRGPollInterval = 2 * time .Second
44
48
)
45
49
46
50
// ResourceGroupGVK is the group/version/kind of the custom
@@ -296,83 +300,83 @@ func ResourceGroupCRDMatched(factory cmdutil.Factory) bool {
296
300
return reflect .DeepEqual (liveSpec , latestspec )
297
301
}
298
302
299
- // InstallResourceGroupCRD applies the custom resource definition for the
300
- // ResourceGroup by creating and running a TaskQueue of Tasks necessary.
301
- // The Tasks are 1) Apply CRD task, 2) Wait Task (for CRD to become
302
- // established), and 3) Reset RESTMapper task. Returns an error if
303
- // a non-"AlreadyExists" error is returned on the event channel.
304
- // Runs the CRD installation in a separate goroutine (timeout
305
- // ensures no hanging).
306
- func InstallResourceGroupCRD (factory cmdutil.Factory ) error {
307
- eventChannel := make (chan event.Event )
308
- go func () {
309
- defer close (eventChannel )
310
- mapper , err := factory .ToRESTMapper ()
311
- if err != nil {
312
- handleError (eventChannel , err )
313
- return
314
- }
315
- crd , err := rgCRD (mapper )
316
- if err != nil {
317
- handleError (eventChannel , err )
318
- return
319
- }
320
- // Create the task to apply the ResourceGroup CRD.
321
- applyRGTask := NewApplyCRDTask (factory , crd )
322
- objs := object .UnstructuredSetToObjMetadataSet ([]* unstructured.Unstructured {crd })
323
- // Create the tasks to apply the ResourceGroup CRD.
324
- tasks := []taskrunner.Task {
325
- applyRGTask ,
326
- taskrunner .NewWaitTask ("wait-rg-crd" , objs , taskrunner .AllCurrent ,
327
- applyCRDTimeout , mapper ),
328
- }
329
- // Create the task queue channel, and send tasks in order into the channel.
330
- taskQueue := make (chan taskrunner.Task , len (tasks ))
331
- for _ , t := range tasks {
332
- taskQueue <- t
333
- }
334
- statusPoller , err := status .NewStatusPoller (factory )
335
- if err != nil {
336
- handleError (eventChannel , err )
337
- return
338
- }
339
- // Create a new cache map to hold the last known resource state & status
340
- resourceCache := cache .NewResourceCacheMap ()
341
- // Run the task queue.
342
- runner := taskrunner .NewTaskStatusRunner (objs , statusPoller , resourceCache )
343
- err = runner .Run (context .Background (), taskQueue , eventChannel , taskrunner.Options {
344
- PollInterval : applyCRDPollInterval ,
345
- UseCache : true ,
346
- EmitStatusEvents : true ,
347
- })
348
- if err != nil {
349
- handleError (eventChannel , err )
350
- return
351
- }
352
- }()
303
+ // ResourceGroupInstaller can install the ResourceGroup CRD into a cluster.
304
+ type ResourceGroupInstaller struct {
305
+ Factory cmdutil.Factory
306
+ }
353
307
354
- // Return the error on the eventChannel if it exists; return
355
- // closes the channel. "AlreadyExists" is NOT an error.
356
- for e := range eventChannel {
357
- if e .Type == event .ErrorType {
358
- err := e .ErrorEvent .Err
359
- if ! apierrors .IsAlreadyExists (err ) {
360
- return err
361
- }
308
+ func (rgi * ResourceGroupInstaller ) InstallRG (ctx context.Context ) error {
309
+ poller , err := status .NewStatusPoller (rgi .Factory )
310
+ if err != nil {
311
+ return err
312
+ }
313
+
314
+ mapper , err := rgi .Factory .ToRESTMapper ()
315
+ if err != nil {
316
+ return err
317
+ }
318
+
319
+ crd , err := rgCRD (mapper )
320
+ if err != nil {
321
+ return err
322
+ }
323
+
324
+ if err := rgi .applyRG (crd ); err != nil {
325
+ if apierrors .IsAlreadyExists (err ) {
326
+ return nil
362
327
}
328
+ return err
363
329
}
364
330
365
- return nil
331
+ objs := object .UnstructuredSetToObjMetadataSet ([]* unstructured.Unstructured {crd })
332
+ ctx , cancel := context .WithTimeout (ctx , applyRGTimeout )
333
+ return func () error {
334
+ defer cancel ()
335
+ for e := range poller .Poll (ctx , objs , polling.Options {PollInterval : applyRGPollInterval }) {
336
+ switch e .EventType {
337
+ case pollevent .ErrorEvent :
338
+ return e .Error
339
+ case pollevent .ResourceUpdateEvent :
340
+ if e .Resource .Status == kstatus .CurrentStatus {
341
+ // TODO: Replace this with a call to meta.MaybeResetRESTMapper
342
+ // once we update the k8s libraries.
343
+ m , err := taskrunner .ExtractDeferredDiscoveryRESTMapper (mapper )
344
+ if err != nil {
345
+ return err
346
+ }
347
+ m .Reset ()
348
+ return nil
349
+ }
350
+ }
351
+ }
352
+ return nil
353
+ }()
366
354
}
367
355
368
- // handleError sends an error onto the event channel.
369
- func handleError (eventChannel chan event.Event , err error ) {
370
- eventChannel <- event.Event {
371
- Type : event .ErrorType ,
372
- ErrorEvent : event.ErrorEvent {
373
- Err : err ,
374
- },
356
+ func (rgi * ResourceGroupInstaller ) applyRG (crd runtime.Object ) error {
357
+ mapper , err := rgi .Factory .ToRESTMapper ()
358
+ if err != nil {
359
+ return err
360
+ }
361
+ mapping , err := mapper .RESTMapping (crdGroupKind )
362
+ if err != nil {
363
+ return err
364
+ }
365
+ client , err := rgi .Factory .UnstructuredClientForMapping (mapping )
366
+ if err != nil {
367
+ return err
368
+ }
369
+
370
+ // Set the "last-applied-annotation" so future applies work correctly.
371
+ if err := util .CreateApplyAnnotation (crd , unstructured .UnstructuredJSONScheme ); err != nil {
372
+ return err
375
373
}
374
+ // Apply the CRD to the cluster and ignore already exists error.
375
+ var clearResourceVersion = false
376
+ var emptyNamespace = ""
377
+ helper := resource .NewHelper (client , mapping )
378
+ _ , err = helper .Create (emptyNamespace , clearResourceVersion , crd )
379
+ return err
376
380
}
377
381
378
382
// rgCRD returns the ResourceGroup CRD in Unstructured format or an error.
0 commit comments