Skip to content

Commit b6ba017

Browse files
committed
runtime/controller: add support for detecting enqueue events while reconciling
Signed-off-by: Matheus Pimenta <[email protected]>
1 parent 7301068 commit b6ba017

File tree

7 files changed

+651
-0
lines changed

7 files changed

+651
-0
lines changed

apis/meta/conditions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ const (
156156
// FeatureGateDisabledReason represents the fact that a feature is trying to
157157
// be used, but the feature gate for that feature is disabled.
158158
FeatureGateDisabledReason string = "FeatureGateDisabled"
159+
160+
// HealthCheckCanceledReason represents the fact that
161+
// the health check was canceled.
162+
HealthCheckCanceledReason string = "HealthCheckCanceled"
159163
)
160164

161165
// ObjectWithConditions describes a Kubernetes resource object with status conditions.

runtime/controller/builder.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
"strings"
22+
23+
ctrl "sigs.k8s.io/controller-runtime"
24+
"sigs.k8s.io/controller-runtime/pkg/builder"
25+
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
27+
"sigs.k8s.io/controller-runtime/pkg/manager"
28+
"sigs.k8s.io/controller-runtime/pkg/predicate"
29+
"sigs.k8s.io/controller-runtime/pkg/source"
30+
)
31+
32+
// controllerBuilder wraps a *builder.Builder to
33+
// enhance it with additional functionality.
34+
type controllerBuilder struct {
35+
*builder.Builder
36+
mgr manager.Manager
37+
reconciler *reconcilerWrapper
38+
}
39+
40+
// NewControllerManagedBy returns a wrapped *builder.Builder
41+
// that facilitates building a controller for a specific
42+
// object type harvesting the capabilities of the reconciler
43+
// wrapper.
44+
func NewControllerManagedBy(mgr manager.Manager, r *reconcilerWrapper) *controllerBuilder {
45+
return &controllerBuilder{
46+
Builder: ctrl.NewControllerManagedBy(mgr),
47+
mgr: mgr,
48+
reconciler: r,
49+
}
50+
}
51+
52+
// For is similar to builder.Builder.For, but internally
53+
// uses WatchesRawSource to set up the watch harvesting
54+
// the capabilities of the reconciler wrapper.
55+
func (c *controllerBuilder) For(obj client.Object, pred predicate.Predicate) *controllerBuilder {
56+
// Do the same as builder.Builder.For to define the controller name,
57+
// lowercased kind of the object being watched.
58+
gvk, err := apiutil.GVKForObject(obj, c.mgr.GetScheme())
59+
// Here we need to panic because builder.Builder.For does not return an error.
60+
// This panic is fine, as it is caught during the controller initialization.
61+
if err != nil {
62+
panic(err)
63+
}
64+
name := strings.ToLower(gvk.Kind)
65+
66+
c.Named(name)
67+
c.WatchesRawSource(source.Kind(
68+
c.mgr.GetCache(),
69+
obj,
70+
c.reconciler.EnqueueRequestsFromMapFunc(gvk.Kind, func(ctx context.Context, obj client.Object) []ctrl.Request {
71+
return []ctrl.Request{{NamespacedName: client.ObjectKeyFromObject(obj)}}
72+
}),
73+
pred,
74+
))
75+
return c
76+
}

runtime/controller/builder_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller_test
18+
19+
import (
20+
"context"
21+
"sync/atomic"
22+
"testing"
23+
"time"
24+
25+
"github.com/fluxcd/pkg/runtime/controller"
26+
. "github.com/onsi/gomega"
27+
corev1 "k8s.io/api/core/v1"
28+
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/client"
30+
"sigs.k8s.io/controller-runtime/pkg/envtest"
31+
"sigs.k8s.io/controller-runtime/pkg/predicate"
32+
)
33+
34+
type noopReconciler struct {
35+
reconciled atomic.Bool
36+
}
37+
38+
func (r *noopReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
39+
if req.Name == "test-configmap" && req.Namespace == "default" {
40+
r.reconciled.Store(true)
41+
}
42+
return ctrl.Result{}, nil
43+
}
44+
45+
func TestControllerBuilder(t *testing.T) {
46+
g := NewWithT(t)
47+
48+
// Create test environment.
49+
testEnv := &envtest.Environment{}
50+
conf, err := testEnv.Start()
51+
g.Expect(err).NotTo(HaveOccurred())
52+
t.Cleanup(func() { testEnv.Stop() })
53+
kubeClient, err := client.New(conf, client.Options{})
54+
g.Expect(err).NotTo(HaveOccurred())
55+
g.Expect(kubeClient).NotTo(BeNil())
56+
57+
// Create manager.
58+
mgr, err := ctrl.NewManager(conf, ctrl.Options{})
59+
g.Expect(err).NotTo(HaveOccurred())
60+
g.Expect(mgr).NotTo(BeNil())
61+
62+
// Create and setup controller.
63+
nr := &noopReconciler{}
64+
r := controller.WrapReconciler(nr)
65+
err = controller.NewControllerManagedBy(mgr, r).
66+
For(&corev1.ConfigMap{}, predicate.ResourceVersionChangedPredicate{}).
67+
Complete(r)
68+
g.Expect(err).NotTo(HaveOccurred())
69+
70+
// Start manager.
71+
errCh := make(chan error, 1)
72+
ctx, cancel := context.WithCancel(context.Background())
73+
t.Cleanup(cancel)
74+
go func() {
75+
errCh <- mgr.Start(ctx)
76+
close(errCh)
77+
}()
78+
79+
// Create a ConfigMap and expect the reconciler to be called.
80+
g.Expect(nr.reconciled.Load()).To(BeFalse())
81+
g.Expect(kubeClient.Create(ctx, &corev1.ConfigMap{
82+
ObjectMeta: ctrl.ObjectMeta{
83+
Name: "test-configmap",
84+
Namespace: "default",
85+
},
86+
})).To(Succeed())
87+
g.Eventually(func() bool { return nr.reconciled.Load() }, time.Second).To(BeTrue())
88+
89+
// Stop the manager.
90+
cancel()
91+
select {
92+
case err := <-errCh:
93+
g.Expect(err).NotTo(HaveOccurred())
94+
case <-time.After(time.Second):
95+
t.Fatal("timeout waiting for manager to stop")
96+
}
97+
}

