diff --git a/service/raft/raft_sys_table_storage.cc b/service/raft/raft_sys_table_storage.cc
index 0a8c10b02b..f9bfd713c3 100644
--- a/service/raft/raft_sys_table_storage.cc
+++ b/service/raft/raft_sys_table_storage.cc
@@ -180,19 +180,18 @@ future<> raft_sys_table_storage::store_snapshot_descriptor(const raft::snapshot_
                 {_group_id.id, "PREVIOUS", srv.addr.id.id, srv.can_vote},
                 cql3::query_processor::cache_internal::yes);
         }
-        // Also update the latest snapshot id in `system.raft` table
-        static const auto store_latest_id_cql = format("INSERT INTO system.{} (group_id, snapshot_id) VALUES (?, ?)",
-            db::system_keyspace::RAFT);
-        co_await _qp.execute_internal(
-            store_latest_id_cql,
-            {_group_id.id, snap.id.id},
-            cql3::query_processor::cache_internal::yes
-        );
+
         if (preserve_log_entries > snap.idx) {
-            co_return;
+            static const auto store_latest_id_cql = format("INSERT INTO system.{} (group_id, snapshot_id) VALUES (?, ?)",
+                db::system_keyspace::RAFT);
+            co_await _qp.execute_internal(
+                store_latest_id_cql,
+                {_group_id.id, snap.id.id},
+                cql3::query_processor::cache_internal::yes
+            );
+        } else {
+            co_await update_snapshot_and_truncate_log_tail(snap, preserve_log_entries);
         }
-        // TODO: make truncation and snapshot update in `system.raft` atomic
-        co_await truncate_log_tail(raft::index_t(static_cast(snap.idx) - static_cast(preserve_log_entries)));
     });
 }
 
@@ -306,9 +305,20 @@ future<> raft_sys_table_storage::abort() {
     return std::move(_pending_op_fut);
 }
 
-future<> raft_sys_table_storage::truncate_log_tail(raft::index_t idx) {
-    static const auto truncate_cql = format("DELETE FROM system.{} WHERE group_id = ? AND \"index\" <= ?", db::system_keyspace::RAFT);
-    return _qp.execute_internal(truncate_cql, {_group_id.id, int64_t(idx)}, cql3::query_processor::cache_internal::yes).discard_result();
+future<> raft_sys_table_storage::update_snapshot_and_truncate_log_tail(const raft::snapshot_descriptor &snap, size_t preserve_log_entries) {
+    // Update snapshot and truncate logs in `system.raft` atomically
+    raft::index_t log_tail_idx = raft::index_t(static_cast(snap.idx) - static_cast(preserve_log_entries));
+    static const auto store_latest_id_and_truncate_log_tail_cql = format(
+        "BEGIN UNLOGGED BATCH"
+        " INSERT INTO system.{} (group_id, snapshot_id) VALUES (?, ?);" // store latest id
+        " DELETE FROM system.{} WHERE group_id = ? AND \"index\" <= ?;" // truncate log tail
+        "APPLY BATCH",
+        db::system_keyspace::RAFT, db::system_keyspace::RAFT);
+    return _qp.execute_internal(
+        store_latest_id_and_truncate_log_tail_cql,
+        {_group_id.id, snap.id.id, _group_id.id, int64_t(log_tail_idx)},
+        cql3::query_processor::cache_internal::yes
+    ).discard_result();
 }
 
 future<> raft_sys_table_storage::execute_with_linearization_point(std::function()> f) {
diff --git a/service/raft/raft_sys_table_storage.hh b/service/raft/raft_sys_table_storage.hh
index 482d68e71f..fda8c30702 100644
--- a/service/raft/raft_sys_table_storage.hh
+++ b/service/raft/raft_sys_table_storage.hh
@@ -87,7 +87,9 @@ private:
     future<> do_store_log_entries(const std::vector& entries);
-    // Truncate all entries from the persisted log with indices <= idx
-    // Called from the `store_snapshot` function.
-    future<> truncate_log_tail(raft::index_t idx);
+    // Persist the id of `snap` as the latest snapshot id and remove all entries
+    // with indices <= snap.idx - preserve_log_entries from the persisted log,
+    // using a single batch statement.
+    // Called from `store_snapshot_descriptor`, only when preserve_log_entries <= snap.idx.
+    future<> update_snapshot_and_truncate_log_tail(const raft::snapshot_descriptor &snap, size_t preserve_log_entries);
 
     future<> execute_with_linearization_point(std::function()> f);
 };
diff --git a/test/raft/raft_sys_table_storage_test.cc b/test/raft/raft_sys_table_storage_test.cc
index 7664436cea..630d8a0425 100644
--- a/test/raft/raft_sys_table_storage_test.cc
+++ b/test/raft/raft_sys_table_storage_test.cc
@@ -182,7 +182,7 @@ SEASTAR_TEST_CASE(test_store_snapshot_truncate_log_tail) {
         co_await storage.store_snapshot_descriptor(snp, preserve_log_entries);
 
         raft::log_entries loaded_entries = co_await storage.load_log();
-        BOOST_CHECK_EQUAL(loaded_entries.size(), 2);
+        BOOST_CHECK_EQUAL(loaded_entries.size(), preserve_log_entries);
         for (size_t i = 0, end = loaded_entries.size(); i != end; ++i) {
             BOOST_CHECK(*entries[i + 1] == *loaded_entries[i]);
         }