diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
index 609a8b35d..abeecb06e 100644
--- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
+++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs
@@ -318,5 +318,12 @@ internal LogHelper Logger
/// Consumers that require separate dispatch (such as the new out-of-proc v2 SDKs) must set this to true.
///
public bool UseSeparateQueueForEntityWorkItems { get; set; } = false;
+
+ ///
+ /// Gets or sets whether or not misrouted queue message should be corrected, according to the instanceID hashing function.
+ /// This defaults to false, to avoid possible "infinite re-routing" of messages. Users may set this to true to try to recover
+ /// from accidentally changing the partitionCount of an existing TaskHub, which can lead to misrouted messages.
+ ///
+ public bool CorrectMisourtedMessages { get; set; } = false;
}
}
diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
index 4484c3ee9..43ac457ba 100644
--- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
+++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
@@ -46,6 +46,32 @@ public ControlQueue(
protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout;
+ private async Task correctMessagesIfMisourted(MessageData messageData)
+ {
+ // validate that the message came from the expected queue
+ string instanceId = messageData.TaskMessage.OrchestrationInstance.InstanceId;
+ uint partitionIndex = Fnv1aHashHelper.ComputeHash(instanceId) % (uint)this.settings.PartitionCount;
+ string expectedQueueOfOrigin = AzureStorageOrchestrationService.GetControlQueueName(this.settings.TaskHubName, (int)partitionIndex);
+
+ // route to the right queue if the user opted in to the mitigation.
+ // This assumes that all workers already have correct partitionCount configuration
+ // RISK - if different workers have different partitionCount values,
+ // this could lead to infinite re-routing of orchestrator messages.
+ if (expectedQueueOfOrigin != this.Name)
+ {
+ // obtain reference to expected queue (should have been created already)
+ var expectedQueue = this.azureStorageClient.GetQueueReference(expectedQueueOfOrigin);
+ await expectedQueue.CreateIfNotExistsAsync();
+
+ // place on correct queue
+ var originalMessage = messageData.OriginalQueueMessage;
+ await expectedQueue.AddMessageAsync(originalMessage, TimeSpan.FromMinutes(1));
+
+ // delete message from current queue
+ await this.storageQueue.DeleteMessageAsync(originalMessage);
+ }
+ }
+
public async Task> GetMessagesAsync(CancellationToken cancellationToken)
{
using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken))
@@ -108,6 +134,12 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);
+
+ if (this.settings.CorrectMisourtedMessages)
+ {
+ await correctMessagesIfMisourted(messageData);
+ return; // skip further processing of this message
+ }
}
catch (Exception e)
{