Skip to content

Commit 25a89b8

Browse files
committed
feat(rx): add flatmap, foreach, ignore-elements, groupby and last operators (#178)
feat(rx): add flatmap operator (#178) feat(rx): add foreach operator (#178) feat(rx): add groupby operator along with toslice (#178) feat(rx): add ignore-elements operator (#178) feat(rx): add last operator (#178)
1 parent e087538 commit 25a89b8

13 files changed

+1129
-9
lines changed

enums/item.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ const (
3232
// ItemDiscTick enum value that represents a TickValue value.
3333
//
3434
ItemDiscTickValue
35+
36+
// ItemDiscOpaque enum value that can be used to represent anything,
37+
// typically a value that is not of type T or any of the other scalar
38+
// types already catered for.
39+
ItemDiscOpaque
3540
)
3641

3742
type (
@@ -49,5 +54,6 @@ func init() {
4954
ItemDiscNumeric: "numeric",
5055
ItemDiscPulse: "pulse",
5156
ItemDiscTickValue: "tick",
57+
ItemDiscOpaque: "opaque",
5258
}
5359
}

rx/item.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ type (
1818
C chan<- Item[T]
1919
N int
2020
B bool
21+
O any
2122
disc enums.ItemDiscriminator
2223
}
2324

@@ -115,6 +116,13 @@ func False[T any]() Item[T] {
115116
}
116117
}
117118

119+
func Opaque[T any](o any) Item[T] {
120+
return Item[T]{
121+
O: o,
122+
disc: enums.ItemDiscOpaque,
123+
}
124+
}
125+
118126
// SendItems is a utility function that sends a list of items and indicates a
119127
// strategy on whether to close the channel once the function completes.
120128
func SendItems[T any](ctx context.Context,
@@ -223,6 +231,11 @@ func (i Item[T]) IsBoolean() bool {
223231
return (i.disc & enums.ItemDiscBoolean) > 0
224232
}
225233

234+
// IsOpaque checks if an item is an opaque value.
235+
func (i Item[T]) IsOpaque() bool {
236+
return (i.disc & enums.ItemDiscOpaque) > 0
237+
}
238+
226239
func (i Item[T]) Desc() string {
227240
return enums.ItemDescriptions[i.disc]
228241
}

rx/iterable-slice.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package rx
2+
3+
type sliceIterable[T any] struct {
4+
items []Item[T]
5+
opts []Option[T]
6+
}
7+
8+
func newSliceIterable[T any](items []Item[T], opts ...Option[T]) Iterable[T] {
9+
return &sliceIterable[T]{
10+
items: items,
11+
opts: opts,
12+
}
13+
}
14+
15+
func (i *sliceIterable[T]) Observe(opts ...Option[T]) <-chan Item[T] {
16+
option := parseOptions(append(i.opts, opts...)...)
17+
next := option.buildChannel()
18+
ctx := option.buildContext(emptyContext)
19+
20+
go func() {
21+
for _, item := range i.items {
22+
select {
23+
case <-ctx.Done():
24+
return
25+
case next <- item:
26+
}
27+
}
28+
29+
close(next)
30+
}()
31+
32+
return next
33+
}

rx/observable-operator-do-on_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/fortytw2/leaktest"
77
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
88
. "github.com/onsi/gomega" //nolint:revive // gomega ok
9+
"github.com/snivilised/lorax/rx"
910
)
1011

1112
var _ = Describe("Observable operator", func() {
@@ -95,8 +96,8 @@ var _ = Describe("Observable operator", func() {
9596
defer cancel()
9697

9798
s := make([]int, 0)
98-
<-testObservable[int](ctx, 1, 2, 3).DoOnNext(func(i int) {
99-
s = append(s, i)
99+
<-testObservable[int](ctx, 1, 2, 3).DoOnNext(func(item rx.Item[int]) {
100+
s = append(s, item.V)
100101
})
101102

102103
Expect(s).To(ContainElements([]int{1, 2, 3}))
@@ -113,8 +114,8 @@ var _ = Describe("Observable operator", func() {
113114
defer cancel()
114115

115116
s := make([]int, 0)
116-
<-testObservable[int](ctx, 1, errFoo, 3).DoOnNext(func(i int) {
117-
s = append(s, i)
117+
<-testObservable[int](ctx, 1, errFoo, 3).DoOnNext(func(item rx.Item[int]) {
118+
s = append(s, item.V)
118119
})
119120

120121
Expect(s).To(ContainElements([]int{1}))
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package rx_test
2+
3+
import (
4+
"context"
5+
6+
"github.com/fortytw2/leaktest"
7+
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
8+
"github.com/snivilised/lorax/rx"
9+
)
10+
11+
var _ = Describe("Observable operator", func() {
12+
Context("FlatMap", func() {
13+
When("principle", func() {
14+
It("🧪 should: ", func() {
15+
// rxgo: Test_Observable_FlatMap
16+
defer leaktest.Check(GinkgoT())()
17+
18+
ctx, cancel := context.WithCancel(context.Background())
19+
defer cancel()
20+
21+
obs := testObservable[int](ctx, 1, 2, 3).FlatMap(func(item rx.Item[int]) rx.Observable[int] {
22+
return testObservable[int](ctx, item.V+1, item.V*10)
23+
})
24+
rx.Assert[int](ctx, obs, rx.HasItems[int]{
25+
Expected: []int{2, 10, 3, 20, 4, 30},
26+
})
27+
})
28+
})
29+
30+
Context("Errors", func() {
31+
When("foo", func() {
32+
It("🧪 should: ", func() {
33+
// rxgo: Test_Observable_FlatMap_Error1
34+
defer leaktest.Check(GinkgoT())()
35+
36+
ctx, cancel := context.WithCancel(context.Background())
37+
defer cancel()
38+
39+
obs := testObservable[int](ctx, 1, 2, 3).FlatMap(func(i rx.Item[int]) rx.Observable[int] {
40+
if i.V == 2 {
41+
return testObservable[int](ctx, errFoo)
42+
}
43+
return testObservable[int](ctx, i.V+1, i.V*10)
44+
})
45+
rx.Assert[int](ctx, obs, rx.HasItems[int]{
46+
Expected: []int{2, 10},
47+
}, rx.HasError[int]{
48+
Expected: []error{errFoo},
49+
})
50+
})
51+
})
52+
53+
When("foo", func() {
54+
It("🧪 should: ", func() {
55+
// rxgo: Test_Observable_FlatMap_Error2
56+
defer leaktest.Check(GinkgoT())()
57+
58+
ctx, cancel := context.WithCancel(context.Background())
59+
defer cancel()
60+
61+
obs := testObservable[int](ctx, 1, errFoo, 3).FlatMap(func(i rx.Item[int]) rx.Observable[int] {
62+
if i.IsError() {
63+
return testObservable[int](ctx, 0)
64+
}
65+
return testObservable[int](ctx, i.V+1, i.V*10)
66+
})
67+
rx.Assert[int](ctx, obs,
68+
rx.HasItems[int]{
69+
Expected: []int{2, 10, 0, 4, 30},
70+
}, rx.HasNoError[int]{},
71+
)
72+
})
73+
})
74+
})
75+
76+
Context("Parallel", func() {
77+
When("foo", func() {
78+
It("🧪 should: ", func() {
79+
// rxgo: Test_Observable_FlatMap_Parallel
80+
defer leaktest.Check(GinkgoT())()
81+
82+
ctx, cancel := context.WithCancel(context.Background())
83+
defer cancel()
84+
85+
obs := testObservable[int](ctx, 1, 2, 3).FlatMap(func(i rx.Item[int]) rx.Observable[int] {
86+
return testObservable[int](ctx, i.V+1, i.V*10)
87+
}, rx.WithCPUPool[int]())
88+
rx.Assert[int](ctx, obs, rx.HasItemsNoOrder[int]{
89+
Expected: []int{2, 10, 3, 20, 4, 30},
90+
})
91+
})
92+
})
93+
})
94+
95+
Context("Parallel/Error", func() {
96+
When("foo", func() {
97+
It("🧪 should: ", func() {
98+
// rxgo: Test_Observable_FlatMap_Parallel_Error1
99+
defer leaktest.Check(GinkgoT())()
100+
101+
ctx, cancel := context.WithCancel(context.Background())
102+
defer cancel()
103+
104+
obs := testObservable[int](ctx, 1, 2, 3).FlatMap(func(i rx.Item[int]) rx.Observable[int] {
105+
if i.V == 2 {
106+
return testObservable[int](ctx, errFoo)
107+
}
108+
return testObservable[int](ctx, i.V+1, i.V*10)
109+
})
110+
rx.Assert[int](ctx, obs,
111+
rx.HasError[int]{
112+
Expected: []error{errFoo},
113+
},
114+
)
115+
})
116+
})
117+
})
118+
})
119+
})
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package rx_test
2+
3+
import (
4+
"context"
5+
6+
"github.com/fortytw2/leaktest"
7+
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
8+
. "github.com/onsi/gomega" //nolint:revive // gomega ok
9+
"github.com/snivilised/lorax/rx"
10+
)
11+
12+
var _ = Describe("Observable operator", func() {
13+
Context("ForEach", func() {
14+
When("principle", func() {
15+
It("🧪 should: ", func() {
16+
// rxgo: Test_Observable_ForEach_Done
17+
defer leaktest.Check(GinkgoT())()
18+
19+
ctx, cancel := context.WithCancel(context.Background())
20+
defer cancel()
21+
22+
var gotErr error
23+
count := 0
24+
done := make(chan struct{})
25+
obs := testObservable[int](ctx, 1, 2, 3)
26+
obs.ForEach(func(i rx.Item[int]) {
27+
count += i.V
28+
}, func(err error) {
29+
gotErr = err
30+
done <- struct{}{}
31+
}, func() {
32+
done <- struct{}{}
33+
})
34+
35+
// We avoid using the assertion API on purpose
36+
<-done
37+
38+
Expect(count).To(Equal(6))
39+
Expect(gotErr).To(Succeed())
40+
})
41+
})
42+
43+
Context("Errors", func() {
44+
When("foo", func() {
45+
It("🧪 should: ", func() {
46+
// rxgo: Test_Observable_ForEach_Error
47+
defer leaktest.Check(GinkgoT())()
48+
49+
ctx, cancel := context.WithCancel(context.Background())
50+
defer cancel()
51+
52+
count := 0
53+
var gotErr error
54+
done := make(chan struct{})
55+
56+
obs := testObservable[int](ctx, 1, 2, 3, errFoo)
57+
obs.ForEach(func(i rx.Item[int]) {
58+
count += i.V
59+
}, func(err error) {
60+
gotErr = err
61+
select {
62+
case <-ctx.Done():
63+
return
64+
case done <- struct{}{}:
65+
}
66+
}, func() {
67+
select {
68+
case <-ctx.Done():
69+
return
70+
case done <- struct{}{}:
71+
}
72+
}, rx.WithContext[int](ctx))
73+
74+
// We avoid using the assertion API on purpose
75+
<-done
76+
77+
Expect(count).To(Equal(6))
78+
Expect(gotErr).To(MatchError(errFoo))
79+
})
80+
})
81+
})
82+
})
83+
})

0 commit comments

Comments
 (0)