diff --git a/db/view/view.cc b/db/view/view.cc index adfbcc8498..20588eed9e 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1637,7 +1637,7 @@ future<> view_update_generator::mutate_MV( auto view_token = dht::get_token(*mut.s, mut.fm.key()); auto& keyspace_name = mut.s->ks_name(); auto target_endpoint = get_view_natural_endpoint(_proxy.local().local_db(), keyspace_name, base_token, view_token); - auto remote_endpoints = _proxy.local().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name); + auto remote_endpoints = _proxy.local().local_db().find_keyspace(keyspace_name).get_effective_replication_map()->get_pending_endpoints(view_token); auto sem_units = pending_view_updates.split(mut.fm.representation().size()); const bool update_synchronously = should_update_synchronously(*mut.s); diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index 0a30fe8a98..491d720337 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -281,14 +281,6 @@ public: size_t count_normal_token_owners() const; private: future<> update_normal_token_owners(); - - enum class endpoints_field { - pending_endpoints, - read_endpoints - }; - const std::unordered_set* maybe_migration_endpoints(endpoints_field field, - const token& token, - const sstring& keyspace_name) const; public: // returns empty vector if keyspace_name not found. inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; @@ -998,50 +990,6 @@ void token_metadata_impl::del_replacing_endpoint(inet_address existing_node) { _replacing_endpoints.erase(existing_node); } -const std::unordered_set* token_metadata_impl::maybe_migration_endpoints(endpoints_field field, - const token& token, - const sstring& keyspace_name) const -{ - // Fast path 0: migration_info not found for this keyspace_name - const auto migration_info_it = _keyspace_to_migration_info.find(keyspace_name); - if (migration_info_it == _keyspace_to_migration_info.end()) { - return nullptr; - } - - // Fast path 1: empty ring_mapping for this keyspace_name - const auto& migration_info = migration_info_it->second; - const auto& ring_mapping = field == endpoints_field::pending_endpoints - ? migration_info.pending_endpoints - : migration_info.read_endpoints; - if (ring_mapping.empty()) { - return nullptr; - } - - // Slow path: lookup remapping - const auto interval = range_to_interval(range(token)); - const auto it = ring_mapping.find(interval); - return it != ring_mapping.end() ? &it->second : nullptr; -} - -inet_address_vector_topology_change token_metadata_impl::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { - inet_address_vector_topology_change endpoints; - const auto* pending_endpoints = maybe_migration_endpoints(endpoints_field::pending_endpoints, - token, keyspace_name); - if (pending_endpoints) { - // interval_map does not work with std::vector, convert to inet_address_vector_topology_change - endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end()); - } - return endpoints; -} - -std::optional token_metadata_impl::endpoints_for_reading(const token& token, const sstring& keyspace_name) const { - const auto* endpoints = maybe_migration_endpoints(endpoints_field::read_endpoints, token, keyspace_name); - if (endpoints == nullptr) { - return std::nullopt; - } - return inet_address_vector_replica_set(endpoints->begin(), endpoints->end()); -} - std::map token_metadata_impl::get_normal_and_bootstrapping_token_to_endpoint_map() const { std::map ret(_token_to_endpoint_map.begin(), _token_to_endpoint_map.end()); ret.insert(_bootstrap_tokens.begin(), _bootstrap_tokens.end()); @@ -1361,16 +1309,6 @@ token_metadata::count_normal_token_owners() const { return _impl->count_normal_token_owners(); } -inet_address_vector_topology_change -token_metadata::pending_endpoints_for(const token& token, const sstring& keyspace_name) const { - return _impl->pending_endpoints_for(token, keyspace_name); -} - -std::optional -token_metadata::endpoints_for_reading(const token& token, const sstring& keyspace_name) const { - return _impl->endpoints_for_reading(token, keyspace_name); -} - void token_metadata::set_read_new(read_new_t read_new) { _impl->set_read_new(read_new); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 947217a3cd..860ace552f 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -269,14 +269,6 @@ public: * Bootstrapping tokens are not taken into account. */ size_t count_normal_token_owners() const; - // returns empty vector if keyspace_name not found. - inet_address_vector_topology_change pending_endpoints_for(const token& token, const sstring& keyspace_name) const; - - // This function returns a list of nodes to which a read request should be directed. - // Returns not null only during topology changes, if _topology_change_stage == read_new and - // new set of replicas differs from the old one. - std::optional endpoints_for_reading(const token& token, const sstring& keyspace_name) const; - // Updates the read_new flag, switching read requests from // the old endpoints to the new ones during topology changes: // read_new_t::no - no read_endpoints will be stored on update_pending_ranges, all reads goes to normal endpoints; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 209a10baba..8531901af0 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -6008,7 +6008,7 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i } inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const { - auto endpoints = erm.get_token_metadata_ptr()->endpoints_for_reading(token, ks_name); + auto endpoints = erm.get_endpoints_for_reading(token); if (!endpoints) { endpoints = erm.get_natural_endpoints_without_node_being_replaced(token); } diff --git a/test/boost/token_metadata_test.cc b/test/boost/token_metadata_test.cc index 18db80dfbf..530e7f2f03 100644 --- a/test/boost/token_metadata_test.cc +++ b/test/boost/token_metadata_test.cc @@ -46,12 +46,13 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_first_node) { {"replication_factor", "1"} }); token_metadata->add_bootstrap_token(t1, e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e1}); } @@ -70,16 +71,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_second_node) { }); token_metadata->update_normal_tokens({t1}, e1).get(); token_metadata->add_bootstrap_token(t2, e2); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(0), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(0)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } @@ -103,16 +105,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_bootstrap_with_replicas) { token_metadata->update_normal_tokens({t1, t1000}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_bootstrap_token(t100, e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e1}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } @@ -137,16 +140,17 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_leave_with_replicas) { token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->update_normal_tokens({t100}, e1).get(); token_metadata->add_leaving_endpoint(e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e2}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{e3}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(101), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(101)), inet_address_vector_topology_change{}); } @@ -173,20 +177,21 @@ SEASTAR_THREAD_TEST_CASE(test_pending_endpoints_for_replace_with_replicas) { token_metadata->update_normal_tokens({t1, t100}, e2).get(); token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_replacing_endpoint(e3, e4); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(100), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(100)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1000), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1000)), inet_address_vector_topology_change{}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1001), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1001)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(2), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(2)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(10), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(10)), inet_address_vector_topology_change{e4}); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(11), ks_name), + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(11)), inet_address_vector_topology_change{}); } @@ -211,39 +216,46 @@ SEASTAR_THREAD_TEST_CASE(test_endpoints_for_reading_when_bootstrap_with_replicas token_metadata->update_normal_tokens({t10}, e3).get(); token_metadata->add_bootstrap_token(t100, e1); - auto check_endpoints = [&](int64_t t, inet_address_vector_replica_set expected_replicas, + auto check_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, + inet_address_vector_replica_set expected_replicas, seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); const auto expected_set = std::unordered_set(expected_replicas.begin(), expected_replicas.end()); - const auto actual_replicas = token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name); + const auto actual_replicas = erm->get_endpoints_for_reading(dht::token::from_int64(t)); BOOST_REQUIRE(actual_replicas.has_value()); const auto actual_set = std::unordered_set(actual_replicas->begin(), actual_replicas->end()); BOOST_REQUIRE_EQUAL(expected_set, actual_set); }; - auto check_no_endpoints = [&](int64_t t, + auto check_no_endpoints = [](mutable_vnode_erm_ptr erm, int64_t t, seastar::compat::source_location sl = seastar::compat::source_location::current()) { BOOST_TEST_INFO("line: " << sl.line()); - BOOST_REQUIRE(!token_metadata->endpoints_for_reading(dht::token::from_int64(t), ks_name).has_value()); + BOOST_REQUIRE(!erm->get_endpoints_for_reading(dht::token::from_int64(t)).has_value()); }; - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - check_no_endpoints(2); + { + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + check_no_endpoints(erm, 2); + } - token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); + { + token_metadata->set_read_new(locator::token_metadata::read_new_t::yes); + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); - check_endpoints(2, {e3, e1}); - check_endpoints(10, {e3, e1}); - check_endpoints(11, {e1, e2}); - check_endpoints(100, {e1, e2}); - check_no_endpoints(101); - check_no_endpoints(1001); - check_no_endpoints(1); + check_endpoints(erm, 2, {e3, e1}); + check_endpoints(erm, 10, {e3, e1}); + check_endpoints(erm, 11, {e1, e2}); + check_endpoints(erm, 100, {e1, e2}); + check_no_endpoints(erm, 101); + check_no_endpoints(erm, 1001); + check_no_endpoints(erm, 1); + } } SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { @@ -258,8 +270,9 @@ SEASTAR_THREAD_TEST_CASE(test_replace_node_with_same_endpoint) { }); token_metadata->update_normal_tokens({t1}, e1).get(); token_metadata->add_replacing_endpoint(e1, e1); - token_metadata->update_pending_ranges(*replication_strategy, ks_name, get_dc_rack_fn).get(); - BOOST_REQUIRE_EQUAL(token_metadata->pending_endpoints_for(dht::token::from_int64(1), ks_name), + token_metadata->update_topology_change_info(get_dc_rack_fn).get(); + auto erm = calculate_effective_replication_map(replication_strategy, token_metadata).get0(); + BOOST_REQUIRE_EQUAL(erm->get_pending_endpoints(dht::token::from_int64(1)), inet_address_vector_topology_change{e1}); BOOST_REQUIRE_EQUAL(token_metadata->get_endpoint(t1), e1); }