Merge 'raft_rpc::send_append_entries: limit memory usage' from Petr Gusev
Serializing `raft::append_request` for transmission requires approximately the same amount of memory as its size. This means that when the Raft library replicates a log item to M servers, the log item is effectively copied M times. To prevent excessive memory usage and potential out-of-memory issues, we limit the total memory consumption of in-flight `raft::append_request` messages.
Fixes scylladb/scylladb#14411
Closes scylladb/scylladb#22835
* github.com:scylladb/scylladb:
  - raft_rpc::send_append_entries: limit memory usage
  - fsm: extract entry_size to log_entry::get_size
This commit is contained in:
23
raft/fsm.cc
23
raft/fsm.cc
@@ -855,27 +855,6 @@ void fsm::request_vote_reply(server_id from, vote_reply&& reply) {
|
||||
}
|
||||
}
|
||||
|
||||
static size_t entry_size(const log_entry& e) {
|
||||
struct overloaded {
|
||||
size_t operator()(const command& c) {
|
||||
return c.size();
|
||||
}
|
||||
size_t operator()(const configuration& c) {
|
||||
size_t size = 0;
|
||||
for (auto& s : c.current) {
|
||||
size += sizeof(s.addr.id);
|
||||
size += s.addr.info.size();
|
||||
size += sizeof(s.can_vote);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
size_t operator()(const log_entry::dummy& d) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return std::visit(overloaded{}, e.data) + sizeof(e);
|
||||
}
|
||||
|
||||
void fsm::replicate_to(follower_progress& progress, bool allow_empty) {
|
||||
|
||||
logger.trace("replicate_to[{}->{}]: called next={} match={}",
|
||||
@@ -935,7 +914,7 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) {
|
||||
req.entries.push_back(entry);
|
||||
logger.trace("replicate_to[{}->{}]: send entry idx={}, term={}",
|
||||
_my_id, progress.id, entry->idx, entry->term);
|
||||
size += entry_size(*entry);
|
||||
size += entry->get_size();
|
||||
next_idx++;
|
||||
if (progress.state == follower_progress::state::PROBE) {
|
||||
break; // in PROBE mode send only one entry
|
||||
|
||||
21
raft/raft.cc
21
raft/raft.cc
@@ -12,6 +12,27 @@ namespace raft {
|
||||
|
||||
seastar::logger logger("raft");
|
||||
|
||||
size_t log_entry::get_size() const {
|
||||
struct overloaded {
|
||||
size_t operator()(const command& c) {
|
||||
return c.size();
|
||||
}
|
||||
size_t operator()(const configuration& c) {
|
||||
size_t size = 0;
|
||||
for (auto& s : c.current) {
|
||||
size += sizeof(s.addr.id);
|
||||
size += s.addr.info.size();
|
||||
size += sizeof(s.can_vote);
|
||||
}
|
||||
return size;
|
||||
}
|
||||
size_t operator()(const log_entry::dummy& d) {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return std::visit(overloaded{}, this->data) + sizeof(*this);
|
||||
}
|
||||
|
||||
} // end of namespace raft
|
||||
|
||||
auto fmt::formatter<raft::server_address>::format(const raft::server_address& addr,
|
||||
|
||||
@@ -239,6 +239,8 @@ struct log_entry {
|
||||
term_t term;
|
||||
index_t idx;
|
||||
std::variant<command, configuration, dummy> data;
|
||||
|
||||
size_t get_size() const;
|
||||
};
|
||||
|
||||
using log_entry_ptr = seastar::lw_shared_ptr<const log_entry>;
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
#include "service/raft/raft_rpc.hh"
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/units.hh>
|
||||
#include "gms/inet_address.hh"
|
||||
#include "serializer_impl.hh"
|
||||
#include "message/msg_addr.hh"
|
||||
@@ -21,6 +22,8 @@ static seastar::logger rlogger("raft_rpc");
|
||||
|
||||
using sloc = seastar::compat::source_location;
|
||||
|
||||
static constexpr size_t append_entries_semaphore_limit_bytes = 10_MiB;
|
||||
|
||||
// Computes a deadline: the ticker clock's current time advanced by
// raft_tick_interval multiplied by half of raft::ELECTION_TIMEOUT's count
// (integer division, as in `count() / 2`).
raft_ticker_type::time_point timeout() {
    const auto half_election_ticks = raft::ELECTION_TIMEOUT.count() / 2;
    const auto wait_for = raft_tick_interval * half_election_ticks;
    return raft_ticker_type::clock::now() + wait_for;
}
|
||||
@@ -29,6 +32,7 @@ raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms,
|
||||
shared_ptr<raft::failure_detector> failure_detector, raft::group_id gid, raft::server_id my_id)
|
||||
: _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms)
|
||||
, _failure_detector(std::move(failure_detector))
|
||||
, _append_entries_semaphore(append_entries_semaphore_limit_bytes)
|
||||
{}
|
||||
|
||||
|
||||
@@ -81,6 +85,17 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re
|
||||
rlogger.debug("Failed to send append_entires to {}: node is not seen as alive by the failure detector", id);
|
||||
co_return;
|
||||
}
|
||||
|
||||
// Serializing raft::append_request for transmission requires approximately the same amount of memory
|
||||
// as its size. This means when the Raft library replicates a log item to M servers, the log
|
||||
// item is effectively copied M times. To prevent excessive memory usage and potential out-of-memory
|
||||
// issues, we limit the total memory consumption of in-flight raft::append_request messages.
|
||||
size_t req_size = 0;
|
||||
for (const auto& e: append_request.entries) {
|
||||
req_size += e->get_size();
|
||||
}
|
||||
const auto guard = co_await get_units(_append_entries_semaphore, std::min(req_size, append_entries_semaphore_limit_bytes));
|
||||
|
||||
co_return co_await ser::raft_rpc_verbs::send_raft_append_entries(&_messaging, locator::host_id{id.uuid()},
|
||||
db::no_timeout, _group_id, _my_id, id, append_request);
|
||||
}
|
||||
|
||||
@@ -31,6 +31,9 @@ protected:
|
||||
shared_ptr<raft::failure_detector> _failure_detector;
|
||||
seastar::gate _shutdown_gate;
|
||||
|
||||
// Limits the total memory usage of raft::append_request messages that are currently being sent
|
||||
seastar::semaphore _append_entries_semaphore;
|
||||
|
||||
explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms,
|
||||
shared_ptr<raft::failure_detector> failure_detector, raft::group_id gid, raft::server_id my_id);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user