raft: make raft requests abortable

This patch adds an ability to pass abort_source to raft request APIs (
add_entry, modify_config) to make them abortable. A request issuer not
always want to wait for a request to complete. For instance because a
client disconnected or because it no longer interested in waiting
because of a timeout. After this patch it can now abort waiting for such
requests through an abort source. Note that aborting a request only
aborts the wait for it to complete, it does not mean that the request
will not be eventually executed.

Message-Id: <YjHivLfIB9Xj5F4g@scylladb.com>
This commit is contained in:
Gleb Natapov
2022-03-16 15:14:36 +02:00
committed by Tomasz Grabiec
parent a1d0f089c8
commit a1604aa388
8 changed files with 109 additions and 67 deletions

View File

@@ -44,10 +44,10 @@ fsm::fsm(server_id id, term_t current_term, server_id voted_for, log log,
failure_detector& failure_detector, fsm_config config) :
fsm(id, current_term, voted_for, std::move(log), index_t{0}, failure_detector, config) {}
future<> fsm::wait_max_log_size() {
future<> fsm::wait_max_log_size(seastar::abort_source* as) {
check_is_leader();
return leader_state().log_limiter_semaphore->wait();
return as ? leader_state().log_limiter_semaphore->wait(*as) : leader_state().log_limiter_semaphore->wait();
}
const configuration& fsm::get_configuration() const {

View File

@@ -385,7 +385,7 @@ public:
// Call this function to wait for the number of log entries to
// go below max_log_size.
future<> wait_max_log_size();
future<> wait_max_log_size(seastar::abort_source* as);
// Return current configuration. Throws if not a leader.
const configuration& get_configuration() const;

View File

@@ -257,6 +257,10 @@ struct no_other_voting_member : public error {
no_other_voting_member() : error("Cannot stepdown because there is no other voting member") {}
};
struct request_aborted : public error {
request_aborted() : error("Request is aborted by a caller") {}
};
// True if a failure to execute a Raft operation can be re-tried,
// perhaps with a different server.
inline bool is_transient_error(const std::exception& e) {
@@ -580,18 +584,18 @@ public:
virtual future<snapshot_reply> apply_snapshot(server_id from, install_snapshot snp) = 0;
// Try to execute read barrier, future resolves when the barrier is completed or error happens
virtual future<read_barrier_reply> execute_read_barrier(server_id from) = 0;
virtual future<read_barrier_reply> execute_read_barrier(server_id from, seastar::abort_source* as) = 0;
// An endpoint on the leader to add an entry to the raft log,
// as requested by a remote follower.
virtual future<add_entry_reply> execute_add_entry(server_id from, command cmd) = 0;
virtual future<add_entry_reply> execute_add_entry(server_id from, command cmd, seastar::abort_source* as) = 0;
// An endpoint on the leader to change configuration,
// as requested by a remote follower.
// If the future resolves successfully, a dummy entry was committed after the configuration change.
virtual future<add_entry_reply> execute_modify_config(server_id from,
std::vector<server_address> add,
std::vector<server_id> del) = 0;
std::vector<server_id> del, seastar::abort_source* as) = 0;
// Update RPC implementation with this client as
// the receiver of RPC input.

View File

@@ -32,6 +32,7 @@ struct active_read {
read_id id;
index_t idx;
seastar::promise<read_barrier_reply> promise;
optimized_optional<abort_source::subscription> abort;
};
static const seastar::metrics::label server_id_label("id");
@@ -56,21 +57,21 @@ public:
void timeout_now_request(server_id from, timeout_now timeout_now) override;
void read_quorum_request(server_id from, struct read_quorum read_quorum) override;
void read_quorum_reply(server_id from, struct read_quorum_reply read_quorum_reply) override;
future<read_barrier_reply> execute_read_barrier(server_id) override;
future<add_entry_reply> execute_add_entry(server_id from, command cmd) override;
future<read_barrier_reply> execute_read_barrier(server_id, seastar::abort_source* as) override;
future<add_entry_reply> execute_add_entry(server_id from, command cmd, seastar::abort_source* as) override;
future<add_entry_reply> execute_modify_config(server_id from,
std::vector<server_address> add, std::vector<server_id> del) override;
std::vector<server_address> add, std::vector<server_id> del, seastar::abort_source* as) override;
future<snapshot_reply> apply_snapshot(server_id from, install_snapshot snp) override;
// server interface
future<> add_entry(command command, wait_type type) override;
future<> set_configuration(server_address_set c_new) override;
future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) override;
future<> set_configuration(server_address_set c_new, seastar::abort_source* as = nullptr) override;
raft::configuration get_configuration() const override;
future<> start() override;
future<> abort() override;
term_t get_current_term() const override;
future<> read_barrier() override;
future<> read_barrier(seastar::abort_source* as = nullptr) override;
void wait_until_candidate() override;
future<> wait_election_done() override;
future<> wait_log_idx_term(std::pair<index_t, term_t> idx_log) override;
@@ -80,8 +81,8 @@ public:
void tick() override;
raft::server_id id() const override;
future<> stepdown(logical_clock::duration timeout) override;
future<> modify_config(std::vector<server_address> add, std::vector<server_id> del) override;
future<entry_id> add_entry_on_leader(command command);
future<> modify_config(std::vector<server_address> add, std::vector<server_id> del, seastar::abort_source* as = nullptr) override;
future<entry_id> add_entry_on_leader(command command, seastar::abort_source* as);
void register_metrics() override;
private:
std::unique_ptr<rpc> _rpc;
@@ -145,6 +146,7 @@ private:
struct op_status {
term_t term; // term the entry was added with
promise<> done; // notify when done here
optimized_optional<seastar::abort_source::subscription> abort; // abort subscription
};
// Entries that have a waiter that needs to be notified when the
@@ -205,7 +207,7 @@ private:
// in-memory log. This allows to avoid deadlocks when
// automatically appending a non-joint configuration
// and makes set_configuration() safe from preemption.
template <typename T> future<> add_entry_internal(T command, wait_type type);
template <typename T> future<> add_entry_internal(T command, wait_type type, seastar::abort_source* as);
template <typename Message> void send_message(server_id id, Message m);
// Abort all snapshot transfer.
@@ -248,20 +250,20 @@ private:
void remove_from_rpc_config(const server_address& srv);
// A helper to wait for a leader to get elected
future<> wait_for_leader();
future<> wait_for_leader(seastar::abort_source* as);
// Get "safe to read" index from a leader
future<read_barrier_reply> get_read_idx(server_id leader);
future<read_barrier_reply> get_read_idx(server_id leader, seastar::abort_source* as);
// Wait for an entry with a specific term to get committed or
// applied locally.
future<> wait_for_entry(entry_id eid, wait_type type);
future<> wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as);
// Wait for a read barrier index to be applied. The index
// is typically already committed, so we don't worry about the
// term.
future<> wait_for_apply(index_t idx);
// Set configuration but don't wait for transition joint ->
// non_joint.
future<> enter_joint_configuration(server_address_set c_new);
future<> enter_joint_configuration(server_address_set c_new, seastar::abort_source* as);
friend std::ostream& operator<<(std::ostream& os, const server_impl& s);
};
@@ -329,14 +331,14 @@ future<> server_impl::start() {
co_return;
}
future<> server_impl::wait_for_leader() {
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
if (!_leader_promise) {
_leader_promise.emplace();
}
return _leader_promise->get_shared_future();
return as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future();
}
future<> server_impl::wait_for_entry(entry_id eid, wait_type type) {
future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abort_source* as) {
// The entry may have been already committed and even applied
// in case it was forwarded to the leader. In this case
// waiting for it is futile.
@@ -361,6 +363,10 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type) {
throw stopped_error();
}
if (as && as->abort_requested()) {
throw request_aborted();
}
auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
logger.trace("[{}] waiting for entry {}.{}", id(), eid.term, eid.idx);
@@ -405,22 +411,29 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type) {
}
}
assert(inserted);
if (as) {
it->second.abort = as->subscribe([it = it, &container] () noexcept {
it->second.done.set_exception(request_aborted());
container.erase(it);
});
assert(it->second.abort);
}
co_await it->second.done.get_future();
logger.trace("[{}] done waiting for {}.{}", id(), eid.term, eid.idx);
co_return;
}
template <typename T>
future<> server_impl::add_entry_internal(T command, wait_type type) {
future<> server_impl::add_entry_internal(T command, wait_type type, seastar::abort_source* as) {
// Must not preempt before adding entry, the caller, such as
// set_configuration(), relies on it.
const log_entry& e = _fsm->add_entry(std::move(command));
co_return co_await wait_for_entry({.term = e.term, .idx = e.idx}, type);
co_return co_await wait_for_entry({.term = e.term, .idx = e.idx}, type, as);
}
future<entry_id> server_impl::add_entry_on_leader(command cmd) {
future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_source* as) {
// Wait for a new slot to become available
co_await _fsm->wait_max_log_size();
co_await _fsm->wait_max_log_size(as);
logger.trace("[{}] adding entry after log size limit check", id());
const log_entry& e = _fsm->add_entry(std::move(cmd));
@@ -428,7 +441,7 @@ future<entry_id> server_impl::add_entry_on_leader(command cmd) {
co_return entry_id{.term = e.term, .idx = e.idx};
}
future<add_entry_reply> server_impl::execute_add_entry(server_id from, command cmd) {
future<add_entry_reply> server_impl::execute_add_entry(server_id from, command cmd, seastar::abort_source* as) {
if (from != _id && !_fsm->get_configuration().contains(from)) {
// Do not accept entries from servers removed from the
// configuration.
@@ -436,13 +449,13 @@ future<add_entry_reply> server_impl::execute_add_entry(server_id from, command c
}
logger.trace("[{}] adding a forwarded entry from {}", id(), from);
try {
co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd))};
co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd), as)};
} catch (raft::not_a_leader& e) {
co_return add_entry_reply{e};
}
}
future<> server_impl::add_entry(command command, wait_type type) {
future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) {
_stats.add_command++;
server_id leader = _fsm->current_leader();
logger.trace("[{}] an entry is submitted", id());
@@ -450,13 +463,16 @@ future<> server_impl::add_entry(command command, wait_type type) {
if (leader != _id) {
throw not_a_leader{leader};
}
auto eid = co_await add_entry_on_leader(std::move(command));
co_return co_await wait_for_entry(eid, type);
auto eid = co_await add_entry_on_leader(std::move(command), as);
co_return co_await wait_for_entry(eid, type, as);
}
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
}
if (leader == server_id{}) {
logger.trace("[{}] the leader is unknown, waiting through uncertainty", id());
co_await wait_for_leader();
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {
auto reply = co_await [&] {
@@ -464,14 +480,14 @@ future<> server_impl::add_entry(command command, wait_type type) {
logger.trace("[{}] an entry proceeds on a leader", id());
// Make a copy of the command since we may still
// retry and forward it.
return execute_add_entry(leader, command);
return execute_add_entry(leader, command, as);
} else {
logger.trace("[{}] forwarding the entry to {}", id(), leader);
return _rpc->send_add_entry(leader, command);
}
}();
if (std::holds_alternative<raft::entry_id>(reply)) {
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), type);
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), type, as);
}
leader = std::get<raft::not_a_leader>(reply).leader;
}
@@ -479,7 +495,7 @@ future<> server_impl::add_entry(command command, wait_type type) {
}
future<add_entry_reply> server_impl::execute_modify_config(server_id from,
std::vector<server_address> add, std::vector<server_id> del) {
std::vector<server_address> add, std::vector<server_id> del, seastar::abort_source* as) {
if (from != _id && !_fsm->get_configuration().contains(from)) {
// Do not accept entries from servers removed from the
@@ -509,10 +525,10 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
logger.trace("[{}] removing server {}", id(), to_remove);
cfg.erase(server_address{.id = to_remove});
}
co_await enter_joint_configuration(cfg);
co_await enter_joint_configuration(cfg, as);
const log_entry& e = _fsm->add_entry(log_entry::dummy());
auto eid = entry_id{.term = e.term, .idx = e.idx};
co_await wait_for_entry(eid, wait_type::committed);
co_await wait_for_entry(eid, wait_type::committed, as);
// `modify_config` doesn't actually need the entry id for anything
// but we reuse the `add_entry` RPC verb which requires it.
co_return add_entry_reply{eid};
@@ -530,28 +546,31 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
co_return add_entry_reply{not_a_leader{_fsm->current_leader()}};
}
future<> server_impl::modify_config(std::vector<server_address> add, std::vector<server_id> del) {
future<> server_impl::modify_config(std::vector<server_address> add, std::vector<server_id> del, seastar::abort_source* as) {
server_id leader = _fsm->current_leader();
if (!_config.enable_forwarding) {
if (leader != _id) {
throw not_a_leader{leader};
}
auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del));
auto reply = co_await execute_modify_config(leader, std::move(add), std::move(del), as);
if (std::holds_alternative<raft::entry_id>(reply)) {
co_return;
}
throw raft::not_a_leader{_fsm->current_leader()};
}
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
}
if (leader == server_id{}) {
co_await wait_for_leader();
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {
auto reply = co_await [&] {
if (leader == _id) {
// Make a copy since of the params since we may
// still retry and forward them.
return execute_modify_config(leader, add, del);
return execute_modify_config(leader, add, del, as);
} else {
logger.trace("[{}] forwarding the entry to {}", id(), leader);
return _rpc->send_modify_config(leader, add, del);
@@ -1025,7 +1044,7 @@ future<> server_impl::wait_for_apply(index_t idx) {
}
}
future<read_barrier_reply> server_impl::execute_read_barrier(server_id from) {
future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, seastar::abort_source* as) {
logger.trace("[{}] execute_read_barrier start", _id);
std::optional<std::pair<read_id, index_t>> rid;
@@ -1040,32 +1059,46 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from) {
}
logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
_id, rid->first, rid->second);
_reads.push_back({rid->first, rid->second, {}});
return _reads.back().promise.get_future();
if (as && as->abort_requested()) {
return make_exception_future<read_barrier_reply>(request_aborted());
}
_reads.push_back({rid->first, rid->second, {}, {}});
auto read = --_reads.end();
if (as) {
read->abort = as->subscribe([this, read] () noexcept {
read->promise.set_exception(request_aborted());
_reads.erase(read);
});
assert(read->abort);
}
return read->promise.get_future();
}
future<read_barrier_reply> server_impl::get_read_idx(server_id leader) {
future<read_barrier_reply> server_impl::get_read_idx(server_id leader, seastar::abort_source* as) {
if (_id == leader) {
return execute_read_barrier(_id);
return execute_read_barrier(_id, as);
} else {
return _rpc->execute_read_barrier_on_leader(leader);
}
}
future<> server_impl::read_barrier() {
future<> server_impl::read_barrier(seastar::abort_source* as) {
server_id leader = _fsm->current_leader();
logger.trace("[{}] read_barrier start", _id);
index_t read_idx;
while (read_idx == index_t{}) {
if (as && as->abort_requested()) {
throw request_aborted();
}
logger.trace("[{}] read_barrier forward to {}", _id, leader);
if (leader == server_id{}) {
co_await wait_for_leader();
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {
auto applied = _applied_idx;
auto res = co_await get_read_idx(leader);
auto res = co_await get_read_idx(leader, as);
if (std::holds_alternative<std::monostate>(res)) {
// the leader is not ready to answer because it did not
// committed any entries yet, so wait for any entry to be
@@ -1163,7 +1196,7 @@ future<> server_impl::abort() {
co_await seastar::when_all_succeed(all_futures.begin(), all_futures.end()).discard_result();
}
future<> server_impl::enter_joint_configuration(server_address_set c_new) {
future<> server_impl::enter_joint_configuration(server_address_set c_new, seastar::abort_source* as) {
const auto& cfg = _fsm->get_configuration();
// 4.1 Cluster membership changes. Safety.
// When the leader receives a request to add or remove a server
@@ -1175,11 +1208,11 @@ future<> server_impl::enter_joint_configuration(server_address_set c_new) {
co_return;
}
_stats.add_config++;
co_await add_entry_internal(raft::configuration{std::move(c_new)}, wait_type::committed);
co_await add_entry_internal(raft::configuration{std::move(c_new)}, wait_type::committed, as);
}
future<> server_impl::set_configuration(server_address_set c_new) {
co_await enter_joint_configuration(std::move(c_new));
future<> server_impl::set_configuration(server_address_set c_new, seastar::abort_source* as) {
co_await enter_joint_configuration(std::move(c_new), as);
// Above we co_wait that the joint configuration is committed.
// Immediately, without yield, once the FSM discovers
// this, it appends non-joint entry. Hence,
@@ -1187,7 +1220,7 @@ future<> server_impl::set_configuration(server_address_set c_new) {
// By waiting for a follow up dummy to get committed, we
// automatically wait for the non-joint entry to get
// committed.
co_await add_entry_internal(log_entry::dummy(), wait_type::committed);
co_await add_entry_internal(log_entry::dummy(), wait_type::committed, as);
}
raft::configuration

View File

@@ -6,6 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <seastar/core/abort_source.hh>
#include "raft.hh"
namespace raft {
@@ -55,7 +56,7 @@ public:
// this means that the entry is committed/applied locally (depending on the wait type).
// Applied locally means the local state machine replica applied this command;
// committed locally means simply that the commit index is beyond this entry's index.
virtual future<> add_entry(command command, wait_type type) = 0;
virtual future<> add_entry(command command, wait_type type, seastar::abort_source* as = nullptr) = 0;
// Set a new cluster configuration. If the configuration is
// identical to the previous one does nothing.
@@ -79,7 +80,9 @@ public:
// Note: committing a dummy entry extends the opportunity for
// uncertainty, thus commit_status_unknown exception may be
// returned even in case of a successful config change.
virtual future<> set_configuration(server_address_set c_new) = 0;
//
// A caller may pass a pointer to an abort_source to make operation abortable.
virtual future<> set_configuration(server_address_set c_new, seastar::abort_source* as = nullptr) = 0;
// A simplified wrapper around set_configuration() which adds
// and deletes servers. Unlike set_configuration(),
@@ -99,8 +102,9 @@ public:
// The local commit index is not necessarily up-to-date yet and the state of the local state machine
// replica may still come from before the configuration entry.
// (exception: if no server was actually added or removed, then nothing gets committed and the leader responds immediately).
// A caller may pass a pointer to an abort_source to make operation abortable.
virtual future<> modify_config(std::vector<server_address> add,
std::vector<server_id> del) = 0;
std::vector<server_id> del, seastar::abort_source* as = nullptr) = 0;
// Return the currently known configuration
virtual raft::configuration get_configuration() const = 0;
@@ -123,7 +127,8 @@ public:
// May be called before attempting a read from the local state
// machine. The read should proceed only after the returned
// future has resolved successfully.
virtual future<> read_barrier() = 0;
// A caller may pass a pointer to an abort_source to make operation abortable.
virtual future<> read_barrier(seastar::abort_source* as = nullptr) = 0;
// Initiate leader stepdown process.
// If the node is not a leader returns not_a_leader exception.

View File

@@ -178,7 +178,7 @@ void raft_rpc::read_quorum_reply(raft::server_id from, raft::read_quorum_reply c
}
future<raft::read_barrier_reply> raft_rpc::execute_read_barrier(raft::server_id from) {
return _client->execute_read_barrier(from);
return _client->execute_read_barrier(from, nullptr);
}
future<raft::snapshot_reply> raft_rpc::apply_snapshot(raft::server_id from, raft::install_snapshot snp) {
@@ -190,13 +190,13 @@ future<raft::snapshot_reply> raft_rpc::apply_snapshot(raft::server_id from, raft
}
future<raft::add_entry_reply> raft_rpc::execute_add_entry(raft::server_id from, raft::command cmd) {
return _client->execute_add_entry(from, std::move(cmd));
return _client->execute_add_entry(from, std::move(cmd), nullptr);
}
future<raft::add_entry_reply> raft_rpc::execute_modify_config(raft::server_id from,
std::vector<raft::server_address> add,
std::vector<raft::server_id> del) {
return _client->execute_modify_config(from, std::move(add), std::move(del));
return _client->execute_modify_config(from, std::move(add), std::move(del), nullptr);
}
} // end of namespace service

View File

@@ -466,7 +466,7 @@ public:
++_read_barrier_executions;
(void)[] (rpc& self, raft::server_id src, execute_barrier_on_leader m, gate::holder holder) -> future<> {
try {
auto reply = co_await self._client->execute_read_barrier(src);
auto reply = co_await self._client->execute_read_barrier(src, nullptr);
self._send(src, execute_barrier_on_leader_reply{
.reply = std::move(reply),
@@ -498,7 +498,7 @@ public:
++_add_entry_executions;
(void)[] (rpc& self, raft::server_id src, add_entry_message m, gate::holder holder) -> future<> {
try {
auto reply = co_await self._client->execute_add_entry(src, std::move(m.cmd));
auto reply = co_await self._client->execute_add_entry(src, std::move(m.cmd), nullptr);
self._send(src, add_entry_reply_message{
.reply = std::move(reply),
@@ -530,7 +530,7 @@ public:
++_modify_config_executions;
(void)[] (rpc& self, raft::server_id src, modify_config_message m, gate::holder holder) -> future<> {
try {
auto reply = co_await self._client->execute_modify_config(src, std::move(m.add), std::move(m.del));
auto reply = co_await self._client->execute_modify_config(src, std::move(m.add), std::move(m.del), nullptr);
self._send(src, add_entry_reply_message{
.reply = std::move(reply),

View File

@@ -707,7 +707,7 @@ public:
if (!(*_connected)(id, _id)) {
return make_exception_future<raft::read_barrier_reply>(std::runtime_error("cannot send append since nodes are disconnected"));
}
return _net[id]->_client->execute_read_barrier(_id);
return _net[id]->_client->execute_read_barrier(_id, nullptr);
}
void check_known_and_connected(raft::server_id id) {
if (!_net.count(id)) {
@@ -719,13 +719,13 @@ public:
}
future<raft::add_entry_reply> send_add_entry(raft::server_id id, const raft::command& cmd) override {
check_known_and_connected(id);
return _net[id]->_client->execute_add_entry(_id, cmd);
return _net[id]->_client->execute_add_entry(_id, cmd, nullptr);
}
future<raft::add_entry_reply> send_modify_config(raft::server_id id,
const std::vector<raft::server_address>& add,
const std::vector<raft::server_id>& del) override {
check_known_and_connected(id);
return _net[id]->_client->execute_modify_config(_id, add, del);
return _net[id]->_client->execute_modify_config(_id, add, del, nullptr);
}
void add_server(raft::server_id id, bytes node_info) override {