@@ -166,62 +166,19 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
166
166
func executeWithRetry (
167
167
ctx context.Context ,
168
168
s * scope ,
169
- retryNum int ,
169
+ maxRetry int ,
170
170
rp func (http.ResponseWriter , * http.Request ),
171
171
rw ResponseWriterWithCode ,
172
172
srw StatResponseWriter ,
173
173
req * http.Request ,
174
174
monitorDuration func (float64 ),
175
175
monitorRetryRequestInc func (prometheus.Labels ),
176
176
) (float64 , error ) {
177
- // num of replicas should > 1,
178
- // when the host is unavailable,
179
- // it could make sure there might be another set of replicas contains all the data of clickhouse
180
177
startTime := time .Now ()
181
178
var since float64
182
- numReplicas := len (s .host .replica .cluster .replicas )
183
- if numReplicas > 1 && retryNum <= numReplicas {
184
- for i := 0 ; i <= retryNum ; i ++ {
185
- rp (rw , req )
186
179
187
- err := ctx .Err ()
188
- if err != nil {
189
- since = time .Since (startTime ).Seconds ()
190
-
191
- return since , err
192
- }
193
- srw .SetStatusCode (rw .StatusCode ())
194
- if rw .StatusCode () == http .StatusBadGateway {
195
- log .Debugf ("the invalid host is: %s" , s .host .addr )
196
- if i == retryNum {
197
- since = time .Since (startTime ).Seconds ()
198
- monitorDuration (since )
199
-
200
- s .host .penalize ()
201
- q := getQuerySnippet (req )
202
- err1 := fmt .Errorf ("%s: cannot reach %s; query: %q" , s , s .host .addr .Host , q )
203
- respondWith (srw , err1 , srw .StatusCode ())
204
- return since , nil
205
- } else {
206
- // the query execution has been failed
207
- s .host .penalize ()
208
- s .host .dec ()
209
- atomic .StoreUint32 (& s .host .active , uint32 (0 ))
210
- monitorRetryRequestInc (s .labels )
211
- h := s .host
212
- s .host = h .replica .cluster .getHost ()
213
-
214
- req .URL .Host = s .host .addr .Host
215
- req .URL .Scheme = s .host .addr .Scheme
216
- log .Debugf ("the valid host is: %s" , s .host .addr )
217
- }
218
- } else {
219
- since = time .Since (startTime ).Seconds ()
220
-
221
- return since , nil
222
- }
223
- }
224
- } else {
180
+ numRetry := 0
181
+ for {
225
182
rp (rw , req )
226
183
227
184
err := ctx .Err ()
@@ -232,19 +189,39 @@ func executeWithRetry(
232
189
}
233
190
// The request has been successfully proxied.
234
191
235
- since = time .Since (startTime ).Seconds ()
236
- monitorDuration (since )
237
-
238
192
srw .SetStatusCode (rw .StatusCode ())
239
193
// StatusBadGateway response is returned by http.ReverseProxy when
240
194
// it cannot establish connection to remote host.
241
195
if rw .StatusCode () == http .StatusBadGateway {
242
196
log .Debugf ("the invalid host is: %s" , s .host .addr )
243
197
s .host .penalize ()
244
- q := getQuerySnippet (req )
245
- err1 := fmt .Errorf ("%s: cannot reach %s; query: %q" , s , s .host .addr .Host , q )
246
- respondWith (srw , err1 , srw .StatusCode ())
198
+ s .host .dec ()
199
+ atomic .StoreUint32 (& s .host .active , uint32 (0 ))
200
+ newHost := s .host .replica .cluster .getHost ()
201
+ // The query could be retried if it has no stickiness to a certain server
202
+ if numRetry < maxRetry && newHost .isActive () && s .sessionId == "" {
203
+ // the query execution has failed
204
+ monitorRetryRequestInc (s .labels )
205
+
206
+ // update host
207
+ s .host = newHost
208
+
209
+ req .URL .Host = s .host .addr .Host
210
+ req .URL .Scheme = s .host .addr .Scheme
211
+ log .Debugf ("the valid host is: %s" , s .host .addr )
212
+ } else {
213
+ since = time .Since (startTime ).Seconds ()
214
+ monitorDuration (since )
215
+ q := getQuerySnippet (req )
216
+ err1 := fmt .Errorf ("%s: cannot reach %s; query: %q" , s , s .host .addr .Host , q )
217
+ respondWith (srw , err1 , srw .StatusCode ())
218
+ break
219
+ }
220
+ } else {
221
+ since = time .Since (startTime ).Seconds ()
222
+ break
247
223
}
224
+ numRetry ++
248
225
}
249
226
return since , nil
250
227
}
0 commit comments