using System.Net; using System.Text; using System.Text.Json; using Microsoft.Extensions.Logging; using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.AMQP; using RabbitMQ.Stream.Client.Reliable; namespace Rmq.Stream.Consumer; public static class GettingStarted { public static async Task Start() { // The Logger is not mandatory but it is very useful to understand what is going on. // The logger is a Microsoft.Extensions.Logging.ILogger // In this example we are using the Microsoft.Extensions.Logging.Console package. // Microsoft.Extensions.Logging.Console is NOT shipped with the client. // You can use any logger you want. var factory = LoggerFactory.Create(builder => { builder.AddSimpleConsole(); builder.AddFilter("RabbitMQ.Stream", LogLevel.Information); }); // Define the logger for the StreamSystem and the Consumer var consumerLogger = factory.CreateLogger(); var streamLogger = factory.CreateLogger(); // Create the StreamSystem var streamSystem = await StreamSystem.Create( new StreamSystemConfig() { UserName = "guest", Password = "guest", Endpoints = new List() { new IPEndPoint(IPAddress.Loopback, 5552) }, }, streamLogger ).ConfigureAwait(false); // Create a stream const string StreamName = "my-stream"; await streamSystem.CreateStream( new StreamSpec(StreamName) { MaxAge = TimeSpan.FromSeconds(20), MaxSegmentSizeBytes = 50 }).ConfigureAwait(false); var consumerTaskCompletionSource = new TaskCompletionSource(); const int MessageCount = 1; var consumerCount = 0; SampleMessage sampleMessage; // Create a consumer Console.WriteLine("Starting consuming..."); var consumer = await RabbitMQ.Stream.Client.Reliable.Consumer.Create( new ConsumerConfig(streamSystem, StreamName) { OffsetSpec = new OffsetTypeFirst(), MessageHandler = async (sourceStream, consumer, messageContext, message) => { await consumer.StoreOffset(messageContext.Offset).ConfigureAwait(false); string msg = EncodingExtensions.GetString(Encoding.UTF8, message.Data.Contents); sampleMessage = JsonSerializer.Deserialize(msg); Console.WriteLine($"Received Message - {sampleMessage.Code}"); Console.WriteLine($" {sampleMessage.Name}"); Console.WriteLine($" {sampleMessage.Content}"); Console.WriteLine(""); if (Interlocked.Increment(ref consumerCount) == MessageCount) { Console.WriteLine("*********************************"); Console.WriteLine($"All the {MessageCount} messages are received"); Console.WriteLine("*********************************"); consumerTaskCompletionSource.SetResult(MessageCount); } await Task.CompletedTask.ConfigureAwait(false); } }, consumerLogger ) .ConfigureAwait(false); RecieveMessage(); consumerTaskCompletionSource.Task.Wait(); } private static void RecieveMessage() { Console.ReadLine(); RecieveMessage(); } }