diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index b4d7af46cf..6d02780bbd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -222,6 +222,14 @@ utils::UUID token_metadata::get_host_id(inet_address endpoint) const { return _endpoint_to_host_id_map.at(endpoint); } +std::optional token_metadata::get_host_id_if_known(inet_address endpoint) const { + auto it = _endpoint_to_host_id_map.find(endpoint); + if (it == _endpoint_to_host_id_map.end()) { + return { }; + } + return it->second; +} + std::experimental::optional token_metadata::get_endpoint_for_host_id(UUID host_id) const { auto beg = _endpoint_to_host_id_map.cbegin(); auto end = _endpoint_to_host_id_map.cend(); diff --git a/locator/token_metadata.hh b/locator/token_metadata.hh index 2ad0b38ac5..aaf2f66c6c 100644 --- a/locator/token_metadata.hh +++ b/locator/token_metadata.hh @@ -469,6 +469,9 @@ public: /** Return the unique host ID for an end-point. */ UUID get_host_id(inet_address endpoint) const; + /// Return the unique host ID for an end-point or nullopt if not found. + std::optional get_host_id_if_known(inet_address endpoint) const; + /** Return the end-point for a unique host ID */ std::experimental::optional get_endpoint_for_host_id(UUID host_id) const; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 26d4915e1a..2800c003e9 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -418,6 +418,22 @@ replica_ids_to_endpoints(const std::vector& replica_ids) { return endpoints; } +static std::vector +endpoints_to_replica_ids(const std::vector& endpoints) { + const auto& tm = get_local_storage_service().get_token_metadata(); + + std::vector replica_ids; + replica_ids.reserve(endpoints.size()); + + for (const auto& endpoint : endpoints) { + if (auto replica_id_opt = tm.get_host_id_if_known(endpoint)) { + replica_ids.push_back(*replica_id_opt); + } + } + + return replica_ids; +} + bool storage_proxy::need_throttle_writes() const { return _stats.background_write_bytes > memory::stats().total_memory() / 10 || _stats.queued_write_bytes > 6*1024*1024; } @@ -2494,6 +2510,8 @@ protected: db::consistency_level _cl; size_t _block_for; std::vector _targets; + // Targets that were succesfully used for a data or digest request + std::vector _used_targets; promise>> _result_promise; tracing::trace_state_ptr _trace_state; lw_shared_ptr _cf; @@ -2507,7 +2525,15 @@ public: } virtual ~abstract_read_executor() { _proxy->_stats.reads--; - }; + } + + /// Targets that were successfully ised for data and/or digest requests. + /// + /// Only filled after the request is finished, call only after + /// execute()'s future is ready. + std::vector used_targets() const { + return _used_targets; + } protected: future>, cache_temperature> make_mutation_data_request(lw_shared_ptr cmd, gms::inet_address ep, clock_type::time_point timeout) { @@ -2579,6 +2605,7 @@ protected: _cf->set_hit_rate(ep, std::get<1>(v)); resolver->add_data(ep, std::get<0>(std::move(v))); ++_proxy->_stats.data_read_completed.get_ep_stat(ep); + _used_targets.push_back(ep); } catch(...) { ++_proxy->_stats.data_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -2594,6 +2621,7 @@ protected: _cf->set_hit_rate(ep, std::get<2>(v)); resolver->add_digest(ep, std::get<0>(v), std::get<1>(v)); ++_proxy->_stats.digest_read_completed.get_ep_stat(ep); + _used_targets.push_back(ep); } catch(...) { ++_proxy->_stats.digest_read_errors.get_ep_stat(ep); resolver->error(ep, std::current_exception()); @@ -2982,7 +3010,7 @@ storage_proxy::query_singular(lw_shared_ptr cmd, tracing::trace_state_ptr trace_state, clock_type::time_point timeout, replicas_per_token_range preferred_replicas) { - std::vector<::shared_ptr> exec; + std::vector, nonwrapping_range>> exec; exec.reserve(partition_ranges.size()); for (auto&& pr: partition_ranges) { @@ -2994,30 +3022,41 @@ storage_proxy::query_singular(lw_shared_ptr cmd, auto it = preferred_replicas.find(token_range); const auto replicas = it == preferred_replicas.end() ? std::vector{} : replica_ids_to_endpoints(it->second); - exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas)); + exec.emplace_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas), std::move(token_range)); } query::result_merger merger(cmd->row_limit, cmd->partition_limit); merger.reserve(exec.size()); - auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr& rex) { + auto used_replicas = make_lw_shared(); + + auto f = ::map_reduce(exec.begin(), exec.end(), [timeout, used_replicas] ( + std::pair<::shared_ptr, nonwrapping_range>& executor_and_token_range) { + auto& [rex, token_range] = executor_and_token_range; utils::latency_counter lc; lc.start(); - return rex->execute(timeout).finally([lc, rex] () mutable { + return rex->execute(timeout).then_wrapped([lc, rex, used_replicas, token_range = std::move(token_range)] ( + future>> f) mutable { + if (!f.failed()) { + used_replicas->emplace(std::move(token_range), endpoints_to_replica_ids(rex->used_targets())); + } if (lc.is_start()) { rex->get_cf()->add_coordinator_read_latency(lc.stop().latency()); } + return std::move(f); }); }, std::move(merger)); - return f.then_wrapped([exec = std::move(exec), p = shared_from_this()] (future>> f) { + return f.then_wrapped([exec = std::move(exec), + p = shared_from_this(), + used_replicas] (future>> f) { if (f.failed()) { auto eptr = f.get_exception(); // hold onto exec until read is complete p->handle_read_error(eptr, false); return make_exception_future>, replicas_per_token_range>(eptr); } - return make_ready_future>, replicas_per_token_range>(std::move(f.get0()), replicas_per_token_range{}); + return make_ready_future>, replicas_per_token_range>(std::move(f.get0()), std::move(*used_replicas)); }); }