raft: Store snapshot update and truncate log atomically

In case the snapshot update fails, we don't truncate commit log.

Fixes scylladb/scylladb#9603

Closes scylladb/scylladb#15540
This commit is contained in:
Michael Huang
2023-09-25 08:17:33 -04:00
committed by Kamil Braun
parent ecceb554c3
commit 1640f83fdc
3 changed files with 26 additions and 16 deletions

View File

@@ -180,19 +180,18 @@ future<> raft_sys_table_storage::store_snapshot_descriptor(const raft::snapshot_
{_group_id.id, "PREVIOUS", srv.addr.id.id, srv.can_vote},
cql3::query_processor::cache_internal::yes);
}
// Also update the latest snapshot id in `system.raft` table
static const auto store_latest_id_cql = format("INSERT INTO system.{} (group_id, snapshot_id) VALUES (?, ?)",
db::system_keyspace::RAFT);
co_await _qp.execute_internal(
store_latest_id_cql,
{_group_id.id, snap.id.id},
cql3::query_processor::cache_internal::yes
);
if (preserve_log_entries > snap.idx) {
co_return;
static const auto store_latest_id_cql = format("INSERT INTO system.{} (group_id, snapshot_id) VALUES (?, ?)",
db::system_keyspace::RAFT);
co_await _qp.execute_internal(
store_latest_id_cql,
{_group_id.id, snap.id.id},
cql3::query_processor::cache_internal::yes
);
} else {
co_await update_snapshot_and_truncate_log_tail(snap, preserve_log_entries);
}
// TODO: make truncation and snapshot update in `system.raft` atomic
co_await truncate_log_tail(raft::index_t(static_cast<uint64_t>(snap.idx) - static_cast<uint64_t>(preserve_log_entries)));
});
}
@@ -306,9 +305,20 @@ future<> raft_sys_table_storage::abort() {
return std::move(_pending_op_fut);
}
future<> raft_sys_table_storage::truncate_log_tail(raft::index_t idx) {
static const auto truncate_cql = format("DELETE FROM system.{} WHERE group_id = ? AND \"index\" <= ?", db::system_keyspace::RAFT);
return _qp.execute_internal(truncate_cql, {_group_id.id, int64_t(idx)}, cql3::query_processor::cache_internal::yes).discard_result();
future<> raft_sys_table_storage::update_snapshot_and_truncate_log_tail(const raft::snapshot_descriptor &snap, size_t preserve_log_entries) {
// Update snapshot and truncate logs in `system.raft` atomically
raft::index_t log_tail_idx = raft::index_t(static_cast<uint64_t>(snap.idx) - static_cast<uint64_t>(preserve_log_entries));
static const auto store_latest_id_and_truncate_log_tail_cql = format(
"BEGIN UNLOGGED BATCH"
" INSERT INTO system.{} (group_id, snapshot_id) VALUES (?, ?);" // store latest id
" DELETE FROM system.{} WHERE group_id = ? AND \"index\" <= ?;" // truncate log tail
"APPLY BATCH",
db::system_keyspace::RAFT, db::system_keyspace::RAFT);
return _qp.execute_internal(
store_latest_id_and_truncate_log_tail_cql,
{_group_id.id, snap.id.id, _group_id.id, int64_t(log_tail_idx)},
cql3::query_processor::cache_internal::yes
).discard_result();
}
future<> raft_sys_table_storage::execute_with_linearization_point(std::function<future<>()> f) {

View File

@@ -87,7 +87,7 @@ private:
future<> do_store_log_entries(const std::vector<raft::log_entry_ptr>& entries);
// Truncate all entries from the persisted log with indices <= idx
// Called from the `store_snapshot` function.
future<> truncate_log_tail(raft::index_t idx);
future<> update_snapshot_and_truncate_log_tail(const raft::snapshot_descriptor &snap, size_t preserve_log_entries);
future<> execute_with_linearization_point(std::function<future<>()> f);
};

View File

@@ -182,7 +182,7 @@ SEASTAR_TEST_CASE(test_store_snapshot_truncate_log_tail) {
co_await storage.store_snapshot_descriptor(snp, preserve_log_entries);
raft::log_entries loaded_entries = co_await storage.load_log();
BOOST_CHECK_EQUAL(loaded_entries.size(), 2);
BOOST_CHECK_EQUAL(loaded_entries.size(), preserve_log_entries);
for (size_t i = 0, end = loaded_entries.size(); i != end; ++i) {
BOOST_CHECK(*entries[i + 1] == *loaded_entries[i]);
}