storage_proxy: Don't fetch superfluous partitions

This patch ensures we keep track of how many partitions we've queried
so we don't ask for more than the number we need.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-12-13 16:40:52 +01:00
parent 93be8d7cef
commit 9572c19dc6
4 changed files with 23 additions and 11 deletions

View File

@@ -323,7 +323,7 @@ public:
return _partition_count;
}
uint32_t calculate_row_count(const query::partition_slice&);
void calculate_counts(const query::partition_slice&);
struct printer {
schema_ptr s;

View File

@@ -161,16 +161,18 @@ std::ostream& operator<<(std::ostream& os, const query::result::printer& p) {
return os;
}
uint32_t result::calculate_row_count(const query::partition_slice& slice) {
void result::calculate_counts(const query::partition_slice& slice) {
struct {
uint32_t total_count = 0;
uint32_t current_partition_count = 0;
uint32_t live_partitions = 0;
void accept_new_partition(const partition_key& key, uint32_t row_count) {
accept_new_partition(row_count);
}
void accept_new_partition(uint32_t row_count) {
total_count += row_count;
current_partition_count = row_count;
live_partitions += 1;
}
void accept_new_row(const clustering_key& key, const result_row_view& static_row, const result_row_view& row) {}
void accept_new_row(const result_row_view& static_row, const result_row_view& row) {}
@@ -182,7 +184,8 @@ uint32_t result::calculate_row_count(const query::partition_slice& slice) {
} counter;
result_view::consume(*this, slice, counter);
return counter.total_count;
_row_count = counter.total_count;
_partition_count = counter.live_partitions;
}
result::result()

View File

@@ -2687,7 +2687,8 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>
storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout, std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results,
lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t total_row_count) {
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t total_row_count, uint32_t total_partition_count) {
schema_ptr schema = local_schema_registry().get(cmd->schema_version);
keyspace& ks = _db.local().find_keyspace(schema->ks_name());
std::vector<::shared_ptr<abstract_read_executor>> exec;
@@ -2759,15 +2760,21 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
return rex->execute(timeout);
}, std::move(merger));
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges), cl, cmd, concurrency_factor, timeout, total_row_count, trace_state = std::move(trace_state)]
return f.then([p, exec = std::move(exec), results = std::move(results), i = std::move(i), ranges = std::move(ranges),
cl, cmd, concurrency_factor, timeout, total_row_count, total_partition_count, trace_state = std::move(trace_state)]
(foreign_ptr<lw_shared_ptr<query::result>>&& result) mutable {
total_row_count += result->row_count() ? result->row_count().value() :
(logger.error("no row count in query result, should not happen here"), result->calculate_row_count(cmd->slice));
if (!result->row_count() || !result->partition_count()) {
logger.error("no row count in query result, should not happen here");
result->calculate_counts(cmd->slice);
}
total_row_count += result->row_count().value();
total_partition_count += result->partition_count().value();
results.emplace_back(std::move(result));
if (i == ranges.end() || total_row_count >= cmd->row_limit) {
if (i == ranges.end() || total_row_count >= cmd->row_limit || total_partition_count >= cmd->partition_limit) {
return make_ready_future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>>(std::move(results));
} else {
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i), std::move(ranges), concurrency_factor, std::move(trace_state), total_row_count);
return p->query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, std::move(i),
std::move(ranges), concurrency_factor, std::move(trace_state), total_row_count, total_partition_count);
}
}).handle_exception([p] (std::exception_ptr eptr) {
p->handle_read_error(eptr, true);
@@ -2838,7 +2845,8 @@ storage_proxy::query(schema_ptr s,
logger.trace("query {}.{} cmd={}, ranges={}, id={}", s->ks_name(), s->cf_name(), *cmd, partition_ranges, query_id);
return do_query(s, cmd, std::move(partition_ranges), cl, std::move(trace_state)).then([query_id, cmd, s] (foreign_ptr<lw_shared_ptr<query::result>>&& res) {
if (res->buf().is_linearized()) {
logger.trace("query_result id={}, size={}, rows={}", query_id, res->buf().size(), res->calculate_row_count(cmd->slice));
res->calculate_counts(cmd->slice);
logger.trace("query_result id={}, size={}, rows={}, partitions={}", query_id, res->buf().size(), *res->row_count(), *res->partition_count());
} else {
logger.trace("query_result id={}, size={}", query_id, res->buf().size());
}

View File

@@ -243,7 +243,8 @@ private:
static std::vector<gms::inet_address> intersection(const std::vector<gms::inet_address>& l1, const std::vector<gms::inet_address>& l2);
future<std::vector<foreign_ptr<lw_shared_ptr<query::result>>>> query_partition_key_range_concurrent(std::chrono::steady_clock::time_point timeout,
std::vector<foreign_ptr<lw_shared_ptr<query::result>>>&& results, lw_shared_ptr<query::read_command> cmd, db::consistency_level cl, std::vector<query::partition_range>::iterator&& i,
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state, uint32_t total_row_count = 0);
std::vector<query::partition_range>&& ranges, int concurrency_factor, tracing::trace_state_ptr trace_state,
uint32_t total_row_count = 0, uint32_t total_partition_count = 0);
future<foreign_ptr<lw_shared_ptr<query::result>>> do_query(schema_ptr,
lw_shared_ptr<query::read_command> cmd,