Skip to content

Commit 3819674

Browse files
committed
Initial implementation
1 parent d48539e commit 3819674

File tree

5 files changed

+347
-7
lines changed

5 files changed

+347
-7
lines changed
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace DurableTask.SqlServer
5+
{
6+
using System;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using System.Linq;
10+
using DurableTask.Core;
11+
using DurableTask.Core.Entities;
12+
using System.Collections.Generic;
13+
using System.Diagnostics;
14+
15+
class EntitySqlBackendQueries : EntityBackendQueries
16+
{
17+
readonly SqlOrchestrationService orchestrationService;
18+
19+
static TimeSpan timeLimitForCleanEntityStorageLoop = TimeSpan.FromSeconds(5);
20+
21+
public EntitySqlBackendQueries(
22+
SqlOrchestrationService orchestrationService)
23+
{
24+
this.orchestrationService = orchestrationService;
25+
}
26+
27+
public async override Task<EntityMetadata?> GetEntityAsync(
28+
EntityId id,
29+
bool includeState = false,
30+
bool includeStateless = false,
31+
CancellationToken cancellation = default)
32+
{
33+
OrchestrationState? state = (await this.orchestrationService.GetOrchestrationStateAsync(id.ToString(), allExecutions: false)).FirstOrDefault();
34+
return this.GetEntityMetadata(state, includeStateless, includeState);
35+
}
36+
37+
public async override Task<EntityQueryResult> QueryEntitiesAsync(EntityQuery filter, CancellationToken cancellation)
38+
{
39+
int pageNumber = 0;
40+
if (!string.IsNullOrEmpty(filter.ContinuationToken) && !int.TryParse(filter.ContinuationToken, out pageNumber))
41+
{
42+
throw new ArgumentException($"Invalid continuation token {filter.ContinuationToken}");
43+
}
44+
45+
int retrievedResults = 0;
46+
IEnumerable<OrchestrationState> allResults = Array.Empty<OrchestrationState>();
47+
var stopwatch = new Stopwatch();
48+
stopwatch.Start();
49+
do
50+
{
51+
SqlOrchestrationQuery entityInstancesQuery = new SqlOrchestrationQuery()
52+
{
53+
PageSize = filter.PageSize.GetValueOrDefault(100),
54+
PageNumber = pageNumber,
55+
InstanceIdPrefix = filter.InstanceIdStartsWith,
56+
CreatedTimeFrom = filter.LastModifiedFrom.GetValueOrDefault(DateTime.MinValue),
57+
CreatedTimeTo = filter.LastModifiedTo.GetValueOrDefault(DateTime.MaxValue),
58+
FetchInput = filter.IncludeState,
59+
};
60+
IReadOnlyCollection<OrchestrationState> results = await this.orchestrationService.GetManyOrchestrationsAsync(entityInstancesQuery, cancellation);
61+
allResults = allResults.Concat(results);
62+
pageNumber++;
63+
64+
retrievedResults = results.Count;
65+
if (retrievedResults == 0)
66+
{
67+
pageNumber = -1;
68+
}
69+
} while (retrievedResults > 0 && stopwatch.ElapsedMilliseconds <= 100);
70+
71+
IEnumerable<EntityMetadata> entities = allResults.Select(result => this.GetEntityMetadata(result, filter.IncludeTransient, filter.IncludeState))
72+
.OfType<EntityMetadata>();
73+
74+
return new EntityQueryResult()
75+
{
76+
Results = entities,
77+
ContinuationToken = pageNumber < 0 ? null : pageNumber.ToString()
78+
};
79+
}
80+
81+
public async override Task<CleanEntityStorageResult> CleanEntityStorageAsync(CleanEntityStorageRequest request = default, CancellationToken cancellation = default)
82+
{
83+
DateTime now = DateTime.UtcNow;
84+
int emptyEntitiesRemoved = 0;
85+
int orphanedLocksReleased = 0;
86+
int pageNumber = 0;
87+
if (!string.IsNullOrEmpty(request.ContinuationToken) && !int.TryParse(request.ContinuationToken, out pageNumber))
88+
{
89+
throw new ArgumentException($"Invalid continuation token {request.ContinuationToken}");
90+
}
91+
92+
int retrievedResults = 0;
93+
IEnumerable<OrchestrationState> allResults = Array.Empty<OrchestrationState>();
94+
var stopwatch = new Stopwatch();
95+
stopwatch.Start();
96+
do
97+
{
98+
SqlOrchestrationQuery entityInstancesQuery = new SqlOrchestrationQuery()
99+
{
100+
PageSize = 100,
101+
PageNumber = pageNumber,
102+
InstanceIdPrefix = "@",
103+
CreatedTimeFrom = DateTime.MinValue,
104+
CreatedTimeTo = DateTime.MaxValue,
105+
FetchInput = true,
106+
};
107+
108+
IReadOnlyCollection<OrchestrationState> page = await this.orchestrationService.GetManyOrchestrationsAsync(entityInstancesQuery, cancellation);
109+
110+
pageNumber++;
111+
retrievedResults = page.Count;
112+
if (retrievedResults == 0)
113+
{
114+
pageNumber = -1;
115+
break;
116+
}
117+
118+
var tasks = new List<Task>();
119+
IEnumerable<string> emptyEntityIds = new List<string>();
120+
121+
foreach (OrchestrationState state in page)
122+
{
123+
EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);
124+
if (status != null)
125+
{
126+
if (request.ReleaseOrphanedLocks && status.LockedBy != null)
127+
{
128+
tasks.Add(CheckForOrphanedLockAndFixIt(state, status.LockedBy));
129+
}
130+
131+
if (request.RemoveEmptyEntities)
132+
{
133+
bool isEmptyEntity = !status.EntityExists && status.LockedBy == null && status.BacklogQueueSize == 0;
134+
bool safeToRemoveWithoutBreakingMessageSorterLogic =
135+
now - state.LastUpdatedTime > this.orchestrationService.EntityBackendProperties.EntityMessageReorderWindow;
136+
if (isEmptyEntity && safeToRemoveWithoutBreakingMessageSorterLogic)
137+
{
138+
emptyEntityIds.Append(state.OrchestrationInstance.InstanceId);
139+
orphanedLocksReleased++;
140+
}
141+
}
142+
}
143+
}
144+
145+
async Task CheckForOrphanedLockAndFixIt(OrchestrationState state, string lockOwner)
146+
{
147+
OrchestrationState? ownerState
148+
= (await this.orchestrationService.GetOrchestrationStateAsync(lockOwner, allExecutions: false)).FirstOrDefault();
149+
150+
bool OrchestrationIsRunning(OrchestrationStatus? status)
151+
=> status != null && (status == OrchestrationStatus.Running || status == OrchestrationStatus.Suspended);
152+
153+
if (!OrchestrationIsRunning(ownerState?.OrchestrationStatus))
154+
{
155+
// the owner is not a running orchestration. Send a lock release.
156+
EntityMessageEvent eventToSend = ClientEntityHelpers.EmitUnlockForOrphanedLock(state.OrchestrationInstance, lockOwner);
157+
await this.orchestrationService.SendTaskOrchestrationMessageAsync(eventToSend.AsTaskMessage());
158+
Interlocked.Increment(ref orphanedLocksReleased);
159+
}
160+
}
161+
162+
await this.orchestrationService.PurgeOrchestrationHistoryAsync(emptyEntityIds);
163+
164+
} while (retrievedResults > 0 && stopwatch.Elapsed <= timeLimitForCleanEntityStorageLoop);
165+
166+
return new CleanEntityStorageResult()
167+
{
168+
EmptyEntitiesRemoved = emptyEntitiesRemoved,
169+
OrphanedLocksReleased = orphanedLocksReleased,
170+
ContinuationToken = pageNumber < 0 ? null : pageNumber.ToString()
171+
};
172+
}
173+
174+
EntityMetadata? GetEntityMetadata(OrchestrationState? state, bool includeTransient, bool includeState)
175+
{
176+
if (state == null)
177+
{
178+
return null;
179+
}
180+
181+
if (!includeState)
182+
{
183+
if (!includeTransient)
184+
{
185+
// it is possible that this entity was logically deleted even though its orchestration was not purged yet.
186+
// we can check this efficiently (i.e. without deserializing anything) by looking at just the custom status
187+
if (!EntityStatus.TestEntityExists(state.Status))
188+
{
189+
return null;
190+
}
191+
}
192+
193+
EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);
194+
195+
return new EntityMetadata()
196+
{
197+
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
198+
LastModifiedTime = state.CreatedTime,
199+
BacklogQueueSize = status?.BacklogQueueSize ?? 0,
200+
LockedBy = status?.LockedBy,
201+
SerializedState = null, // we were instructed to not include the state
202+
};
203+
}
204+
else
205+
{
206+
// return the result to the user
207+
if (!includeTransient && state.Input == null)
208+
{
209+
return null;
210+
}
211+
else
212+
{
213+
EntityStatus? status = ClientEntityHelpers.GetEntityStatus(state.Status);
214+
215+
return new EntityMetadata()
216+
{
217+
EntityId = EntityId.FromString(state.OrchestrationInstance.InstanceId),
218+
LastModifiedTime = state.CreatedTime,
219+
BacklogQueueSize = status?.BacklogQueueSize ?? 0,
220+
LockedBy = status?.LockedBy,
221+
SerializedState = state.Input,
222+
};
223+
}
224+
}
225+
}
226+
}
227+
}

