From 7a7e287d8cd28e8a2fd535db51fb9c06b54e3207 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Fri, 19 May 2023 18:08:28 +0200 Subject: [PATCH] compaction: add reshard_sstables_compaction_task_impl Add task manager's task covering resharding compaction. A struct and some functions are moved from replica/distributed_loader.cc to compaction/task_manager_module.cc. --- compaction/task_manager_module.cc | 193 ++++++++++++++++++++++++++++++ compaction/task_manager_module.hh | 25 ++++ replica/distributed_loader.cc | 190 +---------------------------- 3 files changed, 222 insertions(+), 186 deletions(-) diff --git a/compaction/task_manager_module.cc b/compaction/task_manager_module.cc index d8988454ec..7f8753931f 100644 --- a/compaction/task_manager_module.cc +++ b/compaction/task_manager_module.cc @@ -6,6 +6,8 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include + #include "compaction/task_manager_module.hh" #include "compaction/compaction_manager.hh" #include "replica/database.hh" @@ -13,6 +15,191 @@ #include "sstables/sstable_directory.hh" #include "utils/pretty_printers.hh" +namespace replica { + +// Helper structure for resharding. +// +// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and +// need to be resharded. Each shard will keep one such descriptor, that contains the list of +// SSTables assigned to it, and their total size. The total size is used to make sure we are +// fairly balancing SSTables among shards. +struct reshard_shard_descriptor { + sstables::sstable_directory::sstable_open_info_vector info_vec; + uint64_t uncompressed_data_size = 0; + + bool total_size_smaller(const reshard_shard_descriptor& rhs) const { + return uncompressed_data_size < rhs.uncompressed_data_size; + } + + uint64_t size() const { + return uncompressed_data_size; + } +}; + +} // namespace replica + +// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all. +// This function assumes that the list of SSTables can be fairly big so it is careful to +// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators. +future +collect_all_shared_sstables(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) { + auto info_vec = sstables::sstable_directory::sstable_open_info_vector(); + + // We want to make sure that each distributed object reshards about the same amount of data. + // Each sharded object has its own shared SSTables. We can use a clever algorithm in which they + // all distributely figure out which SSTables to exchange, but we'll keep it simple and move all + // their foreign_sstable_open_info to a coordinator (the shard who called this function). We can + // move in bulk and that's efficient. That shard can then distribute the work among all the + // others who will reshard. + auto coordinator = this_shard_id(); + // We will first move all of the foreign open info to temporary storage so that we can sort + // them. We want to distribute bigger sstables first. + const auto* sorted_owned_ranges_ptr = owned_ranges_ptr.get(); + co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> { + auto shared_sstables = d.retrieve_shared_sstables(); + sstables::sstable_directory::sstable_open_info_vector need_cleanup; + if (sorted_owned_ranges_ptr) { + co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future { + if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) { + need_cleanup.push_back(co_await sst->get_open_info()); + co_return false; + } + co_return true; + }); + } + if (shared_sstables.empty() && need_cleanup.empty()) { + co_return; + } + co_await smp::submit_to(coordinator, [&] () -> future<> { + info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size()); + for (auto& info : shared_sstables) { + info_vec.emplace_back(std::move(info)); + co_await coroutine::maybe_yield(); + } + for (auto& info : need_cleanup) { + info_vec.emplace_back(std::move(info)); + co_await coroutine::maybe_yield(); + } + }); + }); + + co_return info_vec; +} + +// Given a vector of shared sstables to be resharded, distribute it among all shards. +// The vector is first sorted to make sure that we are moving the biggest SSTables first. +// +// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do. +future> +distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector source) { + auto destinations = std::vector(smp::count); + + std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) { + // Sort on descending SSTable sizes. + return a.uncompressed_data_size > b.uncompressed_data_size; + }); + + for (auto& info : source) { + // Choose the stable shard owner with the smallest amount of accumulated work. + // Note that for sstables that need cleanup via resharding, owners may contain + // a single shard. + auto shard_it = boost::min_element(info.owners, [&] (const shard_id& lhs, const shard_id& rhs) { + return destinations[lhs].total_size_smaller(destinations[rhs]); + }); + auto& dest = destinations[*shard_it]; + dest.uncompressed_data_size += info.uncompressed_data_size; + dest.info_vec.push_back(std::move(info)); + co_await coroutine::maybe_yield(); + } + + co_return destinations; +} + +// reshards a collection of SSTables. +// +// A reference to the compaction manager must be passed so we can register with it. Knowing +// which table is being processed is a requirement of the compaction manager, so this must be +// passed too. +// +// We will reshard max_sstables_per_job at once. +// +// A creator function must be passed that will create an SSTable object in the correct shard, +// and an I/O priority must be specified. +future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table, + sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) +{ + // Resharding doesn't like empty sstable sets, so bail early. There is nothing + // to reshard in this shard. + if (shared_info.empty()) { + co_return; + } + + // We want to reshard many SSTables at a time for efficiency. However if we have too many we may + // be risking OOM. + auto max_sstables_per_job = table.schema()->max_compaction_threshold(); + auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job; + auto sstables_per_job = shared_info.size() / num_jobs; + + std::vector> buckets; + buckets.reserve(num_jobs); + buckets.emplace_back(); + co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> { + auto sst = co_await dir.load_foreign_sstable(info); + // Last bucket gets leftover SSTables + if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) { + buckets.emplace_back(); + } + buckets.back().push_back(std::move(sst)); + }); + // There is a semaphore inside the compaction manager in run_resharding_jobs. So we + // parallel_for_each so the statistics about pending jobs are updated to reflect all + // jobs. But only one will run in parallel at a time + auto& t = table.as_table_state(); + co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { + return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> { + auto erm = table.get_effective_replication_map(); // keep alive around compaction. + + sstables::compaction_descriptor desc(sstlist); + desc.options = sstables::compaction_type_options::make_reshard(); + desc.creator = creator; + desc.sharder = &erm->get_sharder(*table.schema()); + desc.owned_ranges = owned_ranges_ptr; + + auto result = co_await sstables::compact_sstables(std::move(desc), info, t); + // input sstables are moved, to guarantee their resources are released once we're done + // resharding them. + co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result(); + }); + }); +} + +future<> run_resharding_jobs(sharded& dir, std::vector reshard_jobs, + sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, + compaction::owned_ranges_ptr owned_ranges_ptr) { + + uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&replica::reshard_shard_descriptor::size)), uint64_t(0)); + if (total_size == 0) { + co_return; + } + + auto start = std::chrono::steady_clock::now(); + dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), ks_name, table_name); + + co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> { + auto& table = db.local().find_column_family(ks_name, table_name); + auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec); + // make shard-local copy of owned_ranges + compaction::owned_ranges_ptr local_owned_ranges_ptr; + if (owned_ranges_ptr) { + local_owned_ranges_ptr = make_lw_shared(*owned_ranges_ptr); + } + co_await reshard(d, std::move(info_vec), table, creator, std::move(local_owned_ranges_ptr)); + co_await d.move_foreign_sstables(dir); + })); + + auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); + dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), utils::pretty_printed_throughput(total_size, duration)); +} namespace compaction { struct table_tasks_info { @@ -359,4 +546,10 @@ future<> shard_reshaping_compaction_task_impl::run() { _total_shard_size = reshaped_size; } +future<> table_resharding_compaction_task_impl::run() { + auto all_jobs = co_await collect_all_shared_sstables(_dir, _db, _status.keyspace, _status.table, _owned_ranges_ptr); + auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs)); + co_await run_resharding_jobs(_dir, std::move(destinations), _db, _status.keyspace, _status.table, std::move(_creator), std::move(_owned_ranges_ptr)); +} + } diff --git a/compaction/task_manager_module.hh b/compaction/task_manager_module.hh index e62bb8b24d..b702e0828b 100644 --- a/compaction/task_manager_module.hh +++ b/compaction/task_manager_module.hh @@ -16,6 +16,7 @@ namespace sstables { class sstable_directory; } + namespace compaction { class compaction_task_impl : public tasks::task_manager::task::impl { @@ -572,6 +573,30 @@ protected: virtual future<> run() override = 0; }; +class table_resharding_compaction_task_impl : public resharding_compaction_task_impl { +private: + sharded& _dir; + sharded& _db; + sstables::compaction_sstable_creator_fn _creator; + compaction::owned_ranges_ptr _owned_ranges_ptr; +public: + table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module, + std::string keyspace, + std::string table, + sharded& dir, + sharded& db, + sstables::compaction_sstable_creator_fn creator, + compaction::owned_ranges_ptr owned_ranges_ptr) noexcept + : resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id()) + , _dir(dir) + , _db(db) + , _creator(std::move(creator)) + , _owned_ranges_ptr(std::move(owned_ranges_ptr)) + {} +protected: + virtual future<> run() override; +}; + class task_manager_module : public tasks::task_manager::module { public: task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {} diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc index e5def81620..a7ac85aaee 100644 --- a/replica/distributed_loader.cc +++ b/replica/distributed_loader.cc @@ -23,6 +23,7 @@ #include "utils/lister.hh" #include "compaction/compaction.hh" #include "compaction/compaction_manager.hh" +#include "compaction/task_manager_module.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" #include "sstables/sstable_directory.hh" @@ -31,7 +32,6 @@ #include "db/view/view_update_checks.hh" #include #include -#include #include "db/view/view_update_generator.hh" #include "utils/directories.hh" @@ -114,188 +114,6 @@ distributed_loader::lock_table(sharded& dir, sharde }); } -// Helper structure for resharding. -// -// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and -// need to be resharded. Each shard will keep one such descriptor, that contains the list of -// SSTables assigned to it, and their total size. The total size is used to make sure we are -// fairly balancing SSTables among shards. -struct reshard_shard_descriptor { - sstables::sstable_directory::sstable_open_info_vector info_vec; - uint64_t uncompressed_data_size = 0; - - bool total_size_smaller(const reshard_shard_descriptor& rhs) const { - return uncompressed_data_size < rhs.uncompressed_data_size; - } - - uint64_t size() const { - return uncompressed_data_size; - } -}; - -// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all. -// This function assumes that the list of SSTables can be fairly big so it is careful to -// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators. -future -collect_all_shared_sstables(sharded& dir, sharded& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) { - auto info_vec = sstables::sstable_directory::sstable_open_info_vector(); - - // We want to make sure that each distributed object reshards about the same amount of data. - // Each sharded object has its own shared SSTables. We can use a clever algorithm in which they - // all distributely figure out which SSTables to exchange, but we'll keep it simple and move all - // their foreign_sstable_open_info to a coordinator (the shard who called this function). We can - // move in bulk and that's efficient. That shard can then distribute the work among all the - // others who will reshard. - auto coordinator = this_shard_id(); - // We will first move all of the foreign open info to temporary storage so that we can sort - // them. We want to distribute bigger sstables first. - const auto* sorted_owned_ranges_ptr = owned_ranges_ptr.get(); - co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> { - auto shared_sstables = d.retrieve_shared_sstables(); - sstables::sstable_directory::sstable_open_info_vector need_cleanup; - if (sorted_owned_ranges_ptr) { - co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future { - if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) { - need_cleanup.push_back(co_await sst->get_open_info()); - co_return false; - } - co_return true; - }); - } - if (shared_sstables.empty() && need_cleanup.empty()) { - co_return; - } - co_await smp::submit_to(coordinator, [&] () -> future<> { - info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size()); - for (auto& info : shared_sstables) { - info_vec.emplace_back(std::move(info)); - co_await coroutine::maybe_yield(); - } - for (auto& info : need_cleanup) { - info_vec.emplace_back(std::move(info)); - co_await coroutine::maybe_yield(); - } - }); - }); - - co_return info_vec; -} - -// Given a vector of shared sstables to be resharded, distribute it among all shards. -// The vector is first sorted to make sure that we are moving the biggest SSTables first. -// -// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do. -future> -distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector source) { - auto destinations = std::vector(smp::count); - - std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) { - // Sort on descending SSTable sizes. - return a.uncompressed_data_size > b.uncompressed_data_size; - }); - - for (auto& info : source) { - // Choose the stable shard owner with the smallest amount of accumulated work. - // Note that for sstables that need cleanup via resharding, owners may contain - // a single shard. - auto shard_it = boost::min_element(info.owners, [&] (const shard_id& lhs, const shard_id& rhs) { - return destinations[lhs].total_size_smaller(destinations[rhs]); - }); - auto& dest = destinations[*shard_it]; - dest.uncompressed_data_size += info.uncompressed_data_size; - dest.info_vec.push_back(std::move(info)); - co_await coroutine::maybe_yield(); - } - - co_return destinations; -} - -// reshards a collection of SSTables. -// -// A reference to the compaction manager must be passed so we can register with it. Knowing -// which table is being processed is a requirement of the compaction manager, so this must be -// passed too. -// -// We will reshard max_sstables_per_job at once. -// -// A creator function must be passed that will create an SSTable object in the correct shard, -// and an I/O priority must be specified. -future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table, - sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) -{ - // Resharding doesn't like empty sstable sets, so bail early. There is nothing - // to reshard in this shard. - if (shared_info.empty()) { - co_return; - } - - // We want to reshard many SSTables at a time for efficiency. However if we have too many we may - // be risking OOM. - auto max_sstables_per_job = table.schema()->max_compaction_threshold(); - auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job; - auto sstables_per_job = shared_info.size() / num_jobs; - - std::vector> buckets; - buckets.reserve(num_jobs); - buckets.emplace_back(); - co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> { - auto sst = co_await dir.load_foreign_sstable(info); - // Last bucket gets leftover SSTables - if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) { - buckets.emplace_back(); - } - buckets.back().push_back(std::move(sst)); - }); - // There is a semaphore inside the compaction manager in run_resharding_jobs. So we - // parallel_for_each so the statistics about pending jobs are updated to reflect all - // jobs. But only one will run in parallel at a time - auto& t = table.as_table_state(); - co_await coroutine::parallel_for_each(buckets, [&] (std::vector& sstlist) mutable { - return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> { - auto erm = table.get_effective_replication_map(); // keep alive around compaction. - - sstables::compaction_descriptor desc(sstlist); - desc.options = sstables::compaction_type_options::make_reshard(); - desc.creator = creator; - desc.sharder = &erm->get_sharder(*table.schema()); - desc.owned_ranges = owned_ranges_ptr; - - auto result = co_await sstables::compact_sstables(std::move(desc), info, t); - // input sstables are moved, to guarantee their resources are released once we're done - // resharding them. - co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result(); - }); - }); -} - -future<> run_resharding_jobs(sharded& dir, std::vector reshard_jobs, - sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, - compaction::owned_ranges_ptr owned_ranges_ptr) { - - uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0)); - if (total_size == 0) { - co_return; - } - - auto start = std::chrono::steady_clock::now(); - dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), ks_name, table_name); - - co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> { - auto& table = db.local().find_column_family(ks_name, table_name); - auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec); - // make shard-local copy of owned_ranges - compaction::owned_ranges_ptr local_owned_ranges_ptr; - if (owned_ranges_ptr) { - local_owned_ranges_ptr = make_lw_shared(*owned_ranges_ptr); - } - co_await ::replica::reshard(d, std::move(info_vec), table, creator, std::move(local_owned_ranges_ptr)); - co_await d.move_foreign_sstables(dir); - })); - - auto duration = std::chrono::duration_cast>(std::chrono::steady_clock::now() - start); - dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), utils::pretty_printed_throughput(total_size, duration)); -} - // Global resharding function. Done in two parts: // - The first part spreads the foreign_sstable_open_info across shards so that all of them are // resharding about the same amount of data @@ -303,9 +121,9 @@ future<> run_resharding_jobs(sharded& dir, std::vec // assigned. future<> distributed_loader::reshard(sharded& dir, sharded& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) { - auto all_jobs = co_await collect_all_shared_sstables(dir, db, ks_name, table_name, owned_ranges_ptr); - auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs)); - co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), std::move(owned_ranges_ptr)); + auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); + auto task = co_await compaction_module.make_and_start_task({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr)); + co_await task->done(); } future