sstables/compaction_manager: Fix logic for filtering out partial sstable runs

ignore_partial_runs() brings confusion because i__p__r() equal to true
doesn't mean filter out partial runs from compaction. It actually means
not caring about compaction of a partial run.

The logic was wrong because any compaction strategy that chooses not to ignore
partial sstable run[1] would have any fragment composing it incorrectly
becoming a candidate for compaction.
This problem could make compaction include only a subset of fragments composing
the partial run or even make the same fragment be compacted twice due to
parallel compaction.

[1]: partial sstable run is a sstable that is still being generated by
compaction and as a result cannot be selected as candidate whatsoever.

Fix is about making sure partial sstable run has none of its fragments
selected for compaction. And also renaming i__p__r.

Fixes #4729.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
Message-Id: <20190807022814.12567-1-raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2019-08-06 23:28:14 -03:00
committed by Avi Kivity
parent 7d4bf10d87
commit 76cde84540
5 changed files with 71 additions and 9 deletions

View File

@@ -82,8 +82,8 @@ public:
// Return if optimization to rule out sstables based on clustering key filter should be applied.
bool use_clustering_key_filter() const;
// Return true if compaction strategy ignores sstables coming from partial runs.
bool ignore_partial_runs() const;
// Return true if compaction strategy doesn't care if a sstable belonging to partial sstable run is compacted.
bool can_compact_partial_runs() const;
// An estimation of number of compaction for strategy to be satisfied.
int64_t estimated_pending_compactions(column_family& cf) const;

View File

@@ -207,9 +207,13 @@ std::vector<sstables::shared_sstable> compaction_manager::get_candidates(const c
// Filter out sstables that are being compacted.
for (auto& sst : cf.candidates_for_compaction()) {
if (!_compacting_sstables.count(sst) && (!cs.ignore_partial_runs() || !partial_run_identifiers.count(sst->run_identifier()))) {
candidates.push_back(sst);
if (_compacting_sstables.count(sst)) {
continue;
}
if (!cs.can_compact_partial_runs() && partial_run_identifiers.count(sst->run_identifier())) {
continue;
}
candidates.push_back(sst);
}
return candidates;
}

View File

@@ -639,8 +639,8 @@ public:
}
};
bool compaction_strategy::ignore_partial_runs() const {
return _compaction_strategy_impl->ignore_partial_runs();
bool compaction_strategy::can_compact_partial_runs() const {
return _compaction_strategy_impl->can_compact_partial_runs();
}

View File

@@ -86,7 +86,7 @@ public:
return _use_clustering_key_filter;
}
virtual bool ignore_partial_runs() const {
virtual bool can_compact_partial_runs() const {
return false;
}

View File

@@ -3638,9 +3638,9 @@ SEASTAR_TEST_CASE(test_partition_skipping) {
// Must be run in a seastar thread
static
shared_sstable make_sstable_easy(test_env& env, const fs::path& path, flat_mutation_reader rd, sstable_writer_config cfg, const sstables::sstable::version_types version) {
shared_sstable make_sstable_easy(test_env& env, const fs::path& path, flat_mutation_reader rd, sstable_writer_config cfg, const sstables::sstable::version_types version, int64_t generation = 1) {
auto s = rd.schema();
auto sst = env.make_sstable(s, path.string(), 1, version, big);
auto sst = env.make_sstable(s, path.string(), generation, version, big);
sst->write_components(std::move(rd), 1, s, cfg, encoding_stats{}).get();
sst->load().get();
return sst;
@@ -5047,3 +5047,61 @@ SEASTAR_TEST_CASE(basic_interval_map_testing_for_sstable_set) {
return make_ready_future<>();
}
SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
BOOST_REQUIRE(smp::count == 1);
return test_env::do_with_async([] (test_env& env) {
storage_service_for_tests ssft;
auto s = schema_builder("tests", "partial_sstable_run_filtered_out_test")
.with_column("id", utf8_type, column_kind::partition_key)
.with_column("value", int32_type).build();
auto tmp = tmpdir();
auto cm = make_lw_shared<compaction_manager>();
cm->start();
column_family::config cfg = column_family_test_config();
cfg.datadir = tmp.path().string();
cfg.enable_commitlog = false;
cfg.enable_incremental_backups = false;
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, *cl_stats, *tracker);
cf->start();
cf->mark_ready_for_writes();
utils::UUID partial_sstable_run_identifier = utils::make_random_uuid();
mutation mut(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
sstable_writer_config sst_cfg;
sst_cfg.run_identifier = partial_sstable_run_identifier;
auto partial_sstable_run_sst = make_sstable_easy(env, tmp.path(), flat_mutation_reader_from_mutations({ std::move(mut) }),
sst_cfg, la, 1);
column_family_test(cf).add_sstable(partial_sstable_run_sst);
column_family_test::update_sstables_known_generation(*cf, partial_sstable_run_sst->generation());
auto generation_exists = [&cf] (int64_t generation) {
auto sstables = cf->get_sstables();
auto entry = boost::range::find_if(*sstables, [generation] (shared_sstable sst) { return generation == sst->generation(); });
return entry != sstables->end();
};
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
// register partial sstable run
auto c_info = make_lw_shared<compaction_info>();
c_info->run_identifier = partial_sstable_run_identifier;
cm->register_compaction(c_info);
cf->compact_all_sstables().get();
// make sure partial sstable run has none of its fragments compacted.
BOOST_REQUIRE(generation_exists(partial_sstable_run_sst->generation()));
cm->stop().get();
});
}