From 15cc1667d01c7e75f0fe30f0eaa8fe711baa1ddf Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 15 Nov 2022 12:31:49 +0400 Subject: [PATCH] raft: use wait_for_next_tick in read_barrier Replaced the yield on transport_error with wait_for_next_tick. Added delays for retries, similar to add_entry/modify_config: we postpone the next call attempt if we haven't received new information about the current leader. --- raft/server.cc | 58 ++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/raft/server.cc b/raft/server.cc index dd0664ca5d..9fb1bff376 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -1265,6 +1265,7 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { logger.trace("[{}] read_barrier start", _id); index_t read_idx; + server_id prev_leader{}; while (read_idx == index_t{}) { if (as && as->abort_requested()) { @@ -1275,32 +1276,39 @@ future<> server_impl::read_barrier(seastar::abort_source* as) { if (leader == server_id{}) { co_await wait_for_leader(as); leader = _fsm->current_leader(); + continue; + } + if (prev_leader && leader == prev_leader) { + // This is to protect against busy loop in case we didn't get + // any new information about the current leader. + // This can happen if the server responds with a transient_error with + // an empty leader and the current node has not yet learned the new leader. + // We neglect an excessive delay if the newly elected leader is the same as + // the previous one, this supposed to be a rare. + co_await wait_for_next_tick(as); + prev_leader = leader = server_id{}; + continue; + } + prev_leader = leader; + auto applied = _applied_idx; + read_barrier_reply res; + try { + res = co_await get_read_idx(leader, as); + } catch (const transport_error& e) { + logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e); + leader = server_id{}; + continue; + } + if (std::holds_alternative(res)) { + // the leader is not ready to answer because it did not + // committed any entries yet, so wait for any entry to be + // committed (if non were since start of the attempt) and retry. + logger.trace("[{}] read_barrier leader not ready", _id); + co_await wait_for_apply(++applied, as); + } else if (std::holds_alternative(res)) { + leader = std::get(res).leader; } else { - auto applied = _applied_idx; - read_barrier_reply res; - bool need_retry = false; - try { - res = co_await get_read_idx(leader, as); - } catch (const transport_error& e) { - logger.trace("[{}] read_barrier on {} resulted in {}; retrying", _id, leader, e); - need_retry = true; - } - if (need_retry) { - leader = server_id{}; - co_await yield(); - continue; - } - if (std::holds_alternative(res)) { - // the leader is not ready to answer because it did not - // committed any entries yet, so wait for any entry to be - // committed (if non were since start of the attempt) and retry. - logger.trace("[{}] read_barrier leader not ready", _id); - co_await wait_for_apply(++applied, as); - } else if (std::holds_alternative(res)) { - leader = std::get(res).leader; - } else { - read_idx = std::get(res); - } + read_idx = std::get(res); } }