service: raft: rpc: don't call execute... functions after abort()

The functions are called from RPC when a follower forwards a request to
a leader (`add_entry`, `modify_config`, `read_barrier`). The call may be
attempted during shutdown. The Raft shutdown code cleans up data structures
created by those requests. Make sure that they are not updated
concurrently with shutdown. This can lead to problems such as using the
server object after it was aborted, or even destroyed.

After this change, the RPC implementation may wait for a `execute_modify_config`
call to finish before finishing abort. That call in turn may be stuck on
`wait_for_entry`. Thus the waiter may prevent RPC from aborting. Fix
this be moving the wait on the future returned from `_rpc->abort()` in
`server::abort()` until after waiters were destroyed.
This commit is contained in:
Kamil Braun
2022-05-10 18:36:18 +02:00
parent 5e06d0ad6f
commit 4767b163ef
3 changed files with 34 additions and 12 deletions

View File

@@ -550,7 +550,8 @@ public:
// unfinished send operation may return an error after this
// function is called.
//
// The implementation must ensure that `_client->apply_snapshot(...)` is not called
// The implementation must ensure that `_client->apply_snapshot`, `_client->execute_add_entry`,
// `_client->execute_modify_config` and `_client->execute_read_barrier` are not called
// after `abort()` is called (even before `abort()` future resolves).
virtual future<> abort() = 0;
private:

View File

@@ -1172,8 +1172,9 @@ future<> server_impl::abort() {
_apply_entries.abort(std::make_exception_ptr(stop_apply_fiber()));
co_await seastar::when_all_succeed(std::move(_io_status), std::move(_applier_status)).discard_result();
// Start RPC abort before aborting snapshot applications.
// After calling `_rpc->abort()` no new applications should be started (see `rpc::abort()` comment).
// Start RPC abort before aborting snapshot applications or destroying entry waiters.
// After calling `_rpc->abort()` no new snapshot applications should be started or new waiters created
// (see `rpc::abort()` comment and `_aborted` flag).
auto abort_rpc = _rpc->abort();
auto abort_sm = _state_machine->abort();
auto abort_persistence = _persistence->abort();
@@ -1184,8 +1185,11 @@ future<> server_impl::abort() {
f.set_exception(std::runtime_error("Snapshot application aborted"));
}
co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result();
// Destroy entry waiters before waiting for `abort_rpc`,
// since the RPC implementation may wait for forwarded `modify_config` calls to finish
// (and `modify_config` does not finish until the configuration entry is committed or an error occurs).
// FIXME: probably need to do the same with read barriers (`_reads`)
// (not doing it yet because I want to catch the problem first in nemesis test)
for (auto& ac: _awaited_commits) {
ac.second.done.set_exception(stopped_error());
}
@@ -1194,6 +1198,9 @@ future<> server_impl::abort() {
}
_awaited_commits.clear();
_awaited_applies.clear();
co_await seastar::when_all_succeed(std::move(abort_rpc), std::move(abort_sm), std::move(abort_persistence)).discard_result();
if (_leader_promise) {
_leader_promise->set_exception(stopped_error());
}

View File

@@ -184,26 +184,40 @@ void raft_rpc::read_quorum_reply(raft::server_id from, raft::read_quorum_reply c
_client->read_quorum_reply(from, check_quorum_reply);
}
// Simple helper that throws `raft::stopped_error` instead of `gate_closed_exception` on shutdown.
template <typename F>
auto raft_with_gate(gate& g, F&& f) -> decltype(f()) {
if (g.is_closed()) {
throw raft::stopped_error{};
}
return with_gate(g, std::forward<F>(f));
}
future<raft::read_barrier_reply> raft_rpc::execute_read_barrier(raft::server_id from) {
return _client->execute_read_barrier(from, nullptr);
return raft_with_gate(_shutdown_gate, [&] {
return _client->execute_read_barrier(from, nullptr);
});
}
future<raft::snapshot_reply> raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) {
co_await _sm.transfer_snapshot(_address_map.get_inet_address(from), snp.snp);
if (_shutdown_gate.is_closed()) {
throw raft::stopped_error{};
}
co_return co_await _client->apply_snapshot(from, std::move(snp));
co_return co_await raft_with_gate(_shutdown_gate, [&] {
return _client->apply_snapshot(from, std::move(snp));
});
}
future<raft::add_entry_reply> raft_rpc::execute_add_entry(raft::server_id from, raft::command cmd) {
return _client->execute_add_entry(from, std::move(cmd), nullptr);
return raft_with_gate(_shutdown_gate, [&] {
return _client->execute_add_entry(from, std::move(cmd), nullptr);
});
}
future<raft::add_entry_reply> raft_rpc::execute_modify_config(raft::server_id from,
std::vector<raft::server_address> add,
std::vector<raft::server_id> del) {
return _client->execute_modify_config(from, std::move(add), std::move(del), nullptr);
return raft_with_gate(_shutdown_gate, [&] {
return _client->execute_modify_config(from, std::move(add), std::move(del), nullptr);
});
}
} // end of namespace service