locator: Rename effective_replication_map to vnode_effective_replication_map

In preparation for introducing a more abstract
effective_replication_map which can describe replication maps which
are not based on vnodes.
This commit is contained in:
Tomasz Grabiec
2023-03-22 00:16:15 +01:00
parent 1343bfa708
commit d3c9ad4ed6
15 changed files with 115 additions and 108 deletions

View File

@@ -387,7 +387,7 @@ class token_ranges_owned_by_this_shard {
class ranges_holder_primary {
const dht::token_range_vector _token_ranges;
public:
ranges_holder_primary(const locator::effective_replication_map_ptr& erm, gms::gossiper& g, gms::inet_address ep)
ranges_holder_primary(const locator::vnode_effective_replication_map_ptr& erm, gms::gossiper& g, gms::inet_address ep)
: _token_ranges(erm->get_primary_ranges(ep)) {}
std::size_t size() const { return _token_ranges.size(); }
const dht::token_range& operator[](std::size_t i) const {

View File

@@ -80,7 +80,7 @@ range_streamer::get_range_fetch_map(const std::unordered_map<dht::token_range, s
// Must be called from a seastar thread
std::unordered_map<dht::token_range, std::vector<inet_address>>
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) {
range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges) {
logger.debug("{} ks={}", __func__, keyspace_name);
auto range_addresses = erm->get_range_addresses().get0();
@@ -115,7 +115,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo
// Must be called from a seastar thread
std::unordered_map<dht::token_range, std::vector<inet_address>>
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper) {
range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper) {
logger.debug("{} ks={}", __func__, keyspace_name);
assert (_tokens.empty() == false);
@@ -183,7 +183,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
return range_sources;
}
bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::effective_replication_map_ptr& erm) {
bool range_streamer::use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::vnode_effective_replication_map_ptr& erm) {
auto rf = erm->get_replication_factor();
auto nr_nodes_in_ring = get_token_metadata().get_all_endpoints().size();
bool everywhere_topology = erm->get_replication_strategy().get_type() == locator::replication_strategy_type::everywhere_topology;
@@ -214,7 +214,7 @@ void range_streamer::add_rx_ranges(const sstring& keyspace_name, std::unordered_
}
// TODO: This is the legacy range_streamer interface, it is add_rx_ranges which adds rx ranges.
future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing) {
future<> range_streamer::add_ranges(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing) {
return seastar::async([this, keyspace_name, erm = std::move(erm), ranges= std::move(ranges), &gossiper, is_replacing] () mutable {
if (_nr_tx_added) {
throw std::runtime_error("Mixed sending and receiving is not supported");

View File

@@ -100,24 +100,24 @@ public:
_source_filters.emplace(std::move(filter));
}
future<> add_ranges(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing);
future<> add_ranges(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector ranges, gms::gossiper& gossiper, bool is_replacing);
void add_tx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
void add_rx_ranges(const sstring& keyspace_name, std::unordered_map<inet_address, dht::token_range_vector> ranges_per_endpoint);
private:
bool use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::effective_replication_map_ptr& erm);
bool use_strict_sources_for_ranges(const sstring& keyspace_name, const locator::vnode_effective_replication_map_ptr& erm);
/**
* Get a map of all ranges and their respective sources that are candidates for streaming the given ranges
* to us. For each range, the list of sources is sorted by proximity relative to the given destAddress.
*/
std::unordered_map<dht::token_range, std::vector<inet_address>>
get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges);
get_all_ranges_with_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges);
/**
* Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges.
* For each range, the list should only contain a single source. This allows us to consistently migrate data without violating
* consistency.
*/
std::unordered_map<dht::token_range, std::vector<inet_address>>
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper);
get_all_ranges_with_strict_sources_for(const sstring& keyspace_name, locator::vnode_effective_replication_map_ptr erm, dht::token_range_vector desired_ranges, gms::gossiper& gossiper);
private:
/**
* @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value)

View File

@@ -63,13 +63,13 @@ sstring abstract_replication_strategy::to_qualified_class_name(std::string_view
return strategy_class_registry::to_qualified_class_name(strategy_class_name);
}
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const {
inet_address_vector_replica_set abstract_replication_strategy::get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const {
const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token);
auto res = erm.get_replication_map().find(key_token);
return res->second;
}
stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
const token& key_token = erm.get_token_metadata_ptr()->first_token(search_token);
auto res = erm.get_replication_map().find(key_token);
for (const auto& ep : res->second) {
@@ -80,7 +80,7 @@ stop_iteration abstract_replication_strategy::for_each_natural_endpoint_until(co
return stop_iteration::no;
}
inet_address_vector_replica_set effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const {
inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints_without_node_being_replaced(const token& search_token) const {
inet_address_vector_replica_set natural_endpoints = get_natural_endpoints(search_token);
if (_tmptr->is_any_node_being_replaced() &&
_rs->allow_remove_node_being_replaced_from_natural_endpoints()) {
@@ -103,7 +103,7 @@ inet_address_vector_replica_set effective_replication_map::get_natural_endpoints
return natural_endpoints;
}
inet_address_vector_topology_change effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const {
inet_address_vector_topology_change vnode_effective_replication_map::get_pending_endpoints(const token& search_token, const sstring& ks_name) const {
return _tmptr->pending_endpoints_for(search_token, ks_name);
}
@@ -152,7 +152,7 @@ insert_token_range_to_sorted_container_while_unwrapping(
}
dht::token_range_vector
effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(bool&, const inet_address&)> consider_range_for_endpoint) const {
vnode_effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(bool&, const inet_address&)> consider_range_for_endpoint) const {
dht::token_range_vector ret;
const auto& tm = *_tmptr;
const auto& sorted_tokens = tm.sorted_tokens();
@@ -174,7 +174,7 @@ effective_replication_map::do_get_ranges(noncopyable_function<stop_iteration(boo
}
dht::token_range_vector
effective_replication_map::get_ranges(inet_address ep) const {
vnode_effective_replication_map::get_ranges(inet_address ep) const {
// The callback function below is called for each endpoint
// in each token natural endpoints.
// Add the range if `ep` is found in the token's natural endpoints
@@ -224,7 +224,7 @@ abstract_replication_strategy::get_ranges(inet_address ep, const token_metadata&
}
dht::token_range_vector
effective_replication_map::get_primary_ranges(inet_address ep) const {
vnode_effective_replication_map::get_primary_ranges(inet_address ep) const {
// The callback function below is called for each endpoint
// in each token natural endpoints.
// Add the range if `ep` is the primary replica in the token's natural endpoints.
@@ -237,7 +237,7 @@ effective_replication_map::get_primary_ranges(inet_address ep) const {
}
dht::token_range_vector
effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const {
vnode_effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const {
const topology& topo = _tmptr->get_topology();
sstring local_dc = topo.get_datacenter(ep);
std::unordered_set<inet_address> local_dc_nodes = topo.get_datacenter_endpoints().at(local_dc);
@@ -259,7 +259,7 @@ effective_replication_map::get_primary_ranges_within_dc(inet_address ep) const {
}
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
effective_replication_map::get_range_addresses() const {
vnode_effective_replication_map::get_range_addresses() const {
const token_metadata& tm = *_tmptr;
std::unordered_map<dht::token_range, inet_address_vector_replica_set> ret;
for (const auto& [t, eps] : _replication_map) {
@@ -303,7 +303,7 @@ abstract_replication_strategy::get_pending_address_ranges(const token_metadata_p
co_return ret;
}
future<mutable_effective_replication_map_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) {
future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) {
replication_map replication_map;
const auto& sorted_tokens = tmptr->sorted_tokens();
@@ -327,7 +327,7 @@ future<mutable_effective_replication_map_ptr> calculate_effective_replication_ma
co_return make_effective_replication_map(std::move(rs), std::move(tmptr), std::move(replication_map), rf);
}
future<replication_map> effective_replication_map::clone_endpoints_gently() const {
future<replication_map> vnode_effective_replication_map::clone_endpoints_gently() const {
replication_map cloned_endpoints;
for (auto& i : _replication_map) {
@@ -338,20 +338,20 @@ future<replication_map> effective_replication_map::clone_endpoints_gently() cons
co_return cloned_endpoints;
}
inet_address_vector_replica_set effective_replication_map::get_natural_endpoints(const token& search_token) const {
inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const {
return _rs->get_natural_endpoints(search_token, *this);
}
stop_iteration effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
stop_iteration vnode_effective_replication_map::for_each_natural_endpoint_until(const token& search_token, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
return _rs->for_each_natural_endpoint_until(search_token, *this, func);
}
future<> effective_replication_map::clear_gently() noexcept {
future<> vnode_effective_replication_map::clear_gently() noexcept {
co_await utils::clear_gently(_replication_map);
co_await utils::clear_gently(_tmptr);
}
effective_replication_map::~effective_replication_map() {
vnode_effective_replication_map::~vnode_effective_replication_map() {
if (is_registered()) {
_factory->erase_effective_replication_map(this);
try {
@@ -368,13 +368,13 @@ effective_replication_map::~effective_replication_map() {
}
}
effective_replication_map::factory_key effective_replication_map::make_factory_key(const abstract_replication_strategy::ptr_type& rs, const token_metadata_ptr& tmptr) {
vnode_effective_replication_map::factory_key vnode_effective_replication_map::make_factory_key(const abstract_replication_strategy::ptr_type& rs, const token_metadata_ptr& tmptr) {
return factory_key(rs->get_type(), rs->get_config_options(), tmptr->get_ring_version());
}
future<effective_replication_map_ptr> effective_replication_map_factory::create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) {
future<vnode_effective_replication_map_ptr> effective_replication_map_factory::create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr) {
// lookup key on local shard
auto key = effective_replication_map::make_factory_key(rs, tmptr);
auto key = vnode_effective_replication_map::make_factory_key(rs, tmptr);
auto erm = find_effective_replication_map(key);
if (erm) {
rslogger.debug("create_effective_replication_map: found {} [{}]", key, fmt::ptr(erm.get()));
@@ -385,11 +385,11 @@ future<effective_replication_map_ptr> effective_replication_map_factory::create_
// TODO:
// - use hash of key to distribute the load
// - instaintiate only on NUMA nodes
auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future<foreign_ptr<effective_replication_map_ptr>> {
auto ref_erm = co_await container().invoke_on(0, [key] (effective_replication_map_factory& ermf) -> future<foreign_ptr<vnode_effective_replication_map_ptr>> {
auto erm = ermf.find_effective_replication_map(key);
co_return make_foreign<effective_replication_map_ptr>(std::move(erm));
co_return make_foreign<vnode_effective_replication_map_ptr>(std::move(erm));
});
mutable_effective_replication_map_ptr new_erm;
mutable_vnode_effective_replication_map_ptr new_erm;
if (ref_erm) {
auto rf = ref_erm->get_replication_factor();
auto local_replication_map = co_await ref_erm->clone_endpoints_gently();
@@ -400,7 +400,7 @@ future<effective_replication_map_ptr> effective_replication_map_factory::create_
co_return insert_effective_replication_map(std::move(new_erm), std::move(key));
}
effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const effective_replication_map::factory_key& key) const {
vnode_effective_replication_map_ptr effective_replication_map_factory::find_effective_replication_map(const vnode_effective_replication_map::factory_key& key) const {
auto it = _effective_replication_maps.find(key);
if (it != _effective_replication_maps.end()) {
return it->second->shared_from_this();
@@ -408,7 +408,7 @@ effective_replication_map_ptr effective_replication_map_factory::find_effective_
return {};
}
effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_effective_replication_map_ptr erm, effective_replication_map::factory_key key) {
vnode_effective_replication_map_ptr effective_replication_map_factory::insert_effective_replication_map(mutable_vnode_effective_replication_map_ptr erm, vnode_effective_replication_map::factory_key key) {
auto [it, inserted] = _effective_replication_maps.insert({key, erm.get()});
if (inserted) {
rslogger.debug("insert_effective_replication_map: inserted {} [{}]", key, fmt::ptr(erm.get()));
@@ -420,7 +420,7 @@ effective_replication_map_ptr effective_replication_map_factory::insert_effectiv
return res;
}
bool effective_replication_map_factory::erase_effective_replication_map(effective_replication_map* erm) {
bool effective_replication_map_factory::erase_effective_replication_map(vnode_effective_replication_map* erm) {
const auto& key = erm->get_factory_key();
auto it = _effective_replication_maps.find(key);
if (it == _effective_replication_maps.end()) {
@@ -474,7 +474,7 @@ void effective_replication_map_factory::submit_background_work(future<> fut) {
});
}
future<> global_effective_replication_map::get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
future<> global_vnode_effective_replication_map::get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
return sharded_db.invoke_on(0, [this, &sharded_db, keyspace_name] (replica::database& db) -> future<> {
// To ensure we get the same effective_replication_map
// on all shards, acquire the shared_token_metadata lock.
@@ -506,8 +506,8 @@ future<> global_effective_replication_map::get_keyspace_erms(sharded<replica::da
});
}
future<global_effective_replication_map> make_global_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
global_effective_replication_map ret;
future<global_vnode_effective_replication_map> make_global_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name) {
global_vnode_effective_replication_map ret;
co_await ret.get_keyspace_erms(sharded_db, keyspace_name);
co_return ret;
}
@@ -528,7 +528,7 @@ std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type t)
std::abort();
}
std::ostream& operator<<(std::ostream& os, const locator::effective_replication_map::factory_key& key) {
std::ostream& operator<<(std::ostream& os, const locator::vnode_effective_replication_map::factory_key& key) {
os << key.rs_type;
os << '.' << key.ring_version;
char sep = ':';

View File

@@ -49,11 +49,11 @@ using replication_map = std::unordered_map<token, inet_address_vector_replica_se
using endpoint_set = utils::basic_sequenced_set<inet_address, inet_address_vector_replica_set>;
class effective_replication_map;
class vnode_effective_replication_map;
class effective_replication_map_factory;
class abstract_replication_strategy {
friend class effective_replication_map;
friend class vnode_effective_replication_map;
protected:
replication_strategy_config_options _config_options;
replication_strategy_type _my_type;
@@ -102,9 +102,9 @@ public:
static sstring to_qualified_class_name(std::string_view strategy_class_name);
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const effective_replication_map& erm) const;
virtual inet_address_vector_replica_set get_natural_endpoints(const token& search_token, const vnode_effective_replication_map& erm) const;
// Returns the last stop_iteration result of the called func
virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;
virtual stop_iteration for_each_natural_endpoint_until(const token& search_token, const vnode_effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const;
virtual void validate_options(const gms::feature_service&) const = 0;
virtual std::optional<std::unordered_set<sstring>> recognized_options(const topology&) const = 0;
virtual size_t get_replication_factor(const token_metadata& tm) const = 0;
@@ -131,7 +131,8 @@ public:
// Holds the full replication_map resulting from applying the
// effective replication strategy over the given token_metadata
// and replication_strategy_config_options.
class effective_replication_map : public enable_lw_shared_from_this<effective_replication_map> {
// Used for token-based replication strategies.
class vnode_effective_replication_map : public enable_shared_from_this<vnode_effective_replication_map> {
public:
struct factory_key {
replication_strategy_type rs_type;
@@ -164,15 +165,15 @@ private:
friend class abstract_replication_strategy;
friend class effective_replication_map_factory;
public:
explicit effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept
explicit vnode_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) noexcept
: _rs(std::move(rs))
, _tmptr(std::move(tmptr))
, _replication_map(std::move(replication_map))
, _replication_factor(replication_factor)
{ }
effective_replication_map() = delete;
effective_replication_map(effective_replication_map&&) = default;
~effective_replication_map();
vnode_effective_replication_map() = delete;
vnode_effective_replication_map(vnode_effective_replication_map&&) = default;
~vnode_effective_replication_map();
const token_metadata& get_token_metadata() const noexcept {
return *_tmptr;
@@ -259,56 +260,61 @@ public:
}
};
using effective_replication_map_ptr = lw_shared_ptr<const effective_replication_map>;
using mutable_effective_replication_map_ptr = lw_shared_ptr<effective_replication_map>;
using vnode_effective_replication_map_ptr = shared_ptr<const vnode_effective_replication_map>;
using mutable_vnode_effective_replication_map_ptr = shared_ptr<vnode_effective_replication_map>;
using vnode_erm_ptr = vnode_effective_replication_map_ptr;
using mutable_vnode_erm_ptr = mutable_vnode_effective_replication_map_ptr;
using effective_replication_map = vnode_effective_replication_map_ptr;
using mutatble_effective_replication_map = mutable_vnode_effective_replication_map_ptr;
inline mutable_effective_replication_map_ptr make_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) {
return make_lw_shared<effective_replication_map>(std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor);
inline mutable_vnode_erm_ptr make_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr, replication_map replication_map, size_t replication_factor) {
return seastar::make_shared<vnode_effective_replication_map>(
std::move(rs), std::move(tmptr), std::move(replication_map), replication_factor);
}
// Apply the replication strategy over the current configuration and the given token_metadata.
future<mutable_effective_replication_map_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr);
future<mutable_vnode_erm_ptr> calculate_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr);
// Class to hold a coherent view of a keyspace
// effective replication map on all shards
class global_effective_replication_map {
std::vector<foreign_ptr<effective_replication_map_ptr>> _erms;
class global_vnode_effective_replication_map {
std::vector<foreign_ptr<vnode_erm_ptr>> _erms;
public:
global_effective_replication_map() : _erms(smp::count) {}
global_effective_replication_map(global_effective_replication_map&&) = default;
global_effective_replication_map& operator=(global_effective_replication_map&&) = default;
global_vnode_effective_replication_map() : _erms(smp::count) {}
global_vnode_effective_replication_map(global_vnode_effective_replication_map&&) = default;
global_vnode_effective_replication_map& operator=(global_vnode_effective_replication_map&&) = default;
future<> get_keyspace_erms(sharded<replica::database>& sharded_db, std::string_view keyspace_name);
const effective_replication_map& get() const noexcept {
const vnode_effective_replication_map& get() const noexcept {
return *_erms[this_shard_id()];
}
const effective_replication_map& operator*() const noexcept {
const vnode_effective_replication_map& operator*() const noexcept {
return get();
}
const effective_replication_map* operator->() const noexcept {
const vnode_effective_replication_map* operator->() const noexcept {
return &get();
}
};
future<global_effective_replication_map> make_global_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name);
future<global_vnode_effective_replication_map> make_global_effective_replication_map(sharded<replica::database>& sharded_db, std::string_view keyspace_name);
} // namespace locator
std::ostream& operator<<(std::ostream& os, locator::replication_strategy_type);
std::ostream& operator<<(std::ostream& os, const locator::effective_replication_map::factory_key& key);
std::ostream& operator<<(std::ostream& os, const locator::vnode_effective_replication_map::factory_key& key);
template <>
struct fmt::formatter<locator::effective_replication_map::factory_key> {
struct fmt::formatter<locator::vnode_effective_replication_map::factory_key> {
constexpr auto parse(format_parse_context& ctx) {
return ctx.end();
}
template <typename FormatContext>
auto format(const locator::effective_replication_map::factory_key& key, FormatContext& ctx) {
auto format(const locator::vnode_effective_replication_map::factory_key& key, FormatContext& ctx) {
std::ostringstream os;
os << key;
return fmt::format_to(ctx.out(), "{}", os.str());
@@ -316,9 +322,9 @@ struct fmt::formatter<locator::effective_replication_map::factory_key> {
};
template<>
struct appending_hash<locator::effective_replication_map::factory_key> {
struct appending_hash<locator::vnode_effective_replication_map::factory_key> {
template<typename Hasher>
void operator()(Hasher& h, const locator::effective_replication_map::factory_key& key) const {
void operator()(Hasher& h, const locator::vnode_effective_replication_map::factory_key& key) const {
feed_hash(h, key.rs_type);
feed_hash(h, key.ring_version);
for (const auto& [opt, val] : key.rs_config_options) {
@@ -344,10 +350,10 @@ struct factory_key_hasher : public hasher {
namespace std {
template <>
struct hash<locator::effective_replication_map::factory_key> {
size_t operator()(const locator::effective_replication_map::factory_key& key) const {
struct hash<locator::vnode_effective_replication_map::factory_key> {
size_t operator()(const locator::vnode_effective_replication_map::factory_key& key) const {
factory_key_hasher h;
appending_hash<locator::effective_replication_map::factory_key>{}(h, key);
appending_hash<locator::vnode_effective_replication_map::factory_key>{}(h, key);
return h.finalize();
}
};
@@ -357,18 +363,18 @@ struct hash<locator::effective_replication_map::factory_key> {
namespace locator {
class effective_replication_map_factory : public peering_sharded_service<effective_replication_map_factory> {
std::unordered_map<effective_replication_map::factory_key, effective_replication_map*> _effective_replication_maps;
std::unordered_map<vnode_effective_replication_map::factory_key, vnode_effective_replication_map*> _effective_replication_maps;
future<> _background_work = make_ready_future<>();
bool _stopped = false;
public:
// looks up the effective_replication_map on the local shard.
// looks up the vnode_effective_replication_map on the local shard.
// If not found, tries to look one up for reference on shard 0
// so its replication map can be cloned. Otherwise, calculates the
// effective_replication_map for the local shard.
// vnode_effective_replication_map for the local shard.
//
// Therefore create should be called first on shard 0, then on all other shards.
future<effective_replication_map_ptr> create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr);
future<vnode_erm_ptr> create_effective_replication_map(abstract_replication_strategy::ptr_type rs, token_metadata_ptr tmptr);
future<> stop() noexcept;
@@ -377,14 +383,14 @@ public:
}
private:
effective_replication_map_ptr find_effective_replication_map(const effective_replication_map::factory_key& key) const;
effective_replication_map_ptr insert_effective_replication_map(mutable_effective_replication_map_ptr erm, effective_replication_map::factory_key key);
vnode_erm_ptr find_effective_replication_map(const vnode_effective_replication_map::factory_key& key) const;
vnode_erm_ptr insert_effective_replication_map(mutable_vnode_erm_ptr erm, vnode_effective_replication_map::factory_key key);
bool erase_effective_replication_map(effective_replication_map* erm);
bool erase_effective_replication_map(vnode_effective_replication_map* erm);
void submit_background_work(future<> fut);
friend class effective_replication_map;
friend class vnode_effective_replication_map;
};
}

View File

@@ -28,7 +28,7 @@ size_t everywhere_replication_strategy::get_replication_factor(const token_metad
return tm.sorted_tokens().empty() ? 1 : tm.count_normal_token_owners();
}
inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const effective_replication_map& erm) const {
inet_address_vector_replica_set everywhere_replication_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map& erm) const {
const auto& tm = *erm.get_token_metadata_ptr();
if (tm.sorted_tokens().empty()) {
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
@@ -36,7 +36,7 @@ inet_address_vector_replica_set everywhere_replication_strategy::get_natural_end
return boost::copy_range<inet_address_vector_replica_set>(tm.get_all_endpoints());
}
stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
stop_iteration everywhere_replication_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map& erm, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
const auto& tm = *erm.get_token_metadata_ptr();
if (tm.sorted_tokens().empty()) {
return func(utils::fb_utilities::get_broadcast_address());

View File

@@ -39,7 +39,7 @@ public:
* We need to override this because the default implementation depends
* on token calculations but everywhere_replication_strategy may be used before tokens are set up.
*/
virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override;
virtual stop_iteration for_each_natural_endpoint_until(const token&, const effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const override;
virtual inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override;
virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const override;
};
}

View File

@@ -33,11 +33,11 @@ size_t local_strategy::get_replication_factor(const token_metadata&) const {
return 1;
}
inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const effective_replication_map&) const {
inet_address_vector_replica_set local_strategy::get_natural_endpoints(const token&, const vnode_effective_replication_map&) const {
return inet_address_vector_replica_set({utils::fb_utilities::get_broadcast_address()});
}
stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
stop_iteration local_strategy::for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const {
return func(utils::fb_utilities::get_broadcast_address());
}

View File

@@ -43,8 +43,8 @@ public:
* We need to override this because the default implementation depends
* on token calculations but LocalStrategy may be used before tokens are set up.
*/
inet_address_vector_replica_set get_natural_endpoints(const token&, const effective_replication_map&) const override;
virtual stop_iteration for_each_natural_endpoint_until(const token&, const effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const override;
inet_address_vector_replica_set get_natural_endpoints(const token&, const vnode_effective_replication_map&) const override;
virtual stop_iteration for_each_natural_endpoint_until(const token&, const vnode_effective_replication_map&, const noncopyable_function<stop_iteration(const inet_address&)>& func) const override;
};
}

