-
Notifications
You must be signed in to change notification settings - Fork 56
Implementing Plugins for EventHub #324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Thanks David, I will review the proposed changes. I wonder if I misread your first sentence. The plugin model isn't supported .Net EH library today. Are you referring to some other library? #Resolved |
@serkantkaraca Yes the same feature is supported by Service bus dotnet libarary, which is handy, because you can dynamically set plugins to the clients, and decouple implementations. event differents plugins for differents tenancies, I'm using a lot this feature in Service Bus, and currently doing some workarounds to do the same with Event bus. Plugin references for Service bus : |
OK, then we are on the same page. I thought you were referring to https://www.nuget.org/packages/WindowsAzure.ServiceBus #Resolved |
@serkantkaraca yeah the same (one is the net framework the other the new one with net standard) : ) #Resolved |
@@ -40,6 +44,50 @@ internal static int ValidateEvents(IEnumerable<EventData> eventDatas) | |||
return count; | |||
} | |||
|
|||
async Task<EventData> ProcessEvent(EventData @event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect will check if in other place there is a @event also #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -1,6 +1,9 @@ | |||
// Copyright (c) Microsoft. All rights reserved. | |||
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | |||
|
|||
using System.Linq; | |||
using Microsoft.Azure.EventHubs.Core; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move under namespace #Resolved
@@ -1,6 +1,8 @@ | |||
// Copyright (c) Microsoft. All rights reserved. | |||
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | |||
|
|||
using System; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move under namespace #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure will keep in mind for further PR's thanks #Resolved
@@ -209,6 +211,33 @@ public void ReceiveHandlerExitingWithError(string clientId, string partitionId, | |||
} | |||
} | |||
|
|||
[Event(22, Level = EventLevel.Verbose, Message = "User plugin {0} called on client {1}")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
22 [](start = 15, length = 2)
Let's start plugin events from 100 #Resolved
@@ -39,7 +39,7 @@ internal class Resources { | |||
internal static global::System.Resources.ResourceManager ResourceManager { | |||
get { | |||
if (object.ReferenceEquals(resourceMan, null)) { | |||
global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("Microsoft.Azure.EventHubs.Resources", typeof(Resources).GetTypeInfo().Assembly); | |||
global::System.Resources.ResourceManager temp = new global::System.Resources.ResourceManager("Microsoft.Azure.EventHubs.Resources", typeof(Resources).Assembly); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change will probably break one of the builds. You can revert the change manually if needed. #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@serkantkaraca Why is the reason this fail w/o typeof(Resources).GetTypeInfo().Assembly
? #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really remember the exact reason but I guess some assembly binding issue.
In reply to: 224279162 [](ancestors = 224279162)
|
||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move under namespace #Resolved
/// <summary> | ||
/// This class provides methods that can be overridden to manipulate messages for custom plugin functionality. | ||
/// </summary> | ||
public abstract class EventHubPlugin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EventHub [](start = 26, length = 8)
EventHubs #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks #Resolved
public virtual bool ShouldContinueOnException => false; | ||
|
||
/// <summary> | ||
/// This operation is called before a event is sent. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a [](start = 48, length = 1)
an #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks #Resolved
/// </summary> | ||
/// <param name="eventData">The <see cref="EventData" /> to be modified by the plugin</param> | ||
/// <returns>The modified event <see cref="EventData" /></returns> | ||
public virtual Task<EventData> BeforeMessageSend(EventData eventData) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BeforeMessageSend [](start = 39, length = 17)
Makes sense to have AfterMessageSend?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes for that I've to change the processor I can do, just want to be sure that the Feature was not a conflict with the roadmap to go ahead.
@@ -1,6 +1,10 @@ | |||
// Copyright (c) Microsoft. All rights reserved. | |||
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | |||
|
|||
using System.Collections.Generic; | |||
using System.Linq; | |||
using Microsoft.Azure.EventHubs.Core; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move under namespace #Resolved
@@ -55,6 +63,43 @@ public RetryPolicy RetryPolicy | |||
/// <returns>The asynchronous operation</returns> | |||
public abstract Task CloseAsync(); | |||
|
|||
/// <summary> | |||
/// Registers a <see cref="EventHubPlugin"/> to be used with this sender. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sender [](start = 74, length = 6)
This may not be a sender. Let's keep this generic. #Resolved
{ | ||
if (eventHubPlugin == null) | ||
{ | ||
throw new ArgumentNullException(nameof(eventHubPlugin), Resources.ArgumentNullOrWhiteSpace.FormatForUser(nameof(eventHubPlugin))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nameof [](start = 48, length = 6)
Plugin has property 'Name'. Shouldn't we use it instead for duplicate check? #Resolved
} | ||
if (this.RegisteredPlugins.Any(p => p.Name == eventHubPlugin)) | ||
{ | ||
var plugin = this.RegisteredPlugins.First(p => p.Name == eventHubPlugin); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First [](start = 52, length = 5)
Remove all matching instead of first here in case RegisteredPlugins has duplicates. #Resolved
@@ -10,6 +10,8 @@ namespace Microsoft.Azure.EventHubs | |||
using System.Threading.Tasks; | |||
using Microsoft.Azure.EventHubs.Amqp; | |||
using Microsoft.IdentityModel.Clients.ActiveDirectory; | |||
using System.Linq; | |||
using Microsoft.Azure.EventHubs.Core; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you sort usings? Same for the other classes. #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure ! :) #Resolved
@serkantkaraca Hey Serkant I made the proposed changes. However didn't get that if a plugin with same name is configured this will break no take the last one so the first will break the code there can not be more than 1 with the same name, I changed the error message to take the plugin name instead for more clarification. |
@serkantkaraca All the requested changes were made
Waiting for the feedback. Great thanks :) #Resolved |
RegisteredPlugins is a generic List. It can end up with duplicates easily due to race. If you think having duplicates will hurt then you can try using ConcurrentDictionary or a lock. IMHO, duplicates won't hurt but let's do the cleanup accounting duplicate potential here. In reply to: 428391883 [](ancestors = 428391883) |
Let's not throw in that case. #Resolved |
@serkantkaraca I was thinking the same :) #Resolved |
@serkantkaraca Done :) #Resolved |
/// <summary> | ||
/// Registers a <see cref="EventHubsPlugin"/> to be used with this client. | ||
/// </summary> | ||
public virtual void RegisterPlugin(EventHubsPlugin eventHubPlugin) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventHub [](start = 59, length = 8)
Can you make sure it is 'eventhubs' everywhere since this needs to be same as the service name. 'Event Hub' refers to single entity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure #Resolved
/// Unregisters a <see cref="EventHubsPlugin"/>. | ||
/// </summary> | ||
/// <param name="eventHubPlugin">The <see cref="EventHubsPlugin.Name"/> of the plugin to be unregistered.</param> | ||
public virtual void UnregisterPlugin(string eventHubPlugin) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventHubPlugin [](start = 52, length = 14)
You can use 'pluginName' here I guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure #Resolved
if (this.RegisteredPlugins.Any(p => p.Key == eventHubPlugin.Name)) | ||
{ | ||
throw new ArgumentException(nameof(eventHubPlugin), Resources.PluginAlreadyRegistered.FormatForUser(nameof(eventHubPlugin.Name))); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need this anymore since TryAdd does same job.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually according this : https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.tryadd?redirectedfrom=MSDN&view=netframework-4.7.2#System_Collections_Concurrent_ConcurrentDictionary_2_TryAdd__0__1
It throws ArgumentNullException
Only if key is null which will never be the case because above is this code
if (string.IsNullOrWhiteSpace(pluginName))
{
throw new ArgumentNullException(nameof(pluginName), Resources.ArgumentNullOrWhiteSpace.FormatForUser(nameof(pluginName)));
}
Just my opinion but it's handy get a bit more information there about which one failed also. #Resolved
} | ||
if (this.RegisteredPlugins.Any(p => p.Key == eventHubPlugin.Name)) | ||
{ | ||
throw new ArgumentException(nameof(eventHubPlugin), Resources.PluginAlreadyRegistered.FormatForUser(nameof(eventHubPlugin.Name))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nameof [](start = 116, length = 6)
just plugin.Name, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean this instead ?
if (this.RegisteredPlugins.Any(p => p.Value.Name == eventHubPlugin.Name))
{
throw new ArgumentException(nameof(eventHubPlugin), Resources.PluginAlreadyRegistered.FormatForUser(nameof(eventHubPlugin.Name)));
}
``` #Resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nameof will return 'Name' and the error message will build as: The Name plugin has already been registered.
In reply to: 224288914 [](ancestors = 224288914)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enought #Resolved
@serkantkaraca done ! |
Seems this is not fixed yet. nameof(eventHubsPlugin.Name) |
@serkantkaraca which Line ? I've changed here |
Change |
@serkantkaraca Ok sorry because the misunderstanding Can't put nameof(eventHubsPlugin.Name) either just eventHubsPlugin.Name in line 74 as the object is null and will crash. Are you agree with this ? |
My comment is for lines 78 and 82 only. |
@serkantkaraca Ok thanks |
@@ -1,11 +1,15 @@ | |||
// Copyright (c) Microsoft. All rights reserved. | |||
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | |||
|
|||
using System.Collections.Concurrent; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move under namespace section
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh resharper move it automatically and I didn't aware of that, super sorry will configure to avoid that in a future I did with other configurations :) .
Added one more comment and after that I think we are all good. Thx! |
@serkantkaraca No Serkat thanks to you for let me participate and for you patient in this big chunk of code, please if anytime you need help because of time or need something to investigate is needed feel free to ask :D will be more than happy to help. ! |
Seems below test is failing. Can you take a look? Plugin_without_ShouldContinueOnException_should_throw |
One last thing to solve but then celebrate the feature 🎉 |
* Use Azure Storage Account NameValidator to check LeaseContainerName (#327) * Create EventHubsTimeoutException for consistency (#328) * Mark readonly fields, complete cancellation token and remove useless Where in LINQ (#326) * Implementing Plugins for EventHub (#324) * Implement Plugin to Process each event when client is sending telemetry * Microsoft copyright header * Fix Typo * Changes for #324 (review) * Implement AfterEventsReceive for EventHubsPlugin * Implement Plugin Tests * Sort usings * changes for #324 (comment) * Fix Resources * Changes for #324 (review) * Change for #324 (comment) * Move Using to Namespace block * Copy Plugins for InnerSender in AmqpEventHubClient (#329) * Prevent event data being over writed when multiple plugins called (#330) * Parallelize expired lease check in processor host (#333) * Parallelize expired lease check * - * Remove unit test and rename FirstPlugin (#335) * Using Lazy instead of static initialization for ExceptionUtility (#337) * Using Lazy instead of static initialization * Use default ctor * Complete Missing CancellationToken (#338) * Fix LazyLoad Ctor (#341) * Nullify Task when The Stop is complete (#342) * Nullfy Task when The Stop is complete * Test for Re Register event processor * Reset CancellationTokenSource * Replace Locks in AmqpEventHubClient and Code Clean ups in AmqpEventHubClient (#345) * Replace Double lock patterns by using Lazy * Code CleanUps in AmqpEventHubClient * Check client management address when it is being created * Remove Result for AzureStorageCheckpointLeaseManager GetAllLeases (#346) * Remove Result for async call * Get awaiter get result for GetAllLeases * Remove useless using * Remove useless initializator * Replace Task Run Call * Remove Task Run * Fix Tests Moving Lazy initialization at the top of the ctor (#347) * Max message size is 1MB now. Updating the test accordingly. (#344) * Use Guard Class to improve code legibility and avoid lines (#339) * Using Guard to improve code reading and avoid lines * Using ArgumentNotNullOrEmpty * Complete More validations with Guard * Replace All ArgumentNullException * Fix Namespace * Change a few expecting exceptions (#351) * Code CleanUps in Primitives / General (#350) * EventHubCode Clean ups * Code CleanUps in Primitives * Rename variable * Leave string cast * Some Code Cleanups in Ampq Implementation (#349) * Omit failures in receive pump (#354) * Clean more results in sync context (#348) * Clean more results in sync context * Order using * Changes for #348 (review) * Remove using * Support partition-empty in runtime metrics (#352) * Adding partition IsEmpty support to runtime metrics * Use AMQP client constants * Move to correct name * is_partition_empty is the correct name * Reduce the number of storage calls in lease manager (#357) * Couple improvements in Azure Lease Manager to reduce numberof storage calls. * N/A as partition id * Go with default timeout * Moving to most recent AMQP release * Fix flaky EPH test * Adding 30 seconds default operation timeout back to tests. * Reducing EPH to storage IO calls. * Couple more fixes * . * Set token for owned leases. * Refresh lease before acquiring in processor host. * Fix metada removal order during lease release. * Update lease token only for already running pumps to avoid resetting receiver position data. * FetchAttributesAsync of blob as part of GetAllLeasesAsync() call. * Refresh lease before attempting to steal * Don't retry if we already lost the lease during receiver open. * Don't attempt to steal if owner has changed from the calculation time to refresh time. * - * Partition pump to close when hit ReceiverDisconnectedException since this is not recoverable. * - * Ignore any failure during releasing the lease * Don't update pump token if token is empty * Nullify the owner on the lease in case this host lost it. * Increment ourLeaseCount when a lease is acquired. * Correcting task list * No need to assign pump lease token to downloaded lease. * comment update * comment update * Clear ownership on partial acquisition. * Clear ownership on partial acquisition. * Make sure we don't leave the lease as owned if acquisition failed. * Adding logs to debug lease corruption bug * Adding logs to debug lease corruption bug * Small fix at steal lease check * Protect subject iterator variable during task creation in for loops. * . * Renew lease right after ChangeLease call * Don't create pump if partition expired or already moved to some other host. * Use refreshed lease while creating partition pump. * Remove temporary debug logs. * Addressing SJ's comments * Remove obsolete * Moving AD SDK 4.5 and Xamarin.iOS10 support (#365) * Making SystemProperties public addressing testability issues. (#332) * Provide batch object's event data enumerator publicly. (#356) * Bumping up the SDK version (#366) * ServiceFabricProcessor preview (#262) This is the code that built and released as preview version 0.5.2 https://www.nuget.org/packages/Microsoft.Azure.EventHubs.ServiceFabricProcessor/0.5.2 At the time it couldn't be merged with dev due to test issues from unrelated work, so we did the release from the SFprocessor branch. Those issues have been resolved, and we expect that future preview releases will come from dev branch. * Client changes to support SFP (#367) Three changes in the client needed to support SFP or SFP testing: 1) A previous PR added the ability to set EventData.SystemProperties, but it is not much use without the ability to create a new SystemPropertiesCollection instance. SFP testing does not need to set individual values on a SystemPropertiesCollection, just create new instances with values that do not change after creation time, so I added a public constructor which sets all the values. 2) SFP was previously creating EventHubClients with connection strings, but there is no string syntax for setting the operation timeout, and the message pump feature on PartitionReceiver uses the operation timeout. The easiest way to let SFP set the operation timeout is to make public the existing EventHubClient.Create call which takes a ConnectionStringBuilder. 3) Originally, SFP was a friend assembly of the client, the same as EPH, but it was decided that that was not the best design. Supporting the EnableReceiverRuntimeMetric option requires the ability to create new ReceiverRuntimeInformation instances (made constructor public) and update the values from a received EventData. Copying the values in SFP code would require making a bunch of properties on EventData public get, and the corresponding properties on ReceiverRuntimeInformation public set. Instead, I added an Update method which takes an EventData and performs the copy within the client assembly so no visibility change is required. Also modified the EPH code to use the new Update method. * delaysign=false
Description
This is an Implementation for Plugins supported in the NetClient Event Hub
The approach is support the same feature ServiceBus does, which is handy.
The code doesn't have include unit test, if the EventHub team are interested in support this feature I can go ahead and complete the required UT.
I think it'd be useful, I've to use it and have to do some workarounds to overwrite
EventDataSender
This checklist is used to make sure that common guidelines for a pull request are followed.