Merge 'raft: wait for the next tick before retrying' from Gusev Petr

When `modify_config` or `add_entry` is forwarded to the leader, it may
reach the node at "inappropriate" time and result in an exception. There
are two reasons for it - the leader is changing and, in case of
`modify_config`, other `modify_config` is currently in progress. In both
cases the command is retried, but before this patch there was no delay
before retrying, which could led to a tight loop.

The patch adds a new exception type `transient_error`. When the client
receives it, it is obliged to retry the request after some delay.
Previously leader-side exceptions were converted to `not_a_leader`,
which is strange, especially for `conf_change_in_progress`.

Fixes: #11564

Closes #11769

* github.com:scylladb/scylladb:
  raft: rafactor: remove duplicate code on retries delays
  raft: use wait_for_next_tick in read_barrier
  raft: wait for the next tick before retrying
This commit is contained in:
Kamil Braun
2022-11-16 18:18:02 +01:00
3 changed files with 205 additions and 127 deletions

View File

@@ -80,6 +80,11 @@ struct not_a_leader {
raft::server_id leader;
};
struct transient_error {
sstring message();
raft::server_id leader;
};
struct commit_status_unknown {
};

View File

@@ -317,14 +317,6 @@ struct request_aborted : public error {
request_aborted() : error("Request is aborted by a caller") {}
};
// True if a failure to execute a Raft operation can be re-tried,
// perhaps with a different server.
inline bool is_transient_error(const std::exception& e) {
return dynamic_cast<const not_a_leader*>(&e) ||
dynamic_cast<const dropped_entry*>(&e) ||
dynamic_cast<const conf_change_in_progress*>(&e);
}
inline bool is_uncertainty(const std::exception& e) {
return dynamic_cast<const commit_status_unknown*>(&e) ||
dynamic_cast<const stopped_error*>(&e);
@@ -450,12 +442,41 @@ struct entry_id {
index_t idx;
};
// The execute_add_entry/execute_modify_config methods can return this error to signal
// that the request should be retried.
// The exception is only used internally for entry/config forwarding and should not be leaked to a user.
struct transient_error: public error {
// for IDL serialization
sstring message() const {
return what();
}
// A leader that the client should use for retrying.
// Could be empty, if the new leader is not known.
// Client should wait for a new leader in this case.
server_id leader;
explicit transient_error(const sstring& message, server_id leader)
: error(message)
, leader(leader)
{
}
explicit transient_error(std::exception_ptr e, server_id leader)
: transient_error(format("Transient error: '{}'", e), leader)
{
}
friend std::ostream& operator<<(std::ostream& os, const transient_error& e) {
return os << "transient_error, message: " << e.what() << ", leader: " << e.leader;
}
};
// Response to add_entry or modify_config RPC.
// Carries either entry id (the entry is not committed yet),
// not_a_leader (the entry is not added to Raft log), or, for
// transient_error (the entry is not added to Raft log), or, for
// modify_config, commit_status_unknown (commit status is
// unknown).
using add_entry_reply = std::variant<entry_id, not_a_leader, commit_status_unknown>;
using add_entry_reply = std::variant<entry_id, transient_error, commit_status_unknown>;
// std::monostate {} if the leader cannot execute the barrier because
// it did not commit any entries yet

View File

@@ -278,6 +278,19 @@ private:
void check_not_aborted();
void handle_background_error(const char* fiber_name);
// Triggered on the next tick, used to delay retries in add_entry, modify_config, read_barrier.
std::optional<shared_promise<>> _tick_promise;
future<> wait_for_next_tick(seastar::abort_source* as);
// Call a function on a current leader until it returns stop_iteration::yes.
// Handles aborts and leader changes, adds a delay between
// iterations to protect against tight loops.
template <typename AsyncAction>
requires requires(server_id& leader, AsyncAction aa) {
{ aa(leader) } -> std::same_as<future<stop_iteration>>;
}
future<> do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action);
friend std::ostream& operator<<(std::ostream& os, const server_impl& s);
};
@@ -358,6 +371,19 @@ future<> server_impl::start() {
co_return;
}
future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
check_not_aborted();
if (!_tick_promise) {
_tick_promise.emplace();
}
try {
co_await (as ? _tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future());
} catch (abort_requested_exception&) {
throw request_aborted();
}
}
future<> server_impl::wait_for_leader(seastar::abort_source* as) {
if (_fsm->current_leader()) {
co_return;
@@ -495,13 +521,49 @@ future<add_entry_reply> server_impl::execute_add_entry(server_id from, command c
if (from != _id && !_fsm->get_configuration().contains(from)) {
// Do not accept entries from servers removed from the
// configuration.
co_return add_entry_reply{not_a_leader{server_id{}}};
co_return add_entry_reply{transient_error{format("Add entry from {} was discarded since "
"it is not part of the configuration", from), {}}};
}
logger.trace("[{}] adding a forwarded entry from {}", id(), from);
try {
co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd), as)};
} catch (raft::not_a_leader& e) {
co_return add_entry_reply{e};
co_return add_entry_reply{transient_error{std::current_exception(), e.leader}};
}
}
template <typename AsyncAction>
requires requires (server_id& leader, AsyncAction aa) {
{ aa(leader) } -> std::same_as<future<stop_iteration>>;
}
future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action) {
server_id leader = _fsm->current_leader(), prev_leader{};
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
}
check_not_aborted();
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
continue;
}
if (prev_leader && leader == prev_leader) {
// This is to protect against tight loop in case we didn't get
// any new information about the current leader.
// This can happen if the server responds with a transient_error with
// an empty leader and the current node has not yet learned the new leader.
// We neglect an excessive delay if the newly elected leader is the same as
// the previous one, this supposed to be a rare.
co_await wait_for_next_tick(as);
prev_leader = leader = server_id{};
continue;
}
prev_leader = leader;
if (co_await action(leader) == stop_iteration::yes) {
break;
}
}
}
@@ -512,54 +574,50 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
throw command_is_too_big_error(command.size(), _config.max_command_size);
}
_stats.add_command++;
server_id leader = _fsm->current_leader();
logger.trace("[{}] an entry is submitted", id());
if (!_config.enable_forwarding) {
if (leader != _id) {
if (const auto leader = _fsm->current_leader(); leader != _id) {
throw not_a_leader{leader};
}
auto eid = co_await add_entry_on_leader(std::move(command), as);
co_return co_await wait_for_entry(eid, type, as);
}
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
}
check_not_aborted();
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {
auto reply = co_await [&]() -> future<add_entry_reply> {
if (leader == _id) {
logger.trace("[{}] an entry proceeds on a leader", id());
// Make a copy of the command since we may still
// retry and forward it.
co_return co_await execute_add_entry(leader, command, as);
} else {
logger.trace("[{}] forwarding the entry to {}", id(), leader);
try {
co_return co_await _rpc->send_add_entry(leader, command);
} catch (const transport_error& e) {
logger.trace("[{}] send_add_entry on {} resulted in {}; "
"rethrow as commit_status_unknown", _id, leader, e);
throw raft::commit_status_unknown();
}
}
}();
if (std::holds_alternative<raft::entry_id>(reply)) {
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), type, as);
} else if (std::holds_alternative<raft::commit_status_unknown>(reply)) {
// It should be impossible to obtain `commit_status_unknown` here
// because neither `execute_add_entry` nor `send_add_entry` wait for the entry
// to be committed/applied.
on_internal_error(logger, "add_entry: `execute_add_entry` or `send_add_entry`"
" returned `commit_status_unknown`");
co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
auto reply = co_await [&]() -> future<add_entry_reply> {
if (leader == _id) {
logger.trace("[{}] an entry proceeds on a leader", id());
// Make a copy of the command since we may still
// retry and forward it.
co_return co_await execute_add_entry(leader, command, as);
} else {
leader = std::get<raft::not_a_leader>(reply).leader;
logger.trace("[{}] forwarding the entry to {}", id(), leader);
try {
co_return co_await _rpc->send_add_entry(leader, command);
} catch (const transport_error& e) {
logger.trace("[{}] send_add_entry on {} resulted in {}; "
"rethrow as commit_status_unknown", _id, leader, e);
throw raft::commit_status_unknown();
}
}
}();
if (std::holds_alternative<raft::entry_id>(reply)) {
co_await wait_for_entry(std::get<raft::entry_id>(reply), type, as);
co_return stop_iteration::yes;
}
}
if (std::holds_alternative<raft::commit_status_unknown>(reply)) {
// It should be impossible to obtain `commit_status_unknown` here
// because neither `execute_add_entry` nor `send_add_entry` wait for the entry
// to be committed/applied.
on_internal_error(logger, "add_entry: `execute_add_entry` or `send_add_entry`"
" returned `commit_status_unknown`");
}
const auto& e = std::get<transient_error>(reply);
logger.trace("[{}] got {}", _id, e);
leader = e.leader;
co_return stop_iteration::no;
});
}
future<add_entry_reply> server_impl::execute_modify_config(server_id from,
@@ -568,7 +626,8 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
if (from != _id && !_fsm->get_configuration().contains(from)) {
// Do not accept entries from servers removed from the
// configuration.
co_return add_entry_reply{not_a_leader{server_id{}}};
co_return add_entry_reply{transient_error{format("Modify config from {} was discarded since "
"it is not part of the configuration", from), {}}};
}
try {
// Wait for a new slot to become available
@@ -608,16 +667,23 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
// information that the entry may already have been
// committed in the return value.
co_return add_entry_reply{commit_status_unknown()};
} else if (!is_transient_error(e)) {
throw;
}
if (const auto* ex = dynamic_cast<const not_a_leader*>(&e)) {
co_return add_entry_reply{transient_error{std::current_exception(), ex->leader}};
}
if (const auto* ex = dynamic_cast<const dropped_entry*>(&e)) {
co_return add_entry_reply{transient_error{std::current_exception(), {}}};
}
if (const auto* ex = dynamic_cast<const conf_change_in_progress*>(&e)) {
co_return add_entry_reply{transient_error{std::current_exception(), {}}};
}
throw;
}
co_return add_entry_reply{not_a_leader{_fsm->current_leader()}};
}
future<> server_impl::modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {
server_id leader = _fsm->current_leader();
if (!_config.enable_forwarding) {
const auto leader = _fsm->current_leader();
if (leader != _id) {
throw not_a_leader{leader};
}
@@ -627,44 +693,37 @@ future<> server_impl::modify_config(std::vector<config_member> add, std::vector<
}
throw raft::not_a_leader{_fsm->current_leader()};
}
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
}
check_not_aborted();
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {
auto reply = co_await [&]() -> future<add_entry_reply> {
if (leader == _id) {
// Make a copy since of the params since we may
// still retry and forward them.
co_return co_await execute_modify_config(leader, add, del, as);
} else {
logger.trace("[{}] forwarding the entry to {}", id(), leader);
try {
co_return co_await _rpc->send_modify_config(leader, add, del);
} catch (const transport_error& e) {
logger.trace("[{}] send_modify_config on {} resulted in {}; "
"rethrow as commit_status_unknown", _id, leader, e);
throw raft::commit_status_unknown();
}
}
}();
if (std::holds_alternative<raft::entry_id>(reply)) {
// Do not wait for the entry locally. The reply means that the leader committed it,
// and there is no reason to wait for our local commit index to match.
// See also #9981.
co_return;
}
if (auto nal = std::get_if<raft::not_a_leader>(&reply)) {
leader = nal->leader;
co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
auto reply = co_await [&]() -> future<add_entry_reply> {
if (leader == _id) {
// Make a copy since of the params since we may
// still retry and forward them.
co_return co_await execute_modify_config(leader, add, del, as);
} else {
throw std::get<raft::commit_status_unknown>(reply);
logger.trace("[{}] forwarding the entry to {}", id(), leader);
try {
co_return co_await _rpc->send_modify_config(leader, add, del);
} catch (const transport_error& e) {
logger.trace("[{}] send_modify_config on {} resulted in {}; "
"rethrow as commit_status_unknown", _id, leader, e);
throw raft::commit_status_unknown();
}
}
}();
if (std::holds_alternative<raft::entry_id>(reply)) {
// Do not wait for the entry locally. The reply means that the leader committed it,
// and there is no reason to wait for our local commit index to match.
// See also #9981.
co_return stop_iteration::yes;
}
}
if (const auto e = std::get_if<raft::transient_error>(&reply)) {
logger.trace("[{}] got {}", _id, *e);
leader = e->leader;
co_return stop_iteration::no;
}
throw std::get<raft::commit_status_unknown>(reply);
});
}
void server_impl::append_entries(server_id from, append_request append_request) {
@@ -1206,48 +1265,34 @@ future<read_barrier_reply> server_impl::get_read_idx(server_id leader, seastar::
}
future<> server_impl::read_barrier(seastar::abort_source* as) {
server_id leader = _fsm->current_leader();
logger.trace("[{}] read_barrier start", _id);
index_t read_idx;
while (read_idx == index_t{}) {
if (as && as->abort_requested()) {
throw request_aborted();
co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
auto applied = _applied_idx;
read_barrier_reply res;
try {
res = co_await get_read_idx(leader, as);
} catch (const transport_error& e) {
logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e);
leader = server_id{};
co_return stop_iteration::no;
}
check_not_aborted();
logger.trace("[{}] read_barrier forward to {}", _id, leader);
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
} else {
auto applied = _applied_idx;
read_barrier_reply res;
bool need_retry = false;
try {
res = co_await get_read_idx(leader, as);
} catch (const transport_error& e) {
logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e);
need_retry = true;
}
if (need_retry) {
leader = server_id{};
co_await yield();
continue;
}
if (std::holds_alternative<std::monostate>(res)) {
// the leader is not ready to answer because it did not
// committed any entries yet, so wait for any entry to be
// committed (if non were since start of the attempt) and retry.
logger.trace("[{}] read_barrier leader not ready", _id);
co_await wait_for_apply(++applied, as);
} else if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
} else {
read_idx = std::get<index_t>(res);
}
if (std::holds_alternative<std::monostate>(res)) {
// the leader is not ready to answer because it did not
// committed any entries yet, so wait for any entry to be
// committed (if non were since start of the attempt) and retry.
logger.trace("[{}] read_barrier leader not ready", _id);
co_await wait_for_apply(++applied, as);
co_return stop_iteration::no;
}
}
if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
co_return stop_iteration::no;
}
read_idx = std::get<index_t>(res);
co_return stop_iteration::yes;
});
logger.trace("[{}] read_barrier read index {}, append index {}", _id, read_idx, _applied_idx);
co_return co_await wait_for_apply(read_idx, as);
@@ -1342,6 +1387,9 @@ future<> server_impl::abort(sstring reason) {
if (_leader_promise) {
_leader_promise->set_exception(stopped_error(*_aborted));
}
if (_tick_promise) {
_tick_promise->set_exception(stopped_error(*_aborted));
}
abort_snapshot_transfers();
@@ -1524,6 +1572,10 @@ void server_impl::elapse_election() {
void server_impl::tick() {
_fsm->tick();
if (_tick_promise && !_aborted) {
std::exchange(_tick_promise, std::nullopt)->set_value();
}
}
raft::server_id server_impl::id() const {