diff --git a/raft/server.cc b/raft/server.cc index 55dd60f883..6447e0cb89 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -225,12 +225,6 @@ private: // to be applied. void signal_applied(); - // Wait until there is, and return `fsm` output that needs to be handled. - // This includes a list of the entries that need to be logged. - // The logged entries are eventually discarded from the state machine - // after applying a snapshot. - future poll_fsm_output(); - // This fiber processes FSM output by doing the following steps in order: // - persist the current term and vote // - persist unstable log entries on disk. @@ -996,16 +990,6 @@ static rpc_config_diff diff_address_sets(const server_address_set& prev, const c return result; } -future server_impl::poll_fsm_output() { - co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get())); - - while (utils::get_local_injector().enter("poll_fsm_output/pause")) { - co_await seastar::sleep(std::chrono::milliseconds(100)); - } - - co_return _fsm->get_output(); -} - future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) { if (batch.term_and_vote) { // Current term and vote are always persisted @@ -1145,7 +1129,13 @@ future<> server_impl::io_fiber(index_t last_stable) { logger.trace("[{}] io_fiber start", _id); try { while (true) { - auto batch = co_await poll_fsm_output(); + co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get())); + + while (utils::get_local_injector().enter("poll_fsm_output/pause")) { + co_await seastar::sleep(std::chrono::milliseconds(100)); + } + + auto batch = _fsm->get_output(); _stats.polls++; co_await process_fsm_output(last_stable, std::move(batch));