Merge 'Reapply: "disable_auto_compaction: stop ongoing compactions"' from Eliran Sinvani

Reapply: "disable_auto_compaction: stop ongoing compactions"
This is a reapplication of a former commit
4affa801a5 which was reverted by
8e8dc2c930.
This commit is a fixed version of the original where a call to the
compaction_manager constructor accidentally issued (`compaction_manager()`)
instead a call to retrieve a compaction manager reference
(`get_compaction_manager()`), we don't use this function because it
doesn't exist anymore - it existed at the time the patch was written
bu was removed in 9066224cf4 later on,
instead, we just use the private table member _compaction_manager which refs
the compaction manager.

The explanation for the bad effect is probably that a `this` pointer
capture down the call chain, resulted in a use after free which had
an unknown effect on the system. (memory corruption at startup).

Test: unit (dev,debug)
      write performance test as the one used to find the bug.
A screenshot of the performance test can be found at
https://github.com/scylladb/scylla/issues/10146/#issuecomment-1129578381

Fixes https://github.com/scylladb/scylla/issues/9313
Refs https://github.com/scylladb/scylla/issues/10146

For completeness, the original commit message was:
The api call disables new regular compaction jobs from starting
but it doesn't wait for ongoing compaction to stop and so it's
much less useful.

Returning after stopping regular compaction jobs and waiting
for them to stop guarantees that no regular compactions job are
running when nodetool disableautocompaction returns successfully.

Signed-off-by: Eliran Sinvani <eliransin@scylladb.com>

Closes #10597

* github.com:scylladb/scylla:
  compaction_manager: Make invoking the empty constructor more explicit
  Reapply: "disable_auto_compaction: stop ongoing compactions"
This commit is contained in:
Avi Kivity
2022-05-18 18:33:12 +03:00
8 changed files with 30 additions and 24 deletions

View File

@@ -342,11 +342,17 @@ private:
// Propagate replacement of sstables to all ongoing compaction of a given table
void propagate_replacement(replica::table* t, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added);
// This constructor is suposed to only be used for testing so lets be more explicit
// about invoking it. Ref #10146
compaction_manager();
public:
compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as);
compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, uint64_t shares, abort_source& as);
compaction_manager();
~compaction_manager();
class for_testing_tag{};
// An inline constructor for testing
compaction_manager(for_testing_tag) : compaction_manager() {}
void register_metrics();

View File

