Skip to content

Commit f564d15

Browse files
committed
add error
max ErrMaxReconnectAttemptsReached Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 25962ec commit f564d15

File tree

4 files changed

+27
-4
lines changed

4 files changed

+27
-4
lines changed

docs/examples/reliable/reliable.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,19 @@ func main() {
1616
var stateAccepted int32
1717
var stateReleased int32
1818
var stateRejected int32
19+
var isRunning bool
1920

2021
var received int32
2122
var failed int32
2223

2324
startTime := time.Now()
25+
isRunning = true
2426
go func() {
25-
for {
27+
for isRunning {
2628
time.Sleep(5 * time.Second)
2729
total := stateAccepted + stateReleased + stateRejected
2830
messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
2931
rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)
30-
3132
}
3233
}()
3334

@@ -41,6 +42,16 @@ func main() {
4142
switch statusChanged.To.(type) {
4243
case *rmq.StateOpen:
4344
signalBlock.Broadcast()
45+
case *rmq.StateReconnecting:
46+
rmq.Info("[connection]", "Reconnecting to the AMQP 1.0 server")
47+
case *rmq.StateClosed:
48+
StateClosed := statusChanged.To.(*rmq.StateClosed)
49+
if errors.Is(StateClosed.GetError(), rmq.ErrMaxReconnectAttemptsReached) {
50+
rmq.Error("[connection]", "Max reconnect attempts reached. Closing connection", StateClosed.GetError())
51+
signalBlock.Broadcast()
52+
isRunning = false
53+
}
54+
4455
}
4556
}
4657
}(stateChanged)
@@ -130,6 +141,10 @@ func main() {
130141
go func() {
131142
defer wg.Done()
132143
for i := 0; i < 500_000; i++ {
144+
if !isRunning {
145+
rmq.Info("[Publisher]", "Publisher is stopped simulation not running, queue", queueName)
146+
return
147+
}
133148
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
134149
if err != nil {
135150
// here you need to deal with the error. You can store the message in a local in memory/persistent storage

pkg/rabbitmqamqp/amqp_binding.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
5858
kv[destination] = b.destinationName
5959
kv["arguments"] = make(map[string]any)
6060
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
61-
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
62-
return bindingPathWithExchangeQueueKey, err
61+
bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
62+
return bindingPathWithExchangeQueueAndKey, err
6363
}
6464

6565
// Unbind removes a binding between an exchange and a queue or exchange

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,10 @@ func (a *AmqpConnection) maybeReconnect() {
350350
Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id())
351351
}
352352

353+
// If we reach here, all attempts failed
354+
Error("All reconnection attempts failed, closing connection", "ID", a.Id())
355+
a.lifeCycle.SetState(&StateClosed{error: ErrMaxReconnectAttemptsReached})
356+
353357
}
354358

355359
// restartEntities attempts to restart all publishers and consumers after a reconnection

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package rabbitmqamqp
22

33
import (
4+
"errors"
45
"sync"
56
"time"
67
)
78

9+
// ErrMaxReconnectAttemptsReached typed error when the MaxReconnectAttempts is reached
10+
var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered")
11+
812
type RecoveryConfiguration struct {
913
/*
1014
ActiveRecovery Define if the recovery is activated.

0 commit comments

Comments
 (0)