diff --git a/CHANGELOG.md b/CHANGELOG.md index c449d98ea..6f025e122 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. ### Fixed - Flaky decimal/TestSelect (#300) +- Race condition at roundRobinStrategy.GetNextConnection() (#309) ## [1.12.0] - 2023-06-07 diff --git a/pool/connection_pool_test.go b/pool/connection_pool_test.go index c34e71673..dd0210d62 100644 --- a/pool/connection_pool_test.go +++ b/pool/connection_pool_test.go @@ -2231,6 +2231,35 @@ func TestDo(t *testing.T) { require.NotNilf(t, resp, "response is nil after Ping") } +func TestDo_concurrent(t *testing.T) { + roles := []bool{true, true, false, true, false} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + req := tarantool.NewPingRequest() + const concurrency = 100 + var wg sync.WaitGroup + wg.Add(concurrency) + + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + + _, err := connPool.Do(req, pool.ANY).Get() + assert.Nil(t, err) + }() + } + + wg.Wait() +} + func TestNewPrepared(t *testing.T) { test_helpers.SkipIfSQLUnsupported(t) diff --git a/pool/round_robin.go b/pool/round_robin.go index 1c50b97d9..c3400d371 100644 --- a/pool/round_robin.go +++ b/pool/round_robin.go @@ -2,6 +2,7 @@ package pool import ( "sync" + "sync/atomic" "github.com/tarantool/go-tarantool/v2" ) @@ -10,8 +11,8 @@ type roundRobinStrategy struct { conns []*tarantool.Connection indexByAddr map[string]uint mutex sync.RWMutex - size uint - current uint + size uint64 + current uint64 } func newRoundRobinStrategy(size int) *roundRobinStrategy { @@ -98,13 +99,12 @@ func (r *roundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) { r.conns[idx] = conn } else { r.conns = append(r.conns, conn) - r.indexByAddr[addr] = r.size + r.indexByAddr[addr] = uint(r.size) r.size += 1 } } -func (r *roundRobinStrategy) nextIndex() uint { - ret := r.current % r.size - r.current++ - return ret +func (r *roundRobinStrategy) nextIndex() uint64 { + next := atomic.AddUint64(&r.current, 1) + return (next - 1) % r.size }