Skip to content

Commit 0556d9a

Browse files
committed
Fix shutdown logic in persistent queue to not require consumers to be closed first
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 483569a commit 0556d9a

File tree

6 files changed

+92
-83
lines changed

6 files changed

+92
-83
lines changed

.chloggen/fixshutdown.yaml

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,13 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: "Fix shutdown logic in persistent queue to not require consumers to be closed first"
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [8899]

exporter/exporterhelper/internal/mock_storage.go

Lines changed: 18 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"sync"
10+
"sync/atomic"
1011

1112
"go.opentelemetry.io/collector/component"
1213
"go.opentelemetry.io/collector/extension/experimental/storage"
@@ -23,43 +24,41 @@ func (m *mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _
2324
if m.getClientError != nil {
2425
return nil, m.getClientError
2526
}
26-
return &mockStorageClient{st: &m.st}, nil
27+
return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}}, nil
2728
}
2829

2930
func NewMockStorageExtension(getClientError error) storage.Extension {
3031
return &mockStorageExtension{getClientError: getClientError}
3132
}
3233

3334
type mockStorageClient struct {
34-
st *sync.Map
35-
closeCounter uint64
35+
st *sync.Map
36+
closed *atomic.Bool
3637
}
3738

38-
func (m *mockStorageClient) Get(_ context.Context, s string) ([]byte, error) {
39-
val, found := m.st.Load(s)
40-
if !found {
41-
return nil, nil
42-
}
43-
44-
return val.([]byte), nil
39+
func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) {
40+
getOp := storage.GetOperation(s)
41+
err := m.Batch(ctx, getOp)
42+
return getOp.Value, err
4543
}
4644

