diff --git a/.golangci.yml b/.golangci.yml index 4c43665e2b..6dcbfb64d6 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -13,7 +13,6 @@ linters: - exhaustive - exportloopref - ginkgolinter - - goconst - gocritic - gocyclo - gofmt diff --git a/examples/kcp/go.mod b/examples/kcp/go.mod index db3f29bbaf..123278f6bf 100644 --- a/examples/kcp/go.mod +++ b/examples/kcp/go.mod @@ -50,6 +50,7 @@ require ( github.com/google/uuid v1.3.1 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/imdario/mergo v0.3.13 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect diff --git a/examples/kcp/go.sum b/examples/kcp/go.sum index a85a6c0f70..25e2c1f872 100644 --- a/examples/kcp/go.sum +++ b/examples/kcp/go.sum @@ -96,6 +96,8 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= diff --git a/go.mod b/go.mod index 03527ada6f..c0cf175190 100644 --- a/go.mod +++ b/go.mod @@ -31,6 +31,7 @@ require ( ) require ( + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/kcp-dev/apimachinery/v2 v2.0.0-alpha.0.0.20230926071920-57d168bcbe34 github.com/kcp-dev/logicalcluster/v3 v3.0.5 ) diff --git a/go.sum b/go.sum index a5a2ef8c22..69320f082a 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,8 @@ github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= diff --git a/pkg/client/client.go b/pkg/client/client.go index a3eb0ff5e2..26474cc251 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,6 +23,8 @@ import ( "net/http" "strings" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -47,6 +49,10 @@ type Options struct { // Mapper, if provided, will be used to map GroupVersionKinds to Resources Mapper meta.RESTMapper + // MapperWithContext, if provided, will be used to map GroupVersionKinds to Resources. + // This overrides Mapper if set. + MapperWithContext func(context.Context) (meta.RESTMapper, error) + // Cache, if provided, is used to read objects from the cache. Cache *CacheOptions @@ -56,6 +62,10 @@ type Options struct { // DryRun instructs the client to only perform dry run requests. DryRun *bool + + // KcpClusterDiscoveryCacheSize is the size of the cache for cluster discovery + // information backing the client's REST mapper. + KcpClusterDiscoveryCacheSize int } // WarningHandlerOptions are options for configuring a @@ -170,16 +180,27 @@ func newClient(config *rest.Config, options Options) (*client, error) { } } + if options.KcpClusterDiscoveryCacheSize == 0 { + options.KcpClusterDiscoveryCacheSize = 1000 + } + + // Init a MapperWithContext if none provided + if options.MapperWithContext == nil { + options.MapperWithContext = func(context.Context) (meta.RESTMapper, error) { return options.Mapper, nil } + } + resources := &clientRestResources{ httpClient: options.HTTPClient, config: config, scheme: options.Scheme, - mapper: options.Mapper, + mapper: options.MapperWithContext, codecs: serializer.NewCodecFactory(options.Scheme), - - structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), - unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), } + cr, err := lru.New[logicalcluster.Path, clusterResources](options.KcpClusterDiscoveryCacheSize) + if err != nil { + return nil, err + } + resources.clusterResources = cr rawMetaClient, err := metadata.NewForConfigAndClient(metadata.ConfigFor(config), options.HTTPClient) if err != nil { @@ -197,11 +218,16 @@ func newClient(config *rest.Config, options Options) (*client, error) { }, metadataClient: metadataClient{ client: rawMetaClient, - restMapper: options.Mapper, + restMapper: options.MapperWithContext, }, scheme: options.Scheme, mapper: options.Mapper, } + mapperCache, err := lru.New[logicalcluster.Name, meta.RESTMapper](options.KcpClusterDiscoveryCacheSize) + if err != nil { + return nil, err + } + c.metadataClient.mapperCache = mapperCache if options.Cache == nil || options.Cache.Reader == nil { return c, nil } diff --git a/pkg/client/client_rest_resources.go b/pkg/client/client_rest_resources.go index 2d07879520..1247879e5e 100644 --- a/pkg/client/client_rest_resources.go +++ b/pkg/client/client_rest_resources.go @@ -17,10 +17,13 @@ limitations under the License. package client import ( + "context" "net/http" "strings" "sync" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -28,8 +31,18 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/kontext" ) +type clusterResources struct { + mapper meta.RESTMapper + + // structuredResourceByType stores structured type metadata + structuredResourceByType map[schema.GroupVersionKind]*resourceMeta + // unstructuredResourceByType stores unstructured type metadata + unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta +} + // clientRestResources creates and stores rest clients and metadata for Kubernetes types. type clientRestResources struct { // httpClient is the http client to use for requests @@ -42,21 +55,18 @@ type clientRestResources struct { scheme *runtime.Scheme // mapper maps GroupVersionKinds to Resources - mapper meta.RESTMapper + mapper func(ctx context.Context) (meta.RESTMapper, error) // codecs are used to create a REST client for a gvk codecs serializer.CodecFactory - // structuredResourceByType stores structured type metadata - structuredResourceByType map[schema.GroupVersionKind]*resourceMeta - // unstructuredResourceByType stores unstructured type metadata - unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta - mu sync.RWMutex + clusterResources *lru.Cache[logicalcluster.Path, clusterResources] + mu sync.RWMutex } // newResource maps obj to a Kubernetes Resource and constructs a client for that Resource. // If the object is a list, the resource represents the item's type instead. -func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, isUnstructured bool) (*resourceMeta, error) { +func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, isUnstructured bool, mapper meta.RESTMapper) (*resourceMeta, error) { if strings.HasSuffix(gvk.Kind, "List") && isList { // if this was a list, treat it as a request for the item's resource gvk.Kind = gvk.Kind[:len(gvk.Kind)-4] @@ -66,7 +76,7 @@ func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, i if err != nil { return nil, err } - mapping, err := c.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, err } @@ -75,7 +85,7 @@ func (c *clientRestResources) newResource(gvk schema.GroupVersionKind, isList, i // getResource returns the resource meta information for the given type of object. // If the object is a list, the resource represents the item's type instead. -func (c *clientRestResources) getResource(obj runtime.Object) (*resourceMeta, error) { +func (c *clientRestResources) getResource(ctx context.Context, obj runtime.Object) (*resourceMeta, error) { gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return nil, err @@ -86,9 +96,25 @@ func (c *clientRestResources) getResource(obj runtime.Object) (*resourceMeta, er // It's better to do creation work twice than to not let multiple // people make requests at once c.mu.RLock() - resourceByType := c.structuredResourceByType + cluster, _ := kontext.ClusterFrom(ctx) + cr, found := c.clusterResources.Get(cluster.Path()) + if !found { + m, err := c.mapper(ctx) + if err != nil { + c.mu.RUnlock() + return nil, err + } + cr = clusterResources{ + mapper: m, + structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), + unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), + } + c.clusterResources.Purge() + c.clusterResources.Add(cluster.Path(), cr) + } + resourceByType := cr.structuredResourceByType if isUnstructured { - resourceByType = c.unstructuredResourceByType + resourceByType = cr.unstructuredResourceByType } r, known := resourceByType[gvk] c.mu.RUnlock() @@ -100,7 +126,7 @@ func (c *clientRestResources) getResource(obj runtime.Object) (*resourceMeta, er // Initialize a new Client c.mu.Lock() defer c.mu.Unlock() - r, err = c.newResource(gvk, meta.IsListType(obj), isUnstructured) + r, err = c.newResource(gvk, meta.IsListType(obj), isUnstructured, cr.mapper) if err != nil { return nil, err } @@ -109,8 +135,8 @@ func (c *clientRestResources) getResource(obj runtime.Object) (*resourceMeta, er } // getObjMeta returns objMeta containing both type and object metadata and state. -func (c *clientRestResources) getObjMeta(obj runtime.Object) (*objMeta, error) { - r, err := c.getResource(obj) +func (c *clientRestResources) getObjMeta(ctx context.Context, obj runtime.Object) (*objMeta, error) { + r, err := c.getResource(ctx, obj) if err != nil { return nil, err } diff --git a/pkg/client/metadata_client.go b/pkg/client/metadata_client.go index d0c6b8e13a..8368035607 100644 --- a/pkg/client/metadata_client.go +++ b/pkg/client/metadata_client.go @@ -20,11 +20,15 @@ import ( "context" "fmt" "strings" + "sync" + lru "github.com/hashicorp/golang-lru/v2" + "github.com/kcp-dev/logicalcluster/v3" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/metadata" + "sigs.k8s.io/controller-runtime/pkg/kontext" ) // TODO(directxman12): we could rewrite this on top of the low-level REST @@ -34,12 +38,28 @@ import ( // metadataClient is a client that reads & writes metadata-only requests to/from the API server. type metadataClient struct { - client metadata.Interface - restMapper meta.RESTMapper + client metadata.Interface + restMapper func(ctx context.Context) (meta.RESTMapper, error) + mu sync.Mutex + mapperCache *lru.Cache[logicalcluster.Name, meta.RESTMapper] } -func (mc *metadataClient) getResourceInterface(gvk schema.GroupVersionKind, ns string) (metadata.ResourceInterface, error) { - mapping, err := mc.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) +func (mc *metadataClient) getResourceInterface(ctx context.Context, gvk schema.GroupVersionKind, ns string) (metadata.ResourceInterface, error) { + cluster, _ := kontext.ClusterFrom(ctx) + mc.mu.Lock() + mapper, _ := mc.mapperCache.Get(cluster) + if mapper == nil { + var err error + mapper, err = mc.restMapper(ctx) + if err != nil { + mc.mu.Unlock() + return nil, err + } + mc.mapperCache.Add(cluster, mapper) + } + mc.mu.Unlock() + + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, err } @@ -56,7 +76,7 @@ func (mc *metadataClient) Delete(ctx context.Context, obj Object, opts ...Delete return fmt.Errorf("metadata client did not understand object: %T", obj) } - resInt, err := mc.getResourceInterface(metadata.GroupVersionKind(), metadata.Namespace) + resInt, err := mc.getResourceInterface(ctx, metadata.GroupVersionKind(), metadata.Namespace) if err != nil { return err } @@ -77,7 +97,7 @@ func (mc *metadataClient) DeleteAllOf(ctx context.Context, obj Object, opts ...D deleteAllOfOpts := DeleteAllOfOptions{} deleteAllOfOpts.ApplyOptions(opts) - resInt, err := mc.getResourceInterface(metadata.GroupVersionKind(), deleteAllOfOpts.ListOptions.Namespace) + resInt, err := mc.getResourceInterface(ctx, metadata.GroupVersionKind(), deleteAllOfOpts.ListOptions.Namespace) if err != nil { return err } @@ -93,7 +113,7 @@ func (mc *metadataClient) Patch(ctx context.Context, obj Object, patch Patch, op } gvk := metadata.GroupVersionKind() - resInt, err := mc.getResourceInterface(gvk, metadata.Namespace) + resInt, err := mc.getResourceInterface(ctx, gvk, metadata.Namespace) if err != nil { return err } @@ -127,7 +147,7 @@ func (mc *metadataClient) Get(ctx context.Context, key ObjectKey, obj Object, op getOpts := GetOptions{} getOpts.ApplyOptions(opts) - resInt, err := mc.getResourceInterface(gvk, key.Namespace) + resInt, err := mc.getResourceInterface(ctx, gvk, key.Namespace) if err != nil { return err } @@ -154,7 +174,7 @@ func (mc *metadataClient) List(ctx context.Context, obj ObjectList, opts ...List listOpts := ListOptions{} listOpts.ApplyOptions(opts) - resInt, err := mc.getResourceInterface(gvk, listOpts.Namespace) + resInt, err := mc.getResourceInterface(ctx, gvk, listOpts.Namespace) if err != nil { return err } @@ -175,7 +195,7 @@ func (mc *metadataClient) PatchSubResource(ctx context.Context, obj Object, subR } gvk := metadata.GroupVersionKind() - resInt, err := mc.getResourceInterface(gvk, metadata.Namespace) + resInt, err := mc.getResourceInterface(ctx, gvk, metadata.Namespace) if err != nil { return err } diff --git a/pkg/client/typed_client.go b/pkg/client/typed_client.go index 92afd9a9c2..aff18d8468 100644 --- a/pkg/client/typed_client.go +++ b/pkg/client/typed_client.go @@ -32,7 +32,7 @@ type typedClient struct { // Create implements client.Client. func (c *typedClient) Create(ctx context.Context, obj Object, opts ...CreateOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -51,7 +51,7 @@ func (c *typedClient) Create(ctx context.Context, obj Object, opts ...CreateOpti // Update implements client.Client. func (c *typedClient) Update(ctx context.Context, obj Object, opts ...UpdateOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -71,7 +71,7 @@ func (c *typedClient) Update(ctx context.Context, obj Object, opts ...UpdateOpti // Delete implements client.Client. func (c *typedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -90,7 +90,7 @@ func (c *typedClient) Delete(ctx context.Context, obj Object, opts ...DeleteOpti // DeleteAllOf implements client.Client. func (c *typedClient) DeleteAllOf(ctx context.Context, obj Object, opts ...DeleteAllOfOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -109,7 +109,7 @@ func (c *typedClient) DeleteAllOf(ctx context.Context, obj Object, opts ...Delet // Patch implements client.Client. func (c *typedClient) Patch(ctx context.Context, obj Object, patch Patch, opts ...PatchOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -134,7 +134,7 @@ func (c *typedClient) Patch(ctx context.Context, obj Object, patch Patch, opts . // Get implements client.Client. func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj Object, opts ...GetOption) error { - r, err := c.resources.getResource(obj) + r, err := c.resources.getResource(ctx, obj) if err != nil { return err } @@ -149,7 +149,7 @@ func (c *typedClient) Get(ctx context.Context, key ObjectKey, obj Object, opts . // List implements client.Client. func (c *typedClient) List(ctx context.Context, obj ObjectList, opts ...ListOption) error { - r, err := c.resources.getResource(obj) + r, err := c.resources.getResource(ctx, obj) if err != nil { return err } @@ -166,7 +166,7 @@ func (c *typedClient) List(ctx context.Context, obj ObjectList, opts ...ListOpti } func (c *typedClient) GetSubResource(ctx context.Context, obj, subResourceObj Object, subResource string, opts ...SubResourceGetOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -189,7 +189,7 @@ func (c *typedClient) GetSubResource(ctx context.Context, obj, subResourceObj Ob } func (c *typedClient) CreateSubResource(ctx context.Context, obj Object, subResourceObj Object, subResource string, opts ...SubResourceCreateOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -214,7 +214,7 @@ func (c *typedClient) CreateSubResource(ctx context.Context, obj Object, subReso // UpdateSubResource used by SubResourceWriter to write status. func (c *typedClient) UpdateSubResource(ctx context.Context, obj Object, subResource string, opts ...SubResourceUpdateOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -249,7 +249,7 @@ func (c *typedClient) UpdateSubResource(ctx context.Context, obj Object, subReso // PatchSubResource used by SubResourceWriter to write subresource. func (c *typedClient) PatchSubResource(ctx context.Context, obj Object, subResource string, patch Patch, opts ...SubResourcePatchOption) error { - o, err := c.resources.getObjMeta(obj) + o, err := c.resources.getObjMeta(ctx, obj) if err != nil { return err } diff --git a/pkg/client/unstructured_client.go b/pkg/client/unstructured_client.go index 0d96951780..872f316077 100644 --- a/pkg/client/unstructured_client.go +++ b/pkg/client/unstructured_client.go @@ -41,7 +41,7 @@ func (uc *unstructuredClient) Create(ctx context.Context, obj Object, opts ...Cr gvk := u.GetObjectKind().GroupVersionKind() - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -70,7 +70,7 @@ func (uc *unstructuredClient) Update(ctx context.Context, obj Object, opts ...Up gvk := u.GetObjectKind().GroupVersionKind() - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -97,7 +97,7 @@ func (uc *unstructuredClient) Delete(ctx context.Context, obj Object, opts ...De return fmt.Errorf("unstructured client did not understand object: %T", obj) } - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -120,7 +120,7 @@ func (uc *unstructuredClient) DeleteAllOf(ctx context.Context, obj Object, opts return fmt.Errorf("unstructured client did not understand object: %T", obj) } - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -143,7 +143,7 @@ func (uc *unstructuredClient) Patch(ctx context.Context, obj Object, patch Patch return fmt.Errorf("unstructured client did not understand object: %T", obj) } - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -178,7 +178,7 @@ func (uc *unstructuredClient) Get(ctx context.Context, key ObjectKey, obj Object getOpts := GetOptions{} getOpts.ApplyOptions(opts) - r, err := uc.resources.getResource(obj) + r, err := uc.resources.getResource(ctx, obj) if err != nil { return err } @@ -206,7 +206,7 @@ func (uc *unstructuredClient) List(ctx context.Context, obj ObjectList, opts ... gvk := u.GetObjectKind().GroupVersionKind() gvk.Kind = strings.TrimSuffix(gvk.Kind, "List") - r, err := uc.resources.getResource(obj) + r, err := uc.resources.getResource(ctx, obj) if err != nil { return err } @@ -235,7 +235,7 @@ func (uc *unstructuredClient) GetSubResource(ctx context.Context, obj, subResour subResourceObj.SetName(obj.GetName()) } - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -266,7 +266,7 @@ func (uc *unstructuredClient) CreateSubResource(ctx context.Context, obj, subRes subResourceObj.SetName(obj.GetName()) } - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -290,7 +290,7 @@ func (uc *unstructuredClient) UpdateSubResource(ctx context.Context, obj Object, return fmt.Errorf("unstructured client did not understand object: %T", obj) } - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } @@ -328,7 +328,7 @@ func (uc *unstructuredClient) PatchSubResource(ctx context.Context, obj Object, gvk := u.GetObjectKind().GroupVersionKind() - o, err := uc.resources.getObjMeta(obj) + o, err := uc.resources.getObjMeta(ctx, obj) if err != nil { return err } diff --git a/pkg/client/watch.go b/pkg/client/watch.go index 181b22a673..317c87ffc0 100644 --- a/pkg/client/watch.go +++ b/pkg/client/watch.go @@ -67,7 +67,7 @@ func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialO listOpts := w.listOpts(opts...) - resInt, err := w.client.metadataClient.getResourceInterface(gvk, listOpts.Namespace) + resInt, err := w.client.metadataClient.getResourceInterface(ctx, gvk, listOpts.Namespace) if err != nil { return nil, err } @@ -76,7 +76,7 @@ func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialO } func (w *watchingClient) unstructuredWatch(ctx context.Context, obj runtime.Unstructured, opts ...ListOption) (watch.Interface, error) { - r, err := w.client.unstructuredClient.resources.getResource(obj) + r, err := w.client.unstructuredClient.resources.getResource(ctx, obj) if err != nil { return nil, err } @@ -91,7 +91,7 @@ func (w *watchingClient) unstructuredWatch(ctx context.Context, obj runtime.Unst } func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) { - r, err := w.client.typedClient.resources.getResource(obj) + r, err := w.client.typedClient.resources.getResource(ctx, obj) if err != nil { return nil, err } diff --git a/pkg/kcp/kcp_suite_test.go b/pkg/kcp/kcp_suite_test.go new file mode 100644 index 0000000000..b81a20199e --- /dev/null +++ b/pkg/kcp/kcp_suite_test.go @@ -0,0 +1,35 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kcp_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestKCP(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "kcp Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) +}) diff --git a/pkg/kcp/kcp_test.go b/pkg/kcp/kcp_test.go new file mode 100644 index 0000000000..a4bfce85cf --- /dev/null +++ b/pkg/kcp/kcp_test.go @@ -0,0 +1,339 @@ +package kcp + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/kontext" +) + +var _ = Describe("NewClusterAwareClient", Ordered, func() { + var ( + srv *httptest.Server + mu sync.Mutex + paths []string + cfg *rest.Config + ) + + BeforeAll(func() { + srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + mu.Lock() + paths = append(paths, req.URL.Path) + mu.Unlock() + + switch req.URL.Path { + case "/api/v1", "/clusters/root/api/v1", "/clusters/*/api/v1": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"kind":"APIResourceList","groupVersion":"v1","resources":[{"name":"pods","singularName":"pod","namespaced":true,"kind":"Pod","verbs":["create","delete","deletecollection","get","list","patch","update","watch"],"shortNames":["po"],"categories":["all"],"storageVersionHash":"xPOwRZ+Yhw8="}]}`)) + case "/api/v1/pods", "/clusters/root/api/v1/pods", "/clusters/*/api/v1/pods": + if req.URL.Query().Get("watch") != "true" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"kind": "PodList","apiVersion": "v1","metadata": {"resourceVersion": "184126176"}, "items": [{"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126176"}}]}`)) + return + } + fallthrough + case "/api/v1/namespaces/default/pods/foo", "/clusters/root/api/v1/namespaces/default/pods/foo": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126176"}}`)) + default: + _, _ = w.Write([]byte(fmt.Sprintf("Not found %q", req.RequestURI))) + w.WriteHeader(http.StatusNotFound) + } + })) + + cfg = &rest.Config{ + Host: srv.URL, + } + Expect(rest.SetKubernetesDefaults(cfg)).To(Succeed()) + }) + + BeforeEach(func() { + mu.Lock() + defer mu.Unlock() + paths = []string{} + }) + + AfterAll(func() { + srv.Close() + }) + + Describe("with typed list", func() { + It("should work with no cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &corev1.PodList{} + err = cl.List(ctx, pods) + Expect(err).NotTo(HaveOccurred()) + + pod := &corev1.Pod{} + err = cl.Get(ctx, types.NamespacedName{Namespace: "default", Name: "foo"}, pod) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/api/v1", "/api/v1/pods", "/api/v1/namespaces/default/pods/foo"})) + }) + + It("should work with a cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &corev1.PodList{} + err = cl.List(kontext.WithCluster(ctx, "root"), pods) + Expect(err).NotTo(HaveOccurred()) + + pod := &corev1.Pod{} + err = cl.Get(kontext.WithCluster(ctx, "root"), types.NamespacedName{Namespace: "default", Name: "foo"}, pod) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/root/api/v1", "/clusters/root/api/v1/pods", "/clusters/root/api/v1/namespaces/default/pods/foo"})) + }) + + It("should work with a wildcard cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &corev1.PodList{} + err = cl.List(kontext.WithCluster(ctx, "*"), pods) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/*/api/v1", "/clusters/*/api/v1/pods"})) + }) + }) + + Describe("with unstructured list", func() { + It("should work with no cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &unstructured.UnstructuredList{} + pods.SetAPIVersion("v1") + pods.SetKind("PodList") + err = cl.List(ctx, pods) + Expect(err).NotTo(HaveOccurred()) + + pod := &unstructured.Unstructured{} + pod.SetAPIVersion("v1") + pod.SetKind("Pod") + err = cl.Get(ctx, types.NamespacedName{Namespace: "default", Name: "foo"}, pod) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/api/v1", "/api/v1/pods", "/api/v1/namespaces/default/pods/foo"})) + }) + + It("should work with a cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &unstructured.UnstructuredList{} + pods.SetAPIVersion("v1") + pods.SetKind("PodList") + err = cl.List(kontext.WithCluster(ctx, "root"), pods) + Expect(err).NotTo(HaveOccurred()) + + pod := &unstructured.Unstructured{} + pod.SetAPIVersion("v1") + pod.SetKind("Pod") + err = cl.Get(kontext.WithCluster(ctx, "root"), types.NamespacedName{Namespace: "default", Name: "foo"}, pod) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/root/api/v1", "/clusters/root/api/v1/pods", "/clusters/root/api/v1/namespaces/default/pods/foo"})) + }) + + It("should work with a wildcard cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &unstructured.UnstructuredList{} + pods.SetAPIVersion("v1") + pods.SetKind("PodList") + err = cl.List(kontext.WithCluster(ctx, "*"), pods) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/*/api/v1", "/clusters/*/api/v1/pods"})) + }) + }) + + Describe("with a metadata object", func() { + It("should work with no cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "PodList"}} + err = cl.List(ctx, pods) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/api/v1", "/api/v1/pods"})) + }) + + It("should work with a cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "PodList"}} + err = cl.List(kontext.WithCluster(ctx, "root"), pods) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/root/api/v1", "/clusters/root/api/v1/pods"})) + }) + + It("should work with a wildcard cluster in the kontext", func(ctx context.Context) { + cl, err := NewClusterAwareClient(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + + pods := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "PodList"}} + err = cl.List(kontext.WithCluster(ctx, "*"), pods) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/*/api/v1", "/clusters/*/api/v1/pods"})) + }) + }) +}) + +var _ = Describe("NewClusterAwareCache", Ordered, func() { + var ( + cancelCtx context.CancelFunc + srv *httptest.Server + mu sync.Mutex + paths []string + cfg *rest.Config + c cache.Cache + ) + + BeforeAll(func() { + var ctx context.Context + ctx, cancelCtx = context.WithCancel(context.Background()) + + srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + mu.Lock() + pth := req.URL.Path + if req.URL.Query().Get("watch") == "true" { + pth += "?watch=true" + } + paths = append(paths, pth) + mu.Unlock() + + switch { + case req.URL.Path == "/clusters/*/api/v1": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"kind":"APIResourceList","groupVersion":"v1","resources":[{"name":"pods","singularName":"pod","namespaced":true,"kind":"Pod","verbs":["create","delete","deletecollection","get","list","patch","update","watch"],"shortNames":["po"],"categories":["all"],"storageVersionHash":"xPOwRZ+Yhw8="}]}`)) + case req.URL.Path == "/clusters/*/api/v1/pods" && req.URL.Query().Get("watch") != "true": + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"kind": "PodList","apiVersion": "v1","metadata": {"resourceVersion": "184126176"}, "items": [ + {"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126176","annotations":{"kcp.io/cluster":"root"}}}, + {"kind":"Pod","apiVersion":"v1","metadata":{"name":"foo","namespace":"default","resourceVersion":"184126093","annotations":{"kcp.io/cluster":"ws"}}} + ]}`)) + case req.URL.Path == "/clusters/*/api/v1/pods" && req.URL.Query().Get("watch") == "true": + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Transfer-Encoding", "chunked") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"bar","namespace":"default","resourceVersion":"184126177","annotations":{"kcp.io/cluster":"root"}}}}`)) + _, _ = w.Write([]byte(`{"type":"ADDED","object":{"kind":"Pod","apiVersion":"v1","metadata":{"name":"bar","namespace":"default","resourceVersion":"184126178","annotations":{"kcp.io/cluster":"ws"}}}}`)) + if w, ok := w.(http.Flusher); ok { + w.Flush() + } + time.Sleep(1 * time.Second) + default: + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(fmt.Sprintf("Not found %q", req.RequestURI))) + } + })) + go func() { + <-ctx.Done() + srv.Close() + }() + + cfg = &rest.Config{ + Host: srv.URL, + } + Expect(rest.SetKubernetesDefaults(cfg)).To(Succeed()) + + var err error + c, err = NewClusterAwareCache(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + go func() { + if err := c.Start(ctx); err != nil { + Expect(err).NotTo(HaveOccurred()) + } + }() + c.WaitForCacheSync(ctx) + }) + + BeforeEach(func() { + mu.Lock() + defer mu.Unlock() + paths = []string{} + }) + + AfterAll(func() { + cancelCtx() + }) + + It("should always access wildcard clusters and serve other clusters from memory", func(ctx context.Context) { + pod := &corev1.Pod{} + err := c.Get(kontext.WithCluster(ctx, "root"), types.NamespacedName{Namespace: "default", Name: "foo"}, pod) + Expect(err).NotTo(HaveOccurred()) + + mu.Lock() + defer mu.Unlock() + Expect(paths).To(Equal([]string{"/clusters/*/api/v1", "/clusters/*/api/v1/pods", "/clusters/*/api/v1/pods?watch=true"})) + }) + + It("should return only the pods from the requested cluster", func(ctx context.Context) { + pod := &corev1.Pod{} + err := c.Get(kontext.WithCluster(ctx, "root"), types.NamespacedName{Namespace: "default", Name: "foo"}, pod) + Expect(err).NotTo(HaveOccurred()) + Expect(pod.Annotations).To(HaveKeyWithValue("kcp.io/cluster", "root")) + + pods := &corev1.PodList{} + err = c.List(kontext.WithCluster(ctx, "root"), pods) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(2)) + Expect(pods.Items[0].Annotations).To(HaveKeyWithValue("kcp.io/cluster", "root")) + Expect(pods.Items[1].Annotations).To(HaveKeyWithValue("kcp.io/cluster", "root")) + Expect(sets.New(pods.Items[0].Name, pods.Items[1].Name)).To(Equal(sets.New("foo", "bar"))) + }) + + It("should return all pods from all clusters without cluster in context", func(ctx context.Context) { + pods := &corev1.PodList{} + err := c.List(ctx, pods) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(4)) + }) +}) diff --git a/pkg/kcp/wrappers.go b/pkg/kcp/wrappers.go index 597be213c2..e5d905ed72 100644 --- a/pkg/kcp/wrappers.go +++ b/pkg/kcp/wrappers.go @@ -17,6 +17,7 @@ limitations under the License. package kcp import ( + "context" "fmt" "net/http" "regexp" @@ -27,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" k8scache "k8s.io/client-go/tools/cache" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -56,11 +56,11 @@ func NewClusterAwareManager(cfg *rest.Config, options ctrl.Options) (manager.Man } if options.MapperProvider == nil { - options.MapperProvider = NewClusterAwareMapperProvider + options.MapperProvider = newWildcardClusterMapperProvider } cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { - return newClusterRoundTripper(rt) + return newClusterAwareRoundTripper(rt) }) return ctrl.NewManager(cfg, options) } @@ -77,7 +77,7 @@ func NewInformerWithClusterIndexes(lw k8scache.ListerWatcher, obj runtime.Object // NewClusterAwareCache returns a cache.Cache that handles multi-cluster watches. func NewClusterAwareCache(config *rest.Config, opts cache.Options) (cache.Cache, error) { c := rest.CopyConfig(config) - c.Host += "/clusters/*" + c.Host = strings.TrimSuffix(c.Host, "/") + "/clusters/*" opts.NewInformerFunc = NewInformerWithClusterIndexes return cache.New(c, opts) @@ -99,11 +99,16 @@ func NewClusterAwareCache(config *rest.Config, opts cache.Options) (cache.Cache, // ... // } func NewClusterAwareAPIReader(config *rest.Config, opts client.Options) (client.Reader, error) { - httpClient, err := ClusterAwareHTTPClient(config) - if err != nil { - return nil, err + if opts.HTTPClient == nil { + httpClient, err := NewClusterAwareHTTPClient(config) + if err != nil { + return nil, err + } + opts.HTTPClient = httpClient + } + if opts.Mapper == nil && opts.MapperWithContext == nil { + opts.MapperWithContext = NewClusterAwareMapperProvider(config, opts.HTTPClient) } - opts.HTTPClient = httpClient return client.NewAPIReader(config, opts) } @@ -122,52 +127,43 @@ func NewClusterAwareAPIReader(config *rest.Config, opts client.Options) (client. // ... // } func NewClusterAwareClient(config *rest.Config, opts client.Options) (client.Client, error) { - httpClient, err := ClusterAwareHTTPClient(config) - if err != nil { - return nil, err + if opts.HTTPClient == nil { + httpClient, err := NewClusterAwareHTTPClient(config) + if err != nil { + return nil, err + } + opts.HTTPClient = httpClient } - opts.HTTPClient = httpClient - return client.New(config, opts) -} - -// NewClusterAwareClientForConfig returns a client.Client that is configured to use the context to scope -// requests to the proper cluster. To scope requests, pass the request context with the cluster set. -// Example: -// -// import ( -// "context" -// kcpclient "github.com/kcp-dev/apimachinery/v2/pkg/client" -// ctrl "sigs.k8s.io/controller-runtime" -// ) -// func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { -// ctx = kcpclient.WithCluster(ctx, req.ObjectKey.Cluster) -// // from here on pass this context to all client calls -// ... -// } -func NewClusterAwareClientForConfig(config *rest.Config, httpClient *http.Client) (client.Client, error) { - restMapper, err := NewClusterAwareMapperProvider(config, httpClient) - if err != nil { - return nil, err + if opts.Mapper == nil && opts.MapperWithContext == nil { + opts.MapperWithContext = NewClusterAwareMapperProvider(config, opts.HTTPClient) } - return client.New(config, client.Options{ - Mapper: restMapper, - HTTPClient: httpClient, - }) + return client.New(config, opts) } -// ClusterAwareHTTPClient returns an http.Client with a cluster aware round tripper. -func ClusterAwareHTTPClient(config *rest.Config) (*http.Client, error) { +// NewClusterAwareHTTPClient returns an http.Client with a cluster aware round tripper. +func NewClusterAwareHTTPClient(config *rest.Config) (*http.Client, error) { httpClient, err := rest.HTTPClientFor(config) if err != nil { return nil, err } - httpClient.Transport = newClusterRoundTripper(httpClient.Transport) + httpClient.Transport = newClusterAwareRoundTripper(httpClient.Transport) return httpClient, nil } -// NewClusterAwareMapperProvider is a MapperProvider that returns a logical cluster aware meta.RESTMapper. -func NewClusterAwareMapperProvider(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { +// NewClusterAwareMapperProvider returns a function producing RESTMapper for the +// cluster specified in the context. +func NewClusterAwareMapperProvider(c *rest.Config, httpClient *http.Client) func(ctx context.Context) (meta.RESTMapper, error) { + return func(ctx context.Context) (meta.RESTMapper, error) { + cluster, _ := kontext.ClusterFrom(ctx) // intentionally ignoring second "found" return value + cl := *httpClient + cl.Transport = clusterRoundTripper{cluster: cluster.Path(), delegate: httpClient.Transport} + return apiutil.NewDynamicRESTMapper(c, &cl) + } +} + +// newWildcardClusterMapperProvider returns a RESTMapper that talks to the /clusters/* endpoint. +func newWildcardClusterMapperProvider(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { mapperCfg := rest.CopyConfig(c) if !strings.HasSuffix(mapperCfg.Host, "/clusters/*") { mapperCfg.Host += "/clusters/*" @@ -202,24 +198,38 @@ func ClusterAwareBuilderWithOptions(options cache.Options) cache.NewCacheFunc { } } -// clusterRoundTripper is a cluster aware wrapper around http.RoundTripper. -type clusterRoundTripper struct { +// clusterAwareRoundTripper is a cluster-aware wrapper around http.RoundTripper +// taking the cluster from the context. +type clusterAwareRoundTripper struct { delegate http.RoundTripper } -// newClusterRoundTripper creates a new cluster aware round tripper. -func newClusterRoundTripper(delegate http.RoundTripper) *clusterRoundTripper { - return &clusterRoundTripper{ +// newClusterAwareRoundTripper creates a new cluster aware round tripper. +func newClusterAwareRoundTripper(delegate http.RoundTripper) *clusterAwareRoundTripper { + return &clusterAwareRoundTripper{ delegate: delegate, } } -func (c *clusterRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { +func (c *clusterAwareRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { cluster, ok := kontext.ClusterFrom(req.Context()) - if ok { + if ok && !cluster.Empty() { + return clusterRoundTripper{cluster: cluster.Path(), delegate: c.delegate}.RoundTrip(req) + } + return c.delegate.RoundTrip(req) +} + +// clusterRoundTripper is static cluster-aware wrapper around http.RoundTripper. +type clusterRoundTripper struct { + cluster logicalcluster.Path + delegate http.RoundTripper +} + +func (c clusterRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if !c.cluster.Empty() { req = req.Clone(req.Context()) - req.URL.Path = generatePath(req.URL.Path, cluster.Path()) - req.URL.RawPath = generatePath(req.URL.RawPath, cluster.Path()) + req.URL.Path = generatePath(req.URL.Path, c.cluster) + req.URL.RawPath = generatePath(req.URL.RawPath, c.cluster) } return c.delegate.RoundTrip(req) }