Skip to content

Commit ae48631

Browse files
committed
Fix file desc. leak with wrong credentials connect
Fixes #132.
1 parent d2dfa4c commit ae48631

File tree

4 files changed

+77
-29
lines changed

4 files changed

+77
-29
lines changed

src/main/java/org/tarantool/TarantoolBase.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@ public TarantoolBase() {
4040
public TarantoolBase(String username, String password, Socket socket) {
4141
super();
4242
try {
43-
this.is = new DataInputStream(cis = new CountInputStreamImpl(socket.getInputStream()));
43+
cis = new CountInputStreamImpl(socket.getInputStream())
44+
is = new DataInputStream(cis);
4445
byte[] bytes = new byte[64];
4546
is.readFully(bytes);
4647
String firstLine = new String(bytes);
4748
if (!firstLine.startsWith(WELCOME)) {
49+
closeStreams();
4850
close();
4951
throw new CommunicationException("Welcome message should starts with tarantool but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet"));
5052
}
@@ -56,23 +58,15 @@ public TarantoolBase(String username, String password, Socket socket) {
5658
OutputStream os = socket.getOutputStream();
5759
os.write(authPacket.array(), 0, authPacket.remaining());
5860
os.flush();
59-
readPacket(is);
61+
readPacket();
6062
Long code = (Long) headers.get(Key.CODE.getId());
6163
if (code != 0) {
64+
closeStreams();
6265
throw serverError(code, body.get(Key.ERROR.getId()));
6366
}
6467
}
6568
} catch (IOException e) {
66-
try {
67-
is.close();
68-
} catch (IOException ignored) {
69-
70-
}
71-
try {
72-
cis.close();
73-
} catch (IOException ignored) {
74-
75-
}
69+
closeStreams();
7670
throw new CommunicationException("Couldn't connect to tarantool", e);
7771
}
7872
}
@@ -130,7 +124,7 @@ protected ByteBuffer createPacket(Code code, Long syncId, Long schemaId, Object.
130124
return buffer;
131125
}
132126

