Skip to content

Commit f68b9cd

Browse files
committed
feat(rx): add backoff-retry operator (#156)
1 parent bc0cdf5 commit f68b9cd

File tree

7 files changed

+129
-1
lines changed

7 files changed

+129
-1
lines changed

.vscode/settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"Assistable",
88
"binaryheap",
99
"bodyclose",
10+
"cenkalti",
1011
"cmds",
1112
"coverpkg",
1213
"coverprofile",

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
)
2323

2424
require (
25+
github.com/cenkalti/backoff/v4 v4.3.0
2526
github.com/emirpasic/gods v1.18.1
2627
github.com/go-logr/logr v1.4.1 // indirect
2728
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
22
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
33
github.com/avfs/avfs v0.33.0 h1:5WQXbUbr6VS7aani39ZN2Vrd/s3wLnyih1Sc4ExWTxs=
44
github.com/avfs/avfs v0.33.0/go.mod h1:Q59flcFRYe9KYkNMfrLUJney3yeKGQpcWRyxsDBW7vI=
5+
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
6+
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
57
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
68
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
79
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package rx_test
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/cenkalti/backoff/v4"
8+
"github.com/fortytw2/leaktest"
9+
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
10+
11+
"github.com/snivilised/lorax/rx"
12+
)
13+
14+
var _ = Describe("Observable operator", func() {
15+
Context("Backoff Retry", func() {
16+
Context("principle", func() {
17+
It("🧪 should: succeed after retry within max retries", func() {
18+
// rxgo: Test_Observable_BackOffRetry
19+
defer leaktest.Check(GinkgoT())()
20+
21+
ctx, cancel := context.WithCancel(context.Background())
22+
defer cancel()
23+
24+
i := 0
25+
backOffCfg := backoff.NewExponentialBackOff()
26+
backOffCfg.InitialInterval = time.Nanosecond
27+
obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
28+
next <- rx.Of(1)
29+
next <- rx.Of(2)
30+
if i == 2 {
31+
next <- rx.Of(3)
32+
} else {
33+
i++
34+
next <- rx.Error[int](errFoo)
35+
}
36+
}}).BackOffRetry(backoff.WithMaxRetries(backOffCfg, 3))
37+
rx.Assert(ctx, obs,
38+
rx.HasItems[int]{
39+
Expected: []int{1, 2, 1, 2, 1, 2, 3},
40+
},
41+
rx.HasNoError[int]{},
42+
)
43+
})
44+
})
45+
46+
Context("Errors", func() {
47+
Context("given: foo", func() {
48+
It("🧪 should: fail after max retries exceeded", func() {
49+
// rxgo: Test_Observable_BackOffRetry_Error
50+
defer leaktest.Check(GinkgoT())()
51+
52+
ctx, cancel := context.WithCancel(context.Background())
53+
defer cancel()
54+
55+
backOffCfg := backoff.NewExponentialBackOff()
56+
backOffCfg.InitialInterval = time.Nanosecond
57+
obs := rx.Defer([]rx.Producer[int]{func(_ context.Context, next chan<- rx.Item[int]) {
58+
next <- rx.Of(1)
59+
next <- rx.Of(2)
60+
next <- rx.Error[int](errFoo)
61+
}}).BackOffRetry(backoff.WithMaxRetries(backOffCfg, 3))
62+
rx.Assert(ctx, obs,
63+
rx.HasItems[int]{
64+
Expected: []int{1, 2, 1, 2, 1, 2, 1, 2},
65+
},
66+
rx.HasError[int]{
67+
Expected: []error{errFoo},
68+
},
69+
)
70+
})
71+
})
72+
})
73+
})
74+
})

rx/observable-operator.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"fmt"
66
"reflect"
7+
8+
"github.com/cenkalti/backoff/v4"
79
)
810

911
func isZero[T any](limit T) bool {
@@ -134,6 +136,52 @@ func (op *averageOperator[T]) gatherNext(_ context.Context, item Item[T],
134136
panic("averageOperator.gatherNext NOT-IMPL")
135137
}
136138

139+
// BackOffRetry implements a backoff retry if a source Observable sends an error,
140+
// resubscribe to it in the hopes that it will complete without error.
141+
// Cannot be run in parallel.
142+
func (o *ObservableImpl[T]) BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T]) Observable[T] {
143+
option := parseOptions(opts...)
144+
next := option.buildChannel()
145+
ctx := option.buildContext(o.parent)
146+
f := func() error {
147+
observe := o.Observe(opts...)
148+
149+
for {
150+
select {
151+
case <-ctx.Done():
152+
close(next)
153+
154+
return nil
155+
case i, ok := <-observe:
156+
if !ok {
157+
return nil
158+
}
159+
160+
if i.IsError() {
161+
return i.E
162+
}
163+
164+
i.SendContext(ctx, next)
165+
}
166+
}
167+
}
168+
169+
go func() {
170+
if err := backoff.Retry(f, backOffCfg); err != nil {
171+
Error[T](err).SendContext(ctx, next)
172+
close(next)
173+
174+
return
175+
}
176+
177+
close(next)
178+
}()
179+
180+
return &ObservableImpl[T]{
181+
iterable: newChannelIterable(next),
182+
}
183+
}
184+
137185
// Connect instructs a connectable Observable to begin emitting items to its subscribers.
138186
func (o *ObservableImpl[T]) Connect(ctx context.Context) (context.Context, Disposable) {
139187
ctx, cancel := context.WithCancel(ctx)

rx/observable.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ import (
55
"sync"
66
"sync/atomic"
77

8+
"github.com/cenkalti/backoff/v4"
89
"github.com/emirpasic/gods/trees/binaryheap"
910
)
1011

1112
type Observable[T any] interface {
1213
Iterable[T]
1314
All(predicate Predicate[T], opts ...Option[T]) Single[T]
1415
Average(calc Calculator[T], opts ...Option[T]) Single[T]
16+
BackOffRetry(backOffCfg backoff.BackOff, opts ...Option[T]) Observable[T]
1517
Connect(ctx context.Context) (context.Context, Disposable)
1618

1719
Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]

rx/operator-skeleton_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
)
1010

1111
var _ = Describe("Observable operator", func() {
12-
Context("${{OPERATOR-NAME}}", func() {
12+
XContext("${{OPERATOR-NAME}}", func() {
1313
Context("principle", func() {
1414
// success path
1515
It("🧪 should: ", func() {

0 commit comments

Comments
 (0)