Skip to content

txix-open/grmq

Repository files navigation

GRMQ

Go Rabbit MQ

Build and test codecov Go Report Card

What are the typical use-cases for RabbitMQ broker ?

  • We create a durable topology (exchanges, queues, bindings).
  • Begin queue consuming (commonly in several goroutines with prefetch count) and use DLQ to avoid poison messages.
  • If we can't handle message at this time, we can retry a bit later (some external service is not available for instance)
  • Also, we expect that if something happens with connection, we can reestablish it and continue our work transparently.
  • Graceful shutdown to reduce probability of message duplication.

All of those commonly used cases are implemented in the package.

High abstraction wrapper for amqp091-go. Inspired by http package and cony

Features

  • re-connection support
  • graceful shutdown support
  • flexible context.Context based api
  • middlewares for publishers and consumers
  • DLQ declaration out of the box
  • flexible retries

Complete Example

type LogObserver struct {
	grmq.NoopObserver
}

func (o LogObserver) ClientError(err error) {
	log.Printf("rmq client error: %v", err)
}

func (o LogObserver) ConsumerError(consumer consumer.Consumer, err error) {
	log.Printf("unexpected consumer error (queue=%s): %v", consumer.Queue, err)
}

func main() {
	url := amqpUrl()

	pub := publisher.New(
		"exchange",
		"test",
		publisher.WithMiddlewares(publisher.PersistentMode()),
	)

	simpleHandler := consumer.HandlerFunc(func(ctx context.Context, delivery *consumer.Delivery) {
		log.Printf("message body: %s, queue: %s", delivery.Source().Body, delivery.Source().RoutingKey)
		err := delivery.Ack()
		if err != nil {
			panic(err)
		}
	})
	simpleConsumer := consumer.New(
		simpleHandler,
		"queue",
		consumer.WithConcurrency(32),   //default 1
		consumer.WithPrefetchCount(32), //default 1
	)

	retryPolicy := retry.NewPolicy(
		true, //move to dlq after last failed try
		retry.WithDelay(500*time.Millisecond, 1),
		retry.WithDelay(1*time.Second, 1),
		retry.WithDelay(2*time.Second, 1),
	)
	retryHandler := consumer.HandlerFunc(func(ctx context.Context, delivery *consumer.Delivery) {
		log.Printf("message body: %s, queue: %s", delivery.Source().Body, delivery.Source().RoutingKey)
		err := delivery.Retry()
		if err != nil {
			panic(err)
		}
	})
	retryConsumer := consumer.New(
		retryHandler,
		"retryQueue",
		consumer.WithRetryPolicy(retryPolicy),
	)

	cli := grmq.New(
		url,
		grmq.WithPublishers(pub),
		grmq.WithConsumers(simpleConsumer, retryConsumer),
		grmq.WithTopologyBuilding(
			topology.WithQueue("queue", topology.WithDLQ(true)),
			//you MUST declare queue with the same retry policy
			topology.WithQueue("retryQueue", topology.WithRetryPolicy(retryPolicy)),
			topology.WithDirectExchange("exchange"),
			topology.WithBinding("exchange", "queue", "test"),
		),
		grmq.WithReconnectTimeout(3*time.Second), //default 1s
		grmq.WithObserver(LogObserver{}),
	)
	//it tries to connect
	//declare topology
	//init publishers and consumers
	//returns first occurred error or nil 
	//or you can use  cli.Serve(context.Background()), which is completely non-blocking
	err := cli.Run(context.Background())
	if err != nil {
		panic(err)
	}

	err = pub.Publish(context.Background(), &amqp091.Publishing{Body: []byte("hello world")})
	if err != nil {
		panic(err)
	}

	//you may use any publisher to send message to any exchange
	err = pub.PublishTo(context.Background(), "", "retryQueue", &amqp091.Publishing{Body: []byte("retry me")})
	if err != nil {
		panic(err)
	}

	time.Sleep(10 * time.Second)

	cli.Shutdown()
}

Retries

This is quite fresh feature implemented in 1.4.0. Before using it you must know how it works under the hood. It combines two mechanisms: DLQ + TTL

Lets say we use policy below for queue test

retryPolicy := retry.NewPolicy(
	true,
	retry.WithDelay(500*time.Millisecond, 1),
	retry.WithDelay(1*time.Second, 1),
	retry.WithDelay(2*time.Second, 1), 
)

This configuration will create

  • exchange with name default-dead-letter
  • 4 extra queues
    • test.DLQ
    • test.retry.500
    • test.retry.1000
    • test.retry.2000
  • each retry queue will have x-message-ttl property equal to its delay
  • each retry queue will have DLX routing to the original queue test
  • consumer.Delivery.Retry() will find a suitable queue by grmq-retry-count header(0 by default), increment grmq-retry-count header, directly publish with confirmation to the queue, manually acknowledge the delivery
  • if there is no suitable retry option and moveToDql is true, it moves the message to test.DLQ
  • otherwise, it performs ack

Recommendation: If you want to change retry policy for a queue, before doing it, ensure there is no messages in retry queues.

Don't forget to delete old retry queues.

State and road map

  • the package is used in production (reconnection works perfect)
  • more tests need to be implemented
  • add go doc
  • add supporting for publishing confirmation to achieve more reliable publishing

About

High abstraction wrapper for Golang Rabbit MQ Client

Topics

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages

No packages published

Contributors 6

Languages