code: Call sort_endpoints_by_proximity() via topology

The method is about to be moved from snitch to topology, this patch
prepares the rest of the code to use the latter to call it. The
topology's method just calls snitch, but it's going to change in the
next patch.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-08-30 13:05:00 +03:00
parent 4184091f1c
commit b6fdea9a79
7 changed files with 14 additions and 12 deletions

View File

@@ -11,7 +11,6 @@
#include <seastar/core/sleep.hh>
#include "dht/range_streamer.hh"
#include "utils/fb_utilities.hh"
#include "locator/snitch_base.hh"
#include "replica/database.hh"
#include "gms/gossiper.hh"
#include "gms/failure_detector.hh"
@@ -89,7 +88,6 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo
logger.debug("keyspace={}, desired_ranges.size={}, range_addresses.size={}", keyspace_name, desired_ranges.size(), range_addresses.size());
std::unordered_map<dht::token_range, std::vector<inet_address>> range_sources;
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
for (auto& desired_range : desired_ranges) {
auto found = false;
for (auto& x : range_addresses) {
@@ -99,7 +97,7 @@ range_streamer::get_all_ranges_with_sources_for(const sstring& keyspace_name, lo
const range<token>& src_range = x.first;
if (src_range.contains(desired_range, dht::tri_compare)) {
inet_address_vector_replica_set preferred(x.second.begin(), x.second.end());
snitch->sort_by_proximity(_address, preferred);
get_token_metadata().get_topology().sort_by_proximity(_address, preferred);
for (inet_address& p : preferred) {
range_sources[desired_range].push_back(p);
}

View File

@@ -1319,6 +1319,10 @@ std::function<bool(inet_address)> topology::get_local_dc_filter() const noexcept
};
}
void topology::sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const {
i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(address, addresses);
}
/////////////////// class topology end /////////////////////////////////////////
void shared_token_metadata::set(mutable_token_metadata_ptr tmptr) noexcept {

View File

@@ -105,6 +105,8 @@ public:
return std::count_if(endpoints.begin(), endpoints.end(), filter);
}
void sort_by_proximity(inet_address address, inet_address_vector_replica_set& addresses) const;
private:
/** multi-map: DC -> endpoints in that DC */
std::unordered_map<sstring,

View File

@@ -2441,9 +2441,8 @@ private:
inet_address_vector_replica_set sort_peer_nodes(const std::vector<gms::inet_address>& nodes) {
auto myip = utils::fb_utilities::get_broadcast_address();
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
inet_address_vector_replica_set sorted_nodes(nodes.begin(), nodes.end());
snitch->sort_by_proximity(myip, sorted_nodes);
_ri.db.local().get_token_metadata().get_topology().sort_by_proximity(myip, sorted_nodes);
return sorted_nodes;
}

View File

@@ -3003,8 +3003,7 @@ gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation&
if (local_endpoints.empty()) {
// FIXME: O(n log n) to get maximum
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
snitch->sort_by_proximity(my_address, live_endpoints);
erm->get_topology().sort_by_proximity(my_address, live_endpoints);
return live_endpoints[0];
} else {
static thread_local std::default_random_engine re{std::random_device{}()};
@@ -5859,8 +5858,8 @@ inet_address_vector_replica_set storage_proxy::get_live_endpoints(const locator:
return eps;
}
void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set& eps) {
locator::i_endpoint_snitch::get_local_snitch_ptr()->sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps);
void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set& eps) const {
get_token_metadata_ptr()->get_topology().sort_by_proximity(utils::fb_utilities::get_broadcast_address(), eps);
// FIXME: before dynamic snitch is implement put local address (if present) at the beginning
auto it = boost::range::find(eps, utils::fb_utilities::get_broadcast_address());
if (it != eps.end() && it != eps.begin()) {

View File

@@ -332,7 +332,7 @@ private:
bool cannot_hint(const Range& targets, db::write_type type) const;
bool hints_enabled(db::write_type type) const noexcept;
db::hints::manager& hints_manager_for(db::write_type type);
static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps);
void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps) const;
inet_address_vector_replica_set get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const;
bool is_alive(const gms::inet_address&) const;
db::read_repair_decision new_read_repair_decision(const schema& s);

View File

@@ -3082,6 +3082,7 @@ storage_service::get_new_source_ranges(locator::effective_replication_map_ptr er
std::unordered_multimap<inet_address, dht::token_range> source_ranges;
// find alive sources for our new ranges
auto tmptr = erm->get_token_metadata_ptr();
for (auto r : ranges) {
inet_address_vector_replica_set sources;
auto it = range_addresses.find(r);
@@ -3089,8 +3090,7 @@ storage_service::get_new_source_ranges(locator::effective_replication_map_ptr er
sources = it->second;
}
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
snitch->sort_by_proximity(my_address, sources);
tmptr->get_topology().sort_by_proximity(my_address, sources);
if (std::find(sources.begin(), sources.end(), my_address) != sources.end()) {
auto err = format("get_new_source_ranges: sources={}, my_address={}", sources, my_address);