raft: fsm: fix assertion failure on stray rejects
When probes are sent over a slow network, the leader would send multiple probes to a lagging follower before it would get a reject response to the first probe back. After getting a reject, the leader will be able to correctly position `next_idx` for that follower and switch to pipeline mode. Then, an out of order reject to a now irrelevant probe could crash the leader, since it would effectively request it to "rewind" its `match_idx` for that follower, and the code asserts this never happens. We fix the problem by strengthening `is_stray_reject`. The check that was previously only made in `PIPELINE` case (`rejected.non_matching_idx <= match_idx`) is now always performed and we add a new check: `rejected.last_idx < match_idx`. We also strengthen the assert. The commit improves the documentation by explaining that `is_stray_reject` may return false negatives. We also precisely state the preconditions and postconditions of `is_stray_reject`, give a more precise definition of `progress.match_idx`, argue how the postconditions of `is_stray_reject` follow from its preconditions and Raft invariants, and argue why the (strengthened) assert must always pass. Message-Id: <20210423173117.32939-1-kbraun@scylladb.com>
This commit is contained in:
committed by
Tomasz Grabiec
parent
fba1910770
commit
4c95277619
10
raft/fsm.cc
10
raft/fsm.cc
@@ -594,6 +594,11 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) {
|
||||
return;
|
||||
}
|
||||
|
||||
// is_stray_reject may return a false negative so even if the check above passes,
|
||||
// we may still be dealing with a stray reject. That's fine though; it is always safe
|
||||
// to rollback next_idx on a reject and in fact that's what the Raft spec (TLA+) does.
|
||||
// Detecting stray rejects is an optimization that should rarely even be needed.
|
||||
|
||||
// Start re-sending from the non matching index, or from
|
||||
// the last index in the follower's log.
|
||||
// FIXME: make it more efficient
|
||||
@@ -601,8 +606,9 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) {
|
||||
|
||||
progress.become_probe();
|
||||
|
||||
// We should not fail to apply an entry following the matched one.
|
||||
assert(progress.next_idx != progress.match_idx);
|
||||
// By `is_stray_reject(rejected) == false` we know that `rejected.non_matching_idx > progress.match_idx`
|
||||
// and `rejected.last_idx + 1 > progress.match_idx`. By the assignment to `progress.next_idx` above, we get:
|
||||
assert(progress.next_idx > progress.match_idx);
|
||||
}
|
||||
|
||||
// We may have just applied a configuration that removes this
|
||||
|
||||
@@ -257,6 +257,8 @@ private:
|
||||
void replicate_to(follower_progress& progress, bool allow_empty);
|
||||
void replicate();
|
||||
void append_entries(server_id from, append_request&& append_request);
|
||||
|
||||
// Precondition: `is_leader() && reply.current_term == _current_term`
|
||||
void append_entries_reply(server_id from, append_reply&& reply);
|
||||
|
||||
void request_vote(server_id from, vote_request&& vote_request);
|
||||
|
||||
@@ -24,16 +24,32 @@
|
||||
namespace raft {
|
||||
|
||||
bool follower_progress::is_stray_reject(const append_reply::rejected& rejected) {
|
||||
// By precondition, we are the leader and `rejected.current_term` is equal to our term.
|
||||
// By definition of `match_idx` we know that at some point all entries up to and including
|
||||
// `match_idx` were the same in our log and the follower's log; ...
|
||||
if (rejected.non_matching_idx <= match_idx) {
|
||||
// ... in particular, entry `rejected.non_matching_idx` (which is <= `match_idx`) at some point
|
||||
// was the same in our log and the follower's log, but `rejected` claims they are different.
|
||||
// A follower cannot change an entry unless it enters a different term, but `rejected.current_term`
|
||||
// is equal to our term. Thus `rejected` must be stray.
|
||||
return true;
|
||||
}
|
||||
if (rejected.last_idx < match_idx) {
|
||||
// ... in particular, at some point the follower had to have an entry with index `rejected.last_idx + 1`
|
||||
// (because `rejected.last_idx < match_idx implies rejected.last_idx + 1 <= match_idx)
|
||||
// but `rejected` claims it doesn't have such entry now.
|
||||
// A follower cannot truncate a suffix of its log unless it enters a different term,
|
||||
// but `rejected.current_term` is equal to our term. Thus `rejected` must be stray.
|
||||
return true;
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case follower_progress::state::PIPELINE:
|
||||
if (rejected.non_matching_idx <= match_idx) {
|
||||
// If rejected index is smaller that matched it means this is a stray reply
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
case follower_progress::state::PROBE:
|
||||
// In the probe state the reply is only valid if it matches next_idx - 1, since only
|
||||
// one append request is outstanding.
|
||||
// In PROBE state we send a single append request `req` with `req.prev_log_idx == next_idx - 1`.
|
||||
// When the follower generates a rejected response `r`, it sets `r.non_matching_idx = req.prev_log_idx`.
|
||||
// Thus the reject either satisfies `rejected.non_matching_idx == next_idx - 1` or is stray.
|
||||
if (rejected.non_matching_idx != index_t(next_idx - 1)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -32,9 +32,12 @@ public:
|
||||
// Id of this server
|
||||
const server_id id;
|
||||
// Index of the next log entry to send to this server.
|
||||
// Invariant: next_idx > match_idx.
|
||||
index_t next_idx;
|
||||
// Index of the highest log entry known to be replicated to this
|
||||
// server.
|
||||
// Index of the highest log entry known to be replicated to this server.
|
||||
// More specifically, this is the greatest `last_new_idx` received from this follower
|
||||
// in an `accepted` message. As long as the follower remains in our term we know
|
||||
// that its log must match with ours up to (and including) `match_idx`.
|
||||
index_t match_idx = index_t(0);
|
||||
// Index that we know to be committed by the follower
|
||||
index_t commit_idx = index_t(0);
|
||||
@@ -56,9 +59,16 @@ public:
|
||||
size_t in_flight = 0;
|
||||
static constexpr size_t max_in_flight = 10;
|
||||
|
||||
// check if a reject packet should be ignored because it was delayed
|
||||
// or reordered
|
||||
bool is_stray_reject(const append_reply::rejected&);
|
||||
// Check if a reject packet should be ignored because it was delayed or reordered.
|
||||
// This is not 100% accurate (may return false negatives) and should only be relied on
|
||||
// for liveness optimizations, not for safety.
|
||||
//
|
||||
// Precondition: we are the leader and `r.current_term` is equal to our term (`_current_term`).
|
||||
// Postcondition: if the function returns `false` it is guaranteed that:
|
||||
// 1. `match_idx < r.non_matching_idx`.
|
||||
// 2. `match_idx < r.last_idx + 1`.
|
||||
// 3. If we're in PROBE mode then `next_idx == r.non_matching_idx + 1`.
|
||||
bool is_stray_reject(const append_reply::rejected& r);
|
||||
|
||||
void become_probe();
|
||||
void become_pipeline();
|
||||
|
||||
@@ -1451,3 +1451,65 @@ BOOST_AUTO_TEST_CASE(test_zero) {
|
||||
BOOST_CHECK_THROW(raft::configuration cfg(raft::server_address_set{raft::server_address{id}}), std::invalid_argument);
|
||||
BOOST_CHECK_THROW(create_follower(id, raft::log(raft::snapshot{})), std::invalid_argument);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_reordered_reject) {
|
||||
auto id1 = id();
|
||||
raft::fsm fsm1(id1, term_t{1}, server_id{},
|
||||
raft::log{raft::snapshot{.config = {{raft::server_address{id1.id}}}}},
|
||||
trivial_failure_detector, fsm_cfg);
|
||||
|
||||
while (!fsm1.is_leader()) {
|
||||
fsm1.tick();
|
||||
}
|
||||
|
||||
fsm1.add_entry(log_entry::dummy{});
|
||||
(void)fsm1.get_output();
|
||||
|
||||
auto id2 = id();
|
||||
raft::fsm fsm2(id2, term_t{1}, server_id{},
|
||||
raft::log{raft::snapshot{.config = raft::configuration{}}},
|
||||
trivial_failure_detector, fsm_cfg);
|
||||
|
||||
raft_routing_map routes{{fsm1.id(), &fsm1}, {fsm2.id(), &fsm2}};
|
||||
|
||||
fsm1.add_entry(raft::configuration{{raft::server_address{fsm1.id().id}, raft::server_address{fsm2.id().id}}});
|
||||
fsm1.tick();
|
||||
|
||||
// fsm1 sends append_entries with idx=2 to fsm2
|
||||
auto append_idx2_1 = fsm1.get_output();
|
||||
|
||||
fsm1.tick();
|
||||
|
||||
// fsm1 sends append_entries with idx=2 to fsm2 (again)
|
||||
auto append_idx2_2 = fsm1.get_output();
|
||||
|
||||
raft::logger.trace("delivering first append idx=2");
|
||||
deliver(routes, fsm1.id(), std::move(append_idx2_1.messages));
|
||||
|
||||
// fsm2 rejects the first idx=2 append
|
||||
auto reject_1 = fsm2.get_output();
|
||||
|
||||
raft::logger.trace("delivering second append idx=2");
|
||||
deliver(routes, fsm1.id(), std::move(append_idx2_2.messages));
|
||||
|
||||
// fsm2 rejects the second idx=2 append
|
||||
auto reject_2 = fsm2.get_output();
|
||||
|
||||
raft::logger.trace("delivering first reject");
|
||||
deliver(routes, fsm2.id(), std::move(reject_1.messages));
|
||||
|
||||
// fsm1 sends append_entries with idx=1 to fsm2
|
||||
auto append_idx1 = fsm1.get_output();
|
||||
|
||||
raft::logger.trace("delivering append idx=1");
|
||||
deliver(routes, fsm1.id(), std::move(append_idx1.messages));
|
||||
|
||||
// fsm2 accepts the idx=1 append
|
||||
auto accept = fsm2.get_output();
|
||||
|
||||
raft::logger.trace("delivering accept for append idx=1");
|
||||
deliver(routes, fsm2.id(), std::move(accept.messages));
|
||||
|
||||
raft::logger.trace("delivering second reject");
|
||||
deliver(routes, fsm2.id(), std::move(reject_2.messages));
|
||||
}
|
||||
|
||||
@@ -133,6 +133,22 @@ bool compare_log_entries(raft::log& log1, raft::log& log2, size_t from, size_t t
|
||||
|
||||
using raft_routing_map = std::unordered_map<raft::server_id, raft::fsm*>;
|
||||
|
||||
bool deliver(raft_routing_map& routes, raft::server_id from, std::pair<raft::server_id, raft::rpc_message> m) {
|
||||
auto it = routes.find(m.first);
|
||||
if (it == routes.end()) {
|
||||
// Destination not available
|
||||
return false;
|
||||
}
|
||||
std::visit([from, &to = *it->second] (auto&& m) { to.step(from, std::move(m)); }, std::move(m.second));
|
||||
return true;
|
||||
}
|
||||
|
||||
void deliver(raft_routing_map& routes, raft::server_id from, std::vector<std::pair<raft::server_id, raft::rpc_message>> msgs) {
|
||||
for (auto& m: msgs) {
|
||||
deliver(routes, from, std::move(m));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map) {
|
||||
// To enable tracing, set:
|
||||
@@ -150,15 +166,7 @@ communicate_impl(std::function<bool()> stop_pred, raft_routing_map& map) {
|
||||
}
|
||||
for (auto&& m : output.messages) {
|
||||
has_traffic = true;
|
||||
auto it = map.find(m.first);
|
||||
if (it == map.end()) {
|
||||
// The node is not available, drop the message
|
||||
continue;
|
||||
}
|
||||
raft::fsm& to = *(it->second);
|
||||
std::visit([&from, &to](auto&& m) { to.step(from.id(), std::move(m)); },
|
||||
std::move(m.second));
|
||||
if (stop_pred()) {
|
||||
if (deliver(map, from.id(), std::move(m)) && stop_pred()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user