diff --git a/idl/raft.idl.hh b/idl/raft.idl.hh index 0d9426d4b6..d1d84d77de 100644 --- a/idl/raft.idl.hh +++ b/idl/raft.idl.hh @@ -80,6 +80,11 @@ struct not_a_leader { raft::server_id leader; }; +struct transient_error { + sstring message(); + raft::server_id leader; +}; + struct commit_status_unknown { }; diff --git a/raft/raft.hh b/raft/raft.hh index 90038dfba1..7829e297eb 100644 --- a/raft/raft.hh +++ b/raft/raft.hh @@ -317,14 +317,6 @@ struct request_aborted : public error { request_aborted() : error("Request is aborted by a caller") {} }; -// True if a failure to execute a Raft operation can be re-tried, -// perhaps with a different server. -inline bool is_transient_error(const std::exception& e) { - return dynamic_cast(&e) || - dynamic_cast(&e) || - dynamic_cast(&e); -} - inline bool is_uncertainty(const std::exception& e) { return dynamic_cast(&e) || dynamic_cast(&e); @@ -450,12 +442,41 @@ struct entry_id { index_t idx; }; +// The execute_add_entry/execute_modify_config methods can return this error to signal +// that the request should be retried. +// The exception is only used internally for entry/config forwarding and should not be leaked to a user. +struct transient_error: public error { + // for IDL serialization + sstring message() const { + return what(); + } + // A leader that the client should use for retrying. + // Could be empty, if the new leader is not known. + // Client should wait for a new leader in this case. + server_id leader; + + explicit transient_error(const sstring& message, server_id leader) + : error(message) + , leader(leader) + { + } + + explicit transient_error(std::exception_ptr e, server_id leader) + : transient_error(format("Transient error: '{}'", e), leader) + { + } + + friend std::ostream& operator<<(std::ostream& os, const transient_error& e) { + return os << "transient_error, message: " << e.what() << ", leader: " << e.leader; + } +}; + // Response to add_entry or modify_config RPC. 
// Carries either entry id (the entry is not committed yet), -// not_a_leader (the entry is not added to Raft log), or, for +// transient_error (the entry is not added to Raft log), or, for // modify_config, commit_status_unknown (commit status is // unknown). -using add_entry_reply = std::variant; +using add_entry_reply = std::variant; // std::monostate {} if the leader cannot execute the barrier because // it did not commit any entries yet diff --git a/raft/server.cc b/raft/server.cc index 942291cdfa..6e12d501fd 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -278,6 +278,19 @@ private: void check_not_aborted(); void handle_background_error(const char* fiber_name); + // Triggered on the next tick, used to delay retries in add_entry, modify_config, read_barrier. + std::optional> _tick_promise; + future<> wait_for_next_tick(seastar::abort_source* as); + + // Call a function on a current leader until it returns stop_iteration::yes. + // Handles aborts and leader changes, adds a delay between + // iterations to protect against tight loops. + template + requires requires(server_id& leader, AsyncAction aa) { + { aa(leader) } -> std::same_as>; + } + future<> do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action); + friend std::ostream& operator<<(std::ostream& os, const server_impl& s); }; @@ -358,6 +371,19 @@ future<> server_impl::start() { co_return; } +future<> server_impl::wait_for_next_tick(seastar::abort_source* as) { + check_not_aborted(); + + if (!_tick_promise) { + _tick_promise.emplace(); + } + try { + co_await (as ? 
_tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future()); + } catch (abort_requested_exception&) { + throw request_aborted(); + } +} + future<> server_impl::wait_for_leader(seastar::abort_source* as) { if (_fsm->current_leader()) { co_return; @@ -495,13 +521,49 @@ future server_impl::execute_add_entry(server_id from, command c if (from != _id && !_fsm->get_configuration().contains(from)) { // Do not accept entries from servers removed from the // configuration. - co_return add_entry_reply{not_a_leader{server_id{}}}; + co_return add_entry_reply{transient_error{format("Add entry from {} was discarded since " + "it is not part of the configuration", from), {}}}; } logger.trace("[{}] adding a forwarded entry from {}", id(), from); try { co_return add_entry_reply{co_await add_entry_on_leader(std::move(cmd), as)}; } catch (raft::not_a_leader& e) { - co_return add_entry_reply{e}; + co_return add_entry_reply{transient_error{std::current_exception(), e.leader}}; + } +} + +template +requires requires (server_id& leader, AsyncAction aa) { + { aa(leader) } -> std::same_as>; +} +future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, AsyncAction&& action) { + server_id leader = _fsm->current_leader(), prev_leader{}; + + while (true) { + if (as && as->abort_requested()) { + throw request_aborted(); + } + check_not_aborted(); + if (leader == server_id{}) { + co_await wait_for_leader(as); + leader = _fsm->current_leader(); + continue; + } + if (prev_leader && leader == prev_leader) { + // This is to protect against a tight loop in case we didn't get + // any new information about the current leader. + // This can happen if the server responds with a transient_error with + // an empty leader and the current node has not yet learned the new leader. + // We neglect an excessive delay if the newly elected leader is the same as + // the previous one; this is supposed to be rare. 
+ co_await wait_for_next_tick(as); + prev_leader = leader = server_id{}; + continue; + } + prev_leader = leader; + if (co_await action(leader) == stop_iteration::yes) { + break; + } } } @@ -512,54 +574,50 @@ future<> server_impl::add_entry(command command, wait_type type, seastar::abort_ throw command_is_too_big_error(command.size(), _config.max_command_size); } _stats.add_command++; - server_id leader = _fsm->current_leader(); + logger.trace("[{}] an entry is submitted", id()); if (!_config.enable_forwarding) { - if (leader != _id) { + if (const auto leader = _fsm->current_leader(); leader != _id) { throw not_a_leader{leader}; } auto eid = co_await add_entry_on_leader(std::move(command), as); co_return co_await wait_for_entry(eid, type, as); } - while (true) { - if (as && as->abort_requested()) { - throw request_aborted(); - } - check_not_aborted(); - if (leader == server_id{}) { - co_await wait_for_leader(as); - leader = _fsm->current_leader(); - } else { - auto reply = co_await [&]() -> future { - if (leader == _id) { - logger.trace("[{}] an entry proceeds on a leader", id()); - // Make a copy of the command since we may still - // retry and forward it. - co_return co_await execute_add_entry(leader, command, as); - } else { - logger.trace("[{}] forwarding the entry to {}", id(), leader); - try { - co_return co_await _rpc->send_add_entry(leader, command); - } catch (const transport_error& e) { - logger.trace("[{}] send_add_entry on {} resulted in {}; " - "rethrow as commit_status_unknown", _id, leader, e); - throw raft::commit_status_unknown(); - } - } - }(); - if (std::holds_alternative(reply)) { - co_return co_await wait_for_entry(std::get(reply), type, as); - } else if (std::holds_alternative(reply)) { - // It should be impossible to obtain `commit_status_unknown` here - // because neither `execute_add_entry` nor `send_add_entry` wait for the entry - // to be committed/applied. 
- on_internal_error(logger, "add_entry: `execute_add_entry` or `send_add_entry`" - " returned `commit_status_unknown`"); + + co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future { + auto reply = co_await [&]() -> future { + if (leader == _id) { + logger.trace("[{}] an entry proceeds on a leader", id()); + // Make a copy of the command since we may still + // retry and forward it. + co_return co_await execute_add_entry(leader, command, as); } else { - leader = std::get(reply).leader; + logger.trace("[{}] forwarding the entry to {}", id(), leader); + try { + co_return co_await _rpc->send_add_entry(leader, command); + } catch (const transport_error& e) { + logger.trace("[{}] send_add_entry on {} resulted in {}; " + "rethrow as commit_status_unknown", _id, leader, e); + throw raft::commit_status_unknown(); + } } + }(); + if (std::holds_alternative(reply)) { + co_await wait_for_entry(std::get(reply), type, as); + co_return stop_iteration::yes; } - } + if (std::holds_alternative(reply)) { + // It should be impossible to obtain `commit_status_unknown` here + // because neither `execute_add_entry` nor `send_add_entry` wait for the entry + // to be committed/applied. + on_internal_error(logger, "add_entry: `execute_add_entry` or `send_add_entry`" + " returned `commit_status_unknown`"); + } + const auto& e = std::get(reply); + logger.trace("[{}] got {}", _id, e); + leader = e.leader; + co_return stop_iteration::no; + }); } future server_impl::execute_modify_config(server_id from, @@ -568,7 +626,8 @@ future server_impl::execute_modify_config(server_id from, if (from != _id && !_fsm->get_configuration().contains(from)) { // Do not accept entries from servers removed from the // configuration. 
- co_return add_entry_reply{not_a_leader{server_id{}}}; + co_return add_entry_reply{transient_error{format("Modify config from {} was discarded since " + "it is not part of the configuration", from), {}}}; } try { // Wait for a new slot to become available @@ -608,16 +667,23 @@ future server_impl::execute_modify_config(server_id from, // information that the entry may already have been // committed in the return value. co_return add_entry_reply{commit_status_unknown()}; - } else if (!is_transient_error(e)) { - throw; } + if (const auto* ex = dynamic_cast(&e)) { + co_return add_entry_reply{transient_error{std::current_exception(), ex->leader}}; + } + if (const auto* ex = dynamic_cast(&e)) { + co_return add_entry_reply{transient_error{std::current_exception(), {}}}; + } + if (const auto* ex = dynamic_cast(&e)) { + co_return add_entry_reply{transient_error{std::current_exception(), {}}}; + } + throw; } - co_return add_entry_reply{not_a_leader{_fsm->current_leader()}}; } future<> server_impl::modify_config(std::vector add, std::vector del, seastar::abort_source* as) { - server_id leader = _fsm->current_leader(); if (!_config.enable_forwarding) { + const auto leader = _fsm->current_leader(); if (leader != _id) { throw not_a_leader{leader}; } @@ -627,44 +693,37 @@ future<> server_impl::modify_config(std::vector add, std::vector< } throw raft::not_a_leader{_fsm->current_leader()}; } - while (true) { - if (as && as->abort_requested()) { - throw request_aborted(); - } - check_not_aborted(); - if (leader == server_id{}) { - co_await wait_for_leader(as); - leader = _fsm->current_leader(); - } else { - auto reply = co_await [&]() -> future { - if (leader == _id) { - // Make a copy since of the params since we may - // still retry and forward them. 
- co_return co_await execute_modify_config(leader, add, del, as); - } else { - logger.trace("[{}] forwarding the entry to {}", id(), leader); - try { - co_return co_await _rpc->send_modify_config(leader, add, del); - } catch (const transport_error& e) { - logger.trace("[{}] send_modify_config on {} resulted in {}; " - "rethrow as commit_status_unknown", _id, leader, e); - throw raft::commit_status_unknown(); - } - } - }(); - if (std::holds_alternative(reply)) { - // Do not wait for the entry locally. The reply means that the leader committed it, - // and there is no reason to wait for our local commit index to match. - // See also #9981. - co_return; - } - if (auto nal = std::get_if(&reply)) { - leader = nal->leader; + + co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future { + auto reply = co_await [&]() -> future { + if (leader == _id) { + // Make a copy of the params since we may + // still retry and forward them. + co_return co_await execute_modify_config(leader, add, del, as); } else { - throw std::get(reply); + logger.trace("[{}] forwarding the entry to {}", id(), leader); + try { + co_return co_await _rpc->send_modify_config(leader, add, del); + } catch (const transport_error& e) { + logger.trace("[{}] send_modify_config on {} resulted in {}; " + "rethrow as commit_status_unknown", _id, leader, e); + throw raft::commit_status_unknown(); + } } + }(); + if (std::holds_alternative(reply)) { + // Do not wait for the entry locally. The reply means that the leader committed it, + // and there is no reason to wait for our local commit index to match. + // See also #9981. 
+ co_return stop_iteration::yes; } - } + if (const auto e = std::get_if(&reply)) { + logger.trace("[{}] got {}", _id, *e); + leader = e->leader; + co_return stop_iteration::no; + } + throw std::get(reply); + }); } void server_impl::append_entries(server_id from, append_request append_request) { @@ -1206,48 +1265,34 @@ future server_impl::get_read_idx(server_id leader, seastar:: } future<> server_impl::read_barrier(seastar::abort_source* as) { - server_id leader = _fsm->current_leader(); - logger.trace("[{}] read_barrier start", _id); index_t read_idx; - while (read_idx == index_t{}) { - if (as && as->abort_requested()) { - throw request_aborted(); + co_await do_on_leader_with_retries(as, [&](server_id& leader) -> future { + auto applied = _applied_idx; + read_barrier_reply res; + try { + res = co_await get_read_idx(leader, as); + } catch (const transport_error& e) { + logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e); + leader = server_id{}; + co_return stop_iteration::no; } - check_not_aborted(); - logger.trace("[{}] read_barrier forward to {}", _id, leader); - if (leader == server_id{}) { - co_await wait_for_leader(as); - leader = _fsm->current_leader(); - } else { - auto applied = _applied_idx; - read_barrier_reply res; - bool need_retry = false; - try { - res = co_await get_read_idx(leader, as); - } catch (const transport_error& e) { - logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e); - need_retry = true; - } - if (need_retry) { - leader = server_id{}; - co_await yield(); - continue; - } - if (std::holds_alternative(res)) { - // the leader is not ready to answer because it did not - // committed any entries yet, so wait for any entry to be - // committed (if non were since start of the attempt) and retry. 
- logger.trace("[{}] read_barrier leader not ready", _id); - co_await wait_for_apply(++applied, as); - } else if (std::holds_alternative(res)) { - leader = std::get(res).leader; - } else { - read_idx = std::get(res); - } + if (std::holds_alternative(res)) { + // the leader is not ready to answer because it did not + // commit any entries yet, so wait for any entry to be + // committed (if none were since the start of the attempt) and retry. + logger.trace("[{}] read_barrier leader not ready", _id); + co_await wait_for_apply(++applied, as); + co_return stop_iteration::no; } - } + if (std::holds_alternative(res)) { + leader = std::get(res).leader; + co_return stop_iteration::no; + } + read_idx = std::get(res); + co_return stop_iteration::yes; + }); logger.trace("[{}] read_barrier read index {}, append index {}", _id, read_idx, _applied_idx); co_return co_await wait_for_apply(read_idx, as); @@ -1342,6 +1387,9 @@ future<> server_impl::abort(sstring reason) { if (_leader_promise) { _leader_promise->set_exception(stopped_error(*_aborted)); } + if (_tick_promise) { + _tick_promise->set_exception(stopped_error(*_aborted)); + } abort_snapshot_transfers(); @@ -1524,6 +1572,10 @@ void server_impl::elapse_election() { void server_impl::tick() { _fsm->tick(); + + if (_tick_promise && !_aborted) { + std::exchange(_tick_promise, std::nullopt)->set_value(); + } } raft::server_id server_impl::id() const {