effective_replication_map: use new get_pending_endpoints and get_endpoints_for_reading

We already use the new pending_endpoints from erm though
the get_pending_ranges virtual function, in this commit
we update all the remaining places to use the new
implementation in erm, as well as remove the old implementation
in token_metadata.
This commit is contained in:
Petr Gusev
2023-05-01 18:16:08 +04:00
parent d4f004f5c7
commit 87307781c4
5 changed files with 62 additions and 119 deletions

View File

@@ -1637,7 +1637,7 @@ future<> view_update_generator::mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto& keyspace_name = mut.s->ks_name();
auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token);
auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());
const bool update_synchronously = should_update_synchronously(*mut.s);

View File

@@ -281,14 +281,6 @@ public:
size_t count_normal_token_owners() const;
private:
future<> update_normal_token_owners();
enum class endpoints_field {
pending_endpoints,
read_endpoints
};
const std::unordered_set<inet_address>* maybe_migration_endpoints(endpoints_field field,
const token& token,
const sstring& keyspace_name) const;
public:
// returns empty vector if keyspace_name not found.
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
@@ -998,50 +990,6 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) {
_replacing_endpoints.erase(existing_node);
}
const std::unordered_set<inet_address>* token_metadata_impl::maybe_migration_endpoints(endpoints_field field,
const token& token,
const sstring& keyspace_name) const
{
// Fast path 0: migration_info not found for this keyspace_name
const auto migration_info_it = _keyspace_to_migration_info.find(keyspace_name);
if (migration_info_it == _keyspace_to_migration_info.end()) {
return nullptr;
}
// Fast path 1: empty ring_mapping for this keyspace_name
const auto& migration_info = migration_info_it->second;
const auto& ring_mapping = field == endpoints_field::pending_endpoints
? migration_info.pending_endpoints
: migration_info.read_endpoints;
if (ring_mapping.empty()) {
return nullptr;
}
// Slow path: lookup remapping
const auto interval = range_to_interval(range<dht::token>(token));
const auto it = ring_mapping.find(interval);
return it != ring_mapping.end() ? &it->second : nullptr;
}
inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
inet_address_vector_topology_change endpoints;
const auto* pending_endpoints = maybe_migration_endpoints(endpoints_field::pending_endpoints,
token, keyspace_name);
if (pending_endpoints) {
// interval_map does not work with std::vector, convert to inet_address_vector_topology_change
endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end());
}
return endpoints;
}
std::optional<inet_address_vector_replica_set> token_metadata_impl::endpoints_for_reading(const token& token, const sstring& keyspace_name) const {
const auto* endpoints = maybe_migration_endpoints(endpoints_field::read_endpoints, token, keyspace_name);
if (endpoints == nullptr) {
return std::nullopt;
}
return inet_address_vector_replica_set(endpoints->begin(), endpoints->end());
}
std::map<token, inet_address> token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const {
std::map<token, inet_address> ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end());
ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end());
@@ -1361,16 +1309,6 @@ token_metadata::count_normal_token_owners() const {
return _impl->count_normal_token_owners();
}
inet_address_vector_topology_change
token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const {
return _impl->pending_endpoints_for(token, keyspace_name);
}
std::optional<inet_address_vector_replica_set>
token_metadata::endpoints_for_reading(const token& token, const sstring& keyspace_name) const {
return _impl->endpoints_for_reading(token, keyspace_name);
}
void
token_metadata::set_read_new(read_new_t read_new) {
_impl->set_read_new(read_new);

View File

@@ -269,14 +269,6 @@ public:
* Bootstrapping tokens are not taken into account. */
size_t count_normal_token_owners() const;
// returns empty vector if keyspace_name not found.
inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const;
// This function returns a list of nodes to which a read request should be directed.
// Returns not null only during topology changes, if _topology_change_stage == read_new and
// new set of replicas differs from the old one.
std::optional<inet_address_vector_replica_set> endpoints_for_reading(const token& token, const sstring& keyspace_name) const;
// Updates the read_new flag, switching read requests from
// the old endpoints to the new ones during topology changes:
// read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads goes to normal endpoints;

View File

@@ -6008,7 +6008,7 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i
}
inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const {
auto endpoints = erm.get_token_metadata_ptr()->endpoints_for_reading(token, ks_name);
auto endpoints = erm.get_endpoints_for_reading(token);
if (!endpoints) {
endpoints = erm.get_natural_endpoints_without_node_being_replaced(token);
}

View File

@@ -46,12 +46,13 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) {
{"replication_factor", "1"}
});
token_metadata->add_bootstrap_token(t1, e1);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name),
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
inet_address_vector_topology_change{e1});
}
@@ -70,16 +71,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) {
});
token_metadata->update_normal_tokens({t1}, e1).get();
token_metadata->add_bootstrap_token(t2, e2);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name),
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)),
inet_address_vector_topology_change{});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
inet_address_vector_topology_change{e2});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
inet_address_vector_topology_change{e2});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)),
inet_address_vector_topology_change{});
}
@@ -103,16 +105,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) {
token_metadata->update_normal_tokens({t1, t1000}, e2).get();
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->add_bootstrap_token(t100, e1);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name),
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)),
inet_address_vector_topology_change{});
}
@@ -137,16 +140,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) {
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->update_normal_tokens({t100}, e1).get();
token_metadata->add_leaving_endpoint(e1);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name),
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
inet_address_vector_topology_change{e2});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)),
inet_address_vector_topology_change{e3});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
inet_address_vector_topology_change{e3});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)),
inet_address_vector_topology_change{});
}
@@ -173,20 +177,21 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) {
token_metadata->update_normal_tokens({t1, t100}, e2).get();
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->add_replacing_endpoint(e3, e4);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name),
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)),
inet_address_vector_topology_change{});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1000), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1000)),
inet_address_vector_topology_change{});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1001), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1001)),
inet_address_vector_topology_change{e4});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{e4});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)),
inet_address_vector_topology_change{e4});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(10), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(10)),
inet_address_vector_topology_change{e4});
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name),
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)),
inet_address_vector_topology_change{});
}
@@ -211,39 +216,46 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas
token_metadata->update_normal_tokens({t10}, e3).get();
token_metadata->add_bootstrap_token(t100, e1);
auto check_endpoints = [&](int64_t t, inet_address_vector_replica_set expected_replicas,
auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t,
inet_address_vector_replica_set expected_replicas,
seastar::compat::source_location sl = seastar::compat::source_location::current())
{
BOOST_TEST_INFO("line: " << sl.line());
const auto expected_set = std::unordered_set<inet_address>(expected_replicas.begin(),
expected_replicas.end());
const auto actual_replicas = token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name);
const auto actual_replicas = erm->get_endpoints_for_reading(dht::token::from_int64(t));
BOOST_REQUIRE(actual_replicas.has_value());
const auto actual_set = std::unordered_set<inet_address>(actual_replicas->begin(),
actual_replicas->end());
BOOST_REQUIRE_EQUAL(expected_set, actual_set);
};
auto check_no_endpoints = [&](int64_t t,
auto check_no_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t,
seastar::compat::source_location sl = seastar::compat::source_location::current())
{
BOOST_TEST_INFO("line: " << sl.line());
BOOST_REQUIRE(!token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value());
BOOST_REQUIRE(!erm->get_endpoints_for_reading(dht::token::from_int64(t)).has_value());
};
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
check_no_endpoints(2);
{
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
check_no_endpoints(erm, 2);
}
{
token_metadata->set_read_new(locator::token_metadata::read_new_t::yes);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
check_endpoints(2, {e3, e1});
check_endpoints(10, {e3, e1});
check_endpoints(11, {e1, e2});
check_endpoints(100, {e1, e2});
check_no_endpoints(101);
check_no_endpoints(1001);
check_no_endpoints(1);
check_endpoints(erm, 2, {e3, e1});
check_endpoints(erm, 10, {e3, e1});
check_endpoints(erm, 11, {e1, e2});
check_endpoints(erm, 100, {e1, e2});
check_no_endpoints(erm, 101);
check_no_endpoints(erm, 1001);
check_no_endpoints(erm, 1);
}
}
SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
@@ -258,8 +270,9 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) {
});
token_metadata->update_normal_tokens({t1}, e1).get();
token_metadata->add_replacing_endpoint(e1, e1);
token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get();
BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name),
token_metadata->update_topology_change_info(get_dc_rack_fn).get();
auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0();
BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)),
inet_address_vector_topology_change{e1});
BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1);
}