storage_proxy: add coordinator_query_options and coordinator_query_result

As yet more parameters and return-values are about to be added to all
storage_proxy::query_* methods we need a way that scales better than
changing the signatures every time. To this end we aggregate all
non-mandatory query parameters into `coordinator_query_options` and all
return values into `coordinator_query_result`.
This way new fields can be simply added to the respective structs while
the signatures of the methods themselves and their client code can
remain unchanged.
This commit is contained in:
Botond Dénes
2018-03-15 15:24:12 +02:00
parent b55dcc2ce5
commit 2e2abf6edb
7 changed files with 111 additions and 126 deletions

View File

@@ -295,8 +295,8 @@ modification_statement::read_required_rows(
query::read_command cmd(s->id(), s->version(), ps, std::numeric_limits<uint32_t>::max());
// FIXME: ignoring "local"
return proxy.local().query(s, make_lw_shared(std::move(cmd)), std::move(keys),
cl, std::move(trace_state)).then([this, ps] (auto result, service::replicas_per_token_range) {
return query::result_view::do_with(*result, [&] (query::result_view v) {
cl, {std::move(trace_state)}).then([this, ps] (auto qr) {
return query::result_view::do_with(*qr.query_result, [&] (query::result_view v) {
auto prefetched_rows = update_parameters::prefetched_rows_type({update_parameters::prefetch_data(s)});
v.consume(ps, prefetch_data_builder(s, prefetched_rows.value(), ps));
return prefetched_rows;

View File

@@ -330,17 +330,17 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
command,
std::move(prange),
options.get_consistency(),
state.get_trace_state()).then([] (foreign_ptr<lw_shared_ptr<query::result>>&& result, service::replicas_per_token_range) {
return std::move(result);
{state.get_trace_state()}).then([] (service::storage_proxy::coordinator_query_result qr) {
return std::move(qr.query_result);
});
}, std::move(merger));
}).then([this, &options, now, cmd] (auto result) {
return this->process_results(std::move(result), cmd, options, now);
});
} else {
return proxy.local().query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), state.get_trace_state())
.then([this, &options, now, cmd] (auto result, service::replicas_per_token_range) {
return this->process_results(std::move(result), cmd, options, now);
return proxy.local().query(_schema, cmd, std::move(partition_ranges), options.get_consistency(), {state.get_trace_state()})
.then([this, &options, now, cmd] (service::storage_proxy::coordinator_query_result qr) {
return this->process_results(std::move(qr.query_result), cmd, options, now);
});
}
}
@@ -371,18 +371,18 @@ select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, command] (auto pr) {
dht::partition_range_vector prange { pr };
auto cmd = ::make_lw_shared<query::read_command>(*command);
return proxy.local().query(_schema, cmd, std::move(prange), db::consistency_level::ONE, state.get_trace_state(),
db::no_timeout, {}).then([] (foreign_ptr<lw_shared_ptr<query::result>> result, service::replicas_per_token_range) {
return std::move(result);
return proxy.local().query(_schema, cmd, std::move(prange), db::consistency_level::ONE, {state.get_trace_state(),
db::no_timeout}).then([] (service::storage_proxy::coordinator_query_result qr) {
return std::move(qr.query_result);
});
}, std::move(merger));
}).then([command, this, &options, now] (auto result) {
return this->process_results(std::move(result), command, options, now);
}).finally([command] { });
} else {
auto query = proxy.local().query(_schema, command, std::move(partition_ranges), db::consistency_level::ONE, state.get_trace_state(), db::no_timeout);
return query.then([command, this, &options, now] (auto result, service::replicas_per_token_range) {
return this->process_results(std::move(result), command, options, now);
auto query = proxy.local().query(_schema, command, std::move(partition_ranges), db::consistency_level::ONE, {state.get_trace_state(), db::no_timeout});
return query.then([command, this, &options, now] (service::storage_proxy::coordinator_query_result qr) {
return this->process_results(std::move(qr.query_result), command, options, now);
}).finally([command] {});
}
}
@@ -551,15 +551,15 @@ indexed_table_select_statement::find_index_partition_ranges(distributed<service:
cmd,
std::move(partition_ranges),
options.get_consistency(),
state.get_trace_state()).then([cmd, this, &options, now, &view] (foreign_ptr<lw_shared_ptr<query::result>> result,
service::replicas_per_token_range) {
{state.get_trace_state()}).then(
[cmd, this, &options, now, &view] (service::storage_proxy::coordinator_query_result qr) {
std::vector<const column_definition*> columns;
for (const column_definition& cdef : _schema->partition_key_columns()) {
columns.emplace_back(view.schema()->get_column_definition(cdef.name()));
}
auto selection = selection::selection::for_columns(view.schema(), columns);
cql3::selection::result_set_builder builder(*selection, now, options.get_cql_serialization_format());
query::result_view::consume(*result,
query::result_view::consume(*qr.query_result,
cmd->slice,
cql3::selection::result_set_builder::visitor(builder, *view.schema(), *selection));
auto rs = cql3::untyped_result_set(::make_shared<cql_transport::messages::result_message::rows>(std::move(builder.build())));

View File

@@ -1664,8 +1664,8 @@ query(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(),
std::move(slice), std::numeric_limits<uint32_t>::max());
return proxy.local().query(schema, cmd, {query::full_partition_range}, db::consistency_level::ONE,
nullptr, db::no_timeout).then([schema, cmd] (auto&& result, service::replicas_per_token_range) {
return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result));
{nullptr, db::no_timeout}).then([schema, cmd] (auto&& qr) {
return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result));
});
}
@@ -1680,8 +1680,8 @@ query(distributed<service::storage_proxy>& proxy, const sstring& ks_name, const
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), query::max_rows);
return proxy.local().query(schema, cmd, {dht::partition_range::make_singular(key)}, db::consistency_level::ONE,
nullptr, db::no_timeout).then([schema, cmd] (auto&& result, service::replicas_per_token_range) {
return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *result));
{nullptr, db::no_timeout}).then([schema, cmd] (auto&& qr) {
return make_lw_shared(query::result_set::from_raw_result(schema, cmd->slice, *qr.query_result));
});
}

