Skip to content

Commit 0d9e3e8

Browse files
committed
ref(nav): tidy concurrency (#329)
1 parent 7803b9f commit 0d9e3e8

13 files changed

+155
-83
lines changed

β€Žxfs/nav/navigation-async_test.go

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ type (
2727
Listen nav.ListeningState
2828
}
2929

30-
acceleratorFunc func(a nav.AccelerationOperators) nav.AccelerationOperators
30+
operatorFunc func(op nav.AccelerationOperators) nav.AccelerationOperators
3131

3232
asyncTE struct {
33-
given string
34-
should string
35-
acc acceleratorFunc
36-
resume *asyncResumeTE
33+
given string
34+
should string
35+
operator operatorFunc
36+
resume *asyncResumeTE
3737
}
3838

3939
asyncOkTE struct {
@@ -195,11 +195,11 @@ var _ = Describe("navigation", Ordered, func() {
195195
},
196196
)
197197

198-
if entry.acc != nil {
199-
entry.acc(runner)
198+
if entry.operator != nil {
199+
entry.operator(runner)
200200
}
201201

202-
_, err := runner.Run(ctx, cancel)
202+
result, err := runner.Run(ctx, cancel)
203203

204204
if outputCh != nil {
205205
consumer = StartConsumer[nav.TraverseOutput](
@@ -208,7 +208,10 @@ var _ = Describe("navigation", Ordered, func() {
208208
outputCh,
209209
)
210210
}
211+
211212
wgan.Wait("πŸ‘Ύ test-main")
213+
_ = result.Session.StartedAt()
214+
_ = result.Session.Elapsed()
212215

213216
if consumer != nil {
214217
fmt.Printf("---> πŸ“ŒπŸ“ŒπŸ“Œ consumer.count: '%v'\n", consumer.Count)
@@ -222,20 +225,20 @@ var _ = Describe("navigation", Ordered, func() {
222225

223226
Entry(nil, &asyncOkTE{
224227
asyncTE: asyncTE{
225-
given: "PrimarySession WithCPUPool",
228+
given: "Primary Session WithCPUPool",
226229
should: "run with context",
227-
acc: func(a nav.AccelerationOperators) nav.AccelerationOperators {
228-
return a // the default is like CPUPool
230+
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
231+
return op // the default is like CPUPool
229232
},
230233
},
231234
}, SpecTimeout(time.Second*2)),
232235

233236
Entry(nil, &asyncOkTE{
234237
asyncTE: asyncTE{
235-
given: "PrimarySession WithPool",
238+
given: "Primary Session WithPool",
236239
should: "run with context",
237-
acc: func(a nav.AccelerationOperators) nav.AccelerationOperators {
238-
return a.NoW(3)
240+
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
241+
return op.NoW(3)
239242
},
240243
},
241244
}, SpecTimeout(time.Second*2)),
@@ -244,8 +247,8 @@ var _ = Describe("navigation", Ordered, func() {
244247
asyncTE: asyncTE{
245248
given: "Fastward Resume WithCPUPool(universal: listen pending(logged)",
246249
should: "run with context",
247-
acc: func(a nav.AccelerationOperators) nav.AccelerationOperators {
248-
return a
250+
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
251+
return op
249252
},
250253
// πŸ”₯ panic: send on closed channel; this is intermittent
251254
// probably a race condition
@@ -261,8 +264,8 @@ var _ = Describe("navigation", Ordered, func() {
261264
asyncTE: asyncTE{
262265
given: "Spawn Resume WithPool(universal: listen not active/deaf)",
263266
should: "run with context",
264-
acc: func(a nav.AccelerationOperators) nav.AccelerationOperators {
265-
return a.NoW(3)
267+
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
268+
return op.NoW(3)
266269
},
267270
resume: &asyncResumeTE{
268271
Strategy: nav.ResumeStrategySpawnEn,
@@ -273,10 +276,10 @@ var _ = Describe("navigation", Ordered, func() {
273276

274277
Entry(nil, &asyncOkTE{
275278
asyncTE: asyncTE{
276-
given: "PrimarySession Consume",
279+
given: "Primary Session Consume",
277280
should: "enable output to be consumed externally",
278-
acc: func(a nav.AccelerationOperators) nav.AccelerationOperators {
279-
return a.NoW(4).Consume(outputCh)
281+
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
282+
return op.NoW(4).Consume(outputCh)
280283
},
281284
},
282285
}, SpecTimeout(time.Second*2)),
@@ -285,9 +288,9 @@ var _ = Describe("navigation", Ordered, func() {
285288
asyncTE: asyncTE{
286289
given: "Fastward Resume Consume(universal: listen pending(logged)",
287290
should: "enable output to be consumed externally",
288-
acc: func(a nav.AccelerationOperators) nav.AccelerationOperators {
291+
operator: func(op nav.AccelerationOperators) nav.AccelerationOperators {
289292
outputCh = nav.CreateTraverseOutputCh(3)
290-
return a.NoW(4).Consume(outputCh)
293+
return op.NoW(4).Consume(outputCh)
291294
},
292295
// πŸ”₯ panic: send on closed channel;
293296
//

β€Žxfs/nav/navigation-listener_test.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,16 @@ var _ = Describe("Listener", Ordered, func() {
7373
},
7474
}
7575
}
76-
_, _ = nav.New().Primary(&nav.Prime{
76+
result, _ := nav.New().Primary(&nav.Prime{
7777
Path: path,
7878
OptionsFn: optionFn,
7979
}).Run()
8080

8181
reason := fmt.Sprintf("❌ remaining: '%v'", strings.Join(entry.mandatory, ", "))
8282
Expect(len(entry.mandatory)).To(Equal(0), reason)
83+
84+
_ = result.Session.StartedAt()
85+
_ = result.Session.Elapsed()
8386
},
8487
func(entry *listenTE) string {
8588
return fmt.Sprintf("πŸ§ͺ ===> given: '%v'", entry.message)
@@ -229,10 +232,13 @@ var _ = Describe("Listener", Ordered, func() {
229232
o.Callback = foldersCallback("EARLY-EXIT-😴", o.Store.DoExtend)
230233
}
231234

232-
_, _ = nav.New().Primary(&nav.Prime{
235+
result, _ := nav.New().Primary(&nav.Prime{
233236
Path: path,
234237
OptionsFn: optionFn,
235238
}).Run()
239+
240+
_ = result.Session.StartedAt()
241+
_ = result.Session.Elapsed()
236242
})
237243

238244
It("should: exit early (files)", func() {
@@ -255,10 +261,13 @@ var _ = Describe("Listener", Ordered, func() {
255261
o.Callback = filesCallback("EARLY-EXIT-😴", o.Store.DoExtend)
256262
}
257263

258-
_, _ = nav.New().Primary(&nav.Prime{
264+
result, _ := nav.New().Primary(&nav.Prime{
259265
Path: path,
260266
OptionsFn: optionFn,
261267
}).Run()
268+
269+
_ = result.Session.StartedAt()
270+
_ = result.Session.Elapsed()
262271
})
263272
})
264273

@@ -336,6 +345,9 @@ var _ = Describe("Listener", Ordered, func() {
336345
GinkgoWriter.Printf("---> πŸ•πŸ• Metrics, files:'%v', folders:'%v'\n",
337346
files, folders,
338347
)
348+
349+
_ = result.Session.StartedAt()
350+
_ = result.Session.Elapsed()
339351
})
340352
})
341353
})

β€Žxfs/nav/navigation-session.go

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ import (
66
xi18n "github.com/snivilised/extendio/i18n"
77
)
88

9-
type TraverseSession interface {
9+
type Session interface {
1010
StartedAt() time.Time
1111
Elapsed() time.Duration
12+
}
13+
14+
type TraverseSession interface {
15+
Session
1216
run(sync NavigationSync, args ...any) (*TraverseResult, error)
1317
Save(path string) error
1418
}
@@ -20,12 +24,24 @@ type session struct {
2024
duration time.Duration
2125
}
2226

27+
func (s *session) StartedAt() time.Time {
28+
return s.startAt
29+
}
30+
31+
func (s *session) Elapsed() time.Duration {
32+
return s.duration
33+
}
34+
2335
func (s *session) start() {
2436
s.startAt = time.Now()
2537
}
2638

27-
func (s *session) finish(_ *TraverseResult, _ error) {
39+
func (s *session) finish(result *TraverseResult, _ error) {
2840
s.duration = time.Since(s.startAt)
41+
42+
if result != nil {
43+
result.Session = s
44+
}
2945
}
3046

3147
// Primary
@@ -36,45 +52,39 @@ type Primary struct {
3652
navigator TraverseNavigator
3753
}
3854

39-
func (s *Primary) init() {
40-
s.navigator = navigatorFactory{}.new(s.OptionFn)
41-
}
42-
4355
// Save persists the current state for a primary session, that allows
4456
// a subsequent run to complete the resume.
4557
func (s *Primary) Save(path string) error {
4658
return s.navigator.save(path)
4759
}
4860

49-
func (s *Primary) run(sync NavigationSync, args ...any) (result *TraverseResult, err error) {
50-
defer s.finish(result, err)
61+
func (s *Primary) init() {
62+
s.navigator = navigatorFactory{}.new(s.OptionFn)
63+
}
5164

65+
func (s *Primary) run(sync NavigationSync, args ...any) (*TraverseResult, error) {
5266
s.start()
5367
s.init()
5468

55-
return sync.Run(
69+
result, err := sync.Run(
5670
func() (*TraverseResult, error) {
5771
return s.navigator.walk(s.Path)
5872
},
5973
s.navigator,
6074
args...,
6175
)
62-
}
6376

64-
func (s *Primary) StartedAt() time.Time {
65-
return s.startAt
66-
}
77+
s.finish(result, err)
6778

68-
func (s *Primary) Elapsed() time.Duration {
69-
return s.duration
79+
return result, err
7080
}
7181

7282
func (s *Primary) finish(result *TraverseResult, err error) {
73-
defer s.session.finish(result, err)
74-
7583
if s.navigator != nil {
7684
_ = s.navigator.finish()
7785
}
86+
87+
s.session.finish(result, err)
7888
}
7989

8090
// Resume represents a traversal that is invoked as a result
@@ -88,6 +98,12 @@ type Resume struct {
8898
rsc *resumeStrategyController
8999
}
90100

101+
// Save persists the current state for a resume session, that allows
102+
// a subsequent run to complete the resume.
103+
func (s *Resume) Save(path string) error {
104+
return s.rsc.nc.save(path)
105+
}
106+
91107
func (s *Resume) init() {
92108
var err error
93109

@@ -102,37 +118,27 @@ func (s *Resume) init() {
102118
}
103119
}
104120

105-
// Save persists the current state for a resume session, that allows
106-
// a subsequent run to complete the resume.
107-
func (s *Resume) Save(path string) error {
108-
return s.rsc.nc.save(path)
109-
}
110-
111-
func (s *Resume) run(sync NavigationSync, args ...any) (result *TraverseResult, err error) {
112-
defer s.finish(result, err)
113-
114-
s.init()
121+
func (s *Resume) run(sync NavigationSync, args ...any) (*TraverseResult, error) {
115122
s.start()
123+
s.init()
116124

117-
return sync.Run(
125+
result, err := sync.Run(
118126
func() (*TraverseResult, error) {
119127
return s.rsc.run()
120128
},
121129
s.rsc.nc,
122130
args...,
123131
)
124-
}
125132

126-
func (s *Resume) StartedAt() time.Time {
127-
return s.startAt
128-
}
133+
s.finish(result, err)
129134

130-
func (s *Resume) Elapsed() time.Duration {
131-
return s.duration
135+
return result, err
132136
}
133137

134138
func (s *Resume) finish(result *TraverseResult, err error) {
135-
defer s.session.finish(result, err)
139+
if s.rsc != nil {
140+
_ = s.rsc.finish()
141+
}
136142

137-
_ = s.rsc.finish()
143+
s.session.finish(result, err)
138144
}

β€Žxfs/nav/navigation-sync.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ func (s *acceleratedSync) Run(callback sessionCallback, nc syncable, args ...any
7171
extracted, ctx, cancel := s.extract(args...)
7272

7373
if !extracted {
74-
panic("failed to obtain context")
74+
// TODO: convert to i18n error
75+
//
76+
panic("failed to extract context")
7577
}
7678

7779
nc.ensync(ctx, cancel, s.ai)

β€Žxfs/nav/navigator-abstract.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (n *navigator) ensync(
7272
// intermittent panic: send on closed channel, in fastward resume scenarios
7373
// 'gr:observable-navigator'
7474

75-
fmt.Printf("-->> πŸ†πŸ† sending job(%v)\n", job.ID)
75+
fmt.Printf("-->> πŸ‡πŸ‡ sending job(%v)\n", job.ID)
7676
}
7777
}
7878

β€Žxfs/nav/resume-strategy_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,9 @@ var _ = Describe("Resume", Ordered, func() {
300300
files: result.Metrics.Count(nav.MetricNoFilesInvokedEn),
301301
folders: result.Metrics.Count(nav.MetricNoFoldersInvokedEn),
302302
}
303+
304+
_ = result.Session.StartedAt()
305+
_ = result.Session.Elapsed()
303306
}
304307

305308
for _, strategyEn := range strategies {

β€Žxfs/nav/traverse-defs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ type EndHandler func(_ *TraverseResult)
8888

8989
// TraverseResult the result of the traversal process.
9090
type TraverseResult struct {
91+
Session Session
9192
Metrics *NavigationMetrics
92-
// collection *MetricCollection
93-
err error
93+
err error
9494
}
9595

9696
func (r *TraverseResult) merge(other *TraverseResult) (*TraverseResult, error) {

0 commit comments

Comments
Β (0)