Commit 73bdd51

chore: Refactor host in scope and separate to new package topology (#356)
* chore: Refactor host in scope and separate to new package topology

*Motivation:* Right now a lot of the code base in chproxy resides in a shared main package. This has led to a lot of coupled code and a code base that is very hard to read. This PR is a continuation of the work started with the heartbeat: move code away from the main package, decouple the code, and improve readability.

*Additions and Changes:*

- Create a new `Node` struct in the new `topology` package that exposes methods from the previous `host` struct. However, it doesn't expose internal state.
- Improve the Node code by using more modern constructs such as `atomic.Bool`.
- Update the scope package and every usage of `host` with `topology.Node`.
- Include a new test case for `Node.StartHeartbeat`.

*Notes:* Due to the coupled nature of the code around scope, I didn't see an opportunity to do this incrementally. The PR will sadly be large and hard to review.

Signed-off-by: Lennard Eijsackers <[email protected]>

* chore: Resolve comments on PR

---------

Signed-off-by: Lennard Eijsackers <[email protected]>
1 parent: 466b4bf · commit: 73bdd51

File tree: 11 files changed, +500 −304 lines changed

internal/counter/counter.go

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@

```go
package counter

import "sync/atomic"

// Counter is a thin wrapper around atomic.Uint32.
type Counter struct {
	value atomic.Uint32
}

func (c *Counter) Store(n uint32) { c.value.Store(n) }

func (c *Counter) Load() uint32 { return c.value.Load() }

// Dec subtracts one by adding ^uint32(0) (two's-complement -1),
// relying on unsigned wraparound.
func (c *Counter) Dec() { c.value.Add(^uint32(0)) }

// Inc increments the counter and returns the new value.
func (c *Counter) Inc() uint32 { return c.value.Add(1) }
```
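
`Dec` leans on a standard trick: `sync/atomic` has no subtract for unsigned types, so adding the two's-complement bit pattern of `n` subtracts `n` modulo 2^32. The same identity shows up again in `Penalize` in `node.go` below. A minimal standalone sketch of the arithmetic:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var v atomic.Uint32
	v.Store(5)

	// ^uint32(0) == 0xFFFFFFFF == 2^32-1, so the add wraps: 5 + (2^32-1) ≡ 4 (mod 2^32).
	v.Add(^uint32(0))
	fmt.Println(v.Load()) // 4

	// More generally, ^uint32(n-1) is the two's complement of n,
	// so Add(^uint32(n-1)) subtracts n.
	v.Add(^uint32(3 - 1)) // subtract 3
	fmt.Println(v.Load()) // 1
}
```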

internal/topology/main_test.go

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@

```go
package topology

import (
	"os"
	"testing"

	"github.com/contentsquare/chproxy/config"
)

func TestMain(m *testing.M) {
	cfg := &config.Config{
		Server: config.Server{
			Metrics: config.Metrics{
				Namespace: "test",
			},
		},
	}

	// Metrics should be preregistered to avoid nil-panics.
	RegisterMetrics(cfg)
	code := m.Run()
	os.Exit(code)
}
```

internal/topology/metrics.go

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@

```go
package topology

// TODO this is only here to avoid recursive imports. We should have a separate package for metrics.
import (
	"github.com/contentsquare/chproxy/config"
	"github.com/prometheus/client_golang/prometheus"
)

var (
	HostHealth    *prometheus.GaugeVec
	HostPenalties *prometheus.CounterVec
)

func initMetrics(cfg *config.Config) {
	namespace := cfg.Server.Metrics.Namespace
	HostHealth = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Name:      "host_health",
			Help:      "Health state of hosts by clusters",
		},
		[]string{"cluster", "replica", "cluster_node"},
	)
	HostPenalties = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Name:      "host_penalties_total",
			Help:      "Total number of given penalties by host",
		},
		[]string{"cluster", "replica", "cluster_node"},
	)
}

func RegisterMetrics(cfg *config.Config) {
	initMetrics(cfg)
	prometheus.MustRegister(HostHealth, HostPenalties)
}

func reportNodeHealthMetric(clusterName, replicaName, nodeName string, active bool) {
	label := prometheus.Labels{
		"cluster":      clusterName,
		"replica":      replicaName,
		"cluster_node": nodeName,
	}

	if active {
		HostHealth.With(label).Set(1)
	} else {
		HostHealth.With(label).Set(0)
	}
}

func incrementPenaltiesMetric(clusterName, replicaName, nodeName string) {
	label := prometheus.Labels{
		"cluster":      clusterName,
		"replica":      replicaName,
		"cluster_node": nodeName,
	}

	HostPenalties.With(label).Inc()
}
```
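
As the `TestMain` above hints, `RegisterMetrics` must run before any `Node` reports health, since the package-level vectors start out nil. A minimal sketch of wiring this up in a caller — the main-package call site, namespace, and port are assumptions for illustration, not part of this diff:

```go
package main

import (
	"net/http"

	"github.com/contentsquare/chproxy/config"
	"github.com/contentsquare/chproxy/internal/topology"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	cfg := &config.Config{
		Server: config.Server{
			Metrics: config.Metrics{Namespace: "chproxy"},
		},
	}

	// Register host_health and host_penalties_total with the default
	// Prometheus registry before any heartbeat starts reporting.
	topology.RegisterMetrics(cfg)

	// Expose the default registry over HTTP.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9090", nil)
}
```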

internal/topology/node.go

Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@

```go
package topology

import (
	"context"
	"net/url"
	"sync/atomic"
	"time"

	"github.com/contentsquare/chproxy/internal/counter"
	"github.com/contentsquare/chproxy/internal/heartbeat"
	"github.com/contentsquare/chproxy/log"
)

const (
	// prevents excess goroutine creation while penalizing an overloaded host
	DefaultPenaltySize     = 5
	DefaultMaxSize         = 300
	DefaultPenaltyDuration = time.Second * 10
)

type nodeOpts struct {
	defaultActive   bool
	penaltySize     uint32
	penaltyMaxSize  uint32
	penaltyDuration time.Duration
}

func defaultNodeOpts() nodeOpts {
	return nodeOpts{
		penaltySize:     DefaultPenaltySize,
		penaltyMaxSize:  DefaultMaxSize,
		penaltyDuration: DefaultPenaltyDuration,
	}
}

type NodeOption interface {
	apply(*nodeOpts)
}

type defaultActive struct {
	active bool
}

func (o defaultActive) apply(opts *nodeOpts) {
	opts.defaultActive = o.active
}

func WithDefaultActiveState(active bool) NodeOption {
	return defaultActive{
		active: active,
	}
}

type Node struct {
	// Node address.
	addr *url.URL

	// Whether this node is alive.
	active atomic.Bool

	// Counter of currently running connections.
	connections counter.Counter

	// Counter of unsuccessful requests, used to decrease host priority.
	penalty atomic.Uint32

	// Heartbeat function.
	hb heartbeat.HeartBeat

	// TODO These fields are only used for labels in prometheus. We should have a different way to pass the labels.
	// For metrics only.
	clusterName string
	replicaName string

	// Additional configuration options.
	opts nodeOpts
}

func NewNode(addr *url.URL, hb heartbeat.HeartBeat, clusterName, replicaName string, opts ...NodeOption) *Node {
	nodeOpts := defaultNodeOpts()

	for _, opt := range opts {
		opt.apply(&nodeOpts)
	}

	n := &Node{
		addr:        addr,
		hb:          hb,
		clusterName: clusterName,
		replicaName: replicaName,
		opts:        nodeOpts,
	}

	if n.opts.defaultActive {
		n.SetIsActive(true)
	}

	return n
}

func (n *Node) IsActive() bool {
	return n.active.Load()
}

func (n *Node) SetIsActive(active bool) {
	n.active.Store(active)
}

// StartHeartbeat runs the heartbeat healthcheck against the node
// until the done channel is closed.
// If the heartbeat fails, the active status of the node is changed.
func (n *Node) StartHeartbeat(done <-chan struct{}) {
	ctx, cancel := context.WithCancel(context.Background())
	for {
		n.heartbeat(ctx)
		select {
		case <-done:
			cancel()
			return
		case <-time.After(n.hb.Interval()):
		}
	}
}

func (n *Node) heartbeat(ctx context.Context) {
	if err := n.hb.IsHealthy(ctx, n.addr.String()); err == nil {
		n.active.Store(true)
		reportNodeHealthMetric(n.clusterName, n.replicaName, n.Host(), true)
	} else {
		log.Errorf("error while health-checking %q host: %s", n.Host(), err)
		n.active.Store(false)
		reportNodeHealthMetric(n.clusterName, n.replicaName, n.Host(), false)
	}
}

// Penalize a node after a failed request to decrease its priority.
// If the penalty is already at the maximum allowed size this function
// will not penalize the node further.
// A function will be registered to run after the penalty duration to
// increase the priority again.
func (n *Node) Penalize() {
	penalty := n.penalty.Load()
	if penalty >= n.opts.penaltyMaxSize {
		return
	}

	incrementPenaltiesMetric(n.clusterName, n.replicaName, n.Host())

	n.penalty.Add(n.opts.penaltySize)

	time.AfterFunc(n.opts.penaltyDuration, func() {
		// Adding ^uint32(penaltySize-1), the two's complement of penaltySize,
		// atomically subtracts the penalty that was added above.
		n.penalty.Add(^uint32(n.opts.penaltySize - 1))
	})
}

// CurrentLoad returns the node's current load: the number of open
// connections plus the penalty.
func (n *Node) CurrentLoad() uint32 {
	c := n.connections.Load()
	p := n.penalty.Load()
	return c + p
}

func (n *Node) CurrentConnections() uint32 {
	return n.connections.Load()
}

func (n *Node) CurrentPenalty() uint32 {
	return n.penalty.Load()
}

func (n *Node) IncrementConnections() {
	n.connections.Inc()
}

func (n *Node) DecrementConnections() {
	n.connections.Dec()
}

func (n *Node) Scheme() string {
	return n.addr.Scheme
}

func (n *Node) Host() string {
	return n.addr.Host
}

func (n *Node) ReplicaName() string {
	return n.replicaName
}

func (n *Node) String() string {
	return n.addr.String()
}
```
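
`NewNode` follows the functional-options pattern: each `NodeOption` mutates a private `nodeOpts` before the struct is built, so callers can tweak behavior without the package exposing internal state. A hedged usage sketch — the stub heartbeat implements only the two methods the mock in `node_test.go` below shows, and the addresses and names are illustrative:

```go
package main

import (
	"context"
	"net/url"
	"time"

	"github.com/contentsquare/chproxy/config"
	"github.com/contentsquare/chproxy/internal/topology"
)

// stubHeartbeat satisfies heartbeat.HeartBeat with the two methods Node
// calls; a real caller would pass a heartbeat from the heartbeat package.
type stubHeartbeat struct{}

func (stubHeartbeat) Interval() time.Duration                 { return 5 * time.Second }
func (stubHeartbeat) IsHealthy(context.Context, string) error { return nil }

func main() {
	// Metrics must be registered before the heartbeat reports health,
	// otherwise the package-level gauge vectors are nil (see main_test.go).
	topology.RegisterMetrics(&config.Config{
		Server: config.Server{Metrics: config.Metrics{Namespace: "demo"}},
	})

	addr := &url.URL{Scheme: "http", Host: "10.0.0.1:8123"}

	// WithDefaultActiveState(true) marks the node active immediately,
	// before the first heartbeat has run.
	node := topology.NewNode(addr, stubHeartbeat{}, "cluster-1", "replica-1",
		topology.WithDefaultActiveState(true))

	done := make(chan struct{})
	defer close(done)
	go node.StartHeartbeat(done) // toggles IsActive on each heartbeat result

	node.IncrementConnections()
	_ = node.CurrentLoad() // open connections + current penalty

	time.Sleep(time.Second) // give the first heartbeat a chance to run
}
```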

internal/topology/node_test.go

Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@

```go
package topology

import (
	"context"
	"errors"
	"net/url"
	"testing"
	"time"

	"github.com/contentsquare/chproxy/internal/heartbeat"
	"github.com/stretchr/testify/assert"
)

var _ heartbeat.HeartBeat = &mockHeartbeat{}

type mockHeartbeat struct {
	interval time.Duration
	err      error
}

func (hb *mockHeartbeat) Interval() time.Duration {
	return hb.interval
}

func (hb *mockHeartbeat) IsHealthy(ctx context.Context, addr string) error {
	return hb.err
}

func TestPenalize(t *testing.T) {
	node := NewNode(&url.URL{Host: "127.0.0.1"}, nil, "test", "test")
	expectedLoad := uint32(0)
	assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad)

	node.Penalize()
	expectedLoad = uint32(DefaultPenaltySize)
	assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad)

	// do more penalties than `penaltyMaxSize` allows
	max := int(DefaultMaxSize/DefaultPenaltySize) * 2
	for i := 0; i < max; i++ {
		node.Penalize()
	}

	expectedLoad = uint32(DefaultMaxSize)
	assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad)

	// Still allow connections to increase.
	node.IncrementConnections()
	expectedLoad++
	assert.Equal(t, expectedLoad, node.CurrentLoad(), "got running queries %d; expected %d", node.CurrentLoad(), expectedLoad)
}

func TestStartHeartbeat(t *testing.T) {
	hb := &mockHeartbeat{
		interval: 10 * time.Millisecond,
		err:      nil,
	}

	done := make(chan struct{})
	defer close(done)

	node := NewNode(&url.URL{Host: "127.0.0.1"}, hb, "test", "test")

	// Node is eventually active after start.
	go node.StartHeartbeat(done)

	assert.Eventually(t, func() bool {
		return node.IsActive()
	}, time.Second, 100*time.Millisecond)

	// Change the heartbeat to an error; the node eventually becomes inactive.
	hb.err = errors.New("failed connection")

	assert.Eventually(t, func() bool {
		return !node.IsActive()
	}, time.Second, 100*time.Millisecond)

	// If the error is removed, the node becomes active again.
	hb.err = nil

	assert.Eventually(t, func() bool {
		return node.IsActive()
	}, time.Second, 100*time.Millisecond)
}
```
