Skip to content

Commit f70d676

Browse files
committed
runtime/controller: add support for detecting enqueue events while reconciling
Signed-off-by: Matheus Pimenta <[email protected]>
1 parent 7301068 commit f70d676

File tree

6 files changed

+591
-0
lines changed

6 files changed

+591
-0
lines changed

apis/meta/conditions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ const (
156156
// FeatureGateDisabledReason represents the fact that a feature is trying to
157157
// be used, but the feature gate for that feature is disabled.
158158
FeatureGateDisabledReason string = "FeatureGateDisabled"
159+
160+
// HealthCheckCanceledReason represents the fact that
161+
// the health check was canceled.
162+
HealthCheckCanceledReason string = "HealthCheckCanceled"
159163
)
160164

161165
// ObjectWithConditions describes a Kubernetes resource object with status conditions.

runtime/controller/builder.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
"strings"
22+
23+
ctrl "sigs.k8s.io/controller-runtime"
24+
"sigs.k8s.io/controller-runtime/pkg/builder"
25+
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
27+
"sigs.k8s.io/controller-runtime/pkg/manager"
28+
"sigs.k8s.io/controller-runtime/pkg/predicate"
29+
"sigs.k8s.io/controller-runtime/pkg/source"
30+
)
31+
32+
// controllerBuilder wraps a *builder.Builder to
33+
// enhance it with additional functionality.
34+
type controllerBuilder struct {
35+
*builder.Builder
36+
mgr manager.Manager
37+
reconciler *reconcilerWrapper
38+
}
39+
40+
// NewControllerManagedBy returns a wrapped *builder.Builder
41+
// that facilitates building a controller for a specific
42+
// object type harvesting the capabilities of the reconciler
43+
// wrapper.
44+
func NewControllerManagedBy(mgr manager.Manager, r *reconcilerWrapper) *controllerBuilder {
45+
return &controllerBuilder{
46+
Builder: ctrl.NewControllerManagedBy(mgr),
47+
mgr: mgr,
48+
reconciler: r,
49+
}
50+
}
51+
52+
// For is similar to builder.Builder.For, but internally
53+
// uses WatchesRawSource to set up the watch harvesting
54+
// the capabilities of the reconciler wrapper.
55+
func (c *controllerBuilder) For(obj client.Object, pred predicate.Predicate) *controllerBuilder {
56+
// Do the same as builder.Builder.For to define the controller name,
57+
// lowercased kind of the object being watched.
58+
gvk, err := apiutil.GVKForObject(obj, c.mgr.GetScheme())
59+
// Here we need to panic because builder.Builder.For does not return an error.
60+
// This panic is fine, as it is caught during the controller initialization.
61+
if err != nil {
62+
panic(err)
63+
}
64+
name := strings.ToLower(gvk.Kind)
65+
66+
c.Named(name)
67+
c.WatchesRawSource(source.Kind(
68+
c.mgr.GetCache(),
69+
obj,
70+
c.reconciler.EnqueueRequestsFromMapFunc(gvk.Kind, func(ctx context.Context, obj client.Object) []ctrl.Request {
71+
return []ctrl.Request{{NamespacedName: client.ObjectKeyFromObject(obj)}}
72+
}),
73+
pred,
74+
))
75+
return c
76+
}

