storage_proxy, storage_service: use new read endpoints

We use set_topology_transition_state to set read_new state
in storage_service::topology_state_load
based on _topology_state_machine._topology.tstate.
This triggers update_pending_ranges to compute and store new ranges
for read requests. We use this information in
storage_proxy::get_endpoints_for_reading
when we need to decide which nodes to use for reading.
This commit is contained in:
Petr Gusev
2023-04-05 19:39:16 +04:00
parent 052b91fb1f
commit 08529a1c6c
2 changed files with 10 additions and 3 deletions

View File

@@ -6006,9 +6006,14 @@ void storage_proxy::sort_endpoints_by_proximity(const locator::topology& topo, i
}
inet_address_vector_replica_set storage_proxy::get_endpoints_for_reading(const sstring& ks_name, const locator::effective_replication_map& erm, const dht::token& token) const {
auto eps = get_live_endpoints(erm, token);
sort_endpoints_by_proximity(erm.get_topology(), eps);
return eps;
auto endpoints = erm.get_token_metadata_ptr()->endpoints_for_reading(token, ks_name);
if (!endpoints) {
endpoints = erm.get_natural_endpoints_without_node_being_replaced(token);
}
auto it = boost::range::remove_if(*endpoints, std::not_fn(std::bind_front(&storage_proxy::is_alive, this)));
endpoints->erase(it, endpoints->end());
sort_endpoints_by_proximity(erm.get_topology(), *endpoints);
return std::move(*endpoints);
}
bool storage_proxy::is_alive(const gms::inet_address& ep) const {

View File

@@ -354,6 +354,8 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
co_await add_normal_node(id, rs);
}
tmptr->set_topology_transition_state(_topology_state_machine._topology.tstate);
for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = co_await id2ip(id);