runtime/controller/queue.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
24+
"k8s.io/apimachinery/pkg/types"
25+
ctrl "sigs.k8s.io/controller-runtime"
26+
)
27+
28+
// QueueEventSource holds enough tracking information about the
29+
// source object that triggered a queue event and implements the
30+
// error interface.
31+
type QueueEventSource struct {
32+
Kind string `json:"kind"`
33+
Name string `json:"name"`
34+
Namespace string `json:"namespace"`
35+
UID types.UID `json:"uid"`
36+
ResourceVersion string `json:"resourceVersion"`
37+
}
38+
39+
// Ensure QueueEventSource implements the error interface.
40+
var _ error = &QueueEventSource{}
41+
42+
// Error returns a string representation of the object represented by QueueEventSource.
43+
func (q *QueueEventSource) Error() string {
44+
return fmt.Sprintf("%s/%s/%s", q.Kind, q.Namespace, q.Name)
45+
}
46+
47+
// Is returns true if the target error is a QueueEventSource object.
48+
func (*QueueEventSource) Is(target error) bool {
49+
_, ok := target.(*QueueEventSource)
50+
return ok
51+
}
52+
53+
// queueEventType represents the type of event that occurred in the queue.
54+
type queueEventType int
55+
56+
const (
57+
// queueEventObjectEnqueued indicates that an object was enqueued.
58+
queueEventObjectEnqueued queueEventType = iota
59+
)
60+
61+
// queueEventPayload is the payload delivered to listeners
62+
// when a queue event occurs.
63+
type queueEventPayload struct {
64+
source QueueEventSource
65+
}
66+
67+
// queueHooks implements mechanisms for hooking to queue events.
68+
type queueHooks struct {
69+
lis map[queueEvent][]*queueListener
70+
mu sync.Mutex
71+
}
72+
73+
// queueEvent represents an event related to the queue.
74+
type queueEvent struct {
75+
queueEventType
76+
ctrl.Request
77+
}
78+
79+
// queueListener represents a listener for a queue event.
80+
type queueListener struct {
81+
ctx context.Context
82+
cancel context.CancelFunc
83+
payload chan<- *queueEventPayload
84+
}
85+
86+
func newQueueHooks() *queueHooks {
87+
return &queueHooks{
88+
lis: make(map[queueEvent][]*queueListener),
89+
}
90+
}
91+
92+
func (q *queueHooks) dispatch(event queueEvent, payload queueEventPayload) {
93+
q.mu.Lock()
94+
listeners := q.lis[event]
95+
delete(q.lis, event)
96+
q.collectGarbage()
97+
q.mu.Unlock()
98+
99+
for _, l := range listeners {
100+
l.payload <- &payload
101+
l.cancel()
102+
}
103+
}
104+
105+
func (q *queueHooks) registerListener(ctx context.Context, event queueEvent) (
106+
context.Context, context.CancelFunc, <-chan *queueEventPayload,
107+
) {
108+
ctx, cancel := context.WithCancel(ctx)
109+
payload := make(chan *queueEventPayload, 1)
110+
111+
q.mu.Lock()
112+
q.collectGarbage()
113+
q.lis[event] = append(q.lis[event], &queueListener{ctx, cancel, payload})
114+
q.mu.Unlock()
115+
116+
return ctx, cancel, payload
117+
}
118+
119+
func (q *queueHooks) collectGarbage() {
120+
for key, listeners := range q.lis {
121+
var alive []*queueListener
122+
for _, l := range listeners {
123+
if l.ctx.Err() == nil {
124+
alive = append(alive, l)
125+
}
126+
}
127+
if len(alive) > 0 {
128+
q.lis[key] = alive
129+
} else {
130+
delete(q.lis, key)
131+
}
132+
}
133+
}

runtime/controller/queue_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
Copyright 2025 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package controller_test
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/gomega"
23+
24+
"github.com/fluxcd/pkg/runtime/controller"
25+
)
26+
27+
func TestQueueEventSource_Error(t *testing.T) {
28+
g := NewWithT(t)
29+
var err error = &controller.QueueEventSource{
30+
Kind: "TestKind",
31+
Name: "test-name",
32+
Namespace: "test-namespace",
33+
UID: "12345",
34+
ResourceVersion: "1",
35+
}
36+
g.Expect(err.Error()).To(Equal("TestKind/test-namespace/test-name"))
37+
}
38+
39+
func TestQueueEventSource_Is(t *testing.T) {
40+
g := NewWithT(t)
41+
qes := controller.QueueEventSource{
42+
Kind: "TestKind",
43+
}
44+
g.Expect(qes.Is(&controller.QueueEventSource{})).To(BeTrue())
45+
}

0 commit comments

Comments
 (0)