token_metadata: Handle affected_ranges with do_for_each

affected_ranges can be very large in a large cluster or node with big
num_tokens account. calculate_natural_endpoints takes more time to
process in this case as well.

Futurize calculate_pending_ranges_for_leaving and handle the loop with
do_for_each to give some time for the reactor to breath, so it does not
block.
This commit is contained in:
Asias He
2018-02-09 17:17:35 +08:00
parent 60143a7517
commit c17ce79977
2 changed files with 34 additions and 29 deletions

View File

@@ -458,7 +458,7 @@ token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint)
return ret;
}
void token_metadata::calculate_pending_ranges_for_leaving(
future<> token_metadata::calculate_pending_ranges_for_leaving(
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata) {
@@ -474,21 +474,25 @@ void token_metadata::calculate_pending_ranges_for_leaving(
// for each of those ranges, find what new nodes will be responsible for the range when
// all leaving nodes are gone.
auto metadata = clone_only_token_map(); // don't do this in the loop! #7758
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} stars", affected_ranges.size());
for (const auto& r : affected_ranges) {
auto t = r.end() ? r.end()->value() : dht::maximum_token();
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata);
std::vector<inet_address> diff;
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());
std::set_difference(new_endpoints.begin(), new_endpoints.end(),
current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff));
for (auto& ep : diff) {
new_pending_ranges->emplace(r, ep);
}
}
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges.size());
auto affected_ranges_size = affected_ranges.size();
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} stars", affected_ranges_size);
return do_with(std::move(metadata), std::move(affected_ranges), [&strategy, new_pending_ranges, all_left_metadata, affected_ranges_size] (auto& metadata, auto& affected_ranges) {
return do_for_each(affected_ranges, [&metadata, &strategy, new_pending_ranges, all_left_metadata] (auto& r) {
auto t = r.end() ? r.end()->value() : dht::maximum_token();
auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata);
auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata);
std::vector<inet_address> diff;
std::sort(current_endpoints.begin(), current_endpoints.end());
std::sort(new_endpoints.begin(), new_endpoints.end());
std::set_difference(new_endpoints.begin(), new_endpoints.end(),
current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff));
for (auto& ep : diff) {
new_pending_ranges->emplace(r, ep);
}
}).finally([affected_ranges_size] {
tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges_size);
});
});
}
void token_metadata::calculate_pending_ranges_for_bootstrap(
@@ -558,22 +562,23 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy&
// Copy of metadata reflecting the situation after all leave operations are finished.
auto all_left_metadata = make_lw_shared<token_metadata>(clone_after_all_left());
calculate_pending_ranges_for_leaving(strategy, new_pending_ranges, all_left_metadata);
// At this stage newPendingRanges has been updated according to leave operations. We can
// now continue the calculation by checking bootstrapping nodes.
return calculate_pending_ranges_for_leaving(strategy, new_pending_ranges, all_left_metadata).then([this, keyspace_name, &strategy, new_pending_ranges, all_left_metadata] {
// At this stage newPendingRanges has been updated according to leave operations. We can
// now continue the calculation by checking bootstrapping nodes.
calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata);
calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata);
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
// We can now finish the calculation by checking moving nodes.
// At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
// We can now finish the calculation by checking moving nodes.
calculate_pending_ranges_for_moving(strategy, new_pending_ranges, all_left_metadata);
calculate_pending_ranges_for_moving(strategy, new_pending_ranges, all_left_metadata);
set_pending_ranges(keyspace_name, std::move(*new_pending_ranges));
set_pending_ranges(keyspace_name, std::move(*new_pending_ranges));
if (tlogger.is_enabled(logging::log_level::debug)) {
tlogger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
}
return make_ready_future<>();
});
if (tlogger.is_enabled(logging::log_level::debug)) {
tlogger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "<empty>" : print_pending_ranges()));
}
return make_ready_future<>();
}
sstring token_metadata::print_pending_ranges() {

View File

@@ -653,7 +653,7 @@ public:
* changes state in the cluster, so it should be manageable.
*/
future<> calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name);
void calculate_pending_ranges_for_leaving(
future<> calculate_pending_ranges_for_leaving(
abstract_replication_strategy& strategy,
lw_shared_ptr<std::unordered_multimap<range<token>, inet_address>> new_pending_ranges,
lw_shared_ptr<token_metadata> all_left_metadata);