diff --git a/alternator/ttl.cc b/alternator/ttl.cc index 889d675290..74274d3783 100644 --- a/alternator/ttl.cc +++ b/alternator/ttl.cc @@ -387,7 +387,7 @@ class token_ranges_owned_by_this_shard { class ranges_holder_primary { const dht::token_range_vector _token_ranges; public: - ranges_holder_primary(const locator::effective_replication_map_ptr& erm, gms::gossiper& g, gms::inet_address ep) + ranges_holder_primary(const locator::vnode_effective_replication_map_ptr& erm, gms::gossiper& g, gms::inet_address ep) : _token_ranges(erm->get_primary_ranges(ep)) {} std::size_t size() const { return _token_ranges.size(); } const dht::token_range& operator[](std::size_t i) const { diff --git a/dht/range_streamer.cc b/dht/range_streamer.cc index 55f6ba4a5f..df080d42d0 100644 --- a/dht/range_streamer.cc +++ b/dht/range_streamer.cc @@ -80,7 +80,7 @@ range_streamer::get_range_fetch_map(const std::unordered_map> -range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) { +range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) { logger.debug("{} ks={}", __func__, keyspace_name); auto range_addresses = erm->get_range_addresses().get0(); @@ -115,7 +115,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo // Must be called from a seastar thread std::unordered_map> -range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper) { +range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper) { logger.debug("{} ks={}", __func__, keyspace_name); assert (_tokens.empty() == false); @@ -183,7 +183,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n return range_sources; } -bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::effective_replication_map_ptr& erm) { +bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::vnode_effective_replication_map_ptr& erm) { auto rf = erm->get_replication_factor(); auto nr_nodes_in_ring = get_token_metadata().get_all_endpoints().size(); bool everywhere_topology = erm->get_replication_strategy().get_type() == locator::replication_strategy_type::everywhere_topology; @@ -214,7 +214,7 @@ void range_streamer::add_rx_ranges(const sstring& keyspace_name, std::unordered_ } // TODO: This is the legacy range_streamer interface, it is add_rx_ranges which adds rx ranges. -future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing) { +future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing) { return seastar::async([this, keyspace_name, erm = std::move(erm), ranges= std::move(ranges), &gossiper, is_replacing] () mutable { if (_nr_tx_added) { throw std::runtime_error("Mixed sending and receiving is not supported"); diff --git a/dht/range_streamer.hh b/dht/range_streamer.hh index e5d2655cfe..d0afc8d533 100644 --- a/dht/range_streamer.hh +++ b/dht/range_streamer.hh @@ -100,24 +100,24 @@ public: _source_filters.emplace(std::move(filter)); } - future<> add_ranges(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing); + future<> add_ranges(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing); void add_tx_ranges(const sstring& keyspace_name, std::unordered_map ranges_per_endpoint); void add_rx_ranges(const sstring& keyspace_name, std::unordered_map ranges_per_endpoint); private: - bool use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::effective_replication_map_ptr& erm); + bool use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::vnode_effective_replication_map_ptr& erm); /** * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress. */ std::unordered_map> - get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges); + get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges); /** * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating * consistency. */ std::unordered_map> - get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper); + get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper); private: /** * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 5f1b7d1a15..12921f248e 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -63,13 +63,13 @@ sstring abstract_replication_strategy::to_qualified_class_name(std::string_view return strategy_class_registry::to_qualified_class_name(strategy_class_name); } -inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const { +inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const { const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); auto res = erm.get_replication_map().find(key_token); return res->second; } -stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const effective_replication_map& erm, const noncopyable_function& func) const { +stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function& func) const { const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token); auto res = erm.get_replication_map().find(key_token); for (const auto& ep : res->second) { @@ -80,7 +80,7 @@ stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(co return stop_iteration::no; } -inet_address_vector_replica_set effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const { +inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const { inet_address_vector_replica_set natural_endpoints = get_natural_endpoints(search_token); if (_tmptr->is_any_node_being_replaced() && _rs->allow_remove_node_being_replaced_from_natural_endpoints()) { @@ -103,7 +103,7 @@ inet_address_vector_replica_set effective_replication_map::get_natural_endpoints return natural_endpoints; } -inet_address_vector_topology_change effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const { +inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const { return _tmptr->pending_endpoints_for(search_token, ks_name); } @@ -152,7 +152,7 @@ insert_token_range_to_sorted_container_while_unwrapping( } dht::token_range_vector -effective_replication_map::do_get_ranges(noncopyable_function consider_range_for_endpoint) const { +vnode_effective_replication_map::do_get_ranges(noncopyable_function consider_range_for_endpoint) const { dht::token_range_vector ret; const auto& tm = *_tmptr; const auto& sorted_tokens = tm.sorted_tokens(); @@ -174,7 +174,7 @@ effective_replication_map::do_get_ranges(noncopyable_functionget_topology(); sstring local_dc = topo.get_datacenter(ep); std::unordered_set local_dc_nodes = topo.get_datacenter_endpoints().at(local_dc); @@ -259,7 +259,7 @@ effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const { } future> -effective_replication_map::get_range_addresses() const { +vnode_effective_replication_map::get_range_addresses() const { const token_metadata& tm = *_tmptr; std::unordered_map ret; for (const auto& [t, eps] : _replication_map) { @@ -303,7 +303,7 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p co_return ret; } -future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) { +future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) { replication_map replication_map; const auto& sorted_tokens = tmptr->sorted_tokens(); @@ -327,7 +327,7 @@ future calculate_effective_replication_ma co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), rf); } -future effective_replication_map::clone_endpoints_gently() const { +future vnode_effective_replication_map::clone_endpoints_gently() const { replication_map cloned_endpoints; for (auto& i : _replication_map) { @@ -338,20 +338,20 @@ future effective_replication_map::clone_endpoints_gently() cons co_return cloned_endpoints; } -inet_address_vector_replica_set effective_replication_map::get_natural_endpoints(const token& search_token) const { +inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const { return _rs->get_natural_endpoints(search_token, *this); } -stop_iteration effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const { +stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function& func) const { return _rs->for_each_natural_endpoint_until(search_token, *this, func); } -future<> effective_replication_map::clear_gently() noexcept { +future<> vnode_effective_replication_map::clear_gently() noexcept { co_await utils::clear_gently(_replication_map); co_await utils::clear_gently(_tmptr); } -effective_replication_map::~effective_replication_map() { +vnode_effective_replication_map::~vnode_effective_replication_map() { if (is_registered()) { _factory->erase_effective_replication_map(this); try { @@ -368,13 +368,13 @@ effective_replication_map::~effective_replication_map() { } } -effective_replication_map::factory_key effective_replication_map::make_factory_key(const abstract_replication_strategy::ptr_type& rs, const token_metadata_ptr& tmptr) { +vnode_effective_replication_map::factory_key vnode_effective_replication_map::make_factory_key(const abstract_replication_strategy::ptr_type& rs, const token_metadata_ptr& tmptr) { return factory_key(rs->get_type(), rs->get_config_options(), tmptr->get_ring_version()); } -future effective_replication_map_factory::create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) { +future effective_replication_map_factory::create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) { // lookup key on local shard - auto key = effective_replication_map::make_factory_key(rs, tmptr); + auto key = vnode_effective_replication_map::make_factory_key(rs, tmptr); auto erm = find_effective_replication_map(key); if (erm) { rslogger.debug("create_effective_replication_map: found {} [{}]", key, fmt::ptr(erm.get())); @@ -385,11 +385,11 @@ future effective_replication_map_factory::create_ // TODO: // - use hash of key to distribute the load // - instaintiate only on NUMA nodes - auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { + auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future> { auto erm = ermf.find_effective_replication_map(key); - co_return make_foreign(std::move(erm)); + co_return make_foreign(std::move(erm)); }); - mutable_effective_replication_map_ptr new_erm; + mutable_vnode_effective_replication_map_ptr new_erm; if (ref_erm) { auto rf = ref_erm->get_replication_factor(); auto local_replication_map = co_await ref_erm->clone_endpoints_gently(); @@ -400,7 +400,7 @@ future effective_replication_map_factory::create_ co_return insert_effective_replication_map(std::move(new_erm), std::move(key)); } -effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const effective_replication_map::factory_key& key) const { +vnode_effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const vnode_effective_replication_map::factory_key& key) const { auto it = _effective_replication_maps.find(key); if (it != _effective_replication_maps.end()) { return it->second->shared_from_this(); @@ -408,7 +408,7 @@ effective_replication_map_ptr effective_replication_map_factory::find_effective_ return {}; } -effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_effective_replication_map_ptr erm, effective_replication_map::factory_key key) { +vnode_effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_vnode_effective_replication_map_ptr erm, vnode_effective_replication_map::factory_key key) { auto [it, inserted] = _effective_replication_maps.insert({key, erm.get()}); if (inserted) { rslogger.debug("insert_effective_replication_map: inserted {} [{}]", key, fmt::ptr(erm.get())); @@ -420,7 +420,7 @@ effective_replication_map_ptr effective_replication_map_factory::insert_effectiv return res; } -bool effective_replication_map_factory::erase_effective_replication_map(effective_replication_map* erm) { +bool effective_replication_map_factory::erase_effective_replication_map(vnode_effective_replication_map* erm) { const auto& key = erm->get_factory_key(); auto it = _effective_replication_maps.find(key); if (it == _effective_replication_maps.end()) { @@ -474,7 +474,7 @@ void effective_replication_map_factory::submit_background_work(future<> fut) { }); } -future<> global_effective_replication_map::get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name) { +future<> global_vnode_effective_replication_map::get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name) { return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> { // To ensure we get the same effective_replication_map // on all shards, acquire the shared_token_metadata lock. @@ -506,8 +506,8 @@ future<> global_effective_replication_map::get_keyspace_erms(sharded make_global_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name) { - global_effective_replication_map ret; +future make_global_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name) { + global_vnode_effective_replication_map ret; co_await ret.get_keyspace_erms(sharded_db, keyspace_name); co_return ret; } @@ -528,7 +528,7 @@ std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type t) std::abort(); } -std::ostream& operator<<(std::ostream& os, const locator::effective_replication_map::factory_key& key) { +std::ostream& operator<<(std::ostream& os, const locator::vnode_effective_replication_map::factory_key& key) { os << key.rs_type; os << '.' << key.ring_version; char sep = ':'; diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index d99cacac33..9774aaeeca 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -49,11 +49,11 @@ using replication_map = std::unordered_map; -class effective_replication_map; +class vnode_effective_replication_map; class effective_replication_map_factory; class abstract_replication_strategy { - friend class effective_replication_map; + friend class vnode_effective_replication_map; protected: replication_strategy_config_options _config_options; replication_strategy_type _my_type; @@ -102,9 +102,9 @@ public: static sstring to_qualified_class_name(std::string_view strategy_class_name); - virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const; + virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const; // Returns the last stop_iteration result of the called func - virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const effective_replication_map& erm, const noncopyable_function& func) const; + virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function& func) const; virtual void validate_options(const gms::feature_service&) const = 0; virtual std::optional> recognized_options(const topology&) const = 0; virtual size_t get_replication_factor(const token_metadata& tm) const = 0; @@ -131,7 +131,8 @@ public: // Holds the full replication_map resulting from applying the // effective replication strategy over the given token_metadata // and replication_strategy_config_options. -class effective_replication_map : public enable_lw_shared_from_this { +// Used for token-based replication strategies. +class vnode_effective_replication_map : public enable_shared_from_this { public: struct factory_key { replication_strategy_type rs_type; @@ -164,15 +165,15 @@ private: friend class abstract_replication_strategy; friend class effective_replication_map_factory; public: - explicit effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept + explicit vnode_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept : _rs(std::move(rs)) , _tmptr(std::move(tmptr)) , _replication_map(std::move(replication_map)) , _replication_factor(replication_factor) { } - effective_replication_map() = delete; - effective_replication_map(effective_replication_map&&) = default; - ~effective_replication_map(); + vnode_effective_replication_map() = delete; + vnode_effective_replication_map(vnode_effective_replication_map&&) = default; + ~vnode_effective_replication_map(); const token_metadata& get_token_metadata() const noexcept { return *_tmptr; @@ -259,56 +260,61 @@ public: } }; -using effective_replication_map_ptr = lw_shared_ptr; -using mutable_effective_replication_map_ptr = lw_shared_ptr; +using vnode_effective_replication_map_ptr = shared_ptr; +using mutable_vnode_effective_replication_map_ptr = shared_ptr; +using vnode_erm_ptr = vnode_effective_replication_map_ptr; +using mutable_vnode_erm_ptr = mutable_vnode_effective_replication_map_ptr; +using effective_replication_map = vnode_effective_replication_map_ptr; +using mutatble_effective_replication_map = mutable_vnode_effective_replication_map_ptr; -inline mutable_effective_replication_map_ptr make_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) { - return make_lw_shared(std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor); +inline mutable_vnode_erm_ptr make_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) { + return seastar::make_shared( + std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor); } // Apply the replication strategy over the current configuration and the given token_metadata. -future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr); +future calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr); // Class to hold a coherent view of a keyspace // effective replication map on all shards -class global_effective_replication_map { - std::vector> _erms; +class global_vnode_effective_replication_map { + std::vector> _erms; public: - global_effective_replication_map() : _erms(smp::count) {} - global_effective_replication_map(global_effective_replication_map&&) = default; - global_effective_replication_map& operator=(global_effective_replication_map&&) = default; + global_vnode_effective_replication_map() : _erms(smp::count) {} + global_vnode_effective_replication_map(global_vnode_effective_replication_map&&) = default; + global_vnode_effective_replication_map& operator=(global_vnode_effective_replication_map&&) = default; future<> get_keyspace_erms(sharded& sharded_db, std::string_view keyspace_name); - const effective_replication_map& get() const noexcept { + const vnode_effective_replication_map& get() const noexcept { return *_erms[this_shard_id()]; } - const effective_replication_map& operator*() const noexcept { + const vnode_effective_replication_map& operator*() const noexcept { return get(); } - const effective_replication_map* operator->() const noexcept { + const vnode_effective_replication_map* operator->() const noexcept { return &get(); } }; -future make_global_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); +future make_global_effective_replication_map(sharded& sharded_db, std::string_view keyspace_name); } // namespace locator std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type); -std::ostream& operator<<(std::ostream& os, const locator::effective_replication_map::factory_key& key); +std::ostream& operator<<(std::ostream& os, const locator::vnode_effective_replication_map::factory_key& key); template <> -struct fmt::formatter { +struct fmt::formatter { constexpr auto parse(format_parse_context& ctx) { return ctx.end(); } template - auto format(const locator::effective_replication_map::factory_key& key, FormatContext& ctx) { + auto format(const locator::vnode_effective_replication_map::factory_key& key, FormatContext& ctx) { std::ostringstream os; os << key; return fmt::format_to(ctx.out(), "{}", os.str()); @@ -316,9 +322,9 @@ struct fmt::formatter { }; template<> -struct appending_hash { +struct appending_hash { template - void operator()(Hasher& h, const locator::effective_replication_map::factory_key& key) const { + void operator()(Hasher& h, const locator::vnode_effective_replication_map::factory_key& key) const { feed_hash(h, key.rs_type); feed_hash(h, key.ring_version); for (const auto& [opt, val] : key.rs_config_options) { @@ -344,10 +350,10 @@ struct factory_key_hasher : public hasher { namespace std { template <> -struct hash { - size_t operator()(const locator::effective_replication_map::factory_key& key) const { +struct hash { + size_t operator()(const locator::vnode_effective_replication_map::factory_key& key) const { factory_key_hasher h; - appending_hash{}(h, key); + appending_hash{}(h, key); return h.finalize(); } }; @@ -357,18 +363,18 @@ struct hash { namespace locator { class effective_replication_map_factory : public peering_sharded_service { - std::unordered_map _effective_replication_maps; + std::unordered_map _effective_replication_maps; future<> _background_work = make_ready_future<>(); bool _stopped = false; public: - // looks up the effective_replication_map on the local shard. + // looks up the vnode_effective_replication_map on the local shard. // If not found, tries to look one up for reference on shard 0 // so its replication map can be cloned. Otherwise, calculates the - // effective_replication_map for the local shard. + // vnode_effective_replication_map for the local shard. // // Therefore create should be called first on shard 0, then on all other shards. - future create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr); + future create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr); future<> stop() noexcept; @@ -377,14 +383,14 @@ public: } private: - effective_replication_map_ptr find_effective_replication_map(const effective_replication_map::factory_key& key) const; - effective_replication_map_ptr insert_effective_replication_map(mutable_effective_replication_map_ptr erm, effective_replication_map::factory_key key); + vnode_erm_ptr find_effective_replication_map(const vnode_effective_replication_map::factory_key& key) const; + vnode_erm_ptr insert_effective_replication_map(mutable_vnode_erm_ptr erm, vnode_effective_replication_map::factory_key key); - bool erase_effective_replication_map(effective_replication_map* erm); + bool erase_effective_replication_map(vnode_effective_replication_map* erm); void submit_background_work(future<> fut); - friend class effective_replication_map; + friend class vnode_effective_replication_map; }; } diff --git a/locator/everywhere_replication_strategy.cc b/locator/everywhere_replication_strategy.cc index 34b73e3280..43126347c0 100644 --- a/locator/everywhere_replication_strategy.cc +++ b/locator/everywhere_replication_strategy.cc @@ -28,7 +28,7 @@ size_t everywhere_replication_strategy::get_replication_factor(const token_metad return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners(); } -inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const effective_replication_map& erm) const { +inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map& erm) const { const auto& tm = *erm.get_token_metadata_ptr(); if (tm.sorted_tokens().empty()) { return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); @@ -36,7 +36,7 @@ inet_address_vector_replica_set everywhere_replication_strategy::get_natural_end return boost::copy_range(tm.get_all_endpoints()); } -stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const effective_replication_map& erm, const noncopyable_function& func) const { +stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map& erm, const noncopyable_function& func) const { const auto& tm = *erm.get_token_metadata_ptr(); if (tm.sorted_tokens().empty()) { return func(utils::fb_utilities::get_broadcast_address()); diff --git a/locator/everywhere_replication_strategy.hh b/locator/everywhere_replication_strategy.hh index 75af40fb0e..a123efbfa1 100644 --- a/locator/everywhere_replication_strategy.hh +++ b/locator/everywhere_replication_strategy.hh @@ -39,7 +39,7 @@ public: * We need to override this because the default implementation depends * on token calculations but everywhere_replication_strategy may be used before tokens are set up. */ - virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override; - virtual stop_iteration for_each_natural_endpoint_until(const token&, const effective_replication_map&, const noncopyable_function& func) const override; + virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override; + virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const override; }; } diff --git a/locator/local_strategy.cc b/locator/local_strategy.cc index 8127cdc949..b3df8248b5 100644 --- a/locator/local_strategy.cc +++ b/locator/local_strategy.cc @@ -33,11 +33,11 @@ size_t local_strategy::get_replication_factor(const token_metadata&) const { return 1; } -inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const effective_replication_map&) const { +inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map&) const { return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()}); } -stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const effective_replication_map&, const noncopyable_function& func) const { +stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const { return func(utils::fb_utilities::get_broadcast_address()); } diff --git a/locator/local_strategy.hh b/locator/local_strategy.hh index b90c229e09..be4d2856b7 100644 --- a/locator/local_strategy.hh +++ b/locator/local_strategy.hh @@ -43,8 +43,8 @@ public: * We need to override this because the default implementation depends * on token calculations but LocalStrategy may be used before tokens are set up. */ - inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override; - virtual stop_iteration for_each_natural_endpoint_until(const token&, const effective_replication_map&, const noncopyable_function& func) const override; + inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override; + virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function& func) const override; }; } diff --git a/repair/task_manager_module.hh b/repair/task_manager_module.hh index 56e7245d0f..234ccafab5 100644 --- a/repair/task_manager_module.hh +++ b/repair/task_manager_module.hh @@ -39,14 +39,14 @@ protected: class user_requested_repair_task_impl : public repair_task_impl { private: - lw_shared_ptr _germs; + lw_shared_ptr _germs; std::vector _cfs; dht::token_range_vector _ranges; std::vector _hosts; std::vector _data_centers; std::unordered_set _ignore_nodes; public: - user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr germs, std::vector cfs, dht::token_range_vector ranges, std::vector hosts, std::vector data_centers, std::unordered_set ignore_nodes) noexcept + user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr germs, std::vector cfs, dht::token_range_vector ranges, std::vector hosts, std::vector data_centers, std::unordered_set ignore_nodes) noexcept : repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair) , _germs(germs) , _cfs(std::move(cfs)) diff --git a/replica/database.cc b/replica/database.cc index d933272bed..a9f5a25be0 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1136,8 +1136,8 @@ std::vector database::get_non_local_strategy_keyspaces() const { return res; } -std::unordered_map database::get_non_local_strategy_keyspaces_erms() const { - std::unordered_map res; +std::unordered_map database::get_non_local_strategy_keyspaces_erms() const { + std::unordered_map res; res.reserve(_keyspaces.size()); for (auto const& i : _keyspaces) { if (i.second.get_replication_strategy().get_type() != locator::replication_strategy_type::local) { @@ -1207,7 +1207,7 @@ keyspace::create_replication_strategy(const locator::shared_token_metadata& stm, } void -keyspace::update_effective_replication_map(locator::effective_replication_map_ptr erm) { +keyspace::update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm) { _effective_replication_map = std::move(erm); } diff --git a/replica/database.hh b/replica/database.hh index aec3681504..9284712599 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1164,7 +1164,7 @@ public: }; private: locator::abstract_replication_strategy::ptr_type _replication_strategy; - locator::effective_replication_map_ptr _effective_replication_map; + locator::vnode_effective_replication_map_ptr _effective_replication_map; lw_shared_ptr _metadata; config _config; locator::effective_replication_map_factory& _erm_factory; @@ -1190,7 +1190,7 @@ public: */ lw_shared_ptr metadata() const; future<> create_replication_strategy(const locator::shared_token_metadata& stm, const locator::replication_strategy_config_options& options); - void update_effective_replication_map(locator::effective_replication_map_ptr erm); + void update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm); /** * This should not really be return by reference, since replication @@ -1204,7 +1204,7 @@ public: return _replication_strategy; } - locator::effective_replication_map_ptr get_effective_replication_map() const { + locator::vnode_effective_replication_map_ptr get_effective_replication_map() const { return _effective_replication_map; } @@ -1521,7 +1521,7 @@ public: std::vector get_user_keyspaces() const; std::vector get_all_keyspaces() const; std::vector get_non_local_strategy_keyspaces() const; - std::unordered_map get_non_local_strategy_keyspaces_erms() const; + std::unordered_map get_non_local_strategy_keyspaces_erms() const; column_family& find_column_family(std::string_view ks, std::string_view name); const column_family& find_column_family(std::string_view ks, std::string_view name) const; column_family& find_column_family(const table_id&); diff --git a/service/storage_service.cc b/service/storage_service.cc index 4ff68a077c..1a89a133aa 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1828,13 +1828,13 @@ storage_service::get_range_to_address_map(const sstring& keyspace) const { } future> -storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const { +storage_service::get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm) const { return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens()); } // Caller is responsible to hold token_metadata valid until the returned future is resolved future> -storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm, +storage_service::get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm, const std::vector& sorted_tokens) const { co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens)); } @@ -2531,7 +2531,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt std::vector pending_token_metadata_ptr; pending_token_metadata_ptr.resize(smp::count); - std::vector> pending_effective_replication_maps; + std::vector> pending_effective_replication_maps; pending_effective_replication_maps.resize(smp::count); try { @@ -2772,7 +2772,7 @@ future> storage_service::get_ownership() { future> storage_service::effective_ownership(sstring keyspace_name) { return run_with_no_api_lock([keyspace_name] (storage_service& ss) mutable -> future> { - locator::effective_replication_map_ptr erm; + locator::vnode_effective_replication_map_ptr erm; if (keyspace_name != "") { //find throws no such keyspace if it is missing const replica::keyspace& ks = ss._db.local().find_keyspace(keyspace_name); @@ -4122,7 +4122,8 @@ int32_t storage_service::get_exception_count() { return 0; } -future> storage_service::get_changed_ranges_for_leaving(locator::effective_replication_map_ptr erm, inet_address endpoint) { +future> +storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replication_map_ptr erm, inet_address endpoint) { // First get all ranges the leaving endpoint is responsible for auto ranges = get_ranges_for_endpoint(erm, endpoint); @@ -4472,7 +4473,7 @@ future<> storage_service::shutdown_protocol_servers() { } future> -storage_service::get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { +storage_service::get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { auto my_address = get_broadcast_address(); std::unordered_map range_addresses = co_await erm->get_range_addresses(); std::unordered_multimap source_ranges; @@ -4520,7 +4521,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_ future> storage_service::construct_range_to_endpoint_map( - locator::effective_replication_map_ptr erm, + locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const { std::unordered_map res; res.reserve(ranges.size()); @@ -5026,7 +5027,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang }; dht::token_range_vector -storage_service::get_ranges_for_endpoint(const locator::effective_replication_map_ptr& erm, const gms::inet_address& ep) const { +storage_service::get_ranges_for_endpoint(const locator::vnode_effective_replication_map_ptr& erm, const gms::inet_address& ep) const { return erm->get_ranges(ep); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 3c7b7d3536..84d7d777c4 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -369,9 +369,9 @@ public: sstring get_rpc_address(const inet_address& endpoint) const; future> get_range_to_address_map(const sstring& keyspace) const; - future> get_range_to_address_map(locator::effective_replication_map_ptr erm) const; + future> get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm) const; - future> get_range_to_address_map(locator::effective_replication_map_ptr erm, + future> get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm, const std::vector& sorted_tokens) const; /** @@ -405,7 +405,7 @@ public: * @return mapping of ranges to the replicas responsible for them. */ future> construct_range_to_endpoint_map( - locator::effective_replication_map_ptr erm, + locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; public: virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; @@ -554,7 +554,7 @@ private: * @param ranges the ranges to find sources for * @return multimap of addresses to ranges the address is responsible for */ - future> get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; + future> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const; /** * Sends a notification to a node indicating we have finished replicating data. @@ -578,7 +578,7 @@ private: future<> removenode_add_ranges(lw_shared_ptr streamer, gms::inet_address leaving_node); // needs to be modified to accept either a keyspace or ARS. - future> get_changed_ranges_for_leaving(locator::effective_replication_map_ptr erm, inet_address endpoint); + future> get_changed_ranges_for_leaving(locator::vnode_effective_replication_map_ptr erm, inet_address endpoint); future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip); public: @@ -596,7 +596,7 @@ public: * @param ep endpoint we are interested in. * @return ranges for the specified endpoint. */ - dht::token_range_vector get_ranges_for_endpoint(const locator::effective_replication_map_ptr& erm, const gms::inet_address& ep) const; + dht::token_range_vector get_ranges_for_endpoint(const locator::vnode_effective_replication_map_ptr& erm, const gms::inet_address& ep) const; /** * Get all ranges that span the ring given a set diff --git a/test/boost/network_topology_strategy_test.cc b/test/boost/network_topology_strategy_test.cc index 18e9e66a4a..1190e6b9d2 100644 --- a/test/boost/network_topology_strategy_test.cc +++ b/test/boost/network_topology_strategy_test.cc @@ -64,7 +64,7 @@ static void verify_sorted(const dht::token_range_vector& trv) { BOOST_CHECK(boost::adjacent_find(trv, not_strictly_before) == trv.end()); } -static void check_ranges_are_sorted(effective_replication_map_ptr erm, gms::inet_address ep) { +static void check_ranges_are_sorted(vnode_effective_replication_map_ptr erm, gms::inet_address ep) { verify_sorted(erm->get_ranges(ep)); verify_sorted(erm->get_primary_ranges(ep)); verify_sorted(erm->get_primary_ranges_within_dc(ep));