diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 0021dd4c6c..5f413f19d3 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -458,21 +458,11 @@ token_metadata::get_pending_ranges(sstring keyspace_name, inet_address endpoint) return ret; } -future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) { - std::unordered_multimap, inet_address> new_pending_ranges; - - if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) { - tlogger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name); - set_pending_ranges(keyspace_name, std::move(new_pending_ranges)); - return make_ready_future<>(); - } - +void token_metadata::calculate_pending_ranges_for_leaving( + abstract_replication_strategy& strategy, + lw_shared_ptr, inet_address>> new_pending_ranges, + lw_shared_ptr all_left_metadata) { std::unordered_multimap address_ranges = strategy.get_address_ranges(*this); - - // FIMXE - // Copy of metadata reflecting the situation after all leave operations are finished. - auto all_left_metadata = clone_after_all_left(); - // get all ranges that will be affected by leaving nodes std::unordered_set> affected_ranges; for (auto endpoint : _leaving_endpoints) { @@ -488,21 +478,23 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& for (const auto& r : affected_ranges) { auto t = r.end() ? r.end()->value() : dht::maximum_token(); auto current_endpoints = strategy.calculate_natural_endpoints(t, metadata); - auto new_endpoints = strategy.calculate_natural_endpoints(t, all_left_metadata); + auto new_endpoints = strategy.calculate_natural_endpoints(t, *all_left_metadata); std::vector diff; std::sort(current_endpoints.begin(), current_endpoints.end()); std::sort(new_endpoints.begin(), new_endpoints.end()); std::set_difference(new_endpoints.begin(), new_endpoints.end(), current_endpoints.begin(), current_endpoints.end(), std::back_inserter(diff)); for (auto& ep : diff) { - new_pending_ranges.emplace(r, ep); + new_pending_ranges->emplace(r, ep); } } tlogger.debug("In calculate_pending_ranges: affected_ranges.size={} ends", affected_ranges.size()); +} - // At this stage newPendingRanges has been updated according to leave operations. We can - // now continue the calculation by checking bootstrapping nodes. - +void token_metadata::calculate_pending_ranges_for_bootstrap( + abstract_replication_strategy& strategy, + lw_shared_ptr, inet_address>> new_pending_ranges, + lw_shared_ptr all_left_metadata) { // For each of the bootstrapping nodes, simply add and remove them one by one to // allLeftMetadata and check in between what their ranges would be. std::unordered_multimap bootstrap_addresses; @@ -520,18 +512,20 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& for (auto& x : tmp) { auto& endpoint = x.first; auto& tokens = x.second; - all_left_metadata.update_normal_tokens(tokens, endpoint); - for (auto& x : strategy.get_address_ranges(all_left_metadata)) { + all_left_metadata->update_normal_tokens(tokens, endpoint); + for (auto& x : strategy.get_address_ranges(*all_left_metadata)) { if (x.first == endpoint) { - new_pending_ranges.emplace(x.second, endpoint); + new_pending_ranges->emplace(x.second, endpoint); } } - all_left_metadata.remove_endpoint(endpoint); + all_left_metadata->remove_endpoint(endpoint); } +} - // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. - // We can now finish the calculation by checking moving nodes. - +void token_metadata::calculate_pending_ranges_for_moving( + abstract_replication_strategy& strategy, + lw_shared_ptr, inet_address>> new_pending_ranges, + lw_shared_ptr all_left_metadata) { // For each of the moving nodes, we do the same thing we did for bootstrapping: // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be. for (auto& moving : _moving_endpoints) { @@ -539,18 +533,42 @@ future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& auto& endpoint = moving.second; // address of the moving node // moving.left is a new token of the endpoint - all_left_metadata.update_normal_token(t, endpoint); + all_left_metadata->update_normal_token(t, endpoint); - for (auto& x : strategy.get_address_ranges(all_left_metadata)) { + for (auto& x : strategy.get_address_ranges(*all_left_metadata)) { if (x.first == endpoint) { - new_pending_ranges.emplace(x.second, endpoint); + new_pending_ranges->emplace(x.second, endpoint); } } - all_left_metadata.remove_endpoint(endpoint); + all_left_metadata->remove_endpoint(endpoint); + } +} + + +future<> token_metadata::calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name) { + auto new_pending_ranges = make_lw_shared, inet_address>>(); + + if (_bootstrap_tokens.empty() && _leaving_endpoints.empty() && _moving_endpoints.empty()) { + tlogger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspace_name); + set_pending_ranges(keyspace_name, std::move(*new_pending_ranges)); + return make_ready_future<>(); } - set_pending_ranges(keyspace_name, std::move(new_pending_ranges)); + // Copy of metadata reflecting the situation after all leave operations are finished. + auto all_left_metadata = make_lw_shared(clone_after_all_left()); + + calculate_pending_ranges_for_leaving(strategy, new_pending_ranges, all_left_metadata); + // At this stage newPendingRanges has been updated according to leave operations. We can + // now continue the calculation by checking bootstrapping nodes. + + calculate_pending_ranges_for_bootstrap(strategy, new_pending_ranges, all_left_metadata); + // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes. + // We can now finish the calculation by checking moving nodes. + + calculate_pending_ranges_for_moving(strategy, new_pending_ranges, all_left_metadata); + + set_pending_ranges(keyspace_name, std::move(*new_pending_ranges)); if (tlogger.is_enabled(logging::log_level::debug)) { tlogger.debug("Pending ranges: {}", (_pending_ranges.empty() ? "" : print_pending_ranges())); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 8716c5260d..7868a08a48 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -653,6 +653,18 @@ public: * changes state in the cluster, so it should be manageable. */ future<> calculate_pending_ranges(abstract_replication_strategy& strategy, const sstring& keyspace_name); + void calculate_pending_ranges_for_leaving( + abstract_replication_strategy& strategy, + lw_shared_ptr, inet_address>> new_pending_ranges, + lw_shared_ptr all_left_metadata); + void calculate_pending_ranges_for_bootstrap( + abstract_replication_strategy& strategy, + lw_shared_ptr, inet_address>> new_pending_ranges, + lw_shared_ptr all_left_metadata); + void calculate_pending_ranges_for_moving( + abstract_replication_strategy& strategy, + lw_shared_ptr, inet_address>> new_pending_ranges, + lw_shared_ptr all_left_metadata); public: token get_predecessor(token t);