From c17ce79977fb9eecc104ac0feceabb87c9538849 Mon Sep 17 00:00:00 2001 From: Asias He Date: Fri, 9 Feb 2018 17:17:35 +0800 Subject: [PATCH] token_metadata: Handle affected_ranges with do_for_each affected_ranges can be very large in a large cluster or node with big num_tokens account. calculate_natural_endpoints takes more time to process in this case as well. Futurize calculate_pending_ranges_for_leaving and handle the loop with do_for_each to give some time for the reactor to breath, so it does not block. --- locator/token_metadata.cc | 61 +++++++++++++++++++++------------------ locator/token_metadata.hh | 2 +- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 5f413f19d3..4870dbc882 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -458,7 +458,7 @@ token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint) return ret; } -void token_metadata::calculate_pending_ranges_for_leaving( +future<> token_metadata::calculate_pending_ranges_for_leaving( abstract_replication_strategy& strategy, lw_shared_ptr, inet_address>> new_pending_ranges, lw_shared_ptr all_left_metadata) { @@ -474,21 +474,25 @@ void token_metadata::calculate_pending_ranges_for_leaving( // for each of those ranges, find what new nodes will be responsible for the range when // all leaving nodes are gone. auto metadata = clone_only_token_map(); // don't do this in the loop! #7758 - tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} stars", affected_ranges.size()); - for (const auto& r : affected_ranges) { - auto t = r.end() ? r.end()->value() : dht::maximum_token(); - auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata); - auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata); - std::vector diff; - std::sort(current_endpoints.begin(), current_endpoints.end()); - std::sort(new_endpoints.begin(), new_endpoints.end()); - std::set_difference(new_endpoints.begin(), new_endpoints.end(), - current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff)); - for (auto& ep : diff) { - new_pending_ranges->emplace(r, ep); - } - } - tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges.size()); + auto affected_ranges_size = affected_ranges.size(); + tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} stars", affected_ranges_size); + return do_with(std::move(metadata), std::move(affected_ranges), [&strategy, new_pending_ranges, all_left_metadata, affected_ranges_size] (auto& metadata, auto& affected_ranges) { + return do_for_each(affected_ranges, [&metadata, &strategy, new_pending_ranges, all_left_metadata] (auto& r) { + auto t = r.end() ? r.end()->value() : dht::maximum_token(); + auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata); + auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata); + std::vector diff; + std::sort(current_endpoints.begin(), current_endpoints.end()); + std::sort(new_endpoints.begin(), new_endpoints.end()); + std::set_difference(new_endpoints.begin(), new_endpoints.end(), + current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff)); + for (auto& ep : diff) { + new_pending_ranges->emplace(r, ep); + } + }).finally([affected_ranges_size] { + tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges_size); + }); + }); } void token_metadata::calculate_pending_ranges_for_bootstrap( @@ -558,22 +562,23 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& // Copy of metadata reflecting the situation after all leave operations are finished. auto all_left_metadata = make_lw_shared(clone_after_all_left()); - calculate_pending_ranges_for_leaving(strategy, new_pending_ranges, all_left_metadata); - // At this stage newPendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. + return calculate_pending_ranges_for_leaving(strategy, new_pending_ranges, all_left_metadata).then([this, keyspace_name, &strategy, new_pending_ranges, all_left_metadata] { + // At this stage newPendingRanges has been updated according to leave operations. We can + // now continue the calculation by checking bootstrapping nodes. + calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata); - calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata); - // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. + // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. + // We can now finish the calculation by checking moving nodes. + calculate_pending_ranges_for_moving(strategy, new_pending_ranges, all_left_metadata); - calculate_pending_ranges_for_moving(strategy, new_pending_ranges, all_left_metadata); + set_pending_ranges(keyspace_name, std::move(*new_pending_ranges)); - set_pending_ranges(keyspace_name, std::move(*new_pending_ranges)); + if (tlogger.is_enabled(logging::log_level::debug)) { + tlogger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "" : print_pending_ranges())); + } + return make_ready_future<>(); + }); - if (tlogger.is_enabled(logging::log_level::debug)) { - tlogger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "" : print_pending_ranges())); - } - return make_ready_future<>(); } sstring token_metadata::print_pending_ranges() { diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 7868a08a48..b74c32a94d 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -653,7 +653,7 @@ public: * changes state in the cluster, so it should be manageable. */ future<> calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name); - void calculate_pending_ranges_for_leaving( + future<> calculate_pending_ranges_for_leaving( abstract_replication_strategy& strategy, lw_shared_ptr, inet_address>> new_pending_ranges, lw_shared_ptr all_left_metadata);