raft: introduce leader stepdown procedure

Section 3.10 of the PhD describes two cases for which the extension can
be helpful:

1. Sometimes the leader must step down. For example, it may need to reboot
 for maintenance, or it may be removed from the cluster. When it steps
 down, the cluster will be idle for an election timeout until another
 server times out and wins an election. This brief unavailability can be
 avoided by having the leader transfer its leadership to another server
 before it steps down.

2. In some cases, one or more servers may be more suitable to lead the
 cluster than others. For example, a server with high load would not make
 a good leader, or in a WAN deployment, servers in a primary datacenter
 may be preferred in order to minimize the latency between clients and
 the leader. Other consensus algorithms may be able to accommodate these
 preferences during leader election, but Raft needs a server with a
 sufficiently up-to-date log to become leader, which might not be the
 most preferred one. Instead, a leader in Raft can periodically check
 to see whether one of its available followers would be more suitable,
 and if so, transfer its leadership to that server. (If only human leaders
 were so graceful.)

The patch here implements the extension and employs it automatically
when a leader removes itself from a cluster.
This commit is contained in:
Gleb Natapov
2021-03-11 11:19:23 +02:00
parent 888b52dea1
commit 9d6bf7f351
13 changed files with 155 additions and 17 deletions

View File

@@ -63,6 +63,7 @@ struct vote_request {
raft::internal::tagged_uint64<raft::index_tag> last_log_idx;
raft::internal::tagged_uint64<raft::term_tag> last_log_term;
bool is_prevote;
bool force;
};
struct vote_reply {
@@ -111,4 +112,8 @@ struct append_request {
std::vector<lw_shared_ptr<const raft::log_entry>> entries;
};
struct timeout_now {
raft::internal::tagged_uint64<raft::term_tag> current_term;
};
} // namespace raft

View File

@@ -576,6 +576,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_APPEND_ENTRIES_REPLY:
case messaging_verb::RAFT_VOTE_REQUEST:
case messaging_verb::RAFT_VOTE_REPLY:
case messaging_verb::RAFT_TIMEOUT_NOW:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
@@ -1547,6 +1548,15 @@ future<> messaging_service::send_raft_vote_reply(msg_addr id, clock_type::time_p
return send_message_oneway_timeout(this, timeout, messaging_verb::RAFT_VOTE_REPLY, std::move(id), group_id, std::move(from_id), std::move(dst_id), vote_reply);
}
void messaging_service::register_raft_timeout_now(std::function<future<> (const rpc::client_info&, rpc::opt_time_point, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, raft::timeout_now)>&& func) {
register_handler(this, netw::messaging_verb::RAFT_TIMEOUT_NOW, std::move(func));
}
future<> messaging_service::unregister_raft_timeout_now() {
return unregister_handler(netw::messaging_verb::RAFT_TIMEOUT_NOW);
}
future<> messaging_service::send_raft_timeout_now(msg_addr id, clock_type::time_point timeout, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, const raft::timeout_now& timeout_now) {
return send_message_oneway_timeout(this, timeout, messaging_verb::RAFT_TIMEOUT_NOW, std::move(id), group_id, std::move(from_id), std::move(dst_id), timeout_now);
}
void init_messaging_service(sharded<messaging_service>& ms,
messaging_service::config mscfg, netw::messaging_service::scheduling_config scfg,
sstring ms_trust_store, sstring ms_cert, sstring ms_key, sstring ms_tls_prio, bool ms_client_auth) {

View File

@@ -150,7 +150,8 @@ enum class messaging_verb : int32_t {
RAFT_APPEND_ENTRIES_REPLY = 48,
RAFT_VOTE_REQUEST = 49,
RAFT_VOTE_REPLY = 50,
LAST = 51,
RAFT_TIMEOUT_NOW = 51,
LAST = 52,
};
} // namespace netw
@@ -574,6 +575,10 @@ public:
future<> unregister_raft_vote_reply();
future<> send_raft_vote_reply(msg_addr id, clock_type::time_point timeout, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, const raft::vote_reply& vote_reply);
void register_raft_timeout_now(std::function<future<> (const rpc::client_info&, rpc::opt_time_point, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, raft::timeout_now)>&& func);
future<> unregister_raft_timeout_now();
future<> send_raft_timeout_now(msg_addr id, clock_type::time_point timeout, uint64_t group_id, raft::server_id from_id, raft::server_id dst_id, const raft::timeout_now& timeout_now);
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
private:
bool remove_rpc_client_one(clients_map& clients, msg_addr id, bool dead_only);

