From 7017ad6822df70053a6bf31ea64e02e551ab9eb0 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Tue, 12 Jul 2022 14:29:26 +0300 Subject: [PATCH] abstract_replication_strategy: calculate_natural_endpoints: return endpoint_set So that it can also be used to easily search for an endpoint. Signed-off-by: Benny Halevy --- locator/abstract_replication_strategy.cc | 5 +- locator/abstract_replication_strategy.hh | 5 +- locator/everywhere_replication_strategy.cc | 5 +- locator/everywhere_replication_strategy.hh | 2 +- locator/local_strategy.cc | 4 +- locator/local_strategy.hh | 2 +- locator/network_topology_strategy.cc | 8 +-- locator/network_topology_strategy.hh | 2 +- locator/simple_strategy.cc | 8 +-- locator/simple_strategy.hh | 2 +- repair/repair.cc | 6 +- test/boost/network_topology_strategy_test.cc | 8 +-- utils/sequenced_set.hh | 71 ++++++++++++++++++++ 13 files changed, 101 insertions(+), 27 deletions(-) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 00239d696a..d7e02f46e0 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -286,7 +286,7 @@ abstract_replication_strategy::get_range_addresses(const token_metadata& tm) con dht::token_range_vector ranges = tm.get_primary_ranges_for(t); auto eps = co_await calculate_natural_endpoints(t, tm); for (auto& r : ranges) { - ret.emplace(r, eps); + ret.emplace(r, eps.get_vector()); } } co_return ret; @@ -314,7 +314,8 @@ future calculate_effective_replication_ma replication_map replication_map; for (const auto &t : tmptr->sorted_tokens()) { - replication_map.emplace(t, co_await rs->calculate_natural_endpoints(t, *tmptr)); + auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr); + replication_map.emplace(t, eps.get_vector()); } auto rf = rs->get_replication_factor(*tmptr); diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 5531a8a6b8..9b933ea78f 100644 --- 
a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -18,6 +18,7 @@ #include "snitch_base.hh" #include #include "utils/maybe_yield.hh" +#include "utils/sequenced_set.hh" // forward declaration since replica/database.hh includes this file namespace replica { @@ -44,6 +45,8 @@ using replication_strategy_config_options = std::map; using replication_map = std::unordered_map; +using endpoint_set = utils::basic_sequenced_set; + class effective_replication_map; class effective_replication_map_factory; @@ -80,7 +83,7 @@ public: // is small, that implementation may not yield since by itself it won't cause a reactor stall (assuming practical // cluster sizes and number of tokens per node). The caller is responsible for yielding if they call this function // in a loop. - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const = 0; virtual ~abstract_replication_strategy() {} static ptr_type create_replication_strategy(const sstring& strategy_name, const replication_strategy_config_options& config_options); diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 3a3d64b9ea..de8e5da2ea 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -19,8 +19,9 @@ namespace locator { everywhere_replication_strategy::everywhere_replication_strategy(const replication_strategy_config_options& config_options) : abstract_replication_strategy(config_options, replication_strategy_type::everywhere_topology) {} -future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const { - return make_ready_future(boost::copy_range(tm.get_all_endpoints())); +future everywhere_replication_strategy::calculate_natural_endpoints(const token& search_token, 
const token_metadata& tm) const { + auto eps = tm.get_all_endpoints(); + return make_ready_future(endpoint_set(eps.begin(), eps.end())); } size_t everywhere_replication_strategy::get_replication_factor(const token_metadata& tm) const { diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index 65f1490f8f..cd80502071 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -18,7 +18,7 @@ class everywhere_replication_strategy : public abstract_replication_strategy { public: everywhere_replication_strategy(const replication_strategy_config_options& config_options); - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; virtual void validate_options() const override { /* noop */ } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 591fdd14ae..cd09772d38 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -17,8 +17,8 @@ namespace locator { local_strategy::local_strategy(const replication_strategy_config_options& config_options) : abstract_replication_strategy(config_options, replication_strategy_type::local) {} -future local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { - return make_ready_future(inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()})); +future local_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { + return make_ready_future(endpoint_set({utils::fb_utilities::get_broadcast_address()})); } void local_strategy::validate_options() const { diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index 3b93c1543d..6f1bbb2654 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -27,7 +27,7 @@ public: virtual 
~local_strategy() {}; virtual size_t get_replication_factor(const token_metadata&) const override; - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; virtual void validate_options() const override; diff --git a/locator/network_topology_strategy.cc b/locator/network_topology_strategy.cc index 8bac1bf245..3f42aeaab5 100644 --- a/locator/network_topology_strategy.cc +++ b/locator/network_topology_strategy.cc @@ -14,7 +14,6 @@ #include #include "locator/network_topology_strategy.hh" -#include "utils/sequenced_set.hh" #include #include "utils/hash.hh" @@ -75,7 +74,6 @@ network_topology_strategy::network_topology_strategy( } } -using endpoint_set = utils::sequenced_set; using endpoint_dc_rack_set = std::unordered_set; class natural_endpoints_tracker { @@ -234,12 +232,12 @@ public: return _dcs_to_fill == 0; } - const endpoint_set& replicas() const noexcept { + endpoint_set& replicas() noexcept { return _replicas; } }; -future +future network_topology_strategy::calculate_natural_endpoints( const token& search_token, const token_metadata& tm) const { @@ -254,7 +252,7 @@ network_topology_strategy::calculate_natural_endpoints( } } - co_return boost::copy_range(tracker.replicas().get_vector()); + co_return std::move(tracker.replicas()); } void network_topology_strategy::validate_options() const { diff --git a/locator/network_topology_strategy.hh b/locator/network_topology_strategy.hh index fa4c15f047..f0cb13431c 100644 --- a/locator/network_topology_strategy.hh +++ b/locator/network_topology_strategy.hh @@ -44,7 +44,7 @@ protected: * calculate endpoints in one pass through the tokens by tracking our * progress in each DC, rack etc. 
*/ - virtual future calculate_natural_endpoints( + virtual future calculate_natural_endpoints( const token& search_token, const token_metadata& tm) const override; virtual void validate_options() const override; diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index 349e82b3a2..6d5644b853 100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -33,15 +33,15 @@ simple_strategy::simple_strategy(const replication_strategy_config_options& conf } } -future simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { +future simple_strategy::calculate_natural_endpoints(const token& t, const token_metadata& tm) const { const std::vector& tokens = tm.sorted_tokens(); if (tokens.empty()) { - co_return inet_address_vector_replica_set(); + co_return endpoint_set(); } size_t replicas = _replication_factor; - utils::sequenced_set endpoints; + endpoint_set endpoints; endpoints.reserve(replicas); for (auto& token : tm.ring_range(t)) { @@ -60,7 +60,7 @@ future simple_strategy::calculate_natural_endpo co_await coroutine::maybe_yield(); } - co_return boost::copy_range(endpoints.get_vector()); + co_return endpoints; } size_t simple_strategy::get_replication_factor(const token_metadata&) const { diff --git a/locator/simple_strategy.hh b/locator/simple_strategy.hh index 7ea6597146..d6cd4e6c3c 100644 --- a/locator/simple_strategy.hh +++ b/locator/simple_strategy.hh @@ -26,7 +26,7 @@ public: return true; } - virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; + virtual future calculate_natural_endpoints(const token& search_token, const token_metadata& tm) const override; private: size_t _replication_factor = 1; }; diff --git a/repair/repair.cc b/repair/repair.cc index a80706dd00..b167b0565b 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -1559,7 +1559,7 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m 
rlogger.info("{}: started with keyspace={}, leaving_node={}, nr_ranges={}", op, keyspace_name, leaving_node, ranges.size() * nr_tables); size_t nr_ranges_total = ranges.size() * nr_tables; size_t nr_ranges_skipped = 0; - std::unordered_map current_replica_endpoints; + std::unordered_map current_replica_endpoints; // Find (for each range) all nodes that store replicas for these ranges as well for (auto& r : ranges) { auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); @@ -1583,8 +1583,8 @@ future<> repair_service::do_decommission_removenode_with_repair(locator::token_m ops->check_abort(); } auto end_token = r.end() ? r.end()->value() : dht::maximum_token(); - const inet_address_vector_replica_set new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp).get0(); - const inet_address_vector_replica_set& current_eps = current_replica_endpoints[r]; + const auto new_eps = ks.get_replication_strategy().calculate_natural_endpoints(end_token, temp).get0(); + const auto& current_eps = current_replica_endpoints[r]; std::unordered_set neighbors_set(new_eps.begin(), new_eps.end()); bool skip_this_range = false; auto new_owner = neighbors_set; diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index cc909fb581..9ff3af75d5 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -369,7 +369,7 @@ static bool has_sufficient_replicas( return true; } -static std::vector calculate_natural_endpoints( +static locator::endpoint_set calculate_natural_endpoints( const token& search_token, const token_metadata& tm, snitch_ptr& snitch, const std::unordered_map& datacenters) { @@ -377,7 +377,7 @@ static std::vector calculate_natural_endpoints( // We want to preserve insertion order so that the first added endpoint // becomes primary. 
// - utils::sequenced_set replicas; + locator::endpoint_set replicas; // replicas we have found in each DC std::unordered_map> dc_replicas; @@ -388,7 +388,7 @@ static std::vector calculate_natural_endpoints( // when we relax the rack uniqueness we can append this to the current // result so we don't have to wind back the iterator // - std::unordered_map> + std::unordered_map skipped_dc_endpoints; // @@ -477,7 +477,7 @@ static std::vector calculate_natural_endpoints( } } - return std::move(replicas.get_vector()); + return replicas; } // Called in a seastar thread. diff --git a/utils/sequenced_set.hh b/utils/sequenced_set.hh index 8f2a72e0bc..566db42cc1 100644 --- a/utils/sequenced_set.hh +++ b/utils/sequenced_set.hh @@ -11,6 +11,7 @@ #include #include #include +#include namespace utils { /** @@ -28,6 +29,36 @@ public: using iterator = typename VectorType::iterator; using const_iterator = typename VectorType::const_iterator; + basic_sequenced_set() = default; + + basic_sequenced_set(std::initializer_list init) + : _set(init) + , _vec(init) + { } + + explicit basic_sequenced_set(VectorType v) + : _set(v.begin(), v.end()) + , _vec(std::move(v)) + { } + + template + explicit basic_sequenced_set(InputIt first, InputIt last) + : _set(first, last) + , _vec(first, last) + { } + + const T& operator[](size_t i) const noexcept { + return _vec[i]; + } + + T& operator[](size_t i) noexcept { + return _vec[i]; + } + + bool empty() const noexcept { + return _vec.empty(); + } + void push_back(const T& val) { insert(val); } @@ -74,6 +105,22 @@ public: return _vec.cend(); } + auto& front() const noexcept { + return _vec.front(); + } + + auto& front() noexcept { + return _vec.front(); + } + + auto& back() const noexcept { + return _vec.back(); + } + + auto& back() noexcept { + return _vec.back(); + } + const auto& get_vector() const noexcept { return _vec; } @@ -83,6 +130,22 @@ public: _vec.reserve(sz); } + iterator erase(const_iterator pos) { + auto val = *pos; + auto it = 
_vec.erase(pos); + _set.erase(val); + return it; + } + + // The implementation is not exception safe + // so mark the method noexcept to terminate in case anything throws + iterator erase(const_iterator first, const_iterator last) noexcept { + for (auto it = first; it != last; ++it) { + _set.erase(*it); + } + return _vec.erase(first, last); + } + private: std::unordered_set _set; VectorType _vec; @@ -93,3 +156,11 @@ using sequenced_set = basic_sequenced_set>; } // namespace utils +namespace std { + +template +ostream& operator<<(ostream& os, const utils::basic_sequenced_set& s) { + return os << s.get_vector(); +} + +} // namespace std