Skip to content

Commit 2682eb3

Browse files
Move from protobuf-net to Google.Protobuf
1 parent 3187880 commit 2682eb3

30 files changed

+1354
-4097
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## [Unreleased]
8+
9+
### Changed
10+
11+
- Replaced the protobuf-net (version 3.2.56) dependency with Google.Protobuf (version 3.32.1)
12+
- System.Collections.Immutable (version 9.0.9) is now a dependency for .NET Standard 2.0 and 2.1
13+
714
## [4.3.2] - 2025-09-17
815

916
### Changed

src/DotPulsar/DotPulsar.csproj

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,25 @@
2424
</PropertyGroup>
2525

2626
<ItemGroup>
27+
<PackageReference Include="Google.Protobuf" Version="3.32.1" />
28+
<PackageReference Include="Grpc.Tools" Version="2.72.0">
29+
<PrivateAssets>all</PrivateAssets>
30+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
31+
</PackageReference>
2732
<PackageReference Include="HashDepot" Version="2.0.3" />
2833
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="9.0.9" />
29-
<PackageReference Include="protobuf-net" Version="3.2.56" />
3034
</ItemGroup>
3135

3236
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
3337
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="9.0.9" />
3438
<PackageReference Include="Microsoft.Bcl.HashCode" Version="6.0.0" />
39+
<PackageReference Include="System.Collections.Immutable" Version="9.0.9" />
3540
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.9" />
3641
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
3742
</ItemGroup>
3843

3944
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
45+
<PackageReference Include="System.Collections.Immutable" Version="9.0.9" />
4046
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.9" />
4147
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
4248
</ItemGroup>
@@ -60,6 +66,7 @@
6066
<ItemGroup>
6167
<None Include="PackageIcon.png" Pack="true" PackagePath="/" Visible="False" />
6268
<None Include="README.md" Pack="true" PackagePath="/" />
69+
<Protobuf Include="Internal/PulsarApi.proto" GrpcServices="None" />
6370
</ItemGroup>
6471

6572
</Project>

src/DotPulsar/Internal/Abstractions/IConnectionPool.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ public interface IConnectionPool : IAsyncDisposable
2323

2424
ValueTask<uint> GetNumberOfPartitions(string topic, CancellationToken cancellationToken = default);
2525

26-
ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Mode mode, Regex topicsPattern, CancellationToken cancellationToken = default);
26+
ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Types.Mode mode, Regex topicsPattern, CancellationToken cancellationToken = default);
2727
}

src/DotPulsar/Internal/Abstractions/IRequest.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@ public interface IRequest : IEquatable<IRequest>
2020
{
2121
bool SenderIsProducer(ulong producerId);
2222
bool SenderIsConsumer(ulong consumerId);
23-
bool IsCommandType(BaseCommand.Type commandType);
23+
bool IsCommandType(BaseCommand.Types.Type commandType);
2424
}

