calculate_effective_replication_map: use new token_metadata

In this commit we switch the function
calculate_effective_replication_map to use the new
token_metadata. We do this by employing our new helper
calculate_natural_ips function. We can't use this helper for
current_endpoints/target_endpoints though,
since in that case we won't add the IP to the
pending_endpoints in the replace-with-same-ip scenario

The token_metadata_test is migrated to host_ids in the same
commit to make it pass. Other tests work because they fill
both versions of the token_metadata, but for this test it was
simpler to just migrate it straight away. The test constructs
the old token_metadata over the new token_metadata,
this means only the get_new() method will work on it. That's
why we also need to switch some other functions
(maybe_remove_node_being_replaced, do_get_natural_endpoints,
get_replication_factor) to the new version in the same commit.

All the boost and topology tests pass with this change.
This commit is contained in:
Petr Gusev
2023-11-04 20:17:40 +04:00
parent fe3c543c4e
commit f5038f6c72
3 changed files with 122 additions and 73 deletions

View File

@@ -90,7 +90,7 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_natural_end
void maybe_remove_node_being_replaced(const token_metadata& tm,
const abstract_replication_strategy& rs,
inet_address_vector_replica_set& natural_endpoints) {
if (tm.is_any_node_being_replaced() &&
if (tm.get_new()->is_any_node_being_replaced() &&
rs.allow_remove_node_being_replaced_from_natural_endpoints()) {
// When a new node is started to replace an existing dead node, we want
// to make the replacing node take writes but do not count it for
@@ -104,7 +104,8 @@ void maybe_remove_node_being_replaced(const token_metadata& tm,
// as the natural_endpoints and the node will not appear in the
// pending_endpoints.
auto it = boost::range::remove_if(natural_endpoints, [&] (gms::inet_address& p) {
return tm.is_being_replaced(p);
const auto host_id = tm.get_new()->get_host_id(p);
return tm.get_new()->is_being_replaced(host_id);
});
natural_endpoints.erase(it, natural_endpoints.end());
}
@@ -376,21 +377,22 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
ring_mapping pending_endpoints;
ring_mapping read_endpoints;
const auto depend_on_token = rs->natural_endpoints_depend_on_token();
const auto& sorted_tokens = tmptr->sorted_tokens();
auto tmpr_new = tmptr->get_new_strong();
const auto& sorted_tokens = tmpr_new->sorted_tokens();
replication_map.reserve(depend_on_token ? sorted_tokens.size() : 1);
if (const auto& topology_changes = tmptr->get_topology_change_info(); topology_changes) {
if (const auto& topology_changes = tmpr_new->get_topology_change_info(); topology_changes) {
const auto& all_tokens = topology_changes->all_tokens;
const auto& base_token_metadata = topology_changes->base_token_metadata
? *topology_changes->base_token_metadata
: *tmptr;
const auto& current_tokens = tmptr->get_token_to_endpoint();
? topology_changes->base_token_metadata
: tmpr_new;
const auto& current_tokens = tmpr_new->get_token_to_endpoint();
for (size_t i = 0, size = all_tokens.size(); i < size; ++i) {
co_await coroutine::maybe_yield();
const auto token = all_tokens[i];
auto current_endpoints = get<endpoint_set>(co_await rs->calculate_natural_endpoints(token, base_token_metadata, false));
auto target_endpoints = get<endpoint_set>(co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata, false));
auto current_endpoints = co_await rs->calculate_natural_endpoints(token, *base_token_metadata);
auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata);
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
using interval = ring_mapping::interval_type;
@@ -413,37 +415,37 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
};
{
std::unordered_set<inet_address> endpoints_diff;
host_id_set endpoints_diff;
for (const auto& e: target_endpoints) {
if (!current_endpoints.contains(e)) {
endpoints_diff.insert(e);
}
}
if (!endpoints_diff.empty()) {
add_mapping(pending_endpoints, std::move(endpoints_diff));
add_mapping(pending_endpoints, resolve_endpoints(endpoints_diff, *base_token_metadata).extract_set());
}
}
// in order not to waste memory, we update read_endpoints only if the
// new endpoints differs from the old one
if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
add_mapping(read_endpoints, resolve_endpoints(target_endpoints, *base_token_metadata).extract_set());
}
if (!depend_on_token) {
replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
replication_map.emplace(default_replication_map_key, resolve_endpoints(current_endpoints, *base_token_metadata).extract_vector());
break;
} else if (current_tokens.contains(token)) {
replication_map.emplace(token, std::move(current_endpoints).extract_vector());
replication_map.emplace(token, resolve_endpoints(current_endpoints, *base_token_metadata).extract_vector());
}
}
} else if (depend_on_token) {
for (const auto &t : sorted_tokens) {
auto eps = get<endpoint_set>(co_await rs->calculate_natural_endpoints(t, *tmptr, false));
auto eps = co_await rs->calculate_natural_ips(t, tmpr_new);
replication_map.emplace(t, std::move(eps).extract_vector());
}
} else {
auto eps = get<endpoint_set>(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, false));
auto eps = co_await rs->calculate_natural_ips(default_replication_map_key, tmpr_new);
replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
}
@@ -476,7 +478,7 @@ const inet_address_vector_replica_set& vnode_effective_replication_map::do_get_n
bool is_vnode) const
{
const token& key_token = _rs->natural_endpoints_depend_on_token()
? (is_vnode ? tok : _tmptr->first_token(tok))
? (is_vnode ? tok : _tmptr->get_new()->first_token(tok))
: default_replication_map_key;
const auto it = _replication_map.find(key_token);
return it->second;

View File

@@ -32,7 +32,7 @@ future<natural_ep_type> everywhere_replication_strategy::calculate_natural_endpo
}
size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const {
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
return tm.get_new()->sorted_tokens().empty() ? 1 : tm.get_new()->count_normal_token_owners();
}
using registry = class_registrator<abstract_replication_strategy, everywhere_replication_strategy, const replication_strategy_config_options&>;

