compaction: Disable garbage collected writer if interposer consumer is used
GC writer, used for incremental compaction, cannot be currently used if interposer consumer is used. That's because compaction assumes that GC writer will be operated only by a single compaction writer at a given point in time. With interposer consumer, multiple writers will concurrently operate on the same GC writer, leading to race condition which potentially result in use-after-free. Let's disable GC writer if interposer consumer is enabled. We're not losing anything because GC writer is currently only needed on strategies which don't implement an interposer consumer. Resharding will always disable GC writer, which is the expected behavior because it doesn't support incremental compaction yet. The proper fix, which allows GC writer and interposer consumer to work together, will require more time to implement and test, and for that reason, I am postponing it as #6472 is a showstopper for the current release. Fixes #6472. tests: mode(dev). Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com> Reviewed-by: Glauber Costa <glauber@scylladb.com> Message-Id: <20200526195428.230472-1-raphaelsc@scylladb.com>
This commit is contained in:
committed by
Piotr Sarna
parent
17649ad0b5
commit
097a5e9e07
@@ -132,6 +132,9 @@ public:
|
||||
uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
// Returns whether or not interposer consumer is used by a given strategy.
|
||||
bool use_interposer_consumer() const;
|
||||
};
|
||||
|
||||
// Creates a compaction_strategy object from one of the strategies available.
|
||||
|
||||
@@ -559,6 +559,7 @@ private:
|
||||
}
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(reader_consumer end_consumer) = 0;
|
||||
virtual bool use_interposer_consumer() const = 0;
|
||||
|
||||
compaction_info finish(std::chrono::time_point<db_clock> started_at, std::chrono::time_point<db_clock> ended_at) {
|
||||
_info->ended_at = std::chrono::duration_cast<std::chrono::milliseconds>(ended_at.time_since_epoch()).count();
|
||||
@@ -640,8 +641,10 @@ public:
|
||||
return garbage_collected_sstable_writer(_gc_sstable_writer_data);
|
||||
}
|
||||
|
||||
bool contains_multi_fragment_runs() const {
|
||||
return _contains_multi_fragment_runs;
|
||||
bool enable_garbage_collected_sstable_writer() const {
|
||||
// FIXME: Disable GC writer if interposer consumer is enabled until they both can work simultaneously.
|
||||
// More details can be found at https://github.com/scylladb/scylla/issues/6472
|
||||
return _contains_multi_fragment_runs && !use_interposer_consumer();
|
||||
}
|
||||
|
||||
template <typename GCConsumer = noop_compacted_fragments_consumer>
|
||||
@@ -745,6 +748,10 @@ public:
|
||||
return _cf.get_compaction_strategy().make_interposer_consumer(_ms_metadata, std::move(end_consumer));
|
||||
}
|
||||
|
||||
bool use_interposer_consumer() const override {
|
||||
return _cf.get_compaction_strategy().use_interposer_consumer();
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Compacting {}", formatted_msg);
|
||||
}
|
||||
@@ -824,7 +831,7 @@ private:
|
||||
void maybe_replace_exhausted_sstables_by_sst(shared_sstable sst) {
|
||||
// Skip earlier replacement of exhausted sstables if compaction works with only single-fragment runs,
|
||||
// meaning incremental compaction is disabled for this compaction.
|
||||
if (!_contains_multi_fragment_runs) {
|
||||
if (!enable_garbage_collected_sstable_writer()) {
|
||||
return;
|
||||
}
|
||||
// Replace exhausted sstable(s), if any, by new one(s) in the column family.
|
||||
@@ -1242,6 +1249,10 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
|
||||
void report_start(const sstring& formatted_msg) const override {
|
||||
clogger.info("Resharding {}", formatted_msg);
|
||||
}
|
||||
@@ -1333,7 +1344,7 @@ compact_sstables(sstables::compaction_descriptor descriptor, column_family& cf)
|
||||
cf.schema()->ks_name(), cf.schema()->cf_name()));
|
||||
}
|
||||
auto c = make_compaction(cf, std::move(descriptor));
|
||||
if (c->contains_multi_fragment_runs()) {
|
||||
if (c->enable_garbage_collected_sstable_writer()) {
|
||||
auto gc_writer = c->make_garbage_collected_sstable_writer();
|
||||
return compaction::run(std::move(c), std::move(gc_writer));
|
||||
}
|
||||
|
||||
@@ -1080,6 +1080,10 @@ reader_consumer compaction_strategy::make_interposer_consumer(const mutation_sou
|
||||
return _compaction_strategy_impl->make_interposer_consumer(ms_meta, std::move(end_consumer));
|
||||
}
|
||||
|
||||
bool compaction_strategy::use_interposer_consumer() const {
|
||||
return _compaction_strategy_impl->use_interposer_consumer();
|
||||
}
|
||||
|
||||
compaction_strategy make_compaction_strategy(compaction_strategy_type strategy, const std::map<sstring, sstring>& options) {
|
||||
::shared_ptr<compaction_strategy_impl> impl;
|
||||
|
||||
|
||||
@@ -99,5 +99,9 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate);
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer);
|
||||
|
||||
virtual bool use_interposer_consumer() const {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -347,6 +347,10 @@ public:
|
||||
virtual uint64_t adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate) override;
|
||||
|
||||
virtual reader_consumer make_interposer_consumer(const mutation_source_metadata& ms_meta, reader_consumer end_consumer) override;
|
||||
|
||||
virtual bool use_interposer_consumer() const override {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -5772,3 +5772,89 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
|
||||
cm.stop().wait();
|
||||
});
|
||||
}
|
||||
|
||||
//
|
||||
// Test that https://github.com/scylladb/scylla/issues/6472 is gone
|
||||
//
|
||||
SEASTAR_TEST_CASE(test_bug_6472) {
|
||||
return test_setup::do_with_tmp_directory([] (test_env& env, sstring tmpdir_path) {
|
||||
auto builder = schema_builder("tests", "test_bug_6472")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("cl", int32_type, column_kind::clustering_key)
|
||||
.with_column("value", int32_type);
|
||||
builder.set_compaction_strategy(sstables::compaction_strategy_type::time_window);
|
||||
std::map<sstring, sstring> opts = {
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY, "HOURS" },
|
||||
{ time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, "1" },
|
||||
};
|
||||
builder.set_compaction_strategy_options(opts);
|
||||
builder.set_gc_grace_seconds(0);
|
||||
auto s = builder.build();
|
||||
|
||||
auto sst_gen = [&env, s, tmpdir_path, gen = make_lw_shared<unsigned>(1)] () mutable {
|
||||
return env.make_sstable(s, tmpdir_path, (*gen)++, la, big);
|
||||
};
|
||||
|
||||
auto next_timestamp = [] (auto step) {
|
||||
using namespace std::chrono;
|
||||
return (gc_clock::now().time_since_epoch() - duration_cast<microseconds>(step)).count();
|
||||
};
|
||||
|
||||
auto tokens = token_generation_for_shard(1, this_shard_id(), test_db_config.murmur3_partitioner_ignore_msb_bits(), smp::count);
|
||||
|
||||
auto make_expiring_cell = [&] (std::chrono::hours step) {
|
||||
static thread_local int32_t value = 1;
|
||||
|
||||
auto key_str = tokens[0].first;
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key_str)});
|
||||
|
||||
mutation m(s, key);
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(value++)});
|
||||
m.set_clustered_cell(c_key, bytes("value"), data_value(int32_t(value)), next_timestamp(step), gc_clock::duration(step + 5s));
|
||||
return m;
|
||||
};
|
||||
|
||||
auto cm = make_lw_shared<compaction_manager>();
|
||||
column_family::config cfg = column_family_test_config();
|
||||
cfg.datadir = tmpdir_path;
|
||||
cfg.enable_disk_writes = true;
|
||||
cfg.enable_commitlog = false;
|
||||
cfg.enable_cache = false;
|
||||
cfg.enable_incremental_backups = false;
|
||||
reader_concurrency_semaphore sem = reader_concurrency_semaphore(reader_concurrency_semaphore::no_limits{});
|
||||
cfg.read_concurrency_semaphore = &sem;
|
||||
auto tracker = make_lw_shared<cache_tracker>();
|
||||
cell_locker_stats cl_stats;
|
||||
auto cf = make_lw_shared<column_family>(s, cfg, column_family::no_commitlog(), *cm, cl_stats, *tracker);
|
||||
cf->mark_ready_for_writes();
|
||||
cf->start();
|
||||
|
||||
// Make 100 expiring cells which belong to different time windows
|
||||
std::vector<mutation> muts;
|
||||
muts.reserve(101);
|
||||
for (auto i = 1; i < 101; i++) {
|
||||
muts.push_back(make_expiring_cell(std::chrono::hours(i)));
|
||||
}
|
||||
muts.push_back(make_expiring_cell(std::chrono::hours(110)));
|
||||
|
||||
//
|
||||
// Reproduce issue 6472 by making an input set which causes both interposer and GC writer to be enabled
|
||||
//
|
||||
std::vector<shared_sstable> sstables_spanning_many_windows = {
|
||||
make_sstable_containing(sst_gen, muts),
|
||||
make_sstable_containing(sst_gen, muts),
|
||||
};
|
||||
utils::UUID run_id = utils::make_random_uuid();
|
||||
for (auto& sst : sstables_spanning_many_windows) {
|
||||
sstables::test(sst).set_run_identifier(run_id);
|
||||
}
|
||||
|
||||
// Make sure everything we wanted expired is expired by now.
|
||||
forward_jump_clocks(std::chrono::hours(101));
|
||||
|
||||
auto ret = compact_sstables(sstables::compaction_descriptor(sstables_spanning_many_windows, default_priority_class()),
|
||||
*cf, sst_gen, replacer_fn_no_op()).get0();
|
||||
BOOST_REQUIRE(ret.new_sstables.size() == 1);
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user