diff --git a/idl/raft.idl.hh b/idl/raft.idl.hh index 7881768e7b..b1a43d737e 100644 --- a/idl/raft.idl.hh +++ b/idl/raft.idl.hh @@ -63,6 +63,7 @@ struct vote_request { raft::internal::tagged_uint64 last_log_idx; raft::internal::tagged_uint64 last_log_term; bool is_prevote; + bool force; }; struct vote_reply { @@ -111,4 +112,8 @@ struct append_request { std::vector> entries; }; +struct timeout_now { + raft::internal::tagged_uint64 current_term; +}; + } // namespace raft diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 33ff8f8a5c..c59c173e7c 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -576,6 +576,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::RAFT_APPEND_ENTRIES_REPLY: case messaging_verb::RAFT_VOTE_REQUEST: case messaging_verb::RAFT_VOTE_REPLY: + case messaging_verb::RAFT_TIMEOUT_NOW: return 2; case messaging_verb::MUTATION_DONE: case messaging_verb::MUTATION_FAILED: @@ -1547,6 +1548,15 @@ future<> messaging_service::send_raft_vote_reply(msg_addr id, clock_type::time_p return send_message_oneway_timeout(this, timeout, messaging_verb::RAFT_VOTE_REPLY, std::move(id), group_id, std::move(from_id), std::move(dst_id), vote_reply); } +void messaging_service::register_raft_timeout_now(std::function (const rpc::client_info&, rpc::opt_time_point, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, raft::timeout_now)>&& func) { + register_handler(this, netw::messaging_verb::RAFT_TIMEOUT_NOW, std::move(func)); +} +future<> messaging_service::unregister_raft_timeout_now() { + return unregister_handler(netw::messaging_verb::RAFT_TIMEOUT_NOW); +} +future<> messaging_service::send_raft_timeout_now(msg_addr id, clock_type::time_point timeout, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, const raft::timeout_now& timeout_now) { + return send_message_oneway_timeout(this, timeout, messaging_verb::RAFT_TIMEOUT_NOW, std::move(id), group_id, std::move(from_id), std::move(dst_id), timeout_now); +} void init_messaging_service(sharded& ms, messaging_service::config mscfg, netw::messaging_service::scheduling_config scfg, sstring ms_trust_store, sstring ms_cert, sstring ms_key, sstring ms_tls_prio, bool ms_client_auth) { diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 0bbaf1fb65..f0ab9c8a9e 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -150,7 +150,8 @@ enum class messaging_verb : int32_t { RAFT_APPEND_ENTRIES_REPLY = 48, RAFT_VOTE_REQUEST = 49, RAFT_VOTE_REPLY = 50, - LAST = 51, + RAFT_TIMEOUT_NOW = 51, + LAST = 52, }; } // namespace netw @@ -574,6 +575,10 @@ public: future<> unregister_raft_vote_reply(); future<> send_raft_vote_reply(msg_addr id, clock_type::time_point timeout, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, const raft::vote_reply& vote_reply); + void register_raft_timeout_now(std::function (const rpc::client_info&, rpc::opt_time_point, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, raft::timeout_now)>&& func); + future<> unregister_raft_timeout_now(); + future<> send_raft_timeout_now(msg_addr id, clock_type::time_point timeout, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, const raft::timeout_now& timeout_now); + void foreach_server_connection_stats(std::function&& f) const; private: bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only); diff --git a/raft/fsm.cc b/raft/fsm.cc index c33554991c..d3262a384d 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -51,6 +51,12 @@ template const log_entry& fsm::add_entry(T command) { // It's only possible to add entries on a leader. check_is_leader(); + if(leader_state().stepdown) { + // A leader that is stepping down should not add new entries + // to its log (see 3.10), but it still does not know who the new + // leader will be. + throw not_a_leader({}); + } if constexpr (std::is_same_v) { if (_log.last_conf_idx() > _commit_idx || @@ -152,7 +158,7 @@ void fsm::become_follower(server_id leader) { } } -void fsm::become_candidate(bool is_prevote) { +void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) { // When starting a campain we need to reset current leader otherwise // disruptive server prevention will stall an election if quorum of nodes // start election together since each one will ignore vote requests from others @@ -198,10 +204,11 @@ void fsm::become_candidate(bool is_prevote) { // Already signaled _sm_events in update_current_term() continue; } - logger.trace("{} [term: {}, index: {}, last log term: {}{}] sent vote request to {}", - _my_id, term, _log.last_idx(), _log.last_term(), is_prevote ? ", prevote" : "", server.id); + logger.trace("{} [term: {}, index: {}, last log term: {}{}{}] sent vote request to {}", + _my_id, term, _log.last_idx(), _log.last_term(), is_prevote ? ", prevote" : "", + is_leadership_transfer ? ", force" : "", server.id); - send_to(server.id, vote_request{term, _log.last_idx(), _log.last_term(), is_prevote}); + send_to(server.id, vote_request{term, _log.last_idx(), _log.last_term(), is_prevote, is_leadership_transfer}); } if (votes.tally_votes() == vote_result::WON) { // A single node cluster. @@ -295,6 +302,7 @@ fsm_output fsm::get_output() { void fsm::advance_stable_idx(index_t idx) { _log.stable_to(idx); if (is_leader()) { + replicate(); if (leader_state().tracker.leader_progress()) { // If this server is leader and is part of the current // configuration, update it's progress and optionally @@ -302,7 +310,6 @@ void fsm::advance_stable_idx(index_t idx) { leader_state().tracker.leader_progress()->accepted(idx); maybe_commit(); } - replicate(); } } @@ -367,10 +374,7 @@ void fsm::maybe_commit() { // // A leader that is removed from the configuration // steps down once the C_new entry is committed. - // - // @todo: when leadership transfer extension is - // implemented, send TimeoutNow to a member of C_new - become_follower(server_id{}); + transfer_leadership(); } } } @@ -510,11 +514,18 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) { progress.become_pipeline(); + // If a leader is stepping down, transfer the leadership + // to a first voting node that has fully replicated log. + if (leader_state().stepdown && !leader_state().timeout_now_sent && + progress.can_vote && progress.match_idx == _log.last_idx()) { + send_timeout_now(progress.id); + } + // check if any new entry can be committed maybe_commit(); - // We may have resigned leadership if committed a new - // configuration. + // We may have resigned leadership if a stepdown process completed + // while the leader is no longer part of the configuration. if (!is_leader()) { return; } @@ -800,6 +811,28 @@ bool fsm::apply_snapshot(snapshot snp, size_t trailing) { return true; } +void fsm::transfer_leadership() { + check_is_leader(); + leader_state().stepdown = true; + // Stop new requests from commig in + leader_state().log_limiter_semaphore.consume(_config.max_log_size); + // If there is a fully up-to-date voting replica make it start an election + for (auto&& [_, p] : leader_state().tracker) { + if (p.id != _my_id && p.can_vote && p.match_idx == _log.last_idx()) { + send_timeout_now(p.id); + break; + } + } +} + +void fsm::send_timeout_now(server_id id) { + send_to(id, timeout_now{_current_term}); + leader_state().timeout_now_sent = true; + if (leader_state().tracker.leader_progress() == nullptr) { + become_follower({}); + } +} + void fsm::stop() { _sm_events.broken(); } diff --git a/raft/fsm.hh b/raft/fsm.hh index a0ee1cda2a..f8c25a06b2 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -85,6 +85,10 @@ struct leader { const server_id& current_leader; // Used to limit log size seastar::semaphore log_limiter_semaphore; + // True if the leader is in the process of transferring the leadership + bool stepdown = false; + // True it timeout_now was already sent to one of the followers + bool timeout_now_sent = false; leader(server_id id, size_t max_log_size, const server_id& leader_) : tracker(id), current_leader(leader_), log_limiter_semaphore(max_log_size) {} ~leader() { log_limiter_semaphore.broken(not_a_leader(current_leader)); @@ -225,7 +229,7 @@ private: void become_leader(); - void become_candidate(bool is_prevote); + void become_candidate(bool is_prevote, bool is_leadership_transfer = false); void become_follower(server_id leader); @@ -264,6 +268,7 @@ private: return std::get(_state); } + void send_timeout_now(server_id); protected: // For testing leader& leader_state() { return std::get(_state); @@ -329,6 +334,15 @@ public: template void step(server_id from, Message&& msg); + // This function can be called on a leader only. + // When called it makes the leader to stop accepting + // new requests and waits for one of the voting followers + // to be fully up-to-date. When such follower appears it + // sends timeout_now rpc to it and makes it initiate new election. + // Can be used for leader stepdown if new configuration does not contain + // current leader. + void transfer_leadership(); + void stop(); // @sa can_read() @@ -360,6 +374,7 @@ public: friend std::ostream& operator<<(std::ostream& os, const fsm& f); }; + template void fsm::step(server_id from, Message&& msg) { static_assert(std::is_rvalue_reference::value, "must be rvalue"); @@ -387,12 +402,14 @@ void fsm::step(server_id from, Message&& msg) { leader = from; } else { if constexpr (std::is_same_v) { - if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT) { + if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT && !msg.force) { // 4.2.3 Disruptive servers // If a server receives a RequestVote request // within the minimum election timeout of // hearing from a current leader, it does not // update its term or grant its vote. + // Unless `force` flag is set which indicates that the current leader + // wants to stepdown. logger.trace("{} [term: {}] not granting a vote within a minimum election timeout, elapsed {} (current leader = {})", _my_id, _current_term, election_elapsed(), _current_leader); return; @@ -491,6 +508,13 @@ void fsm::step(server_id from, Message&& msg) { // Switch the follower to log transfer mode. install_snapshot_reply(from, std::move(msg)); } + } else if constexpr (std::is_same_v) { + if constexpr (std::is_same_v) { + // Leadership transfers never use pre-vote; we know we are not + // recovering from a partition so there is no need for the + // extra round trip. + become_candidate(false, true); + } } }; diff --git a/raft/raft.hh b/raft/raft.hh index 52409fb451..732b77c498 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -261,6 +261,9 @@ struct vote_request { term_t last_log_term; // True if this is prevote request bool is_prevote; + // If the flag is set the request will not be ignored even + // if there is an active leader. Used during leadership transfer. + bool force; }; struct vote_reply { @@ -286,7 +289,13 @@ struct snapshot_reply { bool success; }; -using rpc_message = std::variant; +// 3.10 section from PhD Leadership transfer extension +struct timeout_now { + // Current term on a leader + term_t current_term; +}; + +using rpc_message = std::variant; // we need something that can be truncated form both sides. // std::deque move constructor is not nothrow hence cannot be used @@ -382,6 +391,11 @@ public: // received. virtual future<> send_vote_reply(server_id id, const vote_reply& vote_reply) = 0; + // Send a request to start leader election immediately + // resolves when message is sent. It does not mean it was + // received. + virtual future<> send_timeout_now(server_id, const timeout_now& timeout_now) = 0; + // When a new server is learn this function is called with the // info about the server. virtual void add_server(server_id id, server_info info) = 0; @@ -416,6 +430,8 @@ public: // Handle response to RequestVote RPC virtual void request_vote_reply(server_id from, vote_reply vote_reply) = 0; + virtual void timeout_now_request(server_id from, timeout_now timeout_now) = 0; + // Apply incoming snapshot, future resolves when application is complete virtual future apply_snapshot(server_id from, install_snapshot snp) = 0; diff --git a/raft/server.cc b/raft/server.cc index af1d4d5ed8..e05879fc84 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -57,6 +57,7 @@ public: void append_entries_reply(server_id from, append_reply reply) override; void request_vote(server_id from, vote_request vote_request) override; void request_vote_reply(server_id from, vote_reply vote_reply) override; + void timeout_now_request(server_id from, timeout_now timeout_now) override; // server interface future<> add_entry(command command, wait_type type); @@ -110,6 +111,8 @@ private: uint64_t queue_entries_for_apply = 0; uint64_t applied_entries = 0; uint64_t snapshots_taken = 0; + uint64_t timeout_now_sent = 0; + uint64_t timeout_now_received = 0; } _stats; struct op_status { @@ -263,6 +266,11 @@ void server_impl::request_vote_reply(server_id from, vote_reply vote_reply) { _fsm->step(from, std::move(vote_reply)); } +void server_impl::timeout_now_request(server_id from, timeout_now timeout_now) { + _stats.timeout_now_received++; + _fsm->step(from, std::move(timeout_now)); +} + void server_impl::notify_waiters(std::map& waiters, const std::vector& entries) { index_t commit_idx = entries.back()->idx; @@ -321,6 +329,9 @@ future<> server_impl::send_message(server_id id, Message m) { } else if constexpr (std::is_same_v) { _stats.vote_request_reply_sent++; return _rpc->send_vote_reply(id, m); + } else if constexpr (std::is_same_v) { + _stats.timeout_now_sent++; + return _rpc->send_timeout_now(id, m); } else if constexpr (std::is_same_v) { _stats.install_snapshot_sent++; // Send in the background. @@ -558,6 +569,8 @@ void server_impl::register_metrics() { sm::description("how many messages were received"), {server_id_label(_id), message_type("request_vote")}), sm::make_total_operations("messages_received", _stats.request_vote_reply_received, sm::description("how many messages were received"), {server_id_label(_id), message_type("request_vote_reply")}), + sm::make_total_operations("messages_received", _stats.timeout_now_received, + sm::description("how many messages were received"), {server_id_label(_id), message_type("timeout_now")}), sm::make_total_operations("messages_sent", _stats.append_entries_sent, sm::description("how many messages were send"), {server_id_label(_id), message_type("append_entries")}), @@ -571,6 +584,8 @@ void server_impl::register_metrics() { sm::description("how many messages were sent"), {server_id_label(_id), message_type("install_snapshot")}), sm::make_total_operations("messages_sent", _stats.snapshot_reply_sent, sm::description("how many messages were sent"), {server_id_label(_id), message_type("snapshot_reply")}), + sm::make_total_operations("messages_sent", _stats.timeout_now_sent, + sm::description("how many messages were sent"), {server_id_label(_id), message_type("timeout_now")}), sm::make_total_operations("waiter_awaiken", _stats.waiters_awaiken, sm::description("how many waiters got result back"), {server_id_label(_id)}), diff --git a/raft/tracker.cc b/raft/tracker.cc index 2df3d0e8ad..1259ab8ffc 100644 --- a/raft/tracker.cc +++ b/raft/tracker.cc @@ -121,6 +121,7 @@ void tracker::set_configuration(const configuration& configuration, index_t next } else { newp = this->progress::emplace(s.id, follower_progress{s.id, next_idx}).first; } + newp->second.can_vote = s.can_vote; if (s.id == _my_id) { // The leader is part of the current // configuration. diff --git a/raft/tracker.hh b/raft/tracker.hh index 9793853aff..88035d8fb8 100644 --- a/raft/tracker.hh +++ b/raft/tracker.hh @@ -38,6 +38,8 @@ public: index_t match_idx = index_t(0); // Index that we know to be committed by the follower index_t commit_idx = index_t(0); + // True if the follower is voting one + bool can_vote = true; enum class state { // In this state only one append entry is send until matching index is found diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index cd36b8fa97..6c17d70033 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -56,6 +56,11 @@ future<> raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& v netw::msg_addr(_raft_services.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, vote_reply); } +future<> raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) { + return _messaging.send_raft_timeout_now( + netw::msg_addr(_raft_services.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, timeout_now); +} + void raft_rpc::add_server(raft::server_id id, raft::server_info info) { // Parse gms::inet_address from server_info auto in = ser::as_input_stream(bytes_view(info)); @@ -89,6 +94,10 @@ void raft_rpc::request_vote_reply(raft::server_id from, raft::vote_reply vote_re _client->request_vote_reply(from, vote_reply); } +void raft_rpc::timeout_now_request(raft::server_id from, raft::timeout_now timeout_now) { + _client->timeout_now_request(from, timeout_now); +} + future raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) { return _client->apply_snapshot(from, std::move(snp)); } diff --git a/service/raft/raft_rpc.hh b/service/raft/raft_rpc.hh index f4866476d0..7cbfc99506 100644 --- a/service/raft/raft_rpc.hh +++ b/service/raft/raft_rpc.hh @@ -43,6 +43,7 @@ public: future<> send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) override; future<> send_vote_request(raft::server_id id, const raft::vote_request& vote_request) override; future<> send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) override; + future<> send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) override; void add_server(raft::server_id id, raft::server_info info) override; void remove_server(raft::server_id id) override; future<> abort() override; @@ -52,5 +53,6 @@ public: void append_entries_reply(raft::server_id from, raft::append_reply reply); void request_vote(raft::server_id from, raft::vote_request vote_request); void request_vote_reply(raft::server_id from, raft::vote_reply vote_reply); + void timeout_now_request(raft::server_id from, raft::timeout_now timeout_now); future apply_snapshot(raft::server_id from, raft::install_snapshot snp); }; diff --git a/service/raft/raft_services.cc b/service/raft/raft_services.cc index 8312061bcb..5427a2028d 100644 --- a/service/raft/raft_services.cc +++ b/service/raft/raft_services.cc @@ -94,6 +94,14 @@ void raft_services::init_rpc_verbs() { return make_ready_future<>(); }); }); + + _ms.register_raft_timeout_now([handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout, + uint64_t group_id, raft::server_id from, raft::server_id dst, raft::timeout_now timeout_now) mutable { + return handle_raft_rpc(cinfo, group_id, from, dst, [from, timeout_now] (raft_rpc& rpc) mutable { + rpc.timeout_now_request(std::move(from), std::move(timeout_now)); + return make_ready_future<>(); + }); + }); } future<> raft_services::uninit_rpc_verbs() { @@ -102,7 +110,8 @@ future<> raft_services::uninit_rpc_verbs() { _ms.unregister_raft_append_entries(), _ms.unregister_raft_append_entries_reply(), _ms.unregister_raft_vote_request(), - _ms.unregister_raft_vote_reply() + _ms.unregister_raft_vote_reply(), + _ms.unregister_raft_timeout_now() ).discard_result(); } @@ -220,4 +229,4 @@ void raft_services::update_address_mapping(raft::server_id id, gms::inet_address void raft_services::remove_address_mapping(raft::server_id id) { _server_addresses.erase(id); -} \ No newline at end of file +} diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index 425b1d4b6a..b454ec3343 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -358,6 +358,13 @@ public: net[id]->_client->request_vote_reply(_id, std::move(vote_reply)); return make_ready_future<>(); } + virtual future<> send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) { + if (!_connected(id) || !_connected(_id)) { + return make_ready_future<>(); + } + net[id]->_client->timeout_now_request(_id, std::move(timeout_now)); + return make_ready_future<>(); + } virtual void add_server(raft::server_id id, bytes node_info) {} virtual void remove_server(raft::server_id id) {} virtual future<> abort() { return make_ready_future<>(); }