Merge 'Compaction reshape tasks' from Aleksandra Martyniuk

Task manager's tasks covering resharding compaction
on top and shard level.

Closes #14112

* github.com:scylladb/scylladb:
  test: extend test_compaction_task.py to test reshaping compaction
  compaction: move reshape function to shard_reshaping_table_compaction_task_impl::run()
  compaction: add shard_reshaping_compaction_task_impl
  replica: delete unused function
  compaction: add table_reshaping_compaction_task_impl
  compaction: copy reshape to task_manager_module.cc
  compaction: add reshaping_compaction_task_impl
This commit is contained in:
Botond Dénes
2023-06-26 11:56:07 +03:00
4 changed files with 168 additions and 69 deletions

View File

@@ -9,6 +9,8 @@
#include "compaction/task_manager_module.hh"
#include "compaction/compaction_manager.hh"
#include "replica/database.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_directory.hh"
namespace compaction {
@@ -280,5 +282,80 @@ future<> table_scrub_sstables_compaction_task_impl::run() {
});
}
future<> table_reshaping_compaction_task_impl::run() {
auto start = std::chrono::steady_clock::now();
auto total_size = co_await _dir.map_reduce0([&] (sstables::sstable_directory& d) -> future<uint64_t> {
uint64_t total_shard_size;
tasks::task_info parent_info{_status.id, _status.shard};
auto& compaction_module = _db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<shard_reshaping_compaction_task_impl>(parent_info, _status.keyspace, _status.table, _status.id, d, _db, _mode, _creator, _filter, total_shard_size);
co_await task->done();
co_return total_shard_size;
}, uint64_t(0), std::plus<uint64_t>());
if (total_size > 0) {
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration));
}
}
tasks::is_internal shard_reshaping_compaction_task_impl::is_internal() const noexcept {
return tasks::is_internal::yes;
}
future<> shard_reshaping_compaction_task_impl::run() {
auto& table = _db.local().find_column_family(_status.keyspace, _status.table);
uint64_t reshaped_size = 0;
while (true) {
auto reshape_candidates = boost::copy_range<std::vector<sstables::shared_sstable>>(_dir.get_unshared_local_sstables()
| boost::adaptors::filtered([&filter = _filter] (const auto& sst) {
return filter(sst);
}));
auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), _mode);
if (desc.sstables.empty()) {
break;
}
if (!reshaped_size) {
dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
}
std::vector<sstables::shared_sstable> sstlist;
for (auto& sst : desc.sstables) {
reshaped_size += sst->data_size();
sstlist.push_back(sst);
}
desc.creator = _creator;
std::exception_ptr ex;
try {
co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir = _dir, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable -> future<> {
sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state());
co_await dir.remove_unshared_sstables(std::move(sstlist));
co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no);
});
} catch (...) {
ex = std::current_exception();
}
if (ex != nullptr) {
try {
std::rethrow_exception(std::move(ex));
} catch (sstables::compaction_stopped_exception& e) {
dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
break;
} catch (...) {
dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception());
break;
}
}
co_await coroutine::maybe_yield();
}
_total_shard_size = reshaped_size;
}
}

View File

