diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 609a8b35d..886098e26 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -117,6 +117,11 @@ public class AzureStorageOrchestrationServiceSettings
///
public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100;
+ ///
+ /// Gets or sets the maximum dequeue count of any message before it is flagged as a "poison message".
+ /// The default value is 20.
+ ///
+ public int PoisonMessageDeuqueCountThreshold { get; set; } = 20;
///
/// Gets or sets the maximum number of concurrent storage operations that can be executed in the context
/// of a single orchestration instance.
diff --git a/src/DurableTask.AzureStorage/DataContractJsonConverter.cs b/src/DurableTask.AzureStorage/DataContractJsonConverter.cs
index bd0ac265c..e6727d861 100644
--- a/src/DurableTask.AzureStorage/DataContractJsonConverter.cs
+++ b/src/DurableTask.AzureStorage/DataContractJsonConverter.cs
@@ -29,6 +29,8 @@ namespace DurableTask.AzureStorage
///
internal class DataContractJsonConverter : JsonConverter
{
+ public JsonSerializer alternativeSerializer = null;
+
public override bool CanConvert(Type objectType)
{
if (objectType == null)
@@ -59,15 +61,24 @@ public override object ReadJson(
throw new ArgumentNullException(nameof(serializer));
}
- using (var stream = new MemoryStream())
- using (var writer = new StreamWriter(stream))
- using (var jsonWriter = new JsonTextWriter(writer))
+ // JsonReader is forward only, need to make a copy so we can read it twice.
+ using var stream = new MemoryStream();
+ using var writer = new StreamWriter(stream);
+ using var jsonWriter = new JsonTextWriter(writer);
+ jsonWriter.WriteToken(reader, writeChildren: true);
+ jsonWriter.Flush();
+ stream.Position = 0;
+
+ try
+ {
+ using var reader2 = new JsonTextReader(new StreamReader(stream));
+ reader2.CloseInput = false;
+ return this.alternativeSerializer.Deserialize(reader2, objectType);
+ }
+ catch
{
- jsonWriter.WriteToken(reader, writeChildren: true);
- jsonWriter.Flush();
stream.Position = 0;
-
- var contractSerializer = CreateSerializer(objectType, serializer);
+ DataContractJsonSerializer contractSerializer = CreateSerializer(objectType, serializer);
return contractSerializer.ReadObject(stream);
}
}
@@ -75,34 +86,8 @@ public override object ReadJson(
///
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
- if (writer == null)
- {
- throw new ArgumentNullException(nameof(writer));
- }
-
- if (value == null)
- {
- writer.WriteNull();
- return;
- }
-
- if (serializer == null)
- {
- throw new ArgumentNullException(nameof(serializer));
- }
-
- using (var memoryStream = new MemoryStream())
- {
- var contractSerializer = CreateSerializer(value.GetType(), serializer);
- contractSerializer.WriteObject(memoryStream, value);
- memoryStream.Position = 0;
-
- using (var streamReader = new StreamReader(memoryStream))
- using (var jsonReader = new JsonTextReader(streamReader))
- {
- writer.WriteToken(jsonReader, writeChildren: true);
- }
- }
+ // Ignore data contract, use Newtonsoft
+ this.alternativeSerializer.Serialize(writer, value);
}
private static DataContractJsonSerializer CreateSerializer(Type type, JsonSerializer serializer)
@@ -115,4 +100,4 @@ private static DataContractJsonSerializer CreateSerializer(Type type, JsonSerial
});
}
}
-}
+}
\ No newline at end of file
diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
index 60aa12404..c0a81b6d9 100644
--- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
+++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
@@ -21,8 +21,9 @@
1
17
- 3
+ 4
$(MajorVersion).$(MinorVersion).$(PatchVersion)
+ poisonmessagehandler.4
$(VersionPrefix).0
$(VersionPrefix).$(FileVersionRevision)
diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
index 3ba243908..b37cc8e6c 100644
--- a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
+++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
@@ -235,9 +235,10 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)
{
// first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage.
string serializedSchedulerState;
- if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl))
+ if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri? blobUrl))
{
- serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl);
+ // we know blobUrl is not null because TryGetLargeMessageReference returned true
+ serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl!);
}
else
{
diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs
index 7834f830c..bcbae912f 100644
--- a/src/DurableTask.AzureStorage/MessageManager.cs
+++ b/src/DurableTask.AzureStorage/MessageManager.cs
@@ -10,7 +10,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
-
namespace DurableTask.AzureStorage
{
using System;
@@ -50,7 +49,7 @@ public MessageManager(
{
this.settings = settings;
this.azureStorageClient = azureStorageClient;
- this.blobContainer = this.azureStorageClient.GetBlobContainerReference(blobContainerName);
+ this.blobContainer = this.azureStorageClient?.GetBlobContainerReference(blobContainerName);
this.taskMessageSerializerSettings = new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.Objects,
@@ -61,17 +60,19 @@ public MessageManager(
#endif
};
- if (this.settings.UseDataContractSerialization)
- {
- this.taskMessageSerializerSettings.Converters.Add(new DataContractJsonConverter());
- }
+ JsonSerializer newtonSoftSerializer = JsonSerializer.Create(taskMessageSerializerSettings);
+
+ // make hotfix always present
+ var dataConverter = new DataContractJsonConverter();
+ dataConverter.alternativeSerializer = newtonSoftSerializer;
+ this.taskMessageSerializerSettings.Converters.Add(dataConverter);
// We _need_ to create the Json serializer after providing the data converter,
// otherwise the converters will be ignored.
this.serializer = JsonSerializer.Create(taskMessageSerializerSettings);
}
-
+#nullable enable
public async Task EnsureContainerAsync()
{
bool created = false;
@@ -122,9 +123,10 @@ public async Task SerializeMessageDataAsync(MessageData messageData)
/// Actual string representation of message.
public async Task FetchLargeMessageIfNecessary(string message)
{
- if (TryGetLargeMessageReference(message, out Uri blobUrl))
+ if (TryGetLargeMessageReference(message, out Uri? blobUrl))
{
- return await this.DownloadAndDecompressAsBytesAsync(blobUrl);
+ // we know blobUrl is not null because TryGetLargeMessageReference returned true
+ return await this.DownloadAndDecompressAsBytesAsync(blobUrl!);
}
else
{
@@ -132,7 +134,7 @@ public async Task FetchLargeMessageIfNecessary(string message)
}
}
- internal static bool TryGetLargeMessageReference(string messagePayload, out Uri blobUrl)
+ internal static bool TryGetLargeMessageReference(string messagePayload, out Uri? blobUrl)
{
if (Uri.IsWellFormedUriString(messagePayload, UriKind.Absolute))
{
@@ -318,6 +320,7 @@ public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId)
return storageOperationCount;
}
}
+#nullable disable
#if NETSTANDARD2_0
class TypeNameSerializationBinder : ISerializationBinder
diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
index 4484c3ee9..71119b701 100644
--- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -22,6 +22,10 @@ namespace DurableTask.AzureStorage.Messaging
using DurableTask.AzureStorage.Monitoring;
using DurableTask.AzureStorage.Partitioning;
using DurableTask.AzureStorage.Storage;
+ using DurableTask.AzureStorage.Tracking;
+ using DurableTask.Core;
+ using DurableTask.Core.History;
+ using Microsoft.WindowsAzure.Storage.Table;
class ControlQueue : TaskHubQueue, IDisposable
{
@@ -46,6 +50,199 @@ public ControlQueue(
protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout;
+ internal async Task FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput)
+ {
+ if (instanceId == null)
+ {
+ throw new ArgumentNullException(nameof(instanceId));
+ }
+
+ var queryCondition = new OrchestrationInstanceStatusQueryCondition
+ {
+ InstanceId = instanceId,
+ FetchInput = fetchInput,
+ };
+
+ string instancesTableName = settings.InstanceTableName;
+
+ var instancesTable = this.azureStorageClient.GetTableReference(instancesTableName);
+
+ var tableEntitiesResponseInfo = await instancesTable.ExecuteQueryAsync(queryCondition.ToTableQuery());
+
+ var tableEntity = tableEntitiesResponseInfo.ReturnedEntities.FirstOrDefault();
+
+ OrchestrationState? orchestrationState = null;
+ if (tableEntity != null)
+ {
+ orchestrationState = await this.ConvertFromAsync(tableEntity);
+ }
+
+ this.settings.Logger.FetchedInstanceStatus(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ instanceId,
+ orchestrationState?.OrchestrationInstance.ExecutionId ?? string.Empty,
+ orchestrationState?.OrchestrationStatus.ToString() ?? "NotFound",
+ tableEntitiesResponseInfo.ElapsedMilliseconds);
+
+ if (tableEntity == null || orchestrationState == null)
+ {
+ return null;
+ }
+
+ return new InstanceStatus(orchestrationState, tableEntity.ETag);
+ }
+
+ async Task ConvertFromAsync(DynamicTableEntity tableEntity)
+ {
+ var orchestrationInstanceStatus = await CreateOrchestrationInstanceStatusAsync(tableEntity.Properties);
+ var instanceId = KeySanitation.UnescapePartitionKey(tableEntity.PartitionKey);
+ return await ConvertFromAsync(orchestrationInstanceStatus, instanceId);
+ }
+
+ private async Task CreateOrchestrationInstanceStatusAsync(IDictionary properties)
+ {
+ var instance = new OrchestrationInstanceStatus();
+ EntityProperty property;
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.ExecutionId), out property))
+ {
+ instance.ExecutionId = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Name), out property))
+ {
+ instance.Name = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Version), out property))
+ {
+ instance.Version = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Input), out property))
+ {
+ instance.Input = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Output), out property))
+ {
+ instance.Output = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.CustomStatus), out property))
+ {
+ instance.CustomStatus = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.CreatedTime), out property) && property.DateTime is { } createdTime)
+ {
+ instance.CreatedTime = createdTime;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.LastUpdatedTime), out property) && property.DateTime is { } lastUpdatedTime)
+ {
+ instance.LastUpdatedTime = lastUpdatedTime;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.CompletedTime), out property))
+ {
+ instance.CompletedTime = property.DateTime;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.RuntimeStatus), out property))
+ {
+ instance.RuntimeStatus = property.StringValue;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.ScheduledStartTime), out property))
+ {
+ instance.ScheduledStartTime = property.DateTime;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Generation), out property) && property.Int32Value is { } generation)
+ {
+ instance.Generation = generation;
+ }
+
+ if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Tags), out property) && property.StringValue is { Length: > 0 } tags)
+ {
+ instance.Tags = TagsSerializer.Deserialize(tags);
+ }
+ else if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Tags) + "BlobName", out property) && property.StringValue is { Length: > 0 } blob)
+ {
+ var blobContents = await messageManager.DownloadAndDecompressAsBytesAsync(blob);
+ instance.Tags = TagsSerializer.Deserialize(blobContents);
+ }
+
+ return instance;
+ }
+
+ async Task ConvertFromAsync(OrchestrationInstanceStatus orchestrationInstanceStatus, string instanceId)
+ {
+ var orchestrationState = new OrchestrationState();
+ if (!Enum.TryParse(orchestrationInstanceStatus.RuntimeStatus, out orchestrationState.OrchestrationStatus))
+ {
+ // This is not expected, but could happen if there is invalid data in the Instances table.
+ orchestrationState.OrchestrationStatus = (OrchestrationStatus)(-1);
+ }
+
+ orchestrationState.OrchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = instanceId,
+ ExecutionId = orchestrationInstanceStatus.ExecutionId,
+ };
+
+ orchestrationState.Name = orchestrationInstanceStatus.Name;
+ orchestrationState.Version = orchestrationInstanceStatus.Version;
+ orchestrationState.Status = orchestrationInstanceStatus.CustomStatus;
+ orchestrationState.CreatedTime = orchestrationInstanceStatus.CreatedTime;
+ orchestrationState.CompletedTime = orchestrationInstanceStatus.CompletedTime.GetValueOrDefault();
+ orchestrationState.LastUpdatedTime = orchestrationInstanceStatus.LastUpdatedTime;
+ orchestrationState.Input = orchestrationInstanceStatus.Input;
+ orchestrationState.Output = orchestrationInstanceStatus.Output;
+ orchestrationState.ScheduledStartTime = orchestrationInstanceStatus.ScheduledStartTime;
+ orchestrationState.Generation = orchestrationInstanceStatus.Generation;
+ orchestrationState.Tags = orchestrationInstanceStatus.Tags;
+
+ if (this.settings.FetchLargeMessageDataEnabled)
+ {
+ if (MessageManager.TryGetLargeMessageReference(orchestrationState.Input, out Uri? blobUrl))
+ {
+ string json = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl!);
+
+ // Depending on which blob this is, we interpret it differently.
+ if (blobUrl!.AbsolutePath.EndsWith("ExecutionStarted.json.gz"))
+ {
+ // The downloaded content is an ExecutedStarted message payload that
+ // was created when the orchestration was started.
+ MessageData msg = this.messageManager.DeserializeMessageData(json);
+ if (msg?.TaskMessage?.Event is ExecutionStartedEvent startEvent)
+ {
+ orchestrationState.Input = startEvent.Input;
+ }
+ else
+ {
+ this.settings.Logger.GeneralWarning(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ $"Orchestration input blob URL '{blobUrl}' contained unrecognized data.",
+ instanceId);
+ }
+ }
+ else
+ {
+ // The downloaded content is the raw input JSON
+ orchestrationState.Input = json;
+ }
+ }
+
+ orchestrationState.Output = await this.messageManager.FetchLargeMessageIfNecessary(orchestrationState.Output);
+ }
+
+ return orchestrationState;
+ }
+
public async Task> GetMessagesAsync(CancellationToken cancellationToken)
{
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken))
@@ -105,27 +302,39 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
MessageData messageData;
try
{
+ // try to de-serialize message
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);
+
+ // if successful, check if it's a poison message. If so, we handle it
+ // and log metadata about it as the de-serialization succeeded.
+ await this.HandleIfPoisonMessageAsync(messageData);
+
+ var instanceId = messageData.TaskMessage.OrchestrationInstance.InstanceId;
+ InstanceStatus? status = await FetchInstanceStatusInternalAsync(instanceId, false);
+ if (status != null)
+ {
+ if (status.State.OrchestrationStatus == OrchestrationStatus.Terminated)
+ {
+ // delete the message, the orchestration is terminated, we won't load it's history
+ // TODO: possibly do not delete, but move to poison message table?
+ await this.InnerQueue.DeleteMessageAsync(queueMessage);
+ }
+ }
}
- catch (Exception e)
+ catch (Exception exception)
{
- // We have limited information about the details of the message
- // since we failed to deserialize it.
- this.settings.Logger.MessageFailure(
- this.storageAccountName,
- this.settings.TaskHubName,
- queueMessage.Id /* MessageId */,
- string.Empty /* InstanceId */,
- string.Empty /* ExecutionId */,
- this.storageQueue.Name,
- string.Empty /* EventType */,
- 0 /* TaskEventId */,
- e.ToString());
-
- // Abandon the message so we can try it again later.
- await this.AbandonMessageAsync(queueMessage);
+ // Deserialization errors can be persistent, so we check if this is a poison message.
+ bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
+ if (isPoisonMessage)
+ {
+ // we have already handled the poison message, so we move on.
+ return;
+ }
+
+ // This is not a poison message (at least not yet), so we abandon it to retry later.
+ await this.AbandonMessageAsync(queueMessage, exception);
return;
}
@@ -190,8 +399,22 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
}
// This overload is intended for cases where we aren't able to deserialize an instance of MessageData.
- public Task AbandonMessageAsync(QueueMessage queueMessage)
+ public Task AbandonMessageAsync(QueueMessage queueMessage, Exception exception)
{
+
+ // We have limited information about the details of the message
+ // since we failed to deserialize it.
+ this.settings.Logger.MessageFailure(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ queueMessage.Id /* MessageId */,
+ string.Empty /* InstanceId */,
+ string.Empty /* ExecutionId */,
+ this.storageQueue.Name,
+ string.Empty /* EventType */,
+ 0 /* TaskEventId */,
+ exception.ToString());
+
this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.Id, out _);
return base.AbandonMessageAsync(
queueMessage,
diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
index 9dba81579..1d9644cb0 100644
--- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
@@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Messaging
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;
using DurableTask.Core.History;
+ using Microsoft.WindowsAzure.Storage.Table;
abstract class TaskHubQueue
{
@@ -57,6 +58,98 @@ public TaskHubQueue(
this.backoffHelper = new BackoffPollingHelper(minPollingDelay, maxPollingDelay);
}
+ public async Task HandleIfPoisonMessageAsync(MessageData messageData)
+ {
+ var queueMessage = messageData.OriginalQueueMessage;
+ var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;
+
+ if (queueMessage.DequeueCount > maxThreshold)
+ {
+ string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
+ Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
+ await poisonMessagesTable.CreateIfNotExistsAsync();
+
+ // provide guidance, which is backend-specific
+ string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," +
+ $" which is greater than the threshold poison message threshold ({maxThreshold}). " +
+ $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " +
+ $"This will fail the consuming orchestrator, activity, or entity";
+ messageData.TaskMessage.Event.PoisonGuidance = guidance;
+
+ // add to poison table
+ var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name)
+ {
+ Properties =
+ {
+ ["RawMessage"] = new EntityProperty(queueMessage.Message),
+ ["Reason"] = new EntityProperty(guidance)
+ }
+ };
+
+ await poisonMessagesTable.InsertAsync(poisonMessage);
+
+ // delete from queue so it doesn't get processed again.
+ await this.storageQueue.DeleteMessageAsync(queueMessage);
+
+ // since isPoison is `true`, we'll override the deserialized message
+ messageData.TaskMessage.Event.IsPoison = true;
+
+ this.settings.Logger.PoisonMessageDetected(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ messageData.TaskMessage.Event.EventType.ToString(),
+ messageData.TaskMessage.Event.EventId,
+ messageData.OriginalQueueMessage.Id,
+ messageData.TaskMessage.OrchestrationInstance.InstanceId,
+ messageData.TaskMessage.OrchestrationInstance.ExecutionId,
+ this.Name,
+ messageData.OriginalQueueMessage.DequeueCount);
+ }
+ }
+
+ public async Task TryHandlingDeserializationPoisonMessage(QueueMessage queueMessage, Exception deserializationException)
+ {
+
+ var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;
+ bool isPoisonMessage = queueMessage.DequeueCount > maxThreshold;
+ if (isPoisonMessage)
+ {
+ isPoisonMessage = true;
+ string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," +
+ $" which is greater than the threshold poison message threshold ({maxThreshold}). " +
+ $"A de-serialization error ocurred: \n {deserializationException}";
+ var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name)
+ {
+ Properties =
+ {
+ ["RawMessage"] = new EntityProperty(queueMessage.Message),
+ ["Reason"] = new EntityProperty(guidance)
+ }
+ };
+
+ // add to poison table
+ string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
+ Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
+ await poisonMessagesTable.CreateIfNotExistsAsync();
+ await poisonMessagesTable.InsertAsync(poisonMessage);
+
+ // delete from queue so it doesn't get processed again.
+ await this.storageQueue.DeleteMessageAsync(queueMessage);
+
+ this.settings.Logger.PoisonMessageDetected(
+ this.storageAccountName,
+ this.settings.TaskHubName,
+ string.Empty,
+ 0,
+ string.Empty,
+ string.Empty,
+ string.Empty,
+ this.Name,
+ queueMessage.DequeueCount);
+ }
+ return isPoisonMessage;
+ }
+
public string Name => this.storageQueue.Name;
public Uri Uri => this.storageQueue.Uri;
diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
index a420c87bf..eaf01402b 100644
--- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
@@ -10,7 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
-
+#nullable enable
namespace DurableTask.AzureStorage.Messaging
{
using System;
@@ -30,13 +30,13 @@ public WorkItemQueue(
protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout;
- public async Task GetMessageAsync(CancellationToken cancellationToken)
+ public async Task GetMessageAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
- QueueMessage queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);
+ QueueMessage? queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);
if (queueMessage == null)
{
@@ -44,12 +44,30 @@ public async Task GetMessageAsync(CancellationToken cancellationTok
continue;
}
- MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
- queueMessage,
- this.storageQueue.Name);
+ try
+ {
+ MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
+ queueMessage,
+ this.storageQueue.Name);
+
+ // if successful, check if it's a poison message. If so, we handle it
+ // and log metadata about it as the de-serialization succeeded.
+ await this.HandleIfPoisonMessageAsync(data);
+ this.backoffHelper.Reset();
+ return data;
+ }
+ catch (Exception exception)
+ {
+ // Deserialization errors can be persistent, so we check if this is a poison message.
+ bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
+ if (isPoisonMessage)
+ {
+ // we have already handled the poison message, so we move on.
+ continue;
+ }
+ }
+
- this.backoffHelper.Reset();
- return data;
}
catch (Exception e)
{
diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj
index 25c247e1b..39cbc748f 100644
--- a/src/DurableTask.Core/DurableTask.Core.csproj
+++ b/src/DurableTask.Core/DurableTask.Core.csproj
@@ -18,8 +18,9 @@
2
17
- 1
+ 3
$(MajorVersion).$(MinorVersion).$(PatchVersion)
+ poisonmessagehandler.3
$(VersionPrefix).0
$(VersionPrefix).$(FileVersionRevision)
diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs
index 8e6a8f316..29f799b57 100644
--- a/src/DurableTask.Core/History/HistoryEvent.cs
+++ b/src/DurableTask.Core/History/HistoryEvent.cs
@@ -93,5 +93,16 @@ protected HistoryEvent(int eventId)
/// Implementation for .
///
public ExtensionDataObject? ExtensionData { get; set; }
+
+ ///
+ /// Gets or sets whether this is a poison message.
+ ///
+ public bool IsPoison { get; set; } = false;
+
+ ///
+ /// Gets or sets user-facing details for why a message was labeled as poison.
+ /// This is to be set by each storage provider.
+ ///
+ public string PoisonGuidance { get; set; } = "";
}
}
\ No newline at end of file
diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs
index c9e402401..8f16263d9 100644
--- a/src/DurableTask.Core/TaskActivityDispatcher.cs
+++ b/src/DurableTask.Core/TaskActivityDispatcher.cs
@@ -22,6 +22,7 @@ namespace DurableTask.Core
using DurableTask.Core.History;
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
+ using DurableTask.Core.Serializing;
using DurableTask.Core.Tracing;
///
@@ -190,6 +191,21 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
try
{
+ if (scheduledEvent.IsPoison)
+ {
+ // if the activity is "poison", then we should not executed again. Instead, we'll manually fail the activity
+ // by throwing an exception on behalf of the user-code. In the exception, we provide the storage-provider's guidance
+ // on how to deal with the poison message.
+
+ // We need to account for all possible deserialization modes, so we construct an exception valid in all modes.
+ // TODO: revise - this is clunky
+ var exception = new Exception(scheduledEvent.PoisonGuidance);
+ var failureDetails = new FailureDetails(exception);
+ var details = Utils.SerializeCause(exception, JsonDataConverter.Default);
+ var taskFailure = new TaskFailureException(details, exception, details).WithFailureDetails(failureDetails);
+ throw taskFailure;
+ }
+
string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
}
diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs
index c5e100a2f..7c5cafaf8 100644
--- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs
+++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs
@@ -197,6 +197,22 @@ void ProcessEvents(IEnumerable events)
void ProcessEvent(HistoryEvent historyEvent)
{
+ if (historyEvent.IsPoison)
+ {
+ // If the message is labeled as "poison", then we should avoid processing it again.
+ // Therefore, we replace the event "in place" with an "ExecutionTerminatedEvent", so the
+ // orchestrator stops immediately.
+
+ var terminationEvent = new ExecutionTerminatedEvent(-1, historyEvent.PoisonGuidance);
+ historyEvent = terminationEvent;
+
+ // since replay is not guaranteed, we need to populate `this.result`
+ // with a completed task
+ var taskCompletionSource = new TaskCompletionSource();
+ taskCompletionSource.SetResult("");
+ this.result = taskCompletionSource.Task;
+ }
+
bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
if (this.context.IsSuspended && !overrideSuspension)
{
diff --git a/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs b/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs
index fa28622f6..1fab2b512 100644
--- a/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs
+++ b/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs
@@ -12,12 +12,31 @@ public class DataContractJsonConverterTests
{
private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings
{
+ TypeNameHandling = TypeNameHandling.Objects,
Converters =
{
- new DataContractJsonConverter(),
+ new DataContractJsonConverter()
+ {
+ alternativeSerializer = JsonSerializer.Create(new JsonSerializerSettings
+ {
+ TypeNameHandling = TypeNameHandling.Objects,
+ }),
+ },
}
};
+ [TestMethod]
+ public void Fallback_Succeeds()
+ {
+ var messageManager = new MessageManager(
+ new AzureStorageOrchestrationServiceSettings() { UseDataContractSerialization = true },
+ null,
+ "test");
+ string json = @"{""$type"":""DurableTask.AzureStorage.MessageData"",""ActivityId"":""03d10904-8a30-4bc5-9659-3c2c46b9435c"",""TaskMessage"":{""Event"":{""__type"":""TimerFiredEvent:#DurableTask.Core.History"",""EventId"":-1,""EventType"":11,""IsPlayed"":false,""Timestamp"":""2024-03-14T16:00:05.1285428Z"",""FireAt"":""2024-03-14T19:00:05.0563363Z"",""TimerId"":1},""OrchestrationInstance"":{""ExecutionId"":""8c0384066f414e1e9f4f8af94f9cc03f"",""InstanceId"":""8f30203236d34f8ea5eeff5428eb5ee7:20""},""SequenceNumber"":0},""SequenceNumber"":147825,""Episode"":1,""Sender"":{""ExecutionId"":""8c0384066f414e1e9f4f8af94f9cc03f"",""InstanceId"":""8f30203236d34f8ea5eeff5428eb5ee7:20""},""SerializableTraceContext"":""{\""$id\"":\""1\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-264d84ac14ac0835-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""abcd4f1b232271e8\"",\""StartTime\"":\""2024-03-14T16:00:05.1286681+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""1\""},{\""$id\"":\""2\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-abcd4f1b232271e8-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""19d9c9a2ac642d79\"",\""StartTime\"":\""2024-03-14T16:00:04.7358608+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""2\""},{\""$id\"":\""3\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-19d9c9a2ac642d79-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""586d2cfbe8ad7fb6\"",\""StartTime\"":\""2024-03-14T16:00:03.8475412+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""3\""},{\""$id\"":\""4\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-586d2cfbe8ad7fb6-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""1402edc2597b9617\"",\""StartTime\"":\""2024-03-14T16:00:01.7224413+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$id\"":\""5\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-22a551c6fd5d3acd-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""9a882fa89395da4f\"",\""StartTime\"":\""2024-03-14T15:56:00.8004397+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$id\"":\""6\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-e331e54d39cad3c4-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""22a551c6fd5d3acd\"",\""StartTime\"":\""2024-03-14T15:56:01.3294411+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""5\""},{\""$ref\"":\""6\""}],\""OperationName\"":\""DtOrchestrator Microsoft.LabServices.ManagedLabs.Application.Schedules.RunScheduleActionOrchestration+Handler\""},{\""$id\"":\""7\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-179a6ba5961635d7-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""e331e54d39cad3c4\"",\""StartTime\"":\""2024-03-14T15:56:01.4212406+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$id\"":\""8\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-1402edc2597b9617-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""179a6ba5961635d7\"",\""StartTime\"":\""2024-03-14T16:00:01.3381712+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""5\""},{\""$ref\"":\""6\""},{\""$ref\"":\""7\""},{\""$ref\"":\""8\""}],\""OperationName\"":\""DtOrchestrator Microsoft.LabServices.ManagedLabs.Application.Labs.NotifyLabVmTrackersOrchestration+Handler\""},{\""$ref\"":\""4\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$ref\"":\""8\""},{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator Microsoft.LabServices.ManagedLabs.Application.VirtualMachines.NotifyVmTrackerOrchestration+Handler\""},{\""$ref\"":\""4\""},{\""$ref\"":\""8\""},{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$ref\"":\""3\""},{\""$ref\"":\""4\""},{\""$ref\"":\""8\""},{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator outbound\""}""}";
+
+ object obj = messageManager.DeserializeMessageData(json);
+ }
+
[TestMethod]
public void ReadWrite_Null_Succeeds()
{