compaction: add reshard_sstables_compaction_task_impl

Add task manager's task covering resharding compaction.

A struct and some functions are moved from replica/distributed_loader.cc
to compaction/task_manager_module.cc.
This commit is contained in:
Aleksandra Martyniuk
2023-05-19 18:08:28 +02:00
parent e486f4eba6
commit 7a7e287d8c
3 changed files with 222 additions and 186 deletions

View File

@@ -6,6 +6,8 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <boost/range/algorithm/min_element.hpp>
#include "compaction/task_manager_module.hh"
#include "compaction/compaction_manager.hh"
#include "replica/database.hh"
@@ -13,6 +15,191 @@
#include "sstables/sstable_directory.hh"
#include "utils/pretty_printers.hh"
namespace replica {
// Helper structure for resharding.
//
// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and
// need to be resharded. Each shard will keep one such descriptor, that contains the list of
// SSTables assigned to it, and their total size. The total size is used to make sure we are
// fairly balancing SSTables among shards.
struct reshard_shard_descriptor {
sstables::sstable_directory::sstable_open_info_vector info_vec;
uint64_t uncompressed_data_size = 0;
bool total_size_smaller(const reshard_shard_descriptor& rhs) const {
return uncompressed_data_size < rhs.uncompressed_data_size;
}
uint64_t size() const {
return uncompressed_data_size;
}
};
} // namespace replica
// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all.
// This function assumes that the list of SSTables can be fairly big so it is careful to
// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
future<sstables::sstable_directory::sstable_open_info_vector>
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto info_vec = sstables::sstable_directory::sstable_open_info_vector();
// We want to make sure that each distributed object reshards about the same amount of data.
// Each sharded object has its own shared SSTables. We can use a clever algorithm in which they
// all distributely figure out which SSTables to exchange, but we'll keep it simple and move all
// their foreign_sstable_open_info to a coordinator (the shard who called this function). We can
// move in bulk and that's efficient. That shard can then distribute the work among all the
// others who will reshard.
auto coordinator = this_shard_id();
// We will first move all of the foreign open info to temporary storage so that we can sort
// them. We want to distribute bigger sstables first.
const auto* sorted_owned_ranges_ptr = owned_ranges_ptr.get();
co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> {
auto shared_sstables = d.retrieve_shared_sstables();
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
if (sorted_owned_ranges_ptr) {
co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
need_cleanup.push_back(co_await sst->get_open_info());
co_return false;
}
co_return true;
});
}
if (shared_sstables.empty() && need_cleanup.empty()) {
co_return;
}
co_await smp::submit_to(coordinator, [&] () -> future<> {
info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size());
for (auto& info : shared_sstables) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
for (auto& info : need_cleanup) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
});
});
co_return info_vec;
}
// Given a vector of shared sstables to be resharded, distribute it among all shards.
// The vector is first sorted to make sure that we are moving the biggest SSTables first.
//
// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do.
future<std::vector<replica::reshard_shard_descriptor>>
distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector source) {
auto destinations = std::vector<replica::reshard_shard_descriptor>(smp::count);
std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
// Sort on descending SSTable sizes.
return a.uncompressed_data_size > b.uncompressed_data_size;
});
for (auto& info : source) {
// Choose the stable shard owner with the smallest amount of accumulated work.
// Note that for sstables that need cleanup via resharding, owners may contain
// a single shard.
auto shard_it = boost::min_element(info.owners, [&] (const shard_id& lhs, const shard_id& rhs) {
return destinations[lhs].total_size_smaller(destinations[rhs]);
});
auto& dest = destinations[*shard_it];
dest.uncompressed_data_size += info.uncompressed_data_size;
dest.info_vec.push_back(std::move(info));
co_await coroutine::maybe_yield();
}
co_return destinations;
}
// reshards a collection of SSTables.
//
// A reference to the compaction manager must be passed so we can register with it. Knowing
// which table is being processed is a requirement of the compaction manager, so this must be
// passed too.
//
// We will reshard max_sstables_per_job at once.
//
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
if (shared_info.empty()) {
co_return;
}
// We want to reshard many SSTables at a time for efficiency. However if we have too many we may
// be risking OOM.
auto max_sstables_per_job = table.schema()->max_compaction_threshold();
auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job;
auto sstables_per_job = shared_info.size() / num_jobs;
std::vector<std::vector<sstables::shared_sstable>> buckets;
buckets.reserve(num_jobs);
buckets.emplace_back();
co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
auto sst = co_await dir.load_foreign_sstable(info);
// Last bucket gets leftover SSTables
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
buckets.emplace_back();
}
buckets.back().push_back(std::move(sst));
});
// There is a semaphore inside the compaction manager in run_resharding_jobs. So we
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
auto& t = table.as_table_state();
co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> {
auto erm = table.get_effective_replication_map(); // keep alive around compaction.
sstables::compaction_descriptor desc(sstlist);
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = creator;
desc.sharder = &erm->get_sharder(*table.schema());
desc.owned_ranges = owned_ranges_ptr;
auto result = co_await sstables::compact_sstables(std::move(desc), info, t);
// input sstables are moved, to guarantee their resources are released once we're done
// resharding them.
co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result();
});
});
}
future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vector<replica::reshard_shard_descriptor> reshard_jobs,
sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr) {
uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&replica::reshard_shard_descriptor::size)), uint64_t(0));
if (total_size == 0) {
co_return;
}
auto start = std::chrono::steady_clock::now();
dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), ks_name, table_name);
co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> {
auto& table = db.local().find_column_family(ks_name, table_name);
auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
// make shard-local copy of owned_ranges
compaction::owned_ranges_ptr local_owned_ranges_ptr;
if (owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*owned_ranges_ptr);
}
co_await reshard(d, std::move(info_vec), table, creator, std::move(local_owned_ranges_ptr));
co_await d.move_foreign_sstables(dir);
}));
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), utils::pretty_printed_throughput(total_size, duration));
}
namespace compaction {
struct table_tasks_info {
@@ -359,4 +546,10 @@ future<> shard_reshaping_compaction_task_impl::run() {
_total_shard_size = reshaped_size;
}
future<> table_resharding_compaction_task_impl::run() {
auto all_jobs = co_await collect_all_shared_sstables(_dir, _db, _status.keyspace, _status.table, _owned_ranges_ptr);
auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs));
co_await run_resharding_jobs(_dir, std::move(destinations), _db, _status.keyspace, _status.table, std::move(_creator), std::move(_owned_ranges_ptr));
}
}

