Skip to content

Commit 971cb4c

Browse files
authored
Watch for PackageRevisions (#3644)
1 parent aa271f2 commit 971cb4c

File tree

13 files changed

+234
-57
lines changed

13 files changed

+234
-57
lines changed

porch/pkg/apiserver/apiserver.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,13 @@ func (c completedConfig) New() (*PorchServer, error) {
216216
referenceResolver := porch.NewReferenceResolver(coreClient)
217217
userInfoProvider := &porch.ApiserverUserInfoProvider{}
218218

219+
watcherMgr := engine.NewWatcherManager()
220+
219221
cache := cache.NewCache(c.ExtraConfig.CacheDirectory, cache.CacheOptions{
220222
CredentialResolver: credentialResolver,
221223
UserInfoProvider: userInfoProvider,
222224
MetadataStore: metadataStore,
225+
ObjectNotifier: watcherMgr,
223226
})
224227

225228
runnerOptionsResolver := func(namespace string) fnruntime.RunnerOptions {
@@ -247,6 +250,7 @@ func (c completedConfig) New() (*PorchServer, error) {
247250
engine.WithReferenceResolver(referenceResolver),
248251
engine.WithUserInfoProvider(userInfoProvider),
249252
engine.WithMetadataStore(metadataStore),
253+
engine.WithWatcherManager(watcherMgr),
250254
)
251255
if err != nil {
252256
return nil, err

porch/pkg/cache/cache.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/GoogleContainerTools/kpt/porch/pkg/oci"
2828
"github.com/GoogleContainerTools/kpt/porch/pkg/repository"
2929
"go.opentelemetry.io/otel/trace"
30+
"k8s.io/apimachinery/pkg/watch"
3031
)
3132

3233
// Cache allows us to keep state for repositories, rather than querying them every time.
@@ -47,33 +48,31 @@ type Cache struct {
4748
userInfoProvider repository.UserInfoProvider
4849
metadataStore meta.MetadataStore
4950

50-
objectCache *objectCache
51+
objectNotifier objectNotifier
52+
}
53+
54+
type objectNotifier interface {
55+
NotifyPackageRevisionChange(eventType watch.EventType, obj repository.PackageRevision, objMeta meta.PackageRevisionMeta)
5156
}
5257

5358
type CacheOptions struct {
5459
CredentialResolver repository.CredentialResolver
5560
UserInfoProvider repository.UserInfoProvider
5661
MetadataStore meta.MetadataStore
62+
ObjectNotifier objectNotifier
5763
}
5864

5965
func NewCache(cacheDir string, opts CacheOptions) *Cache {
60-
objectCache := &objectCache{}
61-
6266
return &Cache{
6367
repositories: make(map[string]*cachedRepository),
6468
cacheDir: cacheDir,
6569
credentialResolver: opts.CredentialResolver,
6670
userInfoProvider: opts.UserInfoProvider,
6771
metadataStore: opts.MetadataStore,
68-
objectCache: objectCache,
72+
objectNotifier: opts.ObjectNotifier,
6973
}
7074
}
7175

72-
// ObjectCache() is a cache of all our objects.
73-
func (c *Cache) ObjectCache() ObjectCache {
74-
return c.objectCache
75-
}
76-
7776
func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (*cachedRepository, error) {
7877
ctx, span := tracer.Start(ctx, "Cache::OpenRepository", trace.WithAttributes())
7978
defer span.End()
@@ -95,7 +94,7 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
9594
if err != nil {
9695
return nil, err
9796
}
98-
cr = newRepository(key, repositorySpec, r, c.objectCache, c.metadataStore)
97+
cr = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore)
9998
c.repositories[key] = cr
10099
}
101100
return cr, nil
@@ -131,7 +130,7 @@ func (c *Cache) OpenRepository(ctx context.Context, repositorySpec *configapi.Re
131130
}); err != nil {
132131
return nil, err
133132
} else {
134-
cr = newRepository(key, repositorySpec, r, c.objectCache, c.metadataStore)
133+
cr = newRepository(key, repositorySpec, r, c.objectNotifier, c.metadataStore)
135134
c.repositories[key] = cr
136135
}
137136
} else {

porch/pkg/cache/cache_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import (
2323

2424
api "github.com/GoogleContainerTools/kpt/porch/api/porch/v1alpha1"
2525
"github.com/GoogleContainerTools/kpt/porch/api/porchconfig/v1alpha1"
26+
fakecache "github.com/GoogleContainerTools/kpt/porch/pkg/cache/fake"
2627
"github.com/GoogleContainerTools/kpt/porch/pkg/git"
2728
"github.com/GoogleContainerTools/kpt/porch/pkg/meta"
28-
"github.com/GoogleContainerTools/kpt/porch/pkg/meta/fake"
29+
fakemeta "github.com/GoogleContainerTools/kpt/porch/pkg/meta/fake"
2930
"github.com/GoogleContainerTools/kpt/porch/pkg/repository"
3031
gogit "github.com/go-git/go-git/v5"
3132
"github.com/google/go-cmp/cmp"
@@ -134,7 +135,8 @@ func openRepositoryFromArchive(t *testing.T, ctx context.Context, testPath, name
134135
metadataStore := createMetadataStoreFromArchive(t, "", "")
135136

136137
cache := NewCache(t.TempDir(), CacheOptions{
137-
MetadataStore: metadataStore,
138+
MetadataStore: metadataStore,
139+
ObjectNotifier: &fakecache.ObjectNotifier{},
138140
})
139141
cachedGit, err := cache.OpenRepository(ctx, &v1alpha1.Repository{
140142
TypeMeta: metav1.TypeMeta{
@@ -170,7 +172,7 @@ func createMetadataStoreFromArchive(t *testing.T, testPath, name string) meta.Me
170172
t.Fatalf("Error reading metadata file found for repository %s", name)
171173
}
172174
if os.IsNotExist(err) {
173-
return &fake.MemoryMetadataStore{
175+
return &fakemeta.MemoryMetadataStore{
174176
Metas: []meta.PackageRevisionMeta{},
175177
}
176178
}
@@ -180,7 +182,7 @@ func createMetadataStoreFromArchive(t *testing.T, testPath, name string) meta.Me
180182
t.Fatalf("Error unmarshalling metadata file for repository %s", name)
181183
}
182184

183-
return &fake.MemoryMetadataStore{
185+
return &fakemeta.MemoryMetadataStore{
184186
Metas: metas,
185187
}
186188
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fake
16+
17+
import (
18+
"github.com/GoogleContainerTools/kpt/porch/pkg/meta"
19+
"github.com/GoogleContainerTools/kpt/porch/pkg/repository"
20+
"k8s.io/apimachinery/pkg/watch"
21+
)
22+
23+
type ObjectNotifier struct{}
24+
25+
func (o *ObjectNotifier) NotifyPackageRevisionChange(watch.EventType, repository.PackageRevision, meta.PackageRevisionMeta) {
26+
}

porch/pkg/cache/repository.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,20 @@ type cachedRepository struct {
6262
refreshRevisionsError error
6363
refreshPkgsError error
6464

65-
objectCache *objectCache
65+
objectNotifier objectNotifier
6666

6767
metadataStore meta.MetadataStore
6868
}
6969

70-
func newRepository(id string, repoSpec *configapi.Repository, repo repository.Repository, objectCache *objectCache, metadataStore meta.MetadataStore) *cachedRepository {
70+
func newRepository(id string, repoSpec *configapi.Repository, repo repository.Repository, objectNotifier objectNotifier, metadataStore meta.MetadataStore) *cachedRepository {
7171
ctx, cancel := context.WithCancel(context.Background())
7272
r := &cachedRepository{
73-
id: id,
74-
repoSpec: repoSpec,
75-
repo: repo,
76-
cancel: cancel,
77-
objectCache: objectCache,
78-
metadataStore: metadataStore,
73+
id: id,
74+
repoSpec: repoSpec,
75+
repo: repo,
76+
cancel: cancel,
77+
objectNotifier: objectNotifier,
78+
metadataStore: metadataStore,
7979
}
8080

8181
// TODO: Should we fetch the packages here?
@@ -357,9 +357,10 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
357357
return nil, nil, err
358358
}
359359
// Create a map so we can quickly check if a specific PackageRevisionMeta exists.
360-
existingPkgRevCRsMap := make(map[string]bool)
361-
for _, pr := range existingPkgRevCRs {
362-
existingPkgRevCRsMap[pr.Name] = true
360+
existingPkgRevCRsMap := make(map[string]meta.PackageRevisionMeta)
361+
for i := range existingPkgRevCRs {
362+
pr := existingPkgRevCRs[i]
363+
existingPkgRevCRsMap[pr.Name] = pr
363364
}
364365

365366
// TODO: Can we avoid holding the lock for the ListPackageRevisions / identifyLatestRevisions section?
@@ -440,18 +441,26 @@ func (r *cachedRepository) refreshAllCachedPackages(ctx context.Context) (map[re
440441
// Send notification for packages that changed.
441442
for k, newPackage := range r.cachedPackageRevisions {
442443
oldPackage := oldPackageRevisions[k]
444+
metaPackage, found := existingPkgRevCRsMap[newPackage.KubeObjectName()]
445+
if !found {
446+
klog.Warningf("no PackageRev CR found for PackageRevision %s", newPackage.KubeObjectName())
447+
}
443448
if oldPackage == nil {
444-
r.objectCache.notifyPackageRevisionChange(watch.Added, newPackage)
449+
r.objectNotifier.NotifyPackageRevisionChange(watch.Added, newPackage, metaPackage)
445450
} else {
446451
// TODO: only if changed
447452
klog.Warningf("over-notifying of package updates (even on unchanged packages)")
448-
r.objectCache.notifyPackageRevisionChange(watch.Modified, newPackage)
453+
r.objectNotifier.NotifyPackageRevisionChange(watch.Modified, newPackage, metaPackage)
449454
}
450455
}
451456

452457
for k, oldPackage := range oldPackageRevisions {
458+
metaPackage, found := existingPkgRevCRsMap[oldPackage.KubeObjectName()]
459+
if !found {
460+
klog.Warningf("no PackageRev CR found for PackageRevision %s", oldPackage.KubeObjectName())
461+
}
453462
if newPackageRevisionMap[k] == nil {
454-
r.objectCache.notifyPackageRevisionChange(watch.Deleted, oldPackage)
463+
r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, oldPackage, metaPackage)
455464
}
456465
}
457466

porch/pkg/engine/engine.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"k8s.io/apimachinery/pkg/runtime"
4545
"k8s.io/apimachinery/pkg/runtime/schema"
4646
"k8s.io/apimachinery/pkg/types"
47+
"k8s.io/apimachinery/pkg/watch"
4748
"k8s.io/klog/v2"
4849
"sigs.k8s.io/controller-runtime/pkg/client"
4950
"sigs.k8s.io/kustomize/kyaml/comments"
@@ -55,7 +56,7 @@ var tracer = otel.Tracer("engine")
5556

5657
type CaDEngine interface {
5758
// ObjectCache() is a cache of all our objects.
58-
ObjectCache() cache.ObjectCache
59+
ObjectCache() WatcherManager
5960

6061
UpdatePackageResources(ctx context.Context, repositoryObj *configapi.Repository, oldPackage *PackageRevision, old, new *api.PackageRevisionResources) (*PackageRevision, *api.RenderStatus, error)
6162
ListFunctions(ctx context.Context, repositoryObj *configapi.Repository) ([]*Function, error)
@@ -83,6 +84,16 @@ func (p *Package) KubeObjectName() string {
8384
return p.repoPackage.KubeObjectName()
8485
}
8586

87+
// TODO: This is a bit awkward, and we should see if there is a way to avoid
88+
// having to expose this function. Any functionality that requires creating new
89+
// engine.PackageRevision resources should be in the engine package.
90+
func ToPackageRevision(pkgRev repository.PackageRevision, pkgRevMeta meta.PackageRevisionMeta) *PackageRevision {
91+
return &PackageRevision{
92+
repoPackageRevision: pkgRev,
93+
packageRevisionMeta: pkgRevMeta,
94+
}
95+
}
96+
8697
type PackageRevision struct {
8798
repoPackageRevision repository.PackageRevision
8899
packageRevisionMeta meta.PackageRevisionMeta
@@ -149,6 +160,7 @@ type cadEngine struct {
149160
referenceResolver ReferenceResolver
150161
userInfoProvider repository.UserInfoProvider
151162
metadataStore meta.MetadataStore
163+
watcherManager *watcherManager
152164
}
153165

154166
var _ CaDEngine = &cadEngine{}
@@ -158,8 +170,8 @@ type mutation interface {
158170
}
159171

160172
// ObjectCache is a cache of all our objects.
161-
func (cad *cadEngine) ObjectCache() cache.ObjectCache {
162-
return cad.cache.ObjectCache()
173+
func (cad *cadEngine) ObjectCache() WatcherManager {
174+
return cad.watcherManager
163175
}
164176

165177
func (cad *cadEngine) OpenRepository(ctx context.Context, repositorySpec *configapi.Repository) (repository.Repository, error) {
@@ -327,6 +339,7 @@ func (cad *cadEngine) CreatePackageRevision(ctx context.Context, repositoryObj *
327339
if err != nil {
328340
return nil, err
329341
}
342+
cad.watcherManager.NotifyPackageRevisionChange(watch.Added, repoPkgRev, pkgRevMeta)
330343
return &PackageRevision{
331344
repoPackageRevision: repoPkgRev,
332345
packageRevisionMeta: pkgRevMeta,
@@ -539,6 +552,7 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
539552
}
540553
cad.metadataStore.Update(ctx, pkgRevMeta)
541554

555+
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
542556
return &PackageRevision{
543557
repoPackageRevision: repoPkgRev,
544558
packageRevisionMeta: pkgRevMeta,
@@ -560,8 +574,19 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
560574
if err != nil {
561575
return nil, err
562576
}
577+
578+
pkgRevMeta := meta.PackageRevisionMeta{
579+
Name: repoPkgRev.KubeObjectName(),
580+
Namespace: repoPkgRev.KubeObjectNamespace(),
581+
Labels: newObj.Labels,
582+
Annotations: newObj.Annotations,
583+
}
584+
cad.metadataStore.Update(ctx, pkgRevMeta)
585+
586+
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
563587
return &PackageRevision{
564588
repoPackageRevision: repoPkgRev,
589+
packageRevisionMeta: pkgRevMeta,
565590
}, nil
566591
}
567592

@@ -664,6 +689,7 @@ func (cad *cadEngine) UpdatePackageRevision(ctx context.Context, repositoryObj *
664689
}
665690
cad.metadataStore.Update(ctx, pkgRevMeta)
666691

692+
cad.watcherManager.NotifyPackageRevisionChange(watch.Modified, repoPkgRev, pkgRevMeta)
667693
return &PackageRevision{
668694
repoPackageRevision: repoPkgRev,
669695
packageRevisionMeta: pkgRevMeta,
@@ -800,6 +826,7 @@ func (cad *cadEngine) DeletePackageRevision(ctx context.Context, repositoryObj *
800826
return err
801827
}
802828

829+
cad.watcherManager.NotifyPackageRevisionChange(watch.Deleted, oldPackage.repoPackageRevision, oldPackage.packageRevisionMeta)
803830
return nil
804831
}
805832

porch/pkg/engine/options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,10 @@ func WithMetadataStore(metadataStore meta.MetadataStore) EngineOption {
127127
return nil
128128
})
129129
}
130+
131+
func WithWatcherManager(watcherManager *watcherManager) EngineOption {
132+
return EngineOptionFunc(func(engine *cadEngine) error {
133+
engine.watcherManager = watcherManager
134+
return nil
135+
})
136+
}

0 commit comments

Comments
 (0)