diff --git a/inet_address_vectors.hh b/inet_address_vectors.hh index 859b448adf..5408a51be4 100644 --- a/inet_address_vectors.hh +++ b/inet_address_vectors.hh @@ -9,8 +9,13 @@ #pragma once #include "gms/inet_address.hh" +#include "locator/host_id.hh" #include "utils/small_vector.hh" using inet_address_vector_replica_set = utils::small_vector; using inet_address_vector_topology_change = utils::small_vector; + +using host_id_vector_replica_set = utils::small_vector; + +using host_id_vector_topology_change = utils::small_vector; diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index fbdb579977..123a0b7a23 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -19,6 +19,18 @@ namespace locator { +static endpoint_set resolve_endpoints(const host_id_set& host_ids, const token_metadata2& tm) { + endpoint_set result{}; + result.reserve(host_ids.size()); + for (const auto& host_id: host_ids) { + // Empty host_id is used as a marker for local address. + // The reason for this hack is that we need local_strategy to + // work before the local host_id is loaded from the system.local table. + result.push_back(host_id ? tm.get_endpoint_for_host_id(host_id) : tm.get_topology().my_address()); + } + return result; +} + logging::logger rslogger("replication_strategy"); abstract_replication_strategy::abstract_replication_strategy( @@ -56,6 +68,11 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring& } } +future abstract_replication_strategy::calculate_natural_ips(const token& search_token, const token_metadata2_ptr& tm) const { + const auto host_ids = co_await calculate_natural_endpoints(search_token, token_metadata(tm), true); + co_return resolve_endpoints(get(host_ids), *tm); +} + using strategy_class_registry = class_registry< locator::abstract_replication_strategy, const locator::replication_strategy_config_options&>; @@ -261,7 +278,7 @@ abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata& // Using the common path would make the function quadratic in the number of endpoints. should_add = true; } else { - auto eps = co_await calculate_natural_endpoints(tok, tm); + auto eps = get(co_await calculate_natural_endpoints(tok, tm, false)); should_add = eps.contains(ep); } if (should_add) { @@ -326,7 +343,7 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con std::unordered_map ret; for (auto& t : tm.sorted_tokens()) { dht::token_range_vector ranges = tm.get_primary_ranges_for(t); - auto eps = co_await calculate_natural_endpoints(t, tm); + auto eps = get(co_await calculate_natural_endpoints(t, tm, false)); for (auto& r : ranges) { ret.emplace(r, eps.get_vector()); } @@ -341,7 +358,7 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p temp.update_topology(pending_address, std::move(dr)); co_await temp.update_normal_tokens(pending_tokens, pending_address); for (const auto& t : temp.sorted_tokens()) { - auto eps = co_await calculate_natural_endpoints(t, temp); + auto eps = get(co_await calculate_natural_endpoints(t, temp, false)); if (eps.contains(pending_address)) { dht::token_range_vector r = temp.get_primary_ranges_for(t); rslogger.debug("get_pending_address_ranges: token={} primary_range={} endpoint={}", t, r, pending_address); @@ -372,8 +389,8 @@ future calculate_effective_replicat const auto token = all_tokens[i]; - auto current_endpoints = co_await rs->calculate_natural_endpoints(token, base_token_metadata); - auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata); + auto current_endpoints = get(co_await rs->calculate_natural_endpoints(token, base_token_metadata, false)); + auto target_endpoints = get(co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata, false)); auto add_mapping = [&](ring_mapping& target, std::unordered_set&& endpoints) { using interval = ring_mapping::interval_type; @@ -422,11 +439,11 @@ future calculate_effective_replicat } } else if (depend_on_token) { for (const auto &t : sorted_tokens) { - auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); + auto eps = get(co_await rs->calculate_natural_endpoints(t, *tmptr, false)); replication_map.emplace(t, std::move(eps).extract_vector()); } } else { - auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr); + auto eps = get(co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr, false)); replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector()); } diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 5af70446a6..f296de7910 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -53,12 +53,19 @@ using replication_strategy_config_options = std::map; using replication_map = std::unordered_map; using endpoint_set = utils::basic_sequenced_set; +using host_id_set = utils::basic_sequenced_set; +using natural_ep_type = std::variant; +template +using set_type = std::conditional_t, endpoint_set, host_id_set>; +template +using vector_type = std::conditional_t, inet_address_vector_replica_set, host_id_vector_replica_set>; class vnode_effective_replication_map; class effective_replication_map_factory; class per_table_replication_strategy; class tablet_aware_replication_strategy; + class abstract_replication_strategy : public seastar::enable_shared_from_this { friend class vnode_effective_replication_map; friend class per_table_replication_strategy; @@ -85,6 +92,20 @@ protected: rslogger.debug(fmt, std::forward(args)...); } + template + static NodeId get_self_id(const generic_token_metadata& tm) { + if constexpr(std::is_same_v) { + return tm.get_topology().my_address(); + } else { + return NodeId{}; + } + } + + template + static future select_tm(Func&& func, const token_metadata& tm, bool use_host_id) { + return use_host_id ? func(*tm.template get_new()) : func(tm); + } + public: using ptr_type = seastar::shared_ptr; @@ -101,7 +122,8 @@ public: // is small, that implementation may not yield since by itself it won't cause a reactor stall (assuming practical // cluster sizes and number of tokens per node). The caller is responsible for yielding if they call this function // in a loop. - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool use_host_id) const = 0; + future calculate_natural_ips(const token& search_token, const token_metadata2_ptr& tm) const; virtual ~abstract_replication_strategy() {} static ptr_type create_replication_strategy(const sstring& strategy_name, const replication_strategy_config_options& config_options); diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 3bf75f2021..1b88258d41 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -20,13 +20,15 @@ everywhere_replication_strategy::everywhere_replication_strategy(const replicati _natural_endpoints_depend_on_token = false; } -future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const { +future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool use_host_id) const { + return select_tm([this](const generic_token_metadata& tm) -> future { if (tm.sorted_tokens().empty()) { - endpoint_set result{inet_address_vector_replica_set({tm.get_topology().my_address()})}; - return make_ready_future(std::move(result)); + set_type result{vector_type({this->get_self_id(tm)})}; + return make_ready_future(std::move(result)); } const auto& all_endpoints = tm.get_all_endpoints(); - return make_ready_future(endpoint_set(all_endpoints.begin(), all_endpoints.end())); + return make_ready_future(set_type(all_endpoints.begin(), all_endpoints.end())); + }, tm, use_host_id); } size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const { diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index a3cd8ab134..ada8ea81ee 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -18,7 +18,7 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const replication_strategy_config_options& config_options); - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool host_id) const override; virtual void validate_options(const gms::feature_service&) const override { /* noop */ } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 8f213489c4..34cc010aff 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -18,8 +18,10 @@ local_strategy::local_strategy(const replication_strategy_config_options& config _natural_endpoints_depend_on_token = false; } -future local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { - return make_ready_future(endpoint_set({tm.get_topology().my_address()})); +future local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, bool use_host_id) const { + return select_tm([this](const generic_token_metadata& tm) -> future { + return make_ready_future(set_type({this->get_self_id(tm)})); + }, tm, use_host_id); } void local_strategy::validate_options(const gms::feature_service&) const { diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index 60e58e1d4b..e87085e45d 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -27,7 +27,7 @@ public: virtual ~local_strategy() {}; virtual size_t get_replication_factor(const token_metadata&) const override; - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool host_id) const override; virtual void validate_options(const gms::feature_service&) const override; diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index e320fe96ca..7dbf89e866 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -76,13 +76,14 @@ network_topology_strategy::network_topology_strategy( using endpoint_dc_rack_set = std::unordered_set; +template class natural_endpoints_tracker { /** * Endpoint adder applying the replication rules for a given DC. */ struct data_center_endpoints { /** List accepted endpoints get pushed into. */ - endpoint_set& _endpoints; + set_type& _endpoints; /** * Racks encountered so far. Replicas are put into separate racks while possible. @@ -95,7 +96,7 @@ class natural_endpoints_tracker { size_t _rf_left; ssize_t _acceptable_rack_repeats; - data_center_endpoints(size_t rf, size_t rack_count, size_t node_count, endpoint_set& endpoints, endpoint_dc_rack_set& racks) + data_center_endpoints(size_t rf, size_t rack_count, size_t node_count, set_type& endpoints, endpoint_dc_rack_set& racks) : _endpoints(endpoints) , _racks(racks) // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF. @@ -109,7 +110,7 @@ class natural_endpoints_tracker { * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful. * Returns true if the endpoint was added, and this datacenter does not require further replicas. */ - bool add_endpoint_and_check_if_done(const inet_address& ep, const endpoint_dc_rack& location) { + bool add_endpoint_and_check_if_done(const NodeId& ep, const endpoint_dc_rack& location) { if (done()) { return false; } @@ -160,7 +161,7 @@ class natural_endpoints_tracker { } }; - const token_metadata& _tm; + const generic_token_metadata& _tm; const topology& _tp; std::unordered_map _dc_rep_factor; @@ -168,7 +169,7 @@ class natural_endpoints_tracker { // We want to preserve insertion order so that the first added endpoint // becomes primary. // - endpoint_set _replicas; + set_type _replicas; // tracks the racks we have already placed replicas in endpoint_dc_rack_set _seen_racks; @@ -189,7 +190,7 @@ class natural_endpoints_tracker { size_t _dcs_to_fill; public: - natural_endpoints_tracker(const token_metadata& tm, const std::unordered_map& dc_rep_factor) + natural_endpoints_tracker(const generic_token_metadata& tm, const std::unordered_map& dc_rep_factor) : _tm(tm) , _tp(_tm.get_topology()) , _dc_rep_factor(dc_rep_factor) @@ -219,7 +220,7 @@ public: } } - bool add_endpoint_and_check_if_done(inet_address ep) { + bool add_endpoint_and_check_if_done(NodeId ep) { auto& loc = _tp.get_location(ep); auto i = _dcs.find(loc.dc); if (i != _dcs.end() && i->second.add_endpoint_and_check_if_done(ep, loc)) { @@ -232,27 +233,29 @@ public: return _dcs_to_fill == 0; } - endpoint_set& replicas() noexcept { + set_type& replicas() noexcept { return _replicas; } }; -future +future network_topology_strategy::calculate_natural_endpoints( - const token& search_token, const token_metadata& tm) const { + const token& search_token, const token_metadata& tm, bool use_host_id) const { - natural_endpoints_tracker tracker(tm, _dc_rep_factor); + return select_tm([&](const generic_token_metadata& tm) -> future { + natural_endpoints_tracker tracker(tm, _dc_rep_factor); for (auto& next : tm.ring_range(search_token)) { co_await coroutine::maybe_yield(); - inet_address ep = *tm.get_endpoint(next); + NodeId ep = *tm.get_endpoint(next); if (tracker.add_endpoint_and_check_if_done(ep)) { break; } } co_return std::move(tracker.replicas()); + }, tm, use_host_id); } void network_topology_strategy::validate_options(const gms::feature_service& fs) const { @@ -306,7 +309,7 @@ future network_topology_strategy::allocate_tablets_for_new_table(sch auto token_range = tm->ring_range(dht::token::get_random_token()); for (tablet_id tb : tablets.tablet_ids()) { - natural_endpoints_tracker tracker(*tm, _dc_rep_factor); + natural_endpoints_tracker tracker(*tm, _dc_rep_factor); while (true) { co_await coroutine::maybe_yield(); diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index 3f704c98b8..57b8ce1ad9 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -50,8 +50,8 @@ protected: * calculate endpoints in one pass through the tokens by tracking our * progress in each DC, rack etc. */ - virtual future calculate_natural_endpoints( - const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints( + const token& search_token, const token_metadata& tm, bool host_id) const override; virtual void validate_options(const gms::feature_service&) const override; diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index 81006dc800..af40382c28 100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -33,15 +33,16 @@ simple_strategy::simple_strategy(const replication_strategy_config_options& conf } } -future simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { +future simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm, bool use_host_id) const { + return select_tm([&](const generic_token_metadata& tm) -> future { const std::vector& tokens = tm.sorted_tokens(); if (tokens.empty()) { - co_return endpoint_set(); + co_return set_type{}; } size_t replicas = _replication_factor; - endpoint_set endpoints; + set_type endpoints; endpoints.reserve(replicas); for (auto& token : tm.ring_range(t)) { @@ -61,6 +62,7 @@ future simple_strategy::calculate_natural_endpoints(const token& t } co_return endpoints; + }, tm, use_host_id); } size_t simple_strategy::get_replication_factor(const token_metadata&) const { diff --git a/locator/simple_strategy.hh b/locator/simple_strategy.hh index a04e3e2ccd..427aa2a24b 100644 --- a/locator/simple_strategy.hh +++ b/locator/simple_strategy.hh @@ -26,7 +26,7 @@ public: return true; } - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm, bool host_id) const override; private: size_t _replication_factor = 1; }; diff --git a/repair/repair.cc b/repair/repair.cc index 0363322339..92415b5f9e 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1719,7 +1719,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m // Find (for each range) all nodes that store replicas for these ranges as well for (auto& r : ranges) { auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto eps = strat.calculate_natural_endpoints(end_token, *tmptr).get0(); + auto eps = get(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0()); current_replica_endpoints.emplace(r, std::move(eps)); seastar::thread::maybe_yield(); } @@ -1738,7 +1738,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m ops->check_abort(); } auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - const auto new_eps = strat.calculate_natural_endpoints(end_token, temp).get0(); + const auto new_eps = get(strat.calculate_natural_endpoints(end_token, temp, false).get0()); const auto& current_eps = current_replica_endpoints[r]; std::unordered_set neighbors_set = new_eps.get_set(); bool skip_this_range = false; @@ -1929,7 +1929,7 @@ future<> repair_service::do_rebuild_replace_with_repair(locator::token_metadata_ auto& r = *it; seastar::thread::maybe_yield(); auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto neighbors = boost::copy_range>(strat.calculate_natural_endpoints(end_token, *tmptr).get0() | + auto neighbors = boost::copy_range>(get(strat.calculate_natural_endpoints(end_token, *tmptr, false).get0()) | boost::adaptors::filtered([myip, &source_dc, &topology, &ignore_nodes] (const gms::inet_address& node) { if (node == myip) { return false; diff --git a/service/storage_service.cc b/service/storage_service.cc index c31e77324b..a7afc1069e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -5960,7 +5960,7 @@ storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replica const auto& rs = erm->get_replication_strategy(); for (auto& r : ranges) { auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - auto new_replica_endpoints = co_await rs.calculate_natural_endpoints(end_token, temp); + auto new_replica_endpoints = get(co_await rs.calculate_natural_endpoints(end_token, temp, false)); auto rg = current_replica_endpoints.equal_range(r); for (auto it = rg.first; it != rg.second; it++) { diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 4f8368d308..c37016ddc5 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -671,7 +671,7 @@ static void test_equivalence(const shared_token_metadata& stm, const locator::to for (size_t i = 0; i < 1000; ++i) { auto token = dht::token::get_random_token(); auto expected = calculate_natural_endpoints(token, tm, topo, datacenters); - auto actual = nts.calculate_natural_endpoints(token, tm).get0(); + auto actual = get(nts.calculate_natural_endpoints(token, tm, false).get0()); // Because the old algorithm does not put the nodes in the correct order in the case where more replicas // are required than there are racks in a dc, we accept different order as long as the primary