Merge 'tasks: add tablet resize virtual task' from Aleksandra Martyniuk

In this change, tablet_virtual_task starts supporting tablet
resize (i.e. split and merge).

Users can see running resize tasks - finished tasks are not
presented with the task manager API.

A new task state "suspended" is added. If a resize was revoked,
it will appear to users as suspended. We assume that the resize was revoked
when the tablet number didn't change.

Fixes: #21366.
Fixes: #21367.

No backport, new feature

Closes scylladb/scylladb#21891

* github.com:scylladb/scylladb:
  test: boost: check resize_task_info in tablet_test.cc
  test: add tests to check revoked resize virtual tasks
  test: add tests to check the list of resize virtual tasks
  test: add tests to check split and merge virtual tasks status
  test: test_tablet_tasks: generalize functions
  replica: service: add split virtual task's children
  replica: service: pass parent info down to storage_group::split
  tasks: children of virtual tasks aren't internal by default
  tasks: initialize shard in task_info ctor
  service: extend tablet_virtual_task::abort
  service: return status_helper struct from tablet_virtual_task::get_status_helper
  service: extend tablet_virtual_task::wait
  tasks: add suspended task state
  service: extend tablet_virtual_task::get_status
  service: extend tablet_virtual_task::contains
  service: extend tablet_virtual_task::get_stats
  service: add service::task_manager_module::get_nodes
  tasks: add task_manager::get_nodes
  tasks: drop noexcept from module::get_nodes
  replica: service: add resize_task_info static column to system.tablets
  locator: extend tablet_task_info to cover resize tasks
This commit is contained in:
Botond Dénes
2025-01-17 14:24:07 +02:00
23 changed files with 420 additions and 82 deletions

View File

@@ -284,7 +284,8 @@
"created",
"running",
"done",
"failed"
"failed",
"suspended"
],
"description":"The state of a task"
},
@@ -352,7 +353,8 @@
"created",
"running",
"done",
"failed"
"failed",
"suspended"
],
"description":"The state of the task"
},

View File

@@ -150,6 +150,7 @@ public:
gms::feature tablet_merge { *this, "TABLET_MERGE"sv };
gms::feature tablet_migration_virtual_task { *this, "TABLET_MIGRATION_VIRTUAL_TASK"sv };
gms::feature tablet_resize_virtual_task { *this, "TABLET_RESIZE_VIRTUAL_TASK"sv };
// A feature just for use in tests. It must not be advertised unless
// the "features_enable_test_feature" injection is enabled.

View File

@@ -425,6 +425,10 @@ void tablet_map::set_resize_decision(locator::resize_decision decision) {
_resize_decision = std::move(decision);
}
// Replaces the resize task info stored in this tablet map.
// The argument is taken by value and moved into the member.
void tablet_map::set_resize_task_info(tablet_task_info task_info) {
_resize_task_info = std::move(task_info);
}
void tablet_map::set_repair_scheduler_config(locator::repair_scheduler_config config) {
_repair_scheduler_config = std::move(config);
}
@@ -558,6 +562,8 @@ static const std::unordered_map<tablet_task_type, sstring> tablet_task_type_to_n
{locator::tablet_task_type::auto_repair, "auto_repair"},
{locator::tablet_task_type::migration, "migration"},
{locator::tablet_task_type::intranode_migration, "intranode_migration"},
{locator::tablet_task_type::split, "split"},
{locator::tablet_task_type::merge, "merge"},
};
static const std::unordered_map<sstring, tablet_task_type> tablet_task_type_from_name = std::invoke([] {
@@ -604,6 +610,10 @@ const locator::resize_decision& tablet_map::resize_decision() const {
return _resize_decision;
}
// Returns a reference to the resize task info stored in this tablet map.
// The reference points at the member itself, so it is only usable while the map is alive.
const tablet_task_info& tablet_map::resize_task_info() const {
return _resize_task_info;
}
const locator::repair_scheduler_config& tablet_map::repair_scheduler_config() const {
return _repair_scheduler_config;
}
@@ -1158,3 +1168,15 @@ locator::tablet_task_info locator::tablet_task_info::make_intranode_migration_re
auto tablet_task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
return locator::tablet_task_info{locator::tablet_task_type::intranode_migration, tablet_task_id, db_clock::now(), sched_nr, db_clock::time_point()};
}
// Creates the task info for a new tablet split request.
// The task gets a freshly generated time-based UUID as its id, the current
// db_clock time, a scheduling counter of zero and a default-constructed
// db_clock::time_point as the last argument.
locator::tablet_task_info locator::tablet_task_info::make_split_request() {
    const auto task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
    const auto now = db_clock::now();
    long initial_sched_nr = 0;
    return locator::tablet_task_info{
        locator::tablet_task_type::split,
        task_id,
        now,
        initial_sched_nr,
        db_clock::time_point()};
}
// Creates the task info for a new tablet merge request.
// The task gets a freshly generated time-based UUID as its id, the current
// db_clock time, a scheduling counter of zero and a default-constructed
// db_clock::time_point as the last argument.
locator::tablet_task_info locator::tablet_task_info::make_merge_request() {
    const auto task_id = locator::tablet_task_id(utils::UUID_gen::get_time_UUID());
    const auto now = db_clock::now();
    long initial_sched_nr = 0;
    return locator::tablet_task_info{
        locator::tablet_task_type::merge,
        task_id,
        now,
        initial_sched_nr,
        db_clock::time_point()};
}

View File

@@ -148,7 +148,9 @@ enum class tablet_task_type {
user_repair,
auto_repair,
migration,
intranode_migration
intranode_migration,
split,
merge
};
sstring tablet_task_type_to_string(tablet_task_type);
@@ -169,6 +171,8 @@ struct tablet_task_info {
static tablet_task_info make_auto_repair_request();
static tablet_task_info make_migration_request();
static tablet_task_info make_intranode_migration_request();
static tablet_task_info make_split_request();
static tablet_task_info make_merge_request();
};
/// Stores information about a single tablet.
@@ -398,6 +402,7 @@ private:
size_t _log2_tablets; // log_2(_tablets.size())
std::unordered_map<tablet_id, tablet_transition_info> _transitions;
resize_decision _resize_decision;
tablet_task_info _resize_task_info;
repair_scheduler_config _repair_scheduler_config;
/// Returns the largest token owned by tablet_id when the tablet_count is `1 << log2_tablets`.
@@ -520,11 +525,13 @@ public:
dht::token_range get_token_range_after_split(const token& t) const noexcept;
const locator::resize_decision& resize_decision() const;
const tablet_task_info& resize_task_info() const;
const locator::repair_scheduler_config& repair_scheduler_config() const;
public:
void set_tablet(tablet_id, tablet_info);
void set_tablet_transition_info(tablet_id, tablet_transition_info);
void set_resize_decision(locator::resize_decision);
void set_resize_task_info(tablet_task_info);
void set_repair_scheduler_config(locator::repair_scheduler_config config);
void clear_tablet_transition_info(tablet_id);
void clear_transitions();

View File

@@ -203,15 +203,8 @@ task_manager_module::task_manager_module(tasks::task_manager& tm, service::stora
, _ss(ss)
{}
std::set<gms::inet_address> task_manager_module::get_nodes() const noexcept {
return std::ranges::join_view(std::to_array({
std::views::all(_ss._topology_state_machine._topology.normal_nodes),
std::views::all(_ss._topology_state_machine._topology.transition_nodes)})
) | std::views::transform([&ss = _ss] (auto& node) {
return ss.host2ip(locator::host_id{node.first.uuid()});
}) | std::views::filter([&ss = _ss] (gms::inet_address ip) {
return ss._gossiper.is_alive(ip);
}) | std::ranges::to<std::set<gms::inet_address>>();
std::set<gms::inet_address> task_manager_module::get_nodes() const {
return get_task_manager().get_nodes(_ss);
}
}

View File

@@ -67,7 +67,7 @@ private:
public:
task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
virtual std::set<gms::inet_address> get_nodes() const noexcept override;
virtual std::set<gms::inet_address> get_nodes() const override;
};
}

