
Commit e4078bc

crepererum and mario-s committed
test: SOCKS5 in CI and docker compose files
Closes #110. Co-authored-by: Mario-S <[email protected]>
1 parent bd61405 commit e4078bc

12 files changed: +140 −75 lines

.circleci/config.yml

Lines changed: 8 additions & 2 deletions
@@ -177,6 +177,8 @@ jobs:
         - --kafka-addr redpanda-2:9092
         - --rpc-addr redpanda-2:33145
         - --seeds redpanda-0:33145
+      - image: serjs/go-socks5-proxy
+        name: proxy
     resource_class: xlarge # use of a smaller executor tends crashes on link
     environment:
       # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
@@ -193,7 +195,8 @@ jobs:
       TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
-      KAFKA_CONNECT: "redpanda-1:9092"
+      KAFKA_CONNECT: "invalid:9092,redpanda-1:9092"
+      SOCKS_PROXY: "proxy:1080"
     steps:
       - checkout
       - rust_components
@@ -244,6 +247,8 @@ jobs:
         - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
         - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://kafka-2:9093
         - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
+      - image: serjs/go-socks5-proxy
+        name: proxy
     resource_class: xlarge # use of a smaller executor tends crashes on link
     environment:
       # Disable incremental compilation to avoid overhead. We are not preserving these files anyway.
@@ -262,7 +267,8 @@ jobs:
       TEST_JAVA_INTEROPT: 1
       # Don't use the first node here since this is likely the controller and we want to ensure that we automatically
       # pick the controller for certain actions (e.g. topic creation) and don't just get lucky.
-      KAFKA_CONNECT: "kafka-1:9093"
+      KAFKA_CONNECT: "invalid:9093,kafka-1:9093"
+      SOCKS_PROXY: "proxy:1080"
     steps:
       - checkout
       - rust_components
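
Taken together, both CI jobs now point `KAFKA_CONNECT` at one unreachable address followed by a real broker, and expose the proxy through `SOCKS_PROXY`. A minimal sketch of how a test binary could consume both variables (a hypothetical helper, not code from this commit; it assumes rskafka's `ClientBuilder` with the `transport-socks5` feature enabled):

```rust
use rskafka::client::{Client, ClientBuilder};

/// Hypothetical helper: build a client from the CI environment, treating
/// `KAFKA_CONNECT` as a comma-separated bootstrap list and `SOCKS_PROXY`
/// as optional.
async fn client_from_env() -> Client {
    let bootstrap: Vec<String> = std::env::var("KAFKA_CONNECT")
        .expect("KAFKA_CONNECT must be set")
        .split(',')
        .map(|s| s.to_owned())
        .collect();

    let mut builder = ClientBuilder::new(bootstrap);
    // Route through SOCKS5 only when the job provides a proxy address.
    if let Ok(proxy) = std::env::var("SOCKS_PROXY") {
        builder = builder.socks5_proxy(proxy);
    }
    builder.build().await.expect("client should connect")
}
```

Because the first bootstrap address (`invalid:9092` / `invalid:9093`) never resolves, the jobs also exercise the client's fallback to the remaining addresses in the list.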

README.md

Lines changed: 16 additions & 4 deletions
@@ -114,7 +114,7 @@ $ docker-compose -f docker-compose-redpanda.yml up
 in one session, and then run:
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9093 cargo test
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=0.0.0.0:9011 cargo test
 ```
 
 in another session.
@@ -130,12 +130,24 @@ $ docker-compose -f docker-compose-kafka.yml up
 in one session, and then run:
 
 ```console
-$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9094 cargo test
+$ TEST_INTEGRATION=1 TEST_DELETE_RECORDS=1 KAFKA_CONNECT=localhost:9011 cargo test
 ```
 
 in another session. Note that Apache Kafka supports a different set of features then redpanda, so we pass other
 environment variables.
 
+### Using a SOCKS5 Proxy
+
+To run the integration test via a SOCKS5 proxy, you need to set the environment variable `SOCKS_PROXY`. The following
+command requires a running proxy on the local machine.
+
+```console
+$ KAFKA_CONNECT=0.0.0.0:9011,kafka-1:9021,redpanda-1:9021 SOCKS_PROXY=localhost:1080 cargo test --features full
+```
+
+The SOCKS5 proxy will automatically be started by the docker compose files. Note that `KAFKA_CONNECT` was extended by
+addresses that are reachable via the proxy.
+
 ### Java Interopt
 To test if RSKafka can produce/consume records to/from the official Java client, you need to have Java installed and the
 `TEST_JAVA_INTEROPT=1` environment variable set.
@@ -218,14 +230,14 @@ execution that hooks right into the place where it is about to exit:
 Install [cargo-criterion], make sure you have some Kafka cluster running, and then you can run all benchmarks with:
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo criterion --all-features
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo criterion --all-features
 ```
 
 If you find a benchmark that is too slow, you can may want to profile it. Get [cargo-with], and [perf], then run (here
 for the `parallel/rskafka` benchmark):
 
 ```console
-$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
+$ TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9011 cargo with 'perf record --call-graph dwarf -- {bin}' -- \
       bench --all-features --bench write_throughput -- \
       --bench --noplot parallel/rskafka
 ```
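
For reference, the documented proxy command maps onto the client API roughly as follows. This is a sketch only: the addresses come from the README command above, and the `socks5_proxy` call assumes the `transport-socks5` feature that `--features full` presumably enables.

```rust
use rskafka::client::ClientBuilder;

// A sketch of what the documented invocation connects to: a bootstrap list
// mixing a host-reachable listener (0.0.0.0:9011) with addresses that only
// resolve inside the compose network (kafka-1:9021, redpanda-1:9021) and are
// therefore reached through the SOCKS5 proxy on localhost:1080.
async fn connect_like_the_readme() -> Result<(), Box<dyn std::error::Error>> {
    let client = ClientBuilder::new(vec![
        "0.0.0.0:9011".to_owned(),
        "kafka-1:9021".to_owned(),
        "redpanda-1:9021".to_owned(),
    ])
    .socks5_proxy("localhost:1080".to_owned()) // requires the `transport-socks5` feature
    .build()
    .await?;

    // `list_topics` is used the same way in tests/client.rs below.
    let _topics = client.list_topics().await?;
    Ok(())
}
```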

benches/write_throughput.rs

Lines changed: 9 additions & 5 deletions
@@ -183,7 +183,11 @@ macro_rules! maybe_skip_kafka_integration {
         env::var("TEST_INTEGRATION").is_ok(),
         env::var("KAFKA_CONNECT").ok(),
     ) {
-        (true, Some(kafka_connection)) => kafka_connection,
+        (true, Some(kafka_connection)) => {
+            let kafka_connection: Vec<String> =
+                kafka_connection.split(",").map(|s| s.to_owned()).collect();
+            kafka_connection
+        }
         (true, None) => {
             panic!(
                 "TEST_INTEGRATION is set which requires running integration tests, but \
@@ -227,7 +231,7 @@ fn random_topic_name() -> String {
     format!("test_topic_{}", uuid::Uuid::new_v4())
 }
 
-async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer, String) {
+async fn setup_rdkafka(connection: Vec<String>, buffering: bool) -> (FutureProducer, String) {
     use rdkafka::{
         admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
         producer::FutureRecord,
@@ -239,7 +243,7 @@ async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer,
 
     // configure clients
     let mut cfg = ClientConfig::new();
-    cfg.set("bootstrap.servers", connection);
+    cfg.set("bootstrap.servers", connection.join(","));
     cfg.set("message.timeout.ms", "5000");
     if buffering {
         cfg.set("batch.num.messages", PARALLEL_BATCH_SIZE.to_string()); // = loads
@@ -273,10 +277,10 @@ async fn setup_rdkafka(connection: String, buffering: bool) -> (FutureProducer,
     (producer_client, topic_name)
 }
 
-async fn setup_rskafka(connection: String) -> PartitionClient {
+async fn setup_rskafka(connection: Vec<String>) -> PartitionClient {
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     client
         .controller_client()
         .await
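
Only the first match arm changes above; for orientation, the full macro presumably ends up shaped roughly like this. This is a hypothetical reconstruction: the skip arm and the exact messages are not visible in the hunks and may differ.

```rust
// Hypothetical reconstruction of the benchmark helper after this commit; only
// the first arm appears in the diff, the remaining arms are assumptions.
macro_rules! maybe_skip_kafka_integration {
    () => {{
        use std::env;
        match (
            env::var("TEST_INTEGRATION").is_ok(),
            env::var("KAFKA_CONNECT").ok(),
        ) {
            // The actual change: a comma-separated KAFKA_CONNECT becomes a
            // Vec<String> of bootstrap addresses.
            (true, Some(kafka_connection)) => {
                let kafka_connection: Vec<String> =
                    kafka_connection.split(",").map(|s| s.to_owned()).collect();
                kafka_connection
            }
            (true, None) => {
                // The message continuation is cut off in the diff; wording assumed.
                panic!(
                    "TEST_INTEGRATION is set which requires running integration tests, but \
                     KAFKA_CONNECT is not set"
                );
            }
            // Assumed: skip silently when integration tests are not requested.
            (false, _) => {
                eprintln!("skipping integration test - set TEST_INTEGRATION to run");
                return;
            }
        }
    }};
}
```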

docker-compose-kafka.yml

Lines changed: 21 additions & 13 deletions
@@ -1,4 +1,4 @@
-version: "2"
+version: "3"
 
 services:
   zookeeper:
@@ -12,14 +12,14 @@ services:
   kafka-0:
     image: docker.io/bitnami/kafka:3
     ports:
-      - "9093:9093"
+      - "9010:9010"
     environment:
       - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
       - KAFKA_CFG_BROKER_ID=0
       - ALLOW_PLAINTEXT_LISTENER=yes
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
-      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
-      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9092,EXTERNAL://localhost:9093
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
+      - KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9010,FOR_PROXY://:9020
+      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-0:9000,EXTERNAL://localhost:9010,FOR_PROXY://kafka-0:9020
       - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
     volumes:
       - kafka_0_data:/bitnami/kafka
@@ -28,14 +28,14 @@ services:
   kafka-1:
     image: docker.io/bitnami/kafka:3
     ports:
-      - "9094:9094"
+      - "9011:9011"
     environment:
       - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
       - KAFKA_CFG_BROKER_ID=1
       - ALLOW_PLAINTEXT_LISTENER=yes
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
-      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9094
-      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9092,EXTERNAL://localhost:9094
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
+      - KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9011,FOR_PROXY://:9021
+      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-1:9000,EXTERNAL://localhost:9011,FOR_PROXY://kafka-1:9021
       - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
     volumes:
       - kafka_1_data:/bitnami/kafka
@@ -44,19 +44,27 @@ services:
   kafka-2:
     image: docker.io/bitnami/kafka:3
     ports:
-      - "9095:9095"
+      - "9012:9012"
     environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_BROKER_ID=2
      - ALLOW_PLAINTEXT_LISTENER=yes
-      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
-      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9095
-      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9092,EXTERNAL://localhost:9095
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,FOR_PROXY:PLAINTEXT
+      - KAFKA_CFG_LISTENERS=CLIENT://:9000,EXTERNAL://:9012,FOR_PROXY://:9022
+      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka-2:9000,EXTERNAL://localhost:9012,FOR_PROXY://kafka-2:9022
       - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
     volumes:
       - kafka_2_data:/bitnami/kafka
     depends_on:
       - zookeeper
+  proxy:
+    image: serjs/go-socks5-proxy
+    ports:
+      - "1080:1080"
+    depends_on:
+      - kafka-0
+      - kafka-1
+      - kafka-2
 
 volumes:
   zookeeper_data:
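
Each broker now exposes three listeners: `CLIENT` (9000, inter-broker traffic on the compose network), `EXTERNAL` (9010–9012, published to the host and advertised as `localhost`), and `FOR_PROXY` (9020–9022, advertised with the container hostnames, so only reachable through the proxy). To sanity-check the proxy path independently of the Kafka client, one can open a raw SOCKS5 tunnel to a `FOR_PROXY` address; the sketch below uses the `tokio-socks` crate, which is an assumption of this example rather than a dependency of this commit.

```rust
use tokio_socks::tcp::Socks5Stream;

// Connectivity smoke test (a sketch, not part of this commit): ask the proxy
// on localhost:1080 to connect to kafka-1:9021. The hostname `kafka-1` only
// resolves inside the compose network, so a successful connect shows that the
// FOR_PROXY listener is reachable via SOCKS5 even though the host cannot
// reach it directly.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let _stream = Socks5Stream::connect("localhost:1080", "kafka-1:9021").await?;
    println!("SOCKS5 CONNECT to kafka-1:9021 via localhost:1080 succeeded");
    Ok(())
}
```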

docker-compose-redpanda.yml

Lines changed: 18 additions & 10 deletions
@@ -4,7 +4,7 @@ services:
     image: vectorized/redpanda:v21.11.2
     container_name: redpanda-0
     ports:
-      - '9092:9092'
+      - '9010:9010'
     command:
       - redpanda
       - start
@@ -14,15 +14,15 @@ services:
       - --overprovisioned
       - --node-id 0
       - --check=false
-      - --kafka-addr 0.0.0.0:9092
-      - --advertise-kafka-addr 127.0.0.1:9092
+      - --kafka-addr EXTERNAL://0.0.0.0:9010,FOR_PROXY://0.0.0.0:9020
+      - --advertise-kafka-addr EXTERNAL://127.0.0.1:9010,FOR_PROXY://redpanda-0:9020
       - --rpc-addr 0.0.0.0:33145
       - --advertise-rpc-addr redpanda-0:33145
   redpanda-1:
     image: vectorized/redpanda:v21.11.2
     container_name: redpanda-1
     ports:
-      - '9093:9093'
+      - '9011:9011'
     command:
       - redpanda
       - start
@@ -33,15 +33,15 @@ services:
       - --node-id 1
       - --seeds "redpanda-0:33145"
       - --check=false
-      - --kafka-addr 0.0.0.0:9093
-      - --advertise-kafka-addr 127.0.0.1:9093
+      - --kafka-addr EXTERNAL://0.0.0.0:9011,FOR_PROXY://0.0.0.0:9021
+      - --advertise-kafka-addr EXTERNAL://127.0.0.1:9011,FOR_PROXY://redpanda-1:9021
       - --rpc-addr 0.0.0.0:33146
       - --advertise-rpc-addr redpanda-1:33146
   redpanda-2:
     image: vectorized/redpanda:v21.11.2
     container_name: redpanda-2
     ports:
-      - '9094:9094'
+      - '9012:9012'
     command:
       - redpanda
       - start
@@ -52,7 +52,15 @@ services:
       - --node-id 2
       - --seeds "redpanda-0:33145"
       - --check=false
-      - --kafka-addr 0.0.0.0:9094
-      - --advertise-kafka-addr 127.0.0.1:9094
+      - --kafka-addr EXTERNAL://0.0.0.0:9012,FOR_PROXY://0.0.0.0:9022
+      - --advertise-kafka-addr EXTERNAL://127.0.0.1:9012,FOR_PROXY://redpanda-2:9022
       - --rpc-addr 0.0.0.0:33147
-      - --advertise-rpc-addr redpanda-2:33147
+      - --advertise-rpc-addr redpanda-2:33147
+  proxy:
+    image: serjs/go-socks5-proxy
+    ports:
+      - "1080:1080"
+    depends_on:
+      - redpanda-0
+      - redpanda-1
+      - redpanda-2

tests/client.rs

Lines changed: 17 additions & 14 deletions
@@ -17,15 +17,15 @@ async fn test_plain() {
     maybe_start_logging();
 
     let connection = maybe_skip_kafka_integration!();
-    ClientBuilder::new(vec![connection]).build().await.unwrap();
+    ClientBuilder::new(connection).build().await.unwrap();
 }
 
 #[tokio::test]
 async fn test_topic_crud() {
     maybe_start_logging();
 
     let connection = maybe_skip_kafka_integration!();
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     let topics = client.list_topics().await.unwrap();
 
@@ -109,22 +109,25 @@ async fn test_tls() {
         .unwrap();
 
     let connection = maybe_skip_kafka_integration!();
-    ClientBuilder::new(vec![connection])
+    ClientBuilder::new(connection)
         .tls_config(Arc::new(config))
         .build()
         .await
         .unwrap();
 }
 
-// Disabled as currently no SOCKS5 integration tests
 #[cfg(feature = "transport-socks5")]
-#[ignore]
 #[tokio::test]
 async fn test_socks5() {
     maybe_start_logging();
 
-    let client = ClientBuilder::new(vec!["my-cluster-kafka-bootstrap:9092".to_owned()])
-        .socks5_proxy("localhost:1080".to_owned())
+    // e.g. "my-connection-kafka-bootstrap:9092"
+    let connection = maybe_skip_kafka_integration!();
+    // e.g. "localhost:1080"
+    let proxy = maybe_skip_SOCKS_PROXY!();
+
+    let client = ClientBuilder::new(connection)
+        .socks5_proxy(proxy)
         .build()
         .await
         .unwrap();
@@ -143,7 +146,7 @@ async fn test_produce_empty() {
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -165,7 +168,7 @@ async fn test_consume_empty() {
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -189,7 +192,7 @@ async fn test_consume_offset_out_of_range() {
     let topic_name = random_topic_name();
     let n_partitions = 2;
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     controller_client
         .create_topic(&topic_name, n_partitions, 1, 5_000)
@@ -222,7 +225,7 @@ async fn test_get_offset() {
     let topic_name = random_topic_name();
     let n_partitions = 1;
 
-    let client = ClientBuilder::new(vec![connection.clone()])
+    let client = ClientBuilder::new(connection.clone())
         .build()
         .await
         .unwrap();
@@ -289,7 +292,7 @@ async fn test_produce_consume_size_cutoff() {
     let connection = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -362,7 +365,7 @@ async fn test_consume_midbatch() {
     let connection = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
@@ -407,7 +410,7 @@ async fn test_delete_records() {
     let connection = maybe_skip_kafka_integration!();
     let topic_name = random_topic_name();
 
-    let client = ClientBuilder::new(vec![connection]).build().await.unwrap();
+    let client = ClientBuilder::new(connection).build().await.unwrap();
     let controller_client = client.controller_client().await.unwrap();
     controller_client
         .create_topic(&topic_name, 1, 1, 5_000)
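
`maybe_skip_SOCKS_PROXY!` is used in `test_socks5` above but its definition is not part of the shown hunks; it presumably mirrors `maybe_skip_kafka_integration!` for the `SOCKS_PROXY` variable. A hypothetical reconstruction (names and messages may differ from the actual helper):

```rust
// Hypothetical: the definition is not shown in this diff. Presumably it skips
// the SOCKS5 test unless a proxy address is provided via the environment,
// mirroring the KAFKA_CONNECT helper.
macro_rules! maybe_skip_SOCKS_PROXY {
    () => {{
        use std::env;
        match env::var("SOCKS_PROXY").ok() {
            Some(proxy) => proxy,
            None => {
                eprintln!("skipping SOCKS5 test - set SOCKS_PROXY to run");
                return;
            }
        }
    }};
}
```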