src/DurableTask.SqlServer/Scripts/logic.sql

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,9 @@ GO
627627
CREATE OR ALTER PROCEDURE __SchemaNamePlaceholder__._LockNextOrchestration
628628
@BatchSize int,
629629
@LockedBy varchar(100),
630-
@LockExpiration datetime2
630+
@LockExpiration datetime2,
631+
-- Orchestration type: NULL = any, 0 = orchestration, 1 = entity
632+
@OrchestrationType BIT = NULL
631633
AS
632634
BEGIN
633635
DECLARE @now datetime2 = SYSUTCDATETIME()
@@ -662,7 +664,11 @@ BEGIN
662664
WHERE
663665
I.TaskHub = @TaskHub AND
664666
(I.[LockExpiration] IS NULL OR I.[LockExpiration] < @now) AND
665-
(E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now)
667+
(E.[VisibleTime] IS NULL OR E.[VisibleTime] < @now) AND
668+
(@OrchestrationType IS NULL OR
669+
(@OrchestrationType = 0 AND I.[InstanceID] NOT LIKE '@%@%') OR
670+
(@OrchestrationType = 1 AND I.[InstanceID] LIKE '@%@%')
671+
)
666672

667673
-- Result #1: The list of new events to fetch.
668674
-- IMPORTANT: DO NOT CHANGE THE ORDER OF RETURNED COLUMNS!

