compaction: Fix sstable cleanup after resharding on refresh
Problem can be reproduced easily: 1) wrote some sstables with smp 1 2) shut down scylla 3) moved sstables to upload 4) restarted scylla with smp 2 5) ran refresh (resharding happens, adds sstable to cleanup set and never removes it) 6) cleanup (tries to cleanup resharded sstables which were leaked in the cleanup set) Bumps into assert "Assertion `!sst->is_shared()' failed", as cleanup picks a shared sstable that was leaked and already processed by resharding. Fix is about not inserting shared sstables into cleanup set, as shared sstables are restricted to resharding and cannot be processed later by cleanup (nor it should because resharding itself cleaned up its input files). Dtest: https://github.com/scylladb/scylla-dtest/pull/3206 Fixes #14001. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Closes #14147
This commit is contained in:
committed by
Pavel Emelyanov
parent
17795757d3
commit
156d771101
@@ -1590,6 +1590,10 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
|
||||
|
||||
bool compaction_manager::update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) {
|
||||
auto& cs = get_compaction_state(&t);
|
||||
if (sst->is_shared()) {
|
||||
throw std::runtime_error(format("Shared SSTable {} cannot be marked as requiring cleanup, as it can only be processed by resharding",
|
||||
sst->get_filename()));
|
||||
}
|
||||
if (needs_cleanup(sst, sorted_owned_ranges)) {
|
||||
cs.sstables_requiring_cleanup.insert(sst);
|
||||
return true;
|
||||
|
||||
@@ -307,6 +307,8 @@ public:
|
||||
private:
|
||||
future<> try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t);
|
||||
|
||||
// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
|
||||
bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
||||
public:
|
||||
// Submit a table to be upgraded and wait for its termination.
|
||||
future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version);
|
||||
@@ -407,9 +409,6 @@ public:
|
||||
return _tombstone_gc_state;
|
||||
};
|
||||
|
||||
// Add sst to or remove it from the respective compaction_state.sstables_requiring_cleanup set.
|
||||
bool update_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
||||
|
||||
// Uncoditionally erase sst from `sstables_requiring_cleanup`
|
||||
// Returns true iff sst was found and erased.
|
||||
bool erase_sstable_cleanup_state(table_state& t, const sstables::shared_sstable& sst);
|
||||
|
||||
@@ -1136,9 +1136,6 @@ public:
|
||||
// Safely iterate through table states, while performing async operations on them.
|
||||
future<> parallel_foreach_table_state(std::function<future<>(compaction::table_state&)> action);
|
||||
|
||||
// Add sst to or remove it from the sstables_requiring_cleanup set.
|
||||
bool update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges);
|
||||
|
||||
// Uncoditionally erase sst from `sstables_requiring_cleanup`
|
||||
// Returns true iff sst was found and erased.
|
||||
bool erase_sstable_cleanup_state(const sstables::shared_sstable& sst);
|
||||
|
||||
@@ -155,9 +155,8 @@ collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<r
|
||||
auto shared_sstables = d.retrieve_shared_sstables();
|
||||
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
|
||||
if (sorted_owned_ranges_ptr) {
|
||||
auto& table = db.local().find_column_family(ks_name, table_name);
|
||||
co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
|
||||
if (table.update_sstable_cleanup_state(sst, *sorted_owned_ranges_ptr)) {
|
||||
if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
|
||||
need_cleanup.push_back(co_await sst->get_open_info());
|
||||
co_return false;
|
||||
}
|
||||
@@ -242,9 +241,6 @@ future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::
|
||||
buckets.emplace_back();
|
||||
co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
|
||||
auto sst = co_await dir.load_foreign_sstable(info);
|
||||
if (owned_ranges_ptr) {
|
||||
table.update_sstable_cleanup_state(sst, *owned_ranges_ptr);
|
||||
}
|
||||
// Last bucket gets leftover SSTables
|
||||
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
|
||||
buckets.emplace_back();
|
||||
|
||||
@@ -2861,12 +2861,6 @@ table::as_data_dictionary() const {
|
||||
return _impl.wrap(*this);
|
||||
}
|
||||
|
||||
bool table::update_sstable_cleanup_state(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) {
|
||||
// FIXME: it's possible that the sstable belongs to multiple compaction_groups
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
return get_compaction_manager().update_sstable_cleanup_state(cg.as_table_state(), sst, sorted_owned_ranges);
|
||||
}
|
||||
|
||||
bool table::erase_sstable_cleanup_state(const sstables::shared_sstable& sst) {
|
||||
// FIXME: it's possible that the sstable belongs to multiple compaction_groups
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
|
||||
Reference in New Issue
Block a user