View File

@@ -264,7 +264,7 @@ public:
// 1) Flushes all memtables which were created in non-split mode, and waits for that to complete.
// 2) Compacts all sstables which overlap with the split point
// Returns a future which resolves when this process is complete.
future<> split(sstables::compaction_type_options::split opt);
future<> split(sstables::compaction_type_options::split opt, tasks::task_info tablet_split_task_info);
// Make an sstable set spanning all sstables in the storage_group
lw_shared_ptr<const sstables::sstable_set> make_sstable_set() const;
@@ -368,7 +368,7 @@ public:
virtual locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept = 0;
virtual bool all_storage_groups_split() = 0;
virtual future<> split_all_storage_groups() = 0;
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;

View File

@@ -587,7 +587,7 @@ public:
// Precondition: table needs tablet splitting.
// Returns true if all storage of table is ready for splitting.
bool all_storage_groups_split();
future<> split_all_storage_groups();
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info);
// Splits compaction group of a single tablet, if and only if the underlying table has
// split request emitted by coordinator (found in tablet metadata).

View File

@@ -724,7 +724,7 @@ public:
};
}
bool all_storage_groups_split() override { return true; }
future<> split_all_storage_groups() override { return make_ready_future(); }
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
@@ -857,7 +857,7 @@ public:
locator::table_load_stats table_load_stats(std::function<bool(const locator::tablet_map&, locator::global_tablet_id)> tablet_filter) const noexcept override;
bool all_storage_groups_split() override;
future<> split_all_storage_groups() override;
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
future<> maybe_split_compaction_group_of(size_t idx) override;
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
@@ -978,7 +978,7 @@ future<> storage_group::remove_empty_merging_groups() {
std::erase_if(_merging_groups, std::mem_fn(&compaction_group::empty));
}
future<> storage_group::split(sstables::compaction_type_options::split opt) {
future<> storage_group::split(sstables::compaction_type_options::split opt, tasks::task_info tablet_split_task_info) {
if (set_split_mode()) {
co_return;
}
@@ -994,8 +994,8 @@ future<> storage_group::split(sstables::compaction_type_options::split opt) {
auto holder = cg->async_gate().hold();
co_await cg->flush();
// Waits on sstables produced by repair to be integrated into main set; off-strategy is usually a no-op with tablets.
co_await cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tasks::task_info{});
co_await cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tasks::task_info{});
co_await cg->get_compaction_manager().perform_offstrategy(_main_cg->as_table_state(), tablet_split_task_info);
co_await cg->get_compaction_manager().perform_split_compaction(_main_cg->as_table_state(), std::move(opt), tablet_split_task_info);
}
}
@@ -1055,23 +1055,24 @@ sstables::compaction_type_options::split tablet_storage_group_manager::split_com
}};
}
future<> tablet_storage_group_manager::split_all_storage_groups() {
future<> tablet_storage_group_manager::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
sstables::compaction_type_options::split opt = split_compaction_options();
co_await for_each_storage_group_gently([opt] (storage_group& storage_group) {
return storage_group.split(opt);
co_await for_each_storage_group_gently([opt, tablet_split_task_info] (storage_group& storage_group) {
return storage_group.split(opt, tablet_split_task_info);
});
}
future<> table::split_all_storage_groups() {
future<> table::split_all_storage_groups(tasks::task_info tablet_split_task_info) {
auto holder = async_gate().hold();
co_await _sg_manager->split_all_storage_groups();
co_await _sg_manager->split_all_storage_groups(tablet_split_task_info);
}
future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t idx) {
if (!tablet_map().needs_split()) {
return make_ready_future<>();
}
tasks::task_info tablet_split_task_info{tasks::task_id{tablet_map().resize_task_info().tablet_task_id.uuid()}, 0};
auto& sg = _storage_groups[idx];
if (!sg) {
@@ -1079,7 +1080,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
idx, schema()->ks_name(), schema()->cf_name()));
}
return sg->split(split_compaction_options());
return sg->split(split_compaction_options(), tablet_split_task_info);
}
future<std::vector<sstables::shared_sstable>>

View File