src/DotPulsar/Internal/BatchHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public IMessage<TMessage> Add(MessageIdData messageId, uint redeliveryCount, Mes
4848
{
4949
var singleMetadataSize = data.ReadUInt32(index, true);
5050
index += 4;
51-
var singleMetadata = Serializer.Deserialize<SingleMessageMetadata>(data.Slice(index, singleMetadataSize));
51+
var singleMetadata = SingleMessageMetadata.Parser.ParseFrom(data.Slice(index, singleMetadataSize));
5252
index += singleMetadataSize;
5353
var singleMessageId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, i);
5454
var message = _messageFactory.Create(singleMessageId, redeliveryCount, data.Slice(index, singleMetadata.PayloadSize), metadata, singleMetadata);

src/DotPulsar/Internal/ChannelManager.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public sealed class ChannelManager : IStateHolder<ChannelManagerState>, IDisposa
2626
private readonly RequestResponseHandler _requestResponseHandler;
2727
private readonly IdLookup<IChannel> _consumerChannels;
2828
private readonly IdLookup<IChannel> _producerChannels;
29-
private readonly EnumLookup<BaseCommand.Type, Action<BaseCommand>> _incoming;
29+
private readonly EnumLookup<BaseCommand.Types.Type, Action<BaseCommand>> _incoming;
3030

3131
public IState<ChannelManagerState> State => _stateManager;
3232

@@ -36,11 +36,11 @@ public ChannelManager()
3636
_requestResponseHandler = new RequestResponseHandler();
3737
_consumerChannels = new IdLookup<IChannel>();
3838
_producerChannels = new IdLookup<IChannel>();
39-
_incoming = new EnumLookup<BaseCommand.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
40-
_incoming.Set(BaseCommand.Type.CloseConsumer, cmd => Incoming(cmd.CloseConsumer));
41-
_incoming.Set(BaseCommand.Type.CloseProducer, cmd => Incoming(cmd.CloseProducer));
42-
_incoming.Set(BaseCommand.Type.ActiveConsumerChange, cmd => Incoming(cmd.ActiveConsumerChange));
43-
_incoming.Set(BaseCommand.Type.ReachedEndOfTopic, cmd => Incoming(cmd.ReachedEndOfTopic));
39+
_incoming = new EnumLookup<BaseCommand.Types.Type, Action<BaseCommand>>(cmd => _requestResponseHandler.Incoming(cmd));
40+
_incoming.Set(BaseCommand.Types.Type.CloseConsumer, cmd => Incoming(cmd.CloseConsumer));
41+
_incoming.Set(BaseCommand.Types.Type.CloseProducer, cmd => Incoming(cmd.CloseProducer));
42+
_incoming.Set(BaseCommand.Types.Type.ActiveConsumerChange, cmd => Incoming(cmd.ActiveConsumerChange));
43+
_incoming.Set(BaseCommand.Types.Type.ReachedEndOfTopic, cmd => Incoming(cmd.ReachedEndOfTopic));
4444
}
4545

