From 981a50e4907cb01eea0777cbee1a6c315592a9a4 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 26 May 2023 15:38:24 +0200 Subject: [PATCH 1/7] compaction: add reshaping_compaction_task_impl reshaping_compaction_task_impl serves as a base class of all concrete reshaping compaction task classes. --- compaction/task_manager_module.hh | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index ab1b68bc93..49ee966f14 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -466,6 +466,27 @@ protected: virtual future<> run() override; }; +class reshaping_compaction_task_impl : public compaction_task_impl { +public: + reshaping_compaction_task_impl(tasks::task_manager::module_ptr module, + tasks::task_id id, + unsigned sequence_number, + std::string keyspace, + std::string table, + std::string entity, + tasks::task_id parent_id) noexcept + : compaction_task_impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(entity), parent_id) + { + // FIXME: add progress units + } + + virtual std::string type() const override { + return "reshaping compaction"; + } +protected: + virtual future<> run() override = 0; +}; + class task_manager_module : public tasks::task_manager::module { public: task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {} From dace5fb004be351d4d9fa9205ca92e26afb95932 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 26 May 2023 16:12:31 +0200 Subject: [PATCH 2/7] compaction: copy reshape to task_manager_module.cc distributed_loader::reshape is copied to compaction/task_manager_module.cc as it will be used in reshape compaction tasks. --- compaction/task_manager_module.cc | 60 +++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 6eb708666e..61884c016c 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -9,6 +9,8 @@ #include "compaction/task_manager_module.hh" #include "compaction/compaction_manager.hh" #include "replica/database.hh" +#include "sstables/sstables.hh" +#include "sstables/sstable_directory.hh" namespace compaction { @@ -91,6 +93,64 @@ future<> run_table_tasks(replica::database& db, std::vector ta } } +using sstable_filter_func_t = std::function; + +future reshape(sstables::sstable_directory& dir, replica::table& table, sstables::compaction_sstable_creator_fn creator, + sstables::reshape_mode mode, sstable_filter_func_t filter) +{ + uint64_t reshaped_size = 0; + + while (true) { + auto reshape_candidates = boost::copy_range>(dir.get_unshared_local_sstables() + | boost::adaptors::filtered([&filter] (const auto& sst) { + return filter(sst); + })); + auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), mode); + if (desc.sstables.empty()) { + break; + } + + if (!reshaped_size) { + dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); + } + + std::vector sstlist; + for (auto& sst : desc.sstables) { + reshaped_size += sst->data_size(); + sstlist.push_back(sst); + } + + desc.creator = creator; + + std::exception_ptr ex; + try { + co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable -> future<> { + sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); + co_await dir.remove_unshared_sstables(std::move(sstlist)); + co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no); + }); + } catch (...) { + ex = std::current_exception(); + } + + if (ex != nullptr) { + try { + std::rethrow_exception(std::move(ex)); + } catch (sstables::compaction_stopped_exception& e) { + dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); + break; + } catch (...) { + dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception()); + break; + } + } + + co_await coroutine::maybe_yield(); + } + + co_return reshaped_size; +} + future<> major_keyspace_compaction_task_impl::run() { co_await _db.invoke_on_all([&] (replica::database& db) -> future<> { tasks::task_info parent_info{_status.id, _status.shard}; From e3e2d6b88668dbf96ea32cda8e0aaeff0bdc769a Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 26 May 2023 16:48:41 +0200 Subject: [PATCH 3/7] compaction: add table_reshaping_compaction_task_impl --- compaction/task_manager_module.cc | 12 ++++++++++++ compaction/task_manager_module.hh | 30 ++++++++++++++++++++++++++++++ replica/distributed_loader.cc | 14 +++----------- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 61884c016c..80f5a322cc 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -340,5 +340,17 @@ future<> table_scrub_sstables_compaction_task_impl::run() { }); } +future<> table_reshaping_compaction_task_impl::run() { + auto start = std::chrono::steady_clock::now(); + auto total_size = co_await _dir.map_reduce0([&db = _db, &ks_name = _status.keyspace, &table_name = _status.table, creator = std::move(_creator), mode = _mode, filter = _filter] (sstables::sstable_directory& d) { + auto& table = db.local().find_column_family(ks_name, table_name); + return reshape(d, table, creator, mode, filter); + }, uint64_t(0), std::plus()); + + if (total_size > 0) { + auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); + dblog.info("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)); + } +} } diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index 49ee966f14..31e17a9eca 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -13,6 +13,9 @@ #include "schema/schema_fwd.hh" #include "tasks/task_manager.hh" +namespace sstables { +class sstable_directory; +} namespace compaction { class compaction_task_impl : public tasks::task_manager::task::impl { @@ -487,6 +490,33 @@ protected: virtual future<> run() override = 0; }; +class table_reshaping_compaction_task_impl : public reshaping_compaction_task_impl { +private: + sharded& _dir; + sharded& _db; + sstables::reshape_mode _mode; + sstables::compaction_sstable_creator_fn _creator; + std::function _filter; +public: + table_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module, + std::string keyspace, + std::string table, + sharded& dir, + sharded& db, + sstables::reshape_mode mode, + sstables::compaction_sstable_creator_fn creator, + std::function filter) noexcept + : reshaping_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id()) + , _dir(dir) + , _db(db) + , _mode(mode) + , _creator(std::move(creator)) + , _filter(std::move(filter)) + {} +protected: + virtual future<> run() override; +}; + class task_manager_module : public tasks::task_manager::module { public: task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {} diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 8851da9053..9277f25d75 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -378,17 +378,9 @@ future<> distributed_loader::reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, std::function filter) { - - auto start = std::chrono::steady_clock::now(); - auto total_size = co_await dir.map_reduce0([&db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode, filter] (sstables::sstable_directory& d) { - auto& table = db.local().find_column_family(ks_name, table_name); - return ::replica::reshape(d, table, creator, mode, filter); - }, uint64_t(0), std::plus()); - - if (total_size > 0) { - auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); - dblog.info("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration)); - } + auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); + auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, mode, std::move(creator), std::move(filter)); + co_await task->done(); } // Loads SSTables into the main directory (or staging) and returns how many were loaded From 19ec5b42561f77eeb08039a9730e782e867bc2ee Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 26 May 2023 17:36:59 +0200 Subject: [PATCH 4/7] replica: delete unused function --- replica/distributed_loader.cc | 58 ----------------------------------- 1 file changed, 58 deletions(-) diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index 9277f25d75..af0eb012c1 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -316,64 +316,6 @@ highest_version_seen(sharded& dir, sstables::sstabl }); } -using sstable_filter_func_t = std::function; - -future reshape(sstables::sstable_directory& dir, replica::table& table, sstables::compaction_sstable_creator_fn creator, - sstables::reshape_mode mode, sstable_filter_func_t filter) -{ - uint64_t reshaped_size = 0; - - while (true) { - auto reshape_candidates = boost::copy_range>(dir.get_unshared_local_sstables() - | boost::adaptors::filtered([&filter] (const auto& sst) { - return filter(sst); - })); - auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), mode); - if (desc.sstables.empty()) { - break; - } - - if (!reshaped_size) { - dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); - } - - std::vector sstlist; - for (auto& sst : desc.sstables) { - reshaped_size += sst->data_size(); - sstlist.push_back(sst); - } - - desc.creator = creator; - - std::exception_ptr ex; - try { - co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable -> future<> { - sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); - co_await dir.remove_unshared_sstables(std::move(sstlist)); - co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no); - }); - } catch (...) { - ex = std::current_exception(); - } - - if (ex != nullptr) { - try { - std::rethrow_exception(std::move(ex)); - } catch (sstables::compaction_stopped_exception& e) { - dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); - break; - } catch (...) { - dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception()); - break; - } - } - - co_await coroutine::maybe_yield(); - } - - co_return reshaped_size; -} - future<> distributed_loader::reshape(sharded& dir, sharded& db, sstables::reshape_mode mode, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, From 1960904a726942e9c99c71ec74b76a0a144beadc Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 26 May 2023 17:32:49 +0200 Subject: [PATCH 5/7] compaction: add shard_reshaping_compaction_task_impl shard_reshaping_compaction_task_impl covers reshaping compaction on one shard. --- compaction/task_manager_module.cc | 19 +++++++++++++++--- compaction/task_manager_module.hh | 33 +++++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 80f5a322cc..44afcc239c 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -342,9 +342,13 @@ future<> table_scrub_sstables_compaction_task_impl::run() { future<> table_reshaping_compaction_task_impl::run() { auto start = std::chrono::steady_clock::now(); - auto total_size = co_await _dir.map_reduce0([&db = _db, &ks_name = _status.keyspace, &table_name = _status.table, creator = std::move(_creator), mode = _mode, filter = _filter] (sstables::sstable_directory& d) { - auto& table = db.local().find_column_family(ks_name, table_name); - return reshape(d, table, creator, mode, filter); + auto total_size = co_await _dir.map_reduce0([&] (sstables::sstable_directory& d) -> future { + uint64_t total_shard_size; + tasks::task_info parent_info{_status.id, _status.shard}; + auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module(); + auto task = co_await compaction_module.make_and_start_task(parent_info, _status.keyspace, _status.table, _status.id, d, _db, _mode, _creator, _filter, total_shard_size); + co_await task->done(); + co_return total_shard_size; }, uint64_t(0), std::plus()); if (total_size > 0) { @@ -353,4 +357,13 @@ future<> table_reshaping_compaction_task_impl::run() { } } +tasks::is_internal shard_reshaping_compaction_task_impl::is_internal() const noexcept { + return tasks::is_internal::yes; +} + +future<> shard_reshaping_compaction_task_impl::run() { + auto& table = _db.local().find_column_family(_status.keyspace, _status.table); + _total_shard_size = co_await reshape(_dir, table, _creator, _mode, _filter); +} + } diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index 31e17a9eca..6a88913d36 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -517,6 +517,39 @@ protected: virtual future<> run() override; }; +class shard_reshaping_compaction_task_impl : public reshaping_compaction_task_impl { +private: + sstables::sstable_directory& _dir; + sharded& _db; + sstables::reshape_mode _mode; + sstables::compaction_sstable_creator_fn _creator; + std::function _filter; + uint64_t& _total_shard_size; +public: + shard_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module, + std::string keyspace, + std::string table, + tasks::task_id parent_id, + sstables::sstable_directory& dir, + sharded& db, + sstables::reshape_mode mode, + sstables::compaction_sstable_creator_fn creator, + std::function filter, + uint64_t& total_shard_size) noexcept + : reshaping_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), std::move(table), "", parent_id) + , _dir(dir) + , _db(db) + , _mode(mode) + , _creator(std::move(creator)) + , _filter(std::move(filter)) + , _total_shard_size(total_shard_size) + {} + + virtual tasks::is_internal is_internal() const noexcept override; +protected: + virtual future<> run() override; +}; + class task_manager_module : public tasks::task_manager::module { public: task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {} From f9a527b06dd16a8f1ea8922836c929c12a15a7b6 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 31 May 2023 14:44:40 +0200 Subject: [PATCH 6/7] compaction: move reshape function to shard_reshaping_table_compaction_task_impl::run() --- compaction/task_manager_module.cc | 110 ++++++++++++++---------------- 1 file changed, 51 insertions(+), 59 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index 44afcc239c..84ca9e4635 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -93,64 +93,6 @@ future<> run_table_tasks(replica::database& db, std::vector ta } } -using sstable_filter_func_t = std::function; - -future reshape(sstables::sstable_directory& dir, replica::table& table, sstables::compaction_sstable_creator_fn creator, - sstables::reshape_mode mode, sstable_filter_func_t filter) -{ - uint64_t reshaped_size = 0; - - while (true) { - auto reshape_candidates = boost::copy_range>(dir.get_unshared_local_sstables() - | boost::adaptors::filtered([&filter] (const auto& sst) { - return filter(sst); - })); - auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), mode); - if (desc.sstables.empty()) { - break; - } - - if (!reshaped_size) { - dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); - } - - std::vector sstlist; - for (auto& sst : desc.sstables) { - reshaped_size += sst->data_size(); - sstlist.push_back(sst); - } - - desc.creator = creator; - - std::exception_ptr ex; - try { - co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable -> future<> { - sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); - co_await dir.remove_unshared_sstables(std::move(sstlist)); - co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no); - }); - } catch (...) { - ex = std::current_exception(); - } - - if (ex != nullptr) { - try { - std::rethrow_exception(std::move(ex)); - } catch (sstables::compaction_stopped_exception& e) { - dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); - break; - } catch (...) { - dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception()); - break; - } - } - - co_await coroutine::maybe_yield(); - } - - co_return reshaped_size; -} - future<> major_keyspace_compaction_task_impl::run() { co_await _db.invoke_on_all([&] (replica::database& db) -> future<> { tasks::task_info parent_info{_status.id, _status.shard}; @@ -363,7 +305,57 @@ tasks::is_internal shard_reshaping_compaction_task_impl::is_internal() const noe future<> shard_reshaping_compaction_task_impl::run() { auto& table = _db.local().find_column_family(_status.keyspace, _status.table); - _total_shard_size = co_await reshape(_dir, table, _creator, _mode, _filter); + uint64_t reshaped_size = 0; + + while (true) { + auto reshape_candidates = boost::copy_range>(_dir.get_unshared_local_sstables() + | boost::adaptors::filtered([&filter = _filter] (const auto& sst) { + return filter(sst); + })); + auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), _mode); + if (desc.sstables.empty()) { + break; + } + + if (!reshaped_size) { + dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); + } + + std::vector sstlist; + for (auto& sst : desc.sstables) { + reshaped_size += sst->data_size(); + sstlist.push_back(sst); + } + + desc.creator = _creator; + + std::exception_ptr ex; + try { + co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable -> future<> { + sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state()); + co_await dir.remove_unshared_sstables(std::move(sstlist)); + co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no); + }); + } catch (...) { + ex = std::current_exception(); + } + + if (ex != nullptr) { + try { + std::rethrow_exception(std::move(ex)); + } catch (sstables::compaction_stopped_exception& e) { + dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name()); + break; + } catch (...) { + dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception()); + break; + } + } + + co_await coroutine::maybe_yield(); + } + + _total_shard_size = reshaped_size; } } From b02a5fd18401c3bf8744b144990836ba3b41357e Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Thu, 1 Jun 2023 16:50:54 +0200 Subject: [PATCH 7/7] test: extend test_compaction_task.py to test reshaping compaction --- test/rest_api/test_compaction_task.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/rest_api/test_compaction_task.py b/test/rest_api/test_compaction_task.py index addf4a0220..eb4e55e4ce 100644 --- a/test/rest_api/test_compaction_task.py +++ b/test/rest_api/test_compaction_task.py @@ -55,3 +55,7 @@ def test_rewrite_sstables_keyspace_compaction_task(cql, this_dc, rest_api): check_compaction_task(cql, this_dc, rest_api, lambda keyspace, _: rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}"), "rewrite sstables compaction", task_tree_depth) # scrub sstables compaction check_compaction_task(cql, this_dc, rest_api, lambda keyspace, _: rest_api.send("GET", f"storage_service/keyspace_scrub/{keyspace}"), "rewrite sstables compaction", task_tree_depth) + +def test_reshaping_compaction_task(cql, this_dc, rest_api): + task_tree_depth = 1 + check_compaction_task(cql, this_dc, rest_api, lambda keyspace, table: rest_api.send("POST", f"storage_service/sstables/{keyspace}", {'cf': table, 'load_and_stream': False}), "reshaping compaction", task_tree_depth)