From a246bb39ef4d89d1a76b62cb958399b859a6f934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Chojnowski?= Date: Sat, 20 Jan 2024 00:31:41 +0100 Subject: [PATCH] db: commitlog_replayer: ignore mutations affected by (tablet) cleanups To avoid data resurrection, mutations deleted by cleanup operations have to be skipped during commitlog replay. This patch implements this, based on the metadata recorded on cleanup operations into system.commitlog_cleanups. --- db/commitlog/commitlog_replayer.cc | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/db/commitlog/commitlog_replayer.cc b/db/commitlog/commitlog_replayer.cc index f4d2d22d65..f3c3ab81c3 100644 --- a/db/commitlog/commitlog_replayer.cc +++ b/db/commitlog/commitlog_replayer.cc @@ -96,10 +96,20 @@ public: auto j = i->second.find(uuid); return j != i->second.end() ? j->second : replay_position(); } + replay_position token_min_pos(const table_id& uuid, unsigned shard, dht::token token) const { + replay_position rp; + if (auto i = _cleanup_map.find({uuid, shard}); i != _cleanup_map.end()) { + if (auto candidate_rp = i->second.get(dht::token::to_int64(token))) { + rp = *candidate_rp; + } + } + return rp; + } seastar::sharded& _db; seastar::sharded& _sys_ks; shard_rpm_map _rpm; + db::system_keyspace::commitlog_cleanup_map _cleanup_map; shard_rp_map _min_pos; }; @@ -121,6 +131,7 @@ future<> db::commitlog_replayer::impl::init() { } } }); + _cleanup_map = co_await _sys_ks.local().get_commitlog_cleanup_records(); // bugfix: the above code will not_ detect if sstables // are _missing_ from a CF. And because of re-sharding, we can't @@ -214,6 +225,10 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r } auto uuid = fm.column_family_id(); + auto& table = _db.local().find_column_family(uuid); + const auto& schema = *table.schema(); + auto token = fm.token(schema); + auto cf_rp = cf_min_pos(uuid, shard_id); if (rp <= cf_rp) { rlogger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp); @@ -221,9 +236,15 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r return make_ready_future<>(); } - auto& table = _db.local().find_column_family(uuid); - const auto& schema = *table.schema(); - auto shard = table.get_effective_replication_map()->shard_of(schema, fm.token(schema)); + auto token_range_rp = token_min_pos(uuid, shard_id, token); + if (rp <= token_range_rp) { + rlogger.trace("entry {}, token {} in table {}, is younger than recorded replay position {} for its token range. skipping", + rp, token, fm.column_family_id(), token_range_rp); + s->skipped_mutations++; + return make_ready_future<>(); + } + + auto shard = table.get_effective_replication_map()->shard_of(schema, token); return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp] (replica::database& db) mutable -> future<> { auto& fm = cer.mutation(); // TODO: might need better verification that the deserialized mutation