Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 40 additions & 11 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -94,6 +95,17 @@ internal class AmqpReceiver : TransportReceiver
/// </summary>
private readonly ConcurrentExpiringSet<Guid> _requestResponseLockedMessages;

private static IReadOnlyList<ServiceBusReceivedMessage> s_backingEmptyList;

private static IReadOnlyList<ServiceBusReceivedMessage> EmptyList
{
get
{
s_backingEmptyList ??= new ReadOnlyCollection<ServiceBusReceivedMessage>(new List<ServiceBusReceivedMessage>(0));
return s_backingEmptyList;
}
}

/// <summary>
/// Initializes a new instance of the <see cref="AmqpReceiver"/> class.
/// </summary>
Expand Down Expand Up @@ -286,7 +298,6 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
CancellationToken cancellationToken)
{
var link = default(ReceivingAmqpLink);
var receivedMessages = new List<ServiceBusReceivedMessage>();

ThrowIfSessionLockLost();

Expand Down Expand Up @@ -350,10 +361,15 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
var messagesReceived = await receiveMessagesCompletionSource.Task
.ConfigureAwait(false);

List<ServiceBusReceivedMessage> receivedMessages = null;
// If event messages were received, then package them for consumption and
// return them.
foreach (AmqpMessage message in messagesReceived)
{
receivedMessages ??= messagesReceived is IReadOnlyCollection<AmqpMessage> readOnlyList
? new List<ServiceBusReceivedMessage>(readOnlyList.Count)
: new List<ServiceBusReceivedMessage>();

if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
Expand All @@ -363,7 +379,7 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
message.Dispose();
}

return receivedMessages;
return receivedMessages ?? EmptyList;
}
catch (OperationCanceledException)
{
Expand Down Expand Up @@ -956,13 +972,19 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna

AmqpResponseMessage amqpResponseMessage = AmqpResponseMessage.CreateResponse(responseAmqpMessage);

var messages = new List<ServiceBusReceivedMessage>();
List<ServiceBusReceivedMessage> messages = null;
if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
{
ServiceBusReceivedMessage message = null;
IEnumerable<AmqpMap> messageList = amqpResponseMessage.GetListValue<AmqpMap>(ManagementConstants.Properties.Messages);
foreach (AmqpMap entry in messageList)
var messageList = amqpResponseMessage.GetListValue<AmqpMap>(ManagementConstants.Properties.Messages);
// not using foreach for better performance
for (var index = 0; index < messageList.Count; index++)
{
AmqpMap entry = messageList[index];
messages ??= messageList is IReadOnlyCollection<AmqpMap> readOnlyList
? new List<ServiceBusReceivedMessage>(readOnlyList.Count)
: new List<ServiceBusReceivedMessage>();

var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage, true);
Expand All @@ -973,13 +995,13 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInterna
{
LastPeekedSequenceNumber = message.SequenceNumber;
}
return messages;
return messages ?? EmptyList;
}

if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.NoContent ||
(amqpResponseMessage.StatusCode == AmqpResponseStatusCode.NotFound && Equals(AmqpClientConstants.MessageNotFoundError, amqpResponseMessage.GetResponseErrorCondition())))
{
return messages;
return EmptyList;
}

throw amqpResponseMessage.ToMessagingContractException();
Expand Down Expand Up @@ -1237,7 +1259,7 @@ internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDef
long[] sequenceNumbers,
TimeSpan timeout)
{
var messages = new List<ServiceBusReceivedMessage>();
List<ServiceBusReceivedMessage> messages = null;
try
{
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.ReceiveBySequenceNumberOperation, timeout, null);
Expand All @@ -1260,10 +1282,17 @@ internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDef
if (response.StatusCode == AmqpResponseStatusCode.OK)
{
var amqpMapList = response.GetListValue<AmqpMap>(ManagementConstants.Properties.Messages);
foreach (var entry in amqpMapList)
// not using foreach for better performance
for (var index = 0; index < amqpMapList.Count; index++)
{
var entry = amqpMapList[index];
messages ??= amqpMapList is IReadOnlyCollection<AmqpMap> readOnlyList
? new List<ServiceBusReceivedMessage>(readOnlyList.Count)
: new List<ServiceBusReceivedMessage>();

var payload = (ArraySegment<byte>)entry[ManagementConstants.Properties.Message];
var amqpMessage = AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
var amqpMessage =
AmqpMessage.CreateAmqpStreamMessage(new BufferListStream(new[] { payload }), true);
var message = AmqpMessageConverter.AmqpMessageToSBMessage(amqpMessage);
if (entry.TryGetValue<Guid>(ManagementConstants.Properties.LockToken, out var lockToken))
{
Expand All @@ -1287,7 +1316,7 @@ internal virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDef
throw; // will never be reached
}

return messages;
return messages ?? EmptyList;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,24 @@ public TValue GetValue<TValue>(MapKey key)
return (TValue)Map[key];
}

public IEnumerable<TValue> GetListValue<TValue>(MapKey key)
// returning list for better performance
public List<TValue> GetListValue<TValue>(MapKey key)
{
if (Map == null)
{
throw new ArgumentException(AmqpValue.Name);
}

var list = (List<object>)Map[key];

return list.Cast<TValue>();
// not optimized for empty lists due to the generic nature of this method
var values = new List<TValue>(list.Count);
// not using foreach for better performance
for (var index = 0; index < list.Count; index++)
{
var item = list[index];
values.Add((TValue)item);
}
return values;
}

public AmqpSymbol GetResponseErrorCondition()
Expand Down