raft: move poll_output() from fsm to server
`server` was the only user of this function and it can now be implemented using `fsm`'s public interface. In later commits we'll extend the logic of `io_fiber` to also subscribe to other events, triggered by `server` API calls, not only to outputs from `fsm`.
This commit is contained in:
@@ -300,15 +300,6 @@ void fsm::become_candidate(bool is_prevote, bool is_leadership_transfer) {
|
||||
}
|
||||
}
|
||||
|
||||
future<fsm_output> fsm::poll_output() {
|
||||
co_await _sm_events.when(std::bind_front(&fsm::has_output, this));
|
||||
|
||||
while (utils::get_local_injector().enter("fsm::poll_output/pause")) {
|
||||
co_await seastar::sleep(std::chrono::milliseconds(100));
|
||||
}
|
||||
co_return get_output();
|
||||
}
|
||||
|
||||
bool fsm::has_output() const {
|
||||
logger.trace("fsm::has_output() {} stable index: {} last index: {}",
|
||||
_my_id, _log.stable_idx(), _log.last_idx());
|
||||
|
||||
17
raft/fsm.hh
17
raft/fsm.hh
@@ -146,9 +146,13 @@ struct leader {
|
||||
// in-memory state machine with a catch-all API step(message)
|
||||
// method. The method handles any kind of input and performs the
|
||||
// needed state machine state transitions. To get state machine output
|
||||
// poll_output() function has to be called. This call produces an output
|
||||
// get_output() function has to be called. To check first if
|
||||
// any new output is present, call has_output(). To wait for new
|
||||
// new output events, use the sm_events condition variable passed
|
||||
// to fsm constructor; fs` signals it each time new output may appear.
|
||||
// The get_output() call produces an output
|
||||
// object, which encapsulates a list of actions that must be
|
||||
// performed until the next poll_output() call can be made. The time is
|
||||
// performed until the next get_output() call can be made. The time is
|
||||
// represented with a logical timer. The client is responsible for
|
||||
// periodically invoking tick() method, which advances the state
|
||||
// machine time and allows it to track such events as election or
|
||||
@@ -417,13 +421,6 @@ public:
|
||||
// committed to the persistent Raft log afterwards.
|
||||
template<typename T> const log_entry& add_entry(T command);
|
||||
|
||||
// Wait until there is, and return state machine output that
|
||||
// needs to be handled.
|
||||
// This includes a list of the entries that need
|
||||
// to be logged. The logged entries are eventually
|
||||
// discarded from the state machine after applying a snapshot.
|
||||
future<fsm_output> poll_output();
|
||||
|
||||
// Check if there is any state machine output
|
||||
// that `get_output()` will return.
|
||||
bool has_output() const;
|
||||
@@ -439,7 +436,7 @@ public:
|
||||
|
||||
// Feed one Raft RPC message into the state machine.
|
||||
// Advances the state machine state and generates output,
|
||||
// accessible via poll_output().
|
||||
// accessible via get_output().
|
||||
template <typename Message>
|
||||
void step(server_id from, Message&& msg);
|
||||
|
||||
|
||||
@@ -225,6 +225,12 @@ private:
|
||||
// to be applied.
|
||||
void signal_applied();
|
||||
|
||||
// Wait until there is, and return `fsm` output that needs to be handled.
|
||||
// This includes a list of the entries that need to be logged.
|
||||
// The logged entries are eventually discarded from the state machine
|
||||
// after applying a snapshot.
|
||||
future<fsm_output> poll_fsm_output();
|
||||
|
||||
// This fiber processes FSM output by doing the following steps in order:
|
||||
// - persist the current term and vote
|
||||
// - persist unstable log entries on disk.
|
||||
@@ -989,11 +995,21 @@ static rpc_config_diff diff_address_sets(const server_address_set& prev, const c
|
||||
return result;
|
||||
}
|
||||
|
||||
future<fsm_output> server_impl::poll_fsm_output() {
|
||||
co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get()));
|
||||
|
||||
while (utils::get_local_injector().enter("poll_fsm_output/pause")) {
|
||||
co_await seastar::sleep(std::chrono::milliseconds(100));
|
||||
}
|
||||
|
||||
co_return _fsm->get_output();
|
||||
}
|
||||
|
||||
future<> server_impl::io_fiber(index_t last_stable) {
|
||||
logger.trace("[{}] io_fiber start", _id);
|
||||
try {
|
||||
while (true) {
|
||||
auto batch = co_await _fsm->poll_output();
|
||||
auto batch = co_await poll_fsm_output();
|
||||
_stats.polls++;
|
||||
|
||||
if (batch.term_and_vote) {
|
||||
|
||||
@@ -100,7 +100,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
|
||||
raft::command cmd;
|
||||
ser::serialize(cmd, group0_cmd);
|
||||
auto merges = mm.canonical_mutation_merge_count;
|
||||
utils::get_local_injector().enable("fsm::poll_output/pause");
|
||||
utils::get_local_injector().enable("poll_fsm_output/pause");
|
||||
auto f = when_all(
|
||||
group0.add_entry(cmd, raft::wait_type::applied, nullptr),
|
||||
group0.add_entry(cmd, raft::wait_type::applied, nullptr),
|
||||
@@ -108,7 +108,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
|
||||
// Sleep is needed for all the entries added above to hit the log
|
||||
seastar::sleep(std::chrono::milliseconds(100)).get();
|
||||
// After unpause all entries added above will be committed and applied together
|
||||
utils::get_local_injector().disable("fsm::poll_output/pause");
|
||||
utils::get_local_injector().disable("poll_fsm_output/pause");
|
||||
f.get(); // Wait for apply to complete
|
||||
// Thete should be two calls to migration manager since two out of
|
||||
// three command should be merged.
|
||||
|
||||
Reference in New Issue
Block a user