View File

@@ -16,6 +16,7 @@
namespace sstables {
class sstable_directory;
}
namespace compaction {
class compaction_task_impl : public tasks::task_manager::task::impl {
@@ -572,6 +573,30 @@ protected:
virtual future<> run() override = 0;
};
class table_resharding_compaction_task_impl : public resharding_compaction_task_impl {
private:
sharded<sstables::sstable_directory>& _dir;
sharded<replica::database>& _db;
sstables::compaction_sstable_creator_fn _creator;
compaction::owned_ranges_ptr _owned_ranges_ptr;
public:
table_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
sharded<sstables::sstable_directory>& dir,
sharded<replica::database>& db,
sstables::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr) noexcept
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
, _dir(dir)
, _db(db)
, _creator(std::move(creator))
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
{}
protected:
virtual future<> run() override;
};
class task_manager_module : public tasks::task_manager::module {
public:
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}

View File

@@ -23,6 +23,7 @@
#include "utils/lister.hh"
#include "compaction/compaction.hh"
#include "compaction/compaction_manager.hh"
#include "compaction/task_manager_module.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "sstables/sstable_directory.hh"
@@ -31,7 +32,6 @@
#include "db/view/view_update_checks.hh"
#include <unordered_map>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/min_element.hpp>
#include "db/view/view_update_generator.hh"
#include "utils/directories.hh"
@@ -114,188 +114,6 @@ distributed_loader::lock_table(sharded<sstables::sstable_directory>& dir, sharde
});
}
// Helper structure for resharding.
//
// Describes the sstables (represented by their foreign_sstable_open_info) that are shared and
// need to be resharded. Each shard will keep one such descriptor, that contains the list of
// SSTables assigned to it, and their total size. The total size is used to make sure we are
// fairly balancing SSTables among shards.
struct reshard_shard_descriptor {
sstables::sstable_directory::sstable_open_info_vector info_vec;
uint64_t uncompressed_data_size = 0;
bool total_size_smaller(const reshard_shard_descriptor& rhs) const {
return uncompressed_data_size < rhs.uncompressed_data_size;
}
uint64_t size() const {
return uncompressed_data_size;
}
};
// Collects shared SSTables from all shards and sstables that require cleanup and returns a vector containing them all.
// This function assumes that the list of SSTables can be fairly big so it is careful to
// manipulate it in a do_for_each loop (which yields) instead of using standard accumulators.
future<sstables::sstable_directory::sstable_open_info_vector>
collect_all_shared_sstables(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto info_vec = sstables::sstable_directory::sstable_open_info_vector();
// We want to make sure that each distributed object reshards about the same amount of data.
// Each sharded object has its own shared SSTables. We can use a clever algorithm in which they
// all distributely figure out which SSTables to exchange, but we'll keep it simple and move all
// their foreign_sstable_open_info to a coordinator (the shard who called this function). We can
// move in bulk and that's efficient. That shard can then distribute the work among all the
// others who will reshard.
auto coordinator = this_shard_id();
// We will first move all of the foreign open info to temporary storage so that we can sort
// them. We want to distribute bigger sstables first.
const auto* sorted_owned_ranges_ptr = owned_ranges_ptr.get();
co_await dir.invoke_on_all([&] (sstables::sstable_directory& d) -> future<> {
auto shared_sstables = d.retrieve_shared_sstables();
sstables::sstable_directory::sstable_open_info_vector need_cleanup;
if (sorted_owned_ranges_ptr) {
co_await d.filter_sstables([&] (sstables::shared_sstable sst) -> future<bool> {
if (needs_cleanup(sst, *sorted_owned_ranges_ptr)) {
need_cleanup.push_back(co_await sst->get_open_info());
co_return false;
}
co_return true;
});
}
if (shared_sstables.empty() && need_cleanup.empty()) {
co_return;
}
co_await smp::submit_to(coordinator, [&] () -> future<> {
info_vec.reserve(info_vec.size() + shared_sstables.size() + need_cleanup.size());
for (auto& info : shared_sstables) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
for (auto& info : need_cleanup) {
info_vec.emplace_back(std::move(info));
co_await coroutine::maybe_yield();
}
});
});
co_return info_vec;
}
// Given a vector of shared sstables to be resharded, distribute it among all shards.
// The vector is first sorted to make sure that we are moving the biggest SSTables first.
//
// Returns a reshard_shard_descriptor per shard indicating the work that each shard has to do.
future<std::vector<reshard_shard_descriptor>>
distribute_reshard_jobs(sstables::sstable_directory::sstable_open_info_vector source) {
auto destinations = std::vector<reshard_shard_descriptor>(smp::count);
std::sort(source.begin(), source.end(), [] (const sstables::foreign_sstable_open_info& a, const sstables::foreign_sstable_open_info& b) {
// Sort on descending SSTable sizes.
return a.uncompressed_data_size > b.uncompressed_data_size;
});
for (auto& info : source) {
// Choose the stable shard owner with the smallest amount of accumulated work.
// Note that for sstables that need cleanup via resharding, owners may contain
// a single shard.
auto shard_it = boost::min_element(info.owners, [&] (const shard_id& lhs, const shard_id& rhs) {
return destinations[lhs].total_size_smaller(destinations[rhs]);
});
auto& dest = destinations[*shard_it];
dest.uncompressed_data_size += info.uncompressed_data_size;
dest.info_vec.push_back(std::move(info));
co_await coroutine::maybe_yield();
}
co_return destinations;
}
// reshards a collection of SSTables.
//
// A reference to the compaction manager must be passed so we can register with it. Knowing
// which table is being processed is a requirement of the compaction manager, so this must be
// passed too.
//
// We will reshard max_sstables_per_job at once.
//
// A creator function must be passed that will create an SSTable object in the correct shard,
// and an I/O priority must be specified.
future<> reshard(sstables::sstable_directory& dir, sstables::sstable_directory::sstable_open_info_vector shared_info, replica::table& table,
sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr)
{
// Resharding doesn't like empty sstable sets, so bail early. There is nothing
// to reshard in this shard.
if (shared_info.empty()) {
co_return;
}
// We want to reshard many SSTables at a time for efficiency. However if we have too many we may
// be risking OOM.
auto max_sstables_per_job = table.schema()->max_compaction_threshold();
auto num_jobs = (shared_info.size() + max_sstables_per_job - 1) / max_sstables_per_job;
auto sstables_per_job = shared_info.size() / num_jobs;
std::vector<std::vector<sstables::shared_sstable>> buckets;
buckets.reserve(num_jobs);
buckets.emplace_back();
co_await coroutine::parallel_for_each(shared_info, [&] (sstables::foreign_sstable_open_info& info) -> future<> {
auto sst = co_await dir.load_foreign_sstable(info);
// Last bucket gets leftover SSTables
if ((buckets.back().size() >= sstables_per_job) && (buckets.size() < num_jobs)) {
buckets.emplace_back();
}
buckets.back().push_back(std::move(sst));
});
// There is a semaphore inside the compaction manager in run_resharding_jobs. So we
// parallel_for_each so the statistics about pending jobs are updated to reflect all
// jobs. But only one will run in parallel at a time
auto& t = table.as_table_state();
co_await coroutine::parallel_for_each(buckets, [&] (std::vector<sstables::shared_sstable>& sstlist) mutable {
return table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshard, "Reshard compaction", [&] (sstables::compaction_data& info) -> future<> {
auto erm = table.get_effective_replication_map(); // keep alive around compaction.
sstables::compaction_descriptor desc(sstlist);
desc.options = sstables::compaction_type_options::make_reshard();
desc.creator = creator;
desc.sharder = &erm->get_sharder(*table.schema());
desc.owned_ranges = owned_ranges_ptr;
auto result = co_await sstables::compact_sstables(std::move(desc), info, t);
// input sstables are moved, to guarantee their resources are released once we're done
// resharding them.
co_await when_all_succeed(dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::yes), dir.remove_sstables(std::move(sstlist))).discard_result();
});
});
}
future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vector<reshard_shard_descriptor> reshard_jobs,
sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator,
compaction::owned_ranges_ptr owned_ranges_ptr) {
uint64_t total_size = boost::accumulate(reshard_jobs | boost::adaptors::transformed(std::mem_fn(&reshard_shard_descriptor::size)), uint64_t(0));
if (total_size == 0) {
co_return;
}
auto start = std::chrono::steady_clock::now();
dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), ks_name, table_name);
co_await dir.invoke_on_all(coroutine::lambda([&] (sstables::sstable_directory& d) -> future<> {
auto& table = db.local().find_column_family(ks_name, table_name);
auto info_vec = std::move(reshard_jobs[this_shard_id()].info_vec);
// make shard-local copy of owned_ranges
compaction::owned_ranges_ptr local_owned_ranges_ptr;
if (owned_ranges_ptr) {
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*owned_ranges_ptr);
}
co_await ::replica::reshard(d, std::move(info_vec), table, creator, std::move(local_owned_ranges_ptr));
co_await d.move_foreign_sstables(dir);
}));
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), ks_name, table_name, duration.count(), utils::pretty_printed_throughput(total_size, duration));
}
// Global resharding function. Done in two parts:
// - The first part spreads the foreign_sstable_open_info across shards so that all of them are
// resharding about the same amount of data
@@ -303,9 +121,9 @@ future<> run_resharding_jobs(sharded<sstables::sstable_directory>& dir, std::vec
// assigned.
future<>
distributed_loader::reshard(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator, compaction::owned_ranges_ptr owned_ranges_ptr) {
auto all_jobs = co_await collect_all_shared_sstables(dir, db, ks_name, table_name, owned_ranges_ptr);
auto destinations = co_await distribute_reshard_jobs(std::move(all_jobs));
co_await run_resharding_jobs(dir, std::move(destinations), db, ks_name, table_name, std::move(creator), std::move(owned_ranges_ptr));
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<table_resharding_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, std::move(creator), std::move(owned_ranges_ptr));
co_await task->done();
}
future<sstables::sstable::version_types>