Skip to content

Commit 30cc285

Browse files
committed
test(rx): add has items matcher (#266)
1 parent a79dc85 commit 30cc285

18 files changed

+557
-117
lines changed

.vscode/settings.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"godirwalk",
2525
"gofmt",
2626
"goimports",
27+
"goleak",
2728
"gomnd",
2829
"gosec",
2930
"gosimple",
@@ -43,14 +44,17 @@
4344
"navi",
4445
"nicksnyder",
4546
"nolintlint",
47+
"onecontext",
4648
"prealloc",
4749
"repath",
4850
"Resumer",
51+
"rxgo",
4952
"sidewalk",
5053
"staticcheck",
5154
"structcheck",
5255
"stylecheck",
5356
"Syncer",
57+
"teivah",
5458
"thelper",
5559
"tparallel",
5660
"typecheck",

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,14 @@ The ___Start___ and ___Stop___ conditions are defined using `ListenBy`, eg:
359359

360360
## 🔨 Development
361361

362+
___RxGo___
363+
364+
<p align="left">
365+
<a href="https://rxjs.dev/guide/overview"><img src="https://avatars.githubusercontent.com/u/6407041?s=200&v=4" width="50" /></a>
366+
</p>
367+
368+
To support concurrency features, Extendio uses the reactive model provided by [RxGo](https://github.com/ReactiveX/RxGo). However, since ___RxGo___ seems to be a dead project with its last release in April 2021 and its unit tests not currently running successfully, the decision has been made to re-implement this locally. One of the main reasons for the project no longer being actively maintained is the release of generics feature in Go version 1.18, and supporting generics in RxGo would require significant effort to re-write the entire library. While work on this has begun, it's unclear when this will be delivered. Despite this, the reactive model's support for concurrency is highly valued, and Extendio aims to make use of a minimal functionality set for parallel processing during directory traversal. The goal is to replace it with the next version of RxGo when it becomes available.
369+
362370
See:
363371

364372
- [Github Development Workflow](./resources/doc/GITHUB-DEV.md)

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
require (
1515
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
1616
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
17+
go.uber.org/goleak v1.2.1 // indirect
1718
golang.org/x/tools v0.8.0 // indirect
1819
)
1920

@@ -23,6 +24,7 @@ require (
2324
github.com/natefinch/lumberjack v2.0.0+incompatible
2425
github.com/nicksnyder/go-i18n/v2 v2.2.1
2526
github.com/pkg/errors v0.9.1
27+
github.com/teivah/onecontext v1.3.0
2628
go.uber.org/atomic v1.10.0 // indirect
2729
go.uber.org/multierr v1.9.0 // indirect
2830
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect

go.sum

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
1717
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
1818
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
1919
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
20+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
21+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
22+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
2023
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
2124
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
2225
github.com/natefinch/lumberjack v2.0.0+incompatible h1:4QJd3OLAMgj7ph+yZTuX13Ld4UpgHp07nNdFX7mqFfM=
@@ -34,12 +37,18 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
3437
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
3538
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
3639
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
40+
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
3741
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
3842
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
43+
github.com/teivah/onecontext v1.3.0 h1:tbikMhAlo6VhAuEGCvhc8HlTnpX4xTNPTOseWuhO1J0=
44+
github.com/teivah/onecontext v1.3.0/go.mod h1:hoW1nmdPVK/0jrvGtcx8sCKYs2PiS4z0zzfdeuEVyb0=
3945
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
4046
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
4147
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
48+
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
4249
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
50+
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
51+
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
4352
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
4453
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
4554
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
@@ -48,7 +57,9 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
4857
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
4958
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
5059
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
60+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
5161
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
62+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
5263
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
5364
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
5465
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
@@ -73,6 +84,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
7384
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
7485
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
7586
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
87+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
88+
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
7689
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
7790
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
7891
golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y=
@@ -81,8 +94,10 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
8194
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
8295
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
8396
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
97+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
8498
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
8599
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
100+
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
86101
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
87102
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
88103
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

rx/item.go

Lines changed: 0 additions & 3 deletions
This file was deleted.

rx/item_test.go

Lines changed: 0 additions & 19 deletions
This file was deleted.

rxgo/README.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
This implementation is based upon the original version of rxgo at https://github.com/ReactiveX/RxGo, based upon generics rather than relying on reflection and interface{}. It is intended to replace this local version with the official version when and if it becomes available and for this to happen with minimal issues, the design here has to mirror the original as closely as possible.

rxgo/factory.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package rxgo
2+
3+
// FromChannel creates a cold observable from a channel.
4+
func FromChannel[T any](next <-chan Item[T], opts ...Option[T]) Observable[T] {
5+
option := parseOptions(opts...)
6+
ctx := option.buildContext(emptyContext)
7+
8+
return &ObservableImpl[T]{
9+
parent: ctx,
10+
iterable: newChannelIterable(next, opts...),
11+
}
12+
}

rxgo/item.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package rxgo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/samber/lo"
8+
)
9+
10+
// CloseChannelStrategy indicates a strategy on whether to close a channel.
11+
type CloseChannelStrategy uint32
12+
13+
const (
14+
// LeaveChannelOpen indicates to leave the channel open after completion.
15+
LeaveChannelOpen CloseChannelStrategy = iota
16+
// CloseChannel indicates to close the channel open after completion.
17+
CloseChannel
18+
)
19+
20+
type Item[T any] struct {
21+
V T
22+
E error
23+
}
24+
25+
// Of creates an item from a value.
26+
func Of[T any](i T) Item[T] {
27+
return Item[T]{V: i}
28+
}
29+
30+
// Error checks if an item is an error.
31+
func (i Item[T]) Error() bool {
32+
return i.E != nil
33+
}
34+
35+
// SendItemsV is an utility function that send a list of items and indicate
36+
// a strategy on whether to close the channel once the function completes.
37+
func SendItems[T any](ctx context.Context, ch chan<- Item[T], strategy CloseChannelStrategy, items ...T) {
38+
if strategy == CloseChannel {
39+
defer close(ch)
40+
}
41+
42+
send(ctx, ch, items...)
43+
}
44+
45+
func send[T any](ctx context.Context, ch chan<- Item[T], items ...T) {
46+
// This is only the basic implementation. It does not yet support sending
47+
// a slice or a channel. Support for these don't need to be added unless
48+
// explicitly required.
49+
//
50+
for _, currentItem := range items {
51+
_ = Of(currentItem).SendContext(ctx, ch)
52+
}
53+
}
54+
55+
// SendItemsV (verbose) is an utility function that send a list of items and indicate
56+
// a strategy on whether to close the channel once the function completes.
57+
// This will eventually be removed as its only required for debugging purposes
58+
func SendItemsV[T any](ctx context.Context, ch chan<- Item[T], strategy CloseChannelStrategy, items ...T) {
59+
if strategy == CloseChannel {
60+
defer close(ch)
61+
}
62+
63+
sendV(ctx, ch, items...)
64+
}
65+
66+
// This will eventually be removed as its only required for debugging purposes
67+
func sendV[T any](ctx context.Context, ch chan<- Item[T], items ...T) {
68+
fmt.Println("")
69+
70+
// This is only the basic implementation. It does not yet support sending
71+
// a slice or a channel. Support for these don't need to be added unless
72+
// explicitly required.
73+
//
74+
for _, currentItem := range items {
75+
result := Of(currentItem).SendContext(ctx, ch)
76+
indicator := lo.Ternary(result, "✔️", "❌")
77+
78+
fmt.Printf("===> 💘 sending item: '%v' (%v)\n", currentItem, indicator)
79+
}
80+
}
81+
82+
// SendContext sends an item and blocks until it is sent or a context canceled.
83+
// It returns a boolean to indicate whether the item was sent.
84+
func (i Item[T]) SendContext(ctx context.Context, ch chan<- Item[T]) bool {
85+
select {
86+
case <-ctx.Done(): // Context's done channel has the highest priority
87+
return false
88+
default:
89+
select {
90+
case <-ctx.Done():
91+
return false
92+
case ch <- i:
93+
return true
94+
}
95+
}
96+
}

rxgo/item_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package rxgo_test
2+
3+
import (
4+
"context"
5+
6+
. "github.com/onsi/ginkgo/v2"
7+
. "github.com/onsi/gomega"
8+
. "github.com/onsi/gomega/matchers"
9+
"github.com/onsi/gomega/types"
10+
11+
"github.com/snivilised/extendio/rxgo"
12+
)
13+
14+
// TODO: check to see if we need to use async ginkgo types
15+
16+
var _ = Describe("Item", func() {
17+
Context("Variadic", func() {
18+
Context("AndMatcher", func() {
19+
It("🧪 should: Send Items", func() {
20+
ch := make(chan rxgo.Item[int], 3)
21+
actual := RxObservable[int]{
22+
context: context.Background(),
23+
observable: rxgo.FromChannel(ch),
24+
}
25+
26+
go rxgo.SendItemsV(context.Background(), ch, rxgo.CloseChannel, 1, 2, 3)
27+
Expect(actual).Should(&AndMatcher{
28+
Matchers: []types.GomegaMatcher{
29+
MatchHasItems(1, 2, 3),
30+
},
31+
})
32+
})
33+
})
34+
35+
Context("Singular MatchHasItems", func() {
36+
It("🧪 should: Send Items", func() {
37+
ch := make(chan rxgo.Item[int], 3)
38+
actual := RxObservable[int]{
39+
context: context.Background(),
40+
observable: rxgo.FromChannel(ch),
41+
}
42+
43+
go rxgo.SendItems(context.Background(), ch, rxgo.CloseChannel, 1, 2, 3)
44+
Expect(actual).Should(MatchHasItems(1, 2, 3))
45+
})
46+
})
47+
})
48+
49+
Context("Variadic with Error", func() {
50+
It("🧪 should: (can't send an error into a channel)", Pending)
51+
})
52+
})

0 commit comments

Comments
 (0)