Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/meta/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ const (
// FeatureGateDisabledReason represents the fact that a feature is trying to
// be used, but the feature gate for that feature is disabled.
FeatureGateDisabledReason string = "FeatureGateDisabled"

// HealthCheckCanceledReason represents the fact that
// the health check was canceled.
HealthCheckCanceledReason string = "HealthCheckCanceled"
)

// ObjectWithConditions describes a Kubernetes resource object with status conditions.
Expand Down
2 changes: 1 addition & 1 deletion artifact/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ replace (

require (
github.com/cyphar/filepath-securejoin v0.4.1
github.com/fluxcd/pkg/apis/meta v1.22.0
github.com/fluxcd/pkg/apis/meta v1.23.0
github.com/fluxcd/pkg/lockedfile v0.7.0
github.com/fluxcd/pkg/oci v0.57.0
github.com/fluxcd/pkg/sourceignore v0.15.0
Expand Down
2 changes: 1 addition & 1 deletion auth/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/eks v1.74.2
github.com/aws/aws-sdk-go-v2/service/sts v1.38.6
github.com/coreos/go-oidc/v3 v3.16.0
github.com/fluxcd/pkg/apis/meta v1.22.0
github.com/fluxcd/pkg/apis/meta v1.23.0
github.com/fluxcd/pkg/cache v0.12.0
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/go-containerregistry v0.20.6
Expand Down
2 changes: 1 addition & 1 deletion chartutil/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ replace github.com/fluxcd/pkg/apis/meta => ../apis/meta
replace github.com/opencontainers/go-digest => github.com/opencontainers/go-digest v1.0.1-0.20231025023718-d50d2fec9c98

require (
github.com/fluxcd/pkg/apis/meta v1.22.0
github.com/fluxcd/pkg/apis/meta v1.23.0
github.com/go-logr/logr v1.4.3
github.com/onsi/gomega v1.38.2
github.com/opencontainers/go-digest v1.0.0
Expand Down
76 changes: 76 additions & 0 deletions runtime/controller/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"strings"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// controllerBuilder wraps a *builder.Builder to
// enhance it with additional functionality.
type controllerBuilder struct {
*builder.Builder
mgr manager.Manager
reconciler *reconcilerWrapper
}

// NewControllerManagedBy returns a wrapped *builder.Builder
// that facilitates building a controller for a specific
// object type harvesting the capabilities of the reconciler
// wrapper.
func NewControllerManagedBy(mgr manager.Manager, r *reconcilerWrapper) *controllerBuilder {
return &controllerBuilder{
Builder: ctrl.NewControllerManagedBy(mgr),
mgr: mgr,
reconciler: r,
}
}

// For is similar to builder.Builder.For, but internally
// uses WatchesRawSource to set up the watch harvesting
// the capabilities of the reconciler wrapper.
func (c *controllerBuilder) For(obj client.Object, pred predicate.Predicate) *controllerBuilder {
// Do the same as builder.Builder.For to define the controller name,
// lowercased kind of the object being watched.
gvk, err := apiutil.GVKForObject(obj, c.mgr.GetScheme())
// Here we need to panic because builder.Builder.For does not return an error.
// This panic is fine, as it is caught during the controller initialization.
if err != nil {
panic(err)
}
name := strings.ToLower(gvk.Kind)

c.Named(name)
c.WatchesRawSource(source.Kind(
c.mgr.GetCache(),
obj,
c.reconciler.EnqueueRequestsFromMapFunc(gvk.Kind, func(ctx context.Context, obj client.Object) []ctrl.Request {
return []ctrl.Request{{NamespacedName: client.ObjectKeyFromObject(obj)}}
}),
pred,
))
return c
}
97 changes: 97 additions & 0 deletions runtime/controller/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller_test

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/fluxcd/pkg/runtime/controller"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

type noopReconciler struct {
reconciled atomic.Bool
}

func (r *noopReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if req.Name == "test-configmap" && req.Namespace == "default" {
r.reconciled.Store(true)
}
return ctrl.Result{}, nil
}

func TestControllerBuilder(t *testing.T) {
g := NewWithT(t)

// Create test environment.
testEnv := &envtest.Environment{}
conf, err := testEnv.Start()
g.Expect(err).NotTo(HaveOccurred())
t.Cleanup(func() { testEnv.Stop() })
kubeClient, err := client.New(conf, client.Options{})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(kubeClient).NotTo(BeNil())

// Create manager.
mgr, err := ctrl.NewManager(conf, ctrl.Options{})
g.Expect(err).NotTo(HaveOccurred())
g.Expect(mgr).NotTo(BeNil())

// Create and setup controller.
nr := &noopReconciler{}
r := controller.WrapReconciler(nr)
err = controller.NewControllerManagedBy(mgr, r).
For(&corev1.ConfigMap{}, predicate.ResourceVersionChangedPredicate{}).
Complete(r)
g.Expect(err).NotTo(HaveOccurred())

// Start manager.
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go func() {
errCh <- mgr.Start(ctx)
close(errCh)
}()

// Create a ConfigMap and expect the reconciler to be called.
g.Expect(nr.reconciled.Load()).To(BeFalse())
g.Expect(kubeClient.Create(ctx, &corev1.ConfigMap{
ObjectMeta: ctrl.ObjectMeta{
Name: "test-configmap",
Namespace: "default",
},
})).To(Succeed())
g.Eventually(func() bool { return nr.reconciled.Load() }, time.Second).To(BeTrue())

// Stop the manager.
cancel()
select {
case err := <-errCh:
g.Expect(err).NotTo(HaveOccurred())
case <-time.After(time.Second):
t.Fatal("timeout waiting for manager to stop")
}
}
133 changes: 133 additions & 0 deletions runtime/controller/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)

// QueueEventSource holds enough tracking information about the
// source object that triggered a queue event and implements the
// error interface.
type QueueEventSource struct {
Kind string `json:"kind"`
Name string `json:"name"`
Namespace string `json:"namespace"`
UID types.UID `json:"uid"`
ResourceVersion string `json:"resourceVersion"`
}

// Ensure QueueEventSource implements the error interface.
var _ error = &QueueEventSource{}

// Error returns a string representation of the object represented by QueueEventSource.
func (q *QueueEventSource) Error() string {
return fmt.Sprintf("%s/%s/%s", q.Kind, q.Namespace, q.Name)
}

// Is returns true if the target error is a QueueEventSource object.
func (*QueueEventSource) Is(target error) bool {
_, ok := target.(*QueueEventSource)
return ok
}

// queueEventType represents the type of event that occurred in the queue.
type queueEventType int

const (
// queueEventObjectEnqueued indicates that an object was enqueued.
queueEventObjectEnqueued queueEventType = iota
)

// queueEventPayload is the payload delivered to listeners
// when a queue event occurs.
type queueEventPayload struct {
source QueueEventSource
}

// queueHooks implements mechanisms for hooking to queue events.
type queueHooks struct {
lis map[queueEvent][]*queueListener
mu sync.Mutex
}

// queueEvent represents an event related to the queue.
type queueEvent struct {
queueEventType
ctrl.Request
}

// queueListener represents a listener for a queue event.
type queueListener struct {
ctx context.Context
cancel context.CancelFunc
payload chan<- *queueEventPayload
}

func newQueueHooks() *queueHooks {
return &queueHooks{
lis: make(map[queueEvent][]*queueListener),
}
}

func (q *queueHooks) dispatch(event queueEvent, payload queueEventPayload) {
q.mu.Lock()
listeners := q.lis[event]
delete(q.lis, event)
q.collectGarbage()
q.mu.Unlock()

for _, l := range listeners {
l.payload <- &payload
l.cancel()
}
}

func (q *queueHooks) registerListener(ctx context.Context, event queueEvent) (
context.Context, context.CancelFunc, <-chan *queueEventPayload,
) {
ctx, cancel := context.WithCancel(ctx)
payload := make(chan *queueEventPayload, 1)

q.mu.Lock()
q.collectGarbage()
q.lis[event] = append(q.lis[event], &queueListener{ctx, cancel, payload})
q.mu.Unlock()

return ctx, cancel, payload
}

func (q *queueHooks) collectGarbage() {
for key, listeners := range q.lis {
var alive []*queueListener
for _, l := range listeners {
if l.ctx.Err() == nil {
alive = append(alive, l)
}
}
if len(alive) > 0 {
q.lis[key] = alive
} else {
delete(q.lis, key)
}
}
}
45 changes: 45 additions & 0 deletions runtime/controller/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2025 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller_test

import (
"testing"

. "github.com/onsi/gomega"

"github.com/fluxcd/pkg/runtime/controller"
)

func TestQueueEventSource_Error(t *testing.T) {
g := NewWithT(t)
var err error = &controller.QueueEventSource{
Kind: "TestKind",
Name: "test-name",
Namespace: "test-namespace",
UID: "12345",
ResourceVersion: "1",
}
g.Expect(err.Error()).To(Equal("TestKind/test-namespace/test-name"))
}

func TestQueueEventSource_Is(t *testing.T) {
g := NewWithT(t)
qes := controller.QueueEventSource{
Kind: "TestKind",
}
g.Expect(qes.Is(&controller.QueueEventSource{})).To(BeTrue())
}
Loading
Loading