src/DurableTask.SqlServer/SqlOrchestrationService.cs

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ namespace DurableTask.SqlServer
1414
using System.Threading.Tasks;
1515
using DurableTask.Core;
1616
using DurableTask.Core.Common;
17+
using DurableTask.Core.Entities;
1718
using DurableTask.Core.Exceptions;
1819
using DurableTask.Core.History;
1920
using DurableTask.Core.Query;
@@ -53,6 +54,19 @@ public SqlOrchestrationService(SqlOrchestrationServiceSettings? settings)
5354

5455
public override int MaxConcurrentTaskActivityWorkItems => this.settings.MaxConcurrentActivities;
5556

57+
public override EntityBackendProperties EntityBackendProperties
58+
=> new EntityBackendProperties()
59+
{
60+
EntityMessageReorderWindow = TimeSpan.FromMinutes(this.settings.EntityMessageReorderWindowInMinutes),
61+
MaxEntityOperationBatchSize = this.settings.MaxEntityOperationBatchSize,
62+
MaxConcurrentTaskEntityWorkItems = this.settings.MaxConcurrentTaskEntityWorkItems,
63+
SupportsImplicitEntityDeletion = false, // not supported by this backend
64+
MaximumSignalDelayTime = TimeSpan.MaxValue,
65+
UseSeparateQueueForEntityWorkItems = this.settings.UseSeparateQueueForEntityWorkItems,
66+
};
67+
68+
public override EntityBackendQueries? EntityBackendQueries => new EntitySqlBackendQueries(this.EntityBackendProperties);
69+
5670
static SqlOrchestrationServiceSettings? ValidateSettings(SqlOrchestrationServiceSettings? settings)
5771
{
5872
if (settings != null)
@@ -112,9 +126,39 @@ public override Task DeleteAsync(bool deleteInstanceStore)
112126
return this.dbManager.DeleteSchemaAsync();
113127
}
114128

