Skip to content

Commit 9dab85f

Browse files
Daniel Marbachtimbussmann
andauthored
Make sure transport operations do not share mutable state between multiple routing strategies (#6905) (#6910)
* Add routing to dispatch connector tests to reproduce the shared state problem * Fix the state sharing problem in ToTransportOperation * Cross check headers and properties * Test for the header modifications in the routing strategy * add acceptance test * better naming and comments * block scoped namespace * Remove left overs * Move test * Primary ctor * Behavior --------- Co-authored-by: Tim Bussmann <[email protected]>
1 parent db82ac8 commit 9dab85f

File tree

6 files changed

+301
-13
lines changed

6 files changed

+301
-13
lines changed

src/NServiceBus.AcceptanceTesting/AcceptanceTestingPersistence/Outbox/AcceptanceTestingOutboxStorage.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{
33
using System;
44
using System.Collections.Concurrent;
5+
using System.Linq;
56
using System.Threading;
67
using System.Threading.Tasks;
78
using Extensibility;
@@ -29,7 +30,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
2930
var tx = (AcceptanceTestingOutboxTransaction)transaction;
3031
tx.Enlist(() =>
3132
{
32-
if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations)))
33+
if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations.Select(o => o.DeepCopy()).ToArray())))
3334
{
3435
throw new Exception($"Outbox message with id '{message.MessageId}' is already present in storage.");
3536
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
namespace NServiceBus.AcceptanceTests.Outbox
2+
{
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using AcceptanceTesting;
7+
using AcceptanceTesting.Customization;
8+
using EndpointTemplates;
9+
using Features;
10+
using NServiceBus.Pipeline;
11+
using NUnit.Framework;
12+
13+
public class When_publishing_with_outbox : NServiceBusAcceptanceTest
14+
{
15+
[Test]
16+
public async Task Should_be_delivered_to_all_subscribers()
17+
{
18+
Requires.OutboxPersistence();
19+
20+
Context context = await Scenario.Define<Context>()
21+
.WithEndpoint<Publisher>(b =>
22+
b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, (session, c) =>
23+
{
24+
// Send a trigger message that will invoke the handler method that publishes the event
25+
c.AddTrace("Both subscribers are subscribed, going to send TriggerMessage");
26+
return session.SendLocal(new TriggerMessage());
27+
})
28+
)
29+
.WithEndpoint<Subscriber1>(b => b.When(async (session, ctx) =>
30+
{
31+
await session.Subscribe<MyEvent>();
32+
if (ctx.HasNativePubSubSupport)
33+
{
34+
ctx.Subscriber1Subscribed = true;
35+
ctx.AddTrace("Subscriber1 is now subscribed (at least we have asked the broker to be subscribed)");
36+
}
37+
else
38+
{
39+
ctx.AddTrace("Subscriber1 has now asked to be subscribed to MyEvent");
40+
}
41+
}))
42+
.WithEndpoint<Subscriber2>(b => b.When(async (session, ctx) =>
43+
{
44+
await session.Subscribe<MyEvent>();
45+
if (ctx.HasNativePubSubSupport)
46+
{
47+
ctx.Subscriber2Subscribed = true;
48+
ctx.AddTrace("Subscriber2 is now subscribed (at least we have asked the broker to be subscribed)");
49+
}
50+
else
51+
{
52+
ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent");
53+
}
54+
}))
55+
.Done(c => c.Subscriber1GotTheEvent && c.Subscriber2GotTheEvent)
56+
.Run(TimeSpan.FromSeconds(10));
57+
58+
Assert.True(context.Subscriber1GotTheEvent);
59+
Assert.True(context.Subscriber2GotTheEvent);
60+
}
61+
62+
public class Context : ScenarioContext
63+
{
64+
public bool Subscriber1GotTheEvent { get; set; }
65+
public bool Subscriber2GotTheEvent { get; set; }
66+
public bool Subscriber1Subscribed { get; set; }
67+
public bool Subscriber2Subscribed { get; set; }
68+
}
69+
70+
public class Publisher : EndpointConfigurationBuilder
71+
{
72+
public Publisher() =>
73+
EndpointSetup<DefaultPublisher>(b =>
74+
{
75+
b.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
76+
b.EnableOutbox();
77+
// Test the outbox behavior in situations where messages are deserialized and dispatched from the outbox storage by injecting an exception into the dispatch pipeline
78+
b.Pipeline.Register(new BlowUpAfterDispatchBehavior(), "ensure outbox dispatch fails");
79+
b.Recoverability().Immediate(i => i.NumberOfRetries(1));
80+
b.OnEndpointSubscribed<Context>((s, context) =>
81+
{
82+
var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1));
83+
if (s.SubscriberEndpoint.Contains(subscriber1))
84+
{
85+
context.Subscriber1Subscribed = true;
86+
context.AddTrace($"{subscriber1} is now subscribed");
87+
}
88+
var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2));
89+
if (s.SubscriberEndpoint.Contains(subscriber2))
90+
{
91+
context.AddTrace($"{subscriber2} is now subscribed");
92+
context.Subscriber2Subscribed = true;
93+
}
94+
});
95+
b.DisableFeature<AutoSubscribe>();
96+
});
97+
98+
public class TriggerHandler : IHandleMessages<TriggerMessage>
99+
{
100+
public Task Handle(TriggerMessage message, IMessageHandlerContext context)
101+
=> context.Publish(new MyEvent());
102+
}
103+
104+
class BlowUpAfterDispatchBehavior : IBehavior<IBatchDispatchContext, IBatchDispatchContext>
105+
{
106+
public async Task Invoke(IBatchDispatchContext context, Func<IBatchDispatchContext, Task> next)
107+
{
108+
if (Interlocked.Increment(ref invocationCounter) == 1)
109+
{
110+
throw new SimulatedException();
111+
}
112+
113+
await next(context).ConfigureAwait(false);
114+
}
115+
116+
int invocationCounter;
117+
}
118+
}
119+
120+
public class Subscriber1 : EndpointConfigurationBuilder
121+
{
122+
public Subscriber1() =>
123+
EndpointSetup<DefaultServer>(c => c.DisableFeature<AutoSubscribe>(),
124+
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
125+
126+
public class MyHandler : IHandleMessages<MyEvent>
127+
{
128+
public MyHandler(Context testContext) => this.testContext = testContext;
129+
130+
public Task Handle(MyEvent message, IMessageHandlerContext context)
131+
{
132+
testContext.Subscriber1GotTheEvent = true;
133+
return Task.CompletedTask;
134+
}
135+
136+
readonly Context testContext;
137+
}
138+
}
139+
140+
public class Subscriber2 : EndpointConfigurationBuilder
141+
{
142+
public Subscriber2() =>
143+
EndpointSetup<DefaultServer>(c => c.DisableFeature<AutoSubscribe>(),
144+
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
145+
146+
public class MyHandler : IHandleMessages<MyEvent>
147+
{
148+
public MyHandler(Context testContext) => this.testContext = testContext;
149+
150+
public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context)
151+
{
152+
testContext.Subscriber2GotTheEvent = true;
153+
return Task.CompletedTask;
154+
}
155+
156+
readonly Context testContext;
157+
}
158+
}
159+
160+
public class MyEvent : IEvent
161+
{
162+
}
163+
164+
public class TriggerMessage : ICommand
165+
{
166+
}
167+
}
168+
}

