Skip to content

Commit 12d99e0

Browse files
authored
Improve handling of concurrent identical queries (#146)
* fix(concurrent-queries): improve handling of identical queries * feat(transactionRegistry): created, completed, failed states * Concurrent test suites to verify transactions behaviour. Inmemory transaction registry fixes. * fix(proxy_test): queue overflow test * fix(review): remove ErrMissingTransaction. remove max exec time/2 check after awaiting concurrent transaction. * fix(review): use max execution time as default * fix(review): deprecation in README. Simplify transaction output.
1 parent 88c790f commit 12d99e0

20 files changed

+476
-181
lines changed

cache/async_cache.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/contentsquare/chproxy/log"
8+
79
"github.com/contentsquare/chproxy/clients"
810
"github.com/contentsquare/chproxy/config"
911
"github.com/go-redis/redis/v8"
1012
)
1113

12-
// AsyncCache is a transactional cache allowing the results from concurrent queries.
13-
// When query A is equal to query B and A arrives no more than defined graceTime, query A will await for the results of query B for the max time equal to:
14-
// graceTime - (arrivalB - arrivalA)
14+
// AsyncCache is a transactional cache enabled to serve the results from concurrent queries.
15+
// When query A and B are equal, and query B arrives after query A with no more than defined deadline interval [[graceTime]],
16+
// query B will await for the results of query B for the max time equal to:
17+
// max_awaiting_time = graceTime - (arrivalB - arrivalA)
1518
type AsyncCache struct {
1619
Cache
1720
TransactionRegistry
@@ -29,22 +32,28 @@ func (c *AsyncCache) Close() error {
2932
return nil
3033
}
3134

32-
func (c *AsyncCache) AwaitForConcurrentTransaction(key *Key) bool {
35+
func (c *AsyncCache) AwaitForConcurrentTransaction(key *Key) (TransactionState, error) {
3336
startTime := time.Now()
34-
37+
seenState := transactionAbsent
3538
for {
36-
if time.Since(startTime) > c.graceTime {
37-
// The entry didn't appear during graceTime.
39+
elapsedTime := time.Since(startTime)
40+
if elapsedTime > c.graceTime {
41+
// The entry didn't appear during deadline.
3842
// Let the caller creating it.
39-
return false
43+
return seenState, nil
44+
}
45+
46+
state, err := c.TransactionRegistry.Status(key)
47+
48+
if err != nil {
49+
return seenState, err
4050
}
4151

42-
ok := c.TransactionRegistry.IsDone(key)
43-
if ok {
44-
return ok
52+
if !state.IsPending() {
53+
return state, nil
4554
}
4655

47-
// Wait for graceTime in the hope the entry will appear
56+
// Wait for deadline in the hope the entry will appear
4857
// in the cache.
4958
//
5059
// This should protect from thundering herd problem when
@@ -57,11 +66,14 @@ func (c *AsyncCache) AwaitForConcurrentTransaction(key *Key) bool {
5766
}
5867
}
5968

60-
func NewAsyncCache(cfg config.Cache) (*AsyncCache, error) {
69+
func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCache, error) {
6170
graceTime := time.Duration(cfg.GraceTime)
71+
if graceTime > 0 {
72+
log.Errorf("[DEPRECATED] detected grace time configuration %s. It will be removed in the new version", graceTime)
73+
}
6274
if graceTime == 0 {
6375
// Default grace time.
64-
graceTime = 5 * time.Second
76+
graceTime = maxExecutionTime
6577
}
6678
if graceTime < 0 {
6779
// Disable protection from `dogpile effect`.
@@ -71,11 +83,13 @@ func NewAsyncCache(cfg config.Cache) (*AsyncCache, error) {
7183
var cache Cache
7284
var transaction TransactionRegistry
7385
var err error
86+
// transaction will be kept until we're sure there's no possible concurrent query running
87+
transactionDeadline := 2 * graceTime
7488

7589
switch cfg.Mode {
7690
case "file_system":
7791
cache, err = newFilesSystemCache(cfg, graceTime)
78-
transaction = newInMemoryTransactionRegistry(graceTime)
92+
transaction = newInMemoryTransactionRegistry(transactionDeadline)
7993
case "redis":
8094
var redisClient redis.UniversalClient
8195
redisClient, err = clients.NewRedisClient(cfg.Redis)

cache/async_cache_test.go

Lines changed: 97 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cache
22

33
import (
4+
"github.com/stretchr/testify/assert"
45
"log"
56
"os"
67
"testing"
@@ -15,104 +16,169 @@ const asyncTestDir = "./async-test-data"
1516
func TestAsyncCache_Cleanup_Of_Expired_Transactions(t *testing.T) {
1617
graceTime := 100 * time.Millisecond
1718
asyncCache := newAsyncTestCache(t, graceTime)
19+
defer func() {
20+
asyncCache.Close()
21+
os.RemoveAll(asyncTestDir)
22+
}()
1823

1924
key := &Key{
2025
Query: []byte("SELECT async cache"),
2126
}
22-
23-
if done := asyncCache.IsDone(key); !done {
27+
status, err := asyncCache.Status(key)
28+
assert.NoError(t, err)
29+
if !status.IsAbsent() {
2430
t.Fatalf("unexpected behaviour: transaction isnt done while it wasnt even started")
2531
}
2632

27-
if err := asyncCache.Register(key); err != nil {
33+
if err := asyncCache.Create(key); err != nil {
2834
t.Fatalf("unexpected error: %s failed to register transaction", err)
2935
}
3036

31-
if done := asyncCache.IsDone(key); done {
37+
status, err = asyncCache.Status(key)
38+
assert.NoError(t, err)
39+
if !status.IsPending() {
3240
t.Fatalf("unexpected behaviour: transaction isnt finished")
3341
}
3442

3543
time.Sleep(graceTime * 2)
3644

37-
if done := asyncCache.IsDone(key); !done {
45+
status, err = asyncCache.Status(key)
46+
assert.NoError(t, err)
47+
if status.IsPending() {
3848
t.Fatalf("unexpected behaviour: transaction grace time elapsed and yet it was still pending")
3949
}
40-
41-
asyncCache.Close()
42-
os.RemoveAll(asyncTestDir)
4350
}
4451

4552
func TestAsyncCache_AwaitForConcurrentTransaction_GraceTimeWithoutTransactionCompletion(t *testing.T) {
46-
graceTime := 300 * time.Millisecond
53+
graceTime := 100 * time.Millisecond
4754
asyncCache := newAsyncTestCache(t, graceTime)
4855

56+
defer func() {
57+
asyncCache.Close()
58+
os.RemoveAll(asyncTestDir)
59+
}()
60+
4961
key := &Key{
5062
Query: []byte("SELECT async cache AwaitForConcurrentTransaction"),
5163
}
5264

53-
if done := asyncCache.IsDone(key); !done {
65+
status, err := asyncCache.Status(key)
66+
assert.NoError(t, err)
67+
if !status.IsAbsent() {
5468
t.Fatalf("unexpected behaviour: transaction isnt done while it wasnt even started")
5569
}
5670

57-
if err := asyncCache.Register(key); err != nil {
71+
if err := asyncCache.Create(key); err != nil {
5872
t.Fatalf("unexpected error: %s failed to register transaction", err)
5973
}
6074

61-
if done := asyncCache.IsDone(key); done {
75+
status, err = asyncCache.Status(key)
76+
assert.NoError(t, err)
77+
if !status.IsPending() {
6278
t.Fatalf("unexpected behaviour: transaction isnt finished")
6379
}
6480

6581
startTime := time.Now()
66-
done := asyncCache.AwaitForConcurrentTransaction(key)
82+
_, err = asyncCache.AwaitForConcurrentTransaction(key)
83+
assert.NoError(t, err)
6784
elapsedTime := time.Since(startTime)
6885

6986
// in order to let the cleaner swipe the transaction
70-
time.Sleep(100 * time.Millisecond)
71-
if done == asyncCache.IsDone(key) && done {
87+
time.Sleep(150 * time.Millisecond)
88+
status, err = asyncCache.Status(key)
89+
assert.NoError(t, err)
90+
if !status.IsAbsent() {
7291
t.Fatalf("unexpected behaviour: transaction awaiting time elapsed %s", elapsedTime.String())
7392
}
74-
75-
asyncCache.Close()
76-
os.RemoveAll(asyncTestDir)
7793
}
7894

7995
func TestAsyncCache_AwaitForConcurrentTransaction_TransactionCompletedWhileAwaiting(t *testing.T) {
8096
graceTime := 300 * time.Millisecond
8197
asyncCache := newAsyncTestCache(t, graceTime)
8298

99+
defer func() {
100+
asyncCache.Close()
101+
os.RemoveAll(asyncTestDir)
102+
}()
83103
key := &Key{
84104
Query: []byte("SELECT async cache AwaitForConcurrentTransactionCompleted"),
85105
}
86106

87-
if err := asyncCache.Register(key); err != nil {
107+
if err := asyncCache.Create(key); err != nil {
88108
t.Fatalf("unexpected error: %s failed to register transaction", err)
89109
}
90110

91111
errs := make(chan error)
92112
go func() {
93113
time.Sleep(graceTime / 2)
94-
if err := asyncCache.Unregister(key); err != nil {
114+
if err := asyncCache.Complete(key); err != nil {
95115
errs <- err
96116
} else {
97117
errs <- nil
98118
}
99119
}()
100120

101121
startTime := time.Now()
102-
done := asyncCache.AwaitForConcurrentTransaction(key)
122+
transactionState, err := asyncCache.AwaitForConcurrentTransaction(key)
123+
if err != nil {
124+
t.Fatalf("unexpected error: %s failed to unregister transaction", err)
125+
}
126+
103127
elapsedTime := time.Since(startTime)
104128

105-
err := <-errs
129+
err = <-errs
106130
if err != nil {
107131
t.Fatalf("unexpected error: %s failed to unregister transaction", err)
108132
}
109133

110-
if done != asyncCache.IsDone(key) || !done || elapsedTime >= graceTime {
134+
if !transactionState.IsCompleted() || elapsedTime >= graceTime {
111135
t.Fatalf("unexpected behaviour: transaction awaiting time elapsed %s", elapsedTime.String())
112136
}
137+
}
138+
139+
func TestAsyncCache_AwaitForConcurrentTransaction_TransactionFailedWhileAwaiting(t *testing.T) {
140+
graceTime := 300 * time.Millisecond
141+
asyncCache := newAsyncTestCache(t, graceTime)
142+
143+
defer func() {
144+
asyncCache.Close()
145+
os.RemoveAll(asyncTestDir)
146+
}()
147+
148+
key := &Key{
149+
Query: []byte("SELECT async cache AwaitForConcurrentTransactionCompleted"),
150+
}
151+
152+
if err := asyncCache.Create(key); err != nil {
153+
t.Fatalf("unexpected error: %s failed to register transaction", err)
154+
}
155+
156+
errs := make(chan error)
157+
go func() {
158+
time.Sleep(graceTime / 2)
159+
if err := asyncCache.Fail(key); err != nil {
160+
errs <- err
161+
} else {
162+
errs <- nil
163+
}
164+
}()
165+
166+
startTime := time.Now()
167+
transactionState, err := asyncCache.AwaitForConcurrentTransaction(key)
168+
if err != nil {
169+
t.Fatalf("unexpected error: %s failed to unregister transaction", err)
170+
}
171+
172+
elapsedTime := time.Since(startTime)
173+
174+
err = <-errs
175+
if err != nil {
176+
t.Fatalf("unexpected error: %s failed to unregister transaction", err)
177+
}
113178

114-
asyncCache.Close()
115-
os.RemoveAll(asyncTestDir)
179+
if !transactionState.IsFailed() || elapsedTime >= graceTime {
180+
t.Fatalf("unexpected behaviour: transaction awaiting time elapsed %s", elapsedTime.String())
181+
}
116182
}
117183

118184
func newAsyncTestCache(t *testing.T, graceTime time.Duration) *AsyncCache {
@@ -152,7 +218,7 @@ func TestAsyncCache_FilesystemCache_instantiation(t *testing.T) {
152218
if err := os.RemoveAll(testDirAsync); err != nil {
153219
log.Fatalf("cannot remove %q: %s", testDirAsync, err)
154220
}
155-
_, err := NewAsyncCache(fileSystemCfg)
221+
_, err := NewAsyncCache(fileSystemCfg, 1*time.Second)
156222
if err != nil {
157223
t.Fatalf("could not instanciate filsystem async cache because of the following error: %s", err)
158224
}
@@ -168,7 +234,7 @@ func TestAsyncCache_FilesystemCache_wrong_instantiation(t *testing.T) {
168234
},
169235
Expire: config.Duration(time.Minute),
170236
}
171-
_, err := NewAsyncCache(fileSystemCfg)
237+
_, err := NewAsyncCache(fileSystemCfg, 1*time.Second)
172238
if err == nil {
173239
t.Fatalf("the instanciate of filsystem async cache should have crashed")
174240
}
@@ -185,7 +251,7 @@ func TestAsyncCache_RedisCache_instantiation(t *testing.T) {
185251
Expire: config.Duration(cacheTTL),
186252
}
187253

188-
_, err := NewAsyncCache(redisCfg)
254+
_, err := NewAsyncCache(redisCfg, 1*time.Second)
189255
if err != nil {
190256
t.Fatalf("could not instanciate redis async cache because of the following error: %s", err)
191257
}
@@ -201,21 +267,21 @@ func TestAsyncCache_RedisCache_wrong_instantiation(t *testing.T) {
201267
},
202268
}
203269

204-
_, err := NewAsyncCache(redisCfg)
270+
_, err := NewAsyncCache(redisCfg, 1*time.Second)
205271
if err == nil {
206272
t.Fatalf("the redis instanciation should have crashed")
207273
}
208274
}
209275

210-
func TestAsyncCache_Unkown_instantiation(t *testing.T) {
276+
func TestAsyncCache_Unknown_instantiation(t *testing.T) {
211277
var redisCfg = config.Cache{
212278
Name: "test",
213279
Mode: "Unkown Mode",
214280
Redis: config.RedisCacheConfig{},
215281
Expire: config.Duration(cacheTTL),
216282
}
217283

218-
_, err := NewAsyncCache(redisCfg)
284+
_, err := NewAsyncCache(redisCfg, 1*time.Second)
219285
if err == nil {
220286
t.Fatalf("The instanciation should have crash")
221287
}

cache/filesystem_cache.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (f *fileSystemCache) Get(key *Key) (*CachedData, error) {
115115
return nil, ErrMissing
116116
}
117117
// Serve expired file in the hope it will be substituted
118-
// with the fresh file during graceTime.
118+
// with the fresh file during deadline.
119119
}
120120

121121
b, err := ioutil.ReadAll(file)
@@ -242,7 +242,7 @@ func (f *fileSystemCache) clean() {
242242

243243
log.Debugf("cache %q: start cleaning dir %q", f.Name(), f.dir)
244244

245-
// Remove cached files after a graceTime from their expiration,
245+
// Remove cached files after a deadline from their expiration,
246246
// so they may be served until they are substituted with fresh files.
247247
expire := f.expire + f.grace
248248

cache/redis_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (r *redisCache) Get(key *Key) (*CachedData, error) {
9797
val, err := r.client.Get(ctx, key.String()).Result()
9898

9999
// if key not found in cache
100-
if errors.Is(err, redis.Nil) || val == pendingTransactionVal {
100+
if errors.Is(err, redis.Nil) {
101101
return nil, ErrMissing
102102
}
103103

0 commit comments

Comments
 (0)