This bundle is extension to RabbitMqBundle, it will automatically add DLQ queues to existing multiple_consumer in old_sound_rabbit_mq.yaml
multiple_consumers:
default:
connection: default
exchange_options:
name: 'exchange'
type: 'topic'
graceful_max_execution:
timeout: 60
queues:
legacy.investments.investment_added.event:
name: 'legacy.investments.investment_added.event'
routing_key: 'legacy.investments.investment_added.event'
callback: Namespace\InvestmentAddedLegacyConsumer
legacy.investments.investment_edited.event:
name: 'legacy.investments.investment_edited.event'
routing_key: 'legacy.investments.investment_edited.event'
callback: Namespace\InvestmentEditedLegacyConsumerAfter that configuration you will have 2 additional DLQ queues with routing keys:
- legacy.investments.investment_added.retry
- legacy.investments.investment_edited.retry
Each *.retry queue will re-route all messages back to original queue after 30s delay.
To put message to *.retry queue you just need to throw any Exception when parsing message.
You are creating consumer like in example above - by adding callback. This callback MUST extends AbstractMessageConsumer.
That is it! If everything is OK, just leave it.
If there was any problem, then throw Exception.
Just inject MessageProducerInterface to your service where you need to produce message.
Then create class with extends AbstractMessage or implements MessageInterface.
<?php
declare(strict_types=1);
use MKoprek\RabbitmqDlqBundle\Message\AbstractMessage;
class Message extends AbstractMessage
{
public const ROUTING_KEY = 'legacy.investments.investment_added.event';
public function __construct(array $array)
{
$this->payload = [
'id' => '7186971d-1b63-46ba-9804-012e8477d370',
'name' => 'Lorem Ipsum',
'array' => $array,
];
}
}<?php
declare(strict_types=1);
use MKoprek\RabbitmqDlqBundle\Producer\MessageProducerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class ProduceMessage
{
public function __construct(private MessageProducerInterface $producer)
{
}
protected function produce(InputInterface $input, OutputInterface $output): void
{
$this->producer->produce(
new Message(['some_key' => 'some_val'])
);
}
}