Skip to content

Commit ad0e4b2

Browse files
authored
Merge pull request #53 from lucaslorentz/support-dtfx-rewind
Add support for DTFx native rewind
2 parents a154947 + 986b8b9 commit ad0e4b2

File tree

4 files changed

+75
-47
lines changed

4 files changed

+75
-47
lines changed

src/LLL.DurableTask.Core/Serializing/HistoryEventConverter.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class HistoryEventConverter : JsonConverter
1717
//{ EventType.ExecutionFailed, typeof(ExecutionFailedEvent) },
1818
{ EventType.ExecutionStarted, typeof(ExecutionStartedEvent) },
1919
{ EventType.ExecutionTerminated, typeof(ExecutionTerminatedEvent) },
20+
{ EventType.ExecutionRewound, typeof(ExecutionRewoundEvent) },
2021
{ EventType.GenericEvent, typeof(GenericEvent) },
2122
{ EventType.HistoryState, typeof(HistoryStateEvent) },
2223
{ EventType.OrchestratorCompleted, typeof(OrchestratorCompletedEvent) },
@@ -43,15 +44,24 @@ public override object ReadJson(JsonReader reader, Type objectType, object exist
4344
{
4445
var jObject = JObject.Load(reader);
4546

46-
var eventTypeToken = jObject.GetValue("EventType", StringComparison.OrdinalIgnoreCase);
47+
var eventType = jObject.GetValue("EventType", StringComparison.OrdinalIgnoreCase)
48+
?.ToObject<EventType>()
49+
?? throw new Exception("Expected EventType field in HistoryEvent");
4750

48-
if (eventTypeToken is null)
49-
throw new Exception("Expected EventType field in HistoryEvent");
50-
51-
var eventType = eventTypeToken.ToObject<EventType>();
51+
var eventId = jObject.GetValue("EventId", StringComparison.OrdinalIgnoreCase)
52+
?.ToObject<int>()
53+
?? throw new Exception("Expected EventId field in HistoryEvent");
5254

5355
var type = _typesMap[eventType];
5456

57+
if (type == typeof(ExecutionRewoundEvent))
58+
{
59+
// Handles multiple constructors present in ExecutionRewoundEvent
60+
var @event = new ExecutionRewoundEvent(eventId);
61+
serializer.Populate(jObject.CreateReader(), @event);
62+
return @event;
63+
}
64+
5565
return jObject.ToObject(type, serializer);
5666
}
5767

src/LLL.DurableTask.EFCore/EFCoreOrchestrationOptions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ public class EFCoreOrchestrationOptions
1717
public TimeSpan ActivtyLockTimeout { get; set; } = TimeSpan.FromMinutes(1);
1818
public TimeSpan FetchNewMessagesPollingTimeout { get; set; } = TimeSpan.FromSeconds(10);
1919
public int DelayInSecondsAfterFailure { get; set; } = 5;
20+
public bool UseDTFxRewind { get; set; } = true;
2021
}

