raft: implement prevoting stage in leader election

This is how PhD explain the need for prevoting stage:

  One downside of Raft's leader election algorithm is that a server that
  has been partitioned from the cluster is likely to cause a disruption
  when it regains connectivity. When a server is partitioned, it will
  not receive heartbeats. It will soon increment its term to start
  an election, although it won't be able to collect enough votes to
  become leader. When the server regains connectivity sometime later, its
  larger term number will propagate to the rest of the cluster (either
  through the server's RequestVote requests or through its AppendEntries
  response). This will force the cluster leader to step down, and a new
  election will have to take place to select a new leader.

  Prevoting stage is addressing that. In the Prevote algorithm, a
  candidate only increments its term if it first learns from a majority of
  the cluster that they would be willing to grant the candidate their votes
  (if the candidate's log is sufficiently up-to-date, and the voters have
  not received heartbeats from a valid leader for at least a baseline
  election timeout).

  The Prevote algorithm solves the issue of a partitioned server disrupting
  the cluster when it rejoins. While a server is partitioned, it won't
  be able to increment its term, since it can't receive permission
  from a majority of the cluster. Then, when it rejoins the cluster, it
  still won't be able to increment its term, since the other servers
  will have been receiving regular heartbeats from the leader. Once the
  server receives a heartbeat from the leader itself, it will return to
  the follower state(in the same term).

In our implementation we have "stable leader" extension that prevents
spurious RequestVote to dispose an active leader, but AppendEntries with
higher term will still do that, so prevoting extension is also required.
This commit is contained in:
Gleb Natapov
2021-03-03 15:45:33 +02:00
parent a849246cfc
commit 1f868d516e
8 changed files with 105 additions and 36 deletions

View File

