diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 57df21ca09..2817736688 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -315,13 +315,74 @@ static const auto default_replication_map_key = dht::token::from_int64(0); future calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) { replication_map replication_map; + ring_mapping pending_endpoints; + ring_mapping read_endpoints; const auto depend_on_token = rs->natural_endpoints_depend_on_token(); const auto& sorted_tokens = tmptr->sorted_tokens(); replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1); - if (depend_on_token) { + if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) { + const auto& all_tokens = topology_changes->all_tokens; + const auto& base_token_metadata = topology_changes->base_token_metadata + ? *topology_changes->base_token_metadata + : *tmptr; + const auto& current_tokens = tmptr->get_token_to_endpoint(); + for (size_t i = 0, size = all_tokens.size(); i < size; ++i) { + co_await coroutine::maybe_yield(); + + const auto token = all_tokens[i]; + + auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata); + auto target_endpoints = co_await rs->calculate_natural_endpoints(token, topology_changes->target_token_metadata); + + auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { + using interval = ring_mapping::interval_type; + if (!depend_on_token) { + target += std::make_pair( + interval::open(dht::minimum_token(), dht::maximum_token()), + std::move(endpoints)); + } else if (i == 0) { + target += std::make_pair( + interval::open(all_tokens.back(), dht::maximum_token()), + endpoints); + target += std::make_pair( + interval::left_open(dht::minimum_token(), token), + std::move(endpoints)); + } else { + target += std::make_pair( + interval::left_open(all_tokens[i - 1], token), + std::move(endpoints)); + } + }; + + { + std::unordered_set endpoints_diff; + for (const auto& e: target_endpoints) { + if (!current_endpoints.contains(e)) { + endpoints_diff.insert(e); + } + } + if (!endpoints_diff.empty()) { + add_mapping(pending_endpoints, std::move(endpoints_diff)); + } + } + + // in order not to waste memory, we update read_endpoints only if the + // new endpoints differs from the old one + if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) { + add_mapping(read_endpoints, std::move(target_endpoints).extract_set()); + } + + if (!depend_on_token) { + replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector()); + break; + } else if (current_tokens.contains(token)) { + replication_map.emplace(token, std::move(current_endpoints).extract_vector()); + } + } + } else if (depend_on_token) { for (const auto &t : sorted_tokens) { auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); - replication_map.emplace(t, eps.get_vector()); + replication_map.emplace(t, std::move(eps).extract_vector()); } } else { auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr); @@ -330,7 +391,7 @@ future calculate_effective_replicat auto rf = rs->get_replication_factor(*tmptr); co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), - ring_mapping{}, ring_mapping{}, rf); + std::move(pending_endpoints), std::move(read_endpoints), rf); } auto vnode_effective_replication_map::clone_data_gently() const -> future> {