From 45a4f8d1fa8026c8decf2d2cdd2fb38d6d51486c Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 11:18:20 -0300 Subject: [PATCH 01/30] compaction_manager: move task ctor into source That's to be able to get table_state from table in a subsequent patch, as replica::table is only forward-declared in compaction_manager.hh to avoid including database.hh. Once everything is moved to table_state, the ctor can be moved back into the header. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 9 +++++++++ compaction/compaction_manager.hh | 9 +-------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index fcecb85363..c3be1edd1d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -261,6 +261,15 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(r } } +compaction_manager::task::task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc) + : _cm(mgr) + , _compacting_table(t) + , _compaction_state(_cm.get_compaction_state(t)) + , _type(type) + , _gate_holder(_compaction_state.gate.hold()) + , _description(std::move(desc)) +{} + future<> compaction_manager::perform_task(shared_ptr task) { _tasks.push_back(task); auto unregister_task = defer([this, task] { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 467989bc85..32f8391d91 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -112,14 +112,7 @@ public: sstring _description; public: - explicit task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc) - : _cm(mgr) - , _compacting_table(t) - , _compaction_state(_cm.get_compaction_state(t)) - , _type(type) - , _gate_holder(_compaction_state.gate.hold()) - , _description(std::move(desc)) - {} + explicit task(compaction_manager& mgr, replica::table* 
t, sstables::compaction_type type, sstring desc); task(task&&) = delete; task(const task&) = delete; From b47ed727c7fe4cf3a178d78c712ff2ddc4d30074 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 11:42:09 -0300 Subject: [PATCH 02/30] compaction_manager: switch to table_state for mapping of compaction_state The compaction manager stores a state for each table. As we're transitioning towards table_state, the mapping of a table to its compaction state will now use the table_state ptr as key. The table_state ptr is stable and its lifetime is the same as the table's. We're temporarily adding a replica::table ptr to compaction_state, as there are still lots of dependencies on replica::table, but we'll get rid of it once we complete the transition. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 18 +++++++++++------- compaction/compaction_manager.hh | 11 ++++++----- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index c3be1edd1d..d4777f98d7 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -252,7 +252,7 @@ private: virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; -compaction_manager::compaction_state& compaction_manager::get_compaction_state(replica::table* t) { +compaction_manager::compaction_state& compaction_manager::get_compaction_state(compaction::table_state* t) { try { return _compaction_state.at(t); } catch (std::out_of_range&) { @@ -264,7 +264,7 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(r compaction_manager::task::task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc) : _cm(mgr) , _compacting_table(t) - , _compaction_state(_cm.get_compaction_state(t)) + , _compaction_state(_cm.get_compaction_state(&t->as_table_state())) , _type(type) , _gate_holder(_compaction_state.gate.hold()) , _description(std::move(desc)) @@ -434,7 +434,7 @@ 
future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t) : _cm(cm) , _table(t) - , _compaction_state(cm.get_compaction_state(_table)) + , _compaction_state(cm.get_compaction_state(&_table->as_table_state())) , _holder(_compaction_state.gate.hold()) { _compaction_state.compaction_disabled_counter++; @@ -709,7 +709,7 @@ void compaction_manager::enable() { std::function compaction_manager::compaction_submission_callback() { return [this] () mutable { for (auto& e: _compaction_state) { - submit(e.first); + submit(e.second.t); } }; } @@ -831,7 +831,7 @@ void compaction_manager::do_stop() noexcept { } inline bool compaction_manager::can_proceed(replica::table* t) const { - return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled(); + return (_state == state::enabled) && _compaction_state.contains(&t->as_table_state()) && !_compaction_state.at(&t->as_table_state()).compaction_disabled(); } inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_stopping) const { @@ -1432,7 +1432,7 @@ future<> compaction_manager::perform_sstable_scrub(replica::table* t, sstables:: } void compaction_manager::add(replica::table* t) { - auto [_, inserted] = _compaction_state.insert({t, compaction_state{}}); + auto [_, inserted] = _compaction_state.insert({&t->as_table_state(), compaction_state{.t = t}}); if (!inserted) { auto s = t->schema(); on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(t))); @@ -1440,7 +1440,7 @@ void compaction_manager::add(replica::table* t) { } future<> compaction_manager::remove(replica::table* t) { - auto handle = _compaction_state.extract(t); + auto handle = _compaction_state.extract(&t->as_table_state()); if (!handle.empty()) { auto& c_state = handle.mapped(); @@ -1491,6 +1491,10 
@@ const std::vector compaction_manager::get_compactions }) | boost::adaptors::transformed(to_info)); } +bool compaction_manager::compaction_disabled(replica::table* t) const { + return _compaction_state.contains(&t->as_table_state()) && _compaction_state.at(&t->as_table_state()).compaction_disabled(); +} + future<> compaction_manager::stop_compaction(sstring type, replica::table* table) { sstables::compaction_type target_type; try { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 32f8391d91..9a39eefe9e 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -60,6 +60,9 @@ public: }; private: struct compaction_state { + // FIXME: remove once we complete the transition to table_state. + replica::table* t = nullptr; + // Used both by compaction tasks that refer to the compaction_state // and by any function running under run_with_compaction_disabled(). seastar::gate gate; @@ -262,7 +265,7 @@ private: // weight is value assigned to a compaction job that is log base N of total size of all input sstables. std::unordered_set _weight_tracker; - std::unordered_map _compaction_state; + std::unordered_map _compaction_state; // Purpose is to serialize all maintenance (non regular) compaction activity to reduce aggressiveness and space requirement. // If the operation must be serialized with regular, then the per-table write lock must be taken. @@ -321,7 +324,7 @@ private: // gets the table's compaction state // throws std::out_of_range exception if not found. - compaction_state& get_compaction_state(replica::table* t); + compaction_state& get_compaction_state(compaction::table_state* t); // Return true if compaction manager is enabled and // table still exists and compaction is not disabled for the table. 
@@ -461,9 +464,7 @@ public: }); }; - bool compaction_disabled(replica::table* t) const { - return _compaction_state.contains(t) && _compaction_state.at(t).compaction_disabled(); - } + bool compaction_disabled(replica::table* t) const; // Stops ongoing compaction of a given type. future<> stop_compaction(sstring type, replica::table* table = nullptr); From ff9e9524e61e9d3066066ac3d5b38b801af9ef8b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 11:50:54 -0300 Subject: [PATCH 03/30] compaction_manager: make compaction_disabled() switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 4 ++-- compaction/compaction_manager.hh | 2 +- replica/table.cc | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index d4777f98d7..38ea117bc7 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1491,8 +1491,8 @@ const std::vector compaction_manager::get_compactions }) | boost::adaptors::transformed(to_info)); } -bool compaction_manager::compaction_disabled(replica::table* t) const { - return _compaction_state.contains(&t->as_table_state()) && _compaction_state.at(&t->as_table_state()).compaction_disabled(); +bool compaction_manager::compaction_disabled(compaction::table_state& t) const { + return _compaction_state.contains(&t) && _compaction_state.at(&t).compaction_disabled(); } future<> compaction_manager::stop_compaction(sstring type, replica::table* table) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 9a39eefe9e..454963690d 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -464,7 +464,7 @@ public: }); }; - bool compaction_disabled(replica::table* t) const; + bool compaction_disabled(compaction::table_state& t) const; // Stops ongoing compaction of a given type. 
future<> stop_compaction(sstring type, replica::table* table = nullptr); diff --git a/replica/table.cc b/replica/table.cc index 172e42d383..2edff35a9c 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1521,7 +1521,7 @@ future<> table::clear() { // NOTE: does not need to be futurized, but might eventually, depending on // if we implement notifications, whatnot. future table::discard_sstables(db_clock::time_point truncated_at) { - assert(_compaction_manager.compaction_disabled(this)); + assert(_compaction_manager.compaction_disabled(as_table_state())); struct pruner { column_family& cf; From e4d9cdf2847654e12780c95bc03931c3eb776394 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 14:48:45 -0300 Subject: [PATCH 04/30] compaction_manager: make has_table_ongoing_compaction() switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 8 +++++++- compaction/compaction_manager.hh | 6 +----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 38ea117bc7..e73fd77db9 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -161,7 +161,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const { bool compaction_manager::can_register_compaction(replica::table* t, int weight, unsigned fan_in) const { // Only one weight is allowed if parallel compaction is disabled. - if (!t->get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) { + if (!t->get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t->as_table_state())) { return false; } // Weightless compaction doesn't have to be serialized, and won't dillute overall efficiency. 
@@ -1491,6 +1491,12 @@ const std::vector compaction_manager::get_compactions }) | boost::adaptors::transformed(to_info)); } +bool compaction_manager::has_table_ongoing_compaction(const compaction::table_state& t) const { + return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr& task) { + return &task->compacting_table()->as_table_state() == &t && task->compaction_running(); + }); +}; + bool compaction_manager::compaction_disabled(compaction::table_state& t) const { return _compaction_state.contains(&t) && _compaction_state.at(&t).compaction_disabled(); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 454963690d..9d4e24f4ba 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -458,11 +458,7 @@ public: const std::vector get_compactions(replica::table* t = nullptr) const; // Returns true if table has an ongoing compaction, running on its behalf - bool has_table_ongoing_compaction(const replica::table* t) const { - return std::any_of(_tasks.begin(), _tasks.end(), [t] (const shared_ptr& task) { - return task->compacting_table() == t && task->compaction_running(); - }); - }; + bool has_table_ongoing_compaction(const compaction::table_state& t) const; bool compaction_disabled(compaction::table_state& t) const; From 23e21ed5bcc64fe69fff03aaab1a39fa01b14b71 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 14:19:31 -0300 Subject: [PATCH 05/30] compaction: table_state: Add maintenance sstable set Needed for off-strategy compaction. Signed-off-by: Raphael S. 
Carvalho --- compaction/table_state.hh | 1 + replica/table.cc | 3 +++ test/boost/sstable_compaction_test.cc | 3 +++ 3 files changed, 7 insertions(+) diff --git a/compaction/table_state.hh b/compaction/table_state.hh index 6890735555..beca464034 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -29,6 +29,7 @@ public: virtual unsigned min_compaction_threshold() const noexcept = 0; virtual bool compaction_enforce_min_threshold() const noexcept = 0; virtual const sstables::sstable_set& main_sstable_set() const = 0; + virtual const sstables::sstable_set& maintenance_sstable_set() const = 0; virtual std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point compaction_time) const = 0; virtual const std::vector& compacted_undeleted_sstables() const noexcept = 0; virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0; diff --git a/replica/table.cc b/replica/table.cc index 2edff35a9c..2447e8cc1e 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2399,6 +2399,9 @@ public: const sstables::sstable_set& main_sstable_set() const override { return _t.get_sstable_set(); } + const sstables::sstable_set& maintenance_sstable_set() const override { + return _t.maintenance_sstable_set(); + } std::unordered_set fully_expired_sstables(const std::vector& sstables, gc_clock::time_point query_time) const override { return sstables::get_fully_expired_sstables(*this, sstables, query_time); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 8ac1beee91..c59ce0e3b5 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -142,6 +142,9 @@ public: const sstables::sstable_set& main_sstable_set() const override { return _t->get_sstable_set(); } + const sstables::sstable_set& maintenance_sstable_set() const override { + return _t->maintenance_sstable_set(); + } std::unordered_set fully_expired_sstables(const std::vector& 
sstables, gc_clock::time_point query_time) const override { return sstables::get_fully_expired_sstables(_t->as_table_state(), sstables, query_time); } From cb05142d585ef706377be16197e05f9772dd7dbe Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 13:14:49 -0300 Subject: [PATCH 06/30] compaction: Move table::in_strategy_sstables() and switch to table_state in_strategy_sstables() doesn't have to be implemented in table, as it's simply about main set with maintenance and staging files filtered out. Also, let's make it switch to table_state as part of ongoing work. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 9 +++++- compaction/compaction_manager.hh | 3 ++ replica/database.hh | 2 -- replica/table.cc | 8 ------ test/boost/database_test.cc | 4 +-- test/boost/sstable_compaction_test.cc | 40 +++++++++++++-------------- 6 files changed, 33 insertions(+), 33 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index e73fd77db9..31f64609e3 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -195,6 +195,13 @@ void compaction_manager::deregister_weight(int weight) { reevaluate_postponed_compactions(); } +std::vector in_strategy_sstables(compaction::table_state& table_s) { + auto sstables = table_s.main_sstable_set().all(); + return boost::copy_range>(*sstables | boost::adaptors::filtered([] (const sstables::shared_sstable& sst) { + return sstables::is_eligible_for_compaction(sst); + })); +} + std::vector compaction_manager::get_candidates(const replica::table& t) { std::vector candidates; candidates.reserve(t.sstables_count()); @@ -206,7 +213,7 @@ std::vector compaction_manager::get_candidates(const r auto& cs = t.get_compaction_strategy(); // Filter out sstables that are being compacted. 
- for (auto& sst : t.in_strategy_sstables()) { + for (auto& sst : in_strategy_sstables(t.as_table_state())) { if (_compacting_sstables.contains(sst)) { continue; } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 9d4e24f4ba..3034006be0 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -487,5 +487,8 @@ public: bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges, schema_ptr s); +// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir. +std::vector in_strategy_sstables(compaction::table_state& table_s); + std::ostream& operator<<(std::ostream& os, compaction_manager::task::state s); std::ostream& operator<<(std::ostream& os, const compaction_manager::task& task); diff --git a/replica/database.hh b/replica/database.hh index f04416eeb2..7f76253629 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -902,8 +902,6 @@ public: lw_shared_ptr get_sstables_including_compacted_undeleted() const; const std::vector& compacted_undeleted_sstables() const; std::vector select_sstables(const dht::partition_range& range) const; - // Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir. 
- std::vector in_strategy_sstables() const; size_t sstables_count() const; std::vector sstable_count_per_level() const; int64_t get_unleveled_sstables() const; diff --git a/replica/table.cc b/replica/table.cc index 2447e8cc1e..dc9ab57cd1 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1118,14 +1118,6 @@ std::vector table::select_sstables(const dht::partitio return _sstables->select(range); } -std::vector table::in_strategy_sstables() const { - auto sstables = _main_sstables->all(); - return boost::copy_range>(*sstables - | boost::adaptors::filtered([this] (auto& sst) { - return sstables::is_eligible_for_compaction(sst); - })); -} - // Gets the list of all sstables in the column family, including ones that are // not used for active queries because they have already been compacted, but are // waiting for delete_atomically() to return. diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 4179e6e19b..368f7dcc54 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -975,7 +975,7 @@ SEASTAR_TEST_CASE(populate_from_quarantine_works) { for (auto i = 0; i < smp::count && !found; i++) { found = co_await db.invoke_on((shard + i) % smp::count, [] (replica::database& db) -> future { auto& cf = db.find_column_family("ks", "cf"); - auto sstables = cf.in_strategy_sstables(); + auto sstables = in_strategy_sstables(cf.as_table_state()); if (sstables.empty()) { co_return false; } @@ -1024,7 +1024,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) { for (auto i = 0; i < smp::count; i++) { co_await db.invoke_on((shard + i) % smp::count, [&] (replica::database& db) -> future<> { auto& cf = db.find_column_family("ks", "cf"); - auto sstables = cf.in_strategy_sstables(); + auto sstables = in_strategy_sstables(cf.as_table_state()); if (sstables.empty()) { co_return; } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index c59ce0e3b5..b9e1afd148 100644 --- 
a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -2286,8 +2286,8 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) { table->add_sstable_and_update_cache(sst).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); - BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst); auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); @@ -2312,7 +2312,7 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) { compaction_manager.perform_sstable_scrub(table.get(), opts).get(); BOOST_REQUIRE(sst->is_quarantined()); - BOOST_REQUIRE(table->in_strategy_sstables().empty()); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty()); verify_fragments(sst, corrupt_fragments); }); }, test_cfg); @@ -2485,8 +2485,8 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { table->add_sstable_and_update_cache(sst).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); - BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst); auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, permit)); @@ -2509,7 +2509,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort; compaction_manager.perform_sstable_scrub(table.get(), opts).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); verify_fragments(sst, corrupt_fragments); 
testlog.info("Scrub in skip mode"); @@ -2518,9 +2518,9 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { opts.operation_mode = sstables::compaction_type_options::scrub::mode::skip; compaction_manager.perform_sstable_scrub(table.get(), opts).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); - BOOST_REQUIRE(table->in_strategy_sstables().front() != sst); - verify_fragments(table->in_strategy_sstables().front(), scrubbed_fragments); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() != sst); + verify_fragments(in_strategy_sstables(table->as_table_state()).front(), scrubbed_fragments); }); }, test_cfg); } @@ -2582,8 +2582,8 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { table->add_sstable_and_update_cache(sst).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); - BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst); auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); @@ -2606,7 +2606,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort; compaction_manager.perform_sstable_scrub(table.get(), opts).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); verify_fragments(sst, corrupt_fragments); testlog.info("Scrub in segregate mode"); @@ -2615,8 +2615,8 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate; compaction_manager.perform_sstable_scrub(table.get(), opts).get(); - testlog.info("Scrub 
resulted in {} sstables", table->in_strategy_sstables().size()); - BOOST_REQUIRE(table->in_strategy_sstables().size() > 1); + testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(table->as_table_state()).size()); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() > 1); { auto sst_reader = assert_that(table->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); auto mt_reader = scrubbed_mt->as_data_source().make_reader_v2(schema, env.make_reader_permit()); @@ -2694,8 +2694,8 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { table->add_sstable_and_update_cache(sst).get(); - BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); - BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() == sst); auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); @@ -2718,7 +2718,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { opts.operation_mode = sstables::compaction_type_options::scrub::mode::validate; compaction_manager.perform_sstable_scrub(table.get(), opts).get(); - BOOST_REQUIRE(table->in_strategy_sstables().empty()); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty()); BOOST_REQUIRE(sst->is_quarantined()); verify_fragments(sst, corrupt_fragments); @@ -2733,8 +2733,8 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { case sstables::compaction_type_options::scrub::quarantine_mode::include: case sstables::compaction_type_options::scrub::quarantine_mode::only: // The sstable should be found and scrubbed when scrub::quarantine_mode is scrub::quarantine_mode::{include,only} - testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size()); - 
BOOST_REQUIRE(table->in_strategy_sstables().size() > 1); + testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(table->as_table_state()).size()); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() > 1); { auto sst_reader = assert_that(table->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); auto mt_reader = scrubbed_mt->as_data_source().make_reader_v2(schema, env.make_reader_permit()); @@ -2748,7 +2748,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { break; case sstables::compaction_type_options::scrub::quarantine_mode::exclude: // The sstable should not be found when scrub::quarantine_mode is scrub::quarantine_mode::exclude - BOOST_REQUIRE(table->in_strategy_sstables().empty()); + BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty()); BOOST_REQUIRE(sst->is_quarantined()); verify_fragments(sst, corrupt_fragments); break; From b5417096e2fdf6e8372f41aa85dc9f11d306b4b2 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 14:52:30 -0300 Subject: [PATCH 07/30] compaction_manager: make propagate_replacement() switch to table_state propagate_replacement is used by incremental compaction to notify ongoing compaction about sstable list updates, such that the ongoing job won't hold reference to exhausted sstables. So it needs to switch to table_state, too. Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 7 ++++--- compaction/compaction_manager.hh | 2 +- test/lib/sstable_utils.hh | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 31f64609e3..dead4e77b7 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -329,7 +329,7 @@ future compaction_manager::task::compact_sstables(s }; descriptor.replacer = [this, &t, release_exhausted] (sstables::compaction_completion_desc desc) { t.get_compaction_strategy().notify_completion(desc.old_sstables, desc.new_sstables); - _cm.propagate_replacement(&t, desc.old_sstables, desc.new_sstables); + _cm.propagate_replacement(t.as_table_state(), desc.old_sstables, desc.new_sstables); auto old_sstables = desc.old_sstables; t.on_compaction_completion(std::move(desc)); // Calls compaction manager's task for this compaction to release reference to exhausted SSTables. @@ -1527,10 +1527,11 @@ future<> compaction_manager::stop_compaction(sstring type, replica::table* table return stop_ongoing_compactions("user request", table, target_type); } -void compaction_manager::propagate_replacement(replica::table* t, +void compaction_manager::propagate_replacement(compaction::table_state& t, const std::vector& removed, const std::vector& added) { for (auto& task : _tasks) { - if (task->compacting_table() == t && task->compaction_running()) { + // FIXME: stop using as_table_state() once compaction_manager::task switches to table_state. 
+ if (&task->compacting_table()->as_table_state() == &t && task->compaction_running()) { task->compaction_data().pending_replacements.push_back({ removed, added }); } } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 3034006be0..d9542e43e7 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -353,7 +353,7 @@ private: future<> really_do_stop(); // Propagate replacement of sstables to all ongoing compaction of a given table - void propagate_replacement(replica::table* t, const std::vector& removed, const std::vector& added); + void propagate_replacement(compaction::table_state& t, const std::vector& removed, const std::vector& added); // This constructor is suposed to only be used for testing so lets be more explicit // about invoking it. Ref #10146 diff --git a/test/lib/sstable_utils.hh b/test/lib/sstable_utils.hh index 91c416a92a..028b1c689a 100644 --- a/test/lib/sstable_utils.hh +++ b/test/lib/sstable_utils.hh @@ -377,7 +377,7 @@ public: future<> run(utils::UUID output_run_id, replica::column_family* cf, noncopyable_function (sstables::compaction_data&)> job); void propagate_replacement(replica::table* t, const std::vector& removed, const std::vector& added) { - _cm.propagate_replacement(t, removed, added); + _cm.propagate_replacement(t->as_table_state(), removed, added); } private: sstables::compaction_data& register_compaction(shared_ptr task); From 61510af62aba39b27ce8cd6811754bf0642da441 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 13:42:54 -0300 Subject: [PATCH 08/30] compaction_manager: make get_candidates() switch to table_state Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 14 +++++++------- compaction/compaction_manager.hh | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index dead4e77b7..5ba04da416 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -202,9 +202,9 @@ std::vector in_strategy_sstables(compaction::table_sta })); } -std::vector compaction_manager::get_candidates(const replica::table& t) { +std::vector compaction_manager::get_candidates(compaction::table_state& t) { std::vector candidates; - candidates.reserve(t.sstables_count()); + candidates.reserve(t.main_sstable_set().all()->size()); // prevents sstables that belongs to a partial run being generated by ongoing compaction from being // selected for compaction, which could potentially result in wrong behavior. auto partial_run_identifiers = boost::copy_range>(_tasks @@ -213,7 +213,7 @@ std::vector compaction_manager::get_candidates(const r auto& cs = t.get_compaction_strategy(); // Filter out sstables that are being compacted. - for (auto& sst : in_strategy_sstables(t.as_table_state())) { + for (auto& sst : in_strategy_sstables(t)) { if (_compacting_sstables.contains(sst)) { continue; } @@ -371,7 +371,7 @@ protected: // those are eligible for major compaction. 
auto* t = _compacting_table; sstables::compaction_strategy cs = t->get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(*t)); + sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(t->as_table_state())); auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); @@ -904,7 +904,7 @@ protected: replica::table& t = *_compacting_table; sstables::compaction_strategy cs = t.get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t)); + sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t.as_table_state())); int weight = calculate_weight(descriptor); if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { @@ -1371,7 +1371,7 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab return seastar::async([this, &db, t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { auto schema = t->schema(); auto sstables = std::vector{}; - const auto candidates = get_candidates(*t); + const auto candidates = get_candidates(t->as_table_state()); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema); @@ -1391,7 +1391,7 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, repl auto last_version = t->get_sstables_manager().get_highest_supported_format(); - for (auto& sst : 
get_candidates(*t)) { + for (auto& sst : get_candidates(t->as_table_state())) { // if we are a "normal" upgrade, we only care about // tables with older versions, but potentially // we are to actually rewrite everything. (-a) diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index d9542e43e7..869533a602 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -312,7 +312,7 @@ private: void deregister_weight(int weight); // Get candidates for compaction strategy, which are all sstables but the ones being compacted. - std::vector get_candidates(const replica::table& t); + std::vector get_candidates(compaction::table_state& t); template requires std::same_as || std::sentinel_for From 598ede607fe5b8087ba1a7984cdf2d59d0ed289c Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 14:55:43 -0300 Subject: [PATCH 09/30] compaction_manager: make can_register_compaction() switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 6 +++--- compaction/compaction_manager.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 5ba04da416..7c010599c7 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -159,9 +159,9 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const { return std::min(unsigned(32), largest_fan_in); } -bool compaction_manager::can_register_compaction(replica::table* t, int weight, unsigned fan_in) const { +bool compaction_manager::can_register_compaction(compaction::table_state& t, int weight, unsigned fan_in) const { // Only one weight is allowed if parallel compaction is disabled. 
- if (!t->get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t->as_table_state())) { + if (!t.get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) { return false; } // Weightless compaction doesn't have to be serialized, and won't dillute overall efficiency. @@ -911,7 +911,7 @@ protected: cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); co_return; } - if (!_cm.can_register_compaction(&t, weight, descriptor.fan_in())) { + if (!_cm.can_register_compaction(t.as_table_state(), weight, descriptor.fan_in())) { cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); switch_state(state::postponed); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 869533a602..7673cda6b7 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -304,7 +304,7 @@ private: unsigned current_compaction_fan_in_threshold() const; // Return true if compaction can be initiated - bool can_register_compaction(replica::table* t, int weight, unsigned fan_in) const; + bool can_register_compaction(compaction::table_state& t, int weight, unsigned fan_in) const; // Register weight for a table. Do that only if can_register_weight() // returned true. void register_weight(int weight); From 309d73c584ca0195de80418a0dd434b69dc1e5a0 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 11 Jul 2022 13:54:16 -0300 Subject: [PATCH 10/30] compaction_manager: change task::update_history() to use table_state instead Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 8 ++++---- compaction/compaction_manager.hh | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 7c010599c7..37cda515a2 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -315,7 +315,7 @@ future<> compaction_manager::task::compact_sstables_and_update_history(sstables: sstables::compaction_result res = co_await compact_sstables(std::move(descriptor), cdata, std::move(release_exhausted), std::move(can_purge)); if (should_update_history) { - co_await update_history(*_compacting_table, res, cdata); + co_await update_history(_compacting_table->as_table_state(), res, cdata); } } future compaction_manager::task::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) { @@ -340,10 +340,10 @@ future compaction_manager::task::compact_sstables(s co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t.as_table_state()); } -future<> compaction_manager::task::update_history(replica::table& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { +future<> compaction_manager::task::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { auto ended_at = std::chrono::duration_cast(res.ended_at.time_since_epoch()); - co_return co_await t.as_table_state().update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(), ended_at, + co_return co_await t.update_compaction_history(cdata.compaction_uuid, t.schema()->ks_name(), t.schema()->cf_name(), ended_at, res.start_size, res.end_size); } @@ -947,7 +947,7 @@ protected: // the weight earlier to remove unnecessary // serialization. 
weight_r.deregister(); - co_await update_history(*_compacting_table, res, _compaction_data); + co_await update_history(_compacting_table->as_table_state(), res, _compaction_data); } _cm.reevaluate_postponed_compactions(); continue; diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 7673cda6b7..bc4aa3c33d 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -149,7 +149,7 @@ public: can_purge_tombstones can_purge = can_purge_tombstones::yes); future compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge = can_purge_tombstones::yes); - future<> update_history(replica::table& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata); + future<> update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata); bool should_update_history(sstables::compaction_type ct) { return ct == sstables::compaction_type::Compaction; } From b6126395e12566298c39f316f2d43adf304e54c0 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 17:33:11 -0300 Subject: [PATCH 11/30] compaction_manager: make get_compactions() switch to table_state The only external user of get_compactions() doesn't use any filtering, so after table_state switch, one will be allowed to get all jobs running associated with a table_state. Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 6 +++--- compaction/compaction_manager.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 37cda515a2..c333e01371 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -773,7 +773,7 @@ future<> compaction_manager::stop_tasks(std::vector> tasks, sst } future<> compaction_manager::stop_ongoing_compactions(sstring reason, replica::table* t, std::optional type_opt) { - auto ongoing_compactions = get_compactions(t).size(); + auto ongoing_compactions = get_compactions(&t->as_table_state()).size(); auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) { return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt); })); @@ -1481,7 +1481,7 @@ future<> compaction_manager::remove(replica::table* t) { #endif } -const std::vector compaction_manager::get_compactions(replica::table* t) const { +const std::vector compaction_manager::get_compactions(compaction::table_state* t) const { auto to_info = [] (const shared_ptr& task) { sstables::compaction_info ret; ret.compaction_uuid = task->compaction_data().compaction_uuid; @@ -1494,7 +1494,7 @@ const std::vector compaction_manager::get_compactions }; using ret = std::vector; return boost::copy_range(_tasks | boost::adaptors::filtered([t] (const shared_ptr& task) { - return (!t || task->compacting_table() == t) && task->compaction_running(); + return (!t || &task->compacting_table()->as_table_state() == t) && task->compaction_running(); }) | boost::adaptors::transformed(to_info)); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index bc4aa3c33d..6f363a4fff 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -455,7 +455,7 @@ public: return _stats; } - const std::vector get_compactions(replica::table* t = nullptr) const; + 
const std::vector get_compactions(compaction::table_state* t = nullptr) const; // Returns true if table has an ongoing compaction, running on its behalf bool has_table_ongoing_compaction(const compaction::table_state& t) const; From 7a9908dbf1e738eff931f87fe2a1bc2aaa2de47b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 17:50:39 -0300 Subject: [PATCH 12/30] compaction_manager: make stop compaction procedures switch to table_state They're used to stop all ongoing compaction on behalf of a given table T. Today, each table has a single table_state representing it, but after we implement compaction groups, we'll need to call the procedure for each group in a table. But the discussion doesn't belong here, as compaction group work will only come later. For the time being, we're only making compaction manager fully switch to table_state. Signed-off-by: Raphael S. Carvalho --- api/compaction_manager.cc | 2 +- compaction/compaction_manager.cc | 12 ++++++------ compaction/compaction_manager.hh | 4 ++-- replica/table.cc | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/api/compaction_manager.cc b/api/compaction_manager.cc index e5c2cb8488..3e0fe669f8 100644 --- a/api/compaction_manager.cc +++ b/api/compaction_manager.cc @@ -119,7 +119,7 @@ void set_compaction_manager(http_context& ctx, routes& r) { auto& cm = db.get_compaction_manager(); return parallel_for_each(table_names, [&db, &cm, &ks_name, type] (sstring& table_name) { auto& t = db.find_column_family(ks_name, table_name); - return cm.stop_compaction(type, &t); + return cm.stop_compaction(type, &t.as_table_state()); }); }); co_return json_void(); diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index c333e01371..bb6284f53d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -473,7 +473,7 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() { future
compaction_manager::stop_and_disable_compaction(replica::table* t) { compaction_reenabler cre(*this, t); - co_await stop_ongoing_compactions("user-triggered operation", t); + co_await stop_ongoing_compactions("user-triggered operation", &t->as_table_state()); co_return cre; } @@ -772,10 +772,10 @@ future<> compaction_manager::stop_tasks(std::vector> tasks, sst }); } -future<> compaction_manager::stop_ongoing_compactions(sstring reason, replica::table* t, std::optional type_opt) { - auto ongoing_compactions = get_compactions(&t->as_table_state()).size(); +future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction::table_state* t, std::optional type_opt) { + auto ongoing_compactions = get_compactions(t).size(); auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) { - return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt); + return (!t || &task->compacting_table()->as_table_state() == t) && (!type_opt || task->type() == *type_opt); })); logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; if (cmlog.is_enabled(level)) { @@ -1458,7 +1458,7 @@ future<> compaction_manager::remove(replica::table* t) { _postponed.erase(t); // Wait for the termination of an ongoing compaction on table T, if any. - co_await stop_ongoing_compactions("table removal", t); + co_await stop_ongoing_compactions("table removal", &t->as_table_state()); // Wait for all functions running under gate to terminate. 
co_await c_state.gate.close(); @@ -1508,7 +1508,7 @@ bool compaction_manager::compaction_disabled(compaction::table_state& t) const { return _compaction_state.contains(&t) && _compaction_state.at(&t).compaction_disabled(); } -future<> compaction_manager::stop_compaction(sstring type, replica::table* table) { +future<> compaction_manager::stop_compaction(sstring type, compaction::table_state* table) { sstables::compaction_type target_type; try { target_type = sstables::to_compaction_type(type); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 6f363a4fff..84d01973c9 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -463,10 +463,10 @@ public: bool compaction_disabled(compaction::table_state& t) const; // Stops ongoing compaction of a given type. - future<> stop_compaction(sstring type, replica::table* table = nullptr); + future<> stop_compaction(sstring type, compaction::table_state* table = nullptr); // Stops ongoing compaction of a given table and/or compaction_type. - future<> stop_ongoing_compactions(sstring reason, replica::table* t = nullptr, std::optional type_opt = {}); + future<> stop_ongoing_compactions(sstring reason, compaction::table_state* t = nullptr, std::optional type_opt = {}); double backlog() { return _backlog_manager.backlog(); diff --git a/replica/table.cc b/replica/table.cc index dc9ab57cd1..9fc4297d19 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2201,7 +2201,7 @@ table::disable_auto_compaction() { // for new submissions _compaction_disabled_by_user = true; return with_gate(_async_gate, [this] { - return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", this, sstables::compaction_type::Compaction); + return _compaction_manager.stop_ongoing_compactions("disable auto-compaction", &as_table_state(), sstables::compaction_type::Compaction); }); } From 956c3997cb5c461bd8f7156dc0ce244600634c7e Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Tue, 12 Jul 2022 18:06:40 -0300 Subject: [PATCH 13/30] compaction_manager: make can_proceed switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 6 +++--- compaction/compaction_manager.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index bb6284f53d..a052abf41f 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -837,8 +837,8 @@ void compaction_manager::do_stop() noexcept { } } -inline bool compaction_manager::can_proceed(replica::table* t) const { - return (_state == state::enabled) && _compaction_state.contains(&t->as_table_state()) && !_compaction_state.at(&t->as_table_state()).compaction_disabled(); +inline bool compaction_manager::can_proceed(compaction::table_state* t) const { + return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled(); } inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_stopping) const { @@ -849,7 +849,7 @@ inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_ } return false; } - return _cm.can_proceed(_compacting_table); + return _cm.can_proceed(&_compacting_table->as_table_state()); } future compaction_manager::task::maybe_retry(std::exception_ptr err) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 84d01973c9..613b58d2d0 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -328,7 +328,7 @@ private: // Return true if compaction manager is enabled and // table still exists and compaction is not disabled for the table. 
- inline bool can_proceed(replica::table* t) const; + inline bool can_proceed(compaction::table_state* t) const; void postponed_compactions_reevaluation(); void reevaluate_postponed_compactions() noexcept; From 1520580212bf98c31c09312b5372c7a77ee2f71b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 18:14:15 -0300 Subject: [PATCH 14/30] compaction: table_state: Add make_sstable() compaction_manager needs this interface when setting the sstable creation lambda in compaction_descriptor, which is then forwarded into the actual compaction procedure. Signed-off-by: Raphael S. Carvalho --- compaction/table_state.hh | 1 + replica/table.cc | 3 +++ test/boost/sstable_compaction_test.cc | 3 +++ 3 files changed, 7 insertions(+) diff --git a/compaction/table_state.hh b/compaction/table_state.hh index beca464034..5e6a03e6ba 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -34,6 +34,7 @@ public: virtual const std::vector& compacted_undeleted_sstables() const noexcept = 0; virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0; virtual reader_permit make_compaction_reader_permit() const = 0; + virtual sstables::shared_sstable make_sstable() const = 0; virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0; virtual api::timestamp_type min_memtable_timestamp() const = 0; virtual future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) = 0; diff --git a/replica/table.cc b/replica/table.cc index 9fc4297d19..42fbad2f4f 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2406,6 +2406,9 @@ public: reader_permit make_compaction_reader_permit() const override { return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout); } + sstables::shared_sstable make_sstable() const override { + return _t.make_sstable(); + 
} sstables::sstable_writer_config configure_writer(sstring origin) const override { return _t.get_sstables_manager().configure_writer(std::move(origin)); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index b9e1afd148..2fcd2b25d8 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -157,6 +157,9 @@ public: reader_permit make_compaction_reader_permit() const override { return _env.make_reader_permit(); } + sstables::shared_sstable make_sstable() const override { + return _t->make_sstable(); + } sstables::sstable_writer_config configure_writer(sstring origin) const override { return _env.manager().configure_writer(std::move(origin)); } From 1deeeff8254425e53fdfe66625947d287b33b709 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 15:03:25 -0300 Subject: [PATCH 15/30] compaction: table_state: Add on_compaction_completion() The idea is that we'll have a single on-completion interface for both "in-strategy" and off-strategy compactions, so not to pollute table_state with one interface for each. replica::table::on_compaction_completion is being moved into private namespace. Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 4 ++-- compaction/table_state.hh | 2 ++ replica/database.hh | 1 - replica/table.cc | 7 +++++++ test/boost/sstable_compaction_test.cc | 3 +++ 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index a052abf41f..f0c0507a22 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -331,7 +331,7 @@ future compaction_manager::task::compact_sstables(s t.get_compaction_strategy().notify_completion(desc.old_sstables, desc.new_sstables); _cm.propagate_replacement(t.as_table_state(), desc.old_sstables, desc.new_sstables); auto old_sstables = desc.old_sstables; - t.on_compaction_completion(std::move(desc)); + t.as_table_state().on_compaction_completion(std::move(desc), sstables::offstrategy::no).get(); // Calls compaction manager's task for this compaction to release reference to exhausted SSTables. if (release_exhausted) { release_exhausted(old_sstables); @@ -1052,7 +1052,7 @@ public: .old_sstables = std::move(old_sstables), .new_sstables = std::move(reshape_candidates) }; - co_await t.update_sstable_lists_on_off_strategy_completion(std::move(completion_desc)); + co_await t.as_table_state().on_compaction_completion(std::move(completion_desc), sstables::offstrategy::yes); cleanup_new_unused_sstables_on_failure.cancel(); // By marking input sstables for deletion instead, the ones which require view building will stay in the staging diff --git a/compaction/table_state.hh b/compaction/table_state.hh index 5e6a03e6ba..c8a6ad6e47 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -11,6 +11,7 @@ #include "schema_fwd.hh" #include "sstables/sstable_set.hh" +#include "compaction_descriptor.hh" class reader_permit; @@ -38,6 +39,7 @@ public: virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0; virtual api::timestamp_type min_memtable_timestamp() const = 0; virtual future<> 
update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) = 0; + virtual future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0; }; } diff --git a/replica/database.hh b/replica/database.hh index 7f76253629..eb4d83dc04 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -592,7 +592,6 @@ private: static seastar::shard_id calculate_shard_from_sstable_generation(sstables::generation_type sstable_generation) { return sstables::generation_value(sstable_generation) % smp::count; } -public: // This will update sstable lists on behalf of off-strategy compaction, where // input files will be removed from the maintenance set and output files will // be inserted into the main set. diff --git a/replica/table.cc b/replica/table.cc index 42fbad2f4f..32bebe7666 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2426,6 +2426,13 @@ public: return db::system_keyspace::update_compaction_history(compaction_id, ks_name, cf_name, ended_at.count(), bytes_in, bytes_out, std::unordered_map{}); } + future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override { + if (offstrategy) { + return _t.update_sstable_lists_on_off_strategy_completion(std::move(desc)); + } + _t.on_compaction_completion(std::move(desc)); + return make_ready_future<>(); + } }; compaction::table_state& table::as_table_state() const noexcept { diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 2fcd2b25d8..57c3778e05 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -170,6 +170,9 @@ public: future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) override { return make_ready_future<>(); } 
+ future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override { + return _t->as_table_state().on_compaction_completion(std::move(desc), offstrategy); + } }; static std::unique_ptr make_table_state_for_test(column_family_for_tests& t, test_env& env) { From 43136a3ca7138cb4971446e4aed17c9a0e809441 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 20:39:53 -0300 Subject: [PATCH 16/30] compaction: table_state: Add is_auto_compaction_disabled_by_user() auto_compaction_disabled_by_user is a configuration that can be enabled or disabled on a particular table. We're adding this interface to avoid having to push the configuration for every compaction_state, which would result in redundant information as the configuration value is the same for all table states. Signed-off-by: Raphael S. Carvalho --- compaction/table_state.hh | 1 + replica/table.cc | 3 +++ test/boost/sstable_compaction_test.cc | 3 +++ 3 files changed, 7 insertions(+) diff --git a/compaction/table_state.hh b/compaction/table_state.hh index c8a6ad6e47..883ac76bf7 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -40,6 +40,7 @@ public: virtual api::timestamp_type min_memtable_timestamp() const = 0; virtual future<> update_compaction_history(utils::UUID compaction_id, sstring ks_name, sstring cf_name, std::chrono::milliseconds ended_at, int64_t bytes_in, int64_t bytes_out) = 0; virtual future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) = 0; + virtual bool is_auto_compaction_disabled_by_user() const noexcept = 0; }; } diff --git a/replica/table.cc b/replica/table.cc index 32bebe7666..99e0879ec1 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2433,6 +2433,9 @@ public: _t.on_compaction_completion(std::move(desc)); return make_ready_future<>(); } + bool is_auto_compaction_disabled_by_user() const noexcept override { + return 
_t.is_auto_compaction_disabled_by_user(); + } }; compaction::table_state& table::as_table_state() const noexcept { diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 57c3778e05..9eacb5ccca 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -173,6 +173,9 @@ public: future<> on_compaction_completion(sstables::compaction_completion_desc desc, sstables::offstrategy offstrategy) override { return _t->as_table_state().on_compaction_completion(std::move(desc), offstrategy); } + bool is_auto_compaction_disabled_by_user() const noexcept override { + return false; + } }; static std::unique_ptr make_table_state_for_test(column_family_for_tests& t, test_env& env) { From a17602227222d3112dbd9f34a7b7d9f5e505c535 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 15:12:29 -0300 Subject: [PATCH 17/30] compaction_manager: task: switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 84 ++++++++++++++++---------------- compaction/compaction_manager.hh | 8 +-- test/lib/sstable_utils.cc | 2 +- 3 files changed, 48 insertions(+), 46 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index f0c0507a22..9e6044cb1c 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -268,10 +268,10 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(c } } -compaction_manager::task::task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc) +compaction_manager::task::task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc) : _cm(mgr) , _compacting_table(t) - , _compaction_state(_cm.get_compaction_state(&t->as_table_state())) + , _compaction_state(_cm.get_compaction_state(t)) , _type(type) , _gate_holder(_compaction_state.gate.hold()) , _description(std::move(desc)) 
@@ -315,13 +315,13 @@ future<> compaction_manager::task::compact_sstables_and_update_history(sstables: sstables::compaction_result res = co_await compact_sstables(std::move(descriptor), cdata, std::move(release_exhausted), std::move(can_purge)); if (should_update_history) { - co_await update_history(_compacting_table->as_table_state(), res, cdata); + co_await update_history(*_compacting_table, res, cdata); } } future compaction_manager::task::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) { - replica::table& t = *_compacting_table; + compaction::table_state& t = *_compacting_table; if (can_purge) { - descriptor.enable_garbage_collection(t.get_sstable_set()); + descriptor.enable_garbage_collection(t.main_sstable_set()); } descriptor.creator = [&t] (shard_id dummy) { auto sst = t.make_sstable(); @@ -329,16 +329,16 @@ future compaction_manager::task::compact_sstables(s }; descriptor.replacer = [this, &t, release_exhausted] (sstables::compaction_completion_desc desc) { t.get_compaction_strategy().notify_completion(desc.old_sstables, desc.new_sstables); - _cm.propagate_replacement(t.as_table_state(), desc.old_sstables, desc.new_sstables); + _cm.propagate_replacement(t, desc.old_sstables, desc.new_sstables); auto old_sstables = desc.old_sstables; - t.as_table_state().on_compaction_completion(std::move(desc), sstables::offstrategy::no).get(); + t.on_compaction_completion(std::move(desc), sstables::offstrategy::no).get(); // Calls compaction manager's task for this compaction to release reference to exhausted SSTables. 
if (release_exhausted) { release_exhausted(old_sstables); } }; - co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t.as_table_state()); + co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t); } future<> compaction_manager::task::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { auto ended_at = std::chrono::duration_cast(res.ended_at.time_since_epoch()); @@ -349,7 +349,7 @@ future<> compaction_manager::task::update_history(compaction::table_state& t, co class compaction_manager::major_compaction_task : public compaction_manager::task { public: - major_compaction_task(compaction_manager& mgr, replica::table* t) + major_compaction_task(compaction_manager& mgr, compaction::table_state* t) : task(mgr, t, sstables::compaction_type::Compaction, "Major compaction") {} @@ -369,9 +369,9 @@ protected: // candidates are sstables that aren't being operated on by other compaction types. // those are eligible for major compaction. 
- auto* t = _compacting_table; + compaction::table_state* t = _compacting_table; sstables::compaction_strategy cs = t->get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(t->as_table_state(), _cm.get_candidates(t->as_table_state())); + sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t)); auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); auto release_exhausted = [&compacting] (const std::vector& exhausted_sstables) { compacting.release_compacting(exhausted_sstables); @@ -397,14 +397,14 @@ future<> compaction_manager::perform_major_compaction(replica::table* t) { if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, t)); + return perform_task(make_shared(*this, &t->as_table_state())); } class compaction_manager::custom_compaction_task : public compaction_manager::task { noncopyable_function(sstables::compaction_data&)> _job; public: - custom_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc, noncopyable_function(sstables::compaction_data&)> job) + custom_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function(sstables::compaction_data&)> job) : task(mgr, t, type, std::move(desc)) , _job(std::move(job)) {} @@ -435,7 +435,7 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact return make_ready_future<>(); } - return perform_task(make_shared(*this, t, type, desc, std::move(job))); + return perform_task(make_shared(*this, &t->as_table_state(), type, desc, std::move(job))); } compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t) @@ -775,7 +775,7 @@ future<> compaction_manager::stop_tasks(std::vector> tasks, sst future<> compaction_manager::stop_ongoing_compactions(sstring reason, 
compaction::table_state* t, std::optional type_opt) { auto ongoing_compactions = get_compactions(t).size(); auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) { - return (!t || &task->compacting_table()->as_table_state() == t) && (!type_opt || task->type() == *type_opt); + return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt); })); logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; if (cmlog.is_enabled(level)) { @@ -849,7 +849,7 @@ inline bool compaction_manager::task::can_proceed(throw_if_stopping do_throw_if_ } return false; } - return _cm.can_proceed(&_compacting_table->as_table_state()); + return _cm.can_proceed(_compacting_table); } future compaction_manager::task::maybe_retry(std::exception_ptr err) { @@ -883,9 +883,12 @@ future compaction_manager::task::maybe_retry(std::exception_ptr } class compaction_manager::regular_compaction_task : public compaction_manager::task { + // FIXME: remove it once submit() switches to table_state. 
+ replica::table* _table; public: regular_compaction_task(compaction_manager& mgr, replica::table* t) - : task(mgr, t, sstables::compaction_type::Compaction, "Compaction") + : task(mgr, &t->as_table_state(), sstables::compaction_type::Compaction, "Compaction") + , _table(t) {} protected: virtual future<> do_run() override { @@ -902,20 +905,20 @@ protected: co_return; } - replica::table& t = *_compacting_table; + compaction::table_state& t = *_compacting_table; sstables::compaction_strategy cs = t.get_compaction_strategy(); - sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t.as_table_state(), _cm.get_strategy_control(), _cm.get_candidates(t.as_table_state())); + sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t, _cm.get_strategy_control(), _cm.get_candidates(t)); int weight = calculate_weight(descriptor); if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); co_return; } - if (!_cm.can_register_compaction(t.as_table_state(), weight, descriptor.fan_in())) { + if (!_cm.can_register_compaction(t, weight, descriptor.fan_in())) { cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); switch_state(state::postponed); - _cm.postpone_compaction_for_table(&t); + _cm.postpone_compaction_for_table(_table); co_return; } auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); @@ -947,7 +950,7 @@ protected: // the weight earlier to remove unnecessary // serialization. 
weight_r.deregister(); - co_await update_history(_compacting_table->as_table_state(), res, _compaction_data); + co_await update_history(*_compacting_table, res, _compaction_data); } _cm.reevaluate_postponed_compactions(); continue; @@ -976,7 +979,7 @@ void compaction_manager::submit(replica::table* t) { class compaction_manager::offstrategy_compaction_task : public compaction_manager::task { bool _performed = false; public: - offstrategy_compaction_task(compaction_manager& mgr, replica::table* t) + offstrategy_compaction_task(compaction_manager& mgr, compaction::table_state* t) : task(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction") {} @@ -996,7 +999,7 @@ public: // by the fact that off-strategy is serialized across all tables, meaning that the // actual requirement is the size of the largest table's maintenance set. - replica::table& t = *_compacting_table; + compaction::table_state& t = *_compacting_table; const auto& maintenance_sstables = t.maintenance_sstable_set(); const auto old_sstables = boost::copy_range>(*maintenance_sstables.all()); @@ -1024,7 +1027,7 @@ public: }; auto input = boost::copy_range>(desc->sstables); - auto ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t.as_table_state()); + auto ret = co_await sstables::compact_sstables(std::move(*desc), cdata, t); _performed = true; // update list of reshape candidates without input but with output added to it @@ -1052,7 +1055,7 @@ public: .old_sstables = std::move(old_sstables), .new_sstables = std::move(reshape_candidates) }; - co_await t.as_table_state().on_compaction_completion(std::move(completion_desc), sstables::offstrategy::yes); + co_await t.on_compaction_completion(std::move(completion_desc), sstables::offstrategy::yes); cleanup_new_unused_sstables_on_failure.cancel(); // By marking input sstables for deletion instead, the ones which require view building will stay in the staging @@ -1079,7 +1082,7 @@ protected: std::exception_ptr ex; try { - 
replica::table& t = *_compacting_table; + compaction::table_state& t = *_compacting_table; auto maintenance_sstables = t.maintenance_sstable_set().all(); cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found", t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables->size()); @@ -1103,7 +1106,7 @@ future compaction_manager::perform_offstrategy(replica::table* t) { if (_state != state::enabled) { co_return false; } - auto task = make_shared(*this, t); + auto task = make_shared(*this, &t->as_table_state()); co_await perform_task(task); co_return task->performed(); } @@ -1114,7 +1117,7 @@ class compaction_manager::rewrite_sstables_compaction_task : public compaction_m can_purge_tombstones _can_purge; public: - rewrite_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options, + rewrite_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options, std::vector sstables, compacting_sstable_registration compacting, can_purge_tombstones can_purge) : sstables_task(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables)) @@ -1201,7 +1204,7 @@ future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstabl return a->data_size() > b->data_size(); }); }); - co_await perform_task(seastar::make_shared(*this, t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); + co_await perform_task(seastar::make_shared(*this, &t->as_table_state(), std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); } future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { @@ -1210,7 +1213,7 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa class 
compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task { public: - validate_sstables_compaction_task(compaction_manager& mgr, replica::table* t, std::vector sstables) + validate_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, std::vector sstables) : sstables_task(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables)) {} @@ -1236,7 +1239,7 @@ private: sstables::compaction_descriptor::default_max_sstable_bytes, sst->run_identifier(), sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate)); - co_await sstables::compact_sstables(std::move(desc), _compaction_data, _compacting_table->as_table_state()); + co_await sstables::compact_sstables(std::move(desc), _compaction_data, *_compacting_table); } catch (sstables::compaction_stopped_exception&) { // ignore, will be handled by can_proceed() } catch (storage_io_error& e) { @@ -1260,7 +1263,7 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table* } // All sstables must be included, even the ones being compacted, such that everything in table is validated. 
auto all_sstables = boost::copy_range>(*t->get_sstables()); - return perform_task(seastar::make_shared(*this, t, std::move(all_sstables))); + return perform_task(seastar::make_shared(*this, &t->as_table_state(), std::move(all_sstables))); } class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task { @@ -1268,12 +1271,12 @@ class compaction_manager::cleanup_sstables_compaction_task : public compaction_m compacting_sstable_registration _compacting; std::vector _pending_cleanup_jobs; public: - cleanup_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options, + cleanup_sstables_compaction_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options, std::vector candidates, compacting_sstable_registration compacting) : task(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) , _cleanup_options(std::move(options)) , _compacting(std::move(compacting)) - , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), std::move(candidates))) + , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates))) { // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs // will have more space available released by previous jobs. 
@@ -1358,7 +1361,7 @@ bool needs_cleanup(const sstables::shared_sstable& sst, future<> compaction_manager::perform_cleanup(replica::database& db, replica::table* t) { auto check_for_cleanup = [this, t] { return boost::algorithm::any_of(_tasks, [t] (auto& task) { - return task->compacting_table() == t && task->type() == sstables::compaction_type::Cleanup; + return task->compacting_table() == &t->as_table_state() && task->type() == sstables::compaction_type::Cleanup; }); }; if (check_for_cleanup()) { @@ -1467,7 +1470,7 @@ future<> compaction_manager::remove(replica::table* t) { auto found = false; sstring msg; for (auto& task : _tasks) { - if (task->compacting_table() == t) { + if (task->compacting_table() == &t->as_table_state()) { if (!msg.empty()) { msg += "\n"; } @@ -1494,13 +1497,13 @@ const std::vector compaction_manager::get_compactions }; using ret = std::vector; return boost::copy_range(_tasks | boost::adaptors::filtered([t] (const shared_ptr& task) { - return (!t || &task->compacting_table()->as_table_state() == t) && task->compaction_running(); + return (!t || task->compacting_table() == t) && task->compaction_running(); }) | boost::adaptors::transformed(to_info)); } bool compaction_manager::has_table_ongoing_compaction(const compaction::table_state& t) const { return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr& task) { - return &task->compacting_table()->as_table_state() == &t && task->compaction_running(); + return task->compacting_table() == &t && task->compaction_running(); }); }; @@ -1530,8 +1533,7 @@ future<> compaction_manager::stop_compaction(sstring type, compaction::table_sta void compaction_manager::propagate_replacement(compaction::table_state& t, const std::vector& removed, const std::vector& added) { for (auto& task : _tasks) { - // FIXME: stop using as_table_state() once compaction_manager::task switches to table_state. 
- if (&task->compacting_table()->as_table_state() == &t && task->compaction_running()) { + if (task->compacting_table() == &t && task->compaction_running()) { task->compaction_data().pending_replacements.push_back({ removed, added }); } } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 613b58d2d0..b799f72160 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -101,7 +101,7 @@ public: protected: compaction_manager& _cm; - replica::table* _compacting_table = nullptr; + compaction::table_state* _compacting_table = nullptr; compaction_state& _compaction_state; sstables::compaction_data _compaction_data; state _state = state::none; @@ -115,7 +115,7 @@ public: sstring _description; public: - explicit task(compaction_manager& mgr, replica::table* t, sstables::compaction_type type, sstring desc); + explicit task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc); task(task&&) = delete; task(const task&) = delete; @@ -156,7 +156,7 @@ public: public: future<> run() noexcept; - const replica::table* compacting_table() const noexcept { + const compaction::table_state* compacting_table() const noexcept { return _compacting_table; } @@ -210,7 +210,7 @@ public: sstables::shared_sstable consume_sstable(); public: - explicit sstables_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type compaction_type, sstring desc, std::vector sstables) + explicit sstables_task(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector sstables) : task(mgr, t, compaction_type, std::move(desc)) { set_sstables(std::move(sstables)); diff --git a/test/lib/sstable_utils.cc b/test/lib/sstable_utils.cc index 8c6a4c100e..27b28a2d60 100644 --- a/test/lib/sstable_utils.cc +++ b/test/lib/sstable_utils.cc @@ -212,7 +212,7 @@ class compaction_manager::compaction_manager_test_task : public compaction_manag 
public: compaction_manager_test_task(compaction_manager& cm, replica::column_family* cf, utils::UUID run_id, noncopyable_function (sstables::compaction_data&)> job) - : compaction_manager::task(cm, cf, sstables::compaction_type::Compaction, "Test compaction") + : compaction_manager::task(cm, &cf->as_table_state(), sstables::compaction_type::Compaction, "Test compaction") , _run_id(run_id) , _job(std::move(job)) { } From 7c1d178f4e2891de0b27be7e0c0eee2140a13c32 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 21:13:27 -0300 Subject: [PATCH 18/30] compaction_manager: make submit(T) switch to table_state Now that submit() switched to table_state, compaction_reenabler and friends can switch to table_state too. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 25 +++++++++++-------------- compaction/compaction_manager.hh | 9 +++------ replica/table.cc | 2 +- test/boost/sstable_compaction_test.cc | 2 +- 4 files changed, 16 insertions(+), 22 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 9e6044cb1c..24354b926f 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -462,7 +462,7 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() { cmlog.debug("Reenabling compaction for {}.{}", _table->schema()->ks_name(), _table->schema()->cf_name()); try { - _cm.submit(_table); + _cm.submit(_table->as_table_state()); } catch (...) 
{ cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}", _table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception()); @@ -716,7 +716,7 @@ void compaction_manager::enable() { std::function compaction_manager::compaction_submission_callback() { return [this] () mutable { for (auto& e: _compaction_state) { - submit(e.second.t); + submit(*e.first); } }; } @@ -733,7 +733,7 @@ void compaction_manager::postponed_compactions_reevaluation() { for (auto& t : postponed) { auto s = t->schema(); cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); - submit(t); + submit(*t); } } catch (...) { _postponed = std::move(postponed); @@ -747,7 +747,7 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept { _postponed_reevaluation.signal(); } -void compaction_manager::postpone_compaction_for_table(replica::table* t) { +void compaction_manager::postpone_compaction_for_table(compaction::table_state* t) { _postponed.insert(t); } @@ -883,12 +883,9 @@ future compaction_manager::task::maybe_retry(std::exception_ptr } class compaction_manager::regular_compaction_task : public compaction_manager::task { - // FIXME: remove it once submit() switches to table_state. 
- replica::table* _table; public: - regular_compaction_task(compaction_manager& mgr, replica::table* t) - : task(mgr, &t->as_table_state(), sstables::compaction_type::Compaction, "Compaction") - , _table(t) + regular_compaction_task(compaction_manager& mgr, compaction::table_state& t) + : task(mgr, &t, sstables::compaction_type::Compaction, "Compaction") {} protected: virtual future<> do_run() override { @@ -918,7 +915,7 @@ protected: cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}.{}, postponing it...", descriptor.sstables.size(), weight, t.schema()->ks_name(), t.schema()->cf_name()); switch_state(state::postponed); - _cm.postpone_compaction_for_table(_table); + _cm.postpone_compaction_for_table(&t); co_return; } auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); @@ -966,8 +963,8 @@ protected: } }; -void compaction_manager::submit(replica::table* t) { - if (_state != state::enabled || t->is_auto_compaction_disabled_by_user()) { +void compaction_manager::submit(compaction::table_state& t) { + if (_state != state::enabled || t.is_auto_compaction_disabled_by_user()) { return; } @@ -1442,7 +1439,7 @@ future<> compaction_manager::perform_sstable_scrub(replica::table* t, sstables:: } void compaction_manager::add(replica::table* t) { - auto [_, inserted] = _compaction_state.insert({&t->as_table_state(), compaction_state{.t = t}}); + auto [_, inserted] = _compaction_state.insert({&t->as_table_state(), compaction_state{}}); if (!inserted) { auto s = t->schema(); on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(t))); @@ -1458,7 +1455,7 @@ future<> compaction_manager::remove(replica::table* t) { // We need to guarantee that a task being stopped will not retry to compact // a table being removed. // The requirement above is provided by stop_ongoing_compactions(). 
- _postponed.erase(t); + _postponed.erase(&t->as_table_state()); // Wait for the termination of an ongoing compaction on table T, if any. co_await stop_ongoing_compactions("table removal", &t->as_table_state()); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index b799f72160..1acc8a920a 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -60,9 +60,6 @@ public: }; private: struct compaction_state { - // FIXME: remove once we complete the transition to table_state. - replica::table* t = nullptr; - // Used both by compaction tasks that refer to the compaction_state // and by any function running under run_with_compaction_disabled(). seastar::gate gate; @@ -260,7 +257,7 @@ private: future<> _waiting_reevalution = make_ready_future<>(); condition_variable _postponed_reevaluation; // tables that wait for compaction but had its submission postponed due to ongoing compaction. - std::unordered_set _postponed; + std::unordered_set _postponed; // tracks taken weights of ongoing compactions, only one compaction per weight is allowed. // weight is value assigned to a compaction job that is log base N of total size of all input sstables. std::unordered_set _weight_tracker; @@ -334,7 +331,7 @@ private: void reevaluate_postponed_compactions() noexcept; // Postpone compaction for a table that couldn't be executed due to ongoing // similar-sized compaction. - void postpone_compaction_for_table(replica::table* t); + void postpone_compaction_for_table(compaction::table_state* t); future<> perform_sstable_scrub_validate_mode(replica::table* t); @@ -380,7 +377,7 @@ public: future<> drain(); // Submit a table to be compacted. - void submit(replica::table* t); + void submit(compaction::table_state& t); // Submit a table to be off-strategy compacted. // Returns true iff off-strategy compaction was required and performed. 
diff --git a/replica/table.cc b/replica/table.cc index 99e0879ec1..d091d44770 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1011,7 +1011,7 @@ void table::try_trigger_compaction() noexcept { void table::do_trigger_compaction() { // But not if we're locked out or stopping if (!_async_gate.is_closed()) { - _compaction_manager.submit(this); + _compaction_manager.submit(as_table_state()); } } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 9eacb5ccca..9584ccd9b7 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -3711,7 +3711,7 @@ SEASTAR_TEST_CASE(autocompaction_control_test) { cf->start(); auto stop_cf = deferred_stop(*cf); cf->trigger_compaction(); - cm.submit(cf.get()); + cm.submit(cf->as_table_state()); BOOST_REQUIRE(cm.get_stats().pending_tasks == 0 && cm.get_stats().active_tasks == 0 && ss.completed_tasks == 0); // enable auto compaction cf->enable_auto_compaction(); From 79f91fe61eceb9caf0a63f5035823a0a8777f356 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 21:24:28 -0300 Subject: [PATCH 19/30] compaction_manager: compaction_reenabler: switch to table_state Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 14 +++++++------- compaction/compaction_manager.hh | 8 ++++---- replica/database.cc | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 24354b926f..3c81b5fb61 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -438,10 +438,10 @@ future<> compaction_manager::run_custom_job(replica::table* t, sstables::compact return perform_task(make_shared(*this, &t->as_table_state(), type, desc, std::move(job))); } -compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, replica::table* t) +compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t) : _cm(cm) - , _table(t) - , _compaction_state(cm.get_compaction_state(&_table->as_table_state())) + , _table(&t) + , _compaction_state(cm.get_compaction_state(_table)) , _holder(_compaction_state.gate.hold()) { _compaction_state.compaction_disabled_counter++; @@ -462,7 +462,7 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() { cmlog.debug("Reenabling compaction for {}.{}", _table->schema()->ks_name(), _table->schema()->cf_name()); try { - _cm.submit(_table->as_table_state()); + _cm.submit(*_table); } catch (...) 
{ cmlog.warn("compaction_reenabler could not reenable compaction for {}.{}: {}", _table->schema()->ks_name(), _table->schema()->cf_name(), std::current_exception()); @@ -471,15 +471,15 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() { } future -compaction_manager::stop_and_disable_compaction(replica::table* t) { +compaction_manager::stop_and_disable_compaction(compaction::table_state& t) { compaction_reenabler cre(*this, t); - co_await stop_ongoing_compactions("user-triggered operation", &t->as_table_state()); + co_await stop_ongoing_compactions("user-triggered operation", &t); co_return cre; } future<> compaction_manager::run_with_compaction_disabled(replica::table* t, std::function ()> func) { - compaction_reenabler cre = co_await stop_and_disable_compaction(t); + compaction_reenabler cre = co_await stop_and_disable_compaction(t->as_table_state()); co_await func(); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 1acc8a920a..1e35d37089 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -413,17 +413,17 @@ public: class compaction_reenabler { compaction_manager& _cm; - replica::table* _table; + compaction::table_state* _table; compaction_manager::compaction_state& _compaction_state; gate::holder _holder; public: - compaction_reenabler(compaction_manager&, replica::table*); + compaction_reenabler(compaction_manager&, compaction::table_state&); compaction_reenabler(compaction_reenabler&&) noexcept; ~compaction_reenabler(); - replica::table* compacting_table() const noexcept { + compaction::table_state* compacting_table() const noexcept { return _table; } @@ -434,7 +434,7 @@ public: // Disable compaction temporarily for a table t. 
// Caller should call the compaction_reenabler::reenable - future stop_and_disable_compaction(replica::table* t); + future stop_and_disable_compaction(compaction::table_state& t); // Run a function with compaction temporarily disabled for a table T. future<> run_with_compaction_disabled(replica::table* t, std::function ()> func); diff --git a/replica/database.cc b/replica/database.cc index 8aa24b6d48..bac2572b52 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -2372,10 +2372,10 @@ future<> database::truncate(const keyspace& ks, column_family& cf, timestamp_fun std::vector cres; cres.reserve(1 + cf.views().size()); - cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&cf)); + cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(cf.as_table_state())); co_await coroutine::parallel_for_each(cf.views(), [&, this] (view_ptr v) -> future<> { auto& vcf = find_column_family(v); - cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(&vcf)); + cres.emplace_back(co_await _compaction_manager->stop_and_disable_compaction(vcf.as_table_state())); }); bool did_flush = false; From 79e385057fad305683b979c78a25497d0c1d5822 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 21:27:23 -0300 Subject: [PATCH 20/30] compaction_manager: make run_with_compaction_disabled() switch to table_state Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 6 +++--- compaction/compaction_manager.hh | 2 +- test/perf/perf_fast_forward.cc | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 3c81b5fb61..be10f760da 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -478,8 +478,8 @@ compaction_manager::stop_and_disable_compaction(compaction::table_state& t) { } future<> -compaction_manager::run_with_compaction_disabled(replica::table* t, std::function ()> func) { - compaction_reenabler cre = co_await stop_and_disable_compaction(t->as_table_state()); +compaction_manager::run_with_compaction_disabled(compaction::table_state& t, std::function ()> func) { + compaction_reenabler cre = co_await stop_and_disable_compaction(t); co_await func(); } @@ -1188,7 +1188,7 @@ future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstabl // compaction. std::vector sstables; compacting_sstable_registration compacting(*this); - co_await run_with_compaction_disabled(t, [this, &sstables, &compacting, get_func = std::move(get_func)] () -> future<> { + co_await run_with_compaction_disabled(t->as_table_state(), [this, &sstables, &compacting, get_func = std::move(get_func)] () -> future<> { // Getting sstables and registering them as compacting must be atomic, to avoid a race condition where // regular compaction runs in between and picks the same files. sstables = co_await get_func(); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 1e35d37089..fdc09c9b80 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -437,7 +437,7 @@ public: future stop_and_disable_compaction(compaction::table_state& t); // Run a function with compaction temporarily disabled for a table T. 
- future<> run_with_compaction_disabled(replica::table* t, std::function ()> func); + future<> run_with_compaction_disabled(compaction::table_state& t, std::function ()> func); // Adds a table to the compaction manager. // Creates a compaction_state structure that can be used for submitting diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index 52828f20b5..c08b9594b8 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -1758,7 +1758,7 @@ void populate(const std::vector& datasets, cql_test_env& env, const ta output_mgr->set_test_param_names({{"flush@ (MiB)", "{:<12}"}}, test_result::stats_names()); - db.get_compaction_manager().run_with_compaction_disabled(&cf, [&] { + db.get_compaction_manager().run_with_compaction_disabled(cf.as_table_state(), [&] { return seastar::async([&] { auto gen = ds.make_generator(s, cfg); while (auto mopt = gen()) { @@ -1864,7 +1864,7 @@ auto make_compaction_disabling_guard(replica::database& db, std::vector pr; for (auto&& t : tables) { // FIXME: discarded future. - (void)db.get_compaction_manager().run_with_compaction_disabled(t, [f = shared_future<>(pr.get_shared_future())] { + (void)db.get_compaction_manager().run_with_compaction_disabled(t->as_table_state(), [f = shared_future<>(pr.get_shared_future())] { return f.get_future(); }); } From 538d412fba03a6af0587a6dd8ddcc2f29a64794b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 21:45:02 -0300 Subject: [PATCH 21/30] compaction_manager: rewrite_sstables(): switch to table_state rewrite_sstables() is used by maintenance compactions that perform an operation on a single file at a time. Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 14 +++++++------- compaction/compaction_manager.hh | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index be10f760da..0436dd0ea2 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1177,7 +1177,7 @@ private: template requires std::derived_from -future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { +future<> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { if (_state != state::enabled) { co_return; } @@ -1188,7 +1188,7 @@ future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstabl // compaction. std::vector sstables; compacting_sstable_registration compacting(*this); - co_await run_with_compaction_disabled(t->as_table_state(), [this, &sstables, &compacting, get_func = std::move(get_func)] () -> future<> { + co_await run_with_compaction_disabled(t, [this, &sstables, &compacting, get_func = std::move(get_func)] () -> future<> { // Getting sstables and registering them as compacting must be atomic, to avoid a race condition where // regular compaction runs in between and picks the same files. 
sstables = co_await get_func(); @@ -1201,10 +1201,10 @@ future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstabl return a->data_size() > b->data_size(); }); }); - co_await perform_task(seastar::make_shared(*this, &t->as_table_state(), std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); + co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); } -future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { +future<> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); } @@ -1380,7 +1380,7 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab }); }; - co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), + co_await perform_task_on_all_files(t->as_table_state(), sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables)); } @@ -1409,7 +1409,7 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, repl // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem. - return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(t->schema()->ks_name())), std::move(get_sstables)); + return rewrite_sstables(t->as_table_state(), sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(t->schema()->ks_name())), std::move(get_sstables)); } // Submit a table to be scrubbed and wait for its termination. 
@@ -1418,7 +1418,7 @@ future<> compaction_manager::perform_sstable_scrub(replica::table* t, sstables:: if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); } - return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [this, t, opts] { + return rewrite_sstables(t->as_table_state(), sstables::compaction_type_options::make_scrub(scrub_mode), [this, t, opts] { auto all_sstables = t->get_sstable_set().all(); std::vector sstables = boost::copy_range>(*all_sstables | boost::adaptors::filtered([&opts] (const sstables::shared_sstable& sst) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index fdc09c9b80..1562fd3372 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -341,9 +341,9 @@ private: // by retrieving set of candidates only after all compactions for table T were stopped, if any. template requires std::derived_from - future<> perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func, Args... args); + future<> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); - future<> rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); + future<> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); // Stop all fibers, without waiting. Safe to be called multiple times. void do_stop() noexcept; From f547e0f2fb3987143c9e794781be9d02951d7315 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 21:47:01 -0300 Subject: [PATCH 22/30] compaction_manager: offstrategy: switch to table_state() Signed-off-by: Raphael S. 
Carvalho --- compaction/compaction_manager.cc | 4 ++-- compaction/compaction_manager.hh | 2 +- replica/table.cc | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 0436dd0ea2..0086749f3a 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1099,11 +1099,11 @@ protected: } }; -future compaction_manager::perform_offstrategy(replica::table* t) { +future compaction_manager::perform_offstrategy(compaction::table_state& t) { if (_state != state::enabled) { co_return false; } - auto task = make_shared(*this, &t->as_table_state()); + auto task = make_shared(*this, &t); co_await perform_task(task); co_return task->performed(); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 1562fd3372..5153b2446c 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -381,7 +381,7 @@ public: // Submit a table to be off-strategy compacted. // Returns true iff off-strategy compaction was required and performed. - future perform_offstrategy(replica::table* t); + future perform_offstrategy(compaction::table_state& t); // Submit a table to be cleaned up and wait for its termination. // diff --git a/replica/table.cc b/replica/table.cc index d091d44770..41f63af426 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1031,7 +1031,7 @@ future table::perform_offstrategy_compaction() { // If the user calls trigger_offstrategy_compaction() to trigger // off-strategy explicitly, cancel the timeout based automatic trigger. _off_strategy_trigger.cancel(); - return _compaction_manager.perform_offstrategy(this); + return _compaction_manager.perform_offstrategy(as_table_state()); } void table::set_compaction_strategy(sstables::compaction_strategy_type strategy) { From bdd049afd622fa87f92b10b29fe0e7028746c2e6 Mon Sep 17 00:00:00 2001 From: "Raphael S. 
Carvalho" Date: Tue, 12 Jul 2022 21:52:32 -0300 Subject: [PATCH 23/30] compaction_manager: cleanup: switch to table_state Signed-off-by: Raphael S. Carvalho --- api/storage_service.cc | 2 +- compaction/compaction_manager.cc | 22 +++++++++++----------- compaction/compaction_manager.hh | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 7679050bb8..041bd7f759 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -645,7 +645,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded compaction_manager::perform_cleanup(replica::database& db, replica::table* t) { - auto check_for_cleanup = [this, t] { - return boost::algorithm::any_of(_tasks, [t] (auto& task) { - return task->compacting_table() == &t->as_table_state() && task->type() == sstables::compaction_type::Cleanup; +future<> compaction_manager::perform_cleanup(replica::database& db, compaction::table_state& t) { + auto check_for_cleanup = [this, &t] { + return boost::algorithm::any_of(_tasks, [&t] (auto& task) { + return task->compacting_table() == &t && task->type() == sstables::compaction_type::Cleanup; }); }; if (check_for_cleanup()) { throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}", - t->schema()->ks_name(), t->schema()->cf_name())); + t.schema()->ks_name(), t.schema()->cf_name())); } - auto sorted_owned_ranges = db.get_keyspace_local_ranges(t->schema()->ks_name()); - auto get_sstables = [this, &db, t, sorted_owned_ranges] () -> future> { - return seastar::async([this, &db, t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { - auto schema = t->schema(); + auto sorted_owned_ranges = db.get_keyspace_local_ranges(t.schema()->ks_name()); + auto get_sstables = [this, &db, &t, sorted_owned_ranges] () -> future> { + return seastar::async([this, &db, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { + auto schema = t.schema(); auto sstables = std::vector{}; - 
const auto candidates = get_candidates(t->as_table_state()); + const auto candidates = get_candidates(t); std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) { seastar::thread::maybe_yield(); return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema); @@ -1380,7 +1380,7 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab }); }; - co_await perform_task_on_all_files(t->as_table_state(), sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), + co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables)); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 5153b2446c..db8ac4d895 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -390,7 +390,7 @@ public: // Cleanup is about discarding keys that are no longer relevant for a // given sstable, e.g. after node loses part of its token range because // of a newly added node. - future<> perform_cleanup(replica::database& db, replica::table* t); + future<> perform_cleanup(replica::database& db, compaction::table_state& t); // Submit a table to be upgraded and wait for its termination. future<> perform_sstable_upgrade(replica::database& db, replica::table* t, bool exclude_current_version); From c2678ca661b48a8e7da071f53e18516be8b842b5 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 15:26:14 -0300 Subject: [PATCH 24/30] compaction: table_state: add get_sstables_manager() That will be needed for retrieving sstable manager in perform_sstable_upgrade(). Signed-off-by: Raphael S. 
Carvalho --- compaction/table_state.hh | 2 ++ replica/table.cc | 3 +++ test/boost/sstable_compaction_test.cc | 3 +++ 3 files changed, 8 insertions(+) diff --git a/compaction/table_state.hh b/compaction/table_state.hh index 883ac76bf7..2bb07f1089 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -11,6 +11,7 @@ #include "schema_fwd.hh" #include "sstables/sstable_set.hh" +#include "sstables/sstables_manager.hh" #include "compaction_descriptor.hh" class reader_permit; @@ -35,6 +36,7 @@ public: virtual const std::vector& compacted_undeleted_sstables() const noexcept = 0; virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept = 0; virtual reader_permit make_compaction_reader_permit() const = 0; + virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0; virtual sstables::shared_sstable make_sstable() const = 0; virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0; virtual api::timestamp_type min_memtable_timestamp() const = 0; diff --git a/replica/table.cc b/replica/table.cc index 41f63af426..46c622bafe 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2406,6 +2406,9 @@ public: reader_permit make_compaction_reader_permit() const override { return _t.compaction_concurrency_semaphore().make_tracking_only_permit(schema().get(), "compaction", db::no_timeout); } + sstables::sstables_manager& get_sstables_manager() noexcept override { + return _t.get_sstables_manager(); + } sstables::shared_sstable make_sstable() const override { return _t.make_sstable(); } diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 9584ccd9b7..d2b167658a 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -157,6 +157,9 @@ public: reader_permit make_compaction_reader_permit() const override { return _env.make_reader_permit(); } + sstables::sstables_manager& get_sstables_manager() noexcept override { + return 
_env.manager(); + } sstables::shared_sstable make_sstable() const override { return _t->make_sstable(); } From d29f7070d96729aa8761c6b41110ca062b616179 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 21:57:51 -0300 Subject: [PATCH 25/30] compaction_manager: upgrade: switch to table_state Signed-off-by: Raphael S. Carvalho --- api/storage_service.cc | 2 +- compaction/compaction_manager.cc | 10 +++++----- compaction/compaction_manager.hh | 2 +- test/boost/database_test.cc | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 041bd7f759..40214c80dc 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -672,7 +672,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded(0); diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 2bacd6d2d1..14a7c25ba1 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1385,13 +1385,13 @@ future<> compaction_manager::perform_cleanup(replica::database& db, compaction:: } // Submit a table to be upgraded and wait for its termination. -future<> compaction_manager::perform_sstable_upgrade(replica::database& db, replica::table* t, bool exclude_current_version) { - auto get_sstables = [this, &db, t, exclude_current_version] { +future<> compaction_manager::perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version) { + auto get_sstables = [this, &db, &t, exclude_current_version] { std::vector tables; - auto last_version = t->get_sstables_manager().get_highest_supported_format(); + auto last_version = t.get_sstables_manager().get_highest_supported_format(); - for (auto& sst : get_candidates(t->as_table_state())) { + for (auto& sst : get_candidates(t)) { // if we are a "normal" upgrade, we only care about // tables with older versions, but potentially // we are to actually rewrite everything. 
(-a) @@ -1409,7 +1409,7 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, repl // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem. - return rewrite_sstables(t->as_table_state(), sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(t->schema()->ks_name())), std::move(get_sstables)); + return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(db.get_keyspace_local_ranges(t.schema()->ks_name())), std::move(get_sstables)); } // Submit a table to be scrubbed and wait for its termination. diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index db8ac4d895..ad4c7de37b 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -393,7 +393,7 @@ public: future<> perform_cleanup(replica::database& db, compaction::table_state& t); // Submit a table to be upgraded and wait for its termination. - future<> perform_sstable_upgrade(replica::database& db, replica::table* t, bool exclude_current_version); + future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version); // Submit a table to be scrubbed and wait for its termination. 
future<> perform_sstable_scrub(replica::table* t, sstables::compaction_type_options::scrub opts); diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 368f7dcc54..f6642ebbc5 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -951,7 +951,7 @@ SEASTAR_TEST_CASE(upgrade_sstables) { auto& cm = db.get_compaction_manager(); return do_for_each(db.get_column_families(), [&] (std::pair> t) { constexpr bool exclude_current_version = false; - return cm.perform_sstable_upgrade(db, t.second.get(), exclude_current_version); + return cm.perform_sstable_upgrade(db, t.second->as_table_state(), exclude_current_version); }); }).get(); }); From cebe6e22cb9850f165793acf559170396f38bc43 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 15:37:00 -0300 Subject: [PATCH 26/30] compaction_manager: scrub: switch to table_state Signed-off-by: Raphael S. Carvalho --- api/storage_service.cc | 2 +- compaction/compaction_manager.cc | 21 ++++++++++++++------- compaction/compaction_manager.hh | 4 ++-- test/boost/sstable_compaction_test.cc | 14 +++++++------- 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 40214c80dc..4679a1a280 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -1408,7 +1408,7 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ return do_for_each(column_families, [=, &db](sstring cfname) { auto& cm = db.get_compaction_manager(); auto& cf = db.find_column_family(keyspace, cfname); - return cm.perform_sstable_scrub(&cf, opts); + return cm.perform_sstable_scrub(cf.as_table_state(), opts); }); }); }).then([]{ diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 14a7c25ba1..91224a8c54 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1254,13 +1254,20 @@ private: } }; -future<> 
compaction_manager::perform_sstable_scrub_validate_mode(replica::table* t) { +static std::vector get_all_sstables(compaction::table_state& t) { + auto s = boost::copy_range>(*t.main_sstable_set().all()); + auto maintenance_set = t.maintenance_sstable_set().all(); + s.insert(s.end(), maintenance_set->begin(), maintenance_set->end()); + return s; +} + +future<> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) { if (_state != state::enabled) { return make_ready_future<>(); } // All sstables must be included, even the ones being compacted, such that everything in table is validated. - auto all_sstables = boost::copy_range>(*t->get_sstables()); - return perform_task(seastar::make_shared(*this, &t->as_table_state(), std::move(all_sstables))); + auto all_sstables = get_all_sstables(t); + return perform_task(seastar::make_shared(*this, &t, std::move(all_sstables))); } class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task { @@ -1413,14 +1420,14 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, comp } // Submit a table to be scrubbed and wait for its termination. 
-future<> compaction_manager::perform_sstable_scrub(replica::table* t, sstables::compaction_type_options::scrub opts) { +future<> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) { auto scrub_mode = opts.operation_mode; if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); } - return rewrite_sstables(t->as_table_state(), sstables::compaction_type_options::make_scrub(scrub_mode), [this, t, opts] { - auto all_sstables = t->get_sstable_set().all(); - std::vector sstables = boost::copy_range>(*all_sstables + return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), [this, &t, opts] { + auto all_sstables = get_all_sstables(t); + std::vector sstables = boost::copy_range>(all_sstables | boost::adaptors::filtered([&opts] (const sstables::shared_sstable& sst) { if (sst->requires_view_building()) { return false; diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index ad4c7de37b..23cb88f09e 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -333,7 +333,7 @@ private: // similar-sized compaction. void postpone_compaction_for_table(compaction::table_state* t); - future<> perform_sstable_scrub_validate_mode(replica::table* t); + future<> perform_sstable_scrub_validate_mode(compaction::table_state& t); using get_candidates_func = std::function>()>; @@ -396,7 +396,7 @@ public: future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version); // Submit a table to be scrubbed and wait for its termination. - future<> perform_sstable_scrub(replica::table* t, sstables::compaction_type_options::scrub opts); + future<> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); // Submit a table for major compaction. 
future<> perform_major_compaction(replica::table* t); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index d2b167658a..5ae3d04d92 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -2321,7 +2321,7 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) { sstables::compaction_type_options::scrub opts = { .operation_mode = sstables::compaction_type_options::scrub::mode::validate, }; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); BOOST_REQUIRE(sst->is_quarantined()); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty()); @@ -2519,7 +2519,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { // We expect the scrub with mode=srub::mode::abort to stop on the first invalid fragment. sstables::compaction_type_options::scrub opts = {}; opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); verify_fragments(sst, corrupt_fragments); @@ -2528,7 +2528,7 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { // We expect the scrub with mode=srub::mode::skip to get rid of all invalid data. opts.operation_mode = sstables::compaction_type_options::scrub::mode::skip; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).front() != sst); @@ -2616,7 +2616,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { // We expect the scrub with mode=srub::mode::abort to stop on the first invalid fragment. 
sstables::compaction_type_options::scrub opts = {}; opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() == 1); verify_fragments(sst, corrupt_fragments); @@ -2625,7 +2625,7 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { // We expect the scrub with mode=srub::mode::segregate to fix all out-of-order data. opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(table->as_table_state()).size()); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).size() > 1); @@ -2728,7 +2728,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { // We expect the scrub with mode=scrub::mode::validate to quarantine the sstable. sstables::compaction_type_options::scrub opts = {}; opts.operation_mode = sstables::compaction_type_options::scrub::mode::validate; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); BOOST_REQUIRE(in_strategy_sstables(table->as_table_state()).empty()); BOOST_REQUIRE(sst->is_quarantined()); @@ -2739,7 +2739,7 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { // We expect the scrub with mode=scrub::mode::segregate to fix all out-of-order data. 
opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate; opts.quarantine_operation_mode = qmode; - compaction_manager.perform_sstable_scrub(table.get(), opts).get(); + compaction_manager.perform_sstable_scrub(table->as_table_state(), opts).get(); switch (qmode) { case sstables::compaction_type_options::scrub::quarantine_mode::include: From 9a1efc69d053e450a73bb5c0ba3f68eea6deb050 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 22:16:28 -0300 Subject: [PATCH 27/30] compaction_manager: major: switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 4 ++-- compaction/compaction_manager.hh | 2 +- replica/table.cc | 2 +- test/boost/cql_query_large_test.cc | 2 +- test/boost/view_complex_test.cc | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 91224a8c54..1b1b532931 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -393,11 +393,11 @@ protected: } }; -future<> compaction_manager::perform_major_compaction(replica::table* t) { +future<> compaction_manager::perform_major_compaction(compaction::table_state& t) { if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, &t->as_table_state())); + return perform_task(make_shared(*this, &t)); } class compaction_manager::custom_compaction_task : public compaction_manager::task { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 23cb88f09e..93d2832c8d 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -399,7 +399,7 @@ public: future<> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); // Submit a table for major compaction. 
- future<> perform_major_compaction(replica::table* t); + future<> perform_major_compaction(compaction::table_state& t); // Run a custom job for a given table, defined by a function diff --git a/replica/table.cc b/replica/table.cc index 46c622bafe..4e1e6606c0 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -988,7 +988,7 @@ table::on_compaction_completion(sstables::compaction_completion_desc desc) { future<> table::compact_all_sstables() { co_await flush(); - co_await _compaction_manager.perform_major_compaction(this); + co_await _compaction_manager.perform_major_compaction(as_table_state()); } void table::start_compaction() { diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 4c892e9fa2..6e56a7f1f8 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -117,7 +117,7 @@ SEASTAR_THREAD_TEST_CASE(test_large_data) { flush(e); e.db().invoke_on_all([] (replica::database& dbi) { return parallel_for_each(dbi.get_column_families(), [&dbi] (auto& table) { - return dbi.get_compaction_manager().perform_major_compaction(&*table.second); + return dbi.get_compaction_manager().perform_major_compaction((table.second)->as_table_state()); }); }).get(); diff --git a/test/boost/view_complex_test.cc b/test/boost/view_complex_test.cc index 166bef7a12..397ccf0880 100644 --- a/test/boost/view_complex_test.cc +++ b/test/boost/view_complex_test.cc @@ -840,7 +840,7 @@ void test_commutative_row_deletion(cql_test_env& e, std::function&& mayb }}); }); - e.local_db().get_compaction_manager().perform_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get(); + e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get(); } SEASTAR_TEST_CASE(test_commutative_row_deletion_without_flush) { @@ -1076,7 +1076,7 @@ void test_update_with_column_timestamp_bigger_than_pk(cql_test_env& e, std::func }}); }); - 
e.local_db().get_compaction_manager().perform_major_compaction(&e.local_db().find_column_family("ks", "vcf")).get(); + e.local_db().get_compaction_manager().perform_major_compaction(e.local_db().find_column_family("ks", "vcf").as_table_state()).get(); eventually([&] { auto msg = e.execute_cql("select * from vcf limit 1").get0(); assert_that(msg).is_rows().with_rows({{ From 31655acb5e4c4858cad794b389e72d1c9582a20b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 22:18:33 -0300 Subject: [PATCH 28/30] compaction_manager: make run_custom_job() switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 4 ++-- compaction/compaction_manager.hh | 2 +- sstables/sstable_directory.cc | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 1b1b532931..df50d8122b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -430,12 +430,12 @@ protected: } }; -future<> compaction_manager::run_custom_job(replica::table* t, sstables::compaction_type type, const char* desc, noncopyable_function(sstables::compaction_data&)> job) { +future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function(sstables::compaction_data&)> job) { if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, &t->as_table_state(), type, desc, std::move(job))); + return perform_task(make_shared(*this, &t, type, desc, std::move(job))); } compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t) diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 93d2832c8d..fa6526bc61 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -409,7 +409,7 @@ public: // parameter type is the compaction 
type the operation can most closely be // associated with, use compaction_type::Compaction, if none apply. // parameter job is a function that will carry the operation - future<> run_custom_job(replica::table* t, sstables::compaction_type type, const char *desc, noncopyable_function(sstables::compaction_data&)> job); + future<> run_custom_job(compaction::table_state& s, sstables::compaction_type type, const char *desc, noncopyable_function(sstables::compaction_data&)> job); class compaction_reenabler { compaction_manager& _cm; diff --git a/sstables/sstable_directory.cc b/sstables/sstable_directory.cc index 1005bbf694..c3e0e7e3bc 100644 --- a/sstables/sstable_directory.cc +++ b/sstables/sstable_directory.cc @@ -363,7 +363,7 @@ future sstable_directory::reshape(compaction_manager& cm, replica::tab desc.creator = creator; - return cm.run_custom_job(&table, compaction_type::Reshape, "Reshape compaction", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable { + return cm.run_custom_job(table.as_table_state(), compaction_type::Reshape, "Reshape compaction", [this, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable { return sstables::compact_sstables(std::move(desc), info, table.as_table_state()).then([this, sstlist = std::move(sstlist)] (sstables::compaction_result result) mutable { return remove_input_sstables_from_reshaping(std::move(sstlist)).then([this, new_sstables = std::move(result.new_sstables)] () mutable { return collect_output_sstables_from_reshaping(std::move(new_sstables)); @@ -418,7 +418,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager& // parallel_for_each so the statistics about pending jobs are updated to reflect all // jobs. 
But only one will run in parallel at a time return parallel_for_each(buckets, [this, iop, &cm, &table, creator = std::move(creator)] (std::vector& sstlist) mutable { - return cm.run_custom_job(&table, compaction_type::Reshard, "Reshard compaction", [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) { + return cm.run_custom_job(table.as_table_state(), compaction_type::Reshard, "Reshard compaction", [this, iop, &cm, &table, creator, &sstlist] (sstables::compaction_data& info) { sstables::compaction_descriptor desc(sstlist, iop); desc.options = sstables::compaction_type_options::make_reshard(); desc.creator = std::move(creator); From a94d974835ffe3301018858bf2033ad6558e328b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Tue, 12 Jul 2022 22:21:17 -0300 Subject: [PATCH 29/30] compaction_manager: make add() and remove() switch to table_state Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.cc | 18 +++++++++--------- compaction/compaction_manager.hh | 4 ++-- replica/table.cc | 4 ++-- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index df50d8122b..0aa3d6c8bd 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1445,16 +1445,16 @@ future<> compaction_manager::perform_sstable_scrub(compaction::table_state& t, s }, can_purge_tombstones::no); } -void compaction_manager::add(replica::table* t) { - auto [_, inserted] = _compaction_state.insert({&t->as_table_state(), compaction_state{}}); +void compaction_manager::add(compaction::table_state& t) { + auto [_, inserted] = _compaction_state.insert({&t, compaction_state{}}); if (!inserted) { - auto s = t->schema(); - on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", s->ks_name(), s->cf_name(), fmt::ptr(t))); + auto s = t.schema(); + on_internal_error(cmlog, format("compaction_state for table {}.{} [{}] already exists", 
s->ks_name(), s->cf_name(), fmt::ptr(&t))); } } -future<> compaction_manager::remove(replica::table* t) { - auto handle = _compaction_state.extract(&t->as_table_state()); +future<> compaction_manager::remove(compaction::table_state& t) { + auto handle = _compaction_state.extract(&t); if (!handle.empty()) { auto& c_state = handle.mapped(); @@ -1462,10 +1462,10 @@ future<> compaction_manager::remove(replica::table* t) { // We need to guarantee that a task being stopped will not retry to compact // a table being removed. // The requirement above is provided by stop_ongoing_compactions(). - _postponed.erase(&t->as_table_state()); + _postponed.erase(&t); // Wait for the termination of an ongoing compaction on table T, if any. - co_await stop_ongoing_compactions("table removal", &t->as_table_state()); + co_await stop_ongoing_compactions("table removal", &t); // Wait for all functions running under gate to terminate. co_await c_state.gate.close(); @@ -1474,7 +1474,7 @@ future<> compaction_manager::remove(replica::table* t) { auto found = false; sstring msg; for (auto& task : _tasks) { - if (task->compacting_table() == &t->as_table_state()) { + if (task->compacting_table() == &t) { if (!msg.empty()) { msg += "\n"; } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index fa6526bc61..02100c20cb 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -442,11 +442,11 @@ public: // Adds a table to the compaction manager. // Creates a compaction_state structure that can be used for submitting // compaction jobs of all types. - void add(replica::table* t); + void add(compaction::table_state& t); // Remove a table from the compaction manager. // Cancel requests on table and wait for possible ongoing compactions. 
- future<> remove(replica::table* t); + future<> remove(compaction::table_state& t); const stats& get_stats() const { return _stats; diff --git a/replica/table.cc b/replica/table.cc index 4e1e6606c0..b567595792 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -747,7 +747,7 @@ table::stop() { return _async_gate.close().then([this] { return await_pending_ops().finally([this] { return _memtables->flush().finally([this] { - return _compaction_manager.remove(this).then([this] { + return _compaction_manager.remove(as_table_state()).then([this] { return _sstable_deletion_gate.close().then([this] { return get_row_cache().invalidate(row_cache::external_updater([this] { _main_sstables = _compaction_strategy.make_sstable_set(_schema); @@ -1183,7 +1183,7 @@ table::table(schema_ptr schema, config config, db::commitlog* cl, compaction_man tlogger.warn("Writes disabled, column family no durable."); } set_metrics(); - _compaction_manager.add(this); + _compaction_manager.add(as_table_state()); update_optimized_twcs_queries_flag(); } From 246e945086ee3799d218f49f5c5503230db5e858 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Sat, 16 Jul 2022 15:41:36 -0300 Subject: [PATCH 30/30] compaction: remove forward declaration of replica::table compaction_manager.cc still cannot stop including replica/database.hh because upgrade and scrub still take replica::database as param, but I'll remove it soon in another series. Signed-off-by: Raphael S. Carvalho --- compaction/compaction_manager.hh | 4 ---- 1 file changed, 4 deletions(-) diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 02100c20cb..e4b3b2ef8b 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -34,10 +34,6 @@ #include "seastarx.hh" #include "sstables/exceptions.hh" -namespace replica { -class table; -} - class compacting_sstable_registration; // Compaction manager provides facilities to submit and track compaction jobs on