query_singular(): return the used replicas

This patch implements the last_replicas returning part of the query()
signature changes for singular queries. It allows for client code to
save the last returned replicas and pass it to query() on the next page
as the preferred-replicas parameter, thus faciliate the read requests
for the next page hitting the same replicas.
This commit is contained in:
Botond Dénes
2017-12-21 12:25:22 +02:00
parent aaf67bcbaa
commit 536a32bb5e
3 changed files with 57 additions and 7 deletions

View File

@@ -222,6 +222,14 @@ utils::UUID token_metadata::get_host_id(inet_address endpoint) const {
return _endpoint_to_host_id_map.at(endpoint);
}
std::optional<utils::UUID> token_metadata::get_host_id_if_known(inet_address endpoint) const {
auto it = _endpoint_to_host_id_map.find(endpoint);
if (it == _endpoint_to_host_id_map.end()) {
return { };
}
return it->second;
}
std::experimental::optional<inet_address> token_metadata::get_endpoint_for_host_id(UUID host_id) const {
auto beg = _endpoint_to_host_id_map.cbegin();
auto end = _endpoint_to_host_id_map.cend();

View File

@@ -469,6 +469,9 @@ public:
/** Return the unique host ID for an end-point. */
UUID get_host_id(inet_address endpoint) const;
/// Return the unique host ID for an end-point or nullopt if not found.
std::optional<UUID> get_host_id_if_known(inet_address endpoint) const;
/** Return the end-point for a unique host ID */
std::experimental::optional<inet_address> get_endpoint_for_host_id(UUID host_id) const;

View File

@@ -418,6 +418,22 @@ replica_ids_to_endpoints(const std::vector<utils::UUID>& replica_ids) {
return endpoints;
}
static std::vector<utils::UUID>
endpoints_to_replica_ids(const std::vector<gms::inet_address>& endpoints) {
const auto& tm = get_local_storage_service().get_token_metadata();
std::vector<utils::UUID> replica_ids;
replica_ids.reserve(endpoints.size());
for (const auto& endpoint : endpoints) {
if (auto replica_id_opt = tm.get_host_id_if_known(endpoint)) {
replica_ids.push_back(*replica_id_opt);
}
}
return replica_ids;
}
bool storage_proxy::need_throttle_writes() const {
return _stats.background_write_bytes > memory::stats().total_memory() / 10 || _stats.queued_write_bytes > 6*1024*1024;
}
@@ -2494,6 +2510,8 @@ protected:
db::consistency_level _cl;
size_t _block_for;
std::vector<gms::inet_address> _targets;
// Targets that were succesfully used for a data or digest request
std::vector<gms::inet_address> _used_targets;
promise<foreign_ptr<lw_shared_ptr<query::result>>> _result_promise;
tracing::trace_state_ptr _trace_state;
lw_shared_ptr<column_family> _cf;
@@ -2507,7 +2525,15 @@ public:
}
virtual ~abstract_read_executor() {
_proxy->_stats.reads--;
};
}
/// Targets that were successfully ised for data and/or digest requests.
///
/// Only filled after the request is finished, call only after
/// execute()'s future is ready.
std::vector<gms::inet_address> used_targets() const {
return _used_targets;
}
protected:
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> make_mutation_data_request(lw_shared_ptr<query::read_command> cmd, gms::inet_address ep, clock_type::time_point timeout) {
@@ -2579,6 +2605,7 @@ protected:
_cf->set_hit_rate(ep, std::get<1>(v));
resolver->add_data(ep, std::get<0>(std::move(v)));
++_proxy->_stats.data_read_completed.get_ep_stat(ep);
_used_targets.push_back(ep);
} catch(...) {
++_proxy->_stats.data_read_errors.get_ep_stat(ep);
resolver->error(ep, std::current_exception());
@@ -2594,6 +2621,7 @@ protected:
_cf->set_hit_rate(ep, std::get<2>(v));
resolver->add_digest(ep, std::get<0>(v), std::get<1>(v));
++_proxy->_stats.digest_read_completed.get_ep_stat(ep);
_used_targets.push_back(ep);
} catch(...) {
++_proxy->_stats.digest_read_errors.get_ep_stat(ep);
resolver->error(ep, std::current_exception());
@@ -2982,7 +3010,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas) {
std::vector<::shared_ptr<abstract_read_executor>> exec;
std::vector<std::pair<::shared_ptr<abstract_read_executor>, nonwrapping_range<dht::token>>> exec;
exec.reserve(partition_ranges.size());
for (auto&& pr: partition_ranges) {
@@ -2994,30 +3022,41 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
auto it = preferred_replicas.find(token_range);
const auto replicas = it == preferred_replicas.end() ? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(it->second);
exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas));
exec.emplace_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas), std::move(token_range));
}
query::result_merger merger(cmd->row_limit, cmd->partition_limit);
merger.reserve(exec.size());
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr<abstract_read_executor>& rex) {
auto used_replicas = make_lw_shared<replicas_per_token_range>();
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout, used_replicas] (
std::pair<::shared_ptr<abstract_read_executor>, nonwrapping_range<dht::token>>& executor_and_token_range) {
auto& [rex, token_range] = executor_and_token_range;
utils::latency_counter lc;
lc.start();
return rex->execute(timeout).finally([lc, rex] () mutable {
return rex->execute(timeout).then_wrapped([lc, rex, used_replicas, token_range = std::move(token_range)] (
future<foreign_ptr<lw_shared_ptr<query::result>>> f) mutable {
if (!f.failed()) {
used_replicas->emplace(std::move(token_range), endpoints_to_replica_ids(rex->used_targets()));
}
if (lc.is_start()) {
rex->get_cf()->add_coordinator_read_latency(lc.stop().latency());
}
return std::move(f);
});
}, std::move(merger));
return f.then_wrapped([exec = std::move(exec), p = shared_from_this()] (future<foreign_ptr<lw_shared_ptr<query::result>>> f) {
return f.then_wrapped([exec = std::move(exec),
p = shared_from_this(),
used_replicas] (future<foreign_ptr<lw_shared_ptr<query::result>>> f) {
if (f.failed()) {
auto eptr = f.get_exception();
// hold onto exec until read is complete
p->handle_read_error(eptr, false);
return make_exception_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(eptr);
}
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(std::move(f.get0()), replicas_per_token_range{});
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(std::move(f.get0()), std::move(*used_replicas));
});
}