src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using DurableTask.Core.Exceptions;
88
using DurableTask.Core.History;
99
using DurableTask.Core.Query;
10+
using DurableTask.Core.Tracing;
1011
using LLL.DurableTask.Core;
1112
using LLL.DurableTask.EFCore.Extensions;
1213
using LLL.DurableTask.EFCore.Mappers;
@@ -240,9 +241,25 @@ public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(Orche
240241

241242
public async Task RewindTaskOrchestrationAsync(string instanceId, string reason)
242243
{
243-
using var dbContext = _dbContextFactory.CreateDbContext();
244-
await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
245-
await dbContext.SaveChangesAsync();
244+
if (_options.UseDTFxRewind)
245+
{
246+
var taskMessage = new TaskMessage
247+
{
248+
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
249+
Event = new ExecutionRewoundEvent(-1, reason)
250+
{
251+
// Set a dummy trace context to avoid an exception in DTFx
252+
ParentTraceContext = new DistributedTraceContext($"{instanceId}")
253+
}
254+
};
255+
await SendTaskOrchestrationMessageAsync(taskMessage);
256+
}
257+
else
258+
{
259+
using var dbContext = _dbContextFactory.CreateDbContext();
260+
await RewindInstanceAsync(dbContext, instanceId, reason, true, FindLastErrorOrCompletionRewindPoint);
261+
await dbContext.SaveChangesAsync();
262+
}
246263
}
247264

248265
private async Task RewindInstanceAsync(OrchestrationDbContext dbContext, string instanceId, string reason, bool rewindParents, Func<IList<HistoryEvent>, HistoryEvent> findRewindPoint)

src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -70,54 +70,38 @@ public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
7070
.OrderBy(w => w.AvailableAt)
7171
.ThenBy(w => w.SequenceNumber)
7272
.AsNoTracking()
73-
.ToArrayAsync(cancellationToken);
74-
75-
var messagesToDiscard = newDbMessages
76-
.Where(m => m.ExecutionId is not null && m.ExecutionId != Instance.LastExecutionId)
77-
.ToArray();
78-
79-
if (messagesToDiscard.Length > 0)
80-
{
81-
foreach (var message in messagesToDiscard)
82-
{
83-
dbContext.OrchestrationMessages.Attach(message);
84-
dbContext.OrchestrationMessages.Remove(message);
85-
}
86-
87-
newDbMessages = newDbMessages
88-
.Except(messagesToDiscard)
89-
.ToArray();
90-
}
73+
.ToListAsync(cancellationToken);
9174

9275
var deserializedMessages = newDbMessages
9376
.Select(w => _options.DataConverter.Deserialize<TaskMessage>(w.Message))
9477
.ToList();
9578

96-
if (RuntimeState.ExecutionStartedEvent is not null)
79+
if (RuntimeState.ExecutionStartedEvent is not null
80+
&& RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
81+
&& deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
9782
{
98-
if (RuntimeState.OrchestrationStatus is OrchestrationStatus.Completed
99-
&& deserializedMessages.Any(m => m.Event.EventType == EventType.EventRaised))
100-
{
101-
// Reopen completed orchestrations after receiving an event raised
102-
RuntimeState = new OrchestrationRuntimeState(
103-
RuntimeState.Events.Reopen(_options.DataConverter)
104-
);
105-
}
83+
// Reopen completed orchestrations after receiving an event raised
84+
RuntimeState = new OrchestrationRuntimeState(
85+
RuntimeState.Events.Reopen(_options.DataConverter)
86+
);
87+
}
88+
89+
var isRunning = RuntimeState.ExecutionStartedEvent is null
90+
|| RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
91+
or OrchestrationStatus.Suspended
92+
or OrchestrationStatus.Pending;
10693

107-
var isRunning = RuntimeState.OrchestrationStatus is OrchestrationStatus.Running
108-
or OrchestrationStatus.Suspended
109-
or OrchestrationStatus.Pending;
94+
for (var i = newDbMessages.Count - 1; i >= 0; i--)
95+
{
96+
var dbMessage = newDbMessages[i];
97+
var deserializedMessage = deserializedMessages[i];
11098

111-
if (!isRunning)
99+
if (ShouldDropNewMessage(isRunning, dbMessage, deserializedMessage))
112100
{
113-
// Discard all messages if not running
114-
foreach (var message in newDbMessages)
115-
{
116-
dbContext.OrchestrationMessages.Attach(message);
117-
dbContext.OrchestrationMessages.Remove(message);
118-
}
119-
newDbMessages = [];
120-
deserializedMessages = [];
101+
dbContext.OrchestrationMessages.Attach(dbMessage);
102+
dbContext.OrchestrationMessages.Remove(dbMessage);
103+
newDbMessages.RemoveAt(i);
104+
deserializedMessages.RemoveAt(i);
121105
}
122106
}
123107

@@ -126,6 +110,22 @@ or OrchestrationStatus.Suspended
126110
return deserializedMessages;
127111
}
128112

113+
private bool ShouldDropNewMessage(
114+
bool isRunning,
115+
OrchestrationMessage dbMessage,
116+
TaskMessage taskMessage)
117+
{
118+
// Drop messages to previous executions
119+
if (dbMessage.ExecutionId is not null && dbMessage.ExecutionId != Instance.LastExecutionId)
120+
return true;
121+
122+
// When not running, drop anything that is not execution rewound
123+
if (!isRunning && taskMessage.Event.EventType != EventType.ExecutionRewound)
124+
return true;
125+
126+
return false;
127+
}
128+
129129
public void ClearMessages()
130130
{
131131
Messages.Clear();

0 commit comments

Comments
 (0)