erm: switch the internal data structures to host_id-s
Before this patch the host_id -> IP mapping was done in calculate_effective_replication_map. This function is called from mutate_token_metadata, which means we have to have an IP for each host_id in topology_state_load, otherwise we get an error. We are going to remove the IP waiting loop from topology_state_load, so we need to get rid of IPs resolution from calculate_effective_replication_map. In this patch we move the host_id -> IP resolution to the data plane. When a write or read request is sent the target endpoints are requested from erm through get_natural_endpoints_without_node_being_replaced, get_pending_endpoints and get_endpoints_for_reading methods and this is where the IP resolution will now occur.
This commit is contained in:
@@ -19,8 +19,9 @@
|
||||
|
||||
namespace locator {
|
||||
|
||||
static endpoint_set resolve_endpoints(const host_id_set& host_ids, const token_metadata& tm) {
|
||||
endpoint_set result{};
|
||||
template <typename ResultSet, typename SourceSet>
|
||||
static ResultSet resolve_endpoints(const SourceSet& host_ids, const token_metadata& tm) {
|
||||
ResultSet result{};
|
||||
result.reserve(host_ids.size());
|
||||
for (const auto& host_id: host_ids) {
|
||||
// Empty host_id is used as a marker for local address.
|
||||
@@ -68,7 +69,7 @@ void abstract_replication_strategy::validate_replication_strategy(const sstring&
|
||||
|
||||
future<endpoint_set> abstract_replication_strategy::calculate_natural_ips(const token& search_token, const token_metadata& tm) const {
|
||||
const auto host_ids = co_await calculate_natural_endpoints(search_token, tm);
|
||||
co_return resolve_endpoints(host_ids, tm);
|
||||
co_return resolve_endpoints<endpoint_set>(host_ids, tm);
|
||||
}
|
||||
|
||||
using strategy_class_registry = class_registry<
|
||||
@@ -109,7 +110,7 @@ void maybe_remove_node_being_replaced(const token_metadata& tm,
|
||||
}
|
||||
}
|
||||
|
||||
static const std::unordered_set<inet_address>* find_token(const ring_mapping& ring_mapping, const token& token) {
|
||||
static const std::unordered_set<locator::host_id>* find_token(const ring_mapping& ring_mapping, const token& token) {
|
||||
if (ring_mapping.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
@@ -123,7 +124,7 @@ inet_address_vector_topology_change vnode_effective_replication_map::get_pending
|
||||
const auto* pending_endpoints = find_token(_pending_endpoints, search_token);
|
||||
if (pending_endpoints) {
|
||||
// interval_map does not work with std::vector, convert to inet_address_vector_topology_change
|
||||
endpoints = inet_address_vector_topology_change(pending_endpoints->begin(), pending_endpoints->end());
|
||||
endpoints = resolve_endpoints<inet_address_vector_topology_change>(*pending_endpoints, *_tmptr);
|
||||
}
|
||||
return endpoints;
|
||||
}
|
||||
@@ -133,7 +134,7 @@ inet_address_vector_replica_set vnode_effective_replication_map::get_endpoints_f
|
||||
if (endpoints == nullptr) {
|
||||
return get_natural_endpoints_without_node_being_replaced(token);
|
||||
}
|
||||
return inet_address_vector_replica_set(endpoints->begin(), endpoints->end());
|
||||
return resolve_endpoints<inet_address_vector_replica_set>(*endpoints, *_tmptr);
|
||||
}
|
||||
|
||||
std::optional<tablet_routing_info> vnode_effective_replication_map::check_locality(const token& token) const {
|
||||
@@ -141,13 +142,9 @@ std::optional<tablet_routing_info> vnode_effective_replication_map::check_locali
|
||||
}
|
||||
|
||||
bool vnode_effective_replication_map::has_pending_ranges(locator::host_id endpoint) const {
|
||||
const auto ep = _tmptr->get_endpoint_for_host_id_if_known(endpoint);
|
||||
if (!ep) {
|
||||
return false;
|
||||
}
|
||||
for (const auto& item : _pending_endpoints) {
|
||||
const auto& nodes = item.second;
|
||||
if (nodes.contains(*ep)) {
|
||||
if (nodes.contains(endpoint)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -392,7 +389,7 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
|
||||
auto current_endpoints = co_await rs->calculate_natural_endpoints(token, *tmptr);
|
||||
auto target_endpoints = co_await rs->calculate_natural_endpoints(token, *topology_changes->target_token_metadata);
|
||||
|
||||
auto add_mapping = [&](ring_mapping& target, std::unordered_set<inet_address>&& endpoints) {
|
||||
auto add_mapping = [&](ring_mapping& target, std::unordered_set<locator::host_id>&& endpoints) {
|
||||
using interval = ring_mapping::interval_type;
|
||||
if (!depend_on_token) {
|
||||
target += std::make_pair(
|
||||
@@ -420,30 +417,30 @@ future<mutable_vnode_effective_replication_map_ptr> calculate_effective_replicat
|
||||
}
|
||||
}
|
||||
if (!endpoints_diff.empty()) {
|
||||
add_mapping(pending_endpoints, resolve_endpoints(endpoints_diff, *tmptr).extract_set());
|
||||
add_mapping(pending_endpoints, std::move(endpoints_diff).extract_set());
|
||||
}
|
||||
}
|
||||
|
||||
// in order not to waste memory, we update read_endpoints only if the
|
||||
// new endpoints differs from the old one
|
||||
if (topology_changes->read_new && target_endpoints.get_vector() != current_endpoints.get_vector()) {
|
||||
add_mapping(read_endpoints, resolve_endpoints(target_endpoints, *tmptr).extract_set());
|
||||
add_mapping(read_endpoints, std::move(target_endpoints).extract_set());
|
||||
}
|
||||
|
||||
if (!depend_on_token) {
|
||||
replication_map.emplace(default_replication_map_key, resolve_endpoints(current_endpoints, *tmptr).extract_vector());
|
||||
replication_map.emplace(default_replication_map_key, std::move(current_endpoints).extract_vector());
|
||||
break;
|
||||
} else if (current_tokens.contains(token)) {
|
||||
replication_map.emplace(token, resolve_endpoints(current_endpoints, *tmptr).extract_vector());
|
||||
replication_map.emplace(token, std::move(current_endpoints).extract_vector());
|
||||
}
|
||||
}
|
||||
} else if (depend_on_token) {
|
||||
for (const auto &t : sorted_tokens) {
|
||||
auto eps = co_await rs->calculate_natural_ips(t, *tmptr);
|
||||
auto eps = co_await rs->calculate_natural_endpoints(t, *tmptr);
|
||||
replication_map.emplace(t, std::move(eps).extract_vector());
|
||||
}
|
||||
} else {
|
||||
auto eps = co_await rs->calculate_natural_ips(default_replication_map_key, *tmptr);
|
||||
auto eps = co_await rs->calculate_natural_endpoints(default_replication_map_key, *tmptr);
|
||||
replication_map.emplace(default_replication_map_key, std::move(eps).extract_vector());
|
||||
}
|
||||
|
||||
@@ -472,14 +469,14 @@ auto vnode_effective_replication_map::clone_data_gently() const -> future<std::u
|
||||
co_return std::move(result);
|
||||
}
|
||||
|
||||
const inet_address_vector_replica_set& vnode_effective_replication_map::do_get_natural_endpoints(const token& tok,
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::do_get_natural_endpoints(const token& tok,
|
||||
bool is_vnode) const
|
||||
{
|
||||
const token& key_token = _rs->natural_endpoints_depend_on_token()
|
||||
? (is_vnode ? tok : _tmptr->first_token(tok))
|
||||
: default_replication_map_key;
|
||||
const auto it = _replication_map.find(key_token);
|
||||
return it->second;
|
||||
return resolve_endpoints<inet_address_vector_replica_set>(it->second, *_tmptr);
|
||||
}
|
||||
|
||||
inet_address_vector_replica_set vnode_effective_replication_map::get_natural_endpoints(const token& search_token) const {
|
||||
|
||||
@@ -55,7 +55,7 @@ struct replication_strategy_params {
|
||||
explicit replication_strategy_params(const replication_strategy_config_options& o, std::optional<unsigned> it) noexcept : options(o), initial_tablets(it) {}
|
||||
};
|
||||
|
||||
using replication_map = std::unordered_map<token, inet_address_vector_replica_set>;
|
||||
using replication_map = std::unordered_map<token, host_id_vector_replica_set>;
|
||||
|
||||
using endpoint_set = utils::basic_sequenced_set<inet_address, inet_address_vector_replica_set>;
|
||||
using host_id_set = utils::basic_sequenced_set<locator::host_id, host_id_vector_replica_set>;
|
||||
@@ -163,7 +163,7 @@ public:
|
||||
future<dht::token_range_vector> get_pending_address_ranges(const token_metadata_ptr tmptr, std::unordered_set<token> pending_tokens, locator::host_id pending_address, locator::endpoint_dc_rack dr) const;
|
||||
};
|
||||
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<inet_address>>;
|
||||
using ring_mapping = boost::icl::interval_map<token, std::unordered_set<locator::host_id>>;
|
||||
using replication_strategy_ptr = seastar::shared_ptr<const abstract_replication_strategy>;
|
||||
using mutable_replication_strategy_ptr = seastar::shared_ptr<abstract_replication_strategy>;
|
||||
|
||||
@@ -359,7 +359,7 @@ public:
|
||||
|
||||
private:
|
||||
dht::token_range_vector do_get_ranges(noncopyable_function<stop_iteration(bool& add_range, const inet_address& natural_endpoint)> consider_range_for_endpoint) const;
|
||||
const inet_address_vector_replica_set& do_get_natural_endpoints(const token& tok, bool is_vnode) const;
|
||||
inet_address_vector_replica_set do_get_natural_endpoints(const token& tok, bool is_vnode) const;
|
||||
|
||||
public:
|
||||
static factory_key make_factory_key(const replication_strategy_ptr& rs, const token_metadata_ptr& tmptr);
|
||||
|
||||
Reference in New Issue
Block a user