Merge 'Enable incremental compaction on off-strategy' from Raphael "Raph" Carvalho
Off-strategy suffers with a 100% space overhead, as it adopted a sort of all or nothing approach. Meaning all input sstables, living in maintenance set, are kept alive until they're all reshaped according to the strategy criteria. Input sstables in off-strategy are very likely to be mostly disjoint, so it can greatly benefit from incremental compaction. The incremental compaction approach is not only good for decreasing disk usage, but also memory usage (as metadata of input and output live in memory), and file desc count, which takes memory away from OS. Turns out that this approach also greatly simplifies the off-strategy impl in compaction manager, as it no longer have to maintain new unused sstables and mark them for deletion on failure, and also unlink intermediary sstables used between reshape rounds. Fixes https://github.com/scylladb/scylladb/issues/14992. Closes scylladb/scylladb#15400 * github.com:scylladb/scylladb: test: Verify that off-strategy can do incremental compaction compaction: Clear pending_replacement list when tombstone GC is disabled compaction: Enable incremental compaction on off-strategy compaction: Extend reshape type to allow for incremental compaction compaction: Move reshape_compaction in the source compaction: Enable incremental compaction only if replacer callback is engaged
This commit is contained in:
@@ -608,8 +608,8 @@ protected:
|
||||
return _used_garbage_collected_sstables;
|
||||
}
|
||||
|
||||
bool enable_garbage_collected_sstable_writer() const noexcept {
|
||||
return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits<uint64_t>::max();
|
||||
virtual bool enable_garbage_collected_sstable_writer() const noexcept {
|
||||
return _contains_multi_fragment_runs && _max_sstable_size != std::numeric_limits<uint64_t>::max() && bool(_replacer);
|
||||
}
|
||||
public:
|
||||
compaction& operator=(const compaction&) = delete;
|
||||
@@ -1038,56 +1038,6 @@ void compacted_fragments_writer::consume_end_of_stream() {
|
||||
}
|
||||
}
|
||||
|
||||
class reshape_compaction : public compaction {
|
||||
public:
|
||||
reshape_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata)
|
||||
: compaction(table_s, std::move(descriptor), cdata) {
|
||||
}
|
||||
|
||||
virtual sstables::sstable_set make_sstable_set_for_input() const override {
|
||||
return sstables::make_partitioned_sstable_set(_schema, false);
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace,
|
||||
streamed_mutation::forwarding sm_fwd,
|
||||
mutation_reader::forwarding mr_fwd) const override {
|
||||
return _compacting->make_local_shard_sstable_reader(std::move(s),
|
||||
std::move(permit),
|
||||
range,
|
||||
slice,
|
||||
std::move(trace),
|
||||
sm_fwd,
|
||||
mr_fwd,
|
||||
default_read_monitor_generator());
|
||||
}
|
||||
|
||||
std::string_view report_start_desc() const override {
|
||||
return "Reshaping";
|
||||
}
|
||||
|
||||
std::string_view report_finish_desc() const override {
|
||||
return "Reshaped";
|
||||
}
|
||||
|
||||
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
|
||||
auto sst = _sstable_creator(this_shard_id());
|
||||
setup_new_sstable(sst);
|
||||
|
||||
sstable_writer_config cfg = make_sstable_writer_config(compaction_type::Reshape);
|
||||
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats()), sst};
|
||||
}
|
||||
|
||||
virtual void stop_sstable_writer(compaction_writer* writer) override {
|
||||
if (writer) {
|
||||
finish_new_sstable(writer);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class regular_compaction : public compaction {
|
||||
// keeps track of monitors for input sstable, which are responsible for adjusting backlog as compaction progresses.
|
||||
mutable compaction_read_monitor_generator _monitor_generator;
|
||||
@@ -1202,12 +1152,13 @@ private:
|
||||
}
|
||||
|
||||
void update_pending_ranges() {
|
||||
if (!_sstable_set || _sstable_set->all()->empty() || _cdata.pending_replacements.empty()) { // set can be empty for testing scenario.
|
||||
auto pending_replacements = std::exchange(_cdata.pending_replacements, {});
|
||||
if (!_sstable_set || _sstable_set->all()->empty() || pending_replacements.empty()) { // set can be empty for testing scenario.
|
||||
return;
|
||||
}
|
||||
// Releases reference to sstables compacted by this compaction or another, both of which belongs
|
||||
// to the same column family
|
||||
for (auto& pending_replacement : _cdata.pending_replacements) {
|
||||
for (auto& pending_replacement : pending_replacements) {
|
||||
for (auto& sst : pending_replacement.removed) {
|
||||
// Set may not contain sstable to be removed because this compaction may have started
|
||||
// before the creation of that sstable.
|
||||
@@ -1221,7 +1172,75 @@ private:
|
||||
}
|
||||
}
|
||||
_selector.emplace(_sstable_set->make_incremental_selector());
|
||||
_cdata.pending_replacements.clear();
|
||||
}
|
||||
};
|
||||
|
||||
class reshape_compaction : public regular_compaction {
|
||||
private:
|
||||
bool has_sstable_replacer() const noexcept {
|
||||
return bool(_replacer);
|
||||
}
|
||||
public:
|
||||
reshape_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata)
|
||||
: regular_compaction(table_s, std::move(descriptor), cdata) {
|
||||
}
|
||||
|
||||
virtual sstables::sstable_set make_sstable_set_for_input() const override {
|
||||
return sstables::make_partitioned_sstable_set(_schema, false);
|
||||
}
|
||||
|
||||
// Unconditionally enable incremental compaction if the strategy specifies a max output size, e.g. LCS.
|
||||
virtual bool enable_garbage_collected_sstable_writer() const noexcept override {
|
||||
return _max_sstable_size != std::numeric_limits<uint64_t>::max() && bool(_replacer);
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 make_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace,
|
||||
streamed_mutation::forwarding sm_fwd,
|
||||
mutation_reader::forwarding mr_fwd) const override {
|
||||
return _compacting->make_local_shard_sstable_reader(std::move(s),
|
||||
std::move(permit),
|
||||
range,
|
||||
slice,
|
||||
std::move(trace),
|
||||
sm_fwd,
|
||||
mr_fwd,
|
||||
default_read_monitor_generator());
|
||||
}
|
||||
|
||||
std::string_view report_start_desc() const override {
|
||||
return "Reshaping";
|
||||
}
|
||||
|
||||
std::string_view report_finish_desc() const override {
|
||||
return "Reshaped";
|
||||
}
|
||||
|
||||
virtual compaction_writer create_compaction_writer(const dht::decorated_key& dk) override {
|
||||
auto sst = _sstable_creator(this_shard_id());
|
||||
setup_new_sstable(sst);
|
||||
|
||||
sstable_writer_config cfg = make_sstable_writer_config(compaction_type::Reshape);
|
||||
return compaction_writer{sst->get_writer(*_schema, partitions_per_sstable(), cfg, get_encoding_stats()), sst};
|
||||
}
|
||||
|
||||
virtual void stop_sstable_writer(compaction_writer* writer) override {
|
||||
if (writer) {
|
||||
if (has_sstable_replacer()) {
|
||||
regular_compaction::stop_sstable_writer(writer);
|
||||
} else {
|
||||
finish_new_sstable(writer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
virtual void on_end_of_compaction() override {
|
||||
if (has_sstable_replacer()) {
|
||||
regular_compaction::on_end_of_compaction();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -379,7 +379,8 @@ future<sstables::compaction_result> compaction_task_executor::compact_sstables_a
|
||||
|
||||
co_return res;
|
||||
}
|
||||
future<sstables::compaction_result> compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, on_replacement& on_replace, compaction_manager::can_purge_tombstones can_purge) {
|
||||
future<sstables::compaction_result> compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, on_replacement& on_replace, compaction_manager::can_purge_tombstones can_purge,
|
||||
sstables::offstrategy offstrategy) {
|
||||
table_state& t = *_compacting_table;
|
||||
if (can_purge) {
|
||||
descriptor.enable_garbage_collection(t.main_sstable_set());
|
||||
@@ -388,7 +389,7 @@ future<sstables::compaction_result> compaction_task_executor::compact_sstables(s
|
||||
auto sst = t.make_sstable();
|
||||
return sst;
|
||||
};
|
||||
descriptor.replacer = [this, &t, &on_replace] (sstables::compaction_completion_desc desc) {
|
||||
descriptor.replacer = [this, &t, &on_replace, offstrategy] (sstables::compaction_completion_desc desc) {
|
||||
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
|
||||
_cm.propagate_replacement(t, desc.old_sstables, desc.new_sstables);
|
||||
// on_replace updates the compacting registration with the old and new
|
||||
@@ -405,7 +406,7 @@ future<sstables::compaction_result> compaction_task_executor::compact_sstables(s
|
||||
// - are not being compacted.
|
||||
on_replace.on_addition(desc.new_sstables);
|
||||
auto old_sstables = desc.old_sstables;
|
||||
_cm.on_compaction_completion(t, std::move(desc), sstables::offstrategy::no).get();
|
||||
_cm.on_compaction_completion(t, std::move(desc), offstrategy).get();
|
||||
on_replace.on_removal(old_sstables);
|
||||
};
|
||||
|
||||
@@ -1311,53 +1312,38 @@ protected:
|
||||
}
|
||||
private:
|
||||
future<> run_offstrategy_compaction(sstables::compaction_data& cdata) {
|
||||
// This procedure will reshape sstables in maintenance set until it's ready for
|
||||
// integration into main set.
|
||||
// It may require N reshape rounds before the set satisfies the strategy invariant.
|
||||
// This procedure also only updates maintenance set at the end, on success.
|
||||
// Otherwise, some overlapping could be introduced in the set after each reshape
|
||||
// round, progressively degrading read amplification until integration happens.
|
||||
// The drawback of this approach is the 2x space requirement as the old sstables
|
||||
// will only be deleted at the end. The impact of this space requirement is reduced
|
||||
// by the fact that off-strategy is serialized across all tables, meaning that the
|
||||
// actual requirement is the size of the largest table's maintenance set.
|
||||
// Incrementally reshape the SSTables in maintenance set. The output of each reshape
|
||||
// round is merged into the main set. The common case is that off-strategy input
|
||||
// is mostly disjoint, e.g. repair-based node ops, then all the input will be
|
||||
// reshaped in a single round. The incremental approach allows us to be space
|
||||
// efficient (avoiding a 100% overhead) as we will incrementally replace input
|
||||
// SSTables from maintenance set by output ones into main set.
|
||||
|
||||
table_state& t = *_compacting_table;
|
||||
const auto& maintenance_sstables = t.maintenance_sstable_set();
|
||||
|
||||
// Filter out sstables that require view building, to avoid a race between off-strategy
|
||||
// and view building. Refs: #11882
|
||||
const auto old_sstables = boost::copy_range<std::vector<sstables::shared_sstable>>(*maintenance_sstables.all()
|
||||
| boost::adaptors::filtered([] (const sstables::shared_sstable& sst) {
|
||||
return !sst->requires_view_building();
|
||||
}));
|
||||
std::vector<sstables::shared_sstable> reshape_candidates = old_sstables;
|
||||
std::unordered_set<sstables::shared_sstable> new_unused_sstables;
|
||||
|
||||
auto cleanup_new_unused_sstables_on_failure = defer([&new_unused_sstables] {
|
||||
for (auto& sst : new_unused_sstables) {
|
||||
sst->mark_for_deletion();
|
||||
}
|
||||
});
|
||||
auto get_reshape_candidates = [&t] () {
|
||||
return boost::copy_range<std::vector<sstables::shared_sstable>>(*t.maintenance_sstable_set().all()
|
||||
| boost::adaptors::filtered([](const sstables::shared_sstable &sst) {
|
||||
return !sst->requires_view_building();
|
||||
}));
|
||||
};
|
||||
|
||||
auto get_next_job = [&] () -> std::optional<sstables::compaction_descriptor> {
|
||||
auto desc = t.get_compaction_strategy().get_reshaping_job(reshape_candidates, t.schema(), sstables::reshape_mode::strict);
|
||||
auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), sstables::reshape_mode::strict);
|
||||
return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
|
||||
};
|
||||
|
||||
std::exception_ptr err;
|
||||
while (auto desc = get_next_job()) {
|
||||
desc->creator = [&new_unused_sstables, &t] (shard_id dummy) {
|
||||
auto sst = t.make_sstable();
|
||||
new_unused_sstables.insert(sst);
|
||||
return sst;
|
||||
};
|
||||
desc->owned_ranges = _compaction_state.owned_ranges_ptr;
|
||||
auto input = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(desc->sstables);
|
||||
auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), desc->sstables);
|
||||
auto on_replace = compacting.update_on_sstable_replacement();
|
||||
|
||||
sstables::compaction_result ret;
|
||||
try {
|
||||
ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t);
|
||||
sstables::compaction_result _ = co_await compact_sstables(std::move(*desc), _compaction_data, on_replace,
|
||||
compaction_manager::can_purge_tombstones::no,
|
||||
sstables::offstrategy::yes);
|
||||
} catch (sstables::compaction_stopped_exception&) {
|
||||
// If off-strategy compaction stopped on user request, let's not discard the partial work.
|
||||
// Therefore, both un-reshaped and reshaped data will be integrated into main set, allowing
|
||||
@@ -1366,33 +1352,20 @@ private:
|
||||
break;
|
||||
}
|
||||
_performed = true;
|
||||
|
||||
// update list of reshape candidates without input but with output added to it
|
||||
auto it = boost::remove_if(reshape_candidates, [&] (auto& s) { return input.contains(s); });
|
||||
reshape_candidates.erase(it, reshape_candidates.end());
|
||||
std::move(ret.new_sstables.begin(), ret.new_sstables.end(), std::back_inserter(reshape_candidates));
|
||||
|
||||
// If compaction strategy is unable to reshape input data in a single round, it may happen that a SSTable A
|
||||
// created in round 1 will be compacted in a next round producing SSTable B. As SSTable A is no longer needed,
|
||||
// it can be removed immediately. Let's remove all such SSTables immediately to reduce off-strategy space requirement.
|
||||
// Input SSTables from maintenance set can only be removed later, as SSTable sets are only updated on completion.
|
||||
auto can_remove_now = [&] (const sstables::shared_sstable& s) { return new_unused_sstables.contains(s); };
|
||||
for (auto&& sst : input) {
|
||||
if (can_remove_now(sst)) {
|
||||
co_await sst->unlink();
|
||||
new_unused_sstables.erase(std::move(sst));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// at this moment reshape_candidates contains a set of sstables ready for integration into main set
|
||||
auto completion_desc = sstables::compaction_completion_desc{
|
||||
.old_sstables = std::move(old_sstables),
|
||||
.new_sstables = std::move(reshape_candidates)
|
||||
};
|
||||
co_await _cm.on_compaction_completion(t, std::move(completion_desc), sstables::offstrategy::yes);
|
||||
// There might be some remaining sstables in maintenance set that didn't require reshape, or the
|
||||
// user has aborted off-strategy. So we can only integrate them into the main set, such that
|
||||
// they become candidates for regular compaction. We cannot hold them forever in maintenance set,
|
||||
// as that causes read and space amplification issues.
|
||||
if (auto sstables = get_reshape_candidates(); sstables.size()) {
|
||||
auto completion_desc = sstables::compaction_completion_desc{
|
||||
.old_sstables = sstables, // removes from maintenance set.
|
||||
.new_sstables = sstables, // adds into main set.
|
||||
};
|
||||
co_await _cm.on_compaction_completion(t, std::move(completion_desc), sstables::offstrategy::yes);
|
||||
}
|
||||
|
||||
cleanup_new_unused_sstables_on_failure.cancel();
|
||||
if (err) {
|
||||
co_await coroutine::return_exception_ptr(std::move(err));
|
||||
}
|
||||
|
||||
@@ -525,7 +525,8 @@ protected:
|
||||
future<sstables::compaction_result> compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, on_replacement&,
|
||||
compaction_manager::can_purge_tombstones can_purge = compaction_manager::can_purge_tombstones::yes);
|
||||
future<sstables::compaction_result> compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, on_replacement&,
|
||||
compaction_manager::can_purge_tombstones can_purge = compaction_manager::can_purge_tombstones::yes);
|
||||
compaction_manager::can_purge_tombstones can_purge = compaction_manager::can_purge_tombstones::yes,
|
||||
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
||||
future<> update_history(::compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata);
|
||||
bool should_update_history(sstables::compaction_type ct) {
|
||||
return ct == sstables::compaction_type::Compaction;
|
||||
|
||||
@@ -4987,8 +4987,8 @@ SEASTAR_TEST_CASE(compaction_optimization_to_avoid_bloom_filter_checks) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
static future<> run_incremental_compaction_test(sstables::offstrategy offstrategy, std::function<future<>(table_for_tests&, owned_ranges_ptr)> run_compaction) {
|
||||
return test_env::do_with_async([run_compaction = std::move(run_compaction), offstrategy] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type);
|
||||
@@ -5011,7 +5011,7 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
std::vector<shared_sstable> ssts;
|
||||
size_t sstables_closed = 0;
|
||||
size_t sstables_closed_during_cleanup = 0;
|
||||
static constexpr size_t sstables_nr = 10;
|
||||
const size_t sstables_nr = s->max_compaction_threshold() * 2;
|
||||
|
||||
dht::token_range_vector owned_token_ranges;
|
||||
|
||||
@@ -5033,7 +5033,7 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
std::move(mut2)
|
||||
});
|
||||
sstables::test(sst).set_run_identifier(run_identifier); // in order to produce multi-fragment run.
|
||||
sst->set_sstable_level(1);
|
||||
sst->set_sstable_level(offstrategy ? 0 : 1);
|
||||
|
||||
// every sstable will be eligible for cleanup, by having both an owned and unowned token.
|
||||
owned_token_ranges.push_back(dht::token_range::make_singular(sst->get_last_decorated_key().token()));
|
||||
@@ -5050,8 +5050,8 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
t->disable_auto_compaction().get();
|
||||
const dht::token_range_vector empty_owned_ranges;
|
||||
for (auto&& sst : ssts) {
|
||||
testlog.info("run id {}", sst->run_identifier());
|
||||
column_family_test(t).add_sstable(sst).get();
|
||||
t->add_sstable_and_update_cache(sst, offstrategy).get();
|
||||
testlog.info("run id {}, refcount = {}", sst->run_identifier(), sst.use_count());
|
||||
column_family_test::update_sstables_known_generation(*t, sst->generation());
|
||||
observers.push_back(sst->add_on_closed_handler([&] (sstable& sst) mutable {
|
||||
auto sstables = t->get_sstables();
|
||||
@@ -5069,7 +5069,7 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
}
|
||||
ssts = {}; // releases references
|
||||
auto owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(std::move(owned_token_ranges));
|
||||
t->perform_cleanup_compaction(std::move(owned_ranges_ptr)).get();
|
||||
run_compaction(t, std::move(owned_ranges_ptr)).get();
|
||||
BOOST_REQUIRE(cm.sstables_requiring_cleanup(t->as_table_state()).empty());
|
||||
testlog.info("Cleanup has finished");
|
||||
}
|
||||
@@ -5085,6 +5085,19 @@ SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(cleanup_incremental_compaction_test) {
|
||||
return run_incremental_compaction_test(sstables::offstrategy::no, [] (table_for_tests& t, owned_ranges_ptr owned_ranges) -> future<> {
|
||||
return t->perform_cleanup_compaction(std::move(owned_ranges));
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(offstrategy_incremental_compaction_test) {
|
||||
return run_incremental_compaction_test(sstables::offstrategy::yes, [] (table_for_tests& t, owned_ranges_ptr owned_ranges) -> future<> {
|
||||
bool performed = co_await t->perform_offstrategy_compaction(tasks::task_info{});
|
||||
BOOST_REQUIRE(performed);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(cleanup_during_offstrategy_incremental_compaction_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto builder = schema_builder("tests", "test")
|
||||
|
||||
Reference in New Issue
Block a user