View File

@@ -17,43 +17,50 @@ using namespace locator;
namespace {
const auto ks_name = sstring("test-ks");
endpoint_dc_rack get_dc_rack(gms::inet_address) {
host_id gen_id(int id) {
return host_id{utils::UUID(0, id)};
}
endpoint_dc_rack get_dc_rack(host_id) {
return {
.dc = "unk-dc",
.rack = "unk-rack"
};
}
mutable_token_metadata_ptr create_token_metadata(inet_address this_endpoint) {
return make_lw_shared<token_metadata>(token_metadata::config {
mutable_token_metadata2_ptr create_token_metadata(host_id this_host_id) {
return make_lw_shared<token_metadata2>(token_metadata::config {
topology::config {
.this_endpoint = this_endpoint,
.this_cql_address = this_endpoint,
.local_dc_rack = get_dc_rack(this_endpoint)
.this_host_id = this_host_id,
.local_dc_rack = get_dc_rack(this_host_id)
}
});
}
template <typename Strategy>
mutable_vnode_erm_ptr create_erm(mutable_token_metadata_ptr tmptr, replication_strategy_config_options opts = {}) {
dc_rack_fn<gms::inet_address> get_dc_rack_fn = get_dc_rack;
mutable_vnode_erm_ptr create_erm(mutable_token_metadata2_ptr tmptr, replication_strategy_config_options opts = {}) {
dc_rack_fn<host_id> get_dc_rack_fn = get_dc_rack;
tmptr->update_topology_change_info(get_dc_rack_fn).get();
auto strategy = seastar::make_shared<Strategy>(std::move(opts));
return calculate_effective_replication_map(std::move(strategy), std::move(tmptr)).get0();
return calculate_effective_replication_map(std::move(strategy), make_token_metadata_ptr(tmptr)).get0();
}
}
SEASTAR_THREAD_TEST_CASE(test_pending_and_read_endpoints_for_everywhere_strategy) {
const auto e1 = inet_address("192.168.0.1");
const auto e2 = inet_address("192.168.0.2");
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
const auto t1 = dht::token::from_int64(10);
const auto t2 = dht::token::from_int64(20);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_topology(e2, get_dc_rack(e2));
token_metadata->update_normal_tokens({t1}, e1).get();
token_metadata->add_bootstrap_token(t2, e2);
auto token_metadata = create_token_metadata(e1_id);
token_metadata->update_host_id(e1_id, e1);
token_metadata->update_host_id(e2_id, e2);
token_metadata->update_topology(e1_id, get_dc_rack(e1_id));
token_metadata->update_topology(e2_id, get_dc_rack(e2_id));
token_metadata->update_normal_tokens({t1}, e1_id).get();
token_metadata->add_bootstrap_token(t2, e2_id);
token_metadata->set_read_new(token_metadata::read_new_t::yes);
auto erm = create_erm<everywhere_replication_strategy>(token_metadata);
@@ -68,12 +75,16 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) {
const auto t1 = dht::token::from_int64(1);
const auto e2 = inet_address("192.168.0.2");
const auto t2 = dht::token::from_int64(100);
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_topology(e2, get_dc_rack(e2));
token_metadata->update_normal_tokens({t1}, e1).get();
token_metadata->add_bootstrap_token(t2, e2);
auto token_metadata = create_token_metadata(e1_id);
token_metadata->update_host_id(e1_id, e1);
token_metadata->update_host_id(e2_id, e2);
token_metadata->update_topology(e1_id, get_dc_rack(e1_id));
token_metadata->update_topology(e2_id, get_dc_rack(e2_id));
token_metadata->update_normal_tokens({t1}, e1_id).get();
token_metadata->add_bootstrap_token(t2, e2_id);
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "1"}});
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)),
@@ -96,14 +107,20 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
const auto e1 = inet_address("192.168.0.1");
const auto e2 = inet_address("192.168.0.2");
const auto e3 = inet_address("192.168.0.3");
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_topology(e2, get_dc_rack(e2));
token_metadata->update_topology(e3, get_dc_rack(e3));
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->add_bootstrap_token(t100, e1);
auto token_metadata = create_token_metadata(e1_id);
token_metadata->update_host_id(e1_id, e1);
token_metadata->update_host_id(e2_id, e2);
token_metadata->update_host_id(e3_id, e3);
token_metadata->update_topology(e1_id, get_dc_rack(e1_id));
token_metadata->update_topology(e2_id, get_dc_rack(e2_id));
token_metadata->update_topology(e3_id, get_dc_rack(e3_id));
token_metadata->update_normal_tokens({t1, t1000}, e2_id).get();
token_metadata->update_normal_tokens({t10}, e3_id).get();
token_metadata->add_bootstrap_token(t100, e1_id);
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
@@ -126,15 +143,21 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
const auto e1 = inet_address("192.168.0.1");
const auto e2 = inet_address("192.168.0.2");
const auto e3 = inet_address("192.168.0.3");
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_topology(e2, get_dc_rack(e2));
token_metadata->update_topology(e3, get_dc_rack(e3));
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->update_normal_tokens({t100}, e1).get();
token_metadata->add_leaving_endpoint(e1);
auto token_metadata = create_token_metadata(e1_id);
token_metadata->update_host_id(e1_id, e1);
token_metadata->update_host_id(e2_id, e2);
token_metadata->update_host_id(e3_id, e3);
token_metadata->update_topology(e1_id, get_dc_rack(e1_id));
token_metadata->update_topology(e2_id, get_dc_rack(e2_id));
token_metadata->update_topology(e3_id, get_dc_rack(e3_id));
token_metadata->update_normal_tokens({t1, t1000}, e2_id).get();
token_metadata->update_normal_tokens({t10}, e3_id).get();
token_metadata->update_normal_tokens({t100}, e1_id).get();
token_metadata->add_leaving_endpoint(e1_id);
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
@@ -158,16 +181,24 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
const auto e2 = inet_address("192.168.0.2");
const auto e3 = inet_address("192.168.0.3");
const auto e4 = inet_address("192.168.0.4");
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
const auto e4_id = gen_id(4);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_topology(e2, get_dc_rack(e2));
token_metadata->update_topology(e3, get_dc_rack(e3));
token_metadata->update_topology(e4, get_dc_rack(e4));
token_metadata->update_normal_tokens({t1000}, e1).get();
token_metadata->update_normal_tokens({t1, t100}, e2).get();
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->add_replacing_endpoint(e3, e4);
auto token_metadata = create_token_metadata(e1_id);
token_metadata->update_host_id(e1_id, e1);
token_metadata->update_host_id(e2_id, e2);
token_metadata->update_host_id(e3_id, e3);
token_metadata->update_host_id(e4_id, e4);
token_metadata->update_topology(e1_id, get_dc_rack(e1_id));
token_metadata->update_topology(e2_id, get_dc_rack(e2_id));
token_metadata->update_topology(e3_id, get_dc_rack(e3_id));
token_metadata->update_topology(e4_id, get_dc_rack(e4_id));
token_metadata->update_normal_tokens({t1000}, e1_id).get();
token_metadata->update_normal_tokens({t1, t100}, e2_id).get();
token_metadata->update_normal_tokens({t10}, e3_id).get();
token_metadata->add_replacing_endpoint(e3_id, e4_id);
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
@@ -194,14 +225,20 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
const auto e1 = inet_address("192.168.0.1");
const auto e2 = inet_address("192.168.0.2");
const auto e3 = inet_address("192.168.0.3");
const auto e1_id = gen_id(1);
const auto e2_id = gen_id(2);
const auto e3_id = gen_id(3);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_topology(e2, get_dc_rack(e2));
token_metadata->update_topology(e3, get_dc_rack(e3));
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->add_bootstrap_token(t100, e1);
auto token_metadata = create_token_metadata(e1_id);
token_metadata->update_host_id(e1_id, e1);
token_metadata->update_host_id(e2_id, e2);
token_metadata->update_host_id(e3_id, e3);
token_metadata->update_topology(e1_id, get_dc_rack(e1_id));
token_metadata->update_topology(e2_id, get_dc_rack(e2_id));
token_metadata->update_topology(e3_id, get_dc_rack(e3_id));
token_metadata->update_normal_tokens({t1, t1000}, e2_id).get();
token_metadata->update_normal_tokens({t10}, e3_id).get();
token_metadata->add_bootstrap_token(t100, e1_id);
auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t,
inet_address_vector_replica_set expected_replicas,
@@ -246,14 +283,24 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
const auto t1 = dht::token::from_int64(1);
const auto e1 = inet_address("192.168.0.1");
const auto e1_id1 = gen_id(1);
const auto e1_id2 = gen_id(2);
auto token_metadata = create_token_metadata(e1);
token_metadata->update_topology(e1, get_dc_rack(e1));
token_metadata->update_normal_tokens({t1}, e1).get();
token_metadata->add_replacing_endpoint(e1, e1);
auto token_metadata = create_token_metadata(e1_id2);
token_metadata->update_host_id(e1_id1, e1);
token_metadata->update_topology(e1_id1, get_dc_rack(e1_id1), node::state::being_replaced);
token_metadata->update_normal_tokens({t1}, e1_id1).get();
token_metadata->update_topology(e1_id2, get_dc_rack(e1_id2), node::state::replacing);
token_metadata->update_host_id(e1_id2, e1);
token_metadata->add_replacing_endpoint(e1_id1, e1_id2);
auto erm = create_erm<simple_strategy>(token_metadata, {{"replication_factor", "2"}});
BOOST_REQUIRE_EQUAL(token_metadata->get_host_id(e1), e1_id1);
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1);
BOOST_REQUIRE_EQUAL(erm->get_natural_endpoints_without_node_being_replaced(dht::token::from_int64(1)),
inet_address_vector_replica_set{});
BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1_id1);
}