|
|
|
|
@@ -564,7 +564,8 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
|
|
|
|
const std::vector<sstring>& hosts_,
|
|
|
|
|
const std::unordered_set<gms::inet_address>& ignore_nodes_,
|
|
|
|
|
streaming::stream_reason reason_,
|
|
|
|
|
bool hints_batchlog_flushed)
|
|
|
|
|
bool hints_batchlog_flushed,
|
|
|
|
|
std::optional<int> ranges_parallelism)
|
|
|
|
|
: repair_task_impl(module, id, 0, keyspace, "", "", parent_id_.uuid(), reason_)
|
|
|
|
|
, rs(repair)
|
|
|
|
|
, db(repair.get_db())
|
|
|
|
|
@@ -585,7 +586,11 @@ repair::shard_repair_task_impl::shard_repair_task_impl(tasks::task_manager::modu
|
|
|
|
|
, total_rf(erm->get_replication_factor())
|
|
|
|
|
, nr_ranges_total(ranges.size())
|
|
|
|
|
, _hints_batchlog_flushed(std::move(hints_batchlog_flushed))
|
|
|
|
|
{ }
|
|
|
|
|
, _user_ranges_parallelism(ranges_parallelism ? std::optional<semaphore>(semaphore(*ranges_parallelism)) : std::nullopt)
|
|
|
|
|
{
|
|
|
|
|
rlogger.debug("repair[{}]: Setting user_ranges_parallelism to {}", global_repair_id.uuid(),
|
|
|
|
|
_user_ranges_parallelism ? std::to_string(_user_ranges_parallelism->available_units()) : "unlimited");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void repair::shard_repair_task_impl::check_failed_ranges() {
|
|
|
|
|
rlogger.info("repair[{}]: stats: repair_reason={}, keyspace={}, tables={}, ranges_nr={}, {}",
|
|
|
|
|
@@ -795,6 +800,8 @@ struct repair_options {
|
|
|
|
|
// repair to a data center other than the named one returns an error.
|
|
|
|
|
std::vector<sstring> data_centers;
|
|
|
|
|
|
|
|
|
|
int ranges_parallelism = -1;
|
|
|
|
|
|
|
|
|
|
repair_options(std::unordered_map<sstring, sstring> options) {
|
|
|
|
|
bool_opt(primary_range, options, PRIMARY_RANGE_KEY);
|
|
|
|
|
ranges_opt(ranges, options, RANGES_KEY);
|
|
|
|
|
@@ -830,6 +837,8 @@ struct repair_options {
|
|
|
|
|
int job_threads;
|
|
|
|
|
int_opt(job_threads, options, JOB_THREADS_KEY);
|
|
|
|
|
|
|
|
|
|
int_opt(ranges_parallelism, options, RANGES_PARALLELISM_KEY);
|
|
|
|
|
|
|
|
|
|
// The parsing code above removed from the map options we have parsed.
|
|
|
|
|
// If anything is left there in the end, it's an unsupported option.
|
|
|
|
|
if (!options.empty()) {
|
|
|
|
|
@@ -850,6 +859,7 @@ struct repair_options {
|
|
|
|
|
static constexpr const char* TRACE_KEY = "trace";
|
|
|
|
|
static constexpr const char* START_TOKEN = "startToken";
|
|
|
|
|
static constexpr const char* END_TOKEN = "endToken";
|
|
|
|
|
static constexpr const char* RANGES_PARALLELISM_KEY = "ranges_parallelism";
|
|
|
|
|
|
|
|
|
|
// Settings of "parallelism" option. Numbers must match Cassandra's
|
|
|
|
|
// RepairParallelism enum, which is used by the caller.
|
|
|
|
|
@@ -950,33 +960,34 @@ future<> repair::shard_repair_task_impl::do_repair_ranges() {
|
|
|
|
|
// repair all the ranges in limited parallelism
|
|
|
|
|
rlogger.info("repair[{}]: Started to repair {} out of {} tables in keyspace={}, table={}, table_id={}, repair_reason={}",
|
|
|
|
|
global_repair_id.uuid(), idx + 1, table_ids.size(), _status.keyspace, table_name, table_id, _reason);
|
|
|
|
|
co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) {
|
|
|
|
|
return with_semaphore(rs.get_repair_module().range_parallelism_semaphore(), 1, [this, &range, table_id] {
|
|
|
|
|
return repair_range(range, table_id).then([this] {
|
|
|
|
|
if (_reason == streaming::stream_reason::bootstrap) {
|
|
|
|
|
rs.get_metrics().bootstrap_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::replace) {
|
|
|
|
|
rs.get_metrics().replace_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::rebuild) {
|
|
|
|
|
rs.get_metrics().rebuild_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::decommission) {
|
|
|
|
|
rs.get_metrics().decommission_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::removenode) {
|
|
|
|
|
rs.get_metrics().removenode_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::repair) {
|
|
|
|
|
rs.get_metrics().repair_finished_ranges_sum++;
|
|
|
|
|
nr_ranges_finished++;
|
|
|
|
|
}
|
|
|
|
|
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
|
|
|
|
|
global_repair_id.uuid(),
|
|
|
|
|
rs.get_metrics().bootstrap_finished_percentage(),
|
|
|
|
|
rs.get_metrics().replace_finished_percentage(),
|
|
|
|
|
rs.get_metrics().rebuild_finished_percentage(),
|
|
|
|
|
rs.get_metrics().decommission_finished_percentage(),
|
|
|
|
|
rs.get_metrics().removenode_finished_percentage(),
|
|
|
|
|
rs.get_metrics().repair_finished_percentage());
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
co_await coroutine::parallel_for_each(ranges, [this, table_id] (auto&& range) -> future<> {
|
|
|
|
|
// Get the system range parallelism
|
|
|
|
|
auto permit = co_await seastar::get_units(rs.get_repair_module().range_parallelism_semaphore(), 1);
|
|
|
|
|
// Get the range parallelism specified by user
|
|
|
|
|
auto user_permit = _user_ranges_parallelism ? co_await seastar::get_units(*_user_ranges_parallelism, 1) : semaphore_units<>();
|
|
|
|
|
co_await repair_range(range, table_id);
|
|
|
|
|
if (_reason == streaming::stream_reason::bootstrap) {
|
|
|
|
|
rs.get_metrics().bootstrap_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::replace) {
|
|
|
|
|
rs.get_metrics().replace_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::rebuild) {
|
|
|
|
|
rs.get_metrics().rebuild_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::decommission) {
|
|
|
|
|
rs.get_metrics().decommission_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::removenode) {
|
|
|
|
|
rs.get_metrics().removenode_finished_ranges++;
|
|
|
|
|
} else if (_reason == streaming::stream_reason::repair) {
|
|
|
|
|
rs.get_metrics().repair_finished_ranges_sum++;
|
|
|
|
|
nr_ranges_finished++;
|
|
|
|
|
}
|
|
|
|
|
rlogger.debug("repair[{}]: node ops progress bootstrap={}, replace={}, rebuild={}, decommission={}, removenode={}, repair={}",
|
|
|
|
|
global_repair_id.uuid(),
|
|
|
|
|
rs.get_metrics().bootstrap_finished_percentage(),
|
|
|
|
|
rs.get_metrics().replace_finished_percentage(),
|
|
|
|
|
rs.get_metrics().rebuild_finished_percentage(),
|
|
|
|
|
rs.get_metrics().decommission_finished_percentage(),
|
|
|
|
|
rs.get_metrics().removenode_finished_percentage(),
|
|
|
|
|
rs.get_metrics().repair_finished_percentage());
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
if (_reason != streaming::stream_reason::repair) {
|
|
|
|
|
@@ -1130,7 +1141,8 @@ future<int> repair_service::do_repair_start(sstring keyspace, std::unordered_map
|
|
|
|
|
co_return id.id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes));
|
|
|
|
|
auto ranges_parallelism = options.ranges_parallelism == -1 ? std::nullopt : std::optional<int>(options.ranges_parallelism);
|
|
|
|
|
auto task = co_await _repair_module->make_and_start_task<repair::user_requested_repair_task_impl>({}, id, std::move(keyspace), "", germs, std::move(cfs), std::move(ranges), std::move(options.hosts), std::move(options.data_centers), std::move(ignore_nodes), ranges_parallelism);
|
|
|
|
|
co_return id.id;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1237,13 +1249,14 @@ future<> repair::user_requested_repair_task_impl::run() {
|
|
|
|
|
throw std::runtime_error("aborted by user request");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto ranges_parallelism = _ranges_parallelism;
|
|
|
|
|
for (auto shard : boost::irange(unsigned(0), smp::count)) {
|
|
|
|
|
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed,
|
|
|
|
|
auto f = rs.container().invoke_on(shard, [keyspace, table_ids, id, ranges, hints_batchlog_flushed, ranges_parallelism,
|
|
|
|
|
data_centers, hosts, ignore_nodes, parent_data = get_repair_uniq_id().task_info, germs] (repair_service& local_repair) mutable -> future<> {
|
|
|
|
|
local_repair.get_metrics().repair_total_ranges_sum += ranges.size();
|
|
|
|
|
auto task = co_await local_repair._repair_module->make_and_start_task<repair::shard_repair_task_impl>(parent_data, tasks::task_id::create_random_id(), keyspace,
|
|
|
|
|
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed);
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), streaming::stream_reason::repair, hints_batchlog_flushed, ranges_parallelism);
|
|
|
|
|
co_await task->done();
|
|
|
|
|
});
|
|
|
|
|
repair_results.push_back(std::move(f));
|
|
|
|
|
@@ -1347,9 +1360,10 @@ future<> repair::data_sync_repair_task_impl::run() {
|
|
|
|
|
auto hosts = std::vector<sstring>();
|
|
|
|
|
auto ignore_nodes = std::unordered_set<gms::inet_address>();
|
|
|
|
|
bool hints_batchlog_flushed = false;
|
|
|
|
|
auto ranges_parallelism = std::nullopt;
|
|
|
|
|
auto task_impl_ptr = seastar::make_shared<repair::shard_repair_task_impl>(local_repair._repair_module, tasks::task_id::create_random_id(), keyspace,
|
|
|
|
|
local_repair, germs->get().shared_from_this(), std::move(ranges), std::move(table_ids),
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed);
|
|
|
|
|
id, std::move(data_centers), std::move(hosts), std::move(ignore_nodes), reason, hints_batchlog_flushed, ranges_parallelism);
|
|
|
|
|
task_impl_ptr->neighbors = std::move(neighbors);
|
|
|
|
|
auto task = co_await local_repair._repair_module->make_task(std::move(task_impl_ptr), parent_data);
|
|
|
|
|
task->start();
|
|
|
|
|
|