@@ -39,13 +39,15 @@ public:
tablet_mutation_builder& set_session(dht::token last_token, service::session_id);
tablet_mutation_builder& del_session(dht::token last_token);
tablet_mutation_builder& del_transition(dht::token last_token);
tablet_mutation_builder& set_resize_decision(locator::resize_decision);
tablet_mutation_builder& set_resize_decision(locator::resize_decision, const gms::feature_service&);
tablet_mutation_builder& set_repair_scheduler_config(locator::repair_scheduler_config);
tablet_mutation_builder& set_repair_time(dht::token last_token, db_clock::time_point repair_time);
tablet_mutation_builder& set_repair_task_info(dht::token last_token, locator::tablet_task_info info);
tablet_mutation_builder& del_repair_task_info(dht::token last_token);
tablet_mutation_builder& set_migration_task_info(dht::token last_token, locator::tablet_task_info info, const gms::feature_service& features);
tablet_mutation_builder& del_migration_task_info(dht::token last_token, const gms::feature_service& features);
tablet_mutation_builder& set_resize_task_info(locator::tablet_task_info info, const gms::feature_service& features);
tablet_mutation_builder& del_resize_task_info(const gms::feature_service& features);
mutation build() {
return std::move(_m);

View File

@@ -75,6 +75,7 @@ schema_ptr make_tablets_schema() {
.with_column("repair_task_info", tablet_task_info_type)
.with_column("repair_scheduler_config", repair_scheduler_config_type, column_kind::static_column)
.with_column("migration_task_info", tablet_task_info_type)
.with_column("resize_task_info", tablet_task_info_type, column_kind::static_column)
.with_hash_version()
.build();
}
@@ -128,6 +129,9 @@ tablet_map_to_mutation(const tablet_map& tablets, table_id id, const sstring& ke
m.set_static_cell("table_name", data_value(table_name), ts);
m.set_static_cell("resize_type", data_value(tablets.resize_decision().type_name()), ts);
m.set_static_cell("resize_seq_number", data_value(int64_t(tablets.resize_decision().sequence_number)), ts);
if (features.tablet_resize_virtual_task && tablets.resize_task_info().is_valid()) {
m.set_static_cell("resize_task_info", tablet_task_info_to_data_value(tablets.resize_task_info()), ts);
}
if (features.tablet_repair_scheduler) {
m.set_static_cell("repair_scheduler_config", repair_scheduler_config_to_data_value(tablets.repair_scheduler_config()), ts);
}
@@ -214,9 +218,19 @@ tablet_mutation_builder::del_transition(dht::token last_token) {
}
tablet_mutation_builder&
tablet_mutation_builder::set_resize_decision(locator::resize_decision resize_decision) {
tablet_mutation_builder::set_resize_decision(locator::resize_decision resize_decision, const gms::feature_service& features) {
_m.set_static_cell("resize_type", data_value(resize_decision.type_name()), _ts);
_m.set_static_cell("resize_seq_number", data_value(int64_t(resize_decision.sequence_number)), _ts);
if (resize_decision.split_or_merge()) {
auto resize_task_info = std::holds_alternative<resize_decision::split>(resize_decision.way)
? locator::tablet_task_info::make_split_request()
: locator::tablet_task_info::make_merge_request();
resize_task_info.sched_nr++;
resize_task_info.sched_time = db_clock::now();
return set_resize_task_info(std::move(resize_task_info), features);
} else {
return del_resize_task_info(features);
}
return *this;
}
@@ -262,6 +276,23 @@ tablet_mutation_builder::del_migration_task_info(dht::token last_token, const gm
return *this;
}
// Writes the given task info into the static "resize_task_info" cell of the
// mutation being built. Nothing is written when the
// tablet_resize_virtual_task feature is not enabled.
// Returns *this so calls can be chained.
tablet_mutation_builder&
tablet_mutation_builder::set_resize_task_info(locator::tablet_task_info resize_task_info, const gms::feature_service& features) {
    if (!features.tablet_resize_virtual_task) {
        return *this;
    }
    auto cell_value = tablet_task_info_to_data_value(resize_task_info);
    _m.set_static_cell("resize_task_info", std::move(cell_value), _ts);
    return *this;
}
// Deletes the static "resize_task_info" cell by writing a dead cell
// (tombstone) stamped with the builder's timestamp. Nothing is written when
// the tablet_resize_virtual_task feature is not enabled.
// Returns *this so calls can be chained.
tablet_mutation_builder&
tablet_mutation_builder::del_resize_task_info(const gms::feature_service& features) {
    if (!features.tablet_resize_virtual_task) {
        return *this;
    }
    auto resize_task_info_col = _s->get_column_definition("resize_task_info");
    auto tombstone = atomic_cell::make_dead(_ts, gc_clock::now());
    _m.set_static_cell(*resize_task_info_col, std::move(tombstone));
    return *this;
}
mutation make_drop_tablet_map_mutation(table_id id, api::timestamp_type ts) {
auto s = db::system_keyspace::tablets();
mutation m(s, partition_key::from_single_value(*s,
@@ -550,6 +581,9 @@ struct tablet_metadata_builder {
locator::resize_decision resize_decision(std::move(resize_type_name), resize_seq_number);
current->map.set_resize_decision(std::move(resize_decision));
}
if (row.has("resize_task_info")) {
current->map.set_resize_task_info(deserialize_tablet_task_info(row.get_view("resize_task_info")));
}
if (row.has("repair_scheduler_config")) {
auto config = deserialize_repair_scheduler_config(row.get_view("repair_scheduler_config"));

View File

@@ -202,7 +202,7 @@ storage_service::storage_service(abort_source& abort_source,
, _group0(nullptr)
, _node_ops_abort_thread(node_ops_abort_thread())
, _node_ops_module(make_shared<node_ops::task_manager_module>(tm, *this))
, _tablets_module(make_shared<service::task_manager_module>(tm))
, _tablets_module(make_shared<service::task_manager_module>(tm, *this))
, _address_map(address_map)
, _shared_token_metadata(stm)
, _erm_factory(erm_factory)
@@ -5402,6 +5402,8 @@ future<> storage_service::load_tablet_metadata(const locator::tablet_metadata_ch
}
future<> storage_service::process_tablet_split_candidate(table_id table) noexcept {
tasks::task_info tablet_split_task_info;
auto all_compaction_groups_split = [&] () mutable {
return _db.map_reduce0([table_ = table] (replica::database& db) {
auto all_split = db.find_column_family(table_).all_storage_groups_split();
@@ -5410,8 +5412,8 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep
};
auto split_all_compaction_groups = [&] () -> future<> {
return _db.invoke_on_all([table] (replica::database& db) -> future<> {
return db.find_column_family(table).split_all_storage_groups();
return _db.invoke_on_all([table, tablet_split_task_info] (replica::database& db) -> future<> {
return db.find_column_family(table).split_all_storage_groups(tablet_split_task_info);
});
};
@@ -5427,6 +5429,7 @@ future<> storage_service::process_tablet_split_candidate(table_id table) noexcep
release_guard(std::move(guard));
break;
}
tablet_split_task_info.id = tasks::task_id{tmap.resize_task_info().tablet_task_id.uuid()};
if (co_await all_compaction_groups_split()) {
slogger.debug("All compaction groups of table {} are split ready.", table);

View File

@@ -998,7 +998,7 @@ private:
friend class join_node_rpc_handshaker;
friend class node_ops::node_ops_virtual_task;
friend class node_ops::task_manager_module;
friend class tasks::task_manager;
friend class tablet_virtual_task;
};

View File

@@ -17,6 +17,12 @@
namespace service {
// Result bundle produced by tablet_virtual_task::get_status_helper.
struct status_helper {
// Status of the virtual task as reported to task manager callers.
tasks::task_status status;
// Tablets whose repair or migration task info carries the queried task id.
utils::chunked_vector<locator::tablet_id> tablets;
// For migration tasks: the pending replica read from the tablet's transition info.
std::optional<locator::tablet_replica> pending_replica;
};
tasks::task_manager::task_group tablet_virtual_task::get_group() const noexcept {
return tasks::task_manager::task_group::tablets_group;
}
@@ -25,6 +31,21 @@ static std::optional<locator::tablet_task_type> maybe_get_task_type(const locato
return task_info.is_valid() && task_info.tablet_task_id.uuid() == task_id.uuid() ? std::make_optional(task_info.request_type) : std::nullopt;
}
// Returns the scope string reported for a tablet task of the given type:
// "table" for user repair, split and merge; "tablet" for auto repair,
// migration and intranode migration.
// The `none` type has no scope; requesting it is reported as an internal error.
static sstring get_scope(locator::tablet_task_type task_type) {
    switch (task_type) {
    // Types reported with "tablet" scope.
    case locator::tablet_task_type::intranode_migration:
    case locator::tablet_task_type::migration:
    case locator::tablet_task_type::auto_repair:
        return "tablet";
    // Types reported with "table" scope.
    case locator::tablet_task_type::merge:
    case locator::tablet_task_type::split:
    case locator::tablet_task_type::user_repair:
        return "table";
    // Kept last, exactly as in the original, so nothing follows the error call.
    case locator::tablet_task_type::none:
        on_internal_error(tasks::tmlogger, "attempted to get the scope for none task type");
    }
}
static std::optional<tasks::task_stats> maybe_make_task_stats(const locator::tablet_task_info& task_info, schema_ptr schema) {
if (!task_info.is_valid()) {
return std::nullopt;
@@ -34,7 +55,7 @@ static std::optional<tasks::task_stats> maybe_make_task_stats(const locator::tab
.task_id = tasks::task_id{task_info.tablet_task_id.uuid()},
.type = locator::tablet_task_type_to_string(task_info.request_type),
.kind = tasks::task_kind::cluster,
.scope = task_info.is_user_repair_request() ? "table" : "tablet",
.scope = get_scope(task_info.request_type),
.state = tasks::task_manager::task_state::running,
.keyspace = schema->ks_name(),
.table = schema->cf_name()
@@ -45,14 +66,29 @@ static bool is_repair_task(const locator::tablet_task_type& task_type) {
return task_type == locator::tablet_task_type::user_repair || task_type == locator::tablet_task_type::auto_repair;
}
// True when the task type is one of the two migration kinds
// (migration or intranode_migration).
static bool is_migration_task(const locator::tablet_task_type& task_type) {
    const bool is_migration = task_type == locator::tablet_task_type::migration;
    const bool is_intranode_migration = task_type == locator::tablet_task_type::intranode_migration;
    return is_migration || is_intranode_migration;
}
// True when the task type is a tablet resize, i.e. split or merge.
static bool is_resize_task(const locator::tablet_task_type& task_type) {
    const bool is_split = task_type == locator::tablet_task_type::split;
    const bool is_merge = task_type == locator::tablet_task_type::merge;
    return is_split || is_merge;
}
static bool tablet_id_provided(const locator::tablet_task_type& task_type) {
return !is_repair_task(task_type);
return is_migration_task(task_type);
}
future<std::optional<tasks::virtual_task_hint>> tablet_virtual_task::contains(tasks::task_id task_id) const {
auto tables = get_table_ids();
for (auto table : tables) {
auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
if (auto task_type = maybe_get_task_type(tmap.resize_task_info(), task_id); task_type.has_value()) {
co_return tasks::virtual_task_hint{
.table_id = table,
.task_type = task_type.value(),
.tablet_id = std::nullopt,
};
}
std::optional<locator::tablet_id> tid = tmap.first_tablet();
for (const locator::tablet_info& info : tmap.tablets()) {
auto task_type = maybe_get_task_type(info.repair_task_info, task_id).or_else([&] () {
@@ -78,9 +114,10 @@ future<tasks::is_abortable> tablet_virtual_task::is_abortable(tasks::virtual_tas
}
future<std::optional<tasks::task_status>> tablet_virtual_task::get_status(tasks::task_id id, tasks::virtual_task_hint hint) {
utils::chunked_vector<locator::tablet_id> tablets;
std::optional<locator::tablet_replica> pending_replica;
co_return co_await get_status_helper(id, tablets, std::move(hint), pending_replica);
auto res = co_await get_status_helper(id, std::move(hint));
co_return res.transform([] (status_helper s) {
return s.status;
});
}
future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_id id, tasks::virtual_task_hint hint) {
@@ -88,38 +125,43 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
auto task_type = hint.get_task_type();
auto tablet_id_opt = tablet_id_provided(task_type) ? std::make_optional(hint.get_tablet_id()) : std::nullopt;
utils::chunked_vector<locator::tablet_id> tablets;
std::optional<locator::tablet_replica> pending_replica;
auto status = co_await get_status_helper(id, tablets, std::move(hint), pending_replica);
if (!status) {
size_t tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
auto res = co_await get_status_helper(id, std::move(hint));
if (!res) {
co_return std::nullopt;
}
tasks::tmlogger.info("tablet_virtual_task: wait until tablet operation is finished");
co_await _ss._topology_state_machine.event.wait([&] {
auto& tmap = _ss.get_token_metadata().tablets().get_tablet_map(table);
if (tablet_id_opt.has_value()) {
if (is_resize_task(task_type)) { // Resize task.
return tmap.resize_task_info().tablet_task_id.uuid() != id.uuid();
} else if (tablet_id_opt.has_value()) { // Migration task.
return tmap.get_tablet_info(tablet_id_opt.value()).migration_task_info.tablet_task_id.uuid() != id.uuid();
} else { // Repair task.
return std::all_of(res->tablets.begin(), res->tablets.end(), [&] (const locator::tablet_id& tablet) {
return tmap.get_tablet_info(tablet).repair_task_info.tablet_task_id.uuid() != id.uuid();
});
}
return std::all_of(tablets.begin(), tablets.end(), [&] (const locator::tablet_id& tablet) {
return tmap.get_tablet_info(tablet).repair_task_info.tablet_task_id.uuid() != id.uuid();
});
});
status->state = tasks::task_manager::task_state::done; // Failed repair task is retried.
if (!is_repair_task(task_type)) {
res->status.state = tasks::task_manager::task_state::done; // Failed repair task is retried.
if (is_migration_task(task_type)) {
auto& replicas = _ss.get_token_metadata().tablets().get_tablet_map(table).get_tablet_info(tablet_id_opt.value()).replicas;
auto migration_failed = std::all_of(replicas.begin(), replicas.end(), [&] (const auto& replica) { return pending_replica.has_value() && replica != pending_replica.value(); });
status->state = migration_failed ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::done;
auto migration_failed = std::all_of(replicas.begin(), replicas.end(), [&] (const auto& replica) { return res->pending_replica.has_value() && replica != res->pending_replica.value(); });
res->status.state = migration_failed ? tasks::task_manager::task_state::failed : tasks::task_manager::task_state::done;
} else if (is_resize_task(task_type)) {
auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done;
res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id) : std::vector<tasks::task_identity>{};
}
status->end_time = db_clock::now(); // FIXME: Get precise end time.
co_return status;
res->status.end_time = db_clock::now(); // FIXME: Get precise end time.
co_return res->status;
}
future<> tablet_virtual_task::abort(tasks::task_id id, tasks::virtual_task_hint hint) noexcept {
auto table = hint.get_table_id();
auto task_type = hint.get_task_type();
if (!is_repair_task(task_type)) {
on_internal_error(tasks::tmlogger, format("non-abortable task {} of type {} cannot be aborted", id, task_type));
}
@@ -134,6 +176,10 @@ future<std::vector<tasks::task_stats>> tablet_virtual_task::get_stats() {
auto schema = _ss._db.local().get_tables_metadata().get_table(table).schema();
std::unordered_map<tasks::task_id, tasks::task_stats> user_requests;
std::unordered_map<tasks::task_id, size_t> sched_num_sum;
auto resize_stats = maybe_make_task_stats(tmap.resize_task_info(), schema);
if (resize_stats) {
res.push_back(std::move(resize_stats.value()));
}
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto repair_stats = maybe_make_task_stats(info.repair_task_info, schema);
if (repair_stats) {
@@ -171,15 +217,16 @@ std::vector<table_id> tablet_virtual_task::get_table_ids() const {
static void update_status(const locator::tablet_task_info& task_info, tasks::task_status& status, size_t& sched_nr) {
sched_nr += task_info.sched_nr;
status.type = locator::tablet_task_type_to_string(task_info.request_type);
status.scope = task_info.is_user_repair_request() ? "table" : "tablet";
status.scope = get_scope(task_info.request_type);
status.start_time = task_info.request_time;
}
future<std::optional<tasks::task_status>> tablet_virtual_task::get_status_helper(tasks::task_id id, utils::chunked_vector<locator::tablet_id>& tablets, tasks::virtual_task_hint hint, std::optional<locator::tablet_replica>& pending_replica) {
future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(tasks::task_id id, tasks::virtual_task_hint hint) {
status_helper res;
auto table = hint.get_table_id();
auto task_type = hint.get_task_type();
auto schema = _ss._db.local().get_tables_metadata().get_table(table).schema();
tasks::task_status res{
res.status = {
.task_id = id,
.kind = tasks::task_kind::cluster,
.is_abortable = co_await is_abortable(std::move(hint)),
@@ -192,31 +239,44 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::get_status_helper
co_await tmap.for_each_tablet([&] (locator::tablet_id tid, const locator::tablet_info& info) {
auto& task_info = info.repair_task_info;
if (task_info.tablet_task_id.uuid() == id.uuid()) {
update_status(task_info, res, sched_nr);
tablets.push_back(tid);
update_status(task_info, res.status, sched_nr);
res.tablets.push_back(tid);
}
return make_ready_future();
});
} else { // Migration task.
} else if (is_migration_task(task_type)) { // Migration task.
auto tablet_id = hint.get_tablet_id();
pending_replica = tmap.get_tablet_transition_info(tablet_id)->pending_replica;
res.pending_replica = tmap.get_tablet_transition_info(tablet_id)->pending_replica;
auto& task_info = tmap.get_tablet_info(tablet_id).migration_task_info;
if (task_info.tablet_task_id.uuid() == id.uuid()) {
update_status(task_info, res, sched_nr);
tablets.push_back(tablet_id);
update_status(task_info, res.status, sched_nr);
res.tablets.push_back(tablet_id);
}
} else { // Resize task.
auto& task_info = tmap.resize_task_info();
if (task_info.tablet_task_id.uuid() == id.uuid()) {
update_status(task_info, res.status, sched_nr);
res.status.state = tasks::task_manager::task_state::running;
res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id) : std::vector<tasks::task_identity>{};
co_return res;
}
}
if (!tablets.empty()) {
res.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
if (!res.tablets.empty()) {
res.status.state = sched_nr == 0 ? tasks::task_manager::task_state::created : tasks::task_manager::task_state::running;
co_return res;
}
// FIXME: Show finished tasks.
co_return std::nullopt;
}
task_manager_module::task_manager_module(tasks::task_manager& tm) noexcept
task_manager_module::task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept
: tasks::task_manager::module(tm, "tablets")
, _ss(ss)
{}
std::set<gms::inet_address> task_manager_module::get_nodes() const {
return get_task_manager().get_nodes(_ss);
}
}

View File

@@ -19,6 +19,7 @@ class tablet_replica;
namespace service {
// Forward declaration of the result type of tablet_virtual_task::get_status_helper.
// Declared with `struct` to match the class-key of its definition
// (`struct status_helper { ... }`), avoiding mismatched-tags diagnostics.
struct status_helper;
class storage_service;
class tablet_virtual_task : public tasks::task_manager::virtual_task::impl {
@@ -42,11 +43,15 @@ public:
virtual future<std::vector<tasks::task_stats>> get_stats() override;
private:
std::vector<table_id> get_table_ids() const;
future<std::optional<tasks::task_status>> get_status_helper(tasks::task_id id, utils::chunked_vector<locator::tablet_id>& tablets, tasks::virtual_task_hint hint, std::optional<locator::tablet_replica>& pending_replica);
future<std::optional<status_helper>> get_status_helper(tasks::task_id id, tasks::virtual_task_hint hint);
};
class task_manager_module : public tasks::task_manager::module {
private:
service::storage_service& _ss;
public:
task_manager_module(tasks::task_manager& tm) noexcept;
task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
std::set<gms::inet_address> get_nodes() const override;
};
}

View File

@@ -1283,7 +1283,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
table_id, resize_decision.type_name(), resize_decision.sequence_number);
out.emplace_back(
replica::tablet_mutation_builder(guard.write_timestamp(), table_id)
.set_resize_decision(std::move(resize_decision))
.set_resize_decision(std::move(resize_decision), _db.features())
.build());
}

View File

@@ -20,6 +20,7 @@
#include "message/messaging_service.hh"
#include "utils/assert.hh"
#include "utils/overloaded_functor.hh"
#include "service/storage_service.hh"
#include "tasks/task_handler.hh"
#include "task_manager.hh"
#include "tasks/virtual_task_hint.hh"
@@ -155,7 +156,7 @@ is_abortable task_manager::task::impl::is_abortable() const noexcept {
}
is_internal task_manager::task::impl::is_internal() const noexcept {
return tasks::is_internal(bool(_parent_id));
return tasks::is_internal(_parent_id && _parent_kind != task_kind::cluster);
}
tasks::is_user_task task_manager::task::impl::is_user_task() const noexcept {
@@ -491,6 +492,10 @@ task_manager& task_manager::module::get_task_manager() noexcept {
return _tm;
}
// Const overload: returns a read-only reference to the task manager held in _tm.
const task_manager& task_manager::module::get_task_manager() const noexcept {
return _tm;
}
// Returns a reference to this module's abort source (_as).
abort_source& task_manager::module::abort_source() noexcept {
return _as;
}
@@ -527,7 +532,7 @@ const task_manager::tasks_collection& task_manager::module::get_tasks_collection
return _tasks;
}
std::set<gms::inet_address> task_manager::module::get_nodes() const noexcept {
std::set<gms::inet_address> task_manager::module::get_nodes() const {
return {_tm.get_broadcast_address()};
}
@@ -681,6 +686,17 @@ const task_manager::tasks_collection& task_manager::get_tasks_collection() const
return _tasks;
}
// Collects the addresses of all nodes listed in the topology as normal or
// transitioning that the gossiper currently reports as alive.
//
// Each topology entry is keyed by a node id; the id is converted to a host_id,
// resolved to an IP through storage_service::host2ip(), and kept only if
// gossiper::is_alive() returns true for that IP.
std::set<gms::inet_address> task_manager::get_nodes(service::storage_service& ss) const {
    std::set<gms::inet_address> alive_nodes;
    auto& topology = ss._topology_state_machine._topology;

    auto collect_alive = [&ss, &alive_nodes] (auto& nodes) {
        for (auto& entry : nodes) {
            gms::inet_address ip = ss.host2ip(locator::host_id{entry.first.uuid()});
            if (ss._gossiper.is_alive(ip)) {
                alive_nodes.insert(ip);
            }
        }
    };

    collect_alive(topology.normal_nodes);
    collect_alive(topology.transition_nodes);
    return alive_nodes;
}
future<std::vector<task_id>> task_manager::get_virtual_task_children(task_id parent_id) {
return container().map_reduce0([parent_id] (task_manager& tm) {
return tm.get_local_tasks() |

View File

@@ -29,6 +29,10 @@ namespace repair {
class task_manager_module;
}
namespace service {
class storage_service;
}
namespace netw {
class messaging_service;
}
@@ -100,7 +104,8 @@ public:
created,
running,
done,
failed
failed,
suspended
};
enum class task_group {
@@ -322,6 +327,7 @@ public:
uint64_t new_sequence_number() noexcept;
task_manager& get_task_manager() noexcept;
const task_manager& get_task_manager() const noexcept;
seastar::abort_source& abort_source() noexcept;
gate& async_gate() noexcept;
const std::string& get_name() const noexcept;
@@ -332,7 +338,7 @@ public:
tasks_collection& get_tasks_collection() noexcept;
const tasks_collection& get_tasks_collection() const noexcept;
// Returns the set of nodes on which some of this module's virtual tasks can have their children.
virtual std::set<gms::inet_address> get_nodes() const noexcept;
virtual std::set<gms::inet_address> get_nodes() const;
future<utils::chunked_vector<task_stats>> get_stats(is_internal internal, std::function<bool(std::string&, std::string&)> filter) const;
void register_task(task_ptr task);
@@ -385,6 +391,8 @@ public:
const tasks_collection& get_tasks_collection() const noexcept;
future<std::vector<task_id>> get_virtual_task_children(task_id parent_id);
std::set<gms::inet_address> get_nodes(service::storage_service& ss) const;
module_ptr make_module(std::string name);
void register_module(std::string name, module_ptr module);
module_ptr find_module(std::string module_name);

View File

@@ -18,7 +18,7 @@ struct task_info {
task_id id;
unsigned shard;
task_info() noexcept : id(task_id::create_null_id()) {}
task_info() noexcept : id(task_id::create_null_id()), shard(0) {}
task_info(task_id id, unsigned parent_shard) noexcept : id(id), shard(parent_shard) {}
operator bool() const noexcept {

View File

@@ -190,7 +190,7 @@ SEASTAR_TEST_CASE(test_tablet_sstable_set_copy_ctor) {
}
auto& cf = env.local_db().find_column_family("test_tablet_sstable_set_copy_ctor", "test");
auto& sgm = column_family_test::get_storage_group_manager(cf);
sgm->split_all_storage_groups().get();
sgm->split_all_storage_groups(tasks::task_info{}).get();
auto tablet_sstable_set = replica::make_tablet_sstable_set(cf.schema(), *sgm.get(), locator::tablet_map(8));
auto tablet_sstable_set_copy = *tablet_sstable_set.get();

View File

@@ -283,6 +283,7 @@ SEASTAR_TEST_CASE(test_tablet_metadata_persistence) {
decision.way = locator::resize_decision::split{},
decision.sequence_number = 1;
tmap.set_resize_decision(decision);
tmap.set_resize_task_info(locator::tablet_task_info::make_split_request());
tm.set_tablet_map(table1, std::move(tmap));
}
@@ -980,7 +981,7 @@ SEASTAR_TEST_CASE(test_mutation_builder) {
tablet_replica {h2, 3},
});
b.del_transition(last_token);
b.set_resize_decision(resize_decision);
b.set_resize_decision(resize_decision, e.local_db().features());
e.local_db().apply({freeze(b.build())}, db::no_timeout).get();
}
@@ -1003,6 +1004,7 @@ SEASTAR_TEST_CASE(test_mutation_builder) {
expected_tmap.set_resize_decision(resize_decision);
auto tm_from_disk = read_tablet_metadata(e.local_qp()).get();
expected_tmap.set_resize_task_info(tm_from_disk.get_tablet_map(table1).resize_task_info());
BOOST_REQUIRE_EQUAL(expected_tmap, tm_from_disk.get_tablet_map(table1));
}
}, tablet_cql_test_config());
@@ -2774,7 +2776,7 @@ SEASTAR_THREAD_TEST_CASE(basic_tablet_storage_splitting_test) {
e.db().invoke_on_all([] (replica::database& db) {
auto& table = db.find_column_family("ks", "cf");
testlog.info("sstable count: {}", table.sstables_count());
return table.split_all_storage_groups();
return table.split_all_storage_groups(tasks::task_info{});
}).get();
testlog.info("Verifying sstables are split...");

View File

@@ -23,6 +23,7 @@ class State(StrEnum):
running = "running"
done = "done"
failed = "failed"
suspended = "suspended"
class TaskStats(NamedTuple):

View File

@@ -5,6 +5,8 @@
#
import asyncio
import random
from typing import Optional
import pytest
from test.pylib.internal_types import ServerInfo
@@ -24,23 +26,24 @@ async def disable_injection(manager: ManagerClient, servers: list[ServerInfo], i
for server in servers:
await manager.api.disable_injection(server.ip_addr, injection)
async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str):
async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str, table: Optional[str] = None):
async def get_tasks():
return [task for task in await tm.list_tasks(server.ip_addr, module_name) if task.kind == "cluster" and task.type == type and task.keyspace == "test"]
tasks = [task for task in await tm.list_tasks(server.ip_addr, module_name) if task.kind == "cluster" and task.type == type and task.keyspace == "test"]
return [task for task in tasks if not table or table == task.table]
tasks = await get_tasks()
while len(tasks) != expected_number:
tasks = await get_tasks()
return tasks
def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool):
def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool, keyspace: str = "test", table: str = "test", possible_child_num: list[int] = [0]):
assert status.scope == scope
assert status.kind == "cluster"
assert status.type == type
assert status.keyspace == "test"
assert status.table == "test"
assert status.keyspace == keyspace
assert status.table == table
assert status.is_abortable == abortable
assert not status.children_ids
assert len(status.children_ids) in possible_child_num
assert status.state in states
async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerClient, servers: list[ServerInfo], module_name: str):
@@ -312,3 +315,181 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC
await check_none()
await asyncio.gather(repair_task(), wait_and_check_none())
async def prepare_split(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
    """Fill `keyspace.table` with one 1000-byte random blob per key and flush the keyspace.

    Tablet balancing is disabled on `server` before any data is written; the
    caller decides when to enable it again.

    :param manager: cluster manager used for the REST API and the CQL session.
    :param server: node on which balancing is disabled and the flush is requested.
    :param keyspace: keyspace containing `table`.
    :param table: table with columns `pk` and `c` that receives the rows.
    :param keys: primary-key values to insert.
    """
    await manager.api.disable_tablet_balancing(server.ip_addr)

    cql = manager.get_cql()
    insert = cql.prepare(f"INSERT INTO {keyspace}.{table}(pk, c) VALUES(?, ?)")
    for pk in keys:
        value = random.randbytes(1000)
        # Await the session's async path (as prepare_merge does) instead of the
        # blocking execute(), so the event loop is not stalled during each insert.
        # Inserts stay sequential, as before.
        await cql.run_async(insert, [pk, value])

    await manager.api.flush_keyspace(server.ip_addr, keyspace)
async def prepare_merge(manager: ManagerClient, server: ServerInfo, keyspace: str, table: str, keys: list[int]):
    """Delete the given keys from `keyspace.table` concurrently and flush the keyspace.

    Tablet balancing is disabled on `server` before the deletions are issued.
    """
    await manager.api.disable_tablet_balancing(server.ip_addr)

    session = manager.get_cql()
    # One DELETE per key, all awaited together.
    deletions = [session.run_async(f"DELETE FROM {keyspace}.{table} WHERE pk={k};") for k in keys]
    await asyncio.gather(*deletions)

    await manager.api.flush_keyspace(server.ip_addr, keyspace)
async def enable_tablet_balancing_and_wait(manager: ManagerClient, server: ServerInfo, message: str):
    """Enable tablet balancing on `server` and wait until `message` appears in its log."""
    server_log = await manager.server_open_log(server.server_id)
    # The mark is taken before balancing is enabled and passed as from_mark below.
    start_mark = await server_log.mark()

    await manager.api.enable_tablet_balancing(server.ip_addr)
    await server_log.wait_for(message, from_mark=start_mark)
# Checks that a running split task (table2) and a running merge task (table1)
# are listed by the "tablets" module and report the expected status fields.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_resize_task(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
# Single node started with a 30000-byte target tablet size and the
# short_tablet_stats_refresh_interval injection enabled at startup.
cmdline = [
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
keyspace = "test"
table1 = "test1"
table2 = "test2"
# Both tables start with a single tablet (tablets = {'initial': 1}).
await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};")
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
await cql.run_async(f"CREATE TABLE {keyspace}.{table2} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
total_keys = 60
keys = range(total_keys)
# Fill table1, enable balancing, and wait until the log reports a detected split.
await prepare_split(manager, servers[0], keyspace, table1, keys)
await enable_tablet_balancing_and_wait(manager, servers[0], "Detected tablet split for table")
# Wait until no split task is listed for table1.
await wait_tasks_created(tm, servers[0], module_name, 0, "split", table1)
# Fill table2, delete all but the last key of table1, and compact the keyspace.
# The checks below expect a split on table2 and a merge on table1.
await prepare_split(manager, servers[0], keyspace, table2, keys)
await prepare_merge(manager, servers[0], keyspace, table1, keys[:-1])
await manager.api.keyspace_compaction(servers[0].ip_addr, "test")
# NOTE(review): presumably this injection keeps the resize from finalizing so
# the tasks stay observable as running - confirm against the injection's definition.
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# Waits for exactly one task of the given type on the table, then checks its
# status: running, "table" scope, not abortable, and 0, 1 or 2 children.
async def wait_and_check_status(server, type, keyspace, table):
task = (await wait_tasks_created(tm, server, module_name, 1, type, table))[0]
status = await tm.get_task_status(server.ip_addr, task.task_id)
check_task_status(status, ["running"], type, "table", False, keyspace, table, [0, 1, 2])
await wait_and_check_status(servers[0], "split", keyspace, table2)
await wait_and_check_status(servers[0], "merge", keyspace, table1)
# Checks that a running split task is listed with the same task_id on both
# nodes and that its status, fetched from either node, shows the same two children.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_resize_list(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
# Nodes are started with a 30000-byte target tablet size and the
# short_tablet_stats_refresh_interval injection enabled at startup.
cmdline = [
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
keyspace = "test"
table1 = "test1"
await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};")
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
# Add a second node with the same configuration after the data is written.
servers.append(await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}))
# Mark the first node's log; the mark is used below when waiting for the
# split_sstable_rewrite message.
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
# Enable tablet_split_finalization_postpone on all servers and a one-shot
# split_sstable_rewrite on the first server, then enable balancing.
injection = "tablet_split_finalization_postpone"
compaction_injection = "split_sstable_rewrite"
await enable_injection(manager, servers, injection)
await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
# Exactly one split task for table1 must be listed on each node, with the same id.
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", table1))[0]
task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", table1))[0]
assert task0.task_id == task1.task_id
for task in [task0, task1]:
assert task.state == "running"
assert task.type == "split"
assert task.kind == "cluster"
assert task.scope == "table"
assert task.table == table1
assert task.keyspace == keyspace
# Wait until the first node logs that split_sstable_rewrite is waiting, then
# send a message to that injection (presumably releasing it - confirm).
await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
# The status must list two children and be identical when read from either node.
status1 = await tm.get_task_status(servers[1].ip_addr, task0.task_id)
status0 = await tm.get_task_status(servers[0].ip_addr, task0.task_id)
assert len(status0.children_ids) == 2
assert status0.children_ids == status1.children_ids
await disable_injection(manager, servers, injection)
# Checks that waiting on a split task finishes with the "suspended" state when
# the table's data is deleted while the wait is in progress.
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
@skip_mode('debug', 'debug mode is too time-sensitive')
async def test_tablet_resize_revoked(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
# Single node started with a 30000-byte target tablet size and the
# short_tablet_stats_refresh_interval injection enabled at startup.
cmdline = [
'--target-tablet-size-in-bytes', '30000',
]
servers = [await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
})]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
keyspace = "test"
table1 = "test1"
await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};")
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
# Enable tablet_split_finalization_postpone, then balancing, and wait for
# exactly one split task on table1.
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", table1))[0]
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
# Once the log shows that the virtual task is being waited on, delete every
# key and flush.
# NOTE(review): presumably this shrinks the table so the split decision is
# revoked - confirm against the load balancer's behaviour.
async def revoke_resize(log, mark):
await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table1} WHERE pk={k};") for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, keyspace)
# Waits for the task to finish and checks the returned status: suspended,
# split, "table" scope, not abortable, and 0, 1 or 2 children.
async def wait_for_task(task_id):
status = await tm.wait_for_task(servers[0].ip_addr, task_id)
check_task_status(status, ["suspended"], "split", "table", False, keyspace, table1, [0, 1, 2])
# Run the waiter and the revocation concurrently.
await asyncio.gather(revoke_resize(log, mark), wait_for_task(task0.task_id))