db: commitlog_replayer: ignore mutations affected by (tablet) cleanups

To avoid data resurrection, mutations deleted by cleanup operations
have to be skipped during commitlog replay.

This patch implements that, based on the metadata which cleanup
operations record in system.commitlog_cleanups.
This commit is contained in:
Michał Chojnowski
2024-01-20 00:31:41 +01:00
parent f458a1bf3e
commit a246bb39ef

View File

@@ -96,10 +96,20 @@ public:
auto j = i->second.find(uuid);
return j != i->second.end() ? j->second : replay_position();
}
// Looks up the replay position recorded in _cleanup_map for the given
// (table, shard) pair and token. Yields a default-constructed
// replay_position when there is no entry for the pair, or when the entry
// has nothing recorded for this token.
replay_position token_min_pos(const table_id& uuid, unsigned shard, dht::token token) const {
    replay_position result;
    const auto entry = _cleanup_map.find({uuid, shard});
    if (entry == _cleanup_map.end()) {
        // No cleanup records at all for this table on this shard.
        return result;
    }
    // The per-(table, shard) records are queried by the token's int64 form.
    if (const auto recorded = entry->second.get(dht::token::to_int64(token))) {
        result = *recorded;
    }
    return result;
}
// Sharded database service. process() uses the local instance to look up
// the table for a mutation and invoke_on() to run work on the target shard.
seastar::sharded<replica::database>& _db;
// Sharded system keyspace service; init() reads the commitlog cleanup
// records through its local instance.
seastar::sharded<db::system_keyspace>& _sys_ks;
// NOTE(review): populated outside the visible code; presumably per-shard
// replay-position data gathered during init() — confirm.
shard_rpm_map _rpm;
// Commitlog cleanup records loaded in init(). Looked up by (table id, shard)
// and then queried by token in token_min_pos().
db::system_keyspace::commitlog_cleanup_map _cleanup_map;
// NOTE(review): not used in the visible code; presumably a per-shard
// minimum replay position — confirm.
shard_rp_map _min_pos;
};
@@ -121,6 +131,7 @@ future<> db::commitlog_replayer::impl::init() {
}
}
});
_cleanup_map = co_await _sys_ks.local().get_commitlog_cleanup_records();
// bugfix: the above code will _not_ detect if sstables
// are _missing_ from a CF. And because of re-sharding, we can't
@@ -214,6 +225,10 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
}
auto uuid = fm.column_family_id();
auto& table = _db.local().find_column_family(uuid);
const auto& schema = *table.schema();
auto token = fm.token(schema);
auto cf_rp = cf_min_pos(uuid, shard_id);
if (rp <= cf_rp) {
rlogger.trace("entry {} at {} is younger than recorded replay position {}. skipping", fm.column_family_id(), rp, cf_rp);
@@ -221,9 +236,15 @@ future<> db::commitlog_replayer::impl::process(stats* s, commitlog::buffer_and_r
return make_ready_future<>();
}
auto& table = _db.local().find_column_family(uuid);
const auto& schema = *table.schema();
auto shard = table.get_effective_replication_map()->shard_of(schema, fm.token(schema));
auto token_range_rp = token_min_pos(uuid, shard_id, token);
if (rp <= token_range_rp) {
rlogger.trace("entry {}, token {} in table {}, is younger than recorded replay position {} for its token range. skipping",
rp, token, fm.column_family_id(), token_range_rp);
s->skipped_mutations++;
return make_ready_future<>();
}
auto shard = table.get_effective_replication_map()->shard_of(schema, token);
return _db.invoke_on(shard, [this, cer = std::move(cer), &src_cm, rp] (replica::database& db) mutable -> future<> {
auto& fm = cer.mutation();
// TODO: might need better verification that the deserialized mutation