Skip to content

Commit 54a76b3

Browse files
committed
ssa: Implement cancelable WaitForSet
Signed-off-by: Stefan Prodan <[email protected]>
1 parent 04c997d commit 54a76b3

File tree

3 files changed

+74
-13
lines changed

3 files changed

+74
-13
lines changed

ssa/main_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ var (
4040
cfg *rest.Config
4141
manager *ResourceManager
4242
restMapper meta.RESTMapper
43+
poller *polling.StatusPoller
4344
)
4445

4546
func TestMain(m *testing.M) {
@@ -67,8 +68,7 @@ func TestMain(m *testing.M) {
6768
panic(err)
6869
}
6970

70-
poller := polling.NewStatusPoller(kubeClient, restMapper, polling.Options{})
71-
71+
poller = polling.NewStatusPoller(kubeClient, restMapper, polling.Options{})
7272
manager = &ResourceManager{
7373
client: kubeClient,
7474
poller: poller,

ssa/manager_wait.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,18 @@ func (m *ResourceManager) Wait(objects []*unstructured.Unstructured, opts WaitOp
7272
return m.WaitForSet(objectsMeta, opts)
7373
}
7474

75-
// WaitForSet checks if the given set of FmtObjMetadata has been fully reconciled.
75+
// WaitForSet checks if the given ObjMetadataSet has been fully reconciled.
7676
func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions) error {
77+
return m.WaitForSetWithContext(context.Background(), set, opts)
78+
}
79+
80+
// WaitForSetWithContext checks if the given ObjMetadataSet has been fully reconciled.
81+
// The provided context can be used to cancel the operation.
82+
func (m *ResourceManager) WaitForSetWithContext(ctx context.Context, set object.ObjMetadataSet, opts WaitOptions) error {
7783
statusCollector := collector.NewResourceStatusCollector(set)
84+
canceledInternally := false
7885

79-
ctx, cancel := context.WithTimeout(context.Background(), opts.Timeout)
86+
ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
8087
defer cancel()
8188

8289
pollingOpts := polling.PollOptions{
@@ -110,6 +117,7 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
110117
desired := status.CurrentStatus
111118
aggStatus := aggregator.AggregateStatus(rss, desired)
112119
if aggStatus == desired || (opts.FailFast && countFailed > 0) {
120+
canceledInternally = true
113121
cancel()
114122
return
115123
}
@@ -118,6 +126,11 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
118126

119127
<-done
120128

129+
// If the context was cancelled externally, return early.
130+
if !canceledInternally && errors.Is(ctx.Err(), context.Canceled) {
131+
return ctx.Err()
132+
}
133+
121134
if statusCollector.Error != nil {
122135
return statusCollector.Error
123136
}
@@ -127,15 +140,9 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
127140
switch {
128141
case rs == nil || lastStatus[id] == nil:
129142
errs = append(errs, fmt.Sprintf("can't determine status for %s", utils.FmtObjMetadata(id)))
130-
case lastStatus[id].Status == status.FailedStatus:
131-
var builder strings.Builder
132-
builder.WriteString(fmt.Sprintf("%s status: '%s'",
133-
utils.FmtObjMetadata(rs.Identifier), lastStatus[id].Status))
134-
if rs.Error != nil {
135-
builder.WriteString(fmt.Sprintf(": %s", rs.Error))
136-
}
137-
errs = append(errs, builder.String())
138-
case errors.Is(ctx.Err(), context.DeadlineExceeded) && lastStatus[id].Status != status.CurrentStatus:
143+
case lastStatus[id].Status == status.FailedStatus,
144+
errors.Is(ctx.Err(), context.DeadlineExceeded) &&
145+
lastStatus[id].Status != status.CurrentStatus:
139146
var builder strings.Builder
140147
builder.WriteString(fmt.Sprintf("%s status: '%s'",
141148
utils.FmtObjMetadata(rs.Identifier), lastStatus[id].Status))

ssa/manager_wait_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,11 @@ func TestWaitForSet_ErrorOnReaderError(t *testing.T) {
264264
},
265265
})
266266

267+
// Restore the original poller otherwise all other tests will fail
268+
defer func() {
269+
manager.poller = poller
270+
}()
271+
267272
set := []object.ObjMetadata{{
268273
Name: "test",
269274
Namespace: "default",
@@ -278,6 +283,55 @@ func TestWaitForSet_ErrorOnReaderError(t *testing.T) {
278283
g.Expect(err.Error()).To(Equal("timeout waiting for: [ConfigMap/default/test status: 'Unknown': error reading status]"))
279284
}
280285

286+
func TestWaitWithContext_Cancellation(t *testing.T) {
287+
g := NewWithT(t)
288+
289+
id := generateName("cancellation")
290+
objects, err := readManifest("testdata/test2.yaml", id)
291+
g.Expect(err).NotTo(HaveOccurred())
292+
293+
// Apply objects to the cluster which will never reach Ready state
294+
manager.SetOwnerLabels(objects, "app1", "cancellation")
295+
changeSet, err := manager.ApplyAllStaged(context.Background(), objects, ApplyOptions{
296+
WaitInterval: 500 * time.Millisecond,
297+
WaitTimeout: 5 * time.Second,
298+
})
299+
g.Expect(err).NotTo(HaveOccurred())
300+
301+
// Create a context that we can cancel for the wait operation
302+
ctx, cancel := context.WithCancel(context.Background())
303+
304+
// Configure wait options with a longer timeout to ensure we can cancel before it times out
305+
waitOpts := WaitOptions{
306+
Interval: 500 * time.Millisecond,
307+
Timeout: 5 * time.Second,
308+
FailFast: true,
309+
}
310+
311+
// Channel to capture the error from WaitForSetWithContext
312+
errChan := make(chan error, 1)
313+
314+
// Start WaitForSetWithContext in a goroutine
315+
go func() {
316+
errChan <- manager.WaitForSetWithContext(ctx, changeSet.ToObjMetadataSet(), waitOpts)
317+
}()
318+
319+
// Wait for one second to ensure WaitForSetWithContext has started
320+
time.Sleep(time.Second)
321+
322+
// Cancel the context to trigger early exit
323+
cancel()
324+
325+
// Wait for the goroutine to finish and verify it returned due to context cancellation
326+
select {
327+
case waitErr := <-errChan:
328+
g.Expect(waitErr).To(HaveOccurred(), "Expected an error due to context cancellation")
329+
g.Expect(waitErr).To(Equal(context.Canceled))
330+
case <-time.After(2 * time.Second):
331+
t.Fatal("WaitForSetWithContext did not return within expected time after cancellation")
332+
}
333+
}
334+
281335
func TestWaitForSetTermination(t *testing.T) {
282336
timeout := 10 * time.Second
283337
ctx, cancel := context.WithTimeout(context.Background(), timeout)

0 commit comments

Comments
 (0)