diff --git a/raft/fsm.cc b/raft/fsm.cc index 3d9e59ebb0..831c75dc80 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -855,27 +855,6 @@ void fsm::request_vote_reply(server_id from, vote_reply&& reply) { } } -static size_t entry_size(const log_entry& e) { - struct overloaded { - size_t operator()(const command& c) { - return c.size(); - } - size_t operator()(const configuration& c) { - size_t size = 0; - for (auto& s : c.current) { - size += sizeof(s.addr.id); - size += s.addr.info.size(); - size += sizeof(s.can_vote); - } - return size; - } - size_t operator()(const log_entry::dummy& d) { - return 0; - } - }; - return std::visit(overloaded{}, e.data) + sizeof(e); -} - void fsm::replicate_to(follower_progress& progress, bool allow_empty) { logger.trace("replicate_to[{}->{}]: called next={} match={}", @@ -935,7 +914,7 @@ void fsm::replicate_to(follower_progress& progress, bool allow_empty) { req.entries.push_back(entry); logger.trace("replicate_to[{}->{}]: send entry idx={}, term={}", _my_id, progress.id, entry->idx, entry->term); - size += entry_size(*entry); + size += entry->get_size(); next_idx++; if (progress.state == follower_progress::state::PROBE) { break; // in PROBE mode send only one entry diff --git a/raft/raft.cc b/raft/raft.cc index 37599ca0d7..a499a8a119 100644 --- a/raft/raft.cc +++ b/raft/raft.cc @@ -12,6 +12,27 @@ namespace raft { seastar::logger logger("raft"); +size_t log_entry::get_size() const { /* Estimated size of this entry in bytes: the payload of the active data alternative plus sizeof(*this). Same computation as the entry_size helper removed from raft/fsm.cc. */ + struct overloaded { /* one operator() per alternative dispatched by std::visit below */ + size_t operator()(const command& c) { + return c.size(); /* a command counts as its own size() */ + } + size_t operator()(const configuration& c) { + size_t size = 0; + for (auto& s : c.current) { /* NOTE(review): only c.current is walked; confirm configuration has no other member set that should be counted */ + size += sizeof(s.addr.id); + size += s.addr.info.size(); + size += sizeof(s.can_vote); + } + return size; + } + size_t operator()(const log_entry::dummy& d) { + return 0; /* a dummy entry adds no payload bytes */ + } + }; + return std::visit(overloaded{}, this->data) + sizeof(*this); /* payload estimate plus the fixed footprint of the entry object */ +} + } // end of namespace raft auto fmt::formatter::format(const raft::server_address& addr, diff --git a/raft/raft.hh b/raft/raft.hh index 
65f3afd281..348c2642ec 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -239,6 +239,8 @@ struct log_entry { term_t term; index_t idx; std::variant data; + + size_t get_size() const; /* Estimated size of this entry in bytes; defined in raft/raft.cc. */ }; using log_entry_ptr = seastar::lw_shared_ptr; diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 643e97bbe3..4c4ccd16c5 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -7,6 +7,7 @@ */ #include "service/raft/raft_rpc.hh" #include +#include #include "gms/inet_address.hh" #include "serializer_impl.hh" #include "message/msg_addr.hh" @@ -21,6 +22,8 @@ static seastar::logger rlogger("raft_rpc"); using sloc = seastar::compat::source_location; +static constexpr size_t append_entries_semaphore_limit_bytes = 10_MiB; /* Initial unit count of raft_rpc::_append_entries_semaphore; send_append_entries also caps each request at this many units. */ + raft_ticker_type::time_point timeout() { return raft_ticker_type::clock::now() + raft_tick_interval * (raft::ELECTION_TIMEOUT.count() / 2); } @@ -29,6 +32,7 @@ raft_rpc::raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id) : _sm(sm), _group_id(std::move(gid)), _my_id(my_id), _messaging(ms) , _failure_detector(std::move(failure_detector)) + , _append_entries_semaphore(append_entries_semaphore_limit_bytes) /* the semaphore starts with the whole byte budget available */ {} @@ -81,6 +85,17 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re rlogger.debug("Failed to send append_entires to {}: node is not seen as alive by the failure detector", id); co_return; } + + // Serializing raft::append_request for transmission requires approximately the same amount of memory + // as its size. This means when the Raft library replicates a log item to M servers, the log + // item is effectively copied M times. To prevent excessive memory usage and potential out-of-memory + // issues, we limit the total memory consumption of in-flight raft::append_request messages. 
+ size_t req_size = 0; /* running total of get_size() over every entry in this request */ + for (const auto& e: append_request.entries) { + req_size += e->get_size(); + } + const auto guard = co_await get_units(_append_entries_semaphore, std::min(req_size, append_entries_semaphore_limit_bytes)); /* The unit count is clamped to the configured limit. NOTE(review): presumably because a request above the total count of the semaphore could never be granted - confirm. guard is a local, so it stays alive across the send awaited below. */ + co_return co_await ser::raft_rpc_verbs::send_raft_append_entries(&_messaging, locator::host_id{id.uuid()}, db::no_timeout, _group_id, _my_id, id, append_request); } diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index 2429de5a8b..1a9a4b3174 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -31,6 +31,9 @@ protected: shared_ptr _failure_detector; seastar::gate _shutdown_gate; + // Limits the total memory usage of raft::append_request messages that are currently being sent + seastar::semaphore _append_entries_semaphore; + explicit raft_rpc(raft_state_machine& sm, netw::messaging_service& ms, shared_ptr failure_detector, raft::group_id gid, raft::server_id my_id);