View File

@@ -51,6 +51,12 @@ template<typename T>
const log_entry& fsm::add_entry(T command) {
// It's only possible to add entries on a leader.
check_is_leader();
if(leader_state().stepdown) {
// A leader that is stepping down should not add new entries
// to its log (see 3.10), but it still does not know who the new
// leader will be.
throw not_a_leader({});
}
if constexpr (std::is_same_v<T, configuration>) {
if (_log.last_conf_idx() > _commit_idx ||
@@ -152,7 +158,7 @@ void fsm::become_follower(server_id leader) {
}
}
void fsm::become_candidate(bool is_prevote) {
void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
// When starting a campain we need to reset current leader otherwise
// disruptive server prevention will stall an election if quorum of nodes
// start election together since each one will ignore vote requests from others
@@ -198,10 +204,11 @@ void fsm::become_candidate(bool is_prevote) {
// Already signaled _sm_events in update_current_term()
continue;
}
logger.trace("{} [term: {}, index: {}, last log term: {}{}] sent vote request to {}",
_my_id, term, _log.last_idx(), _log.last_term(), is_prevote ? ", prevote" : "", server.id);
logger.trace("{} [term: {}, index: {}, last log term: {}{}{}] sent vote request to {}",
_my_id, term, _log.last_idx(), _log.last_term(), is_prevote ? ", prevote" : "",
is_leadership_transfer ? ", force" : "", server.id);
send_to(server.id, vote_request{term, _log.last_idx(), _log.last_term(), is_prevote});
send_to(server.id, vote_request{term, _log.last_idx(), _log.last_term(), is_prevote, is_leadership_transfer});
}
if (votes.tally_votes() == vote_result::WON) {
// A single node cluster.
@@ -295,6 +302,7 @@ fsm_output fsm::get_output() {
void fsm::advance_stable_idx(index_t idx) {
_log.stable_to(idx);
if (is_leader()) {
replicate();
if (leader_state().tracker.leader_progress()) {
// If this server is leader and is part of the current
// configuration, update it's progress and optionally
@@ -302,7 +310,6 @@ void fsm::advance_stable_idx(index_t idx) {
leader_state().tracker.leader_progress()->accepted(idx);
maybe_commit();
}
replicate();
}
}
@@ -367,10 +374,7 @@ void fsm::maybe_commit() {
//
// A leader that is removed from the configuration
// steps down once the C_new entry is committed.
//
// @todo: when leadership transfer extension is
// implemented, send TimeoutNow to a member of C_new
become_follower(server_id{});
transfer_leadership();
}
}
}
@@ -510,11 +514,18 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) {
progress.become_pipeline();
// If a leader is stepping down, transfer the leadership
// to a first voting node that has fully replicated log.
if (leader_state().stepdown && !leader_state().timeout_now_sent &&
progress.can_vote && progress.match_idx == _log.last_idx()) {
send_timeout_now(progress.id);
}
// check if any new entry can be committed
maybe_commit();
// We may have resigned leadership if committed a new
// configuration.
// We may have resigned leadership if a stepdown process completed
// while the leader is no longer part of the configuration.
if (!is_leader()) {
return;
}
@@ -800,6 +811,28 @@ bool fsm::apply_snapshot(snapshot snp, size_t trailing) {
return true;
}
void fsm::transfer_leadership() {
check_is_leader();
leader_state().stepdown = true;
// Stop new requests from commig in
leader_state().log_limiter_semaphore.consume(_config.max_log_size);
// If there is a fully up-to-date voting replica make it start an election
for (auto&& [_, p] : leader_state().tracker) {
if (p.id != _my_id && p.can_vote && p.match_idx == _log.last_idx()) {
send_timeout_now(p.id);
break;
}
}
}
void fsm::send_timeout_now(server_id id) {
send_to(id, timeout_now{_current_term});
leader_state().timeout_now_sent = true;
if (leader_state().tracker.leader_progress() == nullptr) {
become_follower({});
}
}
void fsm::stop() {
_sm_events.broken();
}

View File

@@ -85,6 +85,10 @@ struct leader {
const server_id& current_leader;
// Used to limit log size
seastar::semaphore log_limiter_semaphore;
// True if the leader is in the process of transferring the leadership
bool stepdown = false;
// True it timeout_now was already sent to one of the followers
bool timeout_now_sent = false;
leader(server_id id, size_t max_log_size, const server_id& leader_) : tracker(id), current_leader(leader_), log_limiter_semaphore(max_log_size) {}
~leader() {
log_limiter_semaphore.broken(not_a_leader(current_leader));
@@ -225,7 +229,7 @@ private:
void become_leader();
void become_candidate(bool is_prevote);
void become_candidate(bool is_prevote, bool is_leadership_transfer = false);
void become_follower(server_id leader);
@@ -264,6 +268,7 @@ private:
return std::get<candidate>(_state);
}
void send_timeout_now(server_id);
protected: // For testing
leader& leader_state() {
return std::get<leader>(_state);
@@ -329,6 +334,15 @@ public:
template <typename Message>
void step(server_id from, Message&& msg);
// This function can be called on a leader only.
// When called it makes the leader to stop accepting
// new requests and waits for one of the voting followers
// to be fully up-to-date. When such follower appears it
// sends timeout_now rpc to it and makes it initiate new election.
// Can be used for leader stepdown if new configuration does not contain
// current leader.
void transfer_leadership();
void stop();
// @sa can_read()
@@ -360,6 +374,7 @@ public:
friend std::ostream& operator<<(std::ostream& os, const fsm& f);
};
template <typename Message>
void fsm::step(server_id from, Message&& msg) {
static_assert(std::is_rvalue_reference<decltype(msg)>::value, "must be rvalue");
@@ -387,12 +402,14 @@ void fsm::step(server_id from, Message&& msg) {
leader = from;
} else {
if constexpr (std::is_same_v<Message, vote_request>) {
if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT) {
if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT && !msg.force) {
// 4.2.3 Disruptive servers
// If a server receives a RequestVote request
// within the minimum election timeout of
// hearing from a current leader, it does not
// update its term or grant its vote.
// Unless `force` flag is set which indicates that the current leader
// wants to stepdown.
logger.trace("{} [term: {}] not granting a vote within a minimum election timeout, elapsed {} (current leader = {})",
_my_id, _current_term, election_elapsed(), _current_leader);
return;
@@ -491,6 +508,13 @@ void fsm::step(server_id from, Message&& msg) {
// Switch the follower to log transfer mode.
install_snapshot_reply(from, std::move(msg));
}
} else if constexpr (std::is_same_v<Message, timeout_now>) {
if constexpr (std::is_same_v<State, follower>) {
// Leadership transfers never use pre-vote; we know we are not
// recovering from a partition so there is no need for the
// extra round trip.
become_candidate(false, true);
}
}
};

View File

@@ -261,6 +261,9 @@ struct vote_request {
term_t last_log_term;
// True if this is prevote request
bool is_prevote;
// If the flag is set the request will not be ignored even
// if there is an active leader. Used during leadership transfer.
bool force;
};
struct vote_reply {
@@ -286,7 +289,13 @@ struct snapshot_reply {
bool success;
};
using rpc_message = std::variant<append_request, append_reply, vote_request, vote_reply, install_snapshot, snapshot_reply>;
// 3.10 section from PhD Leadership transfer extension
struct timeout_now {
// Current term on a leader
term_t current_term;
};
using rpc_message = std::variant<append_request, append_reply, vote_request, vote_reply, install_snapshot, snapshot_reply, timeout_now>;
// we need something that can be truncated form both sides.
// std::deque move constructor is not nothrow hence cannot be used
@@ -382,6 +391,11 @@ public:
// received.
virtual future<> send_vote_reply(server_id id, const vote_reply& vote_reply) = 0;
// Send a request to start leader election immediately
// resolves when message is sent. It does not mean it was
// received.
virtual future<> send_timeout_now(server_id, const timeout_now& timeout_now) = 0;
// When a new server is learn this function is called with the
// info about the server.
virtual void add_server(server_id id, server_info info) = 0;
@@ -416,6 +430,8 @@ public:
// Handle response to RequestVote RPC
virtual void request_vote_reply(server_id from, vote_reply vote_reply) = 0;
virtual void timeout_now_request(server_id from, timeout_now timeout_now) = 0;
// Apply incoming snapshot, future resolves when application is complete
virtual future<snapshot_reply> apply_snapshot(server_id from, install_snapshot snp) = 0;

View File

@@ -57,6 +57,7 @@ public:
void append_entries_reply(server_id from, append_reply reply) override;
void request_vote(server_id from, vote_request vote_request) override;
void request_vote_reply(server_id from, vote_reply vote_reply) override;
void timeout_now_request(server_id from, timeout_now timeout_now) override;
// server interface
future<> add_entry(command command, wait_type type);
@@ -110,6 +111,8 @@ private:
uint64_t queue_entries_for_apply = 0;
uint64_t applied_entries = 0;
uint64_t snapshots_taken = 0;
uint64_t timeout_now_sent = 0;
uint64_t timeout_now_received = 0;
} _stats;
struct op_status {
@@ -263,6 +266,11 @@ void server_impl::request_vote_reply(server_id from, vote_reply vote_reply) {
_fsm->step(from, std::move(vote_reply));
}
void server_impl::timeout_now_request(server_id from, timeout_now timeout_now) {
_stats.timeout_now_received++;
_fsm->step(from, std::move(timeout_now));
}
void server_impl::notify_waiters(std::map<index_t, op_status>& waiters,
const std::vector<log_entry_ptr>& entries) {
index_t commit_idx = entries.back()->idx;
@@ -321,6 +329,9 @@ future<> server_impl::send_message(server_id id, Message m) {
} else if constexpr (std::is_same_v<T, vote_reply>) {
_stats.vote_request_reply_sent++;
return _rpc->send_vote_reply(id, m);
} else if constexpr (std::is_same_v<T, timeout_now>) {
_stats.timeout_now_sent++;
return _rpc->send_timeout_now(id, m);
} else if constexpr (std::is_same_v<T, install_snapshot>) {
_stats.install_snapshot_sent++;
// Send in the background.
@@ -558,6 +569,8 @@ void server_impl::register_metrics() {
sm::description("how many messages were received"), {server_id_label(_id), message_type("request_vote")}),
sm::make_total_operations("messages_received", _stats.request_vote_reply_received,
sm::description("how many messages were received"), {server_id_label(_id), message_type("request_vote_reply")}),
sm::make_total_operations("messages_received", _stats.timeout_now_received,
sm::description("how many messages were received"), {server_id_label(_id), message_type("timeout_now")}),
sm::make_total_operations("messages_sent", _stats.append_entries_sent,
sm::description("how many messages were send"), {server_id_label(_id), message_type("append_entries")}),
@@ -571,6 +584,8 @@ void server_impl::register_metrics() {
sm::description("how many messages were sent"), {server_id_label(_id), message_type("install_snapshot")}),
sm::make_total_operations("messages_sent", _stats.snapshot_reply_sent,
sm::description("how many messages were sent"), {server_id_label(_id), message_type("snapshot_reply")}),
sm::make_total_operations("messages_sent", _stats.timeout_now_sent,
sm::description("how many messages were sent"), {server_id_label(_id), message_type("timeout_now")}),
sm::make_total_operations("waiter_awaiken", _stats.waiters_awaiken,
sm::description("how many waiters got result back"), {server_id_label(_id)}),

View File

@@ -121,6 +121,7 @@ void tracker::set_configuration(const configuration& configuration, index_t next
} else {
newp = this->progress::emplace(s.id, follower_progress{s.id, next_idx}).first;
}
newp->second.can_vote = s.can_vote;
if (s.id == _my_id) {
// The leader is part of the current
// configuration.

View File

@@ -38,6 +38,8 @@ public:
index_t match_idx = index_t(0);
// Index that we know to be committed by the follower
index_t commit_idx = index_t(0);
// True if the follower is voting one
bool can_vote = true;
enum class state {
// In this state only one append entry is send until matching index is found

View File

@@ -56,6 +56,11 @@ future<> raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& v
netw::msg_addr(_raft_services.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, vote_reply);
}
future<> raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) {
return _messaging.send_raft_timeout_now(
netw::msg_addr(_raft_services.get_inet_address(id)), db::no_timeout, _group_id, _server_id, id, timeout_now);
}
void raft_rpc::add_server(raft::server_id id, raft::server_info info) {
// Parse gms::inet_address from server_info
auto in = ser::as_input_stream(bytes_view(info));
@@ -89,6 +94,10 @@ void raft_rpc::request_vote_reply(raft::server_id from, raft::vote_reply vote_re
_client->request_vote_reply(from, vote_reply);
}
void raft_rpc::timeout_now_request(raft::server_id from, raft::timeout_now timeout_now) {
_client->timeout_now_request(from, timeout_now);
}
future<raft::snapshot_reply> raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) {
return _client->apply_snapshot(from, std::move(snp));
}

View File

@@ -43,6 +43,7 @@ public:
future<> send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) override;
future<> send_vote_request(raft::server_id id, const raft::vote_request& vote_request) override;
future<> send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) override;
future<> send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) override;
void add_server(raft::server_id id, raft::server_info info) override;
void remove_server(raft::server_id id) override;
future<> abort() override;
@@ -52,5 +53,6 @@ public:
void append_entries_reply(raft::server_id from, raft::append_reply reply);
void request_vote(raft::server_id from, raft::vote_request vote_request);
void request_vote_reply(raft::server_id from, raft::vote_reply vote_reply);
void timeout_now_request(raft::server_id from, raft::timeout_now timeout_now);
future<raft::snapshot_reply> apply_snapshot(raft::server_id from, raft::install_snapshot snp);
};

View File

@@ -94,6 +94,14 @@ void raft_services::init_rpc_verbs() {
return make_ready_future<>();
});
});
_ms.register_raft_timeout_now([handle_raft_rpc] (const rpc::client_info& cinfo, rpc::opt_time_point timeout,
uint64_t group_id, raft::server_id from, raft::server_id dst, raft::timeout_now timeout_now) mutable {
return handle_raft_rpc(cinfo, group_id, from, dst, [from, timeout_now] (raft_rpc& rpc) mutable {
rpc.timeout_now_request(std::move(from), std::move(timeout_now));
return make_ready_future<>();
});
});
}
future<> raft_services::uninit_rpc_verbs() {
@@ -102,7 +110,8 @@ future<> raft_services::uninit_rpc_verbs() {
_ms.unregister_raft_append_entries(),
_ms.unregister_raft_append_entries_reply(),
_ms.unregister_raft_vote_request(),
_ms.unregister_raft_vote_reply()
_ms.unregister_raft_vote_reply(),
_ms.unregister_raft_timeout_now()
).discard_result();
}
@@ -220,4 +229,4 @@ void raft_services::update_address_mapping(raft::server_id id, gms::inet_address
void raft_services::remove_address_mapping(raft::server_id id) {
_server_addresses.erase(id);
}
}

View File

@@ -358,6 +358,13 @@ public:
net[id]->_client->request_vote_reply(_id, std::move(vote_reply));
return make_ready_future<>();
}
virtual future<> send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) {
if (!_connected(id) || !_connected(_id)) {
return make_ready_future<>();
}
net[id]->_client->timeout_now_request(_id, std::move(timeout_now));
return make_ready_future<>();
}
virtual void add_server(raft::server_id id, bytes node_info) {}
virtual void remove_server(raft::server_id id) {}
virtual future<> abort() { return make_ready_future<>(); }