tasks: keep virtual tasks in task manager
Virtual tasks are kept in task manager together with regular tasks. All virtual tasks are stored on shard 0. task_manager::module::make_task is modified to consider virtual tasks as possible parents.
This commit is contained in:
@@ -129,7 +129,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
|
||||
} catch (...) {
|
||||
throw bad_param_exception(fmt::format("{}", std::current_exception()));
|
||||
}
|
||||
const auto& filtered_tasks = module->get_tasks() | boost::adaptors::filtered([¶ms = req->query_parameters, internal] (const auto& task) {
|
||||
const auto& filtered_tasks = module->get_local_tasks() | boost::adaptors::filtered([¶ms = req->query_parameters, internal] (const auto& task) {
|
||||
return (internal || !task.second->is_internal()) && filter_tasks(task.second, params);
|
||||
});
|
||||
for (auto& [task_id, task] : filtered_tasks) {
|
||||
|
||||
@@ -61,8 +61,8 @@ void set_task_manager_test(http_context& ctx, routes& r, sharded<tasks::task_man
|
||||
auto module = tms.local().find_module("test");
|
||||
id = co_await module->make_task<tasks::test_task_impl>(shard, id, keyspace, table, entity, data);
|
||||
co_await tms.invoke_on(shard, [id] (tasks::task_manager& tm) {
|
||||
auto it = tm.get_all_tasks().find(id);
|
||||
if (it != tm.get_all_tasks().end()) {
|
||||
auto it = tm.get_local_tasks().find(id);
|
||||
if (it != tm.get_local_tasks().end()) {
|
||||
it->second->start();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -503,8 +503,8 @@ void repair::task_manager_module::remove_shard_task_id(int id) {
|
||||
tasks::task_manager::task_ptr repair::task_manager_module::get_shard_task_ptr(int id) {
|
||||
auto it = _repairs.find(id);
|
||||
if (it != _repairs.end()) {
|
||||
auto task_it = _tasks.find(it->second);
|
||||
if (task_it != _tasks.end()) {
|
||||
auto task_it = get_local_tasks().find(it->second);
|
||||
if (task_it != get_local_tasks().end()) {
|
||||
return task_it->second;
|
||||
}
|
||||
}
|
||||
@@ -540,8 +540,8 @@ bool repair::task_manager_module::is_aborted(const tasks::task_id& uuid) {
|
||||
void repair::task_manager_module::abort_all_repairs() {
|
||||
_aborted_pending_repairs = _pending_repairs;
|
||||
for (auto& x : _repairs) {
|
||||
auto it = _tasks.find(x.second);
|
||||
if (it != _tasks.end()) {
|
||||
auto it = get_local_tasks().find(x.second);
|
||||
if (it != get_local_tasks().end()) {
|
||||
auto& impl = dynamic_cast<repair::shard_repair_task_impl&>(*it->second->_impl);
|
||||
// If the task is aborted, its state will change to failed. One can wait for this with task_manager::task::done().
|
||||
impl.abort();
|
||||
@@ -554,8 +554,8 @@ float repair::task_manager_module::report_progress() {
|
||||
uint64_t nr_ranges_finished = 0;
|
||||
uint64_t nr_ranges_total = 0;
|
||||
for (auto& x : _repairs) {
|
||||
auto it = _tasks.find(x.second);
|
||||
if (it != _tasks.end()) {
|
||||
auto it = get_local_tasks().find(x.second);
|
||||
if (it != get_local_tasks().end()) {
|
||||
auto& impl = dynamic_cast<repair::shard_repair_task_impl&>(*it->second->_impl);
|
||||
if (impl.reason() == streaming::stream_reason::repair) {
|
||||
nr_ranges_total += impl.ranges_size();
|
||||
|
||||
@@ -157,7 +157,7 @@ static future<> abort_children(task_manager::module_ptr module, task_id parent_i
|
||||
module->async_gate().leave();
|
||||
});
|
||||
co_await module->get_task_manager().container().invoke_on_all([parent_id] (task_manager& tm) {
|
||||
for (auto& task : tm.get_all_tasks()) {
|
||||
for (auto& task : tm.get_local_tasks()) {
|
||||
if (task.second->get_parent_id() == parent_id) {
|
||||
task.second->abort();
|
||||
}
|
||||
@@ -458,26 +458,54 @@ const std::string& task_manager::module::get_name() const noexcept {
|
||||
return _name;
|
||||
}
|
||||
|
||||
task_manager::task_map& task_manager::module::get_tasks() noexcept {
|
||||
task_manager::task_map& task_manager::module::get_local_tasks() noexcept {
|
||||
return _tasks._local_tasks;
|
||||
}
|
||||
|
||||
const task_manager::task_map& task_manager::module::get_local_tasks() const noexcept {
|
||||
return _tasks._local_tasks;
|
||||
}
|
||||
|
||||
task_manager::virtual_task_map& task_manager::module::get_virtual_tasks() noexcept {
|
||||
return _tasks._virtual_tasks;
|
||||
}
|
||||
|
||||
const task_manager::virtual_task_map& task_manager::module::get_virtual_tasks() const noexcept {
|
||||
return _tasks._virtual_tasks;
|
||||
}
|
||||
|
||||
task_manager::tasks_collection& task_manager::module::get_tasks_collection() noexcept {
|
||||
return _tasks;
|
||||
}
|
||||
|
||||
const task_manager::task_map& task_manager::module::get_tasks() const noexcept {
|
||||
const task_manager::tasks_collection& task_manager::module::get_tasks_collection() const noexcept {
|
||||
return _tasks;
|
||||
}
|
||||
|
||||
void task_manager::module::register_task(task_ptr task) {
|
||||
_tasks[task->id()] = task;
|
||||
get_local_tasks()[task->id()] = task;
|
||||
try {
|
||||
_tm.register_task(task);
|
||||
} catch (...) {
|
||||
_tasks.erase(task->id());
|
||||
get_local_tasks().erase(task->id());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void task_manager::module::register_virtual_task(virtual_task_ptr task) {
|
||||
assert(this_shard_id() == 0);
|
||||
auto group = task->get_group();
|
||||
get_virtual_tasks()[group] = task;
|
||||
try {
|
||||
_tm.register_virtual_task(task);
|
||||
} catch (...) {
|
||||
get_virtual_tasks().erase(group);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void task_manager::module::unregister_task(task_id id) noexcept {
|
||||
_tasks.erase(id);
|
||||
get_local_tasks().erase(id);
|
||||
_tm.unregister_task(id);
|
||||
}
|
||||
|
||||
@@ -485,6 +513,12 @@ future<> task_manager::module::stop() noexcept {
|
||||
tmlogger.info("Stopping module {}", _name);
|
||||
abort_source().request_abort();
|
||||
co_await _gate.close();
|
||||
if (this_shard_id() == 0) {
|
||||
for (auto& [group, _]: _tasks._virtual_tasks) {
|
||||
_tm.unregister_virtual_task(group);
|
||||
}
|
||||
_tasks._virtual_tasks = {};
|
||||
}
|
||||
_tm.unregister_module(_name);
|
||||
}
|
||||
|
||||
@@ -492,16 +526,23 @@ future<task_manager::task_ptr> task_manager::module::make_task(task::task_impl_p
|
||||
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr), async_gate().hold());
|
||||
bool abort = false;
|
||||
if (parent_d) {
|
||||
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, coroutine::lambda([id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable -> future<uint64_t> {
|
||||
const auto& all_tasks = tm.get_all_tasks();
|
||||
// Regular task as a parent.
|
||||
auto sequence_number = co_await _tm.container().invoke_on(parent_d.shard, coroutine::lambda([id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable -> future<std::optional<uint64_t>> {
|
||||
const auto& all_tasks = tm.get_local_tasks();
|
||||
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
|
||||
co_await it->second->add_child(std::move(task));
|
||||
abort = it->second->abort_requested();
|
||||
co_return it->second->get_sequence_number();
|
||||
} else {
|
||||
throw task_manager::task_not_found(id);
|
||||
}
|
||||
co_return std::nullopt;
|
||||
}));
|
||||
|
||||
if (sequence_number) {
|
||||
task->get_status().sequence_number = sequence_number.value();
|
||||
} else { // Virtual task as a parent.
|
||||
sequence_number = new_sequence_number();
|
||||
task->set_virtual_parent();
|
||||
}
|
||||
}
|
||||
if (abort) {
|
||||
task->abort();
|
||||
@@ -534,12 +575,28 @@ const task_manager::modules& task_manager::get_modules() const noexcept {
|
||||
return _modules;
|
||||
}
|
||||
|
||||
task_manager::task_map& task_manager::get_all_tasks() noexcept {
|
||||
return _all_tasks;
|
||||
task_manager::task_map& task_manager::get_local_tasks() noexcept {
|
||||
return _tasks._local_tasks;
|
||||
}
|
||||
|
||||
const task_manager::task_map& task_manager::get_all_tasks() const noexcept {
|
||||
return _all_tasks;
|
||||
const task_manager::task_map& task_manager::get_local_tasks() const noexcept {
|
||||
return _tasks._local_tasks;
|
||||
}
|
||||
|
||||
task_manager::virtual_task_map& task_manager::get_virtual_tasks() noexcept {
|
||||
return _tasks._virtual_tasks;
|
||||
}
|
||||
|
||||
const task_manager::virtual_task_map& task_manager::get_virtual_tasks() const noexcept {
|
||||
return _tasks._virtual_tasks;
|
||||
}
|
||||
|
||||
task_manager::tasks_collection& task_manager::get_tasks_collection() noexcept {
|
||||
return _tasks;
|
||||
}
|
||||
|
||||
const task_manager::tasks_collection& task_manager::get_tasks_collection() const noexcept {
|
||||
return _tasks;
|
||||
}
|
||||
|
||||
task_manager::module_ptr task_manager::make_module(std::string name) {
|
||||
@@ -595,11 +652,19 @@ void task_manager::unregister_module(std::string name) noexcept {
|
||||
}
|
||||
|
||||
void task_manager::register_task(task_ptr task) {
|
||||
_all_tasks[task->id()] = task;
|
||||
_tasks._local_tasks[task->id()] = task;
|
||||
}
|
||||
|
||||
void task_manager::register_virtual_task(virtual_task_ptr task) {
|
||||
_tasks._virtual_tasks[task->get_group()] = task;
|
||||
}
|
||||
|
||||
void task_manager::unregister_task(task_id id) noexcept {
|
||||
_all_tasks.erase(id);
|
||||
_tasks._local_tasks.erase(id);
|
||||
}
|
||||
|
||||
void task_manager::unregister_virtual_task(task_group group) noexcept {
|
||||
_tasks._virtual_tasks.erase(group);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -63,12 +63,18 @@ public:
|
||||
using task_ptr = lw_shared_ptr<task_manager::task>;
|
||||
using virtual_task_ptr = lw_shared_ptr<task_manager::virtual_task>;
|
||||
using task_map = std::unordered_map<task_id, task_ptr>;
|
||||
using virtual_task_map = std::unordered_map<task_group, virtual_task_ptr>;
|
||||
using foreign_task_ptr = foreign_ptr<task_ptr>;
|
||||
using foreign_task_map = std::unordered_map<task_id, foreign_task_ptr>;
|
||||
using module_ptr = shared_ptr<module>;
|
||||
using modules = std::unordered_map<std::string, module_ptr>;
|
||||
|
||||
struct tasks_collection {
|
||||
task_map _local_tasks;
|
||||
virtual_task_map _virtual_tasks;
|
||||
};
|
||||
private:
|
||||
task_map _all_tasks;
|
||||
tasks_collection _tasks;
|
||||
modules _modules;
|
||||
config _cfg;
|
||||
seastar::abort_source _as;
|
||||
@@ -292,7 +298,7 @@ public:
|
||||
protected:
|
||||
task_manager& _tm;
|
||||
std::string _name;
|
||||
task_map _tasks;
|
||||
tasks_collection _tasks;
|
||||
gate _gate;
|
||||
uint64_t _sequence_number = 0;
|
||||
private:
|
||||
@@ -307,10 +313,15 @@ public:
|
||||
seastar::abort_source& abort_source() noexcept;
|
||||
gate& async_gate() noexcept;
|
||||
const std::string& get_name() const noexcept;
|
||||
task_manager::task_map& get_tasks() noexcept;
|
||||
const task_manager::task_map& get_tasks() const noexcept;
|
||||
task_manager::task_map& get_local_tasks() noexcept;
|
||||
const task_manager::task_map& get_local_tasks() const noexcept;
|
||||
task_manager::virtual_task_map& get_virtual_tasks() noexcept;
|
||||
const task_manager::virtual_task_map& get_virtual_tasks() const noexcept;
|
||||
tasks_collection& get_tasks_collection() noexcept;
|
||||
const tasks_collection& get_tasks_collection() const noexcept;
|
||||
|
||||
void register_task(task_ptr task);
|
||||
void register_virtual_task(virtual_task_ptr task);
|
||||
void unregister_task(task_id id) noexcept;
|
||||
virtual future<> stop() noexcept;
|
||||
public:
|
||||
@@ -343,6 +354,18 @@ public:
|
||||
task->start();
|
||||
co_return task;
|
||||
}
|
||||
|
||||
// Must be called on target shard.
|
||||
template<typename VirtualTaskImpl, typename... Args>
|
||||
requires std::is_base_of_v<virtual_task::impl, VirtualTaskImpl> &&
|
||||
requires (module_ptr module, Args&&... args) {
|
||||
{VirtualTaskImpl(module, std::forward<Args>(args)...)} -> std::same_as<VirtualTaskImpl>;
|
||||
}
|
||||
void make_virtual_task(Args&&... args) {
|
||||
auto virtual_task_impl_ptr = std::make_unique<VirtualTaskImpl>(shared_from_this(), std::forward<Args>(args)...);
|
||||
auto vt = make_lw_shared<virtual_task>(std::move(virtual_task_impl_ptr));
|
||||
register_virtual_task(std::move(vt));
|
||||
}
|
||||
};
|
||||
public:
|
||||
task_manager(config cfg, seastar::abort_source& as) noexcept;
|
||||
@@ -350,8 +373,13 @@ public:
|
||||
|
||||
modules& get_modules() noexcept;
|
||||
const modules& get_modules() const noexcept;
|
||||
task_map& get_all_tasks() noexcept;
|
||||
const task_map& get_all_tasks() const noexcept;
|
||||
task_map& get_local_tasks() noexcept;
|
||||
const task_map& get_local_tasks() const noexcept;
|
||||
virtual_task_map& get_virtual_tasks() noexcept;
|
||||
const virtual_task_map& get_virtual_tasks() const noexcept;
|
||||
tasks_collection& get_tasks_collection() noexcept;
|
||||
const tasks_collection& get_tasks_collection() const noexcept;
|
||||
future<std::vector<task_id>> get_virtual_task_children(task_id parent_id);
|
||||
|
||||
module_ptr make_module(std::string name);
|
||||
void register_module(std::string name, module_ptr module);
|
||||
@@ -365,7 +393,7 @@ public:
|
||||
std::optional<T> res;
|
||||
co_await coroutine::parallel_for_each(boost::irange(0u, smp::count), [&tm, id, &res, &func] (unsigned shard) -> future<> {
|
||||
auto local_res = co_await tm.invoke_on(shard, [id, func] (const task_manager& local_tm) -> future<std::optional<T>> {
|
||||
const auto& all_tasks = local_tm.get_all_tasks();
|
||||
const auto& all_tasks = local_tm.get_local_tasks();
|
||||
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
|
||||
co_return co_await func(it->second);
|
||||
}
|
||||
@@ -394,7 +422,9 @@ private:
|
||||
protected:
|
||||
void unregister_module(std::string name) noexcept;
|
||||
void register_task(task_ptr task);
|
||||
void register_virtual_task(virtual_task_ptr task);
|
||||
void unregister_task(task_id id) noexcept;
|
||||
void unregister_virtual_task(task_group group) noexcept;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user