Skip to content

Commit 130bc38

Browse files
authored
Merge pull request #49 from gatewayd-io/fix-concurrent-connection-handling
Fix concurrent connection handling
2 parents 32f6f02 + 514cd14 commit 130bc38

File tree

18 files changed

+363
-242
lines changed

18 files changed

+363
-242
lines changed

.github/workflows/test.yaml

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,10 @@ jobs:
9292
run: |
9393
sudo apt-get update
9494
sudo apt-get install --yes --no-install-recommends postgresql-client
95-
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -c "CREATE DATABASE gatewayd_test;"
96-
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));"
97-
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "INSERT INTO test_table (name) VALUES ('test');"
98-
psql -h ${PGHOST} -p ${PGPORT} -U ${PGUSER} -d ${DBNAME} -c "SELECT * FROM test_table;" | grep test
95+
psql ${PGURL1} -c "CREATE DATABASE gatewayd_test;"
96+
psql ${PGURL2} -c "CREATE TABLE test_table (id serial PRIMARY KEY, name varchar(255));"
97+
psql ${PGURL2} -c "INSERT INTO test_table (name) VALUES ('test');"
98+
psql ${PGURL2} -c "SELECT * FROM test_table;" | grep test || exit 1
9999
env:
100-
DBNAME: gatewayd_test
101-
PGUSER: postgres
102-
PGPASSWORD: postgres
103-
PGHOST: localhost
104-
PGPORT: 15432
100+
PGURL1: postgresql://postgres:postgres@localhost:15432/postgres
101+
PGURL2: postgresql://postgres:postgres@localhost:15432/gatewayd_test

client_test.py

Lines changed: 0 additions & 76 deletions
This file was deleted.

