raft: use wait_for_next_tick in read_barrier

Replaced the yield on transport_error
with wait_for_next_tick. Added delays for retries, similar
to add_entry/modify_config: we postpone the next
call attempt if we haven't received new information
about the current leader.
This commit is contained in:
Petr Gusev
2022-11-15 12:31:49 +04:00
parent 5e15c3c9bd
commit 15cc1667d0

View File

@@ -1265,6 +1265,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
logger.trace("[{}] read_barrier start", _id);
index_t read_idx;
server_id prev_leader{};
while (read_idx == index_t{}) {
if (as && as->abort_requested()) {
@@ -1275,32 +1276,39 @@ future<> server_impl::read_barrier(seastar::abort_source* as) {
if (leader == server_id{}) {
co_await wait_for_leader(as);
leader = _fsm->current_leader();
continue;
}
if (prev_leader && leader == prev_leader) {
// This is to protect against busy loop in case we didn't get
// any new information about the current leader.
// This can happen if the server responds with a transient_error with
// an empty leader and the current node has not yet learned the new leader.
// We neglect an excessive delay if the newly elected leader is the same as
// the previous one, this supposed to be a rare.
co_await wait_for_next_tick(as);
prev_leader = leader = server_id{};
continue;
}
prev_leader = leader;
auto applied = _applied_idx;
read_barrier_reply res;
try {
res = co_await get_read_idx(leader, as);
} catch (const transport_error& e) {
logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e);
leader = server_id{};
continue;
}
if (std::holds_alternative<std::monostate>(res)) {
// the leader is not ready to answer because it did not
// committed any entries yet, so wait for any entry to be
// committed (if non were since start of the attempt) and retry.
logger.trace("[{}] read_barrier leader not ready", _id);
co_await wait_for_apply(++applied, as);
} else if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
} else {
auto applied = _applied_idx;
read_barrier_reply res;
bool need_retry = false;
try {
res = co_await get_read_idx(leader, as);
} catch (const transport_error& e) {
logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e);
need_retry = true;
}
if (need_retry) {
leader = server_id{};
co_await yield();
continue;
}
if (std::holds_alternative<std::monostate>(res)) {
// the leader is not ready to answer because it did not
// committed any entries yet, so wait for any entry to be
// committed (if non were since start of the attempt) and retry.
logger.trace("[{}] read_barrier leader not ready", _id);
co_await wait_for_apply(++applied, as);
} else if (std::holds_alternative<raft::not_a_leader>(res)) {
leader = std::get<not_a_leader>(res).leader;
} else {
read_idx = std::get<index_t>(res);
}
read_idx = std::get<index_t>(res);
}
}