compaction: use table_info in compaction tasks
Task manager compaction tasks need table names for logs. Thus, compaction tasks store table infos instead of table ids. get_table_ids function is deleted as it isn't used anywhere.
This commit is contained in:
@@ -1053,7 +1053,10 @@ void set_column_family(http_context& ctx, routes& r, sharded<db::system_keyspace
|
||||
apilog.info("column_family/force_major_compaction: name={}", req->param["name"]);
|
||||
auto [ks, cf] = parse_fully_qualified_cf_name(req->param["name"]);
|
||||
auto keyspace = validate_keyspace(ctx, ks);
|
||||
std::vector<table_id> table_infos = {ctx.db.local().find_uuid(ks, cf)};
|
||||
std::vector<table_info> table_infos = {table_info{
|
||||
.name = cf,
|
||||
.id = ctx.db.local().find_uuid(ks, cf)
|
||||
}};
|
||||
|
||||
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<major_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, std::move(table_infos));
|
||||
|
||||
@@ -465,14 +465,6 @@ static future<json::json_return_type> describe_ring_as_json(sharded<service::sto
|
||||
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
|
||||
}
|
||||
|
||||
static std::vector<table_id> get_table_ids(const std::vector<table_info>& table_infos) {
|
||||
std::vector<table_id> table_ids{table_infos.size()};
|
||||
boost::transform(table_infos, table_ids.begin(), [] (const auto& ti) {
|
||||
return ti.id;
|
||||
});
|
||||
return table_ids;
|
||||
}
|
||||
|
||||
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs, sharded<db::system_keyspace>& sys_ks) {
|
||||
ss::local_hostid.set(r, [&ctx](std::unique_ptr<http::request> req) {
|
||||
auto id = ctx.db.local().get_config().host_id;
|
||||
@@ -691,7 +683,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
apilog.debug("force_keyspace_compaction: keyspace={} tables={}", keyspace, table_infos);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<major_keyspace_compaction_task_impl>({}, std::move(keyspace), db, get_table_ids(table_infos));
|
||||
auto task = co_await compaction_module.make_and_start_task<major_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos);
|
||||
try {
|
||||
co_await task->done();
|
||||
} catch (...) {
|
||||
@@ -714,7 +706,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
}
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, get_table_ids(table_infos));
|
||||
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos);
|
||||
try {
|
||||
co_await task->done();
|
||||
} catch (...) {
|
||||
@@ -729,7 +721,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos);
|
||||
bool res = false;
|
||||
auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, get_table_ids(table_infos), res);
|
||||
auto task = co_await compaction_module.make_and_start_task<offstrategy_keyspace_compaction_task_impl>({}, std::move(keyspace), ctx.db, table_infos, res);
|
||||
try {
|
||||
co_await task->done();
|
||||
} catch (...) {
|
||||
@@ -747,7 +739,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version);
|
||||
|
||||
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await compaction_module.make_and_start_task<upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, get_table_ids(table_infos), exclude_current_version);
|
||||
auto task = co_await compaction_module.make_and_start_task<upgrade_sstables_compaction_task_impl>({}, std::move(keyspace), db, table_infos, exclude_current_version);
|
||||
try {
|
||||
co_await task->done();
|
||||
} catch (...) {
|
||||
|
||||
@@ -13,17 +13,17 @@
|
||||
namespace compaction {
|
||||
|
||||
// Run on all tables, skipping dropped tables
|
||||
future<> run_on_existing_tables(sstring op, replica::database& db, std::string_view keyspace, const std::vector<table_id> local_tables, std::function<future<> (replica::table&)> func) {
|
||||
future<> run_on_existing_tables(sstring op, replica::database& db, std::string_view keyspace, const std::vector<table_info> local_tables, std::function<future<> (replica::table&)> func) {
|
||||
std::exception_ptr ex;
|
||||
for (const auto& ti : local_tables) {
|
||||
tasks::tmlogger.debug("Starting {} on {}.{}", op, keyspace, ti);
|
||||
tasks::tmlogger.debug("Starting {} on {}.{}", op, keyspace, ti.name);
|
||||
try {
|
||||
co_await func(db.find_column_family(ti));
|
||||
co_await func(db.find_column_family(ti.id));
|
||||
} catch (const replica::no_such_column_family& e) {
|
||||
tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti, e.what());
|
||||
tasks::tmlogger.warn("Skipping {} of {}.{}: {}", op, keyspace, ti.name, e.what());
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
tasks::tmlogger.error("Failed {} of {}.{}: {}", op, keyspace, ti, ex);
|
||||
tasks::tmlogger.error("Failed {} of {}.{}: {}", op, keyspace, ti.name, ex);
|
||||
}
|
||||
if (ex) {
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
@@ -46,9 +46,9 @@ tasks::is_internal shard_major_keyspace_compaction_task_impl::is_internal() cons
|
||||
|
||||
future<> shard_major_keyspace_compaction_task_impl::run() {
|
||||
// Major compact smaller tables first, to increase chances of success if low on space.
|
||||
std::ranges::sort(_local_tables, std::less<>(), [&] (const table_id& ti) {
|
||||
std::ranges::sort(_local_tables, std::less<>(), [&] (const table_info& ti) {
|
||||
try {
|
||||
return _db.find_column_family(ti).get_stats().live_disk_space_used;
|
||||
return _db.find_column_family(ti.id).get_stats().live_disk_space_used;
|
||||
} catch (const replica::no_such_column_family& e) {
|
||||
return int64_t(-1);
|
||||
}
|
||||
@@ -61,7 +61,7 @@ future<> shard_major_keyspace_compaction_task_impl::run() {
|
||||
future<> cleanup_keyspace_compaction_task_impl::run() {
|
||||
co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
auto& module = db.get_compaction_manager().get_task_manager_module();
|
||||
auto task = co_await module.make_and_start_task<shard_cleanup_keyspace_compaction_task_impl>({_status.id, _status.shard}, _status.keyspace, _status.id, db, _table_ids);
|
||||
auto task = co_await module.make_and_start_task<shard_cleanup_keyspace_compaction_task_impl>({_status.id, _status.shard}, _status.keyspace, _status.id, db, _table_infos);
|
||||
co_await task->done();
|
||||
});
|
||||
}
|
||||
@@ -72,9 +72,9 @@ tasks::is_internal shard_cleanup_keyspace_compaction_task_impl::is_internal() co
|
||||
|
||||
future<> shard_cleanup_keyspace_compaction_task_impl::run() {
|
||||
// Cleanup smaller tables first, to increase chances of success if low on space.
|
||||
std::ranges::sort(_local_tables, std::less<>(), [&] (const table_id& ti) {
|
||||
std::ranges::sort(_local_tables, std::less<>(), [&] (const table_info& ti) {
|
||||
try {
|
||||
return _db.find_column_family(ti).get_stats().live_disk_space_used;
|
||||
return _db.find_column_family(ti.id).get_stats().live_disk_space_used;
|
||||
} catch (const replica::no_such_column_family& e) {
|
||||
return int64_t(-1);
|
||||
}
|
||||
|
||||
@@ -58,12 +58,12 @@ protected:
|
||||
class major_keyspace_compaction_task_impl : public major_compaction_task_impl {
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
std::vector<table_id> _table_infos;
|
||||
std::vector<table_info> _table_infos;
|
||||
public:
|
||||
major_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
sharded<replica::database>& db,
|
||||
std::vector<table_id> table_infos) noexcept
|
||||
std::vector<table_info> table_infos) noexcept
|
||||
: major_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), "", "", tasks::task_id::create_null_id())
|
||||
, _db(db)
|
||||
, _table_infos(std::move(table_infos))
|
||||
@@ -75,13 +75,13 @@ protected:
|
||||
class shard_major_keyspace_compaction_task_impl : public major_compaction_task_impl {
|
||||
private:
|
||||
replica::database& _db;
|
||||
std::vector<table_id> _local_tables;
|
||||
std::vector<table_info> _local_tables;
|
||||
public:
|
||||
shard_major_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
tasks::task_id parent_id,
|
||||
replica::database& db,
|
||||
std::vector<table_id> local_tables) noexcept
|
||||
std::vector<table_info> local_tables) noexcept
|
||||
: major_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), "", "", parent_id)
|
||||
, _db(db)
|
||||
, _local_tables(std::move(local_tables))
|
||||
@@ -116,15 +116,15 @@ protected:
|
||||
class cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
std::vector<table_id> _table_ids;
|
||||
std::vector<table_info> _table_infos;
|
||||
public:
|
||||
cleanup_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
sharded<replica::database>& db,
|
||||
std::vector<table_id> table_ids) noexcept
|
||||
std::vector<table_info> table_infos) noexcept
|
||||
: cleanup_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), "", "", tasks::task_id::create_null_id())
|
||||
, _db(db)
|
||||
, _table_ids(std::move(table_ids))
|
||||
, _table_infos(std::move(table_infos))
|
||||
{}
|
||||
protected:
|
||||
virtual future<> run() override;
|
||||
@@ -133,13 +133,13 @@ protected:
|
||||
class shard_cleanup_keyspace_compaction_task_impl : public cleanup_compaction_task_impl {
|
||||
private:
|
||||
replica::database& _db;
|
||||
std::vector<table_id> _local_tables;
|
||||
std::vector<table_info> _local_tables;
|
||||
public:
|
||||
shard_cleanup_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
tasks::task_id parent_id,
|
||||
replica::database& db,
|
||||
std::vector<table_id> local_tables) noexcept
|
||||
std::vector<table_info> local_tables) noexcept
|
||||
: cleanup_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), "", "", parent_id)
|
||||
, _db(db)
|
||||
, _local_tables(std::move(local_tables))
|
||||
@@ -174,13 +174,13 @@ protected:
|
||||
class offstrategy_keyspace_compaction_task_impl : public offstrategy_compaction_task_impl {
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
std::vector<table_id> _table_infos;
|
||||
std::vector<table_info> _table_infos;
|
||||
bool& _needed;
|
||||
public:
|
||||
offstrategy_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
sharded<replica::database>& db,
|
||||
std::vector<table_id> table_infos,
|
||||
std::vector<table_info> table_infos,
|
||||
bool& needed) noexcept
|
||||
: offstrategy_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), "", "", tasks::task_id::create_null_id())
|
||||
, _db(db)
|
||||
@@ -194,14 +194,14 @@ protected:
|
||||
class shard_offstrategy_keyspace_compaction_task_impl : public offstrategy_compaction_task_impl {
|
||||
private:
|
||||
replica::database& _db;
|
||||
std::vector<table_id> _table_infos;
|
||||
std::vector<table_info> _table_infos;
|
||||
bool& _needed;
|
||||
public:
|
||||
shard_offstrategy_keyspace_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
tasks::task_id parent_id,
|
||||
replica::database& db,
|
||||
std::vector<table_id> table_infos,
|
||||
std::vector<table_info> table_infos,
|
||||
bool& needed) noexcept
|
||||
: offstrategy_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), "", "", parent_id)
|
||||
, _db(db)
|
||||
@@ -238,13 +238,13 @@ protected:
|
||||
class upgrade_sstables_compaction_task_impl : public sstables_compaction_task_impl {
|
||||
private:
|
||||
sharded<replica::database>& _db;
|
||||
std::vector<table_id> _table_infos;
|
||||
std::vector<table_info> _table_infos;
|
||||
bool _exclude_current_version;
|
||||
public:
|
||||
upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
sharded<replica::database>& db,
|
||||
std::vector<table_id> table_infos,
|
||||
std::vector<table_info> table_infos,
|
||||
bool exclude_current_version) noexcept
|
||||
: sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), module->new_sequence_number(), std::move(keyspace), "", "", tasks::task_id::create_null_id())
|
||||
, _db(db)
|
||||
@@ -258,14 +258,14 @@ protected:
|
||||
class shard_upgrade_sstables_compaction_task_impl : public sstables_compaction_task_impl {
|
||||
private:
|
||||
replica::database& _db;
|
||||
std::vector<table_id> _table_infos;
|
||||
std::vector<table_info> _table_infos;
|
||||
bool _exclude_current_version;
|
||||
public:
|
||||
shard_upgrade_sstables_compaction_task_impl(tasks::task_manager::module_ptr module,
|
||||
std::string keyspace,
|
||||
tasks::task_id parent_id,
|
||||
replica::database& db,
|
||||
std::vector<table_id> table_infos,
|
||||
std::vector<table_info> table_infos,
|
||||
bool exclude_current_version) noexcept
|
||||
: sstables_compaction_task_impl(module, tasks::task_id::create_random_id(), 0, std::move(keyspace), "", "", parent_id)
|
||||
, _db(db)
|
||||
|
||||
Reference in New Issue
Block a user