runtime/controller/builder_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller_test
18+
19+
import (
20+
"context"
21+
"sync/atomic"
22+
"testing"
23+
"time"
24+
25+
"github.com/fluxcd/pkg/runtime/controller"
26+
. "github.com/onsi/gomega"
27+
corev1 "k8s.io/api/core/v1"
28+
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
30+
"sigs.k8s.io/controller-runtime/pkg/envtest"
31+
"sigs.k8s.io/controller-runtime/pkg/predicate"
32+
)
33+
34+
type noopReconciler struct {
35+
reconciled atomic.Bool
36+
}
37+
38+
func (r *noopReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
39+
if req.Name == "test-configmap" && req.Namespace == "default" {
40+
r.reconciled.Store(true)
41+
}
42+
return ctrl.Result{}, nil
43+
}
44+
45+
func TestControllerBuilder(t *testing.T) {
46+
g := NewWithT(t)
47+
48+
// Create test environment.
49+
testEnv := &envtest.Environment{}
50+
conf, err := testEnv.Start()
51+
g.Expect(err).NotTo(HaveOccurred())
52+
t.Cleanup(func() { testEnv.Stop() })
53+
kubeClient, err := client.New(conf, client.Options{})
54+
g.Expect(err).NotTo(HaveOccurred())
55+
g.Expect(kubeClient).NotTo(BeNil())
56+
57+
// Create manager.
58+
mgr, err := ctrl.NewManager(conf, ctrl.Options{})
59+
g.Expect(err).NotTo(HaveOccurred())
60+
g.Expect(mgr).NotTo(BeNil())
61+
62+
// Create and setup controller.
63+
nr := &noopReconciler{}
64+
r := controller.WrapReconciler(nr)
65+
err = controller.NewControllerManagedBy(mgr, r).
66+
For(&corev1.ConfigMap{}, predicate.ResourceVersionChangedPredicate{}).
67+
Complete(r)
68+
g.Expect(err).NotTo(HaveOccurred())
69+
70+
// Start manager.
71+
errCh := make(chan error, 1)
72+
ctx, cancel := context.WithCancel(context.Background())
73+
t.Cleanup(cancel)
74+
go func() {
75+
errCh <- mgr.Start(ctx)
76+
close(errCh)
77+
}()
78+
79+
// Create a ConfigMap and expect the reconciler to be called.
80+
g.Expect(nr.reconciled.Load()).To(BeFalse())
81+
g.Expect(kubeClient.Create(ctx, &corev1.ConfigMap{
82+
ObjectMeta: ctrl.ObjectMeta{
83+
Name: "test-configmap",
84+
Namespace: "default",
85+
},
86+
})).To(Succeed())
87+
g.Eventually(func() bool { return nr.reconciled.Load() }, time.Second).To(BeTrue())
88+
89+
// Stop the manager.
90+
cancel()
91+
select {
92+
case err := <-errCh:
93+
g.Expect(err).NotTo(HaveOccurred())
94+
case <-time.After(time.Second):
95+
t.Fatal("timeout waiting for manager to stop")
96+
}
97+
}

runtime/controller/queue.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
"sync"
22+
23+
"k8s.io/apimachinery/pkg/types"
24+
ctrl "sigs.k8s.io/controller-runtime"
25+
)
26+
27+
// queueEventType represents the type of event that occurred in the queue.
28+
type queueEventType int
29+
30+
const (
31+
// queueEventObjectEnqueued indicates that an object was enqueued.
32+
queueEventObjectEnqueued queueEventType = iota
33+
)
34+
35+
// QueueEventSource holds enough tracking information about
36+
// the source object that triggered a queue event.
37+
type QueueEventSource struct {
38+
Kind string `json:"kind"`
39+
Name string `json:"name"`
40+
Namespace string `json:"namespace"`
41+
UID types.UID `json:"uid"`
42+
ResourceVersion string `json:"resourceVersion"`
43+
}
44+
45+
// queueEventPayload is the payload delivered to listeners
46+
// when a queue event occurs.
47+
type queueEventPayload struct {
48+
source QueueEventSource
49+
}
50+
51+
// queueHooks implements mechanisms for hooking to queue events.
52+
type queueHooks struct {
53+
lis map[queueEvent][]*queueListener
54+
mu sync.Mutex
55+
}
56+
57+
// queueEvent represents an event related to the queue.
58+
type queueEvent struct {
59+
queueEventType
60+
ctrl.Request
61+
}
62+
63+
// queueListener represents a listener for a queue event.
64+
type queueListener struct {
65+
ctx context.Context
66+
cancel context.CancelFunc
67+
payload chan<- *queueEventPayload
68+
}
69+
70+
func newQueueHooks() *queueHooks {
71+
return &queueHooks{
72+
lis: make(map[queueEvent][]*queueListener),
73+
}
74+
}
75+
76+
func (q *queueHooks) dispatch(event queueEvent, payload queueEventPayload) {
77+
q.mu.Lock()
78+
listeners := q.lis[event]
79+
delete(q.lis, event)
80+
q.collectGarbage()
81+
q.mu.Unlock()
82+
83+
for _, l := range listeners {
84+
l.payload <- &payload
85+
l.cancel()
86+
}
87+
}
88+
89+
func (q *queueHooks) registerListener(ctx context.Context, event queueEvent) (
90+
context.Context, context.CancelFunc, <-chan *queueEventPayload,
91+
) {
92+
93+
ctx, cancel := context.WithCancel(ctx)
94+
payload := make(chan *queueEventPayload, 1)
95+
96+
q.mu.Lock()
97+
q.collectGarbage()
98+
q.lis[event] = append(q.lis[event], &queueListener{ctx, cancel, payload})
99+
q.mu.Unlock()
100+
101+
return ctx, cancel, payload
102+
}
103+
104+
func (q *queueHooks) collectGarbage() {
105+
for key, listeners := range q.lis {
106+
var alive []*queueListener
107+
for _, l := range listeners {
108+
if l.ctx.Err() == nil {
109+
alive = append(alive, l)
110+
}
111+
}
112+
if len(alive) > 0 {
113+
q.lis[key] = alive
114+
} else {
115+
delete(q.lis, key)
116+
}
117+
}
118+
}

0 commit comments

Comments
 (0)