raft: server: framework for handling server requests

Add data structures and modify `io_fiber` code to prepare it for
handling requests generated by the `server`, not just `fsm`.
Used in later commits.
This commit is contained in:
Kamil Braun
2024-01-18 16:45:33 +01:00
parent 8d9b0a6538
commit 1e786d9d64

View File

@@ -105,7 +105,7 @@ public:
void register_metrics() override; void register_metrics() override;
size_t max_command_size() const override; size_t max_command_size() const override;
private: private:
seastar::condition_variable _sm_events; seastar::condition_variable _events;
std::unique_ptr<rpc> _rpc; std::unique_ptr<rpc> _rpc;
std::unique_ptr<state_machine> _state_machine; std::unique_ptr<state_machine> _state_machine;
@@ -214,6 +214,14 @@ private:
}; };
absl::flat_hash_map<server_id, append_request_queue> _append_request_status; absl::flat_hash_map<server_id, append_request_queue> _append_request_status;
struct server_requests {
bool empty() const {
return true;
}
};
server_requests _new_server_requests;
// Called to commit entries (on a leader or otherwise). // Called to commit entries (on a leader or otherwise).
void notify_waiters(std::map<index_t, op_status>& waiters, const std::vector<log_entry_ptr>& entries); void notify_waiters(std::map<index_t, op_status>& waiters, const std::vector<log_entry_ptr>& entries);
@@ -225,11 +233,15 @@ private:
// to be applied. // to be applied.
void signal_applied(); void signal_applied();
// This fiber processes FSM output by doing the following steps in order: // Processes FSM output by doing the following steps in order:
// - persist the current term and vote // - persist the current term and vote
// - persist unstable log entries on disk. // - persist unstable log entries on disk.
// - send out messages // - send out messages
future<> process_fsm_output(index_t& stable_idx, fsm_output&&); future<> process_fsm_output(index_t& stable_idx, fsm_output&&);
future<> process_server_requests(server_requests&&);
// Processes new FSM outputs and server requests as they appear.
future<> io_fiber(index_t stable_idx); future<> io_fiber(index_t stable_idx);
// This fiber runs in the background and applies committed entries. // This fiber runs in the background and applies committed entries.
@@ -356,7 +368,7 @@ future<> server_impl::start() {
.max_log_size = _config.max_log_size, .max_log_size = _config.max_log_size,
.enable_prevoting = _config.enable_prevoting .enable_prevoting = _config.enable_prevoting
}, },
_sm_events); _events);
_applied_idx = index_t{0}; _applied_idx = index_t{0};
if (snapshot.id) { if (snapshot.id) {
@@ -1125,20 +1137,37 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
} }
} }
future<> server_impl::process_server_requests(server_requests&& requests) {
co_return;
}
future<> server_impl::io_fiber(index_t last_stable) { future<> server_impl::io_fiber(index_t last_stable) {
logger.trace("[{}] io_fiber start", _id); logger.trace("[{}] io_fiber start", _id);
try { try {
while (true) { while (true) {
co_await _sm_events.when(std::bind_front(&fsm::has_output, _fsm.get())); bool has_fsm_output = false;
bool has_server_request = false;
co_await _events.when([this, &has_fsm_output, &has_server_request] {
has_fsm_output = _fsm->has_output();
has_server_request = !_new_server_requests.empty();
return has_fsm_output || has_server_request;
});
while (utils::get_local_injector().enter("poll_fsm_output/pause")) { while (utils::get_local_injector().enter("poll_fsm_output/pause")) {
co_await seastar::sleep(std::chrono::milliseconds(100)); co_await seastar::sleep(std::chrono::milliseconds(100));
} }
auto batch = _fsm->get_output();
_stats.polls++; _stats.polls++;
co_await process_fsm_output(last_stable, std::move(batch)); if (has_fsm_output) {
auto batch = _fsm->get_output();
co_await process_fsm_output(last_stable, std::move(batch));
}
if (has_server_request) {
auto requests = std::exchange(_new_server_requests, server_requests{});
co_await process_server_requests(std::move(requests));
}
} }
} catch (seastar::broken_condition_variable&) { } catch (seastar::broken_condition_variable&) {
// Log fiber is stopped explicitly. // Log fiber is stopped explicitly.
@@ -1451,7 +1480,7 @@ future<> server_impl::abort(sstring reason) {
_aborted = std::move(reason); _aborted = std::move(reason);
logger.trace("[{}]: abort() called", _id); logger.trace("[{}]: abort() called", _id);
_fsm->stop(); _fsm->stop();
_sm_events.broken(); _events.broken();
// IO and applier fibers may update waiters and start new snapshot // IO and applier fibers may update waiters and start new snapshot
// transfers, so abort them first // transfers, so abort them first