Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
# Changelog

## v1.5.2

### Updates

* Fix issue where scheduled orchestrations aren't immediately terminated ([#178](https://github.com/microsoft/durabletask-mssql/issues/178))

## v1.5.1

### Updates

* Use Singelton metrics provider instead of IScaleMonitor and ITargetScaler ([#285]https://github.com/microsoft/durabletask-mssql/pull/285)
* Use Singleton metrics provider instead of IScaleMonitor and ITargetScaler ([#285](https://github.com/microsoft/durabletask-mssql/pull/285))
* Updated repo to use central package management
* Resolved multiple CVEs in dependencies

## v1.5.0

### Updates

* Updated Microsoft.Azure.WebJobs.Extensions.DurableTask dependency to 3.0.0 and DurableTask.Core to 3.*. ([#281]https://github.com/microsoft/durabletask-mssql/pull/281)
* Updated Microsoft.Azure.WebJobs.Extensions.DurableTask dependency to 3.0.0 and DurableTask.Core to 3.*. ([#281](https://github.com/microsoft/durabletask-mssql/pull/281))
* Removed `netstandard2.0` TFM from Microsoft.DurableTask.SqlServer.AzureFunctions

## v1.4.0
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/DTUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public static bool HasPayload(HistoryEvent e)
return historyEvent.EventType switch
{
EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version,
EventType.SubOrchestrationInstanceCreated => ((SubOrchestrationInstanceCreatedEvent)historyEvent).Version,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unrelated fix that I happened to notice independently. I'm just adding it to this PR as a convenience since it's a small change.

EventType.TaskScheduled => ((TaskScheduledEvent)historyEvent).Version,
_ => null,
};
Expand Down
87 changes: 56 additions & 31 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -465,49 +465,74 @@ BEGIN
-- *** IMPORTANT ***
-- To prevent deadlocks, it is important to maintain consistent table access
-- order across all stored procedures that execute within a transaction.
-- Table order for this sproc: Instances --> (NewEvents --> Payloads --> NewEvents)
-- Table order for this sproc: Instances --> (Payloads --> Instances --> NewEvents)

DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()

DECLARE @existingStatus varchar(30) = (
SELECT TOP 1 existing.[RuntimeStatus]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
)
DECLARE @existingStatus varchar(30)
DECLARE @existingLockExpiration datetime2(7)

-- Get the status of an existing orchestration
SELECT TOP 1
@existingStatus = existing.[RuntimeStatus],
@existingLockExpiration = existing.[LockExpiration]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

IF @existingStatus IS NULL
BEGIN
ROLLBACK TRANSACTION;
THROW 50000, 'The instance does not exist.', 1;
END
-- If the instance is already completed, no need to terminate it.
IF @existingStatus IN ('Pending', 'Running')

DECLARE @now datetime2(7) = SYSUTCDATETIME()

IF @existingStatus IN ('Running', 'Pending')
BEGIN
IF NOT EXISTS (
SELECT TOP (1) 1 FROM NewEvents
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
)
-- Create a payload to store the reason, if any
DECLARE @PayloadID uniqueidentifier = NULL
IF @Reason IS NOT NULL
BEGIN
-- Payloads are stored separately from the events
DECLARE @PayloadID uniqueidentifier = NULL
IF @Reason IS NOT NULL
-- Note that we don't use the Reason column for the Reason with terminate events
SET @PayloadID = NEWID()
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
END

-- Check the status of the orchestration to determine which termination path to take
IF @existingStatus = 'Pending' AND (@existingLockExpiration IS NULL OR @existingLockExpiration <= @now)
BEGIN
-- The orchestration hasn't started yet - transition it directly to the Terminated state and delete
-- any pending messages
UPDATE Instances SET
[RuntimeStatus] = 'Terminated',
[LastUpdatedTime] = @now,
[CompletedTime] = @now,
[OutputPayloadID] = @PayloadID,
[LockExpiration] = NULL -- release the lock, if any
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
END
ELSE
BEGIN
-- The orchestration has actually started running in this case
IF NOT EXISTS (
SELECT TOP (1) 1 FROM NewEvents
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
)
BEGIN
-- Note that we don't use the Reason column for the Reason with terminate events
SET @PayloadID = NEWID()
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[EventType],
[PayloadID]
) VALUES (
@TaskHub,
@InstanceID,
'ExecutionTerminated',
@PayloadID)
END

INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[EventType],
[PayloadID]
) VALUES (
@TaskHub,
@InstanceID,
'ExecutionTerminated',
@PayloadID)
END
END

Expand Down Expand Up @@ -1444,7 +1469,7 @@ BEGIN
-- Instance IDs can be overwritten only if the orchestration is in a terminal state
IF @existingStatus NOT IN ('Failed')
BEGIN
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewind instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
THROW 50001, @msg, 1;
END

Expand Down
8 changes: 7 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,14 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
OrchestrationState? orchestrationState)
{
if (orchestrationState is null || !newRuntimeState.IsValid)
{
// The work item was invalid. We can't do anything with it so we ignore it.
return;
}

ExtendedOrchestrationWorkItem currentWorkItem = (ExtendedOrchestrationWorkItem)workItem;

this.traceHelper.CheckpointStarting(orchestrationState);
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>5</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>2</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
schemaName);
Assert.Equal(1, currentSchemaVersion.Major);
Assert.Equal(5, currentSchemaVersion.Minor);
Assert.Equal(1, currentSchemaVersion.Patch);
Assert.Equal(2, currentSchemaVersion.Patch);
}

sealed class TestDatabase : IDisposable
Expand Down
29 changes: 29 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -863,5 +863,34 @@ await Assert.ThrowsAnyAsync<OperationCanceledException>(
// Now the orchestration should complete immediately
await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount);
}

[Fact]
public async Task TerminateScheduledOrchestration()
{
string orchestrationName = "ScheduledOrchestration";

// Does nothing except return the original input
TestInstance<object> instance = await this.testService.RunOrchestration(
input: (object)null,
orchestrationName,
version: null,
instanceId: null,
scheduledStartTime: DateTime.UtcNow.AddSeconds(30),
implementation: (ctx, input) => Task.FromResult("done"));

// Confirm that the orchestration is pending
OrchestrationState state = await instance.GetStateAsync();
Assert.Equal(OrchestrationStatus.Pending, state.OrchestrationStatus);

// Terminate the orchestration before it starts
await instance.TerminateAsync("Bye!");

// Confirm the orchestration was terminated
await instance.WaitForCompletion(
expectedStatus: OrchestrationStatus.Terminated,
expectedOutput: "Bye!");

LogAssert.NoWarningsOrErrors(this.testService.LogProvider);
}
}
}
69 changes: 64 additions & 5 deletions test/DurableTask.SqlServer.Tests/Utils/TestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,32 @@ public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
activities);
}

