repair: change type of repair_module::_repairs
As a preparation to replacing repair_info with shard_repair_task_impl, type of _repairs in repair module is changed from std::unordered_map<int, lw_shared_ptr<repair_info>> to std::unordered_map<int, tasks::task_id>.
This commit is contained in:
@@ -410,18 +410,21 @@ void repair_module::check_in_shutdown() {
|
||||
abort_source().check();
|
||||
}
|
||||
|
||||
void repair_module::add_repair_info(int id, lw_shared_ptr<repair_info> ri) {
|
||||
_repairs.emplace(id, ri);
|
||||
void repair_module::add_repair_info(int id, tasks::task_id uuid) {
|
||||
_repairs.emplace(id, uuid);
|
||||
}
|
||||
|
||||
void repair_module::remove_repair_info(int id) {
|
||||
_repairs.erase(id);
|
||||
}
|
||||
|
||||
lw_shared_ptr<repair_info> repair_module::get_repair_info(int id) {
|
||||
tasks::task_manager::task_ptr repair_module::get_repair_info(int id) {
|
||||
auto it = _repairs.find(id);
|
||||
if (it != _repairs.end()) {
|
||||
return it->second;
|
||||
auto task_it = _tasks.find(it->second);
|
||||
if (task_it != _tasks.end()) {
|
||||
return task_it->second;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
@@ -455,8 +458,11 @@ bool repair_module::is_aborted(const tasks::task_id& uuid) {
|
||||
void repair_module::abort_all_repairs() {
|
||||
_aborted_pending_repairs = _pending_repairs;
|
||||
for (auto& x : _repairs) {
|
||||
auto& ri = x.second;
|
||||
ri->abort_repair_info();
|
||||
auto it = _tasks.find(x.second);
|
||||
if (it != _tasks.end()) {
|
||||
auto& impl = dynamic_cast<shard_repair_task_impl&>(*it->second->_impl);
|
||||
impl.get_repair_info()->abort_repair_info();
|
||||
}
|
||||
}
|
||||
rlogger.info0("Aborted {} repair job(s), aborted={}", _aborted_pending_repairs.size(), _aborted_pending_repairs);
|
||||
}
|
||||
@@ -465,10 +471,13 @@ float repair_module::report_progress(streaming::stream_reason reason) {
|
||||
uint64_t nr_ranges_finished = 0;
|
||||
uint64_t nr_ranges_total = 0;
|
||||
for (auto& x : _repairs) {
|
||||
auto& ri = x.second;
|
||||
if (ri->reason == reason) {
|
||||
nr_ranges_total += ri->ranges_size();
|
||||
nr_ranges_finished += ri->nr_ranges_finished;
|
||||
auto it = _tasks.find(x.second);
|
||||
if (it != _tasks.end()) {
|
||||
auto& impl = dynamic_cast<shard_repair_task_impl&>(*it->second->_impl);
|
||||
if (impl.get_repair_info()->reason == reason) {
|
||||
nr_ranges_total += impl.get_repair_info()->ranges_size();
|
||||
nr_ranges_finished += impl.get_repair_info()->nr_ranges_finished;
|
||||
}
|
||||
}
|
||||
}
|
||||
return nr_ranges_total == 0 ? 1 : float(nr_ranges_finished) / float(nr_ranges_total);
|
||||
@@ -1009,13 +1018,13 @@ future<> shard_repair_task_impl::run() {
|
||||
}
|
||||
|
||||
auto& ri = _ri;
|
||||
ri->rs.get_repair_module().add_repair_info(ri->id.id, ri);
|
||||
ri->rs.get_repair_module().add_repair_info(ri->id.id, _status.id);
|
||||
return do_repair_ranges().then([ri] {
|
||||
ri->check_failed_ranges();
|
||||
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
|
||||
return make_ready_future<>();
|
||||
}).handle_exception([ri] (std::exception_ptr eptr) {
|
||||
ri->rs.get_repair_module().remove_repair_info(ri->id.id);
|
||||
}).handle_exception([this, ri] (std::exception_ptr eptr) {
|
||||
ri->rs.get_repair_module().remove_repair_info(_status.sequence_number);
|
||||
return make_exception_future<>(std::move(eptr));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ private:
|
||||
// but aren't listed as running or failed the status map.
|
||||
std::unordered_map<int, repair_status> _status;
|
||||
// Map repair id into repair_info.
|
||||
std::unordered_map<int, lw_shared_ptr<repair_info>> _repairs;
|
||||
std::unordered_map<int, tasks::task_id> _repairs;
|
||||
std::unordered_set<tasks::task_id> _pending_repairs;
|
||||
std::unordered_set<tasks::task_id> _aborted_pending_repairs;
|
||||
// The semaphore used to control the maximum
|
||||
@@ -156,9 +156,9 @@ public:
|
||||
|
||||
repair_status get(int id) const;
|
||||
void check_in_shutdown();
|
||||
void add_repair_info(int id, lw_shared_ptr<repair_info> ri);
|
||||
void add_repair_info(int id, tasks::task_id ri);
|
||||
void remove_repair_info(int id);
|
||||
lw_shared_ptr<repair_info> get_repair_info(int id);
|
||||
tasks::task_manager::task_ptr get_repair_info(int id);
|
||||
std::vector<int> get_active() const;
|
||||
size_t nr_running_repair_jobs();
|
||||
void abort_all_repairs();
|
||||
|
||||
@@ -20,6 +20,8 @@
|
||||
#include "utils/serialized_action.hh"
|
||||
#include "utils/updateable_value.hh"
|
||||
|
||||
class repair_module;
|
||||
|
||||
namespace tasks {
|
||||
|
||||
using is_abortable = bool_class <struct abortable_tag>;
|
||||
@@ -282,6 +284,7 @@ public:
|
||||
}
|
||||
|
||||
friend class test_task;
|
||||
friend class ::repair_module;
|
||||
};
|
||||
|
||||
class module : public enable_shared_from_this<module> {
|
||||
|
||||
Reference in New Issue
Block a user