src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,107 @@
1414
[TestFixture]
1515
public class RoutingToDispatchConnectorTests
1616
{
17+
[Test]
18+
public async Task Should_preserve_message_state_for_one_routing_strategy_for_allocation_reasons()
19+
{
20+
var behavior = new RoutingToDispatchConnector();
21+
IEnumerable<TransportOperation> operations = null;
22+
var testableRoutingContext = new TestableRoutingContext
23+
{
24+
RoutingStrategies = new List<RoutingStrategy>
25+
{
26+
new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")
27+
}
28+
};
29+
var originalDispatchProperties = new DispatchProperties
30+
{
31+
{ "SomeKey", "SomeValue" }
32+
};
33+
testableRoutingContext.Extensions.Set(originalDispatchProperties);
34+
var originalHeaders = new Dictionary<string, string> { { "SomeHeaderKey", "SomeHeaderValue" } };
35+
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
36+
await behavior.Invoke(testableRoutingContext, context =>
37+
{
38+
operations = context.Operations;
39+
return Task.CompletedTask;
40+
});
41+
42+
Assert.That(operations, Has.Length.EqualTo(1));
43+
44+
TransportOperation destination1Operation = operations.ElementAt(0);
45+
Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID"));
46+
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
47+
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
48+
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
49+
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
50+
Assert.That(destination1Headers, Is.SameAs(originalHeaders));
51+
DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
52+
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
53+
Assert.That(destination1DispatchProperties, Is.SameAs(originalDispatchProperties));
54+
}
55+
56+
[Test]
57+
public async Task Should_copy_message_state_for_multiple_routing_strategies()
58+
{
59+
var behavior = new RoutingToDispatchConnector();
60+
IEnumerable<TransportOperation> operations = null;
61+
var testableRoutingContext = new TestableRoutingContext
62+
{
63+
RoutingStrategies = new List<RoutingStrategy>
64+
{
65+
new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"),
66+
new DestinationRoutingStrategy("destination2", "HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")
67+
}
68+
};
69+
var originalDispatchProperties = new DispatchProperties
70+
{
71+
{ "SomeKey", "SomeValue" }
72+
};
73+
testableRoutingContext.Extensions.Set(originalDispatchProperties);
74+
var originalHeaders = new Dictionary<string, string> { { "SomeHeaderKey", "SomeHeaderValue" } };
75+
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
76+
await behavior.Invoke(testableRoutingContext, context =>
77+
{
78+
operations = context.Operations;
79+
return Task.CompletedTask;
80+
});
81+
82+
Assert.That(operations, Has.Length.EqualTo(2));
83+
84+
TransportOperation destination1Operation = operations.ElementAt(0);
85+
Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID"));
86+
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
87+
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
88+
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
89+
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
90+
Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
91+
Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders));
92+
DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
93+
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
94+
Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
95+
96+
TransportOperation destination2Operation = operations.ElementAt(1);
97+
Assert.That(destination2Operation.Message.MessageId, Is.EqualTo("ID"));
98+
Assert.That((destination2Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination2"));
99+
Dictionary<string, string> destination2Headers = destination2Operation.Message.Headers;
100+
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
101+
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
102+
Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
103+
Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders));
104+
DispatchProperties destination2DispatchProperties = destination2Operation.Properties;
105+
Assert.That(destination2DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
106+
Assert.That(destination2DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
107+
108+
Assert.That(destination1Headers, Is.Not.SameAs(destination2Headers));
109+
Assert.That(destination1DispatchProperties, Is.Not.SameAs(destination2DispatchProperties));
110+
}
111+
17112
[Test]
18113
public async Task Should_preserve_headers_generated_by_custom_routing_strategy()
19114
{
20115
var behavior = new RoutingToDispatchConnector();
21116
Dictionary<string, string> headers = null;
22-
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List<RoutingStrategy> { new CustomRoutingStrategy() } }, context =>
117+
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List<RoutingStrategy> { new HeaderModifyingRoutingStrategy() } }, context =>
23118
{
24119
headers = context.Operations.First().Message.Headers;
25120
return Task.CompletedTask;
@@ -113,7 +208,7 @@ static IOutgoingSendContext CreateContext(SendOptions options, bool fromHandler)
113208
return context;
114209
}
115210

