tasks: repair: api: remove type attribute from task_manager::task::status

This commit is contained in:
Aleksandra Martyniuk
2022-12-02 11:54:39 +01:00
parent 8d5377932d
commit 5bc09daa7a
7 changed files with 23 additions and 30 deletions

View File

@@ -30,6 +30,7 @@ inline bool filter_tasks(tasks::task_manager::task_ptr task, std::unordered_map<
struct full_task_status {
tasks::task_manager::task::status task_status;
std::string type;
tasks::task_manager::task::progress progress;
std::string module;
tasks::task_id parent_id;
@@ -52,7 +53,7 @@ tm::task_status make_status(full_task_status status) {
tm::task_status res{};
res.id = status.task_status.id.to_sstring();
res.type = status.task_status.type;
res.type = status.type;
res.state = status.task_status.state;
res.is_abortable = bool(status.abortable);
res.start_time = st;
@@ -77,6 +78,7 @@ future<json::json_return_type> retrieve_status(tasks::task_manager::foreign_task
auto progress = co_await task->get_progress();
full_task_status s;
s.task_status = task->get_status();
s.type = task->type();
s.parent_id = task->get_parent_id();
s.abortable = task->is_abortable();
s.module = task->get_module_name();

View File

@@ -60,7 +60,7 @@ void set_task_manager_test(http_context& ctx, routes& r, db::config& cfg) {
}
auto module = tms.local().find_module("test");
id = co_await module->make_task<tasks::test_task_impl>(shard, id, keyspace, table, type, entity, data);
id = co_await module->make_task<tasks::test_task_impl>(shard, id, keyspace, table, entity, data);
co_await tms.invoke_on(shard, [id] (tasks::task_manager& tm) {
auto it = tm.get_all_tasks().find(id);
if (it != tm.get_all_tasks().end()) {

View File

@@ -553,7 +553,6 @@ get_sharder_for_tables(seastar::sharded<replica::database>& db, const sstring& k
shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
const sstring& keyspace,
std::string type,
std::exception_ptr ex,
repair_service& repair,
locator::effective_replication_map_ptr erm_,
@@ -566,7 +565,7 @@ shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::module_ptr m
streaming::stream_reason reason_,
abort_source* as,
bool hints_batchlog_flushed)
: repair_task_impl(module, id, 0, keyspace, "", std::move(type), "", parent_id_.uuid(), reason_)
: repair_task_impl(module, id, 0, keyspace, "", "", parent_id_.uuid(), reason_)
, _ex(std::move(ex))
, rs(repair)
, db(repair.get_db())
@@ -1121,7 +1120,7 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
co_return id.id;
}
auto task_impl_ptr = std::make_unique<user_requested_repair_task_impl>(_repair_module, id, std::move(keyspace), format("{}", streaming::stream_reason::repair), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
auto task_impl_ptr = std::make_unique<user_requested_repair_task_impl>(_repair_module, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
auto task = co_await start_repair_task(std::move(task_impl_ptr), _repair_module);
co_return id.id;
}
@@ -1232,8 +1231,8 @@ future<> user_requested_repair_task_impl::run() {
data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
std::exception_ptr ex;
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
auto task_impl_ptr = std::make_unique<shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace, format("{}",
streaming::stream_reason::repair), ex, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
auto task_impl_ptr = std::make_unique<shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
ex, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, nullptr, hints_batchlog_flushed);
auto task = co_await start_repair_task(std::move(task_impl_ptr), local_repair._repair_module, parent_data);
co_await task->done();
@@ -1307,7 +1306,7 @@ future<> repair_service::sync_data_using_repair(
}
assert(this_shard_id() == 0);
auto task_impl_ptr = std::make_unique<data_sync_repair_task_impl>(_repair_module, _repair_module->new_repair_uniq_id(), std::move(keyspace), format("{}", reason), "", std::move(ranges), std::move(neighbors), reason, ops_info);
auto task_impl_ptr = std::make_unique<data_sync_repair_task_impl>(_repair_module, _repair_module->new_repair_uniq_id(), std::move(keyspace), "", std::move(ranges), std::move(neighbors), reason, ops_info);
auto task = co_await start_repair_task(std::move(task_impl_ptr), _repair_module);
co_await task->done();
}
@@ -1348,7 +1347,7 @@ future<> data_sync_repair_task_impl::run() {
ex = std::current_exception();
}
auto task_impl_ptr = std::make_unique<shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
format("{}", streaming::stream_reason::repair), ex, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
ex, local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, asp, hints_batchlog_flushed);
task_impl_ptr->neighbors = std::move(neighbors);
auto task = co_await start_repair_task(std::move(task_impl_ptr), local_repair._repair_module, parent_data);

View File

@@ -15,8 +15,8 @@ class repair_task_impl : public tasks::task_manager::task::impl {
protected:
streaming::stream_reason _reason;
public:
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, tasks::task_id parent_id, streaming::stream_reason reason) noexcept
: tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id)
repair_task_impl(tasks::task_manager::module_ptr module, tasks::task_id id, unsigned sequence_number, std::string keyspace, std::string table, std::string entity, tasks::task_id parent_id, streaming::stream_reason reason) noexcept
: tasks::task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(entity), parent_id)
, _reason(reason) {
_status.progress_units = "ranges";
}
@@ -44,8 +44,8 @@ private:
std::vector<sstring> _data_centers;
std::unordered_set<gms::inet_address> _ignore_nodes;
public:
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string type, std::string entity, lw_shared_ptr<locator::global_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(type), std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
, _germs(germs)
, _cfs(std::move(cfs))
, _ranges(std::move(ranges))
@@ -65,8 +65,8 @@ private:
std::unordered_map<dht::token_range, repair_neighbors> _neighbors;
shared_ptr<node_ops_info> _ops_info;
public:
data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string type, std::string entity, dht::token_range_vector ranges, std::unordered_map<dht::token_range, repair_neighbors> neighbors, streaming::stream_reason reason, shared_ptr<node_ops_info> ops_info)
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(type), std::move(entity), tasks::task_id::create_null_id(), reason)
data_sync_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, dht::token_range_vector ranges, std::unordered_map<dht::token_range, repair_neighbors> neighbors, streaming::stream_reason reason, shared_ptr<node_ops_info> ops_info)
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), reason)
, _ranges(std::move(ranges))
, _neighbors(std::move(neighbors))
, _ops_info(ops_info)
@@ -112,7 +112,6 @@ public:
shard_repair_task_impl(tasks::task_manager::module_ptr module,
tasks::task_id id,
const sstring& keyspace,
std::string type,
std::exception_ptr ex,
repair_service& repair,
locator::effective_replication_map_ptr erm_,

View File

@@ -15,10 +15,9 @@ namespace tasks {
logging::logger tmlogger("task_manager");
task_manager::task::impl::impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id) noexcept
task_manager::task::impl::impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string entity, task_id parent_id) noexcept
: _status({
.id = id,
.type = std::move(type),
.state = task_state::created,
.sequence_number = sequence_number,
.shard = this_shard_id(),
@@ -121,10 +120,6 @@ task_id task_manager::task::get_parent_id() const noexcept {
return _impl->_parent_id;
}
void task_manager::task::set_type(std::string type) noexcept {
_impl->_status.type = std::move(type);
}
void task_manager::task::change_state(task_state state) noexcept {
_impl->_status.state = state;
}

View File

@@ -77,7 +77,6 @@ public:
struct status {
task_id id;
std::string type;
task_state state = task_state::created;
db_clock::time_point start_time;
db_clock::time_point end_time;
@@ -100,7 +99,7 @@ public:
module_ptr _module;
abort_source _as;
public:
impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id) noexcept;
impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string entity, task_id parent_id) noexcept;
virtual ~impl() = default;
virtual std::string type() const = 0;
@@ -128,7 +127,6 @@ public:
status& get_status() noexcept;
uint64_t get_sequence_number() const noexcept;
task_id get_parent_id() const noexcept;
void set_type(std::string type) noexcept;
void change_state(task_state state) noexcept;
void add_child(foreign_task_ptr&& child);
void start();
@@ -171,10 +169,10 @@ public:
public:
template<typename T>
requires std::is_base_of_v<task_manager::task::impl, T>
future<task_id> make_task(unsigned shard, task_id id, std::string keyspace, std::string table, std::string type, std::string entity, task_info parent_d) {
return _tm.container().invoke_on(shard, [id, module = _name, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) {
future<task_id> make_task(unsigned shard, task_id id, std::string keyspace, std::string table, std::string entity, task_info parent_d) {
return _tm.container().invoke_on(shard, [id, module = _name, keyspace = std::move(keyspace), table = std::move(table), entity = std::move(entity), parent_d] (task_manager& tm) {
auto module_ptr = tm.find_module(module);
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id);
auto task_impl_ptr = std::make_unique<T>(module_ptr, id ? id : task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(entity), parent_d.id);
return module_ptr->make_task(std::move(task_impl_ptr), parent_d).then([] (auto task) {
return task->id();
});

View File

@@ -35,8 +35,8 @@ private:
promise<> _finish_run;
bool _finished = false;
public:
test_task_impl(task_manager::module_ptr module, task_id id, uint64_t sequence_number = 0, std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_id parent_id = task_id::create_null_id()) noexcept
: task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_id)
test_task_impl(task_manager::module_ptr module, task_id id, uint64_t sequence_number = 0, std::string keyspace = "", std::string table = "", std::string entity = "", task_id parent_id = task_id::create_null_id()) noexcept
: task_manager::task::impl(module, id, sequence_number, std::move(keyspace), std::move(table), std::move(entity), parent_id)
{}
virtual std::string type() const override {