raft: rafactor: remove duplicate code on retries delays

Introduce a templated function do_on_leader_with_retries,
use it in add_entries/modify_config/read_barrier. The
function implements the basic logic of retries with aborts
and leader changes handling, adds a delay between
iterations to protect against tight loops.
This commit is contained in:
Petr Gusev
2022-11-15 13:18:53 +04:00
parent 15cc1667d0
commit ae3e0e3627

View File

@@ -282,6 +282,15 @@ private:
std::optional<shared_promise<>> _tick_promise;
future<> wait_for_next_tick(seastar::abort_source* as);
// Call a function on a current leader until it returns stop_iteration::yes.
// Handles aborts and leader changes, adds a delay between
// iterations to protect against tight loops.
template <typename AsyncAction>
requires requires(server_id& leader, AsyncAction aa) {
{ aa(leader) } -> std::same_as<future<stop_iteration>>;
}
future<> do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action);
friend std::ostream& operator<<(std::ostream& os, const server_impl& s);
};
@@ -523,23 +532,13 @@ future<add_entry_reply> server_impl::execute_add_entry(server_id from, command c
}
}
future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) {
if (command.size() > _config.max_command_size) {
logger.trace("[{}] add_entry command size exceeds the limit: {} > {}",
id(), command.size(), _config.max_command_size);
throw command_is_too_big_error(command.size(), _config.max_command_size);
}
_stats.add_command++;
server_id leader = _fsm->current_leader();
logger.trace("[{}] an entry is submitted", id());
if (!_config.enable_forwarding) {
if (leader != _id) {
throw not_a_leader{leader};
}
auto eid = co_await add_entry_on_leader(std::move(command), as);
co_return co_await wait_for_entry(eid, type, as);
}
server_id prev_leader{};
template <typename AsyncAction>
requires requires (server_id& leader, AsyncAction aa) {
{ aa(leader) } -> std::same_as<future<stop_iteration>>;
}
future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action) {
server_id leader = _fsm->current_leader(), prev_leader{};
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
@@ -551,7 +550,7 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
continue;
}
if (prev_leader && leader == prev_leader) {
// This is to protect against busy loop in case we didn't get
// This is to protect against tight loop in case we didn't get
// any new information about the current leader.
// This can happen if the server responds with a transient_error with
// an empty leader and the current node has not yet learned the new leader.
@@ -562,6 +561,30 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
continue;
}
prev_leader = leader;
if (co_await action(leader) == stop_iteration::yes) {
break;
}
}
}
future<> server_impl::add_entry(command command, wait_type type, seastar::abort_source* as) {
if (command.size() > _config.max_command_size) {
logger.trace("[{}] add_entry command size exceeds the limit: {} > {}",
id(), command.size(), _config.max_command_size);
throw command_is_too_big_error(command.size(), _config.max_command_size);
}
_stats.add_command++;
logger.trace("[{}] an entry is submitted", id());
if (!_config.enable_forwarding) {
if (const auto leader = _fsm->current_leader(); leader != _id) {
throw not_a_leader{leader};
}
auto eid = co_await add_entry_on_leader(std::move(command), as);
co_return co_await wait_for_entry(eid, type, as);
}
co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
auto reply = co_await [&]() -> future<add_entry_reply> {
if (leader == _id) {
logger.trace("[{}] an entry proceeds on a leader", id());
@@ -580,19 +603,21 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_
}
}();
if (std::holds_alternative<raft::entry_id>(reply)) {
co_return co_await wait_for_entry(std::get<raft::entry_id>(reply), type, as);
} else if (std::holds_alternative<raft::commit_status_unknown>(reply)) {
co_await wait_for_entry(std::get<raft::entry_id>(reply), type, as);
co_return stop_iteration::yes;
}
if (std::holds_alternative<raft::commit_status_unknown>(reply)) {
// It should be impossible to obtain `commit_status_unknown` here
// because neither `execute_add_entry` nor `send_add_entry` wait for the entry
// to be committed/applied.
on_internal_error(logger, "add_entry: `execute_add_entry` or `send_add_entry`"
" returned `commit_status_unknown`");
} else {
const auto e = std::get<transient_error>(reply);
logger.trace("[{}] got {}", _id, e);
leader = e.leader;
" returned `commit_status_unknown`");
}
}
const auto e = std::get<transient_error>(reply);
logger.trace("[{}] got {}", _id, e);
leader = e.leader;
co_return stop_iteration::no;
});
}
future<add_entry_reply> server_impl::execute_modify_config(server_id from,
@@ -657,8 +682,8 @@ future<add_entry_reply> server_impl::execute_modify_config(server_id from,
}
future<> server_impl::modify_config(std::vector<config_member> add, std::vector<server_id> del, seastar::abort_source* as) {
server_id leader = _fsm->current_leader();
if (!_config.enable_forwarding) {
const auto leader = _fsm->current_leader();
if (leader != _id) {
throw not_a_leader{leader};
}
@@ -668,29 +693,8 @@ future<> server_impl::modify_config(std::vector<config_member> add, std::vector<
}
throw raft::not_a_leader{_fsm->current_leader()};
}
server_id prev_leader{};
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
}
check_not_aborted();
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
continue;
}
if (prev_leader && leader == prev_leader) {
// This is to protect against busy loop in case we didn't get
// any new information about the current leader.
// This can happen if the server responds with a transient_error with
// an empty leader and the current node has not yet learned the new leader.
// We neglect an excessive delay if the newly elected leader is the same as
// the previous one, this supposed to be a rare.
co_await wait_for_next_tick(as);
prev_leader = leader = server_id{};
continue;
}
prev_leader = leader;
co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
auto reply = co_await [&]() -> future<add_entry_reply> {
if (leader == _id) {
// Make a copy since of the params since we may
@@ -711,15 +715,15 @@ future<> server_impl::modify_config(std::vector<config_member> add, std::vector<
// Do not wait for the entry locally. The reply means that the leader committed it,
// and there is no reason to wait for our local commit index to match.
// See also #9981.
co_return;
co_return stop_iteration::yes;
}
if (const auto e = std::get_if<raft::transient_error>(&reply)) {
logger.trace("[{}] got {}", _id, *e);
leader = e->leader;
} else {
throw std::get<raft::commit_status_unknown>(reply);
co_return stop_iteration::no;
}
}
throw std::get<raft::commit_status_unknown>(reply);
});
}
void server_impl::append_entries(server_id from, append_request append_request) {
@@ -1261,35 +1265,10 @@ future<read_barrier_reply> server_impl::get_read_idx(server_id leader, seastar::
}
future<> server_impl::read_barrier(seastar::abort_source* as) {
server_id leader = _fsm->current_leader();
logger.trace("[{}] read_barrier start", _id);
index_t read_idx;
server_id prev_leader{};
while (read_idx == index_t{}) {
if (as && as->abort_requested()) {
throw request_aborted();
}
check_not_aborted();
logger.trace("[{}] read_barrier forward to {}", _id, leader);
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
continue;
}
if (prev_leader && leader == prev_leader) {
// This is to protect against busy loop in case we didn't get
// any new information about the current leader.
// This can happen if the server responds with a transient_error with
// an empty leader and the current node has not yet learned the new leader.
// We neglect an excessive delay if the newly elected leader is the same as
// the previous one, this supposed to be a rare.
co_await wait_for_next_tick(as);
prev_leader = leader = server_id{};
continue;
}
prev_leader = leader;
co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future<stop_iteration> {
auto applied = _applied_idx;
read_barrier_reply res;
try {
@@ -1297,7 +1276,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
} catch (const transport_error& e) {
logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e);
leader = server_id{};
continue;
co_return stop_iteration::no;
}
if (std::holds_alternative<std::monostate>(res)) {
// the leader is not ready to answer because it did not
@@ -1305,12 +1284,15 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
// committed (if non were since start of the attempt) and retry.
logger.trace("[{}] read_barrier leader not ready", _id);
co_await wait_for_apply(++applied, as);
} else if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
} else {
read_idx = std::get<index_t>(res);
co_return stop_iteration::no;
}
}
if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
co_return stop_iteration::no;
}
read_idx = std::get<index_t>(res);
co_return stop_iteration::yes;
});
logger.trace("[{}] read_barrier read index {}, append index {}", _id, read_idx, _applied_idx);
co_return co_await wait_for_apply(read_idx, as);