Skip to content

Commit 4e55089

Browse files
authored
Add thread safety to resolve race conditions in Issues #305 and #335 (#377)
* fix: Add thread safety to core koanf to resolve race conditions Fixes #305 and #335 by implementing proper synchronization in koanf core. ## Issues Fixed - Issue #305: File watcher race condition causing empty string reads - Issue #335: "concurrent map writes" panic during concurrent Load() calls ## Root Cause The koanf struct's internal maps (confMap, confMapFlat, keyMap) were accessed concurrently without synchronization, causing race conditions when multiple goroutines performed read/write operations. ## Solution - Add sync.RWMutex to Koanf struct for thread safety - Exclusive locks (Lock) for write operations: merge(), Delete() - Shared locks (RLock) for read operations: Get(), Keys(), All(), etc. - Zero breaking changes - identical public API - Optimized for read-heavy workloads (RWMutex allows concurrent reads) ## Testing - Added comprehensive race condition tests that reproduce both issues - TestConcurrentLoadRaceCondition: Reproduces issue #335 - TestFileWatcherRaceCondition: Reproduces issue #305 - TestConcurrentReadWriteMix: Mixed read/write scenarios - TestConcurrentEdgeCases: Edge case methods (Cut, Copy, etc.) - All tests pass with -race flag - All existing functionality tests continue to pass ## Performance Impact - Zero overhead for single-threaded usage - Optimal for concurrent reads (multiple readers can proceed simultaneously) - Minimal contention cost due to RWMutex design Created using prompts: "both options are not correct, can you look at the underlying code, and see if adding a mutex somewhere is needed" and "we should begin by writing tests that replicate this. and any other race conditions that could occur." * fix: Resolve race condition in file provider and TestUnwatchFile Fixes race conditions detected when running tests with -race flag. ## Root Cause The file provider had a race condition where: 1. Watch() assigns to f.w (fsnotify.Watcher field) 2. Previous watcher's cleanup goroutine calls f.w.Close() 3. These operations could happen concurrently when re-watching after unwatch Additionally, TestUnwatchFile had a race between: - Main test goroutine reading/writing `reloaded` boolean variable - Watch callback goroutine writing to `reloaded` variable ## Solution 1. **File Provider**: Add mutex protection around watcher state changes - Added `mu sync.Mutex` to File struct - Protected Watch() and Unwatch() methods with mutex - Protected cleanup code in watch goroutine with mutex - Added nil checks for defensive programming 2. **TestUnwatchFile**: Use atomic operations instead of plain boolean - Changed `reloaded bool` to `reloaded int32` - Use atomic.StoreInt32/LoadInt32 for thread-safe access - Test now properly verifies re-watching capability after unwatch ## Testing - All watch-related tests pass with race detector: TestWatchFile, TestWatchFileSymlink, TestWatchFileDirectorySymlink, TestUnwatchFile - All submodule tests continue to pass with race detection - CI pattern `github.com/knadh/koanf...` properly tests all submodules Created using prompt: "alright, lets then do a separate commit now to fix the test unwatch file race" and "maybe a better approach will be to add a mutex for file watcher instad?" * Simplify file provider synchronization to use only mutex - Remove atomic operations in favor of simple boolean flags - Fix potential deadlock in Watch() by releasing lock before goroutine spawn - Use minimal locking in cleanup goroutine - Cleaner, more maintainable synchronization pattern - All tests pass with race detector * Simplify locking strategy by using localized locking instead of defer patterns Switch from global defer-based locking to localized locking in read methods to reduce code duplication and improve readability. This eliminates the need for duplicate logic that was added to avoid deadlocks between Sprint() and Keys() methods. - Keys(), KeyMap(), Get(), Exists() now use minimal lock duration - Sprint() can safely call Keys() without deadlock concerns - Remove duplicate key extraction logic from Sprint() - Maintain defer pattern only where needed for maps.Copy() operations - All existing tests pass including race condition and deadlock tests * perf: optimize Sprint() method to reduce lock contention - Move RLock/RUnlock outside the iteration loop to reduce lock overhead - Maintain alphabetical sorting behavior for API compatibility - Eliminate repeated lock acquisitions from O(n) to O(1) - All tests pass including race condition detection "optimize Sprint method performance while maintaining thread safety and sorting"
1 parent cc80f4f commit 4e55089

File tree

4 files changed

+642
-27
lines changed

4 files changed

+642
-27
lines changed

.github/workflows/test.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,19 @@ jobs:
2626

2727
- name: Run Coverage
2828
run: go test -v -cover ./...
29+
30+
race:
31+
runs-on: ubuntu-latest
32+
33+
name: Race Detection Tests
34+
steps:
35+
- uses: actions/checkout@v4
36+
37+
- name: Setup Go
38+
uses: actions/setup-go@v5
39+
with:
40+
go-version: '1.24'
41+
check-latest: true
42+
43+
- name: Run race detection tests
44+
run: go test -race -v github.com/knadh/koanf...

koanf.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"reflect"
88
"sort"
99
"strconv"
10+
"sync"
1011

1112
"github.com/go-viper/mapstructure/v2"
1213
"github.com/knadh/koanf/maps"
@@ -19,6 +20,7 @@ type Koanf struct {
1920
confMapFlat map[string]interface{}
2021
keyMap KeyMap
2122
conf Conf
23+
mu sync.RWMutex
2224
}
2325

2426
// Conf is the Koanf configuration.
@@ -123,46 +125,58 @@ func (ko *Koanf) Load(p Provider, pa Parser, opts ...Option) error {
123125
// Keys returns the slice of all flattened keys in the loaded configuration
124126
// sorted alphabetically.
125127
func (ko *Koanf) Keys() []string {
128+
ko.mu.RLock()
126129
out := make([]string, 0, len(ko.confMapFlat))
127130
for k := range ko.confMapFlat {
128131
out = append(out, k)
129132
}
133+
ko.mu.RUnlock()
130134
sort.Strings(out)
131135
return out
132136
}
133137

134138
// KeyMap returns a map of flattened keys and the individual parts of the
135139
// key as slices. eg: "parent.child.key" => ["parent", "child", "key"].
136140
func (ko *Koanf) KeyMap() KeyMap {
141+
ko.mu.RLock()
137142
out := make(KeyMap, len(ko.keyMap))
138143
for key, parts := range ko.keyMap {
139144
out[key] = make([]string, len(parts))
140145
copy(out[key], parts)
141146
}
147+
ko.mu.RUnlock()
142148
return out
143149
}
144150

145151
// All returns a map of all flattened key paths and their values.
146152
// Note that it uses maps.Copy to create a copy that uses
147153
// json.Marshal which changes the numeric types to float64.
148154
func (ko *Koanf) All() map[string]interface{} {
155+
ko.mu.RLock()
156+
defer ko.mu.RUnlock()
149157
return maps.Copy(ko.confMapFlat)
150158
}
151159

152160
// Raw returns a copy of the full raw conf map.
153161
// Note that it uses maps.Copy to create a copy that uses
154162
// json.Marshal which changes the numeric types to float64.
155163
func (ko *Koanf) Raw() map[string]interface{} {
164+
ko.mu.RLock()
165+
defer ko.mu.RUnlock()
156166
return maps.Copy(ko.confMap)
157167
}
158168

159169
// Sprint returns a key -> value string representation
160170
// of the config map with keys sorted alphabetically.
161171
func (ko *Koanf) Sprint() string {
162172
b := bytes.Buffer{}
163-
for _, k := range ko.Keys() {
164-
b.WriteString(fmt.Sprintf("%s -> %v\n", k, ko.confMapFlat[k]))
173+
keys := ko.Keys()
174+
ko.mu.RLock()
175+
for _, k := range keys {
176+
v := ko.confMapFlat[k]
177+
b.WriteString(fmt.Sprintf("%s -> %v\n", k, v))
165178
}
179+
ko.mu.RUnlock()
166180
return b.String()
167181
}
168182

@@ -287,6 +301,9 @@ func (ko *Koanf) UnmarshalWithConf(path string, o interface{}, c UnmarshalConf)
287301
// Clears all keys/values if no path is specified.
288302
// Every empty, key on the path, is recursively deleted.
289303
func (ko *Koanf) Delete(path string) {
304+
ko.mu.Lock()
305+
defer ko.mu.Unlock()
306+
290307
// No path. Erase the entire map.
291308
if path == "" {
292309
ko.confMap = make(map[string]interface{})
@@ -316,11 +333,14 @@ func (ko *Koanf) Get(path string) interface{} {
316333
}
317334

318335
// Does the path exist?
336+
ko.mu.RLock()
319337
p, ok := ko.keyMap[path]
320338
if !ok {
339+
ko.mu.RUnlock()
321340
return nil
322341
}
323342
res := maps.Search(ko.confMap, p)
343+
ko.mu.RUnlock()
324344

325345
// Non-reference types are okay to return directly.
326346
// Other types are "copied" with maps.Copy or json.Marshal
@@ -370,7 +390,9 @@ func (ko *Koanf) Slices(path string) []*Koanf {
370390

371391
// Exists returns true if the given key path exists in the conf map.
372392
func (ko *Koanf) Exists(path string) bool {
393+
ko.mu.RLock()
373394
_, ok := ko.keyMap[path]
395+
ko.mu.RUnlock()
374396
return ok
375397
}
376398

@@ -404,6 +426,9 @@ func (ko *Koanf) Delim() string {
404426
}
405427

406428
func (ko *Koanf) merge(c map[string]interface{}, opts *options) error {
429+
ko.mu.Lock()
430+
defer ko.mu.Unlock()
431+
407432
maps.IntfaceKeysToStrings(c)
408433
if opts.merge != nil {
409434
if err := opts.merge(c, ko.confMap); err != nil {

providers/file/file.go

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"fmt"
99
"os"
1010
"path/filepath"
11-
"sync/atomic"
11+
"sync"
1212
"time"
1313

1414
"github.com/fsnotify/fsnotify"
@@ -19,9 +19,9 @@ type File struct {
1919
path string
2020
w *fsnotify.Watcher
2121

22-
// Using Go 1.18 atomic functions for backwards compatibility.
23-
isWatching uint32
24-
isUnwatched uint32
22+
// Mutex to protect concurrent access to watcher state
23+
mu sync.Mutex
24+
isWatching bool
2525
}
2626

2727
// Provider returns a file provider.
@@ -42,8 +42,11 @@ func (f *File) Read() (map[string]interface{}, error) {
4242
// Watch watches the file and triggers a callback when it changes. It is a
4343
// blocking function that internally spawns a goroutine to watch for changes.
4444
func (f *File) Watch(cb func(event interface{}, err error)) error {
45+
f.mu.Lock()
46+
4547
// If a watcher already exists, return an error.
46-
if atomic.LoadUint32(&f.isWatching) == 1 {
48+
if f.isWatching {
49+
f.mu.Unlock()
4750
return errors.New("file is already being watched")
4851
}
4952

@@ -61,10 +64,24 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
6164

6265
f.w, err = fsnotify.NewWatcher()
6366
if err != nil {
67+
f.mu.Unlock()
6468
return err
6569
}
6670

67-
atomic.StoreUint32(&f.isWatching, 1)
71+
f.isWatching = true
72+
73+
// Set up the directory watch before releasing the lock
74+
err = f.w.Add(fDir)
75+
if err != nil {
76+
f.w.Close()
77+
f.w = nil
78+
f.isWatching = false
79+
f.mu.Unlock()
80+
return err
81+
}
82+
83+
// Release the lock before spawning goroutine
84+
f.mu.Unlock()
6885

6986
var (
7087
lastEvent string
@@ -77,8 +94,12 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
7794
select {
7895
case event, ok := <-f.w.Events:
7996
if !ok {
80-
// Only throw an error if it was not an explicit unwatch.
81-
if atomic.LoadUint32(&f.isUnwatched) == 0 {
97+
// Only throw an error if we were still supposed to be watching.
98+
f.mu.Lock()
99+
stillWatching := f.isWatching
100+
f.mu.Unlock()
101+
102+
if stillWatching {
82103
cb(nil, errors.New("fsnotify watch channel closed"))
83104
}
84105

@@ -126,8 +147,12 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
126147
// There's an error.
127148
case err, ok := <-f.w.Errors:
128149
if !ok {
129-
// Only throw an error if it was not an explicit unwatch.
130-
if atomic.LoadUint32(&f.isUnwatched) == 0 {
150+
// Only throw an error if we were still supposed to be watching.
151+
f.mu.Lock()
152+
stillWatching := f.isWatching
153+
f.mu.Unlock()
154+
155+
if stillWatching {
131156
cb(nil, errors.New("fsnotify err channel closed"))
132157
}
133158

@@ -140,17 +165,33 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
140165
}
141166
}
142167

143-
atomic.StoreUint32(&f.isWatching, 0)
144-
atomic.StoreUint32(&f.isUnwatched, 0)
145-
f.w.Close()
168+
f.mu.Lock()
169+
f.isWatching = false
170+
if f.w != nil {
171+
f.w.Close()
172+
f.w = nil
173+
}
174+
f.mu.Unlock()
146175
}()
147176

148-
// Watch the directory for changes.
149-
return f.w.Add(fDir)
177+
return nil
150178
}
151179

152180
// Unwatch stops watching the files and closes fsnotify watcher.
153181
func (f *File) Unwatch() error {
154-
atomic.StoreUint32(&f.isUnwatched, 1)
155-
return f.w.Close()
182+
f.mu.Lock()
183+
defer f.mu.Unlock()
184+
185+
if !f.isWatching {
186+
return nil // Already unwatched
187+
}
188+
189+
f.isWatching = false
190+
if f.w != nil {
191+
// Close the watcher to signal the goroutine to stop
192+
// The goroutine will handle setting f.w = nil
193+
return f.w.Close()
194+
}
195+
// This state should ideally never be reached - it indicates a bug in the synchronization logic
196+
return errors.New("file watcher is in an inconsistent state: isWatching is true but watcher is nil")
156197
}

0 commit comments

Comments
 (0)