Skip to content

Commit dac90d4

Browse files
committed
Add ExtendedOrchestrationContext to simplify workers API
1 parent 7d4094d commit dac90d4

File tree

15 files changed

+270
-224
lines changed

15 files changed

+270
-224
lines changed

samples/BpmnWorker/Orchestrations/BPMNOrchestrator.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,7 @@ private async Task VisitIntermediateCatchMessageEvent(
347347
{
348348
var messageRef = messageEventDefinition.MessageRef.ToString();
349349
_logger.LogWarning("Waiting for message {message}", messageRef);
350-
351-
var result = await EventReceiver.WaitForEventAsync<JObject>(messageRef, _terminateCancellationTokenSource.Token);
350+
await Context.WaitForEventAsync<JObject>(messageRef, _terminateCancellationTokenSource.Token);
352351
_logger.LogWarning("Received message {message}", messageRef);
353352

354353
await VisitFlowNodes(GetParallelOutgoingNodes(intermediateCatchEvent));
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using DurableTask.Core;
6+
using DurableTask.Core.Serializing;
7+
using LLL.DurableTask.Worker.Utils;
8+
9+
namespace LLL.DurableTask.Worker
10+
{
11+
public class ExtendedOrchestrationContext
12+
{
13+
private static readonly Guid _orchestrationGuidNamespace = new("93810b2d-3411-4fc0-b51b-47f2233dac7a");
14+
private int _count = 0;
15+
16+
private readonly OrchestrationContext _baseContext;
17+
18+
public ExtendedOrchestrationContext(OrchestrationContext baseContext)
19+
{
20+
_baseContext = baseContext;
21+
}
22+
23+
public OrchestrationContext BaseContext => _baseContext;
24+
25+
/// <inheritdoc cref="OrchestrationContext.ContinueAsNew" />
26+
public void ContinueAsNew(object input)
27+
{
28+
BaseContext.ContinueAsNew(input);
29+
}
30+
31+
/// <inheritdoc cref="OrchestrationContext.ContinueAsNew" />
32+
public void ContinueAsNew(string newVersion, object input)
33+
{
34+
BaseContext.ContinueAsNew(newVersion, input);
35+
}
36+
37+
/// <inheritdoc cref="OrchestrationContext.CreateSubOrchestrationInstance" />
38+
public Task<T> CreateSubOrchestrationInstance<T>(string name, string version, object input)
39+
{
40+
return BaseContext.CreateSubOrchestrationInstance<T>(name, version, input);
41+
}
42+
43+
/// <inheritdoc cref="OrchestrationContext.CreateSubOrchestrationInstance" />
44+
public Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input)
45+
{
46+
return BaseContext.CreateSubOrchestrationInstance<T>(name, version, instanceId, input);
47+
}
48+
49+
/// <inheritdoc cref="OrchestrationContext.CreateSubOrchestrationInstance" />
50+
public Task<T> CreateSubOrchestrationInstance<T>(string name, string version, string instanceId, object input, IDictionary<string, string> tags)
51+
{
52+
return BaseContext.CreateSubOrchestrationInstance<T>(name, version, instanceId, input, tags);
53+
}
54+
55+
/// <inheritdoc cref="OrchestrationContext.CreateTimer" />
56+
public Task<T> CreateTimer<T>(DateTime fireAt, T state)
57+
{
58+
return BaseContext.CreateTimer(fireAt, state); ;
59+
}
60+
61+
/// <inheritdoc cref="OrchestrationContext.CreateTimer" />
62+
public Task<T> CreateTimer<T>(DateTime fireAt, T state, CancellationToken cancelToken)
63+
{
64+
return BaseContext.CreateTimer(fireAt, state, cancelToken);
65+
}
66+
67+
/// <inheritdoc cref="OrchestrationContext.ScheduleTask" />
68+
public Task<TResult> ScheduleTask<TResult>(string name, string version, params object[] parameters)
69+
{
70+
return BaseContext.ScheduleTask<TResult>(name, version, parameters);
71+
}
72+
73+
/// <inheritdoc cref="OrchestrationContext.SendEvent" />
74+
public void SendEvent(OrchestrationInstance orchestrationInstance, string eventName, object eventData)
75+
{
76+
BaseContext.SendEvent(orchestrationInstance, eventName, eventData);
77+
}
78+
79+
/// <inheritdoc cref="OrchestrationContext.CurrentUtcDateTime" />
80+
public virtual DateTime CurrentUtcDateTime => BaseContext.CurrentUtcDateTime;
81+
82+
/// <inheritdoc cref="OrchestrationContext.IsReplaying" />
83+
public bool IsReplaying => BaseContext.IsReplaying;
84+
85+
/// <inheritdoc cref="OrchestrationContext.MessageDataConverter" />
86+
public JsonDataConverter MessageDataConverter => _baseContext.MessageDataConverter;
87+
88+
/// <inheritdoc cref="OrchestrationContext.ErrorDataConverter" />
89+
public JsonDataConverter ErrorDataConverter => _baseContext.ErrorDataConverter;
90+
91+
/// <inheritdoc cref="OrchestrationContext.OrchestrationInstance" />
92+
public OrchestrationInstance OrchestrationInstance => _baseContext.OrchestrationInstance;
93+
94+
public event Action<string, string> Event;
95+
public Func<string> StatusProvider { get; set; }
96+
97+
public Guid NewGuid()
98+
{
99+
return DeterministicGuid.Create(_orchestrationGuidNamespace, $"{BaseContext.OrchestrationInstance.ExecutionId}/{++_count}");
100+
}
101+
102+
internal void RaiseEvent(string name, string input)
103+
{
104+
Event?.Invoke(name, input);
105+
}
106+
}
107+
}

src/LLL.DurableTask.Worker/OrchestrationBase.cs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,36 @@
1-
using System;
2-
using System.Threading.Tasks;
1+
using System.Threading.Tasks;
32
using DurableTask.Core;
4-
using DurableTask.Core.Exceptions;
5-
using DurableTask.Core.Serializing;
63
using LLL.DurableTask.Core.Serializing;
7-
using DUtils = DurableTask.Core.Common.Utils;
84

95
namespace LLL.DurableTask.Worker
106
{
117
public abstract class OrchestrationBase<TResult, TInput> : TaskOrchestration
128
{
13-
public DataConverter DataConverter { get; }
14-
public OrchestrationContext Context { get; private set; }
15-
public OrchestrationEventReceiver EventReceiver { get; private set; }
16-
public OrchestrationGuidGenerator GuidGenerator { get; private set; }
17-
18-
public OrchestrationBase()
19-
{
20-
DataConverter = new TypelessJsonDataConverter();
21-
}
9+
public ExtendedOrchestrationContext Context { get; private set; }
2210

2311
public sealed override async Task<string> Execute(OrchestrationContext context, string input)
2412
{
2513
context.MessageDataConverter = new TypelessJsonDataConverter();
2614
context.ErrorDataConverter = new TypelessJsonDataConverter();
27-
Context = context;
28-
EventReceiver = new OrchestrationEventReceiver(context);
29-
GuidGenerator = new OrchestrationGuidGenerator(context.OrchestrationInstance.ExecutionId);
15+
Context = new ExtendedOrchestrationContext(context);
3016

31-
var parameter = DataConverter.Deserialize<TInput>(input);
17+
var parameter = context.MessageDataConverter.Deserialize<TInput>(input);
3218

3319
var result = await Execute(parameter);
34-
return DataConverter.Serialize(result);
20+
return context.MessageDataConverter.Serialize(result);
3521
}
3622

3723
public sealed override string GetStatus()
3824
{
39-
return DataConverter.Serialize(OnGetStatus());
25+
if (Context.StatusProvider != null)
26+
return Context.StatusProvider();
27+
28+
return Context.MessageDataConverter.Serialize(OnGetStatus());
4029
}
4130

4231
public sealed override void RaiseEvent(OrchestrationContext context, string name, string input)
4332
{
44-
EventReceiver.RaiseEvent(name, input);
33+
Context.RaiseEvent(name, input);
4534
}
4635

4736
public abstract Task<TResult> Execute(TInput input);
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace LLL.DurableTask.Worker
6+
{
7+
public static class OrchestrationContextEventExtensions
8+
{
9+
public static async Task<T> WaitForEventAsync<T>(
10+
this ExtendedOrchestrationContext context,
11+
string eventType,
12+
TimeSpan timeout,
13+
T defaultValue,
14+
CancellationToken cancellationToken = default)
15+
{
16+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
17+
var timerTask = context.CreateTimer<object>(context.CurrentUtcDateTime.Add(timeout), null, cts.Token);
18+
var eventTask = context.WaitForEventAsync<T>(eventType, cts.Token);
19+
var winningTask = await Task.WhenAny(timerTask, eventTask);
20+
cts.Cancel();
21+
return timerTask == winningTask
22+
? defaultValue
23+
: await eventTask;
24+
}
25+
26+
public static async Task<T> WaitForEventAsync<T>(
27+
this ExtendedOrchestrationContext context,
28+
string eventType,
29+
CancellationToken cancellationToken = default)
30+
{
31+
var taskCompletionSource = new TaskCompletionSource<T>();
32+
33+
using (cancellationToken.Register(() => taskCompletionSource.TrySetCanceled(cancellationToken)))
34+
{
35+
using (context.AddEventListener<T>(eventType, taskCompletionSource.SetResult, taskCompletionSource.SetException))
36+
{
37+
return await taskCompletionSource.Task;
38+
}
39+
}
40+
}
41+
42+
public static IDisposable AddEventListener<T>(
43+
this ExtendedOrchestrationContext context,
44+
string eventType,
45+
Action<T> handler,
46+
Action<Exception> exceptionHandler = null)
47+
{
48+
return new OrchestrationEventListener(context, (type, input) =>
49+
{
50+
if (type == eventType)
51+
{
52+
try
53+
{
54+
handler(context.MessageDataConverter.Deserialize<T>(input));
55+
}
56+
catch (Exception ex)
57+
{
58+
exceptionHandler?.Invoke(ex);
59+
}
60+
}
61+
});
62+
}
63+
64+
public static IDisposable AddEventListener(
65+
this ExtendedOrchestrationContext context,
66+
Action<string, string> handler)
67+
{
68+
return new OrchestrationEventListener(context, handler);
69+
}
70+
71+
private class OrchestrationEventListener : IDisposable
72+
{
73+
private readonly ExtendedOrchestrationContext _context;
74+
private readonly Action<string, string> _handler;
75+
76+
public OrchestrationEventListener(
77+
ExtendedOrchestrationContext context,
78+
Action<string, string> handler)
79+
{
80+
_context = context;
81+
_handler = handler;
82+
_context.Event += _handler;
83+
}
84+
85+
public void Dispose()
86+
{
87+
GC.SuppressFinalize(this);
88+
_context.Event -= _handler;
89+
}
90+
}
91+
}
92+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
3+
namespace LLL.DurableTask.Worker
4+
{
5+
public static class OrchestrationContextStatusExtensions
6+
{
7+
public static void SetStatusProvider<T>(
8+
this ExtendedOrchestrationContext context,
9+
Func<T> statusProvider)
10+
{
11+
if (statusProvider == null)
12+
{
13+
context.StatusProvider = null;
14+
return;
15+
}
16+
17+
context.StatusProvider = () => context.MessageDataConverter.Serialize(statusProvider());
18+
}
19+
}
20+
}

src/LLL.DurableTask.Worker/OrchestrationEventListener.cs

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)