From a1604aa38871f9d24ce78f35d9f319b8871e39a4 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 16 Mar 2022 15:14:36 +0200 Subject: [PATCH] raft: make raft requests abortable This patch adds an ability to pass abort_source to raft request APIs ( add_entry, modify_config) to make them abortable. A request issuer not always want to wait for a request to complete. For instance because a client disconnected or because it no longer interested in waiting because of a timeout. After this patch it can now abort waiting for such requests through an abort source. Note that aborting a request only aborts the wait for it to complete, it does not mean that the request will not be eventually executed. Message-Id: --- raft/fsm.cc | 4 +- raft/fsm.hh | 2 +- raft/raft.hh | 10 ++- raft/server.cc | 129 +++++++++++++++++---------- raft/server.hh | 13 ++- service/raft/raft_rpc.cc | 6 +- test/raft/randomized_nemesis_test.cc | 6 +- test/raft/replication.hh | 6 +- 8 files changed, 109 insertions(+), 67 deletions(-) diff --git a/raft/fsm.cc b/raft/fsm.cc index 3618572187..93a16e51b7 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -44,10 +44,10 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log, failure_detector& failure_detector, fsm_config config) : fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {} -future<> fsm::wait_max_log_size() { +future<> fsm::wait_max_log_size(seastar::abort_source* as) { check_is_leader(); - return leader_state().log_limiter_semaphore->wait(); + return as ? leader_state().log_limiter_semaphore->wait(*as) : leader_state().log_limiter_semaphore->wait(); } const configuration& fsm::get_configuration() const { diff --git a/raft/fsm.hh b/raft/fsm.hh index 09e327ce8b..090f2ee51b 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -385,7 +385,7 @@ public: // Call this function to wait for the number of log entries to // go below max_log_size. - future<> wait_max_log_size(); + future<> wait_max_log_size(seastar::abort_source* as); // Return current configuration. Throws if not a leader. const configuration& get_configuration() const; diff --git a/raft/raft.hh b/raft/raft.hh index db267675a5..7c7660f6aa 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -257,6 +257,10 @@ struct no_other_voting_member : public error { no_other_voting_member() : error("Cannot stepdown because there is no other voting member") {} }; +struct request_aborted : public error { + request_aborted() : error("Request is aborted by a caller") {} +}; + // True if a failure to execute a Raft operation can be re-tried, // perhaps with a different server. inline bool is_transient_error(const std::exception& e) { @@ -580,18 +584,18 @@ public: virtual future apply_snapshot(server_id from, install_snapshot snp) = 0; // Try to execute read barrier, future resolves when the barrier is completed or error happens - virtual future execute_read_barrier(server_id from) = 0; + virtual future execute_read_barrier(server_id from, seastar::abort_source* as) = 0; // An endpoint on the leader to add an entry to the raft log, // as requested by a remote follower. - virtual future execute_add_entry(server_id from, command cmd) = 0; + virtual future execute_add_entry(server_id from, command cmd, seastar::abort_source* as) = 0; // An endpoint on the leader to change configuration, // as requested by a remote follower. // If the future resolves successfully, a dummy entry was committed after the configuration change. virtual future execute_modify_config(server_id from, std::vector add, - std::vector del) = 0; + std::vector del, seastar::abort_source* as) = 0; // Update RPC implementation with this client as // the receiver of RPC input. diff --git a/raft/server.cc b/raft/server.cc index 507c781301..4d917fc37f 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -32,6 +32,7 @@ struct active_read { read_id id; index_t idx; seastar::promise promise; + optimized_optional abort; }; static const seastar::metrics::label server_id_label("id"); @@ -56,21 +57,21 @@ public: void timeout_now_request(server_id from, timeout_now timeout_now) override; void read_quorum_request(server_id from, struct read_quorum read_quorum) override; void read_quorum_reply(server_id from, struct read_quorum_reply read_quorum_reply) override; - future execute_read_barrier(server_id) override; - future execute_add_entry(server_id from, command cmd) override; + future execute_read_barrier(server_id, seastar::abort_source* as) override; + future execute_add_entry(server_id from, command cmd, seastar::abort_source* as) override; future execute_modify_config(server_id from, - std::vector add, std::vector del) override; + std::vector add, std::vector del, seastar::abort_source* as) override; future apply_snapshot(server_id from, install_snapshot snp) override; // server interface - future<> add_entry(command command, wait_type type) override; - future<> set_configuration(server_address_set c_new) override; + future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) override; + future<> set_configuration(server_address_set c_new, seastar::abort_source* as = nullptr) override; raft::configuration get_configuration() const override; future<> start() override; future<> abort() override; term_t get_current_term() const override; - future<> read_barrier() override; + future<> read_barrier(seastar::abort_source* as = nullptr) override; void wait_until_candidate() override; future<> wait_election_done() override; future<> wait_log_idx_term(std::pair idx_log) override; @@ -80,8 +81,8 @@ public: void tick() override; raft::server_id id() const override; future<> stepdown(logical_clock::duration timeout) override; - future<> modify_config(std::vector add, std::vector del) override; - future add_entry_on_leader(command command); + future<> modify_config(std::vector add, std::vector del, seastar::abort_source* as = nullptr) override; + future add_entry_on_leader(command command, seastar::abort_source* as); void register_metrics() override; private: std::unique_ptr _rpc; @@ -145,6 +146,7 @@ private: struct op_status { term_t term; // term the entry was added with promise<> done; // notify when done here + optimized_optional abort; // abort subscription }; // Entries that have a waiter that needs to be notified when the @@ -205,7 +207,7 @@ private: // in-memory log. This allows to avoid deadlocks when // automatically appending a non-joint configuration // and makes set_configuration() safe from preemption. - template future<> add_entry_internal(T command, wait_type type); + template future<> add_entry_internal(T command, wait_type type, seastar::abort_source* as); template void send_message(server_id id, Message m); // Abort all snapshot transfer. @@ -248,20 +250,20 @@ private: void remove_from_rpc_config(const server_address& srv); // A helper to wait for a leader to get elected - future<> wait_for_leader(); + future<> wait_for_leader(seastar::abort_source* as); // Get "safe to read" index from a leader - future get_read_idx(server_id leader); + future get_read_idx(server_id leader, seastar::abort_source* as); // Wait for an entry with a specific term to get committed or // applied locally. - future<> wait_for_entry(entry_id eid, wait_type type); + future<> wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as); // Wait for a read barrier index to be applied. The index // is typically already committed, so we don't worry about the // term. future<> wait_for_apply(index_t idx); // Set configuration but don't wait for transition joint -> // non_joint. - future<> enter_joint_configuration(server_address_set c_new); + future<> enter_joint_configuration(server_address_set c_new, seastar::abort_source* as); friend std::ostream& operator<<(std::ostream& os, const server_impl& s); }; @@ -329,14 +331,14 @@ future<> server_impl::start() { co_return; } -future<> server_impl::wait_for_leader() { +future<> server_impl::wait_for_leader(seastar::abort_source* as) { if (!_leader_promise) { _leader_promise.emplace(); } - return _leader_promise->get_shared_future(); + return as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future(); } -future<> server_impl::wait_for_entry(entry_id eid, wait_type type) { +future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) { // The entry may have been already committed and even applied // in case it was forwarded to the leader. In this case // waiting for it is futile. @@ -361,6 +363,10 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type) { throw stopped_error(); } + if (as && as->abort_requested()) { + throw request_aborted(); + } + auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies; logger.trace("[{}] waiting for entry {}.{}", id(), eid.term, eid.idx); @@ -405,22 +411,29 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type) { } } assert(inserted); + if (as) { + it->second.abort = as->subscribe([it = it, &container] () noexcept { + it->second.done.set_exception(request_aborted()); + container.erase(it); + }); + assert(it->second.abort); + } co_await it->second.done.get_future(); logger.trace("[{}] done waiting for {}.{}", id(), eid.term, eid.idx); co_return; } template -future<> server_impl::add_entry_internal(T command, wait_type type) { +future<> server_impl::add_entry_internal(T command, wait_type type, seastar::abort_source* as) { // Must not preempt before adding entry, the caller, such as // set_configuration(), relies on it. const log_entry& e = _fsm->add_entry(std::move(command)); - co_return co_await wait_for_entry({.term = e.term, .idx = e.idx}, type); + co_return co_await wait_for_entry({.term = e.term, .idx = e.idx}, type, as); } -future server_impl::add_entry_on_leader(command cmd) { +future server_impl::add_entry_on_leader(command cmd, seastar::abort_source* as) { // Wait for a new slot to become available - co_await _fsm->wait_max_log_size(); + co_await _fsm->wait_max_log_size(as); logger.trace("[{}] adding entry after log size limit check", id()); const log_entry& e = _fsm->add_entry(std::move(cmd)); @@ -428,7 +441,7 @@ future server_impl::add_entry_on_leader(command cmd) { co_return entry_id{.term = e.term, .idx = e.idx}; } -future server_impl::execute_add_entry(server_id from, command cmd) { +future server_impl::execute_add_entry(server_id from, command cmd, seastar::abort_source* as) { if (from != _id && !_fsm->get_configuration().contains(from)) { // Do not accept entries from servers removed from the // configuration. @@ -436,13 +449,13 @@ future server_impl::execute_add_entry(server_id from, command c } logger.trace("[{}] adding a forwarded entry from {}", id(), from); try { - co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd))}; + co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd), as)}; } catch (raft::not_a_leader& e) { co_return add_entry_reply{e}; } } -future<> server_impl::add_entry(command command, wait_type type) { +future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) { _stats.add_command++; server_id leader = _fsm->current_leader(); logger.trace("[{}] an entry is submitted", id()); @@ -450,13 +463,16 @@ future<> server_impl::add_entry(command command, wait_type type) { if (leader != _id) { throw not_a_leader{leader}; } - auto eid = co_await add_entry_on_leader(std::move(command)); - co_return co_await wait_for_entry(eid, type); + auto eid = co_await add_entry_on_leader(std::move(command), as); + co_return co_await wait_for_entry(eid, type, as); } while (true) { + if (as && as->abort_requested()) { + throw request_aborted(); + } if (leader == server_id{}) { logger.trace("[{}] the leader is unknown, waiting through uncertainty", id()); - co_await wait_for_leader(); + co_await wait_for_leader(as); leader = _fsm->current_leader(); } else { auto reply = co_await [&] { @@ -464,14 +480,14 @@ future<> server_impl::add_entry(command command, wait_type type) { logger.trace("[{}] an entry proceeds on a leader", id()); // Make a copy of the command since we may still // retry and forward it. - return execute_add_entry(leader, command); + return execute_add_entry(leader, command, as); } else { logger.trace("[{}] forwarding the entry to {}", id(), leader); return _rpc->send_add_entry(leader, command); } }(); if (std::holds_alternative(reply)) { - co_return co_await wait_for_entry(std::get(reply), type); + co_return co_await wait_for_entry(std::get(reply), type, as); } leader = std::get(reply).leader; } @@ -479,7 +495,7 @@ future<> server_impl::add_entry(command command, wait_type type) { } future server_impl::execute_modify_config(server_id from, - std::vector add, std::vector del) { + std::vector add, std::vector del, seastar::abort_source* as) { if (from != _id && !_fsm->get_configuration().contains(from)) { // Do not accept entries from servers removed from the @@ -509,10 +525,10 @@ future server_impl::execute_modify_config(server_id from, logger.trace("[{}] removing server {}", id(), to_remove); cfg.erase(server_address{.id = to_remove}); } - co_await enter_joint_configuration(cfg); + co_await enter_joint_configuration(cfg, as); const log_entry& e = _fsm->add_entry(log_entry::dummy()); auto eid = entry_id{.term = e.term, .idx = e.idx}; - co_await wait_for_entry(eid, wait_type::committed); + co_await wait_for_entry(eid, wait_type::committed, as); // `modify_config` doesn't actually need the entry id for anything // but we reuse the `add_entry` RPC verb which requires it. co_return add_entry_reply{eid}; @@ -530,28 +546,31 @@ future server_impl::execute_modify_config(server_id from, co_return add_entry_reply{not_a_leader{_fsm->current_leader()}}; } -future<> server_impl::modify_config(std::vector add, std::vector del) { +future<> server_impl::modify_config(std::vector add, std::vector del, seastar::abort_source* as) { server_id leader = _fsm->current_leader(); if (!_config.enable_forwarding) { if (leader != _id) { throw not_a_leader{leader}; } - auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del)); + auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del), as); if (std::holds_alternative(reply)) { co_return; } throw raft::not_a_leader{_fsm->current_leader()}; } while (true) { + if (as && as->abort_requested()) { + throw request_aborted(); + } if (leader == server_id{}) { - co_await wait_for_leader(); + co_await wait_for_leader(as); leader = _fsm->current_leader(); } else { auto reply = co_await [&] { if (leader == _id) { // Make a copy since of the params since we may // still retry and forward them. - return execute_modify_config(leader, add, del); + return execute_modify_config(leader, add, del, as); } else { logger.trace("[{}] forwarding the entry to {}", id(), leader); return _rpc->send_modify_config(leader, add, del); @@ -1025,7 +1044,7 @@ future<> server_impl::wait_for_apply(index_t idx) { } } -future server_impl::execute_read_barrier(server_id from) { +future server_impl::execute_read_barrier(server_id from, seastar::abort_source* as) { logger.trace("[{}] execute_read_barrier start", _id); std::optional> rid; @@ -1040,32 +1059,46 @@ future server_impl::execute_read_barrier(server_id from) { } logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}", _id, rid->first, rid->second); - _reads.push_back({rid->first, rid->second, {}}); - return _reads.back().promise.get_future(); + if (as && as->abort_requested()) { + return make_exception_future(request_aborted()); + } + _reads.push_back({rid->first, rid->second, {}, {}}); + auto read = --_reads.end(); + if (as) { + read->abort = as->subscribe([this, read] () noexcept { + read->promise.set_exception(request_aborted()); + _reads.erase(read); + }); + assert(read->abort); + } + return read->promise.get_future(); } -future server_impl::get_read_idx(server_id leader) { +future server_impl::get_read_idx(server_id leader, seastar::abort_source* as) { if (_id == leader) { - return execute_read_barrier(_id); + return execute_read_barrier(_id, as); } else { return _rpc->execute_read_barrier_on_leader(leader); } } -future<> server_impl::read_barrier() { +future<> server_impl::read_barrier(seastar::abort_source* as) { server_id leader = _fsm->current_leader(); logger.trace("[{}] read_barrier start", _id); index_t read_idx; while (read_idx == index_t{}) { + if (as && as->abort_requested()) { + throw request_aborted(); + } logger.trace("[{}] read_barrier forward to {}", _id, leader); if (leader == server_id{}) { - co_await wait_for_leader(); + co_await wait_for_leader(as); leader = _fsm->current_leader(); } else { auto applied = _applied_idx; - auto res = co_await get_read_idx(leader); + auto res = co_await get_read_idx(leader, as); if (std::holds_alternative(res)) { // the leader is not ready to answer because it did not // committed any entries yet, so wait for any entry to be @@ -1163,7 +1196,7 @@ future<> server_impl::abort() { co_await seastar::when_all_succeed(all_futures.begin(), all_futures.end()).discard_result(); } -future<> server_impl::enter_joint_configuration(server_address_set c_new) { +future<> server_impl::enter_joint_configuration(server_address_set c_new, seastar::abort_source* as) { const auto& cfg = _fsm->get_configuration(); // 4.1 Cluster membership changes. Safety. // When the leader receives a request to add or remove a server @@ -1175,11 +1208,11 @@ future<> server_impl::enter_joint_configuration(server_address_set c_new) { co_return; } _stats.add_config++; - co_await add_entry_internal(raft::configuration{std::move(c_new)}, wait_type::committed); + co_await add_entry_internal(raft::configuration{std::move(c_new)}, wait_type::committed, as); } -future<> server_impl::set_configuration(server_address_set c_new) { - co_await enter_joint_configuration(std::move(c_new)); +future<> server_impl::set_configuration(server_address_set c_new, seastar::abort_source* as) { + co_await enter_joint_configuration(std::move(c_new), as); // Above we co_wait that the joint configuration is committed. // Immediately, without yield, once the FSM discovers // this, it appends non-joint entry. Hence, @@ -1187,7 +1220,7 @@ future<> server_impl::set_configuration(server_address_set c_new) { // By waiting for a follow up dummy to get committed, we // automatically wait for the non-joint entry to get // committed. - co_await add_entry_internal(log_entry::dummy(), wait_type::committed); + co_await add_entry_internal(log_entry::dummy(), wait_type::committed, as); } raft::configuration diff --git a/raft/server.hh b/raft/server.hh index e1a3398c8d..a59e5ad9a0 100644 --- a/raft/server.hh +++ b/raft/server.hh @@ -6,6 +6,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ #pragma once +#include #include "raft.hh" namespace raft { @@ -55,7 +56,7 @@ public: // this means that the entry is committed/applied locally (depending on the wait type). // Applied locally means the local state machine replica applied this command; // committed locally means simply that the commit index is beyond this entry's index. - virtual future<> add_entry(command command, wait_type type) = 0; + virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) = 0; // Set a new cluster configuration. If the configuration is // identical to the previous one does nothing. @@ -79,7 +80,9 @@ public: // Note: committing a dummy entry extends the opportunity for // uncertainty, thus commit_status_unknown exception may be // returned even in case of a successful config change. - virtual future<> set_configuration(server_address_set c_new) = 0; + // + // A caller may pass a pointer to an abort_source to make operation abortable. + virtual future<> set_configuration(server_address_set c_new, seastar::abort_source* as = nullptr) = 0; // A simplified wrapper around set_configuration() which adds // and deletes servers. Unlike set_configuration(), @@ -99,8 +102,9 @@ public: // The local commit index is not necessarily up-to-date yet and the state of the local state machine // replica may still come from before the configuration entry. // (exception: if no server was actually added or removed, then nothing gets committed and the leader responds immediately). + // A caller may pass a pointer to an abort_source to make operation abortable. virtual future<> modify_config(std::vector add, - std::vector del) = 0; + std::vector del, seastar::abort_source* as = nullptr) = 0; // Return the currently known configuration virtual raft::configuration get_configuration() const = 0; @@ -123,7 +127,8 @@ public: // May be called before attempting a read from the local state // machine. The read should proceed only after the returned // future has resolved successfully. - virtual future<> read_barrier() = 0; + // A caller may pass a pointer to an abort_source to make operation abortable. + virtual future<> read_barrier(seastar::abort_source* as = nullptr) = 0; // Initiate leader stepdown process. // If the node is not a leader returns not_a_leader exception. diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index 3144af6cde..fdee6429a8 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -178,7 +178,7 @@ void raft_rpc::read_quorum_reply(raft::server_id from, raft::read_quorum_reply c } future raft_rpc::execute_read_barrier(raft::server_id from) { - return _client->execute_read_barrier(from); + return _client->execute_read_barrier(from, nullptr); } future raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) { @@ -190,13 +190,13 @@ future raft_rpc::apply_snapshot(raft::server_id from, raft } future raft_rpc::execute_add_entry(raft::server_id from, raft::command cmd) { - return _client->execute_add_entry(from, std::move(cmd)); + return _client->execute_add_entry(from, std::move(cmd), nullptr); } future raft_rpc::execute_modify_config(raft::server_id from, std::vector add, std::vector del) { - return _client->execute_modify_config(from, std::move(add), std::move(del)); + return _client->execute_modify_config(from, std::move(add), std::move(del), nullptr); } } // end of namespace service diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc index 2a67aa3135..fcfe237ef8 100644 --- a/test/raft/randomized_nemesis_test.cc +++ b/test/raft/randomized_nemesis_test.cc @@ -466,7 +466,7 @@ public: ++_read_barrier_executions; (void)[] (rpc& self, raft::server_id src, execute_barrier_on_leader m, gate::holder holder) -> future<> { try { - auto reply = co_await self._client->execute_read_barrier(src); + auto reply = co_await self._client->execute_read_barrier(src, nullptr); self._send(src, execute_barrier_on_leader_reply{ .reply = std::move(reply), @@ -498,7 +498,7 @@ public: ++_add_entry_executions; (void)[] (rpc& self, raft::server_id src, add_entry_message m, gate::holder holder) -> future<> { try { - auto reply = co_await self._client->execute_add_entry(src, std::move(m.cmd)); + auto reply = co_await self._client->execute_add_entry(src, std::move(m.cmd), nullptr); self._send(src, add_entry_reply_message{ .reply = std::move(reply), @@ -530,7 +530,7 @@ public: ++_modify_config_executions; (void)[] (rpc& self, raft::server_id src, modify_config_message m, gate::holder holder) -> future<> { try { - auto reply = co_await self._client->execute_modify_config(src, std::move(m.add), std::move(m.del)); + auto reply = co_await self._client->execute_modify_config(src, std::move(m.add), std::move(m.del), nullptr); self._send(src, add_entry_reply_message{ .reply = std::move(reply), diff --git a/test/raft/replication.hh b/test/raft/replication.hh index 3227965e6c..430ee8da78 100644 --- a/test/raft/replication.hh +++ b/test/raft/replication.hh @@ -707,7 +707,7 @@ public: if (!(*_connected)(id, _id)) { return make_exception_future(std::runtime_error("cannot send append since nodes are disconnected")); } - return _net[id]->_client->execute_read_barrier(_id); + return _net[id]->_client->execute_read_barrier(_id, nullptr); } void check_known_and_connected(raft::server_id id) { if (!_net.count(id)) { @@ -719,13 +719,13 @@ public: } future send_add_entry(raft::server_id id, const raft::command& cmd) override { check_known_and_connected(id); - return _net[id]->_client->execute_add_entry(_id, cmd); + return _net[id]->_client->execute_add_entry(_id, cmd, nullptr); } future send_modify_config(raft::server_id id, const std::vector& add, const std::vector& del) override { check_known_and_connected(id); - return _net[id]->_client->execute_modify_config(_id, add, del); + return _net[id]->_client->execute_modify_config(_id, add, del, nullptr); } void add_server(raft::server_id id, bytes node_info) override {