calculate_effective_replication_map: compute pending_endpoints and read_endpoints
In this commit we add logic to calculate pending_endpoints and read_endpoints, similar to how it was done in update_pending_ranges. For situations where natural_endpoints_depend_on_token() is false, we short-circuit the calculations, breaking out of the loop after the first iteration. In this case we add a single item with key=default_replication_map_key to the replication_map and set the key range of pending_endpoints/read_endpoints to the entire set of possible token values. In the loop we iterate over all_tokens, which contains the union of all boundary tokens from both the old and the new topology. In addition to updating pending_endpoints and read_endpoints in the loop, we remember the new natural endpoints in the replication_map if the current token is contained in the current set of boundary tokens.
This commit is contained in:
@@ -315,13 +315,74 @@ static const auto default_replication_map_key = dht::token::from_int64(0);
|
||||
|
||||
future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replication_map(replication_strategy_ptr rs, token_metadata_ptr tmptr) {
|
||||
replication_map replication_map;
|
||||
ring_mapping pending_endpoints;
|
||||
ring_mapping read_endpoints;
|
||||
const auto depend_on_token = rs->natural_endpoints_depend_on_token();
|
||||
const auto& sorted_tokens = tmptr->sorted_tokens();
|
||||
replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1);
|
||||
if (depend_on_token) {
|
||||
if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) {
|
||||
const auto& all_tokens = topology_changes->all_tokens;
|
||||
const auto& base_token_metadata = topology_changes->base_token_metadata
|
||||
? *topology_changes->base_token_metadata
|
||||
: *tmptr;
|
||||
const auto& current_tokens = tmptr->get_token_to_endpoint();
|
||||
for (size_t i = 0, size = all_tokens.size(); i < size; ++i) {
|
||||
co_await coroutine::maybe_yield();
|
||||
|
||||
const auto token = all_tokens[i];
|
||||
|
||||
auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata);
|
||||
auto target_endpoints = co_await rs->calculate_natural_endpoints(token, topology_changes->target_token_metadata);
|
||||
|
||||
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
|
||||
using interval = ring_mapping::interval_type;
|
||||
if (!depend_on_token) {
|
||||
target += std::make_pair(
|
||||
interval::open(dht::minimum_token(), dht::maximum_token()),
|
||||
std::move(endpoints));
|
||||
} else if (i == 0) {
|
||||
target += std::make_pair(
|
||||
interval::open(all_tokens.back(), dht::maximum_token()),
|
||||
endpoints);
|
||||
target += std::make_pair(
|
||||
interval::left_open(dht::minimum_token(), token),
|
||||
std::move(endpoints));
|
||||
} else {
|
||||
target += std::make_pair(
|
||||
interval::left_open(all_tokens[i - 1], token),
|
||||
std::move(endpoints));
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
std::unordered_set<inet_address> endpoints_diff;
|
||||
for (const auto& e: target_endpoints) {
|
||||
if (!current_endpoints.contains(e)) {
|
||||
endpoints_diff.insert(e);
|
||||
}
|
||||
}
|
||||
if (!endpoints_diff.empty()) {
|
||||
add_mapping(pending_endpoints, std::move(endpoints_diff));
|
||||
}
|
||||
}
|
||||
|
||||
// in order not to waste memory, we update read_endpoints only if the
|
||||
// new endpoints differ from the old ones
|
||||
if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
|
||||
add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
|
||||
}
|
||||
|
||||
if (!depend_on_token) {
|
||||
replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
|
||||
break;
|
||||
} else if (current_tokens.contains(token)) {
|
||||
replication_map.emplace(token, std::move(current_endpoints).extract_vector());
|
||||
}
|
||||
}
|
||||
} else if (depend_on_token) {
|
||||
for (const auto &t : sorted_tokens) {
|
||||
auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
|
||||
replication_map.emplace(t, eps.get_vector());
|
||||
replication_map.emplace(t, std::move(eps).extract_vector());
|
||||
}
|
||||
} else {
|
||||
auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr);
|
||||
@@ -330,7 +391,7 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
|
||||
|
||||
auto rf = rs->get_replication_factor(*tmptr);
|
||||
co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map),
|
||||
ring_mapping{}, ring_mapping{}, rf);
|
||||
std::move(pending_endpoints), std::move(read_endpoints), rf);
|
||||
}
|
||||
|
||||
auto vnode_effective_replication_map::clone_data_gently() const -> future<std::unique_ptr<cloned_data>> {
|
||||
|
||||
Reference in New Issue
Block a user