compaction: add shard_reshard_sstables_compaction_task_impl
Add task manager's task covering resharding compaction on one shard.
This commit is contained in:
@@ -532,19 +532,30 @@ future<> table_resharding_compaction_task_impl::run() {
|
||||
dblog.info("Resharding {} for {}.{}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table);
|
||||
|
||||
co_await _db.invoke_on_all(coroutine::lambda([&] (replica::database& db) -> future<> {
|
||||
auto& table = db.find_column_family(_status.keyspace, _status.table);
|
||||
auto info_vec = std::move(destinations[this_shard_id()].info_vec);
|
||||
// make shard-local copy of owned_ranges
|
||||
compaction::owned_ranges_ptr local_owned_ranges_ptr;
|
||||
if (_owned_ranges_ptr) {
|
||||
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
|
||||
}
|
||||
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(local_owned_ranges_ptr));
|
||||
co_await _dir.local().move_foreign_sstables(_dir);
|
||||
tasks::task_info parent_info{_status.id, _status.shard};
|
||||
auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<shard_resharding_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, _dir, db, _creator, _owned_ranges_ptr, destinations);
|
||||
co_await task->done();
|
||||
}));
|
||||
|
||||
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
|
||||
dblog.info("Resharded {} for {}.{} in {:.2f} seconds, {}", utils::pretty_printed_data_size(total_size), _status.keyspace, _status.table, duration.count(), utils::pretty_printed_throughput(total_size, duration));
|
||||
}
|
||||
|
||||
tasks::is_internal shard_resharding_compaction_task_impl::is_internal() const noexcept {
|
||||
return tasks::is_internal::yes;
|
||||
}
|
||||
|
||||
future<> shard_resharding_compaction_task_impl::run() {
|
||||
auto& table = _db.find_column_family(_status.keyspace, _status.table);
|
||||
auto info_vec = std::move(_destinations[this_shard_id()].info_vec);
|
||||
// make shard-local copy of owned_ranges
|
||||
compaction::owned_ranges_ptr local_owned_ranges_ptr;
|
||||
if (_owned_ranges_ptr) {
|
||||
local_owned_ranges_ptr = make_lw_shared<const dht::token_range_vector>(*_owned_ranges_ptr);
|
||||
}
|
||||
co_await reshard(_dir.local(), std::move(info_vec), table, _creator, std::move(local_owned_ranges_ptr));
|
||||
co_await _dir.local().move_foreign_sstables(_dir);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -17,6 +17,10 @@ namespace sstables {
|
||||
class sstable_directory;
|
||||
}
|
||||
|
||||
namespace replica {
|
||||
class reshard_shard_descriptor;
|
||||
}
|
||||
|
||||
namespace compaction {
|
||||
|
||||
class compaction_task_impl : public tasks::task_manager::task::impl {
|
||||
@@ -597,6 +601,36 @@ protected:
|
||||
virtual future<> run() override;
|
||||
};
|
||||
|
||||
class shard_resharding_compaction_task_impl : public resharding_compaction_task_impl {
|
||||
private:
|
||||
sharded<sstables::sstable_directory>& _dir;
|
||||
replica::database& _db;
|
||||
sstables::compaction_sstable_creator_fn _creator;
|
||||
compaction::owned_ranges_ptr _owned_ranges_ptr;
|
||||
std::vector<replica::reshard_shard_descriptor>& _destinations;
|
||||
public:
|
||||
shard_resharding_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
std::string table,
|
||||
tasks::task_id parent_id,
|
||||
sharded<sstables::sstable_directory>& dir,
|
||||
replica::database& db,
|
||||
sstables::compaction_sstable_creator_fn creator,
|
||||
compaction::owned_ranges_ptr owned_ranges_ptr,
|
||||
std::vector<replica::reshard_shard_descriptor>& destinations) noexcept
|
||||
: resharding_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), std::move(table), "", parent_id)
|
||||
, _dir(dir)
|
||||
, _db(db)
|
||||
, _creator(std::move(creator))
|
||||
, _owned_ranges_ptr(std::move(owned_ranges_ptr))
|
||||
, _destinations(destinations)
|
||||
{}
|
||||
|
||||
virtual tasks::is_internal is_internal() const noexcept override;
|
||||
protected:
|
||||
virtual future<> run() override;
|
||||
};
|
||||
|
||||
class task_manager_module : public tasks::task_manager::module {
|
||||
public:
|
||||
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}
|
||||
|
||||
Reference in New Issue
Block a user