Skip to content

Commit ee67a2d

Browse files
Sticky Sessions: Tolerate ClickHouse Session ID Mechanism (#117)
* Adjusted logic * Error handling * tests * Adjust * session_id * test adjustment * session_timeout logic * session_timeout tests * header test * cleaning unused code * handling session_id verification * Ability to build version under Docker (Native go build doesn't work due to Datadog lib)
1 parent 7cabf6e commit ee67a2d

File tree

8 files changed

+156
-21
lines changed

8 files changed

+156
-21
lines changed

Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ RUN go get golang.org/x/lint/golint
55
RUN mkdir -p /go/src/github.com/Vertamedia/chproxy
66
WORKDIR /go/src/github.com/Vertamedia/chproxy
77
COPY . ./
8+
ARG EXT_BUILD_TAG
9+
ENV EXT_BUILD_TAG ${EXT_BUILD_TAG}
810
RUN make release-build
911

1012
FROM alpine

Makefile

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
# current_dir holds the directory make was invoked from.
# $(shell pwd) runs the shell command; a bare $(pwd) would expand an
# undefined make variable and yield an empty string.
current_dir = $(shell pwd)
12
pkgs = $(shell go list ./...)
23
gofiles := $(shell find . -name "*.go" -type f -not -path "./vendor/*")
34

4-
BUILD_TAG = $(shell git tag --points-at HEAD)
5+
BUILD_TAG = $(or $(shell git tag --points-at HEAD), $(EXT_BUILD_TAG), latest)
56

67
BUILD_CONSTS = \
78
-X main.buildTime=`date -u '+%Y-%m-%d_%H:%M:%S'` \
@@ -36,7 +37,16 @@ clean:
3637
rm -f chproxy
3738

3839
# release-build compiles the linux/amd64 binary and packs it into a
# versioned tarball. Stale tarballs are removed with `rm -f` so the target
# does not fail when none exists yet (e.g. in a fresh Docker build context).
release-build:
	@echo "Ver: $(BUILD_TAG), OPTS: $(BUILD_OPTS)"
	GOOS=linux GOARCH=amd64 go build $(BUILD_OPTS)
	rm -f chproxy-linux-amd64-*.tar.gz
	tar czf chproxy-linux-amd64-$(BUILD_TAG).tar.gz chproxy
4044

4145
release: format lint test clean release-build
46+
@echo "Ver: $(BUILD_TAG), OPTS: $(BUILD_OPTS)"
4247
tar czf chproxy-linux-amd64-$(BUILD_TAG).tar.gz chproxy
48+
49+
release-build-docker:
50+
@echo "Ver: $(BUILD_TAG)"
51+
@DOCKER_BUILDKIT=1 docker build --target build --build-arg EXT_BUILD_TAG=$(BUILD_TAG) --progress plain -t chproxy-build .
52+
@docker run --rm --entrypoint "/bin/sh" -v $(CURDIR):/host chproxy-build -c "/bin/cp /go/src/github.com/Vertamedia/chproxy/*.tar.gz /host"

main_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,26 @@ func TestServe(t *testing.T) {
284284
},
285285
startHTTP,
286286
},
287+
{
288+
"http POST request with session id",
289+
"testdata/http-session-id.yml",
290+
func(t *testing.T) {
291+
req, err := http.NewRequest("POST",
292+
"http://127.0.0.1:9090/?query_id=45395792-a432-4b92-8cc9-536c14e1e1a9&extremes=0&session_id=default-session-id233",
293+
bytes.NewBufferString("SELECT * FROM system.numbers LIMIT 10"))
294+
req.Header.Set("Content-Type", "application/x-www-form-urlencoded;") // This makes it work
295+
296+
checkErr(t, err)
297+
resp, err := http.DefaultClient.Do(req)
298+
checkErr(t, err)
299+
300+
if resp.StatusCode != http.StatusOK || resp.StatusCode != http.StatusOK && resp.Header.Get("X-Clickhouse-Server-Session-Id") == "" {
301+
t.Fatalf("unexpected status code: %d; expected: %d", resp.StatusCode, http.StatusOK)
302+
}
303+
resp.Body.Close()
304+
},
305+
startHTTP,
306+
},
287307
{
288308
"http request",
289309
"testdata/http.yml",

proxy.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ func newReverseProxy() *reverseProxy {
5151

5252
func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
5353
startTime := time.Now()
54-
5554
s, status, err := rp.getScope(req)
5655
if err != nil {
5756
q := getQuerySnippet(req)
@@ -99,6 +98,11 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
9998
ReadCloser: req.Body,
10099
}
101100

101+
// publish session_id if needed
102+
if s.sessionId != "" {
103+
rw.Header().Set("X-ClickHouse-Server-Session-Id", s.sessionId)
104+
}
105+
102106
if s.user.cache == nil {
103107
rp.proxyRequest(s, srw, srw, req)
104108
} else {
@@ -110,9 +114,9 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
110114
q := getQuerySnippet(req)
111115
if srw.statusCode == http.StatusOK {
112116
requestSuccess.With(s.labels).Inc()
113-
log.Debugf("%s: request success; query: %q; URL: %q", s, q, req.URL.String())
117+
log.Debugf("%s: request success; query: %q; Method: %s; URL: %q", s, q, req.Method, req.URL.String())
114118
} else {
115-
log.Debugf("%s: request failure: non-200 status code %d; query: %q; URL: %q", s, srw.statusCode, q, req.URL.String())
119+
log.Debugf("%s: request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srw.statusCode, q, req.Method, req.URL.String())
116120
}
117121

118122
statusCodes.With(
@@ -435,7 +439,7 @@ func (rp *reverseProxy) applyConfig(cfg *config.Config) error {
435439
return nil
436440
}
437441

438-
// refreshCacheMetrics refresehs cacheSize and cacheItems metrics.
442+
// refreshCacheMetrics refreshes cacheSize and cacheItems metrics.
439443
func (rp *reverseProxy) refreshCacheMetrics() {
440444
rp.lock.RLock()
441445
defer rp.lock.RUnlock()
@@ -452,7 +456,8 @@ func (rp *reverseProxy) refreshCacheMetrics() {
452456

453457
func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
454458
name, password := getAuth(req)
455-
459+
sessionId := getSessionId(req)
460+
sessionTimeout := getSessionTimeout(req)
456461
var (
457462
u *user
458463
c *cluster
@@ -489,6 +494,6 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) {
489494
return nil, http.StatusForbidden, fmt.Errorf("cluster user %q is not allowed to access", cu.name)
490495
}
491496

492-
s := newScope(req, u, c, cu)
497+
s := newScope(req, u, c, cu, sessionId, sessionTimeout)
493498
return s, 0, nil
494499
}

scope.go

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"net/http"
1010
"net/url"
1111
"regexp"
12+
"strconv"
1213
"strings"
1314
"sync/atomic"
1415
"time"
@@ -40,6 +41,9 @@ type scope struct {
4041
user *user
4142
clusterUser *clusterUser
4243

44+
sessionId string
45+
sessionTimeout int
46+
4347
remoteAddr string
4448
localAddr string
4549

@@ -49,20 +53,24 @@ type scope struct {
4953
labels prometheus.Labels
5054
}
5155

52-
func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser) *scope {
56+
func newScope(req *http.Request, u *user, c *cluster, cu *clusterUser, sessionId string, sessionTimeout int) *scope {
5357
h := c.getHost()
54-
58+
if sessionId != "" {
59+
h = c.getHostSticky(sessionId)
60+
}
5561
var localAddr string
5662
if addr, ok := req.Context().Value(http.LocalAddrContextKey).(net.Addr); ok {
5763
localAddr = addr.String()
5864
}
5965
s := &scope{
60-
startTime: time.Now(),
61-
id: newScopeID(),
62-
host: h,
63-
cluster: c,
64-
user: u,
65-
clusterUser: cu,
66+
startTime: time.Now(),
67+
id: newScopeID(),
68+
host: h,
69+
cluster: c,
70+
user: u,
71+
clusterUser: cu,
72+
sessionId: sessionId,
73+
sessionTimeout: sessionTimeout,
6674

6775
remoteAddr: req.RemoteAddr,
6876
localAddr: localAddr,
@@ -305,6 +313,10 @@ var allowedParams = []string{
305313
"extremes",
306314
// what to do if the volume of the result exceeds one of the limits
307315
"result_overflow_mode",
316+
// session stickiness
317+
"session_id",
318+
// session timeout
319+
"session_timeout",
308320
}
309321

310322
// This regexp must match params needed to describe a way to use external data
@@ -349,6 +361,8 @@ func (s *scope) decorateRequest(req *http.Request) (*http.Request, url.Values) {
349361

350362
// Set query_id as scope_id to have possibility to kill query if needed.
351363
params.Set("query_id", s.id.String())
364+
// Set session_timeout an idle timeout for session
365+
params.Set("session_timeout", strconv.Itoa(s.sessionTimeout))
352366

353367
req.URL.RawQuery = params.Encode()
354368

@@ -810,6 +824,42 @@ func (c *cluster) getReplica() *replica {
810824
return r
811825
}
812826

827+
// getHostSticky returns host by stickiness from replica.
828+
//
829+
// Always returns non-nil.
830+
func (r *replica) getHostSticky(sessionId string) *host {
831+
idx := atomic.AddUint32(&r.nextHostIdx, 1)
832+
n := uint32(len(r.hosts))
833+
if n == 1 {
834+
return r.hosts[0]
835+
}
836+
837+
idx %= n
838+
h := r.hosts[idx]
839+
840+
// Scan all the hosts for the least loaded host.
841+
for i := uint32(1); i < n; i++ {
842+
tmpIdx := (idx + i) % n
843+
844+
// handling sticky session
845+
sessionId := hash(sessionId)
846+
tmpIdx = (sessionId) % n
847+
tmpHSticky := r.hosts[tmpIdx]
848+
log.Debugf("Sticky server candidate is: %s", tmpHSticky.addr)
849+
if !tmpHSticky.isActive() {
850+
log.Debugf("Sticky session server has been picked up, but it is not available")
851+
continue
852+
}
853+
log.Debugf("Sticky session server is: %s, session_id: %d, server_idx: %d, max nodes in pool: %d", tmpHSticky.addr, sessionId, tmpIdx, n)
854+
return tmpHSticky
855+
}
856+
857+
// The returned host may be inactive. This is OK,
858+
// since this means all the hosts are inactive,
859+
// so let's try proxying the request to any host.
860+
return h
861+
}
862+
813863
// getHost returns least loaded + round-robin host from replica.
814864
//
815865
// Always returns non-nil.
@@ -856,6 +906,14 @@ func (r *replica) getHost() *host {
856906
return h
857907
}
858908

909+
// getHostSticky returns host based on stickiness from cluster.
910+
//
911+
// Always returns non-nil.
912+
func (c *cluster) getHostSticky(sessionId string) *host {
913+
r := c.getReplica()
914+
return r.getHostSticky(sessionId)
915+
}
916+
859917
// getHost returns least loaded + round-robin host from cluster.
860918
//
861919
// Always returns non-nil.

scope_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -330,14 +330,14 @@ func TestDecorateRequest(t *testing.T) {
330330
"text/plain",
331331
"GET",
332332
nil,
333-
[]string{"query_id", "query"},
333+
[]string{"query_id", "session_timeout", "query"},
334334
},
335335
{
336336
"http://127.0.0.1?user=default&password=default&query=SELECT&database=default&wait_end_of_query=1",
337337
"text/plain",
338338
"GET",
339339
nil,
340-
[]string{"query_id", "query", "database"},
340+
[]string{"query_id", "session_timeout", "query", "database"},
341341
},
342342
{
343343
"http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV",
@@ -352,7 +352,7 @@ func TestDecorateRequest(t *testing.T) {
352352
},
353353
},
354354
},
355-
[]string{"query_id", "query", "max_threads"},
355+
[]string{"query_id", "session_timeout", "query", "max_threads"},
356356
},
357357
{
358358
"http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV",
@@ -367,7 +367,7 @@ func TestDecorateRequest(t *testing.T) {
367367
},
368368
},
369369
},
370-
[]string{"query_id", "query"},
370+
[]string{"query_id", "session_timeout", "query"},
371371
},
372372
{
373373
"http://127.0.0.1?user=default&password=default&query=SELECT&testdata_type_buzz=1&testdata_structure_foo=id+UInt32&testdata_format-bar=TSV",
@@ -386,14 +386,14 @@ func TestDecorateRequest(t *testing.T) {
386386
},
387387
},
388388
},
389-
[]string{"query_id", "query", "max_threads", "background_pool_size"},
389+
[]string{"query_id", "session_timeout", "query", "max_threads", "background_pool_size"},
390390
},
391391
{
392392
"http://127.0.0.1?user=default&password=default&query=SELECT&testdata_structure=id+UInt32&testdata_format=TSV",
393393
"multipart/form-data; boundary=foobar",
394394
"POST",
395395
nil,
396-
[]string{"query_id", "testdata_structure", "testdata_format", "query"},
396+
[]string{"query_id", "session_timeout", "testdata_structure", "testdata_format", "query"},
397397
},
398398
}
399399

testdata/http-session-id.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
log_debug: true
2+
server:
3+
http:
4+
listen_addr: ":9090"
5+
allowed_networks: ["127.0.0.1/24"]
6+
7+
users:
8+
- name: "default"
9+
to_cluster: "default"
10+
to_user: "default"
11+
12+
clusters:
13+
- name: "default"
14+
nodes:
15+
- 127.0.0.1:8124

utils.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"bytes"
55
"compress/gzip"
66
"fmt"
7+
"hash/fnv"
78
"io"
89
"io/ioutil"
910
"net/http"
1011
"sort"
12+
"strconv"
1113
"strings"
1214

1315
"github.com/Vertamedia/chproxy/chdecompressor"
@@ -43,6 +45,23 @@ func getAuth(req *http.Request) (string, string) {
4345
return "default", ""
4446
}
4547

48+
// getSessionId extracts the value of the session_id query parameter from
// the request URL. An empty string is returned when the parameter is absent.
func getSessionId(req *http.Request) string {
	return req.URL.Query().Get("session_id")
}
54+
55+
// getSessionTimeout retrieves the session idle timeout from the
// session_timeout query parameter of the request URL.
//
// It returns 60 when the parameter is missing, is not a valid integer
// or is not positive.
func getSessionTimeout(req *http.Request) int {
	params := req.URL.Query()
	sessionTimeout, err := strconv.Atoi(params.Get("session_timeout"))
	// The parsed value is usable only when parsing succeeded.
	if err == nil && sessionTimeout > 0 {
		return sessionTimeout
	}
	return 60
}
64+
4665
// getQuerySnippet returns query snippet.
4766
//
4867
// getQuerySnippet must be called only for error reporting.
@@ -57,6 +76,12 @@ func getQuerySnippet(req *http.Request) string {
5776
return query + body
5877
}
5978

79+
// hash returns the 32-bit FNV-1a checksum of s.
func hash(s string) uint32 {
	hasher := fnv.New32a()
	// Write on a hash.Hash never returns an error, so the result is discarded.
	_, _ = hasher.Write([]byte(s))
	return hasher.Sum32()
}
84+
6085
func getQuerySnippetFromBody(req *http.Request) string {
6186
if req.Body == nil {
6287
return ""

0 commit comments

Comments
 (0)