public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
TInput input,
string orchestrationName,
string version,
string instanceId,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
{
return this.RunOrchestration(
input,
orchestrationName,
version,
instanceId,
scheduledStartTime: null,
implementation,
onEvent,
activities);
}

public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
TInput input,
string orchestrationName,
string version,
string instanceId,
DateTime? scheduledStartTime,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
Expand All @@ -147,19 +168,43 @@ public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
inputGenerator: i => input,
orchestrationName: orchestrationName,
version: version,
scheduledStartTime: scheduledStartTime,
implementation,
onEvent,
activities);

return instances[0];
}

public Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
int count,
Func<int, string> instanceIdGenerator,
Func<int, TInput> inputGenerator,
string orchestrationName,
string version,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
{
return this.RunOrchestrations(
count,
instanceIdGenerator,
inputGenerator,
orchestrationName,
version,
scheduledStartTime: null,
implementation,
onEvent,
activities);
}

public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
int count,
Func<int, string> instanceIdGenerator,
Func<int, TInput> inputGenerator,
string orchestrationName,
string version,
DateTime? scheduledStartTime,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
Expand All @@ -178,11 +223,25 @@ public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput
TInput input = inputGenerator != null ? inputGenerator(i) : default;

DateTime utcNow = DateTime.UtcNow;
OrchestrationInstance instance = await this.client.CreateOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input);

OrchestrationInstance instance;
if (scheduledStartTime.HasValue)
{
instance = await this.client.CreateScheduledOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input,
startAt: scheduledStartTime.Value);
}
else
{
instance = await this.client.CreateOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input);
}

return new TestInstance<TInput>(
this.client,
Expand Down
Loading