From 1e786d9d6438f383f6dfa9484b80884999ccde3b Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 18 Jan 2024 16:45:33 +0100 Subject: [PATCH] raft: server: framework for handling server requests Add data structures and modify `io_fiber` code to prepare it for handling requests generated by the `server`, not just `fsm`. Used in later commits. --- raft/server.cc | 43 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/raft/server.cc b/raft/server.cc index 6447e0cb89..84df62a2c3 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -105,7 +105,7 @@ public: void register_metrics() override; size_t max_command_size() const override; private: - seastar::condition_variable _sm_events; + seastar::condition_variable _events; std::unique_ptr _rpc; std::unique_ptr _state_machine; @@ -214,6 +214,14 @@ private: }; absl::flat_hash_map _append_request_status; + struct server_requests { + bool empty() const { + return true; + } + }; + + server_requests _new_server_requests; + // Called to commit entries (on a leader or otherwise). void notify_waiters(std::map& waiters, const std::vector& entries); @@ -225,11 +233,15 @@ private: // to be applied. void signal_applied(); - // This fiber processes FSM output by doing the following steps in order: + // Processes FSM output by doing the following steps in order: // - persist the current term and vote // - persist unstable log entries on disk. // - send out messages future<> process_fsm_output(index_t& stable_idx, fsm_output&&); + + future<> process_server_requests(server_requests&&); + + // Processes new FSM outputs and server requests as they appear. future<> io_fiber(index_t stable_idx); // This fiber runs in the background and applies committed entries. @@ -356,7 +368,7 @@ future<> server_impl::start() { .max_log_size = _config.max_log_size, .enable_prevoting = _config.enable_prevoting }, - _sm_events); + _events); _applied_idx = index_t{0}; if (snapshot.id) { @@ -1125,20 +1137,37 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc } } +future<> server_impl::process_server_requests(server_requests&& requests) { + co_return; +} + future<> server_impl::io_fiber(index_t last_stable) { logger.trace("[{}] io_fiber start", _id); try { while (true) { - co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get())); + bool has_fsm_output = false; + bool has_server_request = false; + co_await _events.when([this, &has_fsm_output, &has_server_request] { + has_fsm_output = _fsm->has_output(); + has_server_request = !_new_server_requests.empty(); + return has_fsm_output || has_server_request; + }); while (utils::get_local_injector().enter("poll_fsm_output/pause")) { co_await seastar::sleep(std::chrono::milliseconds(100)); } - auto batch = _fsm->get_output(); _stats.polls++; - co_await process_fsm_output(last_stable, std::move(batch)); + if (has_fsm_output) { + auto batch = _fsm->get_output(); + co_await process_fsm_output(last_stable, std::move(batch)); + } + + if (has_server_request) { + auto requests = std::exchange(_new_server_requests, server_requests{}); + co_await process_server_requests(std::move(requests)); + } } } catch (seastar::broken_condition_variable&) { // Log fiber is stopped explicitly. @@ -1451,7 +1480,7 @@ future<> server_impl::abort(sstring reason) { _aborted = std::move(reason); logger.trace("[{}]: abort() called", _id); _fsm->stop(); - _sm_events.broken(); + _events.broken(); // IO and applier fibers may update waiters and start new snapshot // transfers, so abort them first