From 46d9a7b3796b8f547ec2eaaca438f8987c50545f Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 7 Aug 2025 09:14:33 -0700 Subject: [PATCH 1/3] Improve Nexus cancellation type test assertions --- test/nexus_test.go | 54 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 40 insertions(+), 14 deletions(-) diff --git a/test/nexus_test.go b/test/nexus_test.go index 3267f4c28..60ab3266a 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -1061,24 +1061,48 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { }) } -func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationType workflow.NexusOperationCancellationType, t *testing.T) (client.WorkflowRun, string, time.Time) { +// cancelTypeOp is a wrapper for a workflow run operation that delays responding to the cancel request so that time +// based assertions aren't flakey. +type cancelTypeOp struct { + nexus.UnimplementedOperation[string, string] + workflowRunOp nexus.Operation[string, string] + cancelDelay time.Duration +} + +func (o *cancelTypeOp) Name() string { + return o.workflowRunOp.Name() +} + +func (o *cancelTypeOp) Start(ctx context.Context, input string, options nexus.StartOperationOptions) (nexus.HandlerStartOperationResult[string], error) { + return o.workflowRunOp.Start(ctx, input, options) +} + +func (o *cancelTypeOp) Cancel(ctx context.Context, token string, options nexus.CancelOperationOptions) error { + time.Sleep(o.cancelDelay) + return o.workflowRunOp.Cancel(ctx, token, options) +} + +func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationType workflow.NexusOperationCancellationType, cancelDelay time.Duration, t *testing.T) (client.WorkflowRun, string, time.Time) { handlerWf := func(ctx workflow.Context, ownID string) (string, error) { err := workflow.Await(ctx, func() bool { return false }) // Delay completion after receiving cancellation so that assertions on end time aren't flakey. disconCtx, _ := workflow.NewDisconnectedContext(ctx) - _ = workflow.Sleep(disconCtx, time.Second) + _ = workflow.Sleep(disconCtx, 2*time.Second) return "", err } handlerID := atomic.Value{} - op := temporalnexus.NewWorkflowRunOperation( - "workflow-op", - handlerWf, - func(ctx context.Context, _ string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { - handlerID.Store(soo.RequestID) - return client.StartWorkflowOptions{ID: soo.RequestID}, nil - }, - ) + op := &cancelTypeOp{ + cancelDelay: cancelDelay, + workflowRunOp: temporalnexus.NewWorkflowRunOperation( + "workflow-op", + handlerWf, + func(ctx context.Context, _ string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + handlerID.Store(soo.RequestID) + return client.StartWorkflowOptions{ID: soo.RequestID}, nil + }, + ), + } var unblockedTime time.Time callerWf := func(ctx workflow.Context, cancellation workflow.NexusOperationCancellationType) error { @@ -1148,7 +1172,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeAbandon, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeAbandon, 0, t) require.NotZero(t, unblockedTime) // Verify that caller never sent a cancellation request. @@ -1172,7 +1196,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeTryCancel, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeTryCancel, time.Second, t) // Verify operation future was unblocked after cancel command was recorded. callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) @@ -1185,6 +1209,8 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { foundRequestedEvent = true require.Greater(t, unblockedTime, event.EventTime.AsTime().UTC()) } + require.NotEqual(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED, event.EventType) + require.NotEqual(t, enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED, event.EventType) callerCloseEvent = event } require.True(t, foundRequestedEvent) @@ -1204,7 +1230,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitRequested, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitRequested, 0, t) // Verify operation future was unblocked after cancel request was delivered. callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) @@ -1236,7 +1262,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitCompleted, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitCompleted, 0, t) // Verify operation future was unblocked after operation was cancelled. callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) From 9f7f884c4f87f4034b5fb249c5bfa33d7ddcc9ce Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 7 Aug 2025 10:15:15 -0700 Subject: [PATCH 2/3] Remove sleeps --- test/nexus_test.go | 51 +++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/test/nexus_test.go b/test/nexus_test.go index 60ab3266a..0791f8a7a 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -9,7 +9,6 @@ import ( "os" "slices" "strings" - "sync/atomic" "testing" "time" @@ -1065,8 +1064,8 @@ func TestAsyncOperationFromWorkflow(t *testing.T) { // based assertions aren't flakey. type cancelTypeOp struct { nexus.UnimplementedOperation[string, string] - workflowRunOp nexus.Operation[string, string] - cancelDelay time.Duration + workflowRunOp nexus.Operation[string, string] + unblockCancelCh chan struct{} } func (o *cancelTypeOp) Name() string { @@ -1078,28 +1077,30 @@ func (o *cancelTypeOp) Start(ctx context.Context, input string, options nexus.St } func (o *cancelTypeOp) Cancel(ctx context.Context, token string, options nexus.CancelOperationOptions) error { - time.Sleep(o.cancelDelay) + if o.unblockCancelCh != nil { + // Should only be non-nil in the TRY_CANCEL case. + <-o.unblockCancelCh + } return o.workflowRunOp.Cancel(ctx, token, options) } -func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationType workflow.NexusOperationCancellationType, cancelDelay time.Duration, t *testing.T) (client.WorkflowRun, string, time.Time) { +func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationType workflow.NexusOperationCancellationType, unblockCancelCh chan struct{}, t *testing.T) (client.WorkflowRun, string, time.Time) { handlerWf := func(ctx workflow.Context, ownID string) (string, error) { err := workflow.Await(ctx, func() bool { return false }) // Delay completion after receiving cancellation so that assertions on end time aren't flakey. disconCtx, _ := workflow.NewDisconnectedContext(ctx) - _ = workflow.Sleep(disconCtx, 2*time.Second) + workflow.GetSignalChannel(disconCtx, "unblock").Receive(disconCtx, nil) return "", err } - handlerID := atomic.Value{} + handlerID := uuid.NewString() op := &cancelTypeOp{ - cancelDelay: cancelDelay, + unblockCancelCh: unblockCancelCh, workflowRunOp: temporalnexus.NewWorkflowRunOperation( "workflow-op", handlerWf, func(ctx context.Context, _ string, soo nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { - handlerID.Store(soo.RequestID) - return client.StartWorkflowOptions{ID: soo.RequestID}, nil + return client.StartWorkflowOptions{ID: handlerID}, nil }, ), } @@ -1115,13 +1116,16 @@ func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationT return err } + disconCtx, _ := workflow.NewDisconnectedContext(ctx) // Use disconnected ctx so it is not auto canceled. if cancellation == workflow.NexusOperationCancellationTypeTryCancel || cancellation == workflow.NexusOperationCancellationTypeWaitRequested { - disconCtx, _ := workflow.NewDisconnectedContext(ctx) // Use disconnected ctx so it is not auto canceled. workflow.Go(disconCtx, func(ctx workflow.Context) { // Wake up the caller so it is not waiting for the operation to complete to get the next WFT. _ = workflow.Sleep(ctx, time.Millisecond) }) } + if cancellation == workflow.NexusOperationCancellationTypeWaitCompleted { + _ = workflow.SignalExternalWorkflow(disconCtx, handlerID, "", "unblock", nil).Get(disconCtx, nil) + } _ = fut.Get(ctx, nil) unblockedTime = workflow.Now(ctx).UTC() @@ -1143,11 +1147,7 @@ func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationT }, callerWf, cancellationType) require.NoError(t, err) require.Eventuallyf(t, func() bool { - id := handlerID.Load() - if id == nil { - return false - } - _, descErr := tc.client.DescribeWorkflow(ctx, id.(string), "") + _, descErr := tc.client.DescribeWorkflow(ctx, handlerID, "") return descErr == nil }, 2*time.Second, 20*time.Millisecond, "timed out waiting for handler wf to start") require.NoError(t, tc.client.CancelWorkflow(ctx, run.GetID(), run.GetRunID())) @@ -1159,7 +1159,15 @@ func runCancellationTypeTest(ctx context.Context, tc *testContext, cancellationT var canceledErr *temporal.CanceledError require.ErrorAs(t, err, &canceledErr) - return run, handlerID.Load().(string), unblockedTime + if unblockCancelCh != nil { + // Should only be non-nil in the TRY_CANCEL case. + close(unblockCancelCh) + } + if cancellationType != workflow.NexusOperationCancellationTypeWaitCompleted { + require.NoError(t, tc.client.SignalWorkflow(ctx, handlerID, "", "unblock", nil)) + } + + return run, handlerID, unblockedTime } func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { @@ -1172,7 +1180,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeAbandon, 0, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeAbandon, nil, t) require.NotZero(t, unblockedTime) // Verify that caller never sent a cancellation request. @@ -1196,7 +1204,8 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeTryCancel, time.Second, t) + unblockCancelCh := make(chan struct{}) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeTryCancel, unblockCancelCh, t) // Verify operation future was unblocked after cancel command was recorded. callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) @@ -1230,7 +1239,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitRequested, 0, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitRequested, nil, t) // Verify operation future was unblocked after cancel request was delivered. callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) @@ -1262,7 +1271,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultNexusTestTimeout) defer cancel() tc := newTestContext(t, ctx) - callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitCompleted, 0, t) + callerRun, handlerID, unblockedTime := runCancellationTypeTest(ctx, tc, workflow.NexusOperationCancellationTypeWaitCompleted, nil, t) // Verify operation future was unblocked after operation was cancelled. callerHist := tc.client.GetWorkflowHistory(ctx, callerRun.GetID(), callerRun.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) From 797f29a42121747d9413e713f42a1ea13e0bc0b4 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 7 Aug 2025 10:32:42 -0700 Subject: [PATCH 3/3] reduce flakes --- test/nexus_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/nexus_test.go b/test/nexus_test.go index 0791f8a7a..248ce3346 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -1282,7 +1282,7 @@ func TestAsyncOperationFromWorkflow_CancellationTypes(t *testing.T) { require.NoError(t, err) if event.EventType == enumspb.EVENT_TYPE_NEXUS_OPERATION_CANCELED { foundCancelledEvent = true - require.Greater(t, unblockedTime, event.EventTime.AsTime().UTC()) + require.GreaterOrEqual(t, unblockedTime, event.EventTime.AsTime().UTC()) } callerCloseEvent = event }