Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ssa/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
cfg *rest.Config
manager *ResourceManager
restMapper meta.RESTMapper
poller *polling.StatusPoller
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -67,8 +68,7 @@ func TestMain(m *testing.M) {
panic(err)
}

poller := polling.NewStatusPoller(kubeClient, restMapper, polling.Options{})

poller = polling.NewStatusPoller(kubeClient, restMapper, polling.Options{})
manager = &ResourceManager{
client: kubeClient,
poller: poller,
Expand Down
29 changes: 18 additions & 11 deletions ssa/manager_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,18 @@ func (m *ResourceManager) Wait(objects []*unstructured.Unstructured, opts WaitOp
return m.WaitForSet(objectsMeta, opts)
}

// WaitForSet checks if the given set of FmtObjMetadata has been fully reconciled.
// WaitForSet checks if the given ObjMetadataSet has been fully reconciled.
func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions) error {
return m.WaitForSetWithContext(context.Background(), set, opts)
}

// WaitForSetWithContext checks if the given ObjMetadataSet has been fully reconciled.
// The provided context can be used to cancel the operation.
func (m *ResourceManager) WaitForSetWithContext(ctx context.Context, set object.ObjMetadataSet, opts WaitOptions) error {
statusCollector := collector.NewResourceStatusCollector(set)
canceledInternally := false

ctx, cancel := context.WithTimeout(context.Background(), opts.Timeout)
ctx, cancel := context.WithTimeout(ctx, opts.Timeout)
defer cancel()

pollingOpts := polling.PollOptions{
Expand Down Expand Up @@ -110,6 +117,7 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
desired := status.CurrentStatus
aggStatus := aggregator.AggregateStatus(rss, desired)
if aggStatus == desired || (opts.FailFast && countFailed > 0) {
canceledInternally = true
cancel()
return
}
Expand All @@ -118,6 +126,11 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions

<-done

// If the context was cancelled externally, return early.
if !canceledInternally && errors.Is(ctx.Err(), context.Canceled) {
return ctx.Err()
}

if statusCollector.Error != nil {
return statusCollector.Error
}
Expand All @@ -127,15 +140,9 @@ func (m *ResourceManager) WaitForSet(set object.ObjMetadataSet, opts WaitOptions
switch {
case rs == nil || lastStatus[id] == nil:
errs = append(errs, fmt.Sprintf("can't determine status for %s", utils.FmtObjMetadata(id)))
case lastStatus[id].Status == status.FailedStatus:
var builder strings.Builder
builder.WriteString(fmt.Sprintf("%s status: '%s'",
utils.FmtObjMetadata(rs.Identifier), lastStatus[id].Status))
if rs.Error != nil {
builder.WriteString(fmt.Sprintf(": %s", rs.Error))
}
errs = append(errs, builder.String())
case errors.Is(ctx.Err(), context.DeadlineExceeded) && lastStatus[id].Status != status.CurrentStatus:
case lastStatus[id].Status == status.FailedStatus,
errors.Is(ctx.Err(), context.DeadlineExceeded) &&
lastStatus[id].Status != status.CurrentStatus:
var builder strings.Builder
builder.WriteString(fmt.Sprintf("%s status: '%s'",
utils.FmtObjMetadata(rs.Identifier), lastStatus[id].Status))
Expand Down
54 changes: 54 additions & 0 deletions ssa/manager_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ func TestWaitForSet_ErrorOnReaderError(t *testing.T) {
},
})

// Restore the original poller otherwise all other tests will fail
defer func() {
manager.poller = poller
}()

set := []object.ObjMetadata{{
Name: "test",
Namespace: "default",
Expand All @@ -278,6 +283,55 @@ func TestWaitForSet_ErrorOnReaderError(t *testing.T) {
g.Expect(err.Error()).To(Equal("timeout waiting for: [ConfigMap/default/test status: 'Unknown': error reading status]"))
}

func TestWaitWithContext_Cancellation(t *testing.T) {
g := NewWithT(t)

id := generateName("cancellation")
objects, err := readManifest("testdata/test2.yaml", id)
g.Expect(err).NotTo(HaveOccurred())

// Apply objects to the cluster which will never reach Ready state
manager.SetOwnerLabels(objects, "app1", "cancellation")
changeSet, err := manager.ApplyAllStaged(context.Background(), objects, ApplyOptions{
WaitInterval: 500 * time.Millisecond,
WaitTimeout: 5 * time.Second,
})
g.Expect(err).NotTo(HaveOccurred())

// Create a context that we can cancel for the wait operation
ctx, cancel := context.WithCancel(context.Background())

// Configure wait options with a longer timeout to ensure we can cancel before it times out
waitOpts := WaitOptions{
Interval: 500 * time.Millisecond,
Timeout: 5 * time.Second,
FailFast: true,
}

// Channel to capture the error from WaitForSetWithContext
errChan := make(chan error, 1)

// Start WaitForSetWithContext in a goroutine
go func() {
errChan <- manager.WaitForSetWithContext(ctx, changeSet.ToObjMetadataSet(), waitOpts)
}()

// Wait for one second to ensure WaitForSetWithContext has started
time.Sleep(time.Second)

// Cancel the context to trigger early exit
cancel()

// Wait for the goroutine to finish and verify it returned due to context cancellation
select {
case waitErr := <-errChan:
g.Expect(waitErr).To(HaveOccurred(), "Expected an error due to context cancellation")
g.Expect(waitErr).To(Equal(context.Canceled))
case <-time.After(2 * time.Second):
t.Fatal("WaitForSetWithContext did not return within expected time after cancellation")
}
}

func TestWaitForSetTermination(t *testing.T) {
timeout := 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand Down
Loading