raft: server: inline poll_fsm_output
This commit is contained in:
@@ -225,12 +225,6 @@ private:
|
||||
// to be applied.
|
||||
void signal_applied();
|
||||
|
||||
// Wait until there is, and return `fsm` output that needs to be handled.
|
||||
// This includes a list of the entries that need to be logged.
|
||||
// The logged entries are eventually discarded from the state machine
|
||||
// after applying a snapshot.
|
||||
future<fsm_output> poll_fsm_output();
|
||||
|
||||
// This fiber processes FSM output by doing the following steps in order:
|
||||
// - persist the current term and vote
|
||||
// - persist unstable log entries on disk.
|
||||
@@ -996,16 +990,6 @@ static rpc_config_diff diff_address_sets(const server_address_set& prev, const c
|
||||
return result;
|
||||
}
|
||||
|
||||
future<fsm_output> server_impl::poll_fsm_output() {
|
||||
co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get()));
|
||||
|
||||
while (utils::get_local_injector().enter("poll_fsm_output/pause")) {
|
||||
co_await seastar::sleep(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
co_return _fsm->get_output();
|
||||
}
|
||||
|
||||
future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batch) {
|
||||
if (batch.term_and_vote) {
|
||||
// Current term and vote are always persisted
|
||||
@@ -1145,7 +1129,13 @@ future<> server_impl::io_fiber(index_t last_stable) {
|
||||
logger.trace("[{}] io_fiber start", _id);
|
||||
try {
|
||||
while (true) {
|
||||
auto batch = co_await poll_fsm_output();
|
||||
co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get()));
|
||||
|
||||
while (utils::get_local_injector().enter("poll_fsm_output/pause")) {
|
||||
co_await seastar::sleep(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
auto batch = _fsm->get_output();
|
||||
_stats.polls++;
|
||||
|
||||
co_await process_fsm_output(last_stable, std::move(batch));
|
||||
|
||||
Reference in New Issue
Block a user