Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion rfaas/include/rfaas/allocation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace rfaas {

struct LeaseRequest {
// > 0: Number of cores to be allocated
// < 0: client_id with negative sign, deallocation & disconnect request
// <= 0: client_id with negative sign, deallocation & disconnect request
int16_t cores;
int32_t memory;
};
Expand All @@ -25,6 +25,8 @@ namespace rfaas {
int32_t port;
char address[16];
//LeasedNode nodes[MAX_NODES_PER_LEASE];
int16_t cores;
int32_t memory;
};

struct AllocationRequest {
Expand Down
12 changes: 7 additions & 5 deletions rfaas/include/rfaas/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ namespace rfaas {
}

int response_id = responses[0].wr_id;
const LeaseResponse& lease_response = _resource_mgr.response(response_id);

return rfaas::executor{
std::string{_resource_mgr.response(response_id).address},
_resource_mgr.response(response_id).port,
cores,
memory,
_resource_mgr.response(response_id).lease_id,
std::string{lease_response.address},
lease_response.port,
lease_response.cores,
lease_response.memory,
lease_response.lease_id,
dev
Comment on lines +57 to 65
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential UB: unsafe std::string construction from fixed-size char buffer.

lease_response.address is filled with strncpy on the server; it’s not guaranteed to be null-terminated. Constructing std::string from char* can read past bounds.

Use a bounded constructor:

+#include <cstring> // for strnlen
 ...
-      return rfaas::executor{
-        std::string{lease_response.address},
+      return rfaas::executor{
+        std::string{
+          lease_response.address,
+          strnlen(lease_response.address, sizeof(lease_response.address))
+        },
         lease_response.port,
         lease_response.cores,
         lease_response.memory,
         lease_response.lease_id,
         dev
       };

Also ensure the producer side explicitly null-terminates address (see db.cpp review).

🤖 Prompt for AI Agents
In rfaas/include/rfaas/client.hpp around lines 57 to 65, constructing
std::string from lease_response.address (a fixed-size char buffer filled with
strncpy) is unsafe because it may not be null-terminated; change the
construction to use the bounded std::string constructor with an explicit length
(e.g., std::string(lease_response.address, strnlen(lease_response.address,
ADDRESS_MAX))) or pass the known buffer size trimmed by strnlen, and also ensure
the producer side (server/db.cpp) explicitly null-terminates address after
strncpy to prevent non-terminated buffers.

};
}
Expand Down
26 changes: 16 additions & 10 deletions server/executor_manager/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ namespace rfaas::executor_manager {
waitpid(executor->id(), &status, WUNTRACED);
auto e = std::chrono::high_resolution_clock::now();
spdlog::info("Waited for child {} ms", std::chrono::duration_cast<std::chrono::milliseconds>(e-b).count());

int32_t lease_id = executor->_lease_id;
if (res_mgr_connection) {

spdlog::debug("Client {}: sending lease deallocation to resource manager for lease id {}", _id, lease_id);
res_mgr_connection->close_lease(
lease_id,
allocation_time,
accounting.data()[0].execution_time,
accounting.data()[0].hot_polling_time
);
}
else {
spdlog::error("Client {}: could not send lease deallocation to resource manager for lease id {}", _id, lease_id);
}

executor.reset();
}
spdlog::info(
Expand All @@ -104,16 +120,6 @@ namespace rfaas::executor_manager {
accounting.data()[0].execution_time / 1000.0
);

if(res_mgr_connection) {

res_mgr_connection->close_lease(
_id,
allocation_time,
accounting.data()[0].execution_time,
accounting.data()[0].hot_polling_time
);

}

//acc.hot_polling_time = acc.execution_time = 0;
// SEGFAULT?
Expand Down
6 changes: 4 additions & 2 deletions server/executor_manager/executor_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ namespace rfaas::executor_manager {
connections[pos] = connection;
}

ProcessExecutor::ProcessExecutor(int cores, ProcessExecutor::time_t alloc_begin, pid_t pid):
ProcessExecutor::ProcessExecutor(int cores, ProcessExecutor::time_t alloc_begin, pid_t pid, int32_t lease_id):
ActiveExecutor(cores),
_pid(pid)
{
_allocation_begin = alloc_begin;
// FIXME: remove after connection
_allocation_finished = _allocation_begin;

_lease_id = lease_id;
}

std::tuple<ProcessExecutor::Status,int> ProcessExecutor::check() const
Expand Down Expand Up @@ -219,7 +221,7 @@ namespace rfaas::executor_manager {
}
if(counter == 36)
counter = 0;
return new ProcessExecutor{lease.cores, begin, mypid};
return new ProcessExecutor{lease.cores, begin, mypid, lease.id};
}

}
Expand Down
3 changes: 2 additions & 1 deletion server/executor_manager/executor_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace rfaas::executor_manager {
rdmalib::Connection** connections;
int connections_len;
int cores;
int32_t _lease_id;

ActiveExecutor(int cores):
connections(new rdmalib::Connection*[cores]),
Expand All @@ -49,7 +50,7 @@ namespace rfaas::executor_manager {
{
pid_t _pid;

ProcessExecutor(int cores, time_t alloc_begin, pid_t pid);
ProcessExecutor(int cores, time_t alloc_begin, pid_t pid, int32_t lease_id);

// FIXME: kill active executor
//~ProcessExecutor();
Expand Down
14 changes: 6 additions & 8 deletions server/executor_manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ namespace rfaas::executor_manager {
);
auto end = std::chrono::high_resolution_clock::now();
spdlog::info(
"Client {} at {}:{} has executor with {} ID and {} cores, time {} us",
client.id(), client_address, client_port, client.executor->id(), lease->cores,
"Client {} at {}:{} has executor with pid {}, lease id {}, cores {}, time {} us",
client.id(), client_address, client_port, client.executor->id(), lease->id, lease->cores,
std::chrono::duration_cast<std::chrono::microseconds>(end-now).count()
);

Expand All @@ -327,7 +327,6 @@ namespace rfaas::executor_manager {
} else {

spdlog::info("Client {} disconnects", client.id());
//client.disable(i, _accounting_data.data()[i]);
client.disable(_res_mgr_connection.get());

return false;
Expand All @@ -340,7 +339,7 @@ namespace rfaas::executor_manager {
for(auto it = _clients.begin(); it != _clients.end(); ++it) {

Client & client = it->second;
int i = it->first;
uint32_t client_id = it->first;
if(!client.active()) {
continue;
}
Expand All @@ -361,9 +360,9 @@ namespace rfaas::executor_manager {
// send lease cancellation
spdlog::info(
"Executor at client {} exited, status {}, time allocated {} us, polling {} us, execution {} us",
i, std::get<1>(status), client.allocation_time,
client.accounting.data()[i].hot_polling_time / 1000.0,
client.accounting.data()[i].execution_time / 1000.0
client_id, std::get<1>(status), client.allocation_time,
client.accounting.data()[client_id].hot_polling_time / 1000.0,
client.accounting.data()[client_id].execution_time / 1000.0
);
client.executor.reset(nullptr);
spdlog::info("Finished cleanup");
Expand Down Expand Up @@ -426,7 +425,6 @@ namespace rfaas::executor_manager {
spdlog::debug("[Manager] Disconnecting client");

Client& client = (*it).second;
//client.disable(i, _accounting_data.data()[i]);
client.disable(_res_mgr_connection.get());
_clients.erase(it);

Expand Down
7 changes: 7 additions & 0 deletions server/resource_manager/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ namespace rfaas { namespace resource_manager {
continue;
}

// Lease entire free memory when receiving reserved value -1
if (memory == -1) {
memory = shared_ptr->_free_memory;
}

if(!shared_ptr->lease(numcores, memory)) {
++it;
SPDLOG_DEBUG("Node {} cannot be used, not enough resources!", shared_ptr->node);
Expand All @@ -84,6 +89,8 @@ namespace rfaas { namespace resource_manager {
lease.lease_id = _lease_count++;
lease.port = shared_ptr->port;
strncpy(lease.address, shared_ptr->address.c_str(), Executor::ADDRESS_LENGTH);
lease.cores = numcores;
lease.memory = memory;

bool is_total = shared_ptr->is_fully_leased();
if(is_total) {
Expand Down
5 changes: 4 additions & 1 deletion server/resource_manager/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ namespace rfaas::resource_manager {

bool Executor::lease(int cores, int memory)
{
// Not enough memory? skip
if (cores <= 0 || memory <= 0 || _free_cores <= 0 || _free_memory <= 0) {
return false;
}

if(_free_memory < memory) {
return false;
}
Expand Down
16 changes: 2 additions & 14 deletions server/resource_manager/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ void Manager::_handle_client_message(ibv_wc& wc, std::vector<Client*>& poll_send
} else {

allocated->_send_buffer[0].lease_id = client.response()[0].lease_id;
allocated->_send_buffer[0].cores = cores;
allocated->_send_buffer[0].memory = memory;
allocated->_send_buffer[0].cores = client.response()[0].cores;
allocated->_send_buffer[0].memory = client.response()[0].memory;

allocated->_connection->post_send(
allocated->_send_buffer,
Expand Down Expand Up @@ -359,7 +359,6 @@ void Manager::process_clients()
rdmalib::Poller recv_poller{std::get<1>(*_state.shared_queue(2))};
int client_count = 0;
std::vector<Client*> poll_send;
std::vector<client_t::iterator> removals;

while (!_shutdown.load()) {

Expand Down Expand Up @@ -392,17 +391,6 @@ void Manager::process_clients()
}
poll_send.clear();
}

if (removals.size()) {
for (auto it : removals) {
spdlog::info("Remove client id {}", it->second.client_id);
_clients.erase(it);
}

client_count -= removals.size();
removals.clear();
}

}

spdlog::info("Background thread stops processing client events");
Expand Down