Skip to content

Commit ae1ee5c

Browse files
Merge pull request #150 from influxdata/crepererum/remove_redpanda_out_of_bounds_quirk
fix: remove potentially buggy Redpanda quirk
2 parents e30696e + 4253939 commit ae1ee5c

File tree

4 files changed

+26
-20
lines changed

4 files changed

+26
-20
lines changed

.circleci/config.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ jobs:
139139
# setup multiple docker images (see https://circleci.com/docs/2.0/configuration-reference/#docker)
140140
docker:
141141
- image: quay.io/influxdb/rust:ci
142-
- image: vectorized/redpanda:v21.11.2
142+
- image: vectorized/redpanda:v22.1.4
143143
name: redpanda-0
144144
command:
145145
- redpanda
@@ -152,7 +152,7 @@ jobs:
152152
- --check=false
153153
- --kafka-addr redpanda-0:9092
154154
- --rpc-addr redpanda-0:33145
155-
- image: vectorized/redpanda:v21.11.2
155+
- image: vectorized/redpanda:v22.1.4
156156
name: redpanda-1
157157
command:
158158
- redpanda
@@ -166,7 +166,7 @@ jobs:
166166
- --kafka-addr redpanda-1:9092
167167
- --rpc-addr redpanda-1:33145
168168
- --seeds redpanda-0:33145
169-
- image: vectorized/redpanda:v21.11.2
169+
- image: vectorized/redpanda:v22.1.4
170170
name: redpanda-2
171171
command:
172172
- redpanda

docker-compose-redpanda.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: '3.7'
22
services:
33
redpanda-0:
4-
image: vectorized/redpanda:v21.11.2
4+
image: vectorized/redpanda:v22.1.4
55
container_name: redpanda-0
66
ports:
77
- '9010:9010'
@@ -19,7 +19,7 @@ services:
1919
- --rpc-addr 0.0.0.0:33145
2020
- --advertise-rpc-addr redpanda-0:33145
2121
redpanda-1:
22-
image: vectorized/redpanda:v21.11.2
22+
image: vectorized/redpanda:v22.1.4
2323
container_name: redpanda-1
2424
ports:
2525
- '9011:9011'
@@ -38,7 +38,7 @@ services:
3838
- --rpc-addr 0.0.0.0:33146
3939
- --advertise-rpc-addr redpanda-1:33146
4040
redpanda-2:
41-
image: vectorized/redpanda:v21.11.2
41+
image: vectorized/redpanda:v22.1.4
4242
container_name: redpanda-2
4343
ports:
4444
- '9012:9012'

src/client/partition.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::ops::{ControlFlow, Deref, Range};
2424
use std::sync::Arc;
2525
use time::OffsetDateTime;
2626
use tokio::sync::Mutex;
27-
use tracing::{error, info, warn};
27+
use tracing::{error, info};
2828

2929
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
3030
pub enum Compression {
@@ -146,18 +146,6 @@ impl PartitionClient {
146146
})
147147
.await?;
148148

149-
// Redpanda never sends OffsetOutOfRange even when it should. "Luckily" it does not support deletions so we can
150-
// implement a simple heuristic.
151-
if partition.high_watermark.0 < offset {
152-
warn!(
153-
"This message looks like Redpanda wants to report a OffsetOutOfRange but doesn't."
154-
);
155-
return Err(Error::ServerError(
156-
ProtocolError::OffsetOutOfRange,
157-
String::from("Offset out of range"),
158-
));
159-
}
160-
161149
let records = extract_records(partition.records.0, offset)?;
162150

163151
Ok((records, partition.high_watermark.0))

tests/client.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,35 @@ async fn test_socks5() {
125125
let connection = maybe_skip_kafka_integration!();
126126
// e.g. "localhost:1080"
127127
let proxy = maybe_skip_SOCKS_PROXY!();
128+
let topic_name = random_topic_name();
128129

129130
let client = ClientBuilder::new(connection)
130131
.socks5_proxy(proxy)
131132
.build()
132133
.await
133134
.unwrap();
134-
let partition_client = client.partition_client("myorg_mybucket", 0).unwrap();
135+
136+
let controller_client = client.controller_client().unwrap();
137+
controller_client
138+
.create_topic(&topic_name, 1, 1, 5_000)
139+
.await
140+
.unwrap();
141+
142+
let partition_client = client.partition_client(topic_name, 0).unwrap();
143+
144+
let record = record(b"");
135145
partition_client
146+
.produce(vec![record.clone()], Compression::NoCompression)
147+
.await
148+
.unwrap();
149+
150+
let (mut records, _watermark) = partition_client
136151
.fetch_records(0, 1..10_000_001, 1_000)
137152
.await
138153
.unwrap();
154+
assert_eq!(records.len(), 1);
155+
let record2 = records.remove(0).record;
156+
assert_eq!(record, record2);
139157
}
140158

141159
#[tokio::test]

0 commit comments

Comments
 (0)