129+
public override async Task<TaskOrchestrationWorkItem> LockNextOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
130+
{
131+
#pragma warning disable CS8603 // Possible null reference return. Need to update base signature in IEntityOrchestrationService
132+
return await this.LockNextTaskOrchestrationWorkItemAsync(
133+
receiveTimeout,
134+
cancellationToken,
135+
OrchestrationFilterType.OrchestrationsOnly);
136+
# pragma warning restore CS8603
137+
}
138+
139+
public override async Task<TaskOrchestrationWorkItem> LockNextEntityWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
140+
{
141+
#pragma warning disable CS8603 // Possible null reference return. Need to update base signature in IEntityOrchestrationService
142+
return await this.LockNextTaskOrchestrationWorkItemAsync(
143+
receiveTimeout,
144+
cancellationToken,
145+
OrchestrationFilterType.EntitiesOnly);
146+
# pragma warning restore CS8603
147+
}
148+
115149
public override async Task<TaskOrchestrationWorkItem?> LockNextTaskOrchestrationWorkItemAsync(
116150
TimeSpan receiveTimeout,
117151
CancellationToken cancellationToken)
152+
{
153+
return await this.LockNextOrchestrationWorkItemAsync(
154+
receiveTimeout,
155+
cancellationToken);
156+
}
157+
158+
async Task<TaskOrchestrationWorkItem?> LockNextTaskOrchestrationWorkItemAsync(
159+
TimeSpan receiveTimeout,
160+
CancellationToken cancellationToken,
161+
OrchestrationFilterType orchestrationFilterType = OrchestrationFilterType.All)
118162
{
119163
bool isWaiting = false;
120164
Stopwatch stopwatch = Stopwatch.StartNew();
@@ -129,8 +173,16 @@ public override Task DeleteAsync(bool deleteInstanceStore)
129173
command.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
130174
command.Parameters.Add("@LockedBy", SqlDbType.VarChar, 100).Value = this.lockedByValue;
131175
command.Parameters.Add("@LockExpiration", SqlDbType.DateTime2).Value = lockExpiration;
176+
if (orchestrationFilterType == OrchestrationFilterType.OrchestrationsOnly)
177+
{
178+
command.Parameters.Add("@OrchestrationType", SqlDbType.Int).Value = 0;
179+
}
180+
else if (orchestrationFilterType == OrchestrationFilterType.EntitiesOnly)
181+
{
182+
command.Parameters.Add("@OrchestrationType", SqlDbType.Int).Value = 1;
183+
}
132184

133-
DbDataReader reader;
185+
DbDataReader reader;
134186

135187
try
136188
{
@@ -724,7 +776,7 @@ public override async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFil
724776
FetchInput = false,
725777
FetchOutput = false,
726778
};
727-
779+
728780
if (purgeInstanceFilter.CreatedTimeTo != null)
729781
{
730782
purgeQuery.CreatedTimeTo = purgeInstanceFilter.CreatedTimeTo.Value;
@@ -734,9 +786,9 @@ public override async Task<PurgeResult> PurgeInstanceStateAsync(PurgeInstanceFil
734786
{
735787
purgeQuery.StatusFilter = new HashSet<OrchestrationStatus>(purgeInstanceFilter.RuntimeStatus);
736788
}
737-
789+
738790
IReadOnlyCollection<OrchestrationState> results = await this.GetManyOrchestrationsAsync(purgeQuery, CancellationToken.None);
739-
791+
740792
IEnumerable<string> instanceIds = results.Select(r => r.OrchestrationInstance.InstanceId);
741793
int purgedInstanceCount = await this.PurgeOrchestrationHistoryAsync(instanceIds);
742794
return new PurgeResult(purgedInstanceCount);
@@ -779,7 +831,7 @@ public override async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAs
779831
}
780832

781833
IReadOnlyCollection<OrchestrationState> results = await this.GetManyOrchestrationsAsync(sqlOrchestrationQuery, cancellationToken);
782-
string? continuationToken =
834+
string? continuationToken =
783835
results.Count == sqlOrchestrationQuery.PageSize ? (sqlOrchestrationQuery.PageNumber + 1).ToString() : null;
784836
return new OrchestrationQueryResult(results, continuationToken);
785837
}
@@ -919,5 +971,12 @@ public ExtendedActivityWorkItem(TaskScheduledEvent scheduledEvent)
919971

920972
public TaskScheduledEvent ScheduledEvent { get; }
921973
}
974+
975+
enum OrchestrationFilterType
976+
{
977+
All,
978+
EntitiesOnly,
979+
OrchestrationsOnly
980+
}
922981
}
923982
}

0 commit comments

Comments
 (0)