repair: add shard param to task_manager_module::is_aborted
Currently, task_manager_module::is_aborted checks whether a task with given id was aborted on this shard. In tablet_repair_task_impl::run, is_aborted method is called on all shards to check if the parent task was aborted. However, even for aborted parent, is_aborted will return true only on owner shard of the parent. Pass shard param to task_manager_module::is_aborted that indicates which shard to check.
This commit is contained in:
@@ -547,9 +547,11 @@ size_t repair::task_manager_module::nr_running_repair_jobs() {
|
||||
return count;
|
||||
}
|
||||
|
||||
bool repair::task_manager_module::is_aborted(const tasks::task_id& uuid) {
|
||||
auto it = get_local_tasks().find(uuid);
|
||||
return it != get_local_tasks().end() && it->second->abort_requested();
|
||||
future<bool> repair::task_manager_module::is_aborted(const tasks::task_id& uuid, shard_id shard) {
|
||||
return smp::submit_to(shard, [&] () {
|
||||
auto it = get_local_tasks().find(uuid);
|
||||
return it != get_local_tasks().end() && it->second->abort_requested();
|
||||
});
|
||||
}
|
||||
|
||||
void repair::task_manager_module::abort_all_repairs() {
|
||||
@@ -2442,8 +2444,8 @@ future<> repair::tablet_repair_task_impl::run() {
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, ranges_parallelism = _ranges_parallelism] (repair_service& rs) -> future<> {
|
||||
auto parent_shard = this_shard_id();
|
||||
rs.container().invoke_on_all([&idx, id, metas = _metas, parent_data, reason = _reason, tables = _tables, ranges_parallelism = _ranges_parallelism, parent_shard] (repair_service& rs) -> future<> {
|
||||
std::exception_ptr error;
|
||||
for (auto& m : metas) {
|
||||
if (m.master_shard_id != this_shard_id()) {
|
||||
@@ -2458,7 +2460,7 @@ future<> repair::tablet_repair_task_impl::run() {
|
||||
continue;
|
||||
}
|
||||
auto erm = t->get_effective_replication_map();
|
||||
if (rs.get_repair_module().is_aborted(id.uuid())) {
|
||||
if (co_await rs.get_repair_module().is_aborted(id.uuid(), parent_shard)) {
|
||||
throw abort_requested_exception();
|
||||
}
|
||||
|
||||
|
||||
@@ -256,7 +256,7 @@ public:
|
||||
future<> run(repair_uniq_id id, std::function<void ()> func);
|
||||
future<repair_status> repair_await_completion(int id, std::chrono::steady_clock::time_point timeout);
|
||||
float report_progress();
|
||||
bool is_aborted(const tasks::task_id& uuid);
|
||||
future<bool> is_aborted(const tasks::task_id& uuid, shard_id shard);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user