@@ -18,15 +18,19 @@ import (
18
18
"context"
19
19
"flag"
20
20
"fmt"
21
+ "strconv"
22
+ "strings"
21
23
22
24
kptoci "github.com/GoogleContainerTools/kpt/pkg/oci"
25
+ porchapi "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
23
26
api "github.com/GoogleContainerTools/kpt/porch/controllers/remoterootsyncsets/api/v1alpha1"
24
27
"github.com/GoogleContainerTools/kpt/porch/controllers/remoterootsyncsets/pkg/applyset"
25
28
"github.com/GoogleContainerTools/kpt/porch/controllers/remoterootsyncsets/pkg/remoteclient"
26
29
"github.com/GoogleContainerTools/kpt/porch/pkg/objects"
27
30
"github.com/GoogleContainerTools/kpt/porch/pkg/oci"
28
31
"k8s.io/apimachinery/pkg/api/meta"
29
32
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33
+ "k8s.io/apimachinery/pkg/types"
30
34
"k8s.io/client-go/rest"
31
35
"k8s.io/klog/v2"
32
36
ctrl "sigs.k8s.io/controller-runtime"
@@ -54,9 +58,15 @@ func (o *Options) BindFlags(prefix string, flags *flag.FlagSet) {
54
58
type RemoteRootSyncSetReconciler struct {
55
59
Options
56
60
57
- remoteclient.RemoteClientGetter
61
+ remoteClientGetter remoteclient.RemoteClientGetter
58
62
59
- client.Client
63
+ client client.Client
64
+
65
+ // uncachedClient queries the apiserver without using a watch cache.
66
+ // This is useful for PackageRevisionResources, which are large
67
+ // and would consume a lot of memory, and so we deliberately don't
68
+ // support watching them.
69
+ uncachedClient client.Client
60
70
61
71
ociStorage * kptoci.Storage
62
72
@@ -70,11 +80,12 @@ type RemoteRootSyncSetReconciler struct {
70
80
//+kubebuilder:rbac:groups=config.porch.kpt.dev,resources=remoterootsyncsets,verbs=get;list;watch;create;update;patch;delete
71
81
//+kubebuilder:rbac:groups=config.porch.kpt.dev,resources=remoterootsyncsets/status,verbs=get;update;patch
72
82
//+kubebuilder:rbac:groups=config.porch.kpt.dev,resources=remoterootsyncsets/finalizers,verbs=update
83
+ //+kubebuilder:rbac:groups=porch.kpt.dev,resources=packagerevisions;packagerevisionresources,verbs=get;list;watch
73
84
74
85
// Reconcile implements the main kubernetes reconciliation loop.
75
86
func (r * RemoteRootSyncSetReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
76
87
var subject api.RemoteRootSyncSet
77
- if err := r .Get (ctx , req .NamespacedName , & subject ); err != nil {
88
+ if err := r .client . Get (ctx , req .NamespacedName , & subject ); err != nil {
78
89
return ctrl.Result {}, client .IgnoreNotFound (err )
79
90
}
80
91
myFinalizerName := "config.porch.kpt.dev/finalizer"
@@ -84,7 +95,7 @@ func (r *RemoteRootSyncSetReconciler) Reconcile(ctx context.Context, req ctrl.Re
84
95
// registering our finalizer.
85
96
if ! controllerutil .ContainsFinalizer (& subject , myFinalizerName ) {
86
97
controllerutil .AddFinalizer (& subject , myFinalizerName )
87
- if err := r .Update (ctx , & subject ); err != nil {
98
+ if err := r .client . Update (ctx , & subject ); err != nil {
88
99
return ctrl.Result {}, err
89
100
}
90
101
}
@@ -99,7 +110,7 @@ func (r *RemoteRootSyncSetReconciler) Reconcile(ctx context.Context, req ctrl.Re
99
110
}
100
111
// remove our finalizer from the list and update it.
101
112
controllerutil .RemoveFinalizer (& subject , myFinalizerName )
102
- if err := r .Update (ctx , & subject ); err != nil {
113
+ if err := r .client . Update (ctx , & subject ); err != nil {
103
114
return ctrl.Result {}, fmt .Errorf ("failed to update %s after delete finalizer: %w" , req .Name , err )
104
115
}
105
116
}
@@ -116,7 +127,7 @@ func (r *RemoteRootSyncSetReconciler) Reconcile(ctx context.Context, req ctrl.Re
116
127
patchErrs = append (patchErrs , err )
117
128
}
118
129
if updateTargetStatus (& subject , clusterRef , results , err ) {
119
- if err := r .Status ().Update (ctx , & subject ); err != nil {
130
+ if err := r .client . Status ().Update (ctx , & subject ); err != nil {
120
131
patchErrs = append (patchErrs , err )
121
132
}
122
133
}
@@ -226,7 +237,7 @@ func updateAggregateStatus(subject *api.RemoteRootSyncSet) bool {
226
237
}
227
238
228
239
func (r * RemoteRootSyncSetReconciler ) applyToClusterRef (ctx context.Context , subject * api.RemoteRootSyncSet , clusterRef * api.ClusterRef ) (* applyset.ApplyResults , error ) {
229
- remoteClient , err := r .GetRemoteClient (ctx , clusterRef , subject .Namespace )
240
+ remoteClient , err := r .remoteClientGetter . GetRemoteClient (ctx , clusterRef , subject .Namespace )
230
241
if err != nil {
231
242
return nil , err
232
243
}
@@ -281,6 +292,18 @@ func (r *RemoteRootSyncSetReconciler) applyToClusterRef(ctx context.Context, sub
281
292
282
293
// BuildObjectsToApply config root sync
283
294
func (r * RemoteRootSyncSetReconciler ) BuildObjectsToApply (ctx context.Context , subject * api.RemoteRootSyncSet ) ([]applyset.ApplyableObject , error ) {
295
+ sourceFormat := subject .GetSpec ().GetTemplate ().GetSourceFormat ()
296
+ switch sourceFormat {
297
+ case "oci" :
298
+ return r .buildObjectsToApplyFromOci (ctx , subject )
299
+ case "package" :
300
+ return r .buildObjectsToApplyFromPackage (ctx , subject )
301
+ default :
302
+ return nil , fmt .Errorf ("unknown sourceFormat %q" , sourceFormat )
303
+ }
304
+ }
305
+
306
+ func (r * RemoteRootSyncSetReconciler ) buildObjectsToApplyFromOci (ctx context.Context , subject * api.RemoteRootSyncSet ) ([]applyset.ApplyableObject , error ) {
284
307
repository := subject .GetSpec ().GetTemplate ().GetOCI ().GetRepository ()
285
308
if repository == "" {
286
309
return nil , fmt .Errorf ("spec.template.oci.repository is not set" )
@@ -313,17 +336,132 @@ func (r *RemoteRootSyncSetReconciler) BuildObjectsToApply(ctx context.Context, s
313
336
return applyables , nil
314
337
}
315
338
339
+ func (r * RemoteRootSyncSetReconciler ) buildObjectsToApplyFromPackage (ctx context.Context , subject * api.RemoteRootSyncSet ) ([]applyset.ApplyableObject , error ) {
340
+ packageName := subject .GetSpec ().GetTemplate ().GetPackageRef ().GetName ()
341
+ if packageName == "" {
342
+ return nil , fmt .Errorf ("spec.template.packageRef.name is not set" )
343
+ }
344
+
345
+ ns := subject .GetNamespace ()
346
+
347
+ var packageRevisions porchapi.PackageRevisionList
348
+ // Note that latest revision is planned for removal: #3672
349
+
350
+ // TODO: publish package name as label?
351
+ // TODO: Make package a first class concept?
352
+ // TODO: Have some indicator of latest revision?
353
+ if err := r .client .List (ctx , & packageRevisions , client .InNamespace (ns )); err != nil {
354
+ // Not found here is unexpected
355
+ return nil , fmt .Errorf ("error listing package revisions: %w" , err )
356
+ }
357
+
358
+ var latestPackageRevision * porchapi.PackageRevision
359
+ for i := range packageRevisions .Items {
360
+ candidate := & packageRevisions .Items [i ]
361
+ if candidate .Spec .PackageName != packageName {
362
+ continue
363
+ }
364
+ if ! strings .Contains (candidate .Spec .RepositoryName , "deployment" ) {
365
+ // TODO: How can we only pick up deployment packages? Probably labels...
366
+ klog .Warningf ("HACK: ignoring package that does not appear to be a deployment package" )
367
+ continue
368
+ }
369
+
370
+ candidateRevision := candidate .Spec .Revision
371
+ if ! strings .HasPrefix (candidateRevision , "v" ) {
372
+ klog .Warningf ("ignoring revision %q with unexpected format %q" , candidate .Name , candidateRevision )
373
+ continue
374
+ }
375
+
376
+ if latestPackageRevision == nil {
377
+ latestPackageRevision = candidate
378
+ } else {
379
+ latestRevision := latestPackageRevision .Spec .Revision
380
+
381
+ if ! strings .HasPrefix (latestRevision , "v" ) {
382
+ return nil , fmt .Errorf ("unexpected revision format %q" , latestRevision )
383
+ }
384
+ latestRevision = strings .TrimPrefix (latestRevision , "v" )
385
+
386
+ if ! strings .HasPrefix (candidateRevision , "v" ) {
387
+ return nil , fmt .Errorf ("unexpected revision format %q" , candidateRevision )
388
+ }
389
+ candidateRevision = strings .TrimPrefix (candidateRevision , "v" )
390
+
391
+ latestRevisionInt , err := strconv .Atoi (latestRevision )
392
+ if err != nil {
393
+ return nil , fmt .Errorf ("unexpected revision format %q" , latestRevision )
394
+ }
395
+
396
+ candidateRevisionInt , err := strconv .Atoi (candidateRevision )
397
+ if err != nil {
398
+ return nil , fmt .Errorf ("unexpected revision format %q" , candidateRevision )
399
+ }
400
+
401
+ if candidateRevisionInt == latestRevisionInt {
402
+ return nil , fmt .Errorf ("found two package revision with same revision: %q and %q" , candidate .Name , latestPackageRevision .Name )
403
+ }
404
+
405
+ if candidateRevisionInt > latestRevisionInt {
406
+ latestPackageRevision = candidate
407
+ }
408
+ }
409
+ }
410
+ if latestPackageRevision == nil {
411
+ return nil , fmt .Errorf ("cannot find latest version of package %q in namespace %q" , packageName , ns )
412
+ }
413
+
414
+ id := types.NamespacedName {
415
+ Namespace : latestPackageRevision .Namespace ,
416
+ Name : latestPackageRevision .Name ,
417
+ }
418
+ klog .Infof ("found latest package %q" , id )
419
+ latestPackageRevisionResources := & porchapi.PackageRevisionResources {}
420
+ if err := r .uncachedClient .Get (ctx , id , latestPackageRevisionResources ); err != nil {
421
+ // Not found here is unexpected
422
+ return nil , fmt .Errorf ("error getting package revision resources for %v: %w" , id , err )
423
+ }
424
+
425
+ unstructureds , err := objects.Parser {}.AsUnstructureds (latestPackageRevisionResources .Spec .Resources )
426
+ if err != nil {
427
+ return nil , err
428
+ }
429
+
430
+ var applyables []applyset.ApplyableObject
431
+ for _ , u := range unstructureds {
432
+ applyables = append (applyables , u )
433
+ }
434
+ return applyables , nil
435
+ }
436
+
316
437
// SetupWithManager sets up the controller with the Manager.
317
438
func (r * RemoteRootSyncSetReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
318
439
if err := api .AddToScheme (mgr .GetScheme ()); err != nil {
319
440
return err
320
441
}
442
+ if err := porchapi .AddToScheme (mgr .GetScheme ()); err != nil {
443
+ return err
444
+ }
321
445
322
- if err := r .RemoteClientGetter .Init (mgr ); err != nil {
446
+ if err := r .remoteClientGetter .Init (mgr ); err != nil {
323
447
return err
324
448
}
325
449
326
- r .Client = mgr .GetClient ()
450
+ r .client = mgr .GetClient ()
451
+
452
+ // We need an uncachedClient to query objects directly.
453
+ // In particular we don't want to watch PackageRevisionResources,
454
+ // they are large so would have a large memory footprint,
455
+ // and we don't want to support watch on them anyway.
456
+ // If you need to watch PackageRevisionResources, you can watch PackageRevisions instead.
457
+ uncachedClient , err := client .New (mgr .GetConfig (), client.Options {
458
+ Scheme : mgr .GetScheme (),
459
+ Mapper : mgr .GetRESTMapper (),
460
+ })
461
+ if err != nil {
462
+ return fmt .Errorf ("creating uncached client: %w" , err )
463
+ }
464
+ r .uncachedClient = uncachedClient
327
465
328
466
if err := ctrl .NewControllerManagedBy (mgr ).
329
467
For (& api.RemoteRootSyncSet {}).
0 commit comments