View File

@@ -214,12 +214,14 @@ private:
auto ranges = _ranges;
auto command = ::make_lw_shared<query::read_command>(*_cmd);
auto& sp = get_local_storage_proxy();
return sp.query(_schema, std::move(command), std::move(ranges),
_options.get_consistency(), _state.get_trace_state(), sp.default_query_timeout(), std::move(_last_replicas)).then(
[this, &builder, page_size, now](foreign_ptr<lw_shared_ptr<query::result>> results, paging_state::replicas_per_token_range last_replicas) {
_last_replicas = std::move(last_replicas);
handle_result(builder, std::move(results), page_size, now);
return get_local_storage_proxy().query(_schema,
std::move(command),
std::move(ranges),
_options.get_consistency(),
{_state.get_trace_state(), {}, std::move(_last_replicas)}).then(
[this, &builder, page_size, now](service::storage_proxy::coordinator_query_result qr) {
_last_replicas = std::move(qr.last_replicas);
handle_result(builder, std::move(qr.query_result), page_size, now);
});
}

View File

@@ -3003,13 +3003,11 @@ void storage_proxy::handle_read_error(std::exception_ptr eptr, bool range) {
}
}
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>
future<storage_proxy::coordinator_query_result>
storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas) {
storage_proxy::coordinator_query_options query_options) {
std::vector<std::pair<::shared_ptr<abstract_read_executor>, nonwrapping_range<dht::token>>> exec;
exec.reserve(partition_ranges.size());
@@ -3019,10 +3017,11 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
}
auto token_range = nonwrapping_range<dht::token>::make_singular(pr.start()->value().token());
auto it = preferred_replicas.find(token_range);
const auto replicas = it == preferred_replicas.end() ? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(it->second);
auto it = query_options.preferred_replicas.find(token_range);
const auto replicas = it == query_options.preferred_replicas.end()
? std::vector<gms::inet_address>{} : replica_ids_to_endpoints(it->second);
exec.emplace_back(get_read_executor(cmd, std::move(pr), cl, trace_state, replicas), std::move(token_range));
exec.emplace_back(get_read_executor(cmd, std::move(pr), cl, query_options.trace_state, replicas), std::move(token_range));
}
query::result_merger merger(cmd->row_limit, cmd->partition_limit);
@@ -3030,7 +3029,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
auto used_replicas = make_lw_shared<replicas_per_token_range>();
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout, used_replicas] (
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout = query_options.timeout(*this), used_replicas] (
std::pair<::shared_ptr<abstract_read_executor>, nonwrapping_range<dht::token>>& executor_and_token_range) {
auto& [rex, token_range] = executor_and_token_range;
utils::latency_counter lc;
@@ -3054,9 +3053,9 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd,
auto eptr = f.get_exception();
// hold onto exec until read is complete
p->handle_read_error(eptr, false);
return make_exception_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(eptr);
return make_exception_future<storage_proxy::coordinator_query_result>(eptr);
}
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(std::move(f.get0()), std::move(*used_replicas));
return make_ready_future<coordinator_query_result>(coordinator_query_result(std::move(f.get0()), std::move(*used_replicas)));
});
}
@@ -3179,13 +3178,11 @@ storage_proxy::query_partition_key_range_concurrent(storage_proxy::clock_type::t
});
}
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>
future<storage_proxy::coordinator_query_result>
storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas) {
storage_proxy::coordinator_query_options query_options) {
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
dht::partition_range_vector ranges;
@@ -3220,8 +3217,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
slogger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
result_rows_per_range, cmd->row_limit, ranges.size(), concurrency_factor);
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor,
std::move(trace_state), cmd->row_limit, cmd->partition_limit)
return query_partition_key_range_concurrent(query_options.timeout(*this), std::move(results), cmd, cl, ranges.begin(), std::move(ranges),
concurrency_factor, std::move(query_options.trace_state), cmd->row_limit, cmd->partition_limit)
.then([row_limit = cmd->row_limit, partition_limit = cmd->partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
query::result_merger merger(row_limit, partition_limit);
merger.reserve(results.size());
@@ -3230,26 +3227,24 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
merger(std::move(r));
}
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(merger.get(), replicas_per_token_range{});
return make_ready_future<coordinator_query_result>(coordinator_query_result(merger.get()));
});
}
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>
future<storage_proxy::coordinator_query_result>
storage_proxy::query(schema_ptr s,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas)
storage_proxy::coordinator_query_options query_options)
{
if (slogger.is_enabled(logging::log_level::trace) || qlogger.is_enabled(logging::log_level::trace)) {
static thread_local int next_id = 0;
auto query_id = next_id++;
slogger.trace("query {}.{} cmd={}, ranges={}, id={}", s->ks_name(), s->cf_name(), *cmd, partition_ranges, query_id);
return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout), std::move(preferred_replicas)).then(
[query_id, cmd, s] (foreign_ptr<lw_shared_ptr<query::result>>&& res, replicas_per_token_range&& preferred_replicas) {
return do_query(s, cmd, std::move(partition_ranges), cl, std::move(query_options)).then([query_id, cmd, s] (coordinator_query_result qr) {
auto& res = qr.query_result;
if (res->buf().is_linearized()) {
res->ensure_counts();
slogger.trace("query_result id={}, size={}, rows={}, partitions={}", query_id, res->buf().size(), *res->row_count(), *res->partition_count());
@@ -3257,26 +3252,22 @@ storage_proxy::query(schema_ptr s,
slogger.trace("query_result id={}, size={}", query_id, res->buf().size());
}
qlogger.trace("id={}, {}", query_id, res->pretty_printer(s, cmd->slice));
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(
std::move(res), std::move(preferred_replicas));
return make_ready_future<coordinator_query_result>(std::move(qr));
});
}
return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state), std::move(timeout), std::move(preferred_replicas));
return do_query(s, cmd, std::move(partition_ranges), cl, std::move(query_options));
}
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>
future<storage_proxy::coordinator_query_result>
storage_proxy::do_query(schema_ptr s,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas)
storage_proxy::coordinator_query_options query_options)
{
static auto make_empty = [] {
return make_ready_future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range>(
make_foreign(make_lw_shared<query::result>()), replicas_per_token_range{});
return make_ready_future<coordinator_query_result>(make_foreign(make_lw_shared<query::result>()));
};
auto& slice = cmd->slice;
@@ -3293,9 +3284,7 @@ storage_proxy::do_query(schema_ptr s,
return query_singular(cmd,
std::move(partition_ranges),
cl,
std::move(trace_state),
std::move(timeout),
std::move(preferred_replicas)).finally([lc, p] () mutable {
std::move(query_options)).finally([lc, p] () mutable {
p->_stats.read.mark(lc.stop().latency());
if (lc.is_start()) {
p->_stats.estimated_read.add(lc.latency(), p->_stats.read.hist.count);
@@ -3310,9 +3299,7 @@ storage_proxy::do_query(schema_ptr s,
return query_partition_key_range(cmd,
std::move(partition_ranges),
cl,
std::move(trace_state),
std::move(timeout),
std::move(preferred_replicas)).finally([lc, p] () mutable {
std::move(query_options)).finally([lc, p] () mutable {
p->_stats.range.mark(lc.stop().latency());
if (lc.is_start()) {
p->_stats.estimated_range.add(lc.latency(), p->_stats.range.hist.count);

View File

@@ -208,6 +208,41 @@ public:
public:
stats();
};
class coordinator_query_options {
std::optional<clock_type::time_point> _timeout;
public:
tracing::trace_state_ptr trace_state = nullptr;
replicas_per_token_range preferred_replicas;
coordinator_query_options(tracing::trace_state_ptr trace_state = nullptr,
std::optional<clock_type::time_point> timeout = std::nullopt,
replicas_per_token_range preferred_replicas = { })
: _timeout(timeout)
, trace_state(std::move(trace_state))
, preferred_replicas(std::move(preferred_replicas)) {
}
clock_type::time_point timeout(storage_proxy& sp) const {
return _timeout ? *_timeout : sp.default_query_timeout();
}
};
struct coordinator_query_result {
foreign_ptr<lw_shared_ptr<query::result>> query_result;
replicas_per_token_range last_replicas;
coordinator_query_result(foreign_ptr<lw_shared_ptr<query::result>> query_result,
replicas_per_token_range last_replicas)
: query_result(std::move(query_result))
, last_replicas(std::move(last_replicas)) {
}
coordinator_query_result(foreign_ptr<lw_shared_ptr<query::result>> query_result)
: query_result(std::move(query_result)) {
}
};
private:
distributed<database>& _db;
response_id_type _next_response_id = 1; // 0 is reserved for unique_response_handler
@@ -229,12 +264,10 @@ private:
seastar::metrics::metric_groups _metrics;
private:
void uninit_messaging_service();
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range> query_singular(lw_shared_ptr<query::read_command> cmd,
future<coordinator_query_result> query_singular(lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas);
coordinator_query_options optional_params);
response_id_type register_response_handler(shared_ptr<abstract_write_response_handler>&& h);
void remove_response_handler(response_id_type id);
void got_response(response_id_type id, gms::inet_address from);
@@ -269,12 +302,10 @@ private:
clock_type::time_point timeout,
query::digest_algorithm da,
uint64_t max_size = query::result_memory_limiter::maximum_result_size);
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range> query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
future<coordinator_query_result> query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas);
coordinator_query_options optional_params);
dht::partition_range_vector get_restricted_ranges(const schema& s, dht::partition_range range);
float estimate_result_rows_per_range(lw_shared_ptr<query::read_command> cmd, keyspace& ks);
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
@@ -283,13 +314,11 @@ private:
dht::partition_range_vector&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t remaining_row_count, uint32_t remaining_partition_count);
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range> do_query(schema_ptr,
future<coordinator_query_result> do_query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas);
coordinator_query_options optional_params);
template<typename Range, typename CreateWriteHandler>
future<std::vector<unique_response_handler>> mutate_prepare(const Range& mutations, db::consistency_level cl, db::write_type type, CreateWriteHandler handler);
template<typename Range>
@@ -411,28 +440,11 @@ public:
* IMPORTANT: Not all fibers started by this method have to be done by the time it returns so no
* parameter can be changed after being passed to this method.
*/
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range> query(schema_ptr,
future<coordinator_query_result> query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state,
clock_type::time_point timeout,
replicas_per_token_range preferred_replicas = {});
/**
* query() overload without the preferred_replicas and timeout.
*
* Convenience overload for code that doesn't care about them.
* As timeout the default one is used as defined by the configuration.
*/
future<foreign_ptr<lw_shared_ptr<query::result>>, replicas_per_token_range> query(schema_ptr s,
lw_shared_ptr<query::read_command> cmd,
dht::partition_range_vector&& partition_ranges,
db::consistency_level cl,
tracing::trace_state_ptr trace_state) {
return query(std::move(s), std::move(cmd), std::move(partition_ranges), std::move(cl),
std::move(trace_state), default_query_timeout(), {});
}
coordinator_query_options optional_params = {});
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_temperature> query_mutations_locally(
schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range&,

View File

@@ -265,13 +265,9 @@ public:
auto pranges = make_partition_ranges(*schema, keys);
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable {
return service::get_local_storage_proxy().query(
schema,
cmd,
std::move(pranges),
cl_from_thrift(consistency_level),
nullptr).then([schema, cmd, cell_limit, keys = std::move(keys)](auto result, service::replicas_per_token_range) {
return query::result_view::do_with(*result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable {
return service::get_local_storage_proxy().query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level)).then(
[schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable {
if (schema->is_counter()) {
counter_column_aggregator aggregator(*schema, cmd->slice, cell_limit, std::move(keys));
v.consume(cmd->slice, aggregator);
@@ -296,13 +292,9 @@ public:
auto pranges = make_partition_ranges(*schema, keys);
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([schema, cmd, pranges = std::move(pranges), cell_limit, consistency_level, keys]() mutable {
return service::get_local_storage_proxy().query(
schema,
cmd,
std::move(pranges),
cl_from_thrift(consistency_level),
nullptr).then([schema, cmd, cell_limit, keys = std::move(keys)](auto&& result, service::replicas_per_token_range) {
return query::result_view::do_with(*result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable {
return service::get_local_storage_proxy().query(schema, cmd, std::move(pranges), cl_from_thrift(consistency_level)).then(
[schema, cmd, cell_limit, keys = std::move(keys)](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, cell_limit, keys = std::move(keys)](query::result_view v) mutable {
column_counter counter(*schema, cmd->slice, cell_limit, std::move(keys));
v.consume(cmd->slice, counter);
return counter.release();
@@ -336,13 +328,9 @@ public:
}
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([schema, cmd, prange = std::move(prange), consistency_level] () mutable {
return service::get_local_storage_proxy().query(
schema,
cmd,
std::move(prange),
cl_from_thrift(consistency_level),
nullptr).then([schema, cmd](auto result, service::replicas_per_token_range) {
return query::result_view::do_with(*result, [schema, cmd](query::result_view v) {
return service::get_local_storage_proxy().query(schema, cmd, std::move(prange), cl_from_thrift(consistency_level)).then(
[schema, cmd](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd](query::result_view v) {
return to_key_slices(*schema, cmd->slice, v, std::numeric_limits<uint32_t>::max());
});
});
@@ -402,9 +390,9 @@ public:
range = {dht::partition_range::make_singular(std::move(range[0].start()->value()))};
}
auto range1 = range; // query() below accepts an rvalue, so need a copy to reuse later
return service::get_local_storage_proxy().query(schema, cmd, std::move(range),
consistency_level, nullptr).then([schema, cmd, column_limit](auto result, service::replicas_per_token_range) {
return query::result_view::do_with(*result, [schema, cmd, column_limit](query::result_view v) {
return service::get_local_storage_proxy().query(schema, cmd, std::move(range), consistency_level).then(
[schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) {
return to_key_slices(*schema, cmd->slice, v, column_limit);
});
}).then([schema, cmd, column_limit, range = std::move(range1), consistency_level, start_key = std::move(start_key), end = std::move(end), &output](auto&& slices) mutable {
@@ -646,13 +634,9 @@ public:
auto cmd = make_lw_shared<query::read_command>(schema->id(), schema->version(), std::move(slice), row_limit);
auto f = _query_state.get_client_state().has_schema_access(*schema, auth::permission::SELECT);
return f.then([dk = std::move(dk), cmd, schema, column_limit = request.count, cl = request.consistency_level] {
return service::get_local_storage_proxy().query(
schema,
cmd,
{dht::partition_range::make_singular(dk)},
cl_from_thrift(cl),
nullptr).then([schema, cmd, column_limit](auto result, service::replicas_per_token_range) {
return query::result_view::do_with(*result, [schema, cmd, column_limit](query::result_view v) {
return service::get_local_storage_proxy().query(schema, cmd, {dht::partition_range::make_singular(dk)}, cl_from_thrift(cl)).then(
[schema, cmd, column_limit](service::storage_proxy::coordinator_query_result qr) {
return query::result_view::do_with(*qr.query_result, [schema, cmd, column_limit](query::result_view v) {
column_aggregator<query_order::no> aggregator(*schema, cmd->slice, column_limit, { });
v.consume(cmd->slice, aggregator);
auto cols = aggregator.release();