4646
public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel)
@@ -51,7 +51,7 @@ public Task<ProducerResponse> Outgoing(CommandProducer command, IChannel channel
5151

5252
return response.ContinueWith(result =>
5353
{
54-
if (result.Result.CommandType == BaseCommand.Type.Error)
54+
if (result.Result.Type == BaseCommand.Types.Type.Error)
5555
{
5656
_ = RemoveProducerChannel(producerId);
5757
result.Result.Error.Throw();
@@ -79,7 +79,7 @@ public Task<SubscribeResponse> Outgoing(CommandSubscribe command, IChannel chann
7979

8080
return response.ContinueWith(result =>
8181
{
82-
if (result.Result.CommandType == BaseCommand.Type.Error)
82+
if (result.Result.Type == BaseCommand.Types.Type.Error)
8383
{
8484
_ = RemoveConsumerChannel(consumerId);
8585
result.Result.Error.Throw();
@@ -104,7 +104,7 @@ public Task<BaseCommand> Outgoing(CommandCloseConsumer command)
104104

105105
_ = response.ContinueWith(result =>
106106
{
107-
if (result.Result.CommandType == BaseCommand.Type.Success)
107+
if (result.Result.Type == BaseCommand.Types.Type.Success)
108108
_ = RemoveConsumerChannel(consumerId);
109109
}, TaskContinuationOptions.OnlyOnRanToCompletion);
110110

@@ -124,7 +124,7 @@ public Task<BaseCommand> Outgoing(CommandCloseProducer command)
124124

125125
_ = response.ContinueWith(result =>
126126
{
127-
if (result.Result.CommandType == BaseCommand.Type.Success)
127+
if (result.Result.Type == BaseCommand.Types.Type.Success)
128128
_ = RemoveProducerChannel(producerId);
129129
}, TaskContinuationOptions.OnlyOnRanToCompletion);
130130

@@ -144,7 +144,7 @@ public Task<BaseCommand> Outgoing(CommandUnsubscribe command)
144144

145145
_ = response.ContinueWith(result =>
146146
{
147-
if (result.Result.CommandType == BaseCommand.Type.Success)
147+
if (result.Result.Type == BaseCommand.Types.Type.Success)
148148
RemoveConsumerChannel(consumerId)?.Unsubscribed();
149149
}, TaskContinuationOptions.OnlyOnRanToCompletion);
150150

@@ -191,7 +191,7 @@ public Task<BaseCommand> Outgoing(CommandGetLastMessageId command)
191191
}
192192

193193
public void Incoming(BaseCommand command)
194-
=> _incoming.Get(command.CommandType)(command);
194+
=> _incoming.Get(command.Type)(command);
195195

196196
public void Incoming(CommandMessage command, ReadOnlySequence<byte> data)
197197
=> _consumerChannels[command.ConsumerId]?.Received(new MessagePackage(command.MessageId, command.RedeliveryCount, data));
@@ -266,7 +266,7 @@ private void HandleAdditionalProducerSuccess(CommandProducer command, Action suc
266266
{
267267
_ = _requestResponseHandler.ExpectAdditionalResponse(command).ContinueWith(response =>
268268
{
269-
if (response.IsCanceled || response.IsFaulted || response.Result.CommandType == BaseCommand.Type.Error)
269+
if (response.IsCanceled || response.IsFaulted || response.Result.Type == BaseCommand.Types.Type.Error)
270270
{
271271
_producerChannels[command.ProducerId]?.Disconnected();
272272
return;

src/DotPulsar/Internal/Connection.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ namespace DotPulsar.Internal;
2121
using DotPulsar.Internal.Exceptions;
2222
using DotPulsar.Internal.Extensions;
2323
using DotPulsar.Internal.PulsarApi;
24+
using Google.Protobuf;
2425
using System.Buffers;
2526

2627
public sealed class Connection : IConnection
@@ -101,7 +102,7 @@ private async Task Send(CommandAuthResponse command, CancellationToken cancellat
101102
{
102103
command.Response ??= new AuthData();
103104
command.Response.AuthMethodName = _authentication.AuthenticationMethodName;
104-
command.Response.Data = await _authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false);
105+
command.Response.AuthData_ = ByteString.CopyFrom(await _authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false));
105106
}
106107

107108
await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false);
@@ -145,7 +146,7 @@ public async Task<BaseCommand> Send(CommandConnect command, CancellationToken ca
145146
if (_authentication is not null)
146147
{
147148
command.AuthMethodName = _authentication.AuthenticationMethodName;
148-
command.AuthData = await _authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false);
149+
command.AuthData = ByteString.CopyFrom(await _authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false));
149150
}
150151

151152
Task<BaseCommand>? responseTask;
@@ -259,7 +260,7 @@ public async Task Send(SendPackage command, TaskCompletionSource<BaseCommand> re
259260
}
260261
catch (OperationCanceledException)
261262
{
262-
_ = responseTcs.TrySetCanceled();
263+
_ = responseTcs.TrySetCanceled(CancellationToken.None);
263264
throw;
264265
}
265266
}
@@ -357,17 +358,17 @@ private async Task ProcessIncomingFrames(CancellationToken cancellationToken)
357358
await foreach (var frame in _stream.Frames(cancellationToken).ConfigureAwait(false))
358359
{
359360
var commandSize = frame.ReadUInt32(0, true);
360-
var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
361+
var command = BaseCommand.Parser.ParseFrom(frame.Slice(4, commandSize));
361362

362-
_pingPongHandler.Incoming(command.CommandType);
363+
_pingPongHandler.Incoming(command.Type);
363364

364-
if (command.CommandType == BaseCommand.Type.Message)
365+
if (command.Type == BaseCommand.Types.Type.Message)
365366
_channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray()));
366-
else if (command.CommandType == BaseCommand.Type.AuthChallenge)
367+
else if (command.Type == BaseCommand.Types.Type.AuthChallenge)
367368
_ = Task.Factory.StartNew(async () => await Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false));
368-
else if (command.CommandType == BaseCommand.Type.Ping)
369+
else if (command.Type == BaseCommand.Types.Type.Ping)
369370
_ = Task.Factory.StartNew(async () => await Send(new CommandPong(), cancellationToken).ConfigureAwait(false));
370-
else if (command.CommandType == BaseCommand.Type.Pong)
371+
else if (command.Type == BaseCommand.Types.Type.Pong)
371372
continue;
372373
else
373374
_channelManager.Incoming(command);

src/DotPulsar/Internal/ConnectionPool.cs

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,27 +72,29 @@ public async ValueTask<IConnection> FindConnectionForTopic(string topic, Cancell
7272
var lookup = new CommandLookupTopic
7373
{
7474
Topic = topic,
75-
Authoritative = false,
76-
AdvertisedListenerName = _listenerName
75+
Authoritative = false
7776
};
7877

78+
if (_listenerName is not null)
79+
lookup.AdvertisedListenerName = _listenerName;
80+
7981
var physicalUrl = _serviceUrl;
8082

8183
while (true)
8284
{
8385
var connection = await GetConnection(physicalUrl, cancellationToken).ConfigureAwait(false);
8486
var response = await connection.Send(lookup, cancellationToken).ConfigureAwait(false);
8587

86-
response.Expect(BaseCommand.Type.LookupResponse);
88+
response.Expect(BaseCommand.Types.Type.LookupResponse);
8789

88-
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
90+
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.Types.LookupType.Failed)
8991
response.LookupTopicResponse.Throw();
9092

9193
lookup.Authoritative = response.LookupTopicResponse.Authoritative;
9294

9395
var lookupResponseServiceUrl = new Uri(GetBrokerServiceUrl(response.LookupTopicResponse));
9496

95-
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
97+
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.Types.LookupType.Redirect || !response.LookupTopicResponse.Authoritative)
9698
{
9799
physicalUrl = lookupResponseServiceUrl;
98100
continue;
@@ -155,7 +157,7 @@ private async Task<Connection> EstablishNewConnection(PulsarUrl url, Cancellatio
155157
var connection = Connection.Connect(new PulsarStream(stream), _authentication, _keepAliveInterval, _closeInactiveConnectionsInterval);
156158
_ = connection.State.OnStateChangeFrom(ConnectionState.Connected, CancellationToken.None).AsTask().ContinueWith(t => DisposeConnection(url, connection), CancellationToken.None);
157159
var response = await connection.Send(commandConnect, cancellationToken).ConfigureAwait(false);
158-
response.Expect(BaseCommand.Type.Connected);
160+
response.Expect(BaseCommand.Types.Type.Connected);
159161
_connections[url] = connection;
160162
connection.MaxMessageSize = response.Connected.MaxMessageSize;
161163
return connection;
@@ -169,19 +171,28 @@ private async ValueTask DisposeConnection(PulsarUrl serviceUrl, Connection conne
169171

170172
private static CommandConnect WithProxyToBroker(CommandConnect commandConnect, Uri logicalUrl)
171173
{
172-
return new CommandConnect
174+
var command = new CommandConnect
173175
{
174-
AuthData = commandConnect.ShouldSerializeAuthData() ? commandConnect.AuthData : null,
175-
AuthMethod = commandConnect.ShouldSerializeAuthMethod() ? commandConnect.AuthMethod : AuthMethod.AuthMethodNone,
176-
AuthMethodName = commandConnect.ShouldSerializeAuthMethodName() ? commandConnect.AuthMethodName : null,
177176
ClientVersion = commandConnect.ClientVersion,
178-
OriginalPrincipal = commandConnect.ShouldSerializeOriginalPrincipal() ? commandConnect.OriginalPrincipal : null,
179177
ProtocolVersion = commandConnect.ProtocolVersion,
180-
OriginalAuthData = commandConnect.ShouldSerializeOriginalAuthData() ? commandConnect.OriginalAuthData : null,
181-
OriginalAuthMethod = commandConnect.ShouldSerializeOriginalAuthMethod() ? commandConnect.OriginalAuthMethod : null,
182178
ProxyToBrokerUrl = $"{logicalUrl.Host}:{logicalUrl.Port}",
183179
FeatureFlags = commandConnect.FeatureFlags
184180
};
181+
182+
if (commandConnect.HasAuthData)
183+
command.AuthData = commandConnect.AuthData;
184+
if (commandConnect.HasAuthMethod)
185+
command.AuthMethod = commandConnect.AuthMethod;
186+
if (commandConnect.HasAuthMethodName)
187+
command.AuthMethodName = commandConnect.AuthMethodName;
188+
if (command.HasOriginalPrincipal)
189+
command.OriginalPrincipal = commandConnect.OriginalPrincipal;
190+
if (command.HasOriginalAuthData)
191+
command.OriginalAuthData = commandConnect.OriginalAuthData;
192+
if (command.HasOriginalAuthMethod)
193+
command.OriginalAuthMethod = commandConnect.OriginalAuthMethod;
194+
195+
return command;
185196
}
186197

187198
private sealed class PulsarUrl : IEquatable<PulsarUrl>
@@ -226,15 +237,15 @@ public async ValueTask<uint> GetNumberOfPartitions(string topic, CancellationTok
226237
var commandPartitionedMetadata = new CommandPartitionedTopicMetadata { Topic = topic };
227238
var response = await connection.Send(commandPartitionedMetadata, cancellationToken).ConfigureAwait(false);
228239

229-
response.Expect(BaseCommand.Type.PartitionedMetadataResponse);
240+
response.Expect(BaseCommand.Types.Type.PartitionedMetadataResponse);
230241

231-
if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.LookupType.Failed)
242+
if (response.PartitionMetadataResponse.Response == CommandPartitionedTopicMetadataResponse.Types.LookupType.Failed)
232243
response.PartitionMetadataResponse.Throw();
233244

234245
return response.PartitionMetadataResponse.Partitions;
235246
}
236247

237-
public async ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Mode mode, Regex topicsPattern, CancellationToken cancellationToken = default)
248+
public async ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopicsOfNamespace.Types.Mode mode, Regex topicsPattern, CancellationToken cancellationToken = default)
238249
{
239250
var topicUriPattern = new Regex(@"^(persistent|non-persistent)://([^/]+)/([^/]+)/(.+)$", RegexOptions.Compiled);
240251

@@ -251,29 +262,29 @@ public async ValueTask<IEnumerable<string>> GetTopicsOfNamespace(CommandGetTopic
251262
if (!string.IsNullOrEmpty(persistence))
252263
{
253264
if (persistence.Equals("persistent"))
254-
mode = CommandGetTopicsOfNamespace.Mode.Persistent;
265+
mode = CommandGetTopicsOfNamespace.Types.Mode.Persistent;
255266
else
256-
mode = CommandGetTopicsOfNamespace.Mode.NonPersistent;
267+
mode = CommandGetTopicsOfNamespace.Types.Mode.NonPersistent;
257268
}
258269

259270
var getTopicsOfNamespace = new CommandGetTopicsOfNamespace
260271
{
261-
mode = mode,
272+
Mode = mode,
262273
Namespace = $"{tenant}/{ns}",
263274
TopicsPattern = patternString
264275
};
265276

266277
var connection = await GetConnection(_serviceUrl, cancellationToken).ConfigureAwait(false);
267278
var response = await connection.Send(getTopicsOfNamespace, cancellationToken).ConfigureAwait(false);
268279

269-
response.Expect(BaseCommand.Type.GetTopicsOfNamespaceResponse);
280+
response.Expect(BaseCommand.Types.Type.GetTopicsOfNamespaceResponse);
270281

271-
if (response.getTopicsOfNamespaceResponse.Filtered)
272-
return response.getTopicsOfNamespaceResponse.Topics;
282+
if (response.GetTopicsOfNamespaceResponse.Filtered)
283+
return response.GetTopicsOfNamespaceResponse.Topics;
273284

274285
var topics = new List<string>();
275286

276-
foreach (var topic in response.getTopicsOfNamespaceResponse.Topics)
287+
foreach (var topic in response.GetTopicsOfNamespaceResponse.Topics)
277288
{
278289
if (topicsPattern.Match(topic).Success)
279290
topics.Add(topic);

0 commit comments

Comments
 (0)