
Commit 9839974

[release-23.0] Change connection pool idle expiration logic (#19004) (#19014)
Signed-off-by: Arthur Schreiber <[email protected]>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
1 parent 30a3d9c commit 9839974

File tree

2 files changed: +153, -29 lines


go/pools/smartconnpool/pool.go

Lines changed: 63 additions & 24 deletions
@@ -781,35 +781,74 @@ func (pool *ConnPool[C]) closeIdleResources(now time.Time) {
 	mono := monotonicFromTime(now)
 
 	closeInStack := func(s *connStack[C]) {
-		// Do a read-only best effort iteration of all the connection in this
-		// stack and atomically attempt to mark them as expired.
-		// Any connections that are marked as expired are _not_ removed from
-		// the stack; it's generally unsafe to remove nodes from the stack
-		// besides the head. When clients pop from the stack, they'll immediately
-		// notice the expired connection and ignore it.
-		// see: timestamp.expired
-		for conn := s.Peek(); conn != nil; conn = conn.next.Load() {
-			if conn.timeUsed.expired(mono, timeout) {
-				pool.Metrics.idleClosed.Add(1)
+		conn, ok := s.Pop()
+		if !ok {
+			// Early return to skip allocating slices when the stack is empty
+			return
+		}
 
-				conn.Close()
-				pool.closedConn()
+		activeConnections := pool.Active()
+
+		// Only expire up to ~half of the active connections at a time. This should
+		// prevent us from closing too many connections in one go which could lead to
+		// a lot of `.Get` calls being added to the waitlist if there's a sudden spike
+		// coming in _after_ connections were popped off the stack but _before_ being
+		// returned back to the pool. This is unlikely to happen, but better safe than sorry.
+		//
+		// We always expire at least one connection per stack per iteration to ensure
+		// that idle connections are eventually closed even in small pools.
+		//
+		// We will expire any additional connections in the next iteration of the idle closer.
+		expiredConnections := make([]*Pooled[C], 0, max(activeConnections/2, 1))
+		validConnections := make([]*Pooled[C], 0, activeConnections)
+
+		// Pop out connections from the stack until we get a `nil` connection
+		for ok {
+			if conn.timeUsed.expired(mono, timeout) {
+				expiredConnections = append(expiredConnections, conn)
 
-				// Using context.Background() is fine since MySQL connection already enforces
-				// a connect timeout via the `db-connect-timeout-ms` config param.
-				c, err := pool.getNew(context.Background())
-				if err != nil {
-					// If we couldn't open a new connection, just continue
-					continue
+				if len(expiredConnections) == cap(expiredConnections) {
+					// We have collected enough connections for this iteration to expire
+					break
 				}
+			} else {
+				validConnections = append(validConnections, conn)
+			}
 
-				// opening a new connection might have raced with other goroutines,
-				// so it's possible that we got back `nil` here
-				if c != nil {
-					// Return the new connection to the pool
-					pool.tryReturnConn(c)
-				}
+			conn, ok = s.Pop()
+		}
+
+		// Return all the valid connections back to waiters or the stack
+		//
+		// The order here is not important - because we can't guarantee to
+		// restore the order we got the connections out of the stack anyway.
+		//
+		// If we return the connections in the order popped off the stack:
+		// * waiters will get the newest connection first
+		// * stack will have the oldest connections at the top of the stack.
+		//
+		// If we return the connections in reverse order:
+		// * waiters will get the oldest connection first
+		// * stack will have the newest connections at the top of the stack.
+		//
+		// Neither of these is better or worse than the other.
+		for _, conn := range validConnections {
+			pool.tryReturnConn(conn)
+		}
+
+		// Close all the expired connections and open new ones to replace them
+		for _, conn := range expiredConnections {
+			pool.Metrics.idleClosed.Add(1)
+
+			conn.Close()
+
+			err := pool.connReopen(context.Background(), conn, mono)
+			if err != nil {
+				pool.closedConn()
+				continue
 			}
+
+			pool.tryReturnConn(conn)
 		}
 	}
 
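The key change is that the idle closer now pops connections off the stack, splits them into still-valid and expired sets, caps the number expired per pass at roughly half of the active connections (always at least one), and only then closes and reopens the expired ones. Below is a minimal, self-contained sketch of that capping idea; idleConn, expireIdle, and the plain slice standing in for the pool's lock-free stack are hypothetical simplifications, not the real smartconnpool types.

package main

import (
	"fmt"
	"time"
)

// idleConn is a hypothetical stand-in for the pool's internal pooled-connection type.
type idleConn struct {
	id       int
	lastUsed time.Time
}

// expireIdle drains the given idle set, keeps the still-valid connections, and
// expires at most max(active/2, 1) stale ones per pass. Stale connections beyond
// that cap are kept and picked up on a later pass, mirroring the capping
// behaviour introduced by the patch above.
func expireIdle(idle []*idleConn, now time.Time, timeout time.Duration) (valid, expired []*idleConn) {
	active := len(idle)
	expired = make([]*idleConn, 0, max(active/2, 1))
	valid = make([]*idleConn, 0, active)

	for _, conn := range idle {
		if now.Sub(conn.lastUsed) > timeout && len(expired) < cap(expired) {
			expired = append(expired, conn)
			continue
		}
		valid = append(valid, conn)
	}
	return valid, expired
}

func main() {
	now := time.Now()
	idle := []*idleConn{
		{id: 1, lastUsed: now.Add(-2 * time.Minute)},
		{id: 2, lastUsed: now.Add(-90 * time.Second)},
		{id: 3, lastUsed: now.Add(-5 * time.Second)},
		{id: 4, lastUsed: now.Add(-3 * time.Minute)},
	}

	valid, expired := expireIdle(idle, now, time.Minute)
	fmt.Println("kept:", len(valid), "expired this pass:", len(expired)) // kept: 2 expired this pass: 2
}

With a one-minute timeout and the four sample connections above, only two of the three stale connections are expired on the first pass; the remaining one is handled by a later run of the idle closer, which is how the pool spreads expiration work across iterations instead of closing everything at once.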

go/pools/smartconnpool/pool_test.go

Lines changed: 90 additions & 5 deletions
@@ -1365,16 +1365,22 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
 
 	// Try to get connections while they're being reopened
 	// This should trigger the bug where connections get discarded
+	wg := sync.WaitGroup{}
+
 	for i := 0; i < 2; i++ {
-		getCtx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond)
-		defer cancel()
+		wg.Go(func() {
+			getCtx, cancel := context.WithTimeout(t.Context(), 300*time.Millisecond)
+			defer cancel()
 
-		conn, err := p.Get(getCtx, nil)
-		require.NoError(t, err)
+			conn, err := p.Get(getCtx, nil)
+			require.NoError(t, err)
 
-		p.put(conn)
+			p.put(conn)
+		})
 	}
 
+	wg.Wait()
+
 	// Wait a moment for all reopening to complete
 	require.EventuallyWithT(t, func(c *assert.CollectT) {
 		// Check the actual number of currently open connections
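The reworked test now issues the two Get calls concurrently via sync.WaitGroup.Go and scopes them with t.Context(); both are relatively recent standard-library additions, so on older toolchains the same pattern can be spelled with the classic Add/Done form. A minimal sketch of that equivalent, where doGet is a hypothetical stand-in for the Get-and-put closure the test passes to wg.Go:

package main

import (
	"fmt"
	"sync"
)

func main() {
	doGet := func(i int) { fmt.Println("get", i) } // hypothetical stand-in for the closure passed to wg.Go

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1) // equivalent of wg.Go(...) on toolchains without WaitGroup.Go
		go func(i int) {
			defer wg.Done()
			doGet(i)
		}(i) // pass i explicitly so older toolchains don't share the loop variable
	}
	wg.Wait()
}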
@@ -1405,3 +1411,82 @@ func TestIdleTimeoutConnectionLeak(t *testing.T) {
 	assert.Equal(t, int64(0), state.open.Load())
 	assert.Equal(t, int64(4), state.close.Load())
 }
+
+func TestIdleTimeoutDoesntLeaveLingeringConnection(t *testing.T) {
+	var state TestState
+
+	ctx := context.Background()
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    10,
+		IdleTimeout: 50 * time.Millisecond,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+
+	defer p.Close()
+
+	var conns []*Pooled[*TestConn]
+	for i := 0; i < 10; i++ {
+		conn, err := p.Get(ctx, nil)
+		require.NoError(t, err)
+		conns = append(conns, conn)
+	}
+
+	for _, conn := range conns {
+		p.put(conn)
+	}
+
+	require.EqualValues(t, 10, p.Active())
+	require.EqualValues(t, 10, p.Available())
+
+	// Wait a bit for the idle timeout worker to refresh connections
+	assert.Eventually(t, func() bool {
+		return p.Metrics.IdleClosed() > 10
+	}, 500*time.Millisecond, 10*time.Millisecond, "Expected at least 10 connections to be closed by idle timeout")
+
+	// Verify that new connections were created to replace the closed ones
+	require.EqualValues(t, 10, p.Active())
+	require.EqualValues(t, 10, p.Available())
+
+	// Count how many connections in the stack are closed
+	totalInStack := 0
+	for conn := p.clean.Peek(); conn != nil; conn = conn.next.Load() {
+		totalInStack++
+	}
+
+	require.LessOrEqual(t, totalInStack, 10)
+}
+
+func BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections(b *testing.B) {
+	var state TestState
+
+	capacity := 1000
+
+	p := NewPool(&Config[*TestConn]{
+		Capacity:    int64(capacity),
+		IdleTimeout: 30 * time.Second,
+		LogWait:     state.LogWait,
+	}).Open(newConnector(&state), nil)
+	defer p.Close()
+
+	// Fill the pool
+	connections := make([]*Pooled[*TestConn], 0, capacity)
+	for range capacity {
+		conn, err := p.Get(context.Background(), nil)
+		if err != nil {
+			b.Fatal(err)
+		}
+
+		connections = append(connections, conn)
+	}
+
+	// Return all connections to the pool
+	for _, conn := range connections {
+		conn.Recycle()
+	}
+
+	b.ResetTimer()
+
+	for b.Loop() {
+		p.closeIdleResources(time.Now())
+	}
+}
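The new test and benchmark can be exercised with the standard Go tooling from the repository root, for example:

go test ./go/pools/smartconnpool/ -run TestIdleTimeoutDoesntLeaveLingeringConnection -v
go test ./go/pools/smartconnpool/ -run '^$' -bench BenchmarkPoolCleanupIdleConnectionsPerformanceNoIdleConnections -benchmem

Here -run '^$' skips the regular tests while benchmarking; note that the benchmark body uses b.Loop, another recent testing-package addition, so it needs a correspondingly recent toolchain.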