47-
func (m *mockStorageClient) Set(_ context.Context, s string, bytes []byte) error {
48-
m.st.Store(s, bytes)
49-
return nil
45+
func (m *mockStorageClient) Set(ctx context.Context, s string, bytes []byte) error {
46+
return m.Batch(ctx, storage.SetOperation(s, bytes))
5047
}
5148

52-
func (m *mockStorageClient) Delete(_ context.Context, s string) error {
53-
m.st.Delete(s)
54-
return nil
49+
func (m *mockStorageClient) Delete(ctx context.Context, s string) error {
50+
return m.Batch(ctx, storage.DeleteOperation(s))
5551
}
5652

5753
func (m *mockStorageClient) Close(_ context.Context) error {
58-
m.closeCounter++
54+
m.closed.Store(true)
5955
return nil
6056
}
6157

6258
func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) error {
59+
if m.isClosed() {
60+
panic("client already closed")
61+
}
6362
for _, op := range ops {
6463
switch op.Type {
6564
case storage.Get:
@@ -80,6 +79,6 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e
8079
return nil
8180
}
8281

83-
func (m *mockStorageClient) getCloseCount() uint64 {
84-
return m.closeCounter
82+
func (m *mockStorageClient) isClosed() bool {
83+
return m.closed.Load()
8584
}

exporter/exporterhelper/internal/persistent_queue.go

Lines changed: 2 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -14,13 +14,6 @@ import (
1414
)
1515

1616
var (
17-
// Monkey patching for unit test
18-
stopStorage = func(client storage.Client, ctx context.Context) error {
19-
if client == nil {
20-
return nil
21-
}
22-
return client.Close(ctx)
23-
}
2417
errNoStorageClient = errors.New("no storage client extension found")
2518
errWrongExtensionType = errors.New("requested extension is not a storage extension")
2619
)
@@ -70,9 +63,9 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
7063

7164
// Shutdown stops accepting items, shuts down the queue and closes the persistent queue
7265
func (pq *persistentQueue) Shutdown(ctx context.Context) error {
73-
close(pq.persistentContiguousStorage.stopChan)
66+
err := pq.persistentContiguousStorage.Shutdown(ctx)
7467
pq.stopWG.Wait()
75-
return stopStorage(pq.persistentContiguousStorage.client, ctx)
68+
return err
7669
}
7770

7871
func toStorageClient(ctx context.Context, storageID component.ID, host component.Host, ownerID component.ID, signal component.DataType) (storage.Client, error) {

exporter/exporterhelper/internal/persistent_queue_test.go

Lines changed: 15 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -39,28 +39,26 @@ func createTestQueue(t *testing.T, capacity, numConsumers int, set QueueSettings
3939
host := &mockHost{ext: map[component.ID]component.Component{
4040
{}: NewMockStorageExtension(nil),
4141
}}
42-
err := pq.Start(context.Background(), host, set)
43-
require.NoError(t, err)
42+
require.NoError(t, pq.Start(context.Background(), host, set))
4443
return pq
4544
}
4645

47-
func TestPersistentQueue_Capacity(t *testing.T) {
48-
pq := NewPersistentQueue(5, 1, component.ID{}, newFakeTracesRequestMarshalerFunc(),
49-
newFakeTracesRequestUnmarshalerFunc(), exportertest.NewNopCreateSettings())
50-
host := &mockHost{ext: map[component.ID]component.Component{
51-
{}: NewMockStorageExtension(nil),
52-
}}
53-
err := pq.Start(context.Background(), host, newNopQueueSettings())
54-
require.NoError(t, err)
55-
56-
// Stop consumer to imitate queue overflow
57-
close(pq.(*persistentQueue).persistentContiguousStorage.stopChan)
58-
pq.(*persistentQueue).stopWG.Wait()
59-
46+
func TestPersistentQueue_FullCapacity(t *testing.T) {
47+
start := make(chan struct{})
48+
done := make(chan struct{})
49+
pq := createTestQueue(t, 5, 1, newQueueSettings(func(item QueueRequest) {
50+
start <- struct{}{}
51+
<-done
52+
item.OnProcessingFinished()
53+
}))
6054
assert.Equal(t, 0, pq.Size())
6155

6256
req := newFakeTracesRequest(newTraces(1, 10))
6357

58+
// First request is picked by the consumer. Wait until the consumer is blocked on done.
59+
assert.NoError(t, pq.Offer(context.Background(), req))
60+
<-start
61+
6462
for i := 0; i < 10; i++ {
6563
result := pq.Offer(context.Background(), req)
6664
if i < 5 {
@@ -70,7 +68,8 @@ func TestPersistentQueue_Capacity(t *testing.T) {
7068
}
7169
}
7270
assert.Equal(t, 5, pq.Size())
73-
assert.NoError(t, stopStorage(pq.(*persistentQueue).persistentContiguousStorage.client, context.Background()))
71+
close(done)
72+
assert.NoError(t, pq.Shutdown(context.Background()))
7473
}
7574

7675
func TestPersistentQueueShutdown(t *testing.T) {
@@ -83,31 +82,6 @@ func TestPersistentQueueShutdown(t *testing.T) {
8382
assert.NoError(t, pq.Shutdown(context.Background()))
8483
}
8584

86-
// Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage
87-
func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
88-
pq := createTestQueue(t, 1001, 1, newNopQueueSettings())
89-
90-
lastRequestProcessedTime := time.Now()
91-
req := newFakeTracesRequest(newTraces(1, 10))
92-
req.processingFinishedCallback = func() {
93-
lastRequestProcessedTime = time.Now()
94-
}
95-
96-
fnBefore := stopStorage
97-
stopStorageTime := time.Now()
98-
stopStorage = func(storage storage.Client, ctx context.Context) error {
99-
stopStorageTime = time.Now()
100-
return storage.Close(ctx)
101-
}
102-
103-
for i := 0; i < 1000; i++ {
104-
assert.NoError(t, pq.Offer(context.Background(), req))
105-
}
106-
assert.NoError(t, pq.Shutdown(context.Background()))
107-
assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
108-
stopStorage = fnBefore
109-
}
110-
11185
func TestPersistentQueue_ConsumersProducers(t *testing.T) {
11286
cases := []struct {
11387
numMessagesProduced int

exporter/exporterhelper/internal/persistent_storage.go

Lines changed: 27 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -53,6 +53,7 @@ type persistentContiguousStorage struct {
5353
readIndex itemIndex
5454
writeIndex itemIndex
5555
currentlyDispatchedItems []itemIndex
56+
refClient int64
5657
}
5758

5859
type itemIndex uint64
@@ -88,6 +89,7 @@ func newPersistentContiguousStorage(
8889

8990
func (pcs *persistentContiguousStorage) start(ctx context.Context, client storage.Client) {
9091
pcs.client = client
92+
pcs.refClient = 1
9193
pcs.initPersistentContiguousStorage(ctx)
9294
// Make sure the leftover requests are handled
9395
pcs.retrieveAndEnqueueNotDispatchedReqs(ctx)
@@ -154,9 +156,22 @@ func (pcs *persistentContiguousStorage) Capacity() int {
154156
return int(pcs.capacity)
155157
}
156158

157-
func (pcs *persistentContiguousStorage) stop(ctx context.Context) error {
158-
pcs.logger.Debug("Stopping persistentContiguousStorage")
159-
return pcs.client.Close(ctx)
159+
func (pcs *persistentContiguousStorage) Shutdown(ctx context.Context) error {
160+
close(pcs.stopChan)
161+
// Hold the lock only for `refClient`.
162+
pcs.mu.Lock()
163+
defer pcs.mu.Unlock()
164+
return pcs.unrefClient(ctx)
165+
}
166+
167+
// unrefClient unrefs the client, and closes if no more references. Callers MUST hold the mutex.
168+
// This is needed because consumers of the queue may still process the requests while the queue is shutting down or immediately after.
169+
func (pcs *persistentContiguousStorage) unrefClient(ctx context.Context) error {
170+
pcs.refClient--
171+
if pcs.refClient == 0 {
172+
return pcs.client.Close(ctx)
173+
}
174+
return nil
160175
}
161176

162177
// Offer inserts the specified element into this queue if it is possible to do so immediately
@@ -202,6 +217,11 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
202217
pcs.mu.Lock()
203218
defer pcs.mu.Unlock()
204219

220+
// If called at the same time as Shutdown, make sure the client is not closed.
221+
if pcs.refClient <= 0 {
222+
return QueueRequest{}
223+
}
224+
205225
if pcs.readIndex == pcs.writeIndex {
206226
return QueueRequest{}
207227
}
@@ -232,12 +252,16 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) QueueRe
232252
}
233253

234254
// If all went well so far, cleanup will be handled by callback
255+
pcs.refClient++
235256
req.onProcessingFinishedFunc = func() {
236257
pcs.mu.Lock()
237258
defer pcs.mu.Unlock()
238259
if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
239260
pcs.logger.Error("Error deleting item from queue", zap.Error(err))
240261
}
262+
if err = pcs.unrefClient(ctx); err != nil {
263+
pcs.logger.Error("Error closing the storage client", zap.Error(err))
264+
}
241265
}
242266
return req
243267
}

exporter/exporterhelper/internal/persistent_storage_test.go

Lines changed: 17 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -145,7 +145,7 @@ func TestPersistentStorage_CorruptedData(t *testing.T) {
145145
assert.Equal(t, 3, ps.Size())
146146
_, _ = ps.get()
147147
assert.Equal(t, 2, ps.Size())
148-
assert.NoError(t, ps.stop(context.Background()))
148+
assert.NoError(t, ps.Shutdown(context.Background()))
149149

150150
// ... so now we can corrupt data (in several ways)
151151
if c.corruptAllData || c.corruptSomeData {
@@ -253,7 +253,7 @@ func TestPersistentStorage_StartWithNonDispatched(t *testing.T) {
253253
require.NoError(t, ps.Offer(context.Background(), req))
254254

255255
require.Equal(t, 5, ps.Size())
256-
assert.NoError(t, ps.stop(context.Background()))
256+
assert.NoError(t, ps.Shutdown(context.Background()))
257257

258258
// Reload
259259
newPs := createTestPersistentStorageWithCapacity(client, 5)
@@ -272,7 +272,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
272272
assert.Equal(t, 2, ps.Size())
273273
// TODO: Remove this after the initialization writes the readIndex.
274274
_, _ = ps.get()
275-
assert.NoError(t, ps.stop(context.Background()))
275+
assert.NoError(t, ps.Shutdown(context.Background()))
276276

277277
newPs := createTestPersistentStorage(createTestClient(t, ext))
278278
require.Equal(t, 2, newPs.Size())
@@ -286,7 +286,7 @@ func TestPersistentStorage_PutCloseReadClose(t *testing.T) {
286286
require.True(t, found)
287287
require.Equal(t, req.td, readReq.Request.(*fakeTracesRequest).td)
288288
require.Equal(t, 0, newPs.Size())
289-
assert.NoError(t, ps.stop(context.Background()))
289+
assert.NoError(t, newPs.Shutdown(context.Background()))
290290
}
291291

292292
func TestPersistentStorage_EmptyRequest(t *testing.T) {
@@ -399,16 +399,22 @@ func TestItemIndexArrayMarshaling(t *testing.T) {
399399
}
400400
}
401401

402-
func TestPersistentStorage_StopShouldCloseClient(t *testing.T) {
403-
ext := NewMockStorageExtension(nil)
404-
client := createTestClient(t, ext)
402+
func TestPersistentStorage_ShutdownWhileConsuming(t *testing.T) {
403+
client := createTestClient(t, NewMockStorageExtension(nil))
405404
ps := createTestPersistentStorage(client)
406405

407-
assert.NoError(t, ps.stop(context.Background()))
406+
assert.Equal(t, 0, ps.Size())
407+
assert.False(t, client.(*mockStorageClient).isClosed())
408+
409+
assert.NoError(t, ps.Offer(context.Background(), newFakeTracesRequest(newTraces(5, 10))))
408410

409-
castedClient, ok := client.(*mockStorageClient)
410-
require.True(t, ok, "expected client to be mockStorageClient")
411-
require.Equal(t, uint64(1), castedClient.getCloseCount())
411+
req, ok := ps.get()
412+
require.True(t, ok)
413+
assert.False(t, client.(*mockStorageClient).isClosed())
414+
assert.NoError(t, ps.Shutdown(context.Background()))
415+
assert.False(t, client.(*mockStorageClient).isClosed())
416+
req.OnProcessingFinished()
417+
assert.True(t, client.(*mockStorageClient).isClosed())
412418
}
413419

414420
func TestPersistentStorage_StorageFull(t *testing.T) {

0 commit comments

Comments
 (0)