Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.

Commit b6f0474

Browse files
vinaysuryajtaubensee
authored andcommitted
Fix regression introduced in Sendpath because of an earlier checkin (#74)
1 parent ac87137 commit b6f0474

File tree

2 files changed

+30
-17
lines changed

2 files changed

+30
-17
lines changed

src/Microsoft.Azure.ServiceBus/Amqp/AmqpMessageConverter.cs

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,47 +34,43 @@ static class AmqpMessageConverter
3434
public static AmqpMessage BrokeredMessagesToAmqpMessage(IEnumerable<BrokeredMessage> brokeredMessages, bool batchable)
3535
{
3636
AmqpMessage amqpMessage = null;
37+
AmqpMessage firstAmqpMessage = null;
3738
BrokeredMessage firstBrokeredMessage = null;
3839
List<Data> dataList = null;
3940
int messageCount = 0;
4041
foreach (var brokeredMessage in brokeredMessages)
4142
{
42-
if (firstBrokeredMessage == null)
43-
{
44-
firstBrokeredMessage = brokeredMessage;
45-
}
43+
messageCount++;
4644

47-
if (messageCount == 1)
45+
amqpMessage = AmqpMessageConverter.ClientGetMessage(brokeredMessage);
46+
if (firstAmqpMessage == null)
4847
{
49-
dataList = new List<Data> { ToData(amqpMessage) };
48+
firstAmqpMessage = amqpMessage;
49+
firstBrokeredMessage = brokeredMessage;
50+
continue;
5051
}
5152

52-
amqpMessage = AmqpMessageConverter.ClientGetMessage(brokeredMessage);
53-
54-
if (messageCount > 1)
53+
if (dataList == null)
5554
{
56-
// ReSharper disable once PossibleNullReferenceException
57-
dataList.Add(ToData(amqpMessage));
55+
dataList = new List<Data>() { ToData(firstAmqpMessage) };
5856
}
5957

60-
messageCount++;
58+
dataList.Add(ToData(amqpMessage));
6159
}
6260

63-
if (messageCount == 1)
61+
if (messageCount == 1 && firstAmqpMessage != null)
6462
{
65-
amqpMessage.Batchable = batchable;
66-
return amqpMessage;
63+
firstAmqpMessage.Batchable = batchable;
64+
return firstAmqpMessage;
6765
}
6866

6967
amqpMessage = AmqpMessage.Create(dataList);
7068
amqpMessage.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
7169

72-
// ReSharper disable once PossibleNullReferenceException
7370
if (firstBrokeredMessage.MessageId != null)
7471
{
7572
amqpMessage.Properties.MessageId = firstBrokeredMessage.MessageId;
7673
}
77-
7874
if (firstBrokeredMessage.SessionId != null)
7975
{
8076
amqpMessage.Properties.GroupId = firstBrokeredMessage.SessionId;

test/Microsoft.Azure.ServiceBus.UnitTests/TestUtility.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ internal static async Task<IEnumerable<BrokeredMessage>> ReceiveMessagesAsync(Me
7575
}
7676
}
7777

78+
VerifyUniqueMessages(messagesToReturn);
7879
Log($"Received {messagesToReturn.Count} messages");
7980
return messagesToReturn;
8081
}
@@ -100,6 +101,7 @@ internal static async Task<IEnumerable<BrokeredMessage>> PeekMessagesAsync(Messa
100101
}
101102
}
102103

104+
VerifyUniqueMessages(peekedMessages);
103105
Log($"Peeked {peekedMessages.Count} messages");
104106
return peekedMessages;
105107
}
@@ -127,5 +129,20 @@ internal static async Task DeferMessagesAsync(MessageReceiver messageReceiver, I
127129
await messageReceiver.DeferAsync(messages.Select(message => message.LockToken));
128130
Log($"Deferred {messages.Count()} messages");
129131
}
132+
133+
static void VerifyUniqueMessages(List<BrokeredMessage> messages)
134+
{
135+
if (messages != null && messages.Count > 1)
136+
{
137+
HashSet<long> sequenceNumbers = new HashSet<long>();
138+
foreach (var message in messages)
139+
{
140+
if (!sequenceNumbers.Add(message.SequenceNumber))
141+
{
142+
throw new Exception($"Sequence Number '{message.SequenceNumber}' was repeated");
143+
}
144+
}
145+
}
146+
}
130147
}
131148
}

0 commit comments

Comments
 (0)