116-
class CustomRoutingStrategy : RoutingStrategy
211+
class HeaderModifyingRoutingStrategy : RoutingStrategy
117212
{
118213
public override AddressTag Apply(Dictionary<string, string> headers)
119214
{
@@ -122,6 +217,26 @@ public override AddressTag Apply(Dictionary<string, string> headers)
122217
}
123218
}
124219

220+
class DestinationRoutingStrategy : RoutingStrategy
221+
{
222+
public DestinationRoutingStrategy(string destination, string headerKey, string headerValue)
223+
{
224+
this.destination = destination;
225+
this.headerKey = headerKey;
226+
this.headerValue = headerValue;
227+
}
228+
229+
public override AddressTag Apply(Dictionary<string, string> headers)
230+
{
231+
headers[headerKey] = headerValue;
232+
return new UnicastAddressTag(destination);
233+
}
234+
235+
readonly string destination;
236+
readonly string headerKey;
237+
readonly string headerValue;
238+
}
239+
125240
class MyMessage : IMessage
126241
{
127242
}

src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
namespace NServiceBus
22
{
3+
using System.Collections.Generic;
34
using Pipeline;
45
using Routing;
56
using Transport;
67

78
static class RoutingContextExtensions
89
{
9-
public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency)
10+
public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency, bool copySharedMutableMessageState)
1011
{
11-
var addressLabel = strategy.Apply(context.Message.Headers);
12-
var message = new OutgoingMessage(context.Message.MessageId, context.Message.Headers, context.Message.Body);
13-
14-
if (!context.Extensions.TryGet(out DispatchProperties dispatchProperties))
15-
{
16-
dispatchProperties = new DispatchProperties();
17-
}
12+
var headers = copySharedMutableMessageState ? new Dictionary<string, string>(context.Message.Headers) : context.Message.Headers;
13+
var dispatchProperties = context.Extensions.TryGet(out DispatchProperties properties)
14+
? copySharedMutableMessageState ? new DispatchProperties(properties) : properties
15+
: new DispatchProperties();
16+
var addressLabel = strategy.Apply(headers);
17+
var message = new OutgoingMessage(context.Message.MessageId, headers, context.Message.Body);
1818

1919
var transportOperation = new TransportOperation(message, addressLabel, dispatchProperties, dispatchConsistency);
2020
return transportOperation;

src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ public override Task Invoke(IRoutingContext context, Func<IDispatchContext, Task
2626

2727
var operations = new TransportOperation[context.RoutingStrategies.Count];
2828
var index = 0;
29+
// when there are more than one routing strategy we want to make sure each transport operation is independent
30+
var copySharedMutableMessageState = context.RoutingStrategies.Count > 1;
2931
foreach (var strategy in context.RoutingStrategies)
3032
{
31-
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency);
33+
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState);
3234
index++;
3335
}
3436

src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,11 @@ public async Task<ErrorHandleResult> Invoke(
4444
// using the count here is not entirely accurate because of the way we duplicate based on the strategies
4545
// but in many cases it is a good approximation.
4646
transportOperations ??= new List<TransportOperation>(routingContexts.Count);
47+
// when there are more than one routing strategy we want to make sure each transport operation is independent
48+
var copySharedMutableMessageState = routingContext.RoutingStrategies.Count > 1;
4749
foreach (var strategy in routingContext.RoutingStrategies)
4850
{
49-
var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default);
51+
var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default, copySharedMutableMessageState);
5052
transportOperations.Add(transportOperation);
5153
}
5254
}

0 commit comments

Comments
 (0)