storage_proxy: convert get_live{,_sorted}_endpoints() to accept an effective_replication_map

Allow callers to use consistent effective_replication_map:s across calls
by letting the caller select the object to use.
This commit is contained in:
Avi Kivity
2022-08-09 16:54:01 +03:00
parent 46bd0b1e62
commit f1b0e3d58e
3 changed files with 12 additions and 12 deletions

View File

@@ -493,7 +493,8 @@ future<query::forward_result> forward_service::dispatch(query::forward_request r
std::map<netw::messaging_service::msg_addr, dht::partition_range_vector> vnodes_per_addr;
const auto& topo = get_token_metadata_ptr()->get_topology();
while (std::optional<dht::partition_range> vnode = next_vnode()) {
inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(ks, end_token(*vnode));
auto erm = ks.get_effective_replication_map();
inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode));
// Do not choose an endpoint outside the current datacenter if a request has a local consistency
if (db::is_datacenter_local(req.cl)) {
retain_local_endpoints(topo, live_endpoints);

View File

@@ -2966,7 +2966,7 @@ future<result<>> storage_proxy::mutate_end(future<result<>> mutate_result, utils
gms::inet_address storage_proxy::find_leader_for_counter_update(const mutation& m, db::consistency_level cl) {
auto& ks = _db.local().find_keyspace(m.schema()->ks_name());
auto erm = ks.get_effective_replication_map();
auto live_endpoints = get_live_endpoints(ks, m.token());
auto live_endpoints = get_live_endpoints(*erm, m.token());
if (live_endpoints.empty()) {
throw exceptions::unavailable_exception(cl, block_for(*erm, cl), 0);
@@ -4927,7 +4927,7 @@ result<::shared_ptr<abstract_read_executor>> storage_proxy::get_read_executor(lw
speculative_retry::type retry_type = schema->speculative_retry().get_type();
gms::inet_address extra_replica;
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(ks, token);
inet_address_vector_replica_set all_replicas = get_live_sorted_endpoints(*erm, token);
// Check for a non-local read before heat-weighted load balancing
// reordering of endpoints happens. The local endpoint, if
// present, is always first in the list, as get_live_sorted_endpoints()
@@ -5211,7 +5211,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
auto& gossiper = _remote->gossiper();
while (i != ranges.end()) {
dht::partition_range& range = *i;
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(ks, end_token(range));
inet_address_vector_replica_set live_endpoints = get_live_sorted_endpoints(*erm, end_token(range));
inet_address_vector_replica_set merged_preferred_replicas = preferred_replicas_for_range(*i);
inet_address_vector_replica_set filtered_endpoints = filter_for_query(cl, *erm, live_endpoints, merged_preferred_replicas, gossiper, pcf);
std::vector<dht::token_range> merged_ranges{to_token_range(range)};
@@ -5224,7 +5224,7 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
{
const auto current_range_preferred_replicas = preferred_replicas_for_range(*i);
dht::partition_range& next_range = *i;
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(ks, end_token(next_range));
inet_address_vector_replica_set next_endpoints = get_live_sorted_endpoints(*erm, end_token(next_range));
inet_address_vector_replica_set next_filtered_endpoints = filter_for_query(cl, *erm, next_endpoints, current_range_preferred_replicas, gossiper, pcf);
// Origin has this to say here:
@@ -5795,9 +5795,8 @@ future<bool> storage_proxy::cas(schema_ptr schema, shared_ptr<cas_request> reque
co_return condition_met;
}
inet_address_vector_replica_set storage_proxy::get_live_endpoints(replica::keyspace& ks, const dht::token& token) const {
auto erm = ks.get_effective_replication_map();
inet_address_vector_replica_set eps = erm->get_natural_endpoints_without_node_being_replaced(token);
inet_address_vector_replica_set storage_proxy::get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const {
inet_address_vector_replica_set eps = erm.get_natural_endpoints_without_node_being_replaced(token);
auto itend = boost::range::remove_if(eps, std::not_fn(std::bind_front(&storage_proxy::is_alive, this)));
eps.erase(itend, eps.end());
return eps;
@@ -5812,8 +5811,8 @@ void storage_proxy::sort_endpoints_by_proximity(inet_address_vector_replica_set&
}
}
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const {
auto eps = get_live_endpoints(ks, token);
inet_address_vector_replica_set storage_proxy::get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const {
auto eps = get_live_endpoints(erm, token);
sort_endpoints_by_proximity(eps);
return eps;
}

View File

@@ -226,7 +226,7 @@ public:
query::max_result_size get_max_result_size(const query::partition_slice& slice) const;
query::tombstone_limit get_tombstone_limit() const;
inet_address_vector_replica_set get_live_endpoints(replica::keyspace& ks, const dht::token& token) const;
inet_address_vector_replica_set get_live_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const;
private:
distributed<replica::database>& _db;
@@ -332,7 +332,7 @@ private:
bool hints_enabled(db::write_type type) const noexcept;
db::hints::manager& hints_manager_for(db::write_type type);
static void sort_endpoints_by_proximity(inet_address_vector_replica_set& eps);
inet_address_vector_replica_set get_live_sorted_endpoints(replica::keyspace& ks, const dht::token& token) const;
inet_address_vector_replica_set get_live_sorted_endpoints(const locator::effective_replication_map& erm, const dht::token& token) const;
bool is_alive(const gms::inet_address&) const;
db::read_repair_decision new_read_repair_decision(const schema& s);
result<::shared_ptr<abstract_read_executor>> get_read_executor(lw_shared_ptr<query::read_command> cmd,