@@ -2170,7 +2170,7 @@ std::chrono::milliseconds table::get_coordinator_read_latency_percentile(double
void
table::enable_auto_compaction() {
// XXX: unmute backlog. turn table backlog back on.
// FIXME: unmute backlog. turn table backlog back on.
// see table::disable_auto_compaction() notes.
_compaction_disabled_by_user = false;
trigger_compaction();
@@ -2178,7 +2178,7 @@ table::enable_auto_compaction() {
future<>
table::disable_auto_compaction() {
// XXX: mute backlog. When we disable background compactions
// FIXME: mute backlog. When we disable background compactions
// for the table, we must also disable current backlog of the
// table compaction strategy that contributes to the scheduling
// group resources prioritization.
@@ -2205,8 +2205,9 @@ table::disable_auto_compaction() {
// - it will break computation of major compaction descriptor
// for new submissions
_compaction_disabled_by_user = true;
// FIXME: stop ongoing compactions
return make_ready_future<>();
return with_gate(_async_gate, [this] {
return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", this, sstables::compaction_type::Compaction);
});
}
flat_mutation_reader_v2

View File

@@ -93,7 +93,7 @@ with_column_family(schema_ptr s, replica::column_family::config cfg, noncopyable
auto tracker = make_lw_shared<cache_tracker>();
auto dir = tmpdir();
cfg.datadir = dir.path().string();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto cf = make_lw_shared<replica::column_family>(s, cfg, replica::column_family::no_commitlog(), *cm, *cl_stats, *tracker);
cf->mark_ready_for_writes();

View File

@@ -3020,7 +3020,7 @@ static std::vector<sstables::shared_sstable> open_sstables(test_env& env, schema
static flat_mutation_reader compacted_sstable_reader(test_env& env, schema_ptr s,
sstring table_name, std::vector<unsigned long> generations) {
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
auto cf = make_lw_shared<replica::column_family>(s, column_family_test_config(env.manager(), env.semaphore()), replica::column_family::no_commitlog(), *cm, *cl_stats, *tracker);

View File

@@ -190,7 +190,7 @@ SEASTAR_TEST_CASE(compaction_manager_basic_test) {
auto s = make_shared_schema({}, some_keyspace, some_column_family,
{{"p1", utf8_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type);
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
cm->enable();
auto stop_cm = defer([&cm] {
cm->stop().get();
@@ -269,7 +269,7 @@ SEASTAR_TEST_CASE(compact) {
builder.set_comment("Example table for compaction");
builder.set_gc_grace_seconds(std::numeric_limits<int32_t>::max());
auto s = builder.build();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto tracker = make_lw_shared<cache_tracker>();
auto cf = make_lw_shared<replica::column_family>(s, column_family_test_config(env.manager(), env.semaphore()), replica::column_family::no_commitlog(), *cm, *cl_stats, *tracker);
@@ -3248,7 +3248,7 @@ SEASTAR_TEST_CASE(partial_sstable_run_filtered_out_test) {
auto tmp = tmpdir();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto stop_cm = defer([cm] {
cm->stop().get();
});
@@ -3518,7 +3518,7 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
// make mut1_deletion gc'able.
forward_jump_clocks(std::chrono::seconds(ttl));
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
cfg.datadir = tmp.path().string();
cfg.enable_disk_writes = false;
@@ -3626,7 +3626,7 @@ SEASTAR_TEST_CASE(twcs_major_compaction_test) {
auto mut3 = make_insert(0ms);
auto mut4 = make_insert(1ms);
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
cfg.datadir = tmp.path().string();
cfg.enable_disk_writes = true;
@@ -3655,7 +3655,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) {
cell_locker_stats cl_stats;
cache_tracker tracker;
compaction_manager cm;
compaction_manager cm(compaction_manager::for_testing_tag{});
auto stop_compaction_manager = deferred_stop(cm);
cm.enable();
@@ -3765,7 +3765,7 @@ SEASTAR_TEST_CASE(test_bug_6472) {
return m;
};
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
cfg.datadir = tmpdir_path;
cfg.enable_disk_writes = true;
@@ -3897,7 +3897,7 @@ SEASTAR_TEST_CASE(test_twcs_partition_estimate) {
return make_sstable_containing(sst_gen, {m});
};
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
cfg.datadir = tmpdir_path;
cfg.enable_disk_writes = true;
@@ -4026,7 +4026,7 @@ SEASTAR_TEST_CASE(test_twcs_interposer_on_memtable_flush) {
};
auto tmp = tmpdir();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
cfg.datadir = tmp.path().string();
cfg.enable_disk_writes = true;
@@ -4133,7 +4133,7 @@ SEASTAR_TEST_CASE(test_offstrategy_sstable_compaction) {
auto mut = mutation(s, pk);
ss.add_row(mut, ss.make_ckey(0), "val");
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto stop_cm = defer([cm] {
cm->stop().get();
});
@@ -4420,7 +4420,7 @@ SEASTAR_TEST_CASE(test_twcs_single_key_reader_filtering) {
auto sst2 = make_sstable_containing(sst_gen, {make_row(0, 1)});
auto dkey = sst1->get_first_decorated_key();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
replica::cf_stats cf_stats{0};
cfg.cf_stats = &cf_stats;
@@ -4488,7 +4488,7 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) {
return builder.build();
};
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
cm->enable();
auto stop_cm = defer([&cm] {
cm->stop().get();
@@ -4717,7 +4717,7 @@ SEASTAR_TEST_CASE(twcs_single_key_reader_through_compound_set_test) {
};
auto tmp = tmpdir();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
replica::column_family::config cfg = column_family_test_config(env.manager(), env.semaphore());
replica::cf_stats cf_stats{0};
cfg.cf_stats = &cf_stats;

View File

@@ -45,7 +45,7 @@ void run_sstable_resharding_test() {
for (const auto version : writable_sstable_versions) {
auto tmp = tmpdir();
auto s = get_schema();
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto cl_stats = make_lw_shared<cell_locker_stats>();
auto cf = make_lw_shared<replica::column_family>(s, column_family_test_config(env.manager(), env.semaphore()), replica::column_family::no_commitlog(), *cm, *cl_stats, tracker);
cf->mark_ready_for_writes();

View File

@@ -48,9 +48,8 @@ struct column_family_for_tests {
replica::cf_stats cf_stats{0};
replica::column_family::config cfg;
cell_locker_stats cl_stats;
compaction_manager cm;
compaction_manager cm{compaction_manager::for_testing_tag{}};
lw_shared_ptr<replica::column_family> cf;
data();
};
lw_shared_ptr<data> _data;

View File

@@ -169,7 +169,7 @@ public:
cache_tracker tracker;
cell_locker_stats cl_stats;
auto cm = make_lw_shared<compaction_manager>();
auto cm = make_lw_shared<compaction_manager>(compaction_manager::for_testing_tag{});
auto cf = make_lw_shared<replica::column_family>(s, column_family_test_config(env.manager(), env.semaphore()), replica::column_family::no_commitlog(), *cm, cl_stats, tracker);
auto start = perf_sstable_test_env::now();