133-
protected void readPacket(DataInputStream is) throws IOException {
127+
protected void readPacket() throws IOException {
134128
int size = ((Number) msgPackLite.unpack(is)).intValue();
135129
long mark = cis.getBytesRead();
136130
headers = (Map<Integer, Object>) msgPackLite.unpack(is);
@@ -185,7 +179,6 @@ protected List<Map<String, Object>> readSqlResult(List<List<?>> data) {
185179
return values;
186180
}
187181

188-
189182
protected Long getSqlRowCount() {
190183
Map<Key, Object> info = (Map<Key, Object>) body.get(Key.SQL_INFO.getId());
191184
Number rowCount;
@@ -220,6 +213,21 @@ protected void closeChannel(SocketChannel channel) {
220213
}
221214
}
222215

216+
protected void closeStreams() {
217+
if (is != null) {
218+
try {
219+
is.close();
220+
} catch (IOException ignored) {
221+
}
222+
}
223+
if (cis != null) {
224+
try {
225+
cis.close();
226+
} catch (IOException ignored) {
227+
}
228+
}
229+
}
230+
223231
protected void validateArgs(Object[] args) {
224232
if (args != null) {
225233
for (int i = 0; i < args.length; i += 2) {

src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -132,40 +132,34 @@ protected void reconnect(int retry, Throwable lastError) {
132132
}
133133

134134
protected void connect(final SocketChannel channel) throws Exception {
135+
closeStreams();
135136
try {
136-
DataInputStream is = new DataInputStream(cis = new ByteBufferInputStream(channel));
137+
cis = new ByteBufferInputStream(channel);
138+
is = new DataInputStream(cis);
137139
byte[] bytes = new byte[64];
138140
is.readFully(bytes);
139141
String firstLine = new String(bytes);
140142
if (!firstLine.startsWith("Tarantool")) {
141143
CommunicationException e = new CommunicationException("Welcome message should starts with tarantool " +
142144
"but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet"));
143145

146+
closeStreams();
144147
close(e);
145148
throw e;
146149
}
147150
is.readFully(bytes);
148151
this.salt = new String(bytes);
149152
if (config.username != null && config.password != null) {
150153
writeFully(channel, createAuthPacket(config.username, config.password));
151-
readPacket(is);
154+
readPacket();
152155
Long code = (Long) headers.get(Key.CODE.getId());
153156
if (code != 0) {
157+
closeStreams();
154158
throw serverError(code, body.get(Key.ERROR.getId()));
155159
}
156160
}
157-
this.is = is;
158161
} catch (IOException e) {
159-
try {
160-
is.close();
161-
} catch (IOException ignored) {
162-
163-
}
164-
try {
165-
cis.close();
166-
} catch (IOException ignored) {
167-
168-
}
162+
closeStreams();
169163
throw new CommunicationException("Couldn't connect to tarantool", e);
170164
}
171165
channel.configureBlocking(false);
@@ -358,7 +352,7 @@ protected void readThread() {
358352
while (!Thread.currentThread().isInterrupted()) {
359353
try {
360354
long code;
361-
readPacket(is);
355+
readPacket();
362356
code = (Long) headers.get(Key.CODE.getId());
363357
Long syncId = (Long) headers.get(Key.SYNC.getId());
364358
CompletableFuture<?> future = futures.remove(syncId);

src/main/java/org/tarantool/TarantoolConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ protected List<?> exec(Code code, Object... args) {
2828
ByteBuffer packet = createPacket(code, syncId.incrementAndGet(), null, args);
2929
out.write(packet.array(), 0, packet.remaining());
3030
out.flush();
31-
readPacket(is);
31+
readPacket();
3232
Long c = (Long) headers.get(Key.CODE.getId());
3333
if (c == 0) {
3434
return (List) body.get(Key.DATA.getId());

src/test/java/org/tarantool/ClientReconnectIT.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
public class ClientReconnectIT extends AbstractTarantoolConnectorIT {
2929
private static final String INSTANCE_NAME = "jdk-testing";
3030
private TarantoolClient client;
31+
private static final int NON_EXIST_PORT = 3333;
3132

3233
@AfterEach
3334
public void tearDown() {
@@ -302,4 +303,49 @@ public void run() {
302303

303304
assertTrue(cnt.get() > threads.length);
304305
}
306+
307+
/**
308+
* Verify that we don't exhaust a file descriptor limit (and so likely
309+
* don't leak file descriptors) when trying to connect to a non existing
310+
* node.
311+
*/
312+
@Test
313+
public void testReconnectNonExist() throws Exception {
314+
SocketChannelProvider provider = new TestSocketChannelProvider(host,
315+
NON_EXIST_PORT, RESTART_TIMEOUT);
316+
TarantoolClientConfig config = makeClientConfig();
317+
config.initTimeoutMillis = 100;
318+
for (int i = 0; i < 100; ++i) {
319+
try {
320+
TarantoolClientImpl client = new TarantoolClientImpl(
321+
provider, config);
322+
} catch (Exception ignored) {
323+
}
324+
if (i % 10 == 0)
325+
System.out.println(Integer.toString(100 - i) +
326+
" iterations remain");
327+
}
328+
}
329+
330+
/**
331+
* Verify that we don't exhaust a file descriptor limit (and so likely
332+
* don't leak file descriptors) when trying to connect to an existing node
333+
* with wrong authentification credentials.
334+
*/
335+
@Test
336+
public void testReconnectWrongAuth() throws Exception {
337+
TarantoolClientConfig config = makeClientConfig();
338+
config.initTimeoutMillis = 100;
339+
config.password = config.password + 'x';
340+
for (int i = 0; i < 100; ++i) {
341+
try {
342+
TarantoolClientImpl client = new TarantoolClientImpl(
343+
socketChannelProvider, config);
344+
} catch (Exception ignored) {
345+
}
346+
if (i % 10 == 0)
347+
System.out.println(Integer.toString(100 - i) +
348+
" iterations remain");
349+
}
350+
}
305351
}

0 commit comments

Comments
 (0)