View File

@@ -39,14 +39,14 @@ protected:
class user_requested_repair_task_impl : public repair_task_impl {
private:
lw_shared_ptr<locator::global_effective_replication_map> _germs;
lw_shared_ptr<locator::global_vnode_effective_replication_map> _germs;
std::vector<sstring> _cfs;
dht::token_range_vector _ranges;
std::vector<sstring> _hosts;
std::vector<sstring> _data_centers;
std::unordered_set<gms::inet_address> _ignore_nodes;
public:
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
user_requested_repair_task_impl(tasks::task_manager::module_ptr module, repair_uniq_id id, std::string keyspace, std::string entity, lw_shared_ptr<locator::global_vnode_effective_replication_map> germs, std::vector<sstring> cfs, dht::token_range_vector ranges, std::vector<sstring> hosts, std::vector<sstring> data_centers, std::unordered_set<gms::inet_address> ignore_nodes) noexcept
: repair_task_impl(module, id.uuid(), id.id, std::move(keyspace), "", std::move(entity), tasks::task_id::create_null_id(), streaming::stream_reason::repair)
, _germs(germs)
, _cfs(std::move(cfs))

View File

@@ -1136,8 +1136,8 @@ std::vector<sstring> database::get_non_local_strategy_keyspaces() const {
return res;
}
std::unordered_map<sstring, locator::effective_replication_map_ptr> database::get_non_local_strategy_keyspaces_erms() const {
std::unordered_map<sstring, locator::effective_replication_map_ptr> res;
std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> database::get_non_local_strategy_keyspaces_erms() const {
std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> res;
res.reserve(_keyspaces.size());
for (auto const& i : _keyspaces) {
if (i.second.get_replication_strategy().get_type() != locator::replication_strategy_type::local) {
@@ -1207,7 +1207,7 @@ keyspace::create_replication_strategy(const locator::shared_token_metadata& stm,
}
void
keyspace::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
keyspace::update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm) {
_effective_replication_map = std::move(erm);
}

View File

@@ -1164,7 +1164,7 @@ public:
};
private:
locator::abstract_replication_strategy::ptr_type _replication_strategy;
locator::effective_replication_map_ptr _effective_replication_map;
locator::vnode_effective_replication_map_ptr _effective_replication_map;
lw_shared_ptr<keyspace_metadata> _metadata;
config _config;
locator::effective_replication_map_factory& _erm_factory;
@@ -1190,7 +1190,7 @@ public:
*/
lw_shared_ptr<keyspace_metadata> metadata() const;
future<> create_replication_strategy(const locator::shared_token_metadata& stm, const locator::replication_strategy_config_options& options);
void update_effective_replication_map(locator::effective_replication_map_ptr erm);
void update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm);
/**
* This should not really be return by reference, since replication
@@ -1204,7 +1204,7 @@ public:
return _replication_strategy;
}
locator::effective_replication_map_ptr get_effective_replication_map() const {
locator::vnode_effective_replication_map_ptr get_effective_replication_map() const {
return _effective_replication_map;
}
@@ -1521,7 +1521,7 @@ public:
std::vector<sstring> get_user_keyspaces() const;
std::vector<sstring> get_all_keyspaces() const;
std::vector<sstring> get_non_local_strategy_keyspaces() const;
std::unordered_map<sstring, locator::effective_replication_map_ptr> get_non_local_strategy_keyspaces_erms() const;
std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr> get_non_local_strategy_keyspaces_erms() const;
column_family& find_column_family(std::string_view ks, std::string_view name);
const column_family& find_column_family(std::string_view ks, std::string_view name) const;
column_family& find_column_family(const table_id&);

View File

@@ -1828,13 +1828,13 @@ storage_service::get_range_to_address_map(const sstring& keyspace) const {
}
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm) const {
storage_service::get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm) const {
return get_range_to_address_map(erm, erm->get_token_metadata_ptr()->sorted_tokens());
}
// Caller is responsible to hold token_metadata valid until the returned future is resolved
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::get_range_to_address_map(locator::effective_replication_map_ptr erm,
storage_service::get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm,
const std::vector<token>& sorted_tokens) const {
co_return co_await construct_range_to_endpoint_map(erm, co_await get_all_ranges(sorted_tokens));
}
@@ -2531,7 +2531,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
std::vector<mutable_token_metadata_ptr> pending_token_metadata_ptr;
pending_token_metadata_ptr.resize(smp::count);
std::vector<std::unordered_map<sstring, locator::effective_replication_map_ptr>> pending_effective_replication_maps;
std::vector<std::unordered_map<sstring, locator::vnode_effective_replication_map_ptr>> pending_effective_replication_maps;
pending_effective_replication_maps.resize(smp::count);
try {
@@ -2772,7 +2772,7 @@ future<std::map<gms::inet_address, float>> storage_service::get_ownership() {
future<std::map<gms::inet_address, float>> storage_service::effective_ownership(sstring keyspace_name) {
return run_with_no_api_lock([keyspace_name] (storage_service& ss) mutable -> future<std::map<gms::inet_address, float>> {
locator::effective_replication_map_ptr erm;
locator::vnode_effective_replication_map_ptr erm;
if (keyspace_name != "") {
//find throws no such keyspace if it is missing
const replica::keyspace& ks = ss._db.local().find_keyspace(keyspace_name);
@@ -4122,7 +4122,8 @@ int32_t storage_service::get_exception_count() {
return 0;
}
future<std::unordered_multimap<dht::token_range, inet_address>> storage_service::get_changed_ranges_for_leaving(locator::effective_replication_map_ptr erm, inet_address endpoint) {
future<std::unordered_multimap<dht::token_range, inet_address>>
storage_service::get_changed_ranges_for_leaving(locator::vnode_effective_replication_map_ptr erm, inet_address endpoint) {
// First get all ranges the leaving endpoint is responsible for
auto ranges = get_ranges_for_endpoint(erm, endpoint);
@@ -4472,7 +4473,7 @@ future<> storage_service::shutdown_protocol_servers() {
}
future<std::unordered_multimap<inet_address, dht::token_range>>
storage_service::get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const {
storage_service::get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const {
auto my_address = get_broadcast_address();
std::unordered_map<dht::token_range, inet_address_vector_replica_set> range_addresses = co_await erm->get_range_addresses();
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
@@ -4520,7 +4521,7 @@ storage_service::describe_ring(const sstring& keyspace, bool include_only_local_
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>>
storage_service::construct_range_to_endpoint_map(
locator::effective_replication_map_ptr erm,
locator::vnode_effective_replication_map_ptr erm,
const dht::token_range_vector& ranges) const {
std::unordered_map<dht::token_range, inet_address_vector_replica_set> res;
res.reserve(ranges.size());
@@ -5026,7 +5027,7 @@ storage_service::get_splits(const sstring& ks_name, const sstring& cf_name, rang
};
dht::token_range_vector
storage_service::get_ranges_for_endpoint(const locator::effective_replication_map_ptr& erm, const gms::inet_address& ep) const {
storage_service::get_ranges_for_endpoint(const locator::vnode_effective_replication_map_ptr& erm, const gms::inet_address& ep) const {
return erm->get_ranges(ep);
}

View File

@@ -369,9 +369,9 @@ public:
sstring get_rpc_address(const inet_address& endpoint) const;
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(const sstring& keyspace) const;
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::effective_replication_map_ptr erm) const;
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm) const;
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::effective_replication_map_ptr erm,
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> get_range_to_address_map(locator::vnode_effective_replication_map_ptr erm,
const std::vector<token>& sorted_tokens) const;
/**
@@ -405,7 +405,7 @@ public:
* @return mapping of ranges to the replicas responsible for them.
*/
future<std::unordered_map<dht::token_range, inet_address_vector_replica_set>> construct_range_to_endpoint_map(
locator::effective_replication_map_ptr erm,
locator::vnode_effective_replication_map_ptr erm,
const dht::token_range_vector& ranges) const;
public:
virtual future<> on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
@@ -554,7 +554,7 @@ private:
* @param ranges the ranges to find sources for
* @return multimap of addresses to ranges the address is responsible for
*/
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
future<std::unordered_multimap<inet_address, dht::token_range>> get_new_source_ranges(locator::vnode_effective_replication_map_ptr erm, const dht::token_range_vector& ranges) const;
/**
* Sends a notification to a node indicating we have finished replicating data.
@@ -578,7 +578,7 @@ private:
future<> removenode_add_ranges(lw_shared_ptr<dht::range_streamer> streamer, gms::inet_address leaving_node);
// needs to be modified to accept either a keyspace or ARS.
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(locator::effective_replication_map_ptr erm, inet_address endpoint);
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(locator::vnode_effective_replication_map_ptr erm, inet_address endpoint);
future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip);
public:
@@ -596,7 +596,7 @@ public:
* @param ep endpoint we are interested in.
* @return ranges for the specified endpoint.
*/
dht::token_range_vector get_ranges_for_endpoint(const locator::effective_replication_map_ptr& erm, const gms::inet_address& ep) const;
dht::token_range_vector get_ranges_for_endpoint(const locator::vnode_effective_replication_map_ptr& erm, const gms::inet_address& ep) const;
/**
* Get all ranges that span the ring given a set

View File

@@ -64,7 +64,7 @@ static void verify_sorted(const dht::token_range_vector& trv) {
BOOST_CHECK(boost::adjacent_find(trv, not_strictly_before) == trv.end());
}
static void check_ranges_are_sorted(effective_replication_map_ptr erm, gms::inet_address ep) {
static void check_ranges_are_sorted(vnode_effective_replication_map_ptr erm, gms::inet_address ep) {
verify_sorted(erm->get_ranges(ep));
verify_sorted(erm->get_primary_ranges(ep));
verify_sorted(erm->get_primary_ranges_within_dc(ep));