diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpClientConstants.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpClientConstants.cs index e5c16ab..d7be6da 100644 --- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpClientConstants.cs +++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpClientConstants.cs @@ -72,6 +72,7 @@ class AmqpClientConstants public const string ManagementPartitionLastEnqueuedOffset = "last_enqueued_offset"; public const string ManagementPartitionLastEnqueuedTimeUtc = "last_enqueued_time_utc"; public const string ManagementPartitionRuntimeInfoRetrievalTimeUtc = "runtime_info_retrieval_time_utc"; + public const string ManagementPartitionRuntimeInfoPartitionIsEmpty = "is_partition_empty"; // Response codes public const string ResponseStatusCode = "status-code"; diff --git a/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs b/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs index a2511c0..adbe010 100644 --- a/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs +++ b/src/Microsoft.Azure.EventHubs/Amqp/AmqpServiceClient.cs @@ -122,13 +122,14 @@ public async Task GetPartitionRuntimeInform return new EventHubPartitionRuntimeInformation() { - Type = (string)infoMap[new MapKey("type")], - Path = (string)infoMap[new MapKey("name")], - PartitionId = (string)infoMap[new MapKey("partition")], - BeginSequenceNumber = (long)infoMap[new MapKey("begin_sequence_number")], - LastEnqueuedSequenceNumber = (long)infoMap[new MapKey("last_enqueued_sequence_number")], - LastEnqueuedOffset = (string)infoMap[new MapKey("last_enqueued_offset")], - LastEnqueuedTimeUtc = (DateTime)infoMap[new MapKey("last_enqueued_time_utc")] + Type = (string)infoMap[new MapKey(AmqpClientConstants.EntityTypeName)], + Path = (string)infoMap[new MapKey(AmqpClientConstants.EntityNameKey)], + PartitionId = (string)infoMap[new MapKey(AmqpClientConstants.PartitionNameKey)], + BeginSequenceNumber = (long)infoMap[new MapKey(AmqpClientConstants.ManagementPartitionBeginSequenceNumber)], + LastEnqueuedSequenceNumber = (long)infoMap[new MapKey(AmqpClientConstants.ManagementPartitionLastEnqueuedSequenceNumber)], + LastEnqueuedOffset = (string)infoMap[new MapKey(AmqpClientConstants.ManagementPartitionLastEnqueuedOffset)], + LastEnqueuedTimeUtc = (DateTime)infoMap[new MapKey(AmqpClientConstants.ManagementPartitionLastEnqueuedTimeUtc)], + IsEmpty = (bool)infoMap[new MapKey(AmqpClientConstants.ManagementPartitionRuntimeInfoPartitionIsEmpty)] }; } diff --git a/src/Microsoft.Azure.EventHubs/EventHubPartitionRuntimeInformation.cs b/src/Microsoft.Azure.EventHubs/EventHubPartitionRuntimeInformation.cs index b7394a6..d362b70 100644 --- a/src/Microsoft.Azure.EventHubs/EventHubPartitionRuntimeInformation.cs +++ b/src/Microsoft.Azure.EventHubs/EventHubPartitionRuntimeInformation.cs @@ -35,5 +35,8 @@ public class EventHubPartitionRuntimeInformation /// Gets the enqueued UTC time of the last event. /// The enqueued time of the last event. public DateTime LastEnqueuedTimeUtc { get; set; } + + /// Gets whether partition is empty or not. + public bool IsEmpty { get; set; } } } diff --git a/test/Microsoft.Azure.EventHubs.Tests/Client/ReceiverRuntimeMetricsTests.cs b/test/Microsoft.Azure.EventHubs.Tests/Client/ReceiverRuntimeMetricsTests.cs index a999ef0..68cd964 100644 --- a/test/Microsoft.Azure.EventHubs.Tests/Client/ReceiverRuntimeMetricsTests.cs +++ b/test/Microsoft.Azure.EventHubs.Tests/Client/ReceiverRuntimeMetricsTests.cs @@ -131,6 +131,7 @@ async Task ValidateEnabledBehavior(PartitionReceiver partitionReceiver, EventHub var message = messages.Single(); + Assert.False(pInfo.IsEmpty, $"pInfo.IsEmpty == {pInfo.IsEmpty}"); Assert.True(partitionReceiver.RuntimeInfo.LastEnqueuedOffset == pInfo.LastEnqueuedOffset, $"FAILED partitionReceiver.RuntimeInfo.LastEnqueuedOffset == {partitionReceiver.RuntimeInfo.LastEnqueuedOffset}"); Assert.True(partitionReceiver.RuntimeInfo.LastEnqueuedTimeUtc == pInfo.LastEnqueuedTimeUtc,