diff --git a/main.cc b/main.cc
index 66cd165edf..28ea3ab583 100644
--- a/main.cc
+++ b/main.cc
@@ -1508,6 +1508,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
             // This ensures we can't mis-mash older records with a newer crashed run.
             // I.e: never keep replay_positions alive across a restart cycle.
             sys_ks.local().drop_truncation_rp_records().get();
+            sys_ks.local().drop_all_commitlog_cleanup_records().get();
 
             db.invoke_on_all([] (replica::database& db) {
                 db.get_tables_metadata().for_each_table([] (table_id, lw_shared_ptr table) {
diff --git a/replica/database.hh b/replica/database.hh
index 08a2d8d83c..756c03a5fd 100644
--- a/replica/database.hh
+++ b/replica/database.hh
@@ -827,7 +827,9 @@ public:
     const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
     void update_effective_replication_map(locator::effective_replication_map_ptr);
     [[gnu::always_inline]] bool uses_tablets() const;
-    future<> cleanup_tablet(locator::tablet_id);
+    // Cleans up the given tablet, saves a commitlog cleanup record for it through the given
+    // system_keyspace, and drops old cleanup records using the min_position() of database's commitlog.
+    future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id);
     future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
     future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
     shard_id shard_of(const mutation& m) const {
diff --git a/replica/table.cc b/replica/table.cc
index 2170ffe189..ff2a18722c 100644
--- a/replica/table.cc
+++ b/replica/table.cc
@@ -3205,7 +3205,7 @@ future<> compaction_group::cleanup() {
     co_await _t._cache.invalidate(std::move(updater), p_range);
 }
 
-future<> table::cleanup_tablet(locator::tablet_id tid) {
+future<> table::cleanup_tablet(database& db, db::system_keyspace& sys_ks, locator::tablet_id tid) {
     auto holder = async_gate().hold();
     auto& sg = _storage_groups[tid.value()];
 
@@ -3221,6 +3221,19 @@ future<> table::cleanup_tablet(locator::tablet_id tid) {
         //co_await _cg.stop();
         co_await cg_ptr->flush();
         co_await cg_ptr->cleanup();
+        // FIXME: at this point _highest_rp might be greater than the replay_position of the last cleaned mutation,
+        // and can cover some mutations which weren't cleaned, causing them to be lost during replay.
+        //
+        // This should be okay, because writes are not supposed to race with cleanups
+        // in the first place, but it would be better to extract the exact replay_position from
+        // the actually flushed/deleted sstables, like discard_sstable() does, so that the mutations
+        // cleaned from sstables are exactly the same as the ones cleaned from commitlog.
+        co_await sys_ks.save_commitlog_cleanup_record(schema()->id(), sg->token_range(), _highest_rp);
+        // This is the only place (outside of reboot) where we delete unneeded commitlog cleanup
+        // records. This isn't ideal -- it would be more natural if the unneeded records
+        // were deleted as soon as they become unneeded -- but this gets the job done with a
+        // minimal amount of code.
+        co_await sys_ks.drop_old_range_cleanup_records(db.commitlog()->min_position());
     }
 
     tlogger.info("Cleaned up tablet {} of table {}.{} successfully.", tid, _schema->ks_name(), _schema->cf_name());
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 73aef33be4..edcb3d8117 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -4981,9 +4981,9 @@ future<> storage_service::cleanup_tablet(locator::global_tablet_id tablet) {
                 }
                 shard = *shard_opt;
             }
-            return _db.invoke_on(shard, [tablet] (replica::database& db) {
+            return _db.invoke_on(shard, [tablet, &sys_ks = _sys_ks] (replica::database& db) {
                 auto& table = db.find_column_family(tablet.table);
-                return table.cleanup_tablet(tablet.tablet);
+                return table.cleanup_tablet(db, sys_ks.local(), tablet.tablet);
             });
         });
 }