replica: table: populate system.commitlog_cleanups on tablet cleanup
To avoid data resurrection after cleanup, we have to filter out the cleaned mutations during commitlog replay. In this patch, we get tablet cleanup to record the affected set of mutations to system.commitlog_cleanups. In a later patch, we will use these records for filtering during commitlog replay.
This commit is contained in:
1
main.cc
1
main.cc
@@ -1508,6 +1508,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
// This ensures we can't mis-mash older records with a newer crashed run.
|
||||
// I.e: never keep replay_positions alive across a restart cycle.
|
||||
sys_ks.local().drop_truncation_rp_records().get();
|
||||
sys_ks.local().drop_all_commitlog_cleanup_records().get();
|
||||
|
||||
db.invoke_on_all([] (replica::database& db) {
|
||||
db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr<replica::table> table) {
|
||||
|
||||
@@ -827,7 +827,7 @@ public:
|
||||
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
|
||||
void update_effective_replication_map(locator::effective_replication_map_ptr);
|
||||
[[gnu::always_inline]] bool uses_tablets() const;
|
||||
future<> cleanup_tablet(locator::tablet_id);
|
||||
future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id);
|
||||
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
|
||||
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
|
||||
shard_id shard_of(const mutation& m) const {
|
||||
|
||||
@@ -3205,7 +3205,7 @@ future<> compaction_group::cleanup() {
|
||||
co_await _t._cache.invalidate(std::move(updater), p_range);
|
||||
}
|
||||
|
||||
future<> table::cleanup_tablet(locator::tablet_id tid) {
|
||||
future<> table::cleanup_tablet(db::system_keyspace& sys_ks, locator::tablet_id tid) {
|
||||
auto holder = async_gate().hold();
|
||||
|
||||
auto& sg = _storage_groups[tid.value()];
|
||||
@@ -3221,6 +3221,19 @@ future<> table::cleanup_tablet(locator::tablet_id tid) {
|
||||
//co_await _cg.stop();
|
||||
co_await cg_ptr->flush();
|
||||
co_await cg_ptr->cleanup();
|
||||
// FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation,
|
||||
// and can cover some mutations which weren't cleaned, causing them to be lost during replay.
|
||||
//
|
||||
// This should be okay, because writes are not supposed to race with cleanups
|
||||
// in the first place, but it would be better to extract the exact replay_position from
|
||||
// the actually flushed/deleted sstables, like discard_sstable() does, so that the mutations
|
||||
// cleaned from sstables are exactly the same as the ones cleaned from commitlog.
|
||||
co_await sys_ks.save_commitlog_cleanup_record(schema()->id(), sg->token_range(), _highest_rp);
|
||||
// This is the only place (outside of reboot) where we delete unneeded commitlog cleanup
|
||||
// records. This isn't ideal -- it would be more natural if the unneeded records
|
||||
// were deleted as soon as they become unneeded -- but this gets the job done with a
|
||||
// minimal amount of code.
|
||||
co_await sys_ks.drop_old_range_cleanup_records(db.commitlog()->min_position());
|
||||
}
|
||||
|
||||
tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name());
|
||||
|
||||
@@ -4981,9 +4981,9 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
|
||||
}
|
||||
shard = *shard_opt;
|
||||
}
|
||||
return _db.invoke_on(shard, [tablet] (replica::database& db) {
|
||||
return _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks] (replica::database& db) {
|
||||
auto& table = db.find_column_family(tablet.table);
|
||||
return table.cleanup_tablet(tablet.tablet);
|
||||
return table.cleanup_tablet(sys_ks.local(), tablet.tablet);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user