Skip to content

⚠️ NewTypedUnmanaged: Stop requiring a manager #3141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ limitations under the License.

package config

import "time"
import (
"time"

// Controller contains configuration options for a controller.
"github.com/go-logr/logr"
)

// Controller contains configuration options for controllers. It only includes options
// that makes sense for a set of controllers and is used for defaulting the options
// of multiple controllers.
type Controller struct {
// SkipNameValidation allows skipping the name validation that ensures that every controller name is unique.
// Unique controller names are important to get unique metrics and logs for a controller.
Expand Down Expand Up @@ -59,4 +65,7 @@ type Controller struct {
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool

// Logger is the logger controllers should use.
Logger logr.Logger
}
86 changes: 54 additions & 32 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -80,13 +81,53 @@ type TypedOptions[request comparable] struct {
// Only use a custom NewQueue if you know what you are doing.
NewQueue func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request]

// Logger will be used to build a default LogConstructor if unset.
Logger logr.Logger

// LogConstructor is used to construct a logger used for this controller and passed
// to each reconciliation via the context field.
LogConstructor func(request *request) logr.Logger

// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue *bool
}

// DefaultFromConfig defaults the config from a config.Controller
func (options *TypedOptions[request]) DefaultFromConfig(config config.Controller) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is notably absent from this change are tests - That is because this behavior is already verified by existing tests

if options.Logger.GetSink() == nil {
options.Logger = config.Logger
}

if options.SkipNameValidation == nil {
options.SkipNameValidation = config.SkipNameValidation
}

if options.MaxConcurrentReconciles <= 0 && config.MaxConcurrentReconciles > 0 {
options.MaxConcurrentReconciles = config.MaxConcurrentReconciles
}

if options.CacheSyncTimeout == 0 && config.CacheSyncTimeout > 0 {
options.CacheSyncTimeout = config.CacheSyncTimeout
}

if options.UsePriorityQueue == nil {
options.UsePriorityQueue = config.UsePriorityQueue
}

if options.RecoverPanic == nil {
options.RecoverPanic = config.RecoverPanic
}

if options.NeedLeaderElection == nil {
options.NeedLeaderElection = config.NeedLeaderElection
}
}

// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
// Controller implements an API. A Controller manages a work queue fed reconcile.Requests
// from source.Sources. Work is performed through the reconcile.Reconciler for each enqueued item.
// Work typically is reads and writes Kubernetes objects to make the system state match the state specified
// in the object Spec.
type Controller = TypedController[reconcile.Request]
Expand Down Expand Up @@ -119,7 +160,8 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewTyped[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error) {
c, err := NewTypedUnmanaged(name, mgr, options)
options.DefaultFromConfig(mgr.GetControllerOptions())
c, err := NewTypedUnmanaged(name, options)
if err != nil {
return nil, err
}
Expand All @@ -132,14 +174,14 @@ func NewTyped[request comparable](name string, mgr manager.Manager, options Type
// caller is responsible for starting the returned controller.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
return NewTypedUnmanaged(name, mgr, options)
func NewUnmanaged(name string, options Options) (Controller, error) {
return NewTypedUnmanaged(name, options)
}

// NewTypedUnmanaged returns a new typed controller without adding it to the manager.
//
// The name must be unique as it is used to identify the controller in metrics and logs.
func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, options TypedOptions[request]) (TypedController[request], error) {
func NewTypedUnmanaged[request comparable](name string, options TypedOptions[request]) (TypedController[request], error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
Expand All @@ -148,18 +190,14 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
return nil, fmt.Errorf("must specify Name for Controller")
}

if options.SkipNameValidation == nil {
options.SkipNameValidation = mgr.GetControllerOptions().SkipNameValidation
}

if options.SkipNameValidation == nil || !*options.SkipNameValidation {
if err := checkName(name); err != nil {
return nil, err
}
}

if options.LogConstructor == nil {
log := mgr.GetLogger().WithValues(
log := options.Logger.WithValues(
"controller", name,
)
options.LogConstructor = func(in *request) logr.Logger {
Expand All @@ -175,23 +213,15 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
}

if options.MaxConcurrentReconciles <= 0 {
if mgr.GetControllerOptions().MaxConcurrentReconciles > 0 {
options.MaxConcurrentReconciles = mgr.GetControllerOptions().MaxConcurrentReconciles
} else {
options.MaxConcurrentReconciles = 1
}
options.MaxConcurrentReconciles = 1
}

if options.CacheSyncTimeout == 0 {
if mgr.GetControllerOptions().CacheSyncTimeout != 0 {
options.CacheSyncTimeout = mgr.GetControllerOptions().CacheSyncTimeout
} else {
options.CacheSyncTimeout = 2 * time.Minute
}
options.CacheSyncTimeout = 2 * time.Minute
}

if options.RateLimiter == nil {
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
if ptr.Deref(options.UsePriorityQueue, false) {
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
} else {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
Expand All @@ -200,9 +230,9 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt

if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
if ptr.Deref(options.UsePriorityQueue, false) {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
o.Log = mgr.GetLogger().WithValues("controller", controllerName)
o.Log = options.Logger.WithValues("controller", controllerName)
o.RateLimiter = rateLimiter
})
}
Expand All @@ -212,14 +242,6 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
}
}

if options.RecoverPanic == nil {
options.RecoverPanic = mgr.GetControllerOptions().RecoverPanic
}

if options.NeedLeaderElection == nil {
options.NeedLeaderElection = mgr.GetControllerOptions().NeedLeaderElection
}

// Create controller with dependencies set
return &controller.Controller[request]{
Do: options.Reconciler,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func ExampleNewUnmanaged() {

// Configure creates a new controller but does not add it to the supplied
// manager.
c, err := controller.NewUnmanaged("pod-controller", mgr, controller.Options{
c, err := controller.NewUnmanaged("pod-controller", controller.Options{
Reconciler: reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
}),
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,10 @@ func setOptionsDefaults(options Options) Options {
options.Logger = log.Log
}

if options.Controller.Logger.GetSink() == nil {
options.Controller.Logger = options.Logger
}

if options.BaseContext == nil {
options.BaseContext = defaultBaseContext
}
Expand Down
41 changes: 41 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,47 @@ var _ = Describe("manger.Manager", func() {
)))
})

It("should default controller logger from manager logger", func() {
var lock sync.Mutex
var messages []string
options.Logger = funcr.NewJSON(func(object string) {
lock.Lock()
messages = append(messages, object)
lock.Unlock()
}, funcr.Options{})
options.LeaderElection = false

m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

started := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
close(started)
return nil
}))).To(Succeed())

stopped := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
go func() {
stopped <- m.Start(ctx)
}()

// Wait for runnables to start as a proxy for the manager being fully started.
<-started
cancel()
Expect(<-stopped).To(Succeed())

msg := "controller log message"
m.GetControllerOptions().Logger.Info(msg)

Expect(messages).To(ContainElement(
ContainSubstring(msg),
))
})

It("should return both runnables and stop errors when both error", func() {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
Expand Down