Skip to content

Commit 221628d

Browse files
ArthurSenszeck-ops
authored and committed
[exporter/prometheusremotewrite] Fix WAL deadlock (open-telemetry#37630)
I was taking a look over open-telemetry#20875 and hoping to finish it. Fixes open-telemetry#19363 Fixes open-telemetry#24399 Fixes open-telemetry#15277 --- As mentioned in open-telemetry#24399 (comment), I used a library to help me understand how the deadlock was happening (1st commit). It showed that `persistToWAL` was trying to acquire the lock, while `readPrompbFromWAL` held it forever. I changed the strategy here: instead of using `fsnotify` and all the complicated logic around it, we're just using a pub/sub strategy between the writer and reader goroutines. The reader goroutine, once finding an empty WAL, will now release the lock immediately and wait for a notification from the writer. Previously it would hold the lock while waiting for a write that would never happen. --------- Signed-off-by: Arthur Silva Sens <[email protected]>
1 parent 430c21c commit 221628d

File tree

4 files changed

+108
-67
lines changed

4 files changed

+108
-67
lines changed

.chloggen/prw-WAL-deadlock.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: bug_fix
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
5+
component: prometheusremotewriteexporter
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: |
9+
Resolves a deadlock in the WAL by temporarily releasing a lock while waiting for new writes to the WAL.
10+
# One or more tracking issues related to the change
11+
issues: [19363, 24399, 15277]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext:
17+
18+
19+
change_logs: [user]

exporter/prometheusremotewriteexporter/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ go 1.22.0
44

55
require (
66
github.com/cenkalti/backoff/v4 v4.3.0
7-
github.com/fsnotify/fsnotify v1.8.0
87
github.com/gogo/protobuf v1.3.2
98
github.com/golang/snappy v0.0.4
109
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.119.0
@@ -37,6 +36,8 @@ require (
3736
go.uber.org/zap v1.27.0
3837
)
3938

39+
require github.com/fsnotify/fsnotify v1.8.0 // indirect
40+
4041
require (
4142
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4243
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect

exporter/prometheusremotewriteexporter/wal.go

Lines changed: 21 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"sync/atomic"
1313
"time"
1414

15-
"github.com/fsnotify/fsnotify"
1615
"github.com/gogo/protobuf/proto"
1716
"github.com/prometheus/prometheus/prompb"
1817
"github.com/tidwall/wal"
@@ -30,6 +29,7 @@ type prweWAL struct {
3029

3130
stopOnce sync.Once
3231
stopChan chan struct{}
32+
rNotify chan struct{}
3333
rWALIndex *atomic.Uint64
3434
wWALIndex *atomic.Uint64
3535
}
@@ -70,6 +70,7 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri
7070
exportSink: exportSink,
7171
walConfig: walConfig,
7272
stopChan: make(chan struct{}),
73+
rNotify: make(chan struct{}),
7374
rWALIndex: &atomic.Uint64{},
7475
wWALIndex: &atomic.Uint64{},
7576
}
@@ -315,13 +316,15 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error {
315316
batch.Write(wIndex, protoBlob)
316317
}
317318

319+
// Notify reader go routine that is possibly waiting for writes.
320+
select {
321+
case prwe.rNotify <- struct{}{}:
322+
default:
323+
}
318324
return prwe.wal.WriteBatch(batch)
319325
}
320326

321327
func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) {
322-
prwe.mu.Lock()
323-
defer prwe.mu.Unlock()
324-
325328
var protoBlob []byte
326329
for i := 0; i < 12; i++ {
327330
// Firstly check if we've been terminated, then exit if so.
@@ -337,10 +340,10 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq
337340
index = 1
338341
}
339342

343+
prwe.mu.Lock()
340344
if prwe.wal == nil {
341345
return nil, fmt.Errorf("attempt to read from closed WAL")
342346
}
343-
344347
protoBlob, err = prwe.wal.Read(index)
345348
if err == nil { // The read succeeded.
346349
req := new(prompb.WriteRequest)
@@ -351,74 +354,26 @@ func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq
351354
// Now increment the WAL's read index.
352355
prwe.rWALIndex.Add(1)
353356

357+
prwe.mu.Unlock()
354358
return req, nil
355359
}
360+
prwe.mu.Unlock()
356361

357-
if !errors.Is(err, wal.ErrNotFound) {
358-
return nil, err
359-
}
360-
361-
if index <= 1 {
362-
// This could be the very first attempted read, so try again, after a small sleep.
363-
time.Sleep(time.Duration(1<<i) * time.Millisecond)
364-
continue
365-
}
366-
367-
// Otherwise, we couldn't find the record, let's try watching
368-
// the WAL file until perhaps there is a write to it.
369-
walWatcher, werr := fsnotify.NewWatcher()
370-
if werr != nil {
371-
return nil, werr
372-
}
373-
if werr = walWatcher.Add(prwe.walPath); werr != nil {
374-
return nil, werr
375-
}
376-
377-
// Watch until perhaps there is a write to the WAL file.
378-
watchCh := make(chan error)
379-
wErr := err
380-
go func() {
381-
defer func() {
382-
watchCh <- wErr
383-
close(watchCh)
384-
// Close the file watcher.
385-
walWatcher.Close()
386-
}()
387-
362+
// If WAL was empty, let's wait for a notification from
363+
// the writer go routine.
364+
if errors.Is(err, wal.ErrNotFound) {
388365
select {
389-
case <-ctx.Done(): // If the context was cancelled, bail out ASAP.
390-
wErr = ctx.Err()
391-
return
392-
393-
case event, ok := <-walWatcher.Events:
394-
if !ok {
395-
return
396-
}
397-
switch event.Op {
398-
case fsnotify.Remove:
399-
// The file got deleted.
400-
// TODO: Add capabilities to search for the updated file.
401-
case fsnotify.Rename:
402-
// Renamed, we don't have information about the renamed file's new name.
403-
case fsnotify.Write:
404-
// Finally a write, let's try reading again, but after some watch.
405-
wErr = nil
406-
}
407-
408-
case eerr, ok := <-walWatcher.Errors:
409-
if ok {
410-
wErr = eerr
411-
}
366+
case <-prwe.rNotify:
367+
case <-ctx.Done():
368+
return nil, ctx.Err()
369+
case <-prwe.stopChan:
370+
return nil, fmt.Errorf("attempt to read from WAL after stopped")
412371
}
413-
}()
414-
415-
if gerr := <-watchCh; gerr != nil {
416-
return nil, gerr
417372
}
418373

419-
// Otherwise a write occurred might have occurred,
420-
// and we can sleep for a little bit then try again.
421-
time.Sleep(time.Duration(1<<i) * time.Millisecond)
374+
if !errors.Is(err, wal.ErrNotFound) {
375+
return nil, err
376+
}
422377
}
423378
return nil, err
424379
}

