using System.Net;
using System.Text;
using System.Text.Json;
using Microsoft.Extensions.Logging;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;

namespace Rmq.Stream.Producer;

/// <summary>
/// Sample that connects to a local RabbitMQ stream endpoint, creates a stream and a
/// reliable producer, publishes JSON messages and waits for the first batch to be confirmed.
/// </summary>
public static class GettingStarted
{
    /// <summary>
    /// Runs the sample: creates the stream system, the stream and the producer, publishes
    /// the initial batch, then keeps publishing batches whose size is read from the console.
    /// </summary>
    /// <returns>
    /// A task that completes once publishing has ended, the initial batch has been
    /// acknowledged (successfully or not) and the producer and stream system are closed.
    /// </returns>
    public static async Task Start()
    {
        // The logger is not mandatory, but it helps to understand what is going on.
        // It is a Microsoft.Extensions.Logging.ILogger; this sample uses the
        // Microsoft.Extensions.Logging.Console package, which is NOT shipped with the
        // client. Any logger implementation can be used instead.
        using var factory = LoggerFactory.Create(builder =>
        {
            builder.AddSimpleConsole();
            builder.AddFilter("RabbitMQ.Stream", LogLevel.Information);
        });

        // The enclosing namespace ends in "Producer", so the client's Producer type has
        // to be written with its full name everywhere in this file.
        var producerLogger = factory.CreateLogger<RabbitMQ.Stream.Client.Reliable.Producer>();
        var streamLogger = factory.CreateLogger<StreamSystem>();

        // Create the StreamSystem.
        var streamSystem = await StreamSystem.Create(
            new StreamSystemConfig
            {
                UserName = "guest",
                Password = "guest",
                Endpoints = new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) },
            },
            streamLogger).ConfigureAwait(false);

        try
        {
            // Create the stream.
            const string StreamName = "my-stream";
            await streamSystem.CreateStream(
                new StreamSpec(StreamName)
                {
                    MaxAge = TimeSpan.FromSeconds(20),
                    MaxSegmentSizeBytes = 50,
                }).ConfigureAwait(false);

            // Size of the initial batch; Start waits for this many confirmations.
            const int MessageCount = 1;

            // RunContinuationsAsynchronously keeps the continuation of Start from running
            // inline on whatever thread invokes the confirmation handler.
            var confirmationTaskCompletionSource =
                new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
            var confirmationCount = 0;

            // Create the producer.
            var producer = await RabbitMQ.Stream.Client.Reliable.Producer.Create(
                new ProducerConfig(streamSystem, StreamName)
                {
                    ConfirmationHandler = async confirmation =>
                    {
                        // Work with the value returned by the atomic increment: re-reading
                        // the shared counter could observe another handler's update.
                        var handled = Interlocked.Increment(ref confirmationCount);

                        switch (confirmation.Status)
                        {
                            case ConfirmationStatus.Confirmed:
                                // Every message received here is confirmed.
                                if (handled == MessageCount)
                                {
                                    Console.WriteLine("*********************************");
                                    Console.WriteLine($"All the {MessageCount} messages are confirmed");
                                    Console.WriteLine("*********************************");
                                }

                                break;

                            case ConfirmationStatus.StreamNotAvailable:
                            case ConfirmationStatus.InternalError:
                            case ConfirmationStatus.AccessRefused:
                            case ConfirmationStatus.PreconditionFailed:
                            case ConfirmationStatus.PublisherDoesNotExist:
                            case ConfirmationStatus.UndefinedError:
                            case ConfirmationStatus.ClientTimeoutError:
                                Console.WriteLine(
                                    $"Message {confirmation.PublishingId} failed with {confirmation.Status}");
                                break;

                            default:
                                throw new ArgumentOutOfRangeException(
                                    nameof(confirmation),
                                    confirmation.Status,
                                    "Unexpected confirmation status.");
                        }

                        // Failed messages count as handled too, so the wait in Start ends
                        // once the initial batch has been fully acknowledged either way.
                        if (handled == MessageCount)
                        {
                            confirmationTaskCompletionSource.TrySetResult(MessageCount);
                        }

                        await Task.CompletedTask.ConfigureAwait(false);
                    },
                },
                producerLogger).ConfigureAwait(false);

            try
            {
                // Send the initial batch, then whatever batches are requested on the console.
                Console.WriteLine("Starting publishing...");
                await PublishMessage(producer, MessageCount).ConfigureAwait(false);

                // Awaited instead of Task.Wait() so no thread is blocked inside an async method.
                await confirmationTaskCompletionSource.Task.ConfigureAwait(false);
            }
            finally
            {
                await producer.Close().ConfigureAwait(false);
            }
        }
        finally
        {
            await streamSystem.Close().ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Publishes <paramref name="MessageCount"/> sample messages, then repeatedly reads the
    /// size of the next batch from the console and publishes it.
    /// </summary>
    /// <param name="producer">Producer used to send the messages.</param>
    /// <param name="MessageCount">Number of messages in the first batch.</param>
    /// <returns>A task that completes when the console input ends or an empty line is entered.</returns>
    private static async Task PublishMessage(RabbitMQ.Stream.Client.Reliable.Producer producer, int MessageCount)
    {
        var batchSize = MessageCount;

        // A loop rather than a self-call after each batch: the stack and the chain of
        // tasks stay flat, and every send is awaited so failures reach the caller.
        while (true)
        {
            for (var i = 0; i < batchSize; i++)
            {
                var message = new SampleMessage
                {
                    Code = $"P0{i}",
                    Name = $"P0{i} - Sample",
                    Content = $"Sample text message - {i}",
                };

                var data = JsonSerializer.Serialize(message);
                var body = Encoding.UTF8.GetBytes(data);

                await producer.Send(new Message(body)
                {
                    ApplicationProperties = new ApplicationProperties
                    {
                        { "key1", "value1" },
                        { "key2", "value2" },
                    },
                    Properties = new Properties
                    {
                        MessageId = "message-id",
                        CorrelationId = "correlation-id",
                        ContentType = "application/json",
                        ContentEncoding = "utf-8",
                    },
                }).ConfigureAwait(false);
            }

            if (!TryReadMessageCount(out batchSize))
            {
                return;
            }
        }
    }

    /// <summary>
    /// Reads the size of the next batch from the console, asking again after invalid input.
    /// </summary>
    /// <param name="messageCount">The parsed batch size, or 0 when no size was provided.</param>
    /// <returns>
    /// <see langword="true"/> when a whole number was read; <see langword="false"/> when
    /// the input ended or a blank line was entered.
    /// </returns>
    private static bool TryReadMessageCount(out int messageCount)
    {
        while (true)
        {
            // ReadLine returns null once the input stream is closed.
            var line = Console.ReadLine();
            if (string.IsNullOrWhiteSpace(line))
            {
                messageCount = 0;
                return false;
            }

            if (int.TryParse(line, out messageCount))
            {
                return true;
            }

            Console.WriteLine($"'{line}' is not a valid message count; enter a whole number or an empty line to stop.");
        }
    }
}