Skip to content

Commit b302d80

Browse files
committed
feat(rx): add find operator (#174)
1 parent 8a8da81 commit b302d80

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

rx/observable-operator-find_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package rx_test
2+
3+
import (
4+
"context"
5+
6+
"github.com/fortytw2/leaktest"
7+
. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
8+
"github.com/snivilised/lorax/rx"
9+
)
10+
11+
var _ = Describe("Observable operator", func() {
12+
Context("Find", func() {
13+
Context("not empty", func() {
14+
When("is present", func() {
15+
It("🧪 should: return requested item", func() {
16+
// rxgo: Test_Observable_Find_NotEmpty
17+
defer leaktest.Check(GinkgoT())()
18+
19+
ctx, cancel := context.WithCancel(context.Background())
20+
defer cancel()
21+
obs := testObservable[int](ctx, 1, 2, 3).Find(func(item rx.Item[int]) bool {
22+
return item.V == 2
23+
})
24+
rx.Assert(ctx, obs, rx.HasItem[int]{
25+
Expected: 2,
26+
})
27+
})
28+
})
29+
30+
When("is not present", func() {
31+
It("🧪 should: return nothing", func() {
32+
defer leaktest.Check(GinkgoT())()
33+
34+
ctx, cancel := context.WithCancel(context.Background())
35+
defer cancel()
36+
obs := testObservable[int](ctx, 1, 2, 3).Find(func(item rx.Item[int]) bool {
37+
return item.V == 99
38+
})
39+
rx.Assert(ctx, obs, rx.IsEmpty[int]{})
40+
})
41+
})
42+
})
43+
44+
When("empty", func() {
45+
It("🧪 should: return nothing", func() {
46+
// rxgo: Test_Observable_Find_Empty
47+
defer leaktest.Check(GinkgoT())()
48+
49+
ctx, cancel := context.WithCancel(context.Background())
50+
defer cancel()
51+
52+
obs := rx.Empty[int]().Find(func(_ rx.Item[int]) bool {
53+
return true
54+
})
55+
rx.Assert(ctx, obs, rx.IsEmpty[int]{})
56+
})
57+
})
58+
})
59+
})

rx/observable-operator.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,46 @@ func (op *filterOperator[T]) gatherNext(_ context.Context, _ Item[T],
706706
) {
707707
}
708708

709+
// Find emits the first item passing a predicate then complete.
710+
func (o *ObservableImpl[T]) Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T] {
711+
const (
712+
forceSeq = true
713+
bypassGather = true
714+
)
715+
716+
return optionalSingle(o.parent, o, func() operator[T] {
717+
return &findOperator[T]{
718+
find: find,
719+
}
720+
}, true, true, opts...)
721+
}
722+
723+
type findOperator[T any] struct {
724+
find Predicate[T]
725+
}
726+
727+
func (op *findOperator[T]) next(ctx context.Context, item Item[T],
728+
dst chan<- Item[T], operatorOptions operatorOptions[T],
729+
) {
730+
if op.find(item) {
731+
item.SendContext(ctx, dst)
732+
operatorOptions.stop()
733+
}
734+
}
735+
736+
func (op *findOperator[T]) err(ctx context.Context, item Item[T],
737+
dst chan<- Item[T], operatorOptions operatorOptions[T],
738+
) {
739+
defaultErrorFuncOperator(ctx, item, dst, operatorOptions)
740+
}
741+
742+
func (op *findOperator[T]) end(_ context.Context, _ chan<- Item[T]) {
743+
}
744+
745+
func (op *findOperator[T]) gatherNext(_ context.Context, _ Item[T],
746+
_ chan<- Item[T], _ operatorOptions[T]) {
747+
}
748+
709749
// !!!
710750

711751
// Max determines and emits the maximum-valued item emitted by an Observable according to a comparator.

rx/observable.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Observable[T any] interface {
2727
Error(opts ...Option[T]) error
2828
Errors(opts ...Option[T]) []error
2929
Filter(apply Predicate[T], opts ...Option[T]) Observable[T]
30+
Find(find Predicate[T], opts ...Option[T]) OptionalSingle[T]
3031
Max(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]
3132
Map(apply Func[T], opts ...Option[T]) Observable[T]
3233
Min(comparator Comparator[T], initLimit InitLimit[T], opts ...Option[T]) OptionalSingle[T]

0 commit comments

Comments
 (0)