raft: Add descriptions for requested abort errors

Fixes: scylladb/scylladb#18902

This PR is intended to make debugging easier, hence backporting it to
previous versions shall be useful while debugging issues there

(cherry picked from commit a616f10)

For fixing the backport,  parentheses () were added after variable captures
in lambdas, absence of which wasn't supported in earlier versions of C++.

Closes scylladb/scylladb#20564
This commit is contained in:
Abhi
2024-08-26 14:21:11 +05:30
committed by Botond Dénes
parent 2f3f734cfb
commit d8a71ca6db
4 changed files with 51 additions and 18 deletions

View File

@@ -356,6 +356,15 @@ public:
bool is_candidate() const {
return std::holds_alternative<candidate>(_state);
}
std::string_view current_state() const {
static constexpr std::string_view leader_state = "Leader";
static constexpr std::string_view follower_state = "Follower";
static constexpr std::string_view candidate_state = "Candidate";
if (is_leader()) {
return leader_state;
}
return is_follower() ? follower_state : candidate_state;
}
bool is_prevote_candidate() const {
return is_candidate() && std::get<candidate>(_state).is_prevote;
}

View File

@@ -323,7 +323,7 @@ struct no_other_voting_member : public error {
};
struct request_aborted : public error {
request_aborted() : error("Request is aborted by a caller") {}
request_aborted(const std::string& error_msg) : error(error_msg) {}
};
inline bool is_uncertainty(const std::exception& e) {

View File

@@ -428,7 +428,7 @@ future<> server_impl::wait_for_next_tick(seastar::abort_source* as) {
try {
co_await (as ? _tick_promise->get_shared_future(*as) : _tick_promise->get_shared_future());
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(format("Aborted while waiting for next tick on server: {}, latest applied entry: {}", _id, _applied_idx));
}
}
@@ -446,7 +446,7 @@ future<> server_impl::wait_for_leader(seastar::abort_source* as) {
try {
co_await (as ? _leader_promise->get_shared_future(*as) : _leader_promise->get_shared_future());
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(format("Aborted while waiting for leader on server: {}, latest applied entry: {}", _id, _applied_idx));
}
}
@@ -458,7 +458,8 @@ future<> server_impl::wait_for_state_change(seastar::abort_source* as) {
try {
return as ? _state_change_promise->get_shared_future(*as) : _state_change_promise->get_shared_future();
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(format(
"Aborted while waiting for state change on server: {}, latest applied entry: {}, current state: {}", _id, _applied_idx, _fsm->current_state()));
}
}
@@ -496,9 +497,19 @@ future<bool> server_impl::trigger_snapshot(seastar::abort_source* as) {
as->check();
}
} catch (abort_requested_exception&) {
throw request_aborted();
throw request_aborted(
format("Aborted in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on server: {}, latest applied entry: {}",
awaited_idx,
_snapshot_desc_idx,
_id,
_applied_idx));
} catch (seastar::broken_condition_variable&) {
throw request_aborted();
throw request_aborted(format("Condition variable is broken in snapshot trigger waiting for index: {}, last persisted snapshot descriptor idx: {}, on "
"server: {}, latest applied entry: {}",
awaited_idx,
_snapshot_desc_idx,
_id,
_applied_idx));
}
logger.debug(
@@ -575,7 +586,7 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
check_not_aborted();
if (as && as->abort_requested()) {
throw request_aborted();
throw request_aborted(format("Abort requested before waiting for entry with idx: {}, term: {}", eid.idx, eid.term));
}
auto& container = type == wait_type::committed ? _awaited_commits : _awaited_applies;
@@ -624,7 +635,8 @@ future<> server_impl::wait_for_entry(entry_id eid, wait_type type, seastar::abor
assert(inserted);
if (as) {
it->second.abort = as->subscribe([it = it, &container] () noexcept {
it->second.done.set_exception(request_aborted());
it->second.done.set_exception(
request_aborted(format("Abort requested while waiting for entry with idx: {}, term: {}", it->first, it->second.term)));
container.erase(it);
});
assert(it->second.abort);
@@ -642,7 +654,11 @@ future<entry_id> server_impl::add_entry_on_leader(command cmd, seastar::abort_so
try {
memory_permit = co_await _fsm->wait_for_memory_permit(as, log::memory_usage_of(cmd, _config.max_command_size));
} catch (semaphore_aborted&) {
throw request_aborted();
throw request_aborted(
format("Semaphore aborted while waiting for memory availability for adding entry on leader in term: {}, on server: {}, current term: {}",
t,
_id,
_fsm->get_current_term()));
}
if (t == _fsm->get_current_term()) {
break;
@@ -689,7 +705,9 @@ future<> server_impl::do_on_leader_with_retries(seastar::abort_source* as, Async
while (true) {
if (as && as->abort_requested()) {
throw request_aborted();
throw request_aborted(format("Request aborted while performing action on leader, current leader: {}, previous leader: {}",
leader ? leader.to_sstring() : "unknown",
prev_leader ? prev_leader.to_sstring() : "unknown"));
}
check_not_aborted();
if (leader == server_id{}) {
@@ -1429,7 +1447,7 @@ term_t server_impl::get_current_term() const {
future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
if (as && as->abort_requested()) {
throw request_aborted();
throw request_aborted(format("Aborted before waiting for applying entry: {}, last applied entry: {}", idx, _applied_idx));
}
check_not_aborted();
@@ -1440,7 +1458,8 @@ future<> server_impl::wait_for_apply(index_t idx, abort_source* as) {
auto it = _awaited_indexes.emplace(idx, awaited_index{{}, {}});
if (as) {
it->second.abort = as->subscribe([this, it] () noexcept {
it->second.promise.set_exception(request_aborted());
it->second.promise.set_exception(
request_aborted(format("Aborted while waiting to apply entry: {}, last applied entry: {}", it->first, _applied_idx)));
_awaited_indexes.erase(it);
});
assert(it->second.abort);
@@ -1467,13 +1486,15 @@ future<read_barrier_reply> server_impl::execute_read_barrier(server_id from, sea
logger.trace("[{}] execute_read_barrier read id is {} for commit idx {}",
_id, rid->first, rid->second);
if (as && as->abort_requested()) {
return make_exception_future<read_barrier_reply>(request_aborted());
return make_exception_future<read_barrier_reply>(
request_aborted(format("Abort requested before waiting for read barrier from {}, read id is {} for commit idx {}", from, rid->first, rid->second)));
}
_reads.push_back({rid->first, rid->second, {}, {}});
auto read = std::prev(_reads.end());
if (as) {
read->abort = as->subscribe([this, read] () noexcept {
read->promise.set_exception(request_aborted());
read->abort = as->subscribe([this, read, from] () noexcept {
read->promise.set_exception(
request_aborted(format("Abort requested while waiting for read barrier from {}, read id is {} for commit idx {}", from, read->id, read->idx)));
_reads.erase(read);
});
assert(read->abort);
@@ -1676,12 +1697,14 @@ future<> server_impl::set_configuration(config_member_set c_new, seastar::abort_
auto f = _non_joint_conf_commit_promise.emplace().promise.get_future();
if (as) {
_non_joint_conf_commit_promise->abort = as->subscribe([this] () noexcept {
_non_joint_conf_commit_promise->abort = as->subscribe([this, idx = e.idx, term = e.term] () noexcept {
// If we're inside this callback, the subscription wasn't destroyed yet.
// The subscription is destroyed when the field is reset, so if we're here, the field must be engaged.
assert(_non_joint_conf_commit_promise);
// Whoever resolves the promise must reset the field. Thus, if we're here, the promise is not resolved.
std::exchange(_non_joint_conf_commit_promise, std::nullopt)->promise.set_exception(request_aborted{});
std::exchange(_non_joint_conf_commit_promise, std::nullopt)
->promise.set_exception(request_aborted(
format("Aborted while setting configuration (at index: {}, term: {}, current config: {})", idx, term, _fsm->get_configuration())));
});
}

View File

@@ -320,7 +320,8 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
} catch (const abort_requested_exception&) {
throw raft::request_aborted();
throw raft::request_aborted(format(
"Abort requested while transferring snapshot from ID/IP: {}/{}, snapshot descriptor id: {}, snapshot index: {}", from_id, from_ip, snp.id, snp.idx));
}
}