From 4c95277619a49c196a499dd23d2869de3fee1cda Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 23 Apr 2021 19:31:17 +0200 Subject: [PATCH] raft: fsm: fix assertion failure on stray rejects When probes are sent over a slow network, the leader would send multiple probes to a lagging follower before it would get a reject response to the first probe back. After getting a reject, the leader will be able to correctly position `next_idx` for that follower and switch to pipeline mode. Then, an out of order reject to a now irrelevant probe could crash the leader, since it would effectively request it to "rewind" its `match_idx` for that follower, and the code asserts this never happens. We fix the problem by strengthening `is_stray_reject`. The check that was previously only made in `PIPELINE` case (`rejected.non_matching_idx <= match_idx`) is now always performed and we add a new check: `rejected.last_idx < match_idx`. We also strengthen the assert. The commit improves the documentation by explaining that `is_stray_reject` may return false negatives. We also precisely state the preconditions and postconditions of `is_stray_reject`, give a more precise definition of `progress.match_idx`, argue how the postconditions of `is_stray_reject` follow from its preconditions and Raft invariants, and argue why the (strengthened) assert must always pass. Message-Id: <20210423173117.32939-1-kbraun@scylladb.com> --- raft/fsm.cc | 10 +++++-- raft/fsm.hh | 2 ++ raft/tracker.cc | 28 ++++++++++++++----- raft/tracker.hh | 20 ++++++++++---- test/raft/fsm_test.cc | 62 +++++++++++++++++++++++++++++++++++++++++++ test/raft/helpers.hh | 26 +++++++++++------- 6 files changed, 126 insertions(+), 22 deletions(-) diff --git a/raft/fsm.cc b/raft/fsm.cc index 09b5b1c07b..ff1f6574cb 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -594,6 +594,11 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) { return; } + // is_stray_reject may return a false negative so even if the check above passes, + // we may still be dealing with a stray reject. That's fine though; it is always safe + // to rollback next_idx on a reject and in fact that's what the Raft spec (TLA+) does. + // Detecting stray rejects is an optimization that should rarely even be needed. + // Start re-sending from the non matching index, or from // the last index in the follower's log. // FIXME: make it more efficient @@ -601,8 +606,9 @@ void fsm::append_entries_reply(server_id from, append_reply&& reply) { progress.become_probe(); - // We should not fail to apply an entry following the matched one. - assert(progress.next_idx != progress.match_idx); + // By `is_stray_reject(rejected) == false` we know that `rejected.non_matching_idx > progress.match_idx` + // and `rejected.last_idx + 1 > progress.match_idx`. By the assignment to `progress.next_idx` above, we get: + assert(progress.next_idx > progress.match_idx); } // We may have just applied a configuration that removes this diff --git a/raft/fsm.hh b/raft/fsm.hh index 96e6f5e0b3..98e745028a 100644 --- a/raft/fsm.hh +++ b/raft/fsm.hh @@ -257,6 +257,8 @@ private: void replicate_to(follower_progress& progress, bool allow_empty); void replicate(); void append_entries(server_id from, append_request&& append_request); + + // Precondition: `is_leader() && reply.current_term == _current_term` void append_entries_reply(server_id from, append_reply&& reply); void request_vote(server_id from, vote_request&& vote_request); diff --git a/raft/tracker.cc b/raft/tracker.cc index 4513667c70..a0ba2e0455 100644 --- a/raft/tracker.cc +++ b/raft/tracker.cc @@ -24,16 +24,32 @@ namespace raft { bool follower_progress::is_stray_reject(const append_reply::rejected& rejected) { + // By precondition, we are the leader and `rejected.current_term` is equal to our term. + // By definition of `match_idx` we know that at some point all entries up to and including + // `match_idx` were the same in our log and the follower's log; ... + if (rejected.non_matching_idx <= match_idx) { + // ... in particular, entry `rejected.non_matching_idx` (which is <= `match_idx`) at some point + // was the same in our log and the follower's log, but `rejected` claims they are different. + // A follower cannot change an entry unless it enters a different term, but `rejected.current_term` + // is equal to our term. Thus `rejected` must be stray. + return true; + } + if (rejected.last_idx < match_idx) { + // ... in particular, at some point the follower had to have an entry with index `rejected.last_idx + 1` + // (because `rejected.last_idx < match_idx implies rejected.last_idx + 1 <= match_idx) + // but `rejected` claims it doesn't have such entry now. + // A follower cannot truncate a suffix of its log unless it enters a different term, + // but `rejected.current_term` is equal to our term. Thus `rejected` must be stray. + return true; + } + switch (state) { case follower_progress::state::PIPELINE: - if (rejected.non_matching_idx <= match_idx) { - // If rejected index is smaller that matched it means this is a stray reply - return true; - } break; case follower_progress::state::PROBE: - // In the probe state the reply is only valid if it matches next_idx - 1, since only - // one append request is outstanding. + // In PROBE state we send a single append request `req` with `req.prev_log_idx == next_idx - 1`. + // When the follower generates a rejected response `r`, it sets `r.non_matching_idx = req.prev_log_idx`. + // Thus the reject either satisfies `rejected.non_matching_idx == next_idx - 1` or is stray. if (rejected.non_matching_idx != index_t(next_idx - 1)) { return true; } diff --git a/raft/tracker.hh b/raft/tracker.hh index 9148cd12e6..a830f7ffad 100644 --- a/raft/tracker.hh +++ b/raft/tracker.hh @@ -32,9 +32,12 @@ public: // Id of this server const server_id id; // Index of the next log entry to send to this server. + // Invariant: next_idx > match_idx. index_t next_idx; - // Index of the highest log entry known to be replicated to this - // server. + // Index of the highest log entry known to be replicated to this server. + // More specifically, this is the greatest `last_new_idx` received from this follower + // in an `accepted` message. As long as the follower remains in our term we know + // that its log must match with ours up to (and including) `match_idx`. index_t match_idx = index_t(0); // Index that we know to be committed by the follower index_t commit_idx = index_t(0); @@ -56,9 +59,16 @@ public: size_t in_flight = 0; static constexpr size_t max_in_flight = 10; - // check if a reject packet should be ignored because it was delayed - // or reordered - bool is_stray_reject(const append_reply::rejected&); + // Check if a reject packet should be ignored because it was delayed or reordered. + // This is not 100% accurate (may return false negatives) and should only be relied on + // for liveness optimizations, not for safety. + // + // Precondition: we are the leader and `r.current_term` is equal to our term (`_current_term`). + // Postcondition: if the function returns `false` it is guaranteed that: + // 1. `match_idx < r.non_matching_idx`. + // 2. `match_idx < r.last_idx + 1`. + // 3. If we're in PROBE mode then `next_idx == r.non_matching_idx + 1`. + bool is_stray_reject(const append_reply::rejected& r); void become_probe(); void become_pipeline(); diff --git a/test/raft/fsm_test.cc b/test/raft/fsm_test.cc index 50fb1ea6ed..994953f000 100644 --- a/test/raft/fsm_test.cc +++ b/test/raft/fsm_test.cc @@ -1451,3 +1451,65 @@ BOOST_AUTO_TEST_CASE(test_zero) { BOOST_CHECK_THROW(raft::configuration cfg(raft::server_address_set{raft::server_address{id}}), std::invalid_argument); BOOST_CHECK_THROW(create_follower(id, raft::log(raft::snapshot{})), std::invalid_argument); } + +BOOST_AUTO_TEST_CASE(test_reordered_reject) { + auto id1 = id(); + raft::fsm fsm1(id1, term_t{1}, server_id{}, + raft::log{raft::snapshot{.config = {{raft::server_address{id1.id}}}}}, + trivial_failure_detector, fsm_cfg); + + while (!fsm1.is_leader()) { + fsm1.tick(); + } + + fsm1.add_entry(log_entry::dummy{}); + (void)fsm1.get_output(); + + auto id2 = id(); + raft::fsm fsm2(id2, term_t{1}, server_id{}, + raft::log{raft::snapshot{.config = raft::configuration{}}}, + trivial_failure_detector, fsm_cfg); + + raft_routing_map routes{{fsm1.id(), &fsm1}, {fsm2.id(), &fsm2}}; + + fsm1.add_entry(raft::configuration{{raft::server_address{fsm1.id().id}, raft::server_address{fsm2.id().id}}}); + fsm1.tick(); + + // fsm1 sends append_entries with idx=2 to fsm2 + auto append_idx2_1 = fsm1.get_output(); + + fsm1.tick(); + + // fsm1 sends append_entries with idx=2 to fsm2 (again) + auto append_idx2_2 = fsm1.get_output(); + + raft::logger.trace("delivering first append idx=2"); + deliver(routes, fsm1.id(), std::move(append_idx2_1.messages)); + + // fsm2 rejects the first idx=2 append + auto reject_1 = fsm2.get_output(); + + raft::logger.trace("delivering second append idx=2"); + deliver(routes, fsm1.id(), std::move(append_idx2_2.messages)); + + // fsm2 rejects the second idx=2 append + auto reject_2 = fsm2.get_output(); + + raft::logger.trace("delivering first reject"); + deliver(routes, fsm2.id(), std::move(reject_1.messages)); + + // fsm1 sends append_entries with idx=1 to fsm2 + auto append_idx1 = fsm1.get_output(); + + raft::logger.trace("delivering append idx=1"); + deliver(routes, fsm1.id(), std::move(append_idx1.messages)); + + // fsm2 accepts the idx=1 append + auto accept = fsm2.get_output(); + + raft::logger.trace("delivering accept for append idx=1"); + deliver(routes, fsm2.id(), std::move(accept.messages)); + + raft::logger.trace("delivering second reject"); + deliver(routes, fsm2.id(), std::move(reject_2.messages)); +} diff --git a/test/raft/helpers.hh b/test/raft/helpers.hh index 225ef756d1..dea8ca8f78 100644 --- a/test/raft/helpers.hh +++ b/test/raft/helpers.hh @@ -133,6 +133,22 @@ bool compare_log_entries(raft::log& log1, raft::log& log2, size_t from, size_t t using raft_routing_map = std::unordered_map; +bool deliver(raft_routing_map& routes, raft::server_id from, std::pair m) { + auto it = routes.find(m.first); + if (it == routes.end()) { + // Destination not available + return false; + } + std::visit([from, &to = *it->second] (auto&& m) { to.step(from, std::move(m)); }, std::move(m.second)); + return true; +} + +void deliver(raft_routing_map& routes, raft::server_id from, std::vector> msgs) { + for (auto& m: msgs) { + deliver(routes, from, std::move(m)); + } +} + void communicate_impl(std::function stop_pred, raft_routing_map& map) { // To enable tracing, set: @@ -150,15 +166,7 @@ communicate_impl(std::function stop_pred, raft_routing_map& map) { } for (auto&& m : output.messages) { has_traffic = true; - auto it = map.find(m.first); - if (it == map.end()) { - // The node is not available, drop the message - continue; - } - raft::fsm& to = *(it->second); - std::visit([&from, &to](auto&& m) { to.step(from.id(), std::move(m)); }, - std::move(m.second)); - if (stop_pred()) { + if (deliver(map, from.id(), std::move(m)) && stop_pred()) { return; } }