raft: server: track last persisted snapshot descriptor index
Also introduce a condition variable notified whenever this index is updated. Will be user in following commits.
This commit is contained in:
@@ -122,6 +122,8 @@ private:
|
|||||||
std::optional<shared_promise<>> _state_change_promise;
|
std::optional<shared_promise<>> _state_change_promise;
|
||||||
// Index of the last entry applied to `_state_machine`.
|
// Index of the last entry applied to `_state_machine`.
|
||||||
index_t _applied_idx;
|
index_t _applied_idx;
|
||||||
|
// Index of the last persisted snapshot descriptor.
|
||||||
|
index_t _snapshot_desc_idx;
|
||||||
std::list<active_read> _reads;
|
std::list<active_read> _reads;
|
||||||
std::multimap<index_t, awaited_index> _awaited_indexes;
|
std::multimap<index_t, awaited_index> _awaited_indexes;
|
||||||
|
|
||||||
@@ -134,6 +136,9 @@ private:
|
|||||||
// Signaled when apply index is changed
|
// Signaled when apply index is changed
|
||||||
condition_variable _applied_index_changed;
|
condition_variable _applied_index_changed;
|
||||||
|
|
||||||
|
// Signaled when _snapshot_desc_idx is changed
|
||||||
|
condition_variable _snapshot_desc_idx_changed;
|
||||||
|
|
||||||
struct stop_apply_fiber{}; // exception to send when apply fiber is needs to be stopepd
|
struct stop_apply_fiber{}; // exception to send when apply fiber is needs to be stopepd
|
||||||
|
|
||||||
struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration
|
struct removed_from_config{}; // sent to applier_fiber when we're not a leader and we're outside the current configuration
|
||||||
@@ -371,9 +376,10 @@ future<> server_impl::start() {
|
|||||||
_events);
|
_events);
|
||||||
|
|
||||||
_applied_idx = index_t{0};
|
_applied_idx = index_t{0};
|
||||||
|
_snapshot_desc_idx = index_t{0};
|
||||||
if (snapshot.id) {
|
if (snapshot.id) {
|
||||||
co_await _state_machine->load_snapshot(snapshot.id);
|
co_await _state_machine->load_snapshot(snapshot.id);
|
||||||
_applied_idx = snapshot.idx;
|
_snapshot_desc_idx = _applied_idx = snapshot.idx;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!rpc_config.current.empty()) {
|
if (!rpc_config.current.empty()) {
|
||||||
@@ -1017,6 +1023,8 @@ future<> server_impl::process_fsm_output(index_t& last_stable, fsm_output&& batc
|
|||||||
logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
|
logger.trace("[{}] io_fiber storing snapshot {}", _id, snp.id);
|
||||||
// Persist the snapshot
|
// Persist the snapshot
|
||||||
co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries);
|
co_await _persistence->store_snapshot_descriptor(snp, max_trailing_entries);
|
||||||
|
_snapshot_desc_idx = snp.idx;
|
||||||
|
_snapshot_desc_idx_changed.broadcast();
|
||||||
_stats.store_snapshot++;
|
_stats.store_snapshot++;
|
||||||
// If this is locally generated snapshot there is no need to
|
// If this is locally generated snapshot there is no need to
|
||||||
// load it.
|
// load it.
|
||||||
|
|||||||
Reference in New Issue
Block a user