Skip to content
This repository was archived by the owner on Feb 5, 2025. It is now read-only.

Commit 9cafe32

Browse files
author
Mateusz Rzeszutek
authored
Merge pull request #268 from Excaleo/configurable-retries
Make retryable exceptions configurable
2 parents 16ca544 + e720426 commit 9cafe32

8 files changed

+183
-36
lines changed

signalfx-java/src/main/java/com/signalfx/connection/AbstractHttpReceiverConnection.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.signalfx.connection;
22

33
import java.io.IOException;
4+
import java.util.List;
45
import java.util.regex.Pattern;
56

67
import org.apache.commons.io.IOUtils;
@@ -22,7 +23,8 @@
2223
import com.signalfx.endpoint.SignalFxReceiverEndpoint;
2324
import com.signalfx.metrics.SignalFxMetricsException;
2425

25-
import static com.signalfx.connection.RetryHandler.DEFAULT_MAX_RETRIES;
26+
import static com.signalfx.connection.RetryDefaults.DEFAULT_MAX_RETRIES;
27+
import static com.signalfx.connection.RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS;
2628

2729
public abstract class AbstractHttpReceiverConnection {
2830

@@ -47,9 +49,14 @@ protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int
4749

4850
protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries,
4951
HttpClientConnectionManager httpClientConnectionManager) {
52+
this(endpoint, timeoutMs, DEFAULT_MAX_RETRIES, httpClientConnectionManager, DEFAULT_NON_RETRYABLE_EXCEPTIONS);
53+
}
54+
55+
protected AbstractHttpReceiverConnection(SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries,
56+
HttpClientConnectionManager httpClientConnectionManager, List<Class<? extends IOException>> nonRetryableExceptions) {
5057
this.client = HttpClientBuilder.create()
5158
.setConnectionManager(httpClientConnectionManager)
52-
.setRetryHandler(new RetryHandler(maxRetries))
59+
.setRetryHandler(new RetryHandler(maxRetries, nonRetryableExceptions))
5360
.setServiceUnavailableRetryStrategy(new RetryStrategy(maxRetries))
5461
.build();
5562
this.host = new HttpHost(endpoint.getHostname(), endpoint.getPort(), endpoint.getScheme());
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.signalfx.connection;
2+
3+
import java.io.IOException;
4+
import java.io.InterruptedIOException;
5+
import java.net.ConnectException;
6+
import java.net.UnknownHostException;
7+
import java.util.Arrays;
8+
import java.util.Collections;
9+
import java.util.List;
10+
11+
public final class RetryDefaults {
12+
private RetryDefaults() {
13+
}
14+
15+
public static final int DEFAULT_MAX_RETRIES = 3;
16+
public static final List<Class<? extends IOException>> DEFAULT_NON_RETRYABLE_EXCEPTIONS = Collections.unmodifiableList(Arrays.asList(
17+
InterruptedIOException.class,
18+
UnknownHostException.class,
19+
ConnectException.class));
20+
}
Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,30 @@
11
package com.signalfx.connection;
22

3-
import java.io.InterruptedIOException;
4-
import java.net.ConnectException;
5-
import java.net.UnknownHostException;
6-
import java.util.Arrays;
3+
import java.io.IOException;
4+
import java.util.List;
75

86
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
97

8+
import static com.signalfx.connection.RetryDefaults.DEFAULT_MAX_RETRIES;
9+
import static com.signalfx.connection.RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS;
10+
1011
/**
1112
* Compared to the {@link DefaultHttpRequestRetryHandler} we allow retry on {@link
1213
* javax.net.ssl.SSLException}, because it gets thrown when we try to send data points over a
1314
* connection that our server has already closed. It is still unknown how exactly our server closes
1415
* "stale" connections in such a way that http client is unable to detect this.
1516
*/
1617
class RetryHandler extends DefaultHttpRequestRetryHandler {
17-
public static final Integer DEFAULT_MAX_RETRIES = 3;
1818

1919
public RetryHandler(final int maxRetries) {
20-
super(maxRetries, true, Arrays.asList(
21-
InterruptedIOException.class,
22-
UnknownHostException.class,
23-
ConnectException.class));
20+
this(maxRetries, DEFAULT_NON_RETRYABLE_EXCEPTIONS);
2421
}
2522

2623
public RetryHandler() {
27-
super(DEFAULT_MAX_RETRIES, true, Arrays.asList(
28-
InterruptedIOException.class,
29-
UnknownHostException.class,
30-
ConnectException.class));
24+
this(DEFAULT_MAX_RETRIES, DEFAULT_NON_RETRYABLE_EXCEPTIONS);
25+
}
26+
27+
public RetryHandler(final int maxRetries, List<Class<? extends IOException>> clazzes) {
28+
super(maxRetries, true, clazzes);
3129
}
3230
}

signalfx-java/src/main/java/com/signalfx/metrics/connection/AbstractHttpDataPointProtobufReceiverConnection.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
package com.signalfx.metrics.connection;
22

3-
import java.io.IOException;
4-
import java.nio.charset.StandardCharsets;
5-
import java.util.List;
6-
import java.util.Map;
7-
83
import com.google.common.collect.Lists;
4+
import com.signalfx.common.proto.ProtocolBufferStreamingInputStream;
5+
import com.signalfx.connection.AbstractHttpReceiverConnection;
6+
import com.signalfx.endpoint.SignalFxReceiverEndpoint;
7+
import com.signalfx.metrics.SignalFxMetricsException;
8+
import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers;
99
import org.apache.http.HttpEntity;
10-
import org.apache.http.HttpStatus;
1110
import org.apache.http.NameValuePair;
1211
import org.apache.http.client.methods.CloseableHttpResponse;
1312
import org.apache.http.client.utils.URLEncodedUtils;
1413
import org.apache.http.conn.HttpClientConnectionManager;
1514
import org.apache.http.entity.ContentType;
1615
import org.apache.http.entity.InputStreamEntity;
1716
import org.apache.http.message.BasicNameValuePair;
18-
19-
import com.signalfx.common.proto.ProtocolBufferStreamingInputStream;
20-
import com.signalfx.connection.AbstractHttpReceiverConnection;
21-
import com.signalfx.endpoint.SignalFxReceiverEndpoint;
22-
import com.signalfx.metrics.SignalFxMetricsException;
23-
import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers;
2417
import org.apache.http.util.EntityUtils;
2518

19+
import java.io.IOException;
20+
import java.nio.charset.StandardCharsets;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import static com.signalfx.connection.RetryDefaults.DEFAULT_MAX_RETRIES;
25+
import static com.signalfx.connection.RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS;
26+
2627
public abstract class AbstractHttpDataPointProtobufReceiverConnection extends AbstractHttpReceiverConnection implements DataPointReceiver {
2728

2829
protected static final ContentType PROTO_TYPE = ContentType.create("application/x-protobuf");
@@ -32,15 +33,22 @@ public abstract class AbstractHttpDataPointProtobufReceiverConnection extends Ab
3233
public AbstractHttpDataPointProtobufReceiverConnection(SignalFxReceiverEndpoint endpoint,
3334
int timeoutMs,
3435
HttpClientConnectionManager httpClientConnectionManager) {
35-
super(endpoint, timeoutMs, httpClientConnectionManager);
36-
this.compress = !Boolean.getBoolean(DISABLE_COMPRESSION_PROPERTY);
36+
this(endpoint, timeoutMs, DEFAULT_MAX_RETRIES, httpClientConnectionManager, DEFAULT_NON_RETRYABLE_EXCEPTIONS);
3737
}
3838

3939
public AbstractHttpDataPointProtobufReceiverConnection(SignalFxReceiverEndpoint endpoint,
4040
int timeoutMs,
4141
int maxRetries,
4242
HttpClientConnectionManager httpClientConnectionManager) {
43-
super(endpoint, timeoutMs, maxRetries, httpClientConnectionManager);
43+
this(endpoint, timeoutMs, maxRetries, httpClientConnectionManager, DEFAULT_NON_RETRYABLE_EXCEPTIONS);
44+
}
45+
46+
public AbstractHttpDataPointProtobufReceiverConnection(SignalFxReceiverEndpoint endpoint,
47+
int timeoutMs,
48+
int maxRetries,
49+
HttpClientConnectionManager httpClientConnectionManager,
50+
List<Class<? extends IOException>> nonRetryableExceptions) {
51+
super(endpoint, timeoutMs, maxRetries, httpClientConnectionManager, nonRetryableExceptions);
4452
this.compress = !Boolean.getBoolean(DISABLE_COMPRESSION_PROPERTY);
4553
}
4654

@@ -84,7 +92,7 @@ protected abstract HttpEntity getEntityForVersion(
8492
List<SignalFxProtocolBuffers.DataPoint> dataPoints);
8593

8694
@Override
87-
public void backfillDataPoints(String auth, String metric, String metricType, String orgId, Map<String,String> dimensions,
95+
public void backfillDataPoints(String auth, String metric, String metricType, String orgId, Map<String, String> dimensions,
8896
List<SignalFxProtocolBuffers.PointValue> datumPoints)
8997
throws SignalFxMetricsException {
9098
if (datumPoints.isEmpty()) {
@@ -97,7 +105,7 @@ public void backfillDataPoints(String auth, String metric, String metricType, St
97105
params.add(new BasicNameValuePair("metric", metric));
98106

99107
// Each dimension is added as a param in the form of "sfxdim_DIMNAME"
100-
for (Map.Entry<String,String> entry : dimensions.entrySet()) {
108+
for (Map.Entry<String, String> entry : dimensions.entrySet()) {
101109
params.add(new BasicNameValuePair("sfxdim_" + entry.getKey(), entry.getValue()));
102110
}
103111

signalfx-java/src/main/java/com/signalfx/metrics/connection/HttpDataPointProtobufReceiverConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ public HttpDataPointProtobufReceiverConnection(
3737
super(endpoint, timeoutMs, maxRetries, httpClientConnectionManager);
3838
}
3939

40+
public HttpDataPointProtobufReceiverConnection(
41+
SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries,
42+
HttpClientConnectionManager httpClientConnectionManager, List<Class<? extends IOException>> nonRetryableExceptions) {
43+
super(endpoint, timeoutMs, maxRetries, httpClientConnectionManager, nonRetryableExceptions);
44+
}
45+
4046
@Override
4147
protected HttpEntity getEntityForVersion(List<SignalFxProtocolBuffers.DataPoint> dataPoints) {
4248
return new InputStreamEntity(

signalfx-java/src/main/java/com/signalfx/metrics/connection/HttpDataPointProtobufReceiverConnectionV2.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.signalfx.metrics.connection;
22

3+
import java.io.IOException;
34
import java.util.HashMap;
45
import java.util.List;
56
import java.util.Map;
@@ -26,6 +27,12 @@ public HttpDataPointProtobufReceiverConnectionV2(
2627
super(endpoint, timeoutMs, maxRetries, httpClientConnectionManager);
2728
}
2829

30+
public HttpDataPointProtobufReceiverConnectionV2(
31+
SignalFxReceiverEndpoint endpoint, int timeoutMs, int maxRetries,
32+
HttpClientConnectionManager httpClientConnectionManager, List<Class<? extends IOException>> nonRetryableExceptions) {
33+
super(endpoint, timeoutMs, maxRetries, httpClientConnectionManager, nonRetryableExceptions);
34+
}
35+
2936
@Override
3037
protected String getEndpointForAddDatapoints() {
3138
return "/v2/datapoint";

signalfx-java/src/main/java/com/signalfx/metrics/connection/HttpDataPointProtobufReceiverFactory.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,25 @@
66
import com.signalfx.endpoint.SignalFxReceiverEndpoint;
77
import com.signalfx.metrics.SignalFxMetricsException;
88

9+
import java.io.IOException;
10+
import java.util.ArrayList;
11+
import java.util.Collections;
12+
import java.util.List;
13+
14+
import static com.signalfx.connection.RetryDefaults.DEFAULT_MAX_RETRIES;
15+
import static com.signalfx.connection.RetryDefaults.DEFAULT_NON_RETRYABLE_EXCEPTIONS;
16+
917
public class HttpDataPointProtobufReceiverFactory implements DataPointReceiverFactory {
1018
public static final int DEFAULT_TIMEOUT_MS = 2000;
1119
public static final int DEFAULT_VERSION = 2;
12-
public static final int DEFAULT_MAX_RETRIES = 3;
1320

1421
private final SignalFxReceiverEndpoint endpoint;
1522
private HttpClientConnectionManager httpClientConnectionManager;
1623
private HttpClientConnectionManager explicitHttpClientConnectionManager;
1724
private int timeoutMs = DEFAULT_TIMEOUT_MS;
1825
private int version = DEFAULT_VERSION;
1926
private int maxRetries = DEFAULT_MAX_RETRIES;
27+
private List<Class<? extends IOException>> nonRetryableExceptions = DEFAULT_NON_RETRYABLE_EXCEPTIONS;
2028

2129
public HttpDataPointProtobufReceiverFactory(SignalFxReceiverEndpoint endpoint) {
2230
this.endpoint = endpoint;
@@ -42,6 +50,11 @@ public HttpDataPointProtobufReceiverFactory setMaxRetries(int maxRetries) {
4250
return this;
4351
}
4452

53+
public HttpDataPointProtobufReceiverFactory setNonRetryableExceptions(List<Class<? extends IOException>> clazzes) {
54+
this.nonRetryableExceptions = Collections.unmodifiableList(new ArrayList<>(clazzes));
55+
return this;
56+
}
57+
4558
public void setHttpClientConnectionManager(
4659
HttpClientConnectionManager httpClientConnectionManager) {
4760
this.explicitHttpClientConnectionManager = httpClientConnectionManager;
@@ -55,13 +68,15 @@ public DataPointReceiver createDataPointReceiver() throws
5568
endpoint,
5669
this.timeoutMs,
5770
this.maxRetries,
58-
resolveHttpClientConnectionManager());
71+
resolveHttpClientConnectionManager(),
72+
this.nonRetryableExceptions);
5973
} else {
6074
return new HttpDataPointProtobufReceiverConnectionV2(
6175
endpoint,
6276
this.timeoutMs,
6377
this.maxRetries,
64-
resolveHttpClientConnectionManager());
78+
resolveHttpClientConnectionManager(),
79+
this.nonRetryableExceptions);
6580
}
6681

6782
}

signalfx-java/src/test/java/com/signalfx/metrics/connection/HttpDataPointProtobufReceiverConnectionTest.java

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import java.util.Collections;
1515
import java.util.HashMap;
1616
import java.util.List;
17+
import java.util.concurrent.CountDownLatch;
1718
import java.util.zip.GZIPInputStream;
18-
import javax.servlet.ServletException;
1919
import javax.servlet.http.HttpServletRequest;
2020
import javax.servlet.http.HttpServletResponse;
2121
import org.apache.http.HttpStatus;
@@ -29,6 +29,10 @@
2929
import org.eclipse.jetty.server.handler.AbstractHandler;
3030
import org.junit.Test;
3131

32+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
33+
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertTrue;
35+
3236
public class HttpDataPointProtobufReceiverConnectionTest {
3337

3438
public static final String AUTH_TOKEN = "AUTH_TOKEN";
@@ -95,6 +99,69 @@ public void shouldSendDataEvenIfServerClosedConnection() throws Exception {
9599
}
96100
}
97101

102+
@Test
103+
public void shouldRetryOnSocketTimeout() throws Exception {
104+
final CountDownLatch latch = new CountDownLatch(2);
105+
final int clientTimeoutMs = 100;
106+
final LatchedTimeoutHandler handler = new LatchedTimeoutHandler(latch, clientTimeoutMs);
107+
108+
Server server = new Server();
109+
ServerConnector connector = new ServerConnector(server);
110+
connector.setIdleTimeout(1000);
111+
connector.setPort(0);
112+
server.setConnectors(new Connector[]{connector});
113+
server.setHandler(handler);
114+
server.start();
115+
116+
try (AutoCloseable ignored = server::stop) {
117+
URI uri = server.getURI();
118+
DataPointReceiver dpr = new HttpDataPointProtobufReceiverFactory(
119+
new SignalFxEndpoint(uri.getScheme(), uri.getHost(), uri.getPort()))
120+
.setMaxRetries(1)
121+
.setTimeoutMs(clientTimeoutMs)
122+
.setNonRetryableExceptions(Collections.emptyList())
123+
.createDataPointReceiver();
124+
try {
125+
dpr.addDataPoints(AUTH_TOKEN, Collections.singletonList(
126+
SignalFxProtocolBuffers.DataPoint.newBuilder().setSource("source").build()));
127+
} catch (Exception ignored2) {
128+
}
129+
}
130+
131+
assertTrue(latch.await(1000, MILLISECONDS));
132+
}
133+
134+
@Test
135+
public void shouldNotRetryOnDefaultNonRetryableExceptions() throws Exception {
136+
final CountDownLatch latch = new CountDownLatch(1);
137+
final int timeoutMs = 100;
138+
final LatchedTimeoutHandler handler = new LatchedTimeoutHandler(latch, timeoutMs);
139+
140+
Server server = new Server();
141+
ServerConnector connector = new ServerConnector(server);
142+
connector.setIdleTimeout(1000);
143+
connector.setPort(0);
144+
server.setConnectors(new Connector[]{connector});
145+
server.setHandler(handler);
146+
server.start();
147+
148+
try (AutoCloseable ignored = server::stop) {
149+
URI uri = server.getURI();
150+
DataPointReceiver dpr = new HttpDataPointProtobufReceiverFactory(
151+
new SignalFxEndpoint(uri.getScheme(), uri.getHost(), uri.getPort()))
152+
.setTimeoutMs(timeoutMs)
153+
.setMaxRetries(1)
154+
.createDataPointReceiver();
155+
try {
156+
dpr.addDataPoints(AUTH_TOKEN, Collections.singletonList(
157+
SignalFxProtocolBuffers.DataPoint.newBuilder().setSource("source").build()));
158+
} catch (Exception ignored2) {
159+
}
160+
}
161+
162+
assertTrue(latch.await(1000, MILLISECONDS));
163+
}
164+
98165
@Test
99166
public void testBackfill() throws Exception {
100167
Server server = new Server(0);
@@ -193,6 +260,25 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
193260
}
194261
}
195262

263+
private static class LatchedTimeoutHandler extends AbstractHandler {
264+
private final CountDownLatch latch;
265+
private final int timeoutMs;
266+
267+
LatchedTimeoutHandler(CountDownLatch latch, int timeoutMs) {
268+
this.latch = latch;
269+
this.timeoutMs = timeoutMs;
270+
}
271+
272+
@Override
273+
public void handle(String s, Request request, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException {
274+
latch.countDown();
275+
try {
276+
Thread.sleep(timeoutMs);
277+
} catch (Exception ignored) {
278+
}
279+
}
280+
}
281+
196282
private static void error(String message, HttpServletResponse response, Request baseRequest)
197283
throws IOException {
198284
response.setStatus(HttpStatus.SC_BAD_REQUEST);

0 commit comments

Comments
 (0)