@@ -62,11 +62,13 @@ struct vote_request {
raft::internal::tagged_uint64<raft::term_tag> current_term;
raft::internal::tagged_uint64<raft::index_tag> last_log_idx;
raft::internal::tagged_uint64<raft::term_tag> last_log_term;
bool is_prevote;
};
struct vote_reply {
raft::internal::tagged_uint64<raft::term_tag> current_term;
bool vote_granted;
bool is_prevote;
};
struct install_snapshot {

View File

@@ -31,6 +31,7 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
_observed.advance(*this);
logger.trace("{}: starting log length {}", _my_id, _log.last_idx());
reset_election_timeout();
assert(!bool(_current_leader));
}
@@ -118,13 +119,11 @@ void fsm::update_current_term(term_t current_term)
assert(_current_term < current_term);
_current_term = current_term;
_voted_for = server_id{};
}
void fsm::reset_election_timeout() {
static thread_local std::default_random_engine re{std::random_device{}()};
static thread_local std::uniform_int_distribution<> dist(1, ELECTION_TIMEOUT.count());
// Reset the randomized election timeout on each term
// change, even if we do not plan to campaign during this
// term: the main purpose of the timeout is to avoid
// starting our campaign simultaneously with other followers.
_randomized_election_timeout = ELECTION_TIMEOUT + logical_clock::duration{dist(re)};
}
@@ -160,12 +159,15 @@ void fsm::become_follower(server_id leader) {
}
}
void fsm::become_candidate() {
void fsm::become_candidate(bool is_prevote) {
// When starting a campain we need to reset current leader otherwise
// disruptive server prevention will stall an election if quorum of nodes
// start election together since each one will ignore vote requests from others
_current_leader = {};
_state = candidate{};
reset_election_timeout();
_tracker = std::nullopt;
_log_limiter_semaphore = std::nullopt;
// 3.4 Leader election
@@ -178,7 +180,7 @@ void fsm::become_candidate() {
// and initiating another round of RequestVote RPCs.
_last_election_time = _clock.now();
_votes.emplace(_log.get_configuration());
_votes.emplace(_log.get_configuration(), is_prevote);
const auto& voters = _votes->voters();
if (!voters.contains(server_address{_my_id})) {
@@ -188,24 +190,35 @@ void fsm::become_candidate() {
become_follower(server_id{});
return;
}
update_current_term(term_t{_current_term + 1});
term_t term{_current_term + 1};
if (!is_prevote) {
update_current_term(term);
}
// Replicate RequestVote
for (const auto& server : voters) {
if (server.id == _my_id) {
// Vote for self.
_votes->register_vote(server.id, true);
_voted_for = _my_id;
if (!is_prevote) {
// Only record real votes
_voted_for = _my_id;
}
// Already signaled _sm_events in update_current_term()
continue;
}
logger.trace("{} [term: {}, index: {}, last log term: {}] sent vote request to {}",
_my_id, _current_term, _log.last_idx(), _log.last_term(), server.id);
logger.trace("{} [term: {}, index: {}, last log term: {}{}] sent vote request to {}",
_my_id, term, _log.last_idx(), _log.last_term(), is_prevote ? ", prevote" : "", server.id);
send_to(server.id, vote_request{_current_term, _log.last_idx(), _log.last_term()});
send_to(server.id, vote_request{term, _log.last_idx(), _log.last_term(), is_prevote});
}
if (_votes->tally_votes() == vote_result::WON) {
// A single node cluster.
become_leader();
if (is_prevote) {
become_candidate(false);
} else {
become_leader();
}
}
}
@@ -422,7 +435,7 @@ void fsm::tick() {
} else if (is_past_election_timeout()) {
logger.trace("tick[{}]: becoming a candidate, last election: {}, now: {}", _my_id,
_last_election_time, _clock.now());
become_candidate();
become_candidate(_config.enable_prevoting);
}
}
@@ -551,13 +564,17 @@ void fsm::request_vote(server_id from, vote_request&& request) {
// We can cast a vote in any state. If the candidate's term is
// lower than ours, we ignore the request. Otherwise we first
// update our current term and convert to a follower.
assert(_current_term == request.current_term);
assert(request.is_prevote || _current_term == request.current_term);
bool can_vote =
// We can vote if this is a repeat of a vote we've already cast...
_voted_for == from ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(_voted_for == server_id{} && _current_leader == server_id{});
(_voted_for == server_id{} && _current_leader == server_id{}) ||
// ...this is prevote for a future term...
// (we will get here if the node does not know any leader yet and already
// voted for some other node, but now it get even newer prevote request)
(request.is_prevote && request.current_term > _current_term);
// ...and we believe the candidate is up to date.
if (can_vote && _log.is_up_to_date(request.last_log_idx, request.last_log_term)) {
@@ -566,23 +583,34 @@ void fsm::request_vote(server_id from, vote_request&& request) {
"voted for {} [log_term: {}, log_index: {}]",
_my_id, _current_term, _log.last_idx(), _log.last_term(), _voted_for,
from, request.last_log_term, request.last_log_idx);
// If a server grants a vote, it must reset its election
// timer. See Raft Summary.
_last_election_time = _clock.now();
_voted_for = from;
send_to(from, vote_reply{_current_term, true});
if (!request.is_prevote) { // Only record real votes
// If a server grants a vote, it must reset its election
// timer. See Raft Summary.
_last_election_time = _clock.now();
_voted_for = from;
}
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
//
// When responding to {Pre,}Vote messages we include the term
// from the message, not the local term. To see why, consider the
// case where a single node was previously partitioned away and
// its local term is now out of date. If we include the local term
// (recall that for pre-votes we don't update the local term), the
// (pre-)campaigning node on the other end will proceed to ignore
// the message (it ignores all out of date messages).
send_to(from, vote_reply{request.current_term, true, request.is_prevote});
} else {
// If a vote is not granted, this server is a potential
// viable candidate, so it should not reset its election
// timer, to avoid election disruption by non-viable
// candidates.
logger.trace("{} [term: {}, index: {}, log_term: {}, voted_for: {}] "
"rejected vote for {} [log_term: {}, log_index: {}]",
"rejected vote for {} [current_term: {}, log_term: {}, log_index: {}, is_prevote: {}]",
_my_id, _current_term, _log.last_idx(), _log.last_term(), _voted_for,
from, request.last_log_term, request.last_log_idx);
from, request.current_term, request.last_log_term, request.last_log_idx, request.is_prevote);
send_to(from, vote_reply{_current_term, false});
send_to(from, vote_reply{_current_term, false, request.is_prevote});
}
}
@@ -597,7 +625,11 @@ void fsm::request_vote_reply(server_id from, vote_reply&& reply) {
case vote_result::UNKNOWN:
break;
case vote_result::WON:
become_leader();
if (_votes->is_prevote()) {
become_candidate(false);
} else {
become_leader();
}
break;
case vote_result::LOST:
become_follower(server_id{});

View File

@@ -48,6 +48,8 @@ struct fsm_config {
// is configured by the snapshot, otherwise the state
// machine will deadlock.
size_t max_log_size;
// If set to true will enable prevoting stage during election
bool enable_prevoting;
};
// 3.4 Leader election
@@ -223,7 +225,7 @@ private:
void become_leader();
void become_candidate();
void become_candidate(bool is_prevote);
void become_follower(server_id leader);
@@ -250,6 +252,8 @@ private:
// Tick implementation on a leader
void tick_leader();
void reset_election_timeout();
public:
explicit fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config conf);
@@ -353,11 +357,13 @@ void fsm::step(server_id from, Message&& msg) {
// follower state. If a server receives a request with
// a stale term number, it rejects the request.
if (msg.current_term > _current_term) {
server_id leader{};
logger.trace("{} [term: {}] received a message with higher term from {} [term: {}]",
_my_id, _current_term, from, msg.current_term);
if constexpr (std::is_same_v<Message, append_request>) {
become_follower(from);
leader = from;
} else {
if constexpr (std::is_same_v<Message, vote_request>) {
if (_current_leader != server_id{} && election_elapsed() < ELECTION_TIMEOUT) {
@@ -366,15 +372,29 @@ void fsm::step(server_id from, Message&& msg) {
// within the minimum election timeout of
// hearing from a current leader, it does not
// update its term or grant its vote.
logger.trace("{} [term: {}] not granting a vote within a minimum election timeout, elapsed {}",
_my_id, _current_term, election_elapsed());
logger.trace("{} [term: {}] not granting a vote within a minimum election timeout, elapsed {} (current leader = {})",
_my_id, _current_term, election_elapsed(), _current_leader);
return;
}
}
become_follower(server_id{});
}
update_current_term(msg.current_term);
bool ignore_term = false;
if constexpr (std::is_same_v<Message, vote_request>) {
// Do not update term on prevote request
ignore_term = msg.is_prevote;
} else if constexpr (std::is_same_v<Message, vote_reply>) {
// We send pre-vote requests with a term in our future. If the
// pre-vote is granted, we will increment our term when we get a
// quorum. If it is not, the term comes from the node that
// rejected our vote so we should become a follower at the new
// term.
ignore_term = msg.is_prevote && msg.vote_granted;
}
if (!ignore_term) {
become_follower(leader);
update_current_term(msg.current_term);
}
} else if (msg.current_term < _current_term) {
if constexpr (std::is_same_v<Message, append_request>) {
// Instructs the leader to step down.
@@ -382,6 +402,10 @@ void fsm::step(server_id from, Message&& msg) {
send_to(from, std::move(reply));
} else if constexpr (std::is_same_v<Message, install_snapshot>) {
send_to(from, snapshot_reply{ .success = false });
} else if constexpr (std::is_same_v<Message, vote_request>) {
if (msg.is_prevote) {
send_to(from, vote_reply{_current_term, false, true});
}
} else {
// Ignore other cases
logger.trace("{} [term: {}] ignored a message with lower term from {} [term: {}]",

View File

@@ -259,6 +259,8 @@ struct vote_request {
index_t last_log_idx;
// The term of the candidate's last log entry.
term_t last_log_term;
// True if this is prevote request
bool is_prevote;
};
struct vote_reply {
@@ -266,6 +268,8 @@ struct vote_reply {
term_t current_term;
// True means the candidate received a vote.
bool vote_granted;
// True if it is a reply to prevote request
bool is_prevote;
};
struct install_snapshot {

View File

@@ -198,7 +198,8 @@ future<> server_impl::start() {
_fsm = std::make_unique<fsm>(_id, term, vote, std::move(log), *_failure_detector,
fsm_config {
.append_request_threshold = _config.append_request_threshold,
.max_log_size = _config.max_log_size
.max_log_size = _config.max_log_size,
.enable_prevoting = _config.enable_prevoting
});
assert(_fsm->get_current_term() != term_t(0));
@@ -604,7 +605,7 @@ void server_impl::register_metrics() {
}
future<> server_impl::elect_me_leader() {
while (!_fsm->is_candidate() && !_fsm->is_leader()) {
while (_fsm->is_follower()) {
_fsm->tick();
}
do {

View File

@@ -46,6 +46,8 @@ public:
// is configured by the snapshot, otherwise the state
// machine will deadlock on attempt to submit a new entry.
size_t max_log_size = 5000;
// If set to true will enable prevoting stage during election
bool enable_prevoting = true;
};
virtual ~server() {}

View File

@@ -202,9 +202,9 @@ index_t tracker::committed(index_t prev_commit_idx) {
}
}
votes::votes(configuration configuration)
votes::votes(configuration configuration, bool is_prevote)
:_voters(configuration.current)
, _current(configuration.current) {
, _current(configuration.current), _is_prevote(is_prevote) {
if (configuration.is_joint()) {
_previous.emplace(configuration.previous);

View File

@@ -176,8 +176,9 @@ class votes {
server_address_set _voters;
election_tracker _current;
std::optional<election_tracker> _previous;
bool _is_prevote;
public:
votes(configuration configuration);
votes(configuration configuration, bool is_prevote = false);
const server_address_set& voters() const {
return _voters;
@@ -185,6 +186,9 @@ public:
void register_vote(server_id from, bool granted);
vote_result tally_votes() const;
bool is_prevote() const {
return _is_prevote;
};
friend std::ostream& operator<<(std::ostream& os, const votes& v);
};