storage_proxy: endpoint_filter: remove gossiper dependency

The function used `gossiper&` to check whether an endpoint is considered
alive. Abstract this out through `noncopyable_function`.

This will allow us to use `endpoint_filter` during local queries when
`remote` (which contains the `gossiper` reference) is unavailable.
This commit is contained in:
Kamil Braun
2022-08-09 13:25:48 +02:00
parent 79bfe04d2a
commit c5c78a7922

View File

@@ -3368,7 +3368,9 @@ storage_proxy::mutate_atomically(std::vector<mutation> mutations, db::consistenc
.then(utils::result_into_future<result<>>);
}
static inet_address_vector_replica_set endpoint_filter(const gms::gossiper& g, const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
static inet_address_vector_replica_set endpoint_filter(
const noncopyable_function<bool(const gms::inet_address&)>& is_alive,
const sstring& local_rack, const std::unordered_map<sstring, std::unordered_set<gms::inet_address>>& endpoints) {
// special case for single-node data centers
if (endpoints.size() == 1 && endpoints.begin()->second.size() == 1) {
return boost::copy_range<inet_address_vector_replica_set>(endpoints.begin()->second);
@@ -3377,9 +3379,9 @@ static inet_address_vector_replica_set endpoint_filter(const gms::gossiper& g, c
// strip out dead endpoints and localhost
std::unordered_multimap<sstring, gms::inet_address> validated;
auto is_valid = [&g] (gms::inet_address input) {
auto is_valid = [&is_alive] (gms::inet_address input) {
return input != utils::fb_utilities::get_broadcast_address()
&& g.is_alive(input);
&& is_alive(input);
};
for (auto& e : endpoints) {
@@ -3473,8 +3475,8 @@ storage_proxy::mutate_atomically_result(std::vector<mutation> mutations, db::con
auto local_dc = topology.get_datacenter();
auto& local_endpoints = topology.get_datacenter_racks().at(local_dc);
auto local_rack = topology.get_rack();
auto& gossiper = _p._remote->gossiper();
auto chosen_endpoints = endpoint_filter(gossiper, local_rack, local_endpoints);
auto chosen_endpoints = endpoint_filter(std::bind_front(&storage_proxy::is_alive, &_p),
local_rack, local_endpoints);
if (chosen_endpoints.empty()) {
if (_cl == db::consistency_level::ANY) {