@@ -13,6 +13,9 @@
#include "schema/schema_fwd.hh"
#include "tasks/task_manager.hh"
namespace sstables {
class sstable_directory;
}
namespace compaction {
class compaction_task_impl : public tasks::task_manager::task::impl {
@@ -466,6 +469,87 @@ protected:
virtual future<> run() override;
};
class reshaping_compaction_task_impl : public compaction_task_impl {
public:
reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
unsigned sequence_number,
std::string keyspace,
std::string table,
std::string entity,
tasks::task_id parent_id) noexcept
: compaction_task_impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(entity), parent_id)
{
// FIXME: add progress units
}
virtual std::string type() const override {
return "reshaping compaction";
}
protected:
virtual future<> run() override = 0;
};
class table_reshaping_compaction_task_impl : public reshaping_compaction_task_impl {
private:
sharded<sstables::sstable_directory>& _dir;
sharded<replica::database>& _db;
sstables::reshape_mode _mode;
sstables::compaction_sstable_creator_fn _creator;
std::function<bool (const sstables::shared_sstable&)> _filter;
public:
table_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
sharded<sstables::sstable_directory>& dir,
sharded<replica::database>& db,
sstables::reshape_mode mode,
sstables::compaction_sstable_creator_fn creator,
std::function<bool (const sstables::shared_sstable&)> filter) noexcept
: reshaping_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), std::move(table), "", tasks::task_id::create_null_id())
, _dir(dir)
, _db(db)
, _mode(mode)
, _creator(std::move(creator))
, _filter(std::move(filter))
{}
protected:
virtual future<> run() override;
};
class shard_reshaping_compaction_task_impl : public reshaping_compaction_task_impl {
private:
sstables::sstable_directory& _dir;
sharded<replica::database>& _db;
sstables::reshape_mode _mode;
sstables::compaction_sstable_creator_fn _creator;
std::function<bool (const sstables::shared_sstable&)> _filter;
uint64_t& _total_shard_size;
public:
shard_reshaping_compaction_task_impl(tasks::task_manager::module_ptr module,
std::string keyspace,
std::string table,
tasks::task_id parent_id,
sstables::sstable_directory& dir,
sharded<replica::database>& db,
sstables::reshape_mode mode,
sstables::compaction_sstable_creator_fn creator,
std::function<bool (const sstables::shared_sstable&)> filter,
uint64_t& total_shard_size) noexcept
: reshaping_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), std::move(table), "", parent_id)
, _dir(dir)
, _db(db)
, _mode(mode)
, _creator(std::move(creator))
, _filter(std::move(filter))
, _total_shard_size(total_shard_size)
{}
virtual tasks::is_internal is_internal() const noexcept override;
protected:
virtual future<> run() override;
};
class task_manager_module : public tasks::task_manager::module {
public:
task_manager_module(tasks::task_manager& tm) noexcept : tasks::task_manager::module(tm, "compaction") {}

View File

@@ -316,79 +316,13 @@ highest_version_seen(sharded<sstables::sstable_directory>& dir, sstables::sstabl
});
}
using sstable_filter_func_t = std::function<bool (const sstables::shared_sstable&)>;
future<uint64_t> reshape(sstables::sstable_directory& dir, replica::table& table, sstables::compaction_sstable_creator_fn creator,
sstables::reshape_mode mode, sstable_filter_func_t filter)
{
uint64_t reshaped_size = 0;
while (true) {
auto reshape_candidates = boost::copy_range<std::vector<sstables::shared_sstable>>(dir.get_unshared_local_sstables()
| boost::adaptors::filtered([&filter] (const auto& sst) {
return filter(sst);
}));
auto desc = table.get_compaction_strategy().get_reshaping_job(std::move(reshape_candidates), table.schema(), mode);
if (desc.sstables.empty()) {
break;
}
if (!reshaped_size) {
dblog.info("Table {}.{} with compaction strategy {} found SSTables that need reshape. Starting reshape process", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
}
std::vector<sstables::shared_sstable> sstlist;
for (auto& sst : desc.sstables) {
reshaped_size += sst->data_size();
sstlist.push_back(sst);
}
desc.creator = creator;
std::exception_ptr ex;
try {
co_await table.get_compaction_manager().run_custom_job(table.as_table_state(), sstables::compaction_type::Reshape, "Reshape compaction", [&dir, &table, sstlist = std::move(sstlist), desc = std::move(desc)] (sstables::compaction_data& info) mutable -> future<> {
sstables::compaction_result result = co_await sstables::compact_sstables(std::move(desc), info, table.as_table_state());
co_await dir.remove_unshared_sstables(std::move(sstlist));
co_await dir.collect_output_unshared_sstables(std::move(result.new_sstables), sstables::sstable_directory::can_be_remote::no);
});
} catch (...) {
ex = std::current_exception();
}
if (ex != nullptr) {
try {
std::rethrow_exception(std::move(ex));
} catch (sstables::compaction_stopped_exception& e) {
dblog.info("Table {}.{} with compaction strategy {} had reshape successfully aborted.", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name());
break;
} catch (...) {
dblog.info("Reshape failed for Table {}.{} with compaction strategy {} due to {}", table.schema()->ks_name(), table.schema()->cf_name(), table.get_compaction_strategy().name(), std::current_exception());
break;
}
}
co_await coroutine::maybe_yield();
}
co_return reshaped_size;
}
future<>
distributed_loader::reshape(sharded<sstables::sstable_directory>& dir, sharded<replica::database>& db, sstables::reshape_mode mode,
sstring ks_name, sstring table_name, sstables::compaction_sstable_creator_fn creator,
std::function<bool (const sstables::shared_sstable&)> filter) {
auto start = std::chrono::steady_clock::now();
auto total_size = co_await dir.map_reduce0([&db, ks_name = std::move(ks_name), table_name = std::move(table_name), creator = std::move(creator), mode, filter] (sstables::sstable_directory& d) {
auto& table = db.local().find_column_family(ks_name, table_name);
return ::replica::reshape(d, table, creator, mode, filter);
}, uint64_t(0), std::plus<uint64_t>());
if (total_size > 0) {
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(std::chrono::steady_clock::now() - start);
dblog.info("Reshaped {} in {:.2f} seconds, {}", sstables::pretty_printed_data_size(total_size), duration.count(), sstables::pretty_printed_throughput(total_size, duration));
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<table_reshaping_compaction_task_impl>({}, std::move(ks_name), std::move(table_name), dir, db, mode, std::move(creator), std::move(filter));
co_await task->done();
}
// Loads SSTables into the main directory (or staging) and returns how many were loaded

View File

@@ -55,3 +55,7 @@ def test_rewrite_sstables_keyspace_compaction_task(cql, this_dc, rest_api):
check_compaction_task(cql, this_dc, rest_api, lambda keyspace, _: rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}"), "rewrite sstables compaction", task_tree_depth)
# scrub sstables compaction
check_compaction_task(cql, this_dc, rest_api, lambda keyspace, _: rest_api.send("GET", f"storage_service/keyspace_scrub/{keyspace}"), "rewrite sstables compaction", task_tree_depth)
def test_reshaping_compaction_task(cql, this_dc, rest_api):
task_tree_depth = 1
check_compaction_task(cql, this_dc, rest_api, lambda keyspace, table: rest_api.send("POST", f"storage_service/sstables/{keyspace}", {'cf': table, 'load_and_stream': False}), "reshaping compaction", task_tree_depth)