Skip to content

Commit 0a8df2f

Browse files
authored
Merge pull request #689 from svenssonaxel/RONDB-896
[RONDB-896] rdrs2: Fix memory leak in getNextReqRS_Buffer
2 parents 7308112 + a22390f commit 0a8df2f

File tree

7 files changed

+86
-57
lines changed

7 files changed

+86
-57
lines changed

storage/ndb/rest-server2/server/src/batch_feature_store_ctrl.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include <vector>
3939
#include <EventLogger.hpp>
4040
#include <ArenaMalloc.hpp>
41+
#include <util/require.h>
4142

4243
extern EventLogger *g_eventLogger;
4344

@@ -221,16 +222,20 @@ void BatchFeatureStoreCtrl::batch_featureStore(
221222
Uint32 request_buffer_size = globalConfigs.internal.reqBufferSize * 2;
222223
Uint32 request_buffer_limit = request_buffer_size / 2;
223224
Uint32 current_head = 0;
224-
RS_Buffer current_request_buffer = rsBufferArrayManager.get_req_buffer();
225+
reqBuffs[0] = rsBufferArrayManager.get_req_buffer();
225226
respBuffs[0] = rsBufferArrayManager.get_resp_buffer();
227+
Uint32 current_request_buffer_idx = 0;
226228
for (Uint32 i = 0; i < noOps; i++) {
227-
RS_Buffer reqBuff = getNextReqRS_Buffer(current_head,
228-
request_buffer_limit,
229-
current_request_buffer,
230-
i);
231-
reqBuffs[i] = reqBuff;
229+
if (i > 0)
230+
reqBuffs[i] = getNextReqRS_Buffer(
231+
current_head,
232+
request_buffer_limit,
233+
reqBuffs[current_request_buffer_idx],
234+
current_request_buffer_idx,
235+
i
236+
);
232237
status = create_native_request(readParams[i],
233-
(Uint32*)reqBuff.buffer,
238+
(Uint32*)reqBuffs[i].buffer,
234239
current_head);
235240
if (static_cast<drogon::HttpStatusCode>(status.http_code) !=
236241
drogon::HttpStatusCode::k200OK) {
@@ -241,10 +246,12 @@ void BatchFeatureStoreCtrl::batch_featureStore(
241246
return;
242247
}
243248
UintPtr length_ptr =
244-
reinterpret_cast<UintPtr>(reqBuff.buffer) +
249+
reinterpret_cast<UintPtr>(reqBuffs[i].buffer) +
245250
static_cast<UintPtr>(PK_REQ_LENGTH_IDX) * ADDRESS_SIZE;
246251
Uint32 *length_ptr_casted = reinterpret_cast<Uint32*>(length_ptr);
247252
reqBuffs[i].size = *length_ptr_casted;
253+
require(reqBuffs[i].buffer + reqBuffs[i].size <=
254+
reqBuffs[current_request_buffer_idx].buffer + current_head);
248255
}
249256
metricsUpdater.set_key_requests(noOps);
250257
// pk_batch_read

storage/ndb/rest-server2/server/src/batch_pk_read_ctrl.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <simdjson.h>
3434
#include <EventLogger.hpp>
3535
#include <ArenaMalloc.hpp>
36+
#include <util/require.h>
3637

3738
extern EventLogger *g_eventLogger;
3839

@@ -201,18 +202,22 @@ void BatchPKReadCtrl::batchPKRead(
201202
Uint32 request_buffer_size = globalConfigs.internal.reqBufferSize * 2;
202203
Uint32 request_buffer_limit = request_buffer_size / 2;
203204
Uint32 current_head = 0;
204-
RS_Buffer current_request_buffer = rsBufferArrayManager.get_req_buffer();
205+
reqBuffs[0] = rsBufferArrayManager.get_req_buffer();
205206
respBuffs[0] = rsBufferArrayManager.get_resp_buffer();
207+
Uint32 current_request_buffer_idx = 0;
206208
for (Uint32 i = 0; i < noOps; i++) {
207-
RS_Buffer reqBuff = getNextReqRS_Buffer(current_head,
208-
request_buffer_limit,
209-
current_request_buffer,
210-
i);
211-
reqBuffs[i] = reqBuff;
209+
if (i > 0)
210+
reqBuffs[i] = getNextReqRS_Buffer(
211+
current_head,
212+
request_buffer_limit,
213+
reqBuffs[current_request_buffer_idx],
214+
current_request_buffer_idx,
215+
i
216+
);
212217
DEB_BPK_CTRL("Buffer: %p, current_head: %u",
213218
reqBuff.buffer, current_head);
214219
status = create_native_request(reqStructs[i],
215-
(Uint32*)reqBuff.buffer,
220+
(Uint32*)reqBuffs[i].buffer,
216221
current_head);
217222
if (unlikely(static_cast<drogon::HttpStatusCode>(status.http_code) !=
218223
drogon::HttpStatusCode::k200OK)) {
@@ -222,10 +227,12 @@ void BatchPKReadCtrl::batchPKRead(
222227
release_array_buffers(reqBuffs.data(), respBuffs.data(), i);
223228
return;
224229
}
225-
UintPtr length_ptr = reinterpret_cast<UintPtr>(reqBuff.buffer) +
230+
UintPtr length_ptr = reinterpret_cast<UintPtr>(reqBuffs[i].buffer) +
226231
static_cast<UintPtr>(PK_REQ_LENGTH_IDX) * ADDRESS_SIZE;
227232
Uint32 *length_ptr_casted = reinterpret_cast<Uint32*>(length_ptr);
228233
reqBuffs[i].size = *length_ptr_casted;
234+
require(reqBuffs[i].buffer + reqBuffs[i].size <=
235+
reqBuffs[current_request_buffer_idx].buffer + current_head);
229236
}
230237

231238
metricsUpdater.set_key_requests(noOps);

storage/ndb/rest-server2/server/src/buffer_manager.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,14 @@ class RS_BufferArrayManager {
133133

134134
void return_req_buffer(RS_Buffer buffer) {
135135
NdbMutex_Lock(reqBufferMutex);
136+
buffer.next_allocated_buffer = 0;
136137
reqBufferArray.push_back(buffer);
137138
NdbMutex_Unlock(reqBufferMutex);
138139
}
139140

140141
void return_resp_buffer(RS_Buffer buffer) {
141142
NdbMutex_Lock(respBufferMutex);
143+
buffer.next_allocated_buffer = 0;
142144
respBufferArray.push_back(buffer);
143145
NdbMutex_Unlock(respBufferMutex);
144146
}

storage/ndb/rest-server2/server/src/db_operations/pk/pkr_operation.cpp

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -509,13 +509,17 @@ RS_Status BatchKeyOperations::create_response(RS_Buffer *respBuffs) {
509509
Uint32 response_buffer_limit = response_buffer_size / 2;
510510
Uint32 current_head = 0;
511511
Uint32 response_length = 0;
512-
RS_Buffer current_response_buffer = respBuffs[0];;
512+
Uint32 current_response_buffer_idx = 0;
513513
for (size_t i = 0; i < m_numOperations; i++) {
514514
current_head += response_length;
515-
respBuffs[i] = getNextRespRS_Buffer(current_head,
516-
response_buffer_limit,
517-
current_response_buffer,
518-
i);
515+
if (i > 0)
516+
respBuffs[i] = getNextRespRS_Buffer(
517+
current_head,
518+
response_buffer_limit,
519+
respBuffs[current_response_buffer_idx],
520+
current_response_buffer_idx,
521+
i
522+
);
519523
KeyOperation *key_op = &m_key_ops[i];
520524
PKRResponse *resp =
521525
new (&key_op->m_resp) PKRResponse(&respBuffs[i]);

storage/ndb/rest-server2/server/src/encoding.cpp

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
extern EventLogger *g_eventLogger;
3636

3737
#if (defined(VM_TRACE) || defined(ERROR_INSERT))
38-
#define DEBUG_ENC 1
39-
#define DEBUG_ENC_RESP 1
38+
//#define DEBUG_ENC 1
39+
//#define DEBUG_ENC_RESP 1
4040
#endif
4141

4242
#ifdef DEBUG_ENC
@@ -225,48 +225,48 @@ RS_Status create_native_request(PKReadParams &pkReadParams,
225225
RS_Buffer getNextReqRS_Buffer(Uint32 &current_head,
226226
Uint32 request_buffer_limit,
227227
RS_Buffer &current_request_buffer,
228+
Uint32 &current_request_buffer_idx,
228229
Uint32 index) {
229-
RS_Buffer reqBuff;
230-
if (index == 0) { // First buffer already allocated
231-
return current_request_buffer;
232-
} else if (current_head >= request_buffer_limit) {
230+
require(index > 0);
231+
if (current_head >= request_buffer_limit) {
233232
current_request_buffer.next_allocated_buffer = index;
234-
current_request_buffer = rsBufferArrayManager.get_req_buffer();
233+
current_request_buffer_idx = index;
235234
current_head = 0;
235+
RS_Buffer reqBuff = rsBufferArrayManager.get_req_buffer();
236236
DEB_ENC("Allocating a new request buffer index = %u, ptr: %p",
237-
index, current_request_buffer.buffer);
238-
return current_request_buffer;
239-
} else {
240-
reqBuff.next_allocated_buffer = 0xFFFFFFFF; // Garbage
241-
reqBuff.buffer = current_request_buffer.buffer + current_head;
242-
reqBuff.size = (globalConfigs.internal.reqBufferSize * 2) - current_head;
243-
DEB_ENC("Reuse request buffer index = %u at pos: %u, ptr: %p",
244-
index, current_head, reqBuff.buffer);
237+
index, reqBuff.buffer);
238+
return reqBuff;
245239
}
240+
RS_Buffer reqBuff;
241+
reqBuff.next_allocated_buffer = 0xFFFFFFFF; // Garbage
242+
reqBuff.buffer = current_request_buffer.buffer + current_head;
243+
reqBuff.size = (globalConfigs.internal.reqBufferSize * 2) - current_head;
244+
DEB_ENC("Reuse request buffer index = %u at pos: %u, ptr: %p",
245+
index, current_head, reqBuff.buffer);
246246
return reqBuff;
247247
}
248248

249249
RS_Buffer getNextRespRS_Buffer(Uint32 &current_head,
250250
Uint32 response_buffer_limit,
251251
RS_Buffer &current_response_buffer,
252+
Uint32 &current_response_buffer_idx,
252253
Uint32 index) {
253-
RS_Buffer respBuff;
254-
if (index == 0) { // First buffer already allocated
255-
return current_response_buffer;
256-
} else if (current_head >= response_buffer_limit) {
254+
require(index > 0);
255+
if (current_head >= response_buffer_limit) {
257256
current_response_buffer.next_allocated_buffer = index;
258-
current_response_buffer = rsBufferArrayManager.get_resp_buffer();
257+
current_response_buffer_idx = index;
259258
current_head = 0;
259+
RS_Buffer respBuff = rsBufferArrayManager.get_resp_buffer();
260260
DEB_ENC("Allocating a new response buffer index = %u, ptr: %p",
261-
index, current_response_buffer.buffer);
262-
return current_response_buffer;
263-
} else {
264-
respBuff.next_allocated_buffer = 0xFFFFFFFF; // Garbage
265-
respBuff.buffer = current_response_buffer.buffer + current_head;
266-
respBuff.size = (globalConfigs.internal.respBufferSize * 2) - current_head;
267-
DEB_ENC("Reuse response buffer index = %u at pos: %u, ptr: %p",
268-
index, current_head, respBuff.buffer);
261+
index, respBuff.buffer);
262+
return respBuff;
269263
}
264+
RS_Buffer respBuff;
265+
respBuff.next_allocated_buffer = 0xFFFFFFFF; // Garbage
266+
respBuff.buffer = current_response_buffer.buffer + current_head;
267+
respBuff.size = (globalConfigs.internal.respBufferSize * 2) - current_head;
268+
DEB_ENC("Reuse response buffer index = %u at pos: %u, ptr: %p",
269+
index, current_head, respBuff.buffer);
270270
return respBuff;
271271
}
272272

storage/ndb/rest-server2/server/src/encoding.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ RS_Status process_pkread_response(ArenaMalloc*,
3838
RS_Buffer getNextReqRS_Buffer(Uint32 &head,
3939
Uint32 request_buffer_limit,
4040
RS_Buffer &current_request_buffer,
41+
Uint32 &current_request_buffer_idx,
4142
Uint32 index);
4243
RS_Buffer getNextRespRS_Buffer(Uint32 &head,
4344
Uint32 response_buffer_limit,
4445
RS_Buffer &current_response_buffer,
46+
Uint32 &current_response_buffer_idx,
4547
Uint32 index);
4648
void release_array_buffers(RS_Buffer *req_buffers,
4749
RS_Buffer *resp_buffers,

storage/ndb/rest-server2/server/src/feature_store_ctrl.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <vector>
4545
#include <EventLogger.hpp>
4646
#include <ArenaMalloc.hpp>
47+
#include <util/require.h>
4748

4849
extern EventLogger *g_eventLogger;
4950

@@ -712,17 +713,21 @@ void FeatureStoreCtrl::featureStore(
712713
Uint32 request_buffer_size = globalConfigs.internal.reqBufferSize * 2;
713714
Uint32 request_buffer_limit = request_buffer_size / 2;
714715
Uint32 current_head = 0;
715-
RS_Buffer current_request_buffer = rsBufferArrayManager.get_req_buffer();
716+
reqBuffs[0] = rsBufferArrayManager.get_req_buffer();
716717
respBuffs[0] = rsBufferArrayManager.get_resp_buffer();
718+
Uint32 current_request_buffer_idx = 0;
717719
for (Uint32 i = 0; i < noOps; i++) {
718-
RS_Buffer reqBuff = getNextReqRS_Buffer(current_head,
719-
request_buffer_limit,
720-
current_request_buffer,
721-
i);
722-
reqBuffs[i] = reqBuff;
720+
if (i > 0)
721+
reqBuffs[i] = getNextReqRS_Buffer(
722+
current_head,
723+
request_buffer_limit,
724+
reqBuffs[current_request_buffer_idx],
725+
current_request_buffer_idx,
726+
i
727+
);
723728
RS_Status status =
724729
create_native_request(readParams[i],
725-
(Uint32*)reqBuff.buffer,
730+
(Uint32*)reqBuffs[i].buffer,
726731
current_head);
727732
if (unlikely(static_cast<drogon::HttpStatusCode>(status.http_code) !=
728733
drogon::HttpStatusCode::k200OK)) {
@@ -733,10 +738,12 @@ void FeatureStoreCtrl::featureStore(
733738
return;
734739
}
735740
UintPtr length_ptr =
736-
reinterpret_cast<UintPtr>(reqBuff.buffer) +
741+
reinterpret_cast<UintPtr>(reqBuffs[i].buffer) +
737742
static_cast<UintPtr>(PK_REQ_LENGTH_IDX) * ADDRESS_SIZE;
738743
Uint32 *length_ptr_casted = reinterpret_cast<Uint32*>(length_ptr);
739744
reqBuffs[i].size = *length_ptr_casted;
745+
require(reqBuffs[i].buffer + reqBuffs[i].size <=
746+
reqBuffs[current_request_buffer_idx].buffer + current_head);
740747
}
741748
// pk_batch_read
742749
{

0 commit comments

Comments
 (0)