cmd/config_parser.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ func proxyConfig() (bool, bool, *network.Client) {
131131
address := globalConfig.String(ref + ".address")
132132
receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize")
133133

134+
if receiveBufferSize <= 0 {
135+
receiveBufferSize = network.DefaultBufferSize
136+
}
137+
134138
return elastic, reuseElasticClients, &network.Client{
135139
Network: net,
136140
Address: address,
@@ -181,6 +185,26 @@ func getTCPNoDelay() gnet.TCPSocketOpt {
181185
}
182186

183187
func serverConfig() *ServerConfig {
188+
readBufferCap := globalConfig.Int("server.readBufferCap")
189+
if readBufferCap <= 0 {
190+
readBufferCap = network.DefaultBufferSize
191+
}
192+
193+
writeBufferCap := globalConfig.Int("server.writeBufferCap")
194+
if writeBufferCap <= 0 {
195+
writeBufferCap = network.DefaultBufferSize
196+
}
197+
198+
socketRecvBuffer := globalConfig.Int("server.socketRecvBuffer")
199+
if socketRecvBuffer <= 0 {
200+
socketRecvBuffer = network.DefaultBufferSize
201+
}
202+
203+
socketSendBuffer := globalConfig.Int("server.socketSendBuffer")
204+
if socketSendBuffer <= 0 {
205+
socketSendBuffer = network.DefaultBufferSize
206+
}
207+
184208
return &ServerConfig{
185209
Network: globalConfig.String("server.network"),
186210
Address: globalConfig.String("server.address"),
@@ -191,10 +215,10 @@ func serverConfig() *ServerConfig {
191215
MultiCore: globalConfig.Bool("server.multiCore"),
192216
LockOSThread: globalConfig.Bool("server.lockOSThread"),
193217
LoadBalancer: getLoadBalancer(globalConfig.String("server.loadBalancer")),
194-
ReadBufferCap: globalConfig.Int("server.readBufferCap"),
195-
WriteBufferCap: globalConfig.Int("server.writeBufferCap"),
196-
SocketRecvBuffer: globalConfig.Int("server.socketRecvBuffer"),
197-
SocketSendBuffer: globalConfig.Int("server.socketSendBuffer"),
218+
ReadBufferCap: readBufferCap,
219+
WriteBufferCap: writeBufferCap,
220+
SocketRecvBuffer: socketRecvBuffer,
221+
SocketSendBuffer: socketSendBuffer,
198222
ReuseAddress: globalConfig.Bool("server.reuseAddress"),
199223
ReusePort: globalConfig.Bool("server.reusePort"),
200224
TCPKeepAlive: globalConfig.Duration("server.tcpKeepAlive"),

cmd/run.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ var runCmd = &cobra.Command{
114114
}
115115

116116
// Create and initialize a pool of connections
117-
pool := pool.NewPool()
118117
poolSize, clientConfig := poolConfig()
118+
pool := pool.NewPool(poolSize)
119119

120120
// Add clients to the pool
121121
for i := 0; i < poolSize; i++ {
@@ -146,7 +146,10 @@ var runCmd = &cobra.Command{
146146
}
147147
}
148148

149-
pool.Put(client.ID, client)
149+
err = pool.Put(client.ID, client)
150+
if err != nil {
151+
logger.Error().Err(err).Msg("Failed to add client to the pool")
152+
}
150153
}
151154
}
152155

errors/errors.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ var (
1010

1111
ErrPluginNotFound = errors.New("plugin not found")
1212
ErrPluginNotReady = errors.New("plugin is not ready")
13+
14+
ErrClientReceiveFailed = errors.New("couldn't receive data from the server")
15+
ErrClientSendFailed = errors.New("couldn't send data to the server")
16+
17+
ErrPutFailed = errors.New("failed to put in pool")
18+
19+
ErrCastFailed = errors.New("failed to cast")
1320
)
1421

1522
const (

gatewayd.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@ clients:
1515
client1:
1616
network: tcp
1717
address: localhost:5432
18-
receiveBufferSize: 4096
18+
# receiveBufferSize: 16777216
1919

2020
# Pool config
2121
pool:
2222
# Use the logger config passed here
2323
# i.e. don't assume it's the same as the logger config above
2424
logger: loggers.logger
25-
size: 2
25+
size: 10
2626
# Database configs for the connection pool
2727
client: clients.client1
2828

@@ -52,10 +52,10 @@ server:
5252
multiCore: True
5353
lockOSThread: False
5454
loadBalancer: roundrobin
55-
readBufferCap: 4096
56-
writeBufferCap: 4096
57-
socketRecvBuffer: 4096
58-
socketSendBuffer: 4096
55+
# readBufferCap: 16777216
56+
# writeBufferCap: 16777216
57+
# socketRecvBuffer: 16777216
58+
# socketSendBuffer: 16777216
5959
reuseAddress: True
6060
reusePort: True
6161
tcpKeepAlive: 3s # seconds

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,19 @@ require (
99
github.com/hashicorp/go-plugin v1.4.8
1010
github.com/knadh/koanf v1.4.4
1111
github.com/mitchellh/mapstructure v1.5.0
12-
github.com/panjf2000/gnet/v2 v2.2.0
12+
github.com/panjf2000/gnet/v2 v2.2.1
1313
github.com/rs/zerolog v1.28.0
1414
github.com/spf13/cobra v1.6.1
1515
github.com/stretchr/testify v1.8.1
16-
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338
16+
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15
1717
google.golang.org/grpc v1.51.0
1818
google.golang.org/protobuf v1.28.1
1919
)
2020

2121
require (
2222
github.com/davecgh/go-spew v1.1.1 // indirect
2323
github.com/fatih/color v1.13.0 // indirect
24-
github.com/fsnotify/fsnotify v1.4.9 // indirect
24+
github.com/fsnotify/fsnotify v1.6.0 // indirect
2525
github.com/golang/protobuf v1.5.2 // indirect
2626
github.com/hashicorp/yamux v0.1.1 // indirect
2727
github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -36,7 +36,7 @@ require (
3636
github.com/spf13/pflag v1.0.5 // indirect
3737
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
3838
go.uber.org/atomic v1.10.0 // indirect
39-
go.uber.org/multierr v1.8.0 // indirect
39+
go.uber.org/multierr v1.9.0 // indirect
4040
go.uber.org/zap v1.24.0 // indirect
4141
golang.org/x/net v0.4.0 // indirect
4242
golang.org/x/sys v0.3.0 // indirect

go.sum

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ github.com/fergusstrange/embedded-postgres v1.19.0 h1:NqDufJHeA03U7biULlPHZ0pZ10
5555
github.com/fergusstrange/embedded-postgres v1.19.0/go.mod h1:0B+3bPsMvcNgR9nN+bdM2x9YaNYDnf3ksUqYp1OAub0=
5656
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
5757
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
58+
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
59+
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
5860
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
5961
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
6062
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
@@ -222,8 +224,11 @@ github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA=
222224
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
223225
github.com/panjf2000/ants/v2 v2.4.8 h1:JgTbolX6K6RreZ4+bfctI0Ifs+3mrE5BIHudQxUDQ9k=
224226
github.com/panjf2000/ants/v2 v2.4.8/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
227+
github.com/panjf2000/ants/v2 v2.7.0 h1:Y3Bgpfo9HDkBoHNVFbMfY5mAvi5TAA17y3HbzQ74p5Y=
225228
github.com/panjf2000/gnet/v2 v2.2.0 h1:+6itXhRlHJpv5UGAyN1DebHzK1l0GbZMOsg2Spb1VS0=
226229
github.com/panjf2000/gnet/v2 v2.2.0/go.mod h1:unWr2B4jF0DQPJH3GsXBGQiDcAamM6+Pf5FiK705kc4=
230+
github.com/panjf2000/gnet/v2 v2.2.1 h1:HJVK3vmD6rBgOeTnYkG4czW6jphVHygxLLWTEBU3nqU=
231+
github.com/panjf2000/gnet/v2 v2.2.1/go.mod h1:y8xWR1EEK6pGDuAQ6XULY/WWmPv0Pgbsq2Q4lbXJ6JA=
227232
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
228233
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
229234
github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIGuI=
@@ -304,6 +309,8 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i
304309
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
305310
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
306311
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
312+
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
313+
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
307314
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
308315
go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw=
309316
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
@@ -316,6 +323,10 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
316323
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
317324
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338 h1:OvjRkcNHnf6/W5FZXSxODbxwD+X7fspczG7Jn/xQVD4=
318325
golang.org/x/exp v0.0.0-20221212164502-fae10dda9338/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
326+
golang.org/x/exp v0.0.0-20221215174704-0915cd710c24 h1:6w3iSY8IIkp5OQtbYj8NeuKG1jS9d+kYaubXqsoOiQ8=
327+
golang.org/x/exp v0.0.0-20221215174704-0915cd710c24/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
328+
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15 h1:5oN1Pz/eDhCpbMbLstvIPa0b/BEQo6g6nwV3pLjfM6w=
329+
golang.org/x/exp v0.0.0-20221217163422-3c43f8badb15/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
319330
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
320331
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
321332
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@@ -391,6 +402,7 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
391402
golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
392403
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
393404
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
405+
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
394406
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
395407
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
396408
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

network/client.go

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package network
22

33
import (
4-
"fmt"
4+
"errors"
5+
"io"
56
"net"
67

8+
gerr "github.com/gatewayd-io/gatewayd/errors"
79
"github.com/rs/zerolog"
810
)
911

@@ -58,33 +60,37 @@ func NewClient(network, address string, receiveBufferSize int, logger zerolog.Lo
5860
}
5961

6062
client.Conn = conn
61-
if client.ReceiveBufferSize == 0 {
63+
if receiveBufferSize <= 0 {
6264
client.ReceiveBufferSize = DefaultBufferSize
65+
} else {
66+
client.ReceiveBufferSize = receiveBufferSize
6367
}
68+
6469
logger.Debug().Msgf("New client created: %s", client.Address)
6570
client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger)
6671

6772
return &client
6873
}
6974

70-
func (c *Client) Send(data []byte) error {
71-
if _, err := c.Write(data); err != nil {
75+
func (c *Client) Send(data []byte) (int, error) {
76+
sent, err := c.Conn.Write(data)
77+
if err != nil {
7278
c.logger.Error().Err(err).Msgf("Couldn't send data to the server: %s", err)
73-
return fmt.Errorf("couldn't send data to the server: %w", err)
79+
// TODO: Wrap the original error
80+
return 0, gerr.ErrClientSendFailed
7481
}
7582
c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address)
76-
return nil
83+
return sent, nil
7784
}
7885

7986
func (c *Client) Receive() (int, []byte, error) {
8087
buf := make([]byte, c.ReceiveBufferSize)
81-
read, err := c.Read(buf)
82-
if err != nil {
83-
c.logger.Error().Err(err).Msgf("Couldn't receive data from the server: %s", err)
84-
return 0, nil, fmt.Errorf("couldn't receive data from the server: %w", err)
88+
received, err := c.Conn.Read(buf)
89+
if err != nil && errors.Is(err, io.EOF) {
90+
c.logger.Error().Err(err).Msg("Couldn't receive data from the server")
91+
return 0, nil, err //nolint:wrapcheck
8592
}
86-
c.logger.Debug().Msgf("Received %d bytes from %s", read, c.Address)
87-
return read, buf, nil
93+
return received, buf, err //nolint:wrapcheck
8894
}
8995

9096
func (c *Client) Close() {
@@ -96,5 +102,26 @@ func (c *Client) Close() {
96102
c.Conn = nil
97103
c.Address = ""
98104
c.Network = ""
99-
c.ReceiveBufferSize = 0
105+
}
106+
107+
// Go returns io.EOF when the server closes the connection.
108+
// So, if I read 0 bytes and the error is io.EOF or net.ErrClosed, I should reconnect.
109+
func (c *Client) IsConnected() bool {
110+
if c == nil {
111+
return false
112+
}
113+
114+
if c != nil && c.Conn == nil || c.ID == "" {
115+
c.Close()
116+
return false
117+
}
118+
119+
buf := make([]byte, 0)
120+
if _, err := c.Read(buf); errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
121+
c.logger.Debug().Msgf("Connection to %s is closed", c.Address)
122+
c.Close()
123+
return false
124+
}
125+
126+
return true
100127
}

0 commit comments

Comments
 (0)