exporter/prometheusremotewriteexporter/wal_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,22 @@ package prometheusremotewriteexporter
55

66
import (
77
"context"
8+
"io"
9+
"net/http"
10+
"net/http/httptest"
811
"sort"
912
"testing"
1013
"time"
1114

15+
"github.com/gogo/protobuf/proto"
16+
"github.com/golang/snappy"
1217
"github.com/prometheus/prometheus/prompb"
1318
"github.com/stretchr/testify/assert"
1419
"github.com/stretchr/testify/require"
20+
"go.opentelemetry.io/collector/component"
21+
"go.opentelemetry.io/collector/component/componenttest"
22+
"go.opentelemetry.io/collector/config/confighttp"
23+
"go.opentelemetry.io/collector/exporter/exportertest"
1524
)
1625

1726
func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {
@@ -149,3 +158,60 @@ func TestWAL_persist(t *testing.T) {
149158
require.Equal(t, reqLFromWAL[0], reqL[0])
150159
require.Equal(t, reqLFromWAL[1], reqL[1])
151160
}
161+
162+
func TestExportWithWALEnabled(t *testing.T) {
163+
cfg := &Config{
164+
WAL: &WALConfig{
165+
Directory: t.TempDir(),
166+
},
167+
TargetInfo: &TargetInfo{}, // Declared just to avoid nil pointer dereference.
168+
CreatedMetric: &CreatedMetric{}, // Declared just to avoid nil pointer dereference.
169+
}
170+
buildInfo := component.BuildInfo{
171+
Description: "OpenTelemetry Collector",
172+
Version: "1.0",
173+
}
174+
set := exportertest.NewNopSettings()
175+
set.BuildInfo = buildInfo
176+
177+
server := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
178+
body, err := io.ReadAll(r.Body)
179+
assert.NoError(t, err)
180+
assert.NotNil(t, body)
181+
// Receives the HTTP requests, then unzips, unmarshals, and extracts the TimeSeries
182+
writeReq := &prompb.WriteRequest{}
183+
var unzipped []byte
184+
185+
dest, err := snappy.Decode(unzipped, body)
186+
assert.NoError(t, err)
187+
188+
ok := proto.Unmarshal(dest, writeReq)
189+
assert.NoError(t, ok)
190+
191+
assert.Len(t, writeReq.Timeseries, 1)
192+
}))
193+
clientConfig := confighttp.NewDefaultClientConfig()
194+
clientConfig.Endpoint = server.URL
195+
cfg.ClientConfig = clientConfig
196+
197+
prwe, err := newPRWExporter(cfg, set)
198+
assert.NoError(t, err)
199+
assert.NotNil(t, prwe)
200+
err = prwe.Start(context.Background(), componenttest.NewNopHost())
201+
require.NoError(t, err)
202+
assert.NotNil(t, prwe.client)
203+
204+
metrics := map[string]*prompb.TimeSeries{
205+
"test_metric": {
206+
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
207+
Samples: []prompb.Sample{{Value: 1, Timestamp: 100}},
208+
},
209+
}
210+
err = prwe.handleExport(context.Background(), metrics, nil)
211+
assert.NoError(t, err)
212+
213+
// While on Unix systems, t.TempDir() would easily close the WAL files,
214+
// on Windows, it doesn't. So we need to close it manually to avoid flaky tests.
215+
err = prwe.Shutdown(context.Background())
216+
assert.NoError(t, err)
217+
}

0 commit comments

Comments
 (0)