diff --git a/raft/server.cc b/raft/server.cc index 84df62a2c3..5ec19904cb 100644 --- a/raft/server.cc +++ b/raft/server.cc @@ -122,6 +122,8 @@ private: std::optional> _state_change_promise; // Index of the last entry applied to `_state_machine`. index_t _applied_idx; + // Index of the last persisted snapshot descriptor. + index_t _snapshot_desc_idx; std::list _reads; std::multimap _awaited_indexes; @@ -134,6 +136,9 @@ private: // Signaled when apply index is changed condition_variable _applied_index_changed; + // Signaled when _snapshot_desc_idx is changed + condition_variable _snapshot_desc_idx_changed; + struct stop_apply_fiber{}; // exception to send when apply fiber is needs to be stopepd struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration @@ -371,9 +376,10 @@ future<> server_impl::start() { _events); _applied_idx = index_t{0}; + _snapshot_desc_idx = index_t{0}; if (snapshot.id) { co_await _state_machine->load_snapshot(snapshot.id); - _applied_idx = snapshot.idx; + _snapshot_desc_idx = _applied_idx = snapshot.idx; } if (!rpc_config.current.empty()) { @@ -1017,6 +1023,8 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id); // Persist the snapshot co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries); + _snapshot_desc_idx = snp.idx; + _snapshot_desc_idx_changed.broadcast(); _stats.store_snapshot++; // If this is locally generated snapshot there is no need to // load it.