query_result_merger: Limit rows

This patch makes the row limit enforced by the storage_proxy layer.
It adds a row limit to the query_result_merger, useful when merging
results for concurrent queries.

More importantly, it provides guarantees that upper layers may be
relying on implicitly (e.g., the paging code).

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-12-13 21:44:37 +01:00
parent efc986d548
commit fee0b7fa48
4 changed files with 47 additions and 19 deletions

View File

@@ -308,7 +308,7 @@ select_statement::execute(distributed<service::storage_proxy>& proxy,
if (needs_post_query_ordering() && _limit) {
return do_with(std::forward<std::vector<query::partition_range>>(partition_ranges), [this, &proxy, &state, &options, cmd](auto prs) {
assert(cmd->partition_limit == query::max_partitions);
query::result_merger merger(query::max_partitions);
query::result_merger merger(cmd->row_limit * prs.size(), query::max_partitions);
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, &options, cmd] (auto pr) {
std::vector<query::partition_range> prange { pr };
auto command = ::make_lw_shared<query::read_command>(*cmd);
@@ -343,7 +343,7 @@ select_statement::execute_internal(distributed<service::storage_proxy>& proxy,
if (needs_post_query_ordering() && _limit) {
return do_with(std::move(partition_ranges), [this, &proxy, &state, command] (auto prs) {
assert(command->partition_limit == query::max_partitions);
query::result_merger merger(query::max_partitions);
query::result_merger merger(command->row_limit * prs.size(), query::max_partitions);
return map_reduce(prs.begin(), prs.end(), [this, &proxy, &state, command] (auto pr) {
std::vector<query::partition_range> prange { pr };
auto cmd = ::make_lw_shared<query::read_command>(*command);

View File

@@ -196,6 +196,27 @@ result::result()
}(), short_read::no, 0, 0)
{ }
static void write_partial_partition(ser::writer_of_qr_partition&& pw, const ser::qr_partition_view& pv, uint32_t rows_to_include) {
auto key = pv.key();
auto static_cells_wr = (key ? std::move(pw).write_key(*key) : std::move(pw).skip_key())
.start_static_row()
.start_cells();
for (auto&& cell : pv.static_row().cells()) {
static_cells_wr.add(cell);
}
auto rows_wr = std::move(static_cells_wr)
.end_cells()
.end_static_row()
.start_rows();
auto rows = pv.rows();
// rows.size() can be 0 is there's a single static row
auto it = rows.begin();
for (uint32_t i = 0; i < std::min(rows.size(), uint64_t{rows_to_include}); ++i) {
rows_wr.add(*it++);
}
std::move(rows_wr).end_rows().end_qr_partition();
}
foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
if (_partial.size() == 1) {
return std::move(_partial[0]);
@@ -203,22 +224,27 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
bytes_ostream w;
auto partitions = ser::writer_of_query_result(w).start_partitions();
std::experimental::optional<uint32_t> row_count = 0;
uint32_t row_count = 0;
short_read is_short_read;
uint32_t partition_count = 0;
for (auto&& r : _partial) {
if (row_count) {
if (r->row_count()) {
row_count = row_count.value() + r->row_count().value();
} else {
row_count = std::experimental::nullopt;
}
}
result_view::do_with(*r, [&] (result_view rv) {
for (auto&& pv : rv._v.partitions()) {
partitions.add(pv);
if (++partition_count >= _max_partitions) {
auto rows = pv.rows();
// If rows.empty(), then there's a static row, or there wouldn't be a partition
const uint32_t rows_in_partition = rows.size() ? : 1;
const uint32_t rows_to_include = std::min(_max_rows - row_count, rows_in_partition);
row_count += rows_to_include;
if (rows_to_include >= rows_in_partition) {
partitions.add(pv);
if (++partition_count >= _max_partitions) {
return;
}
} else if (rows_to_include > 0) {
write_partial_partition(partitions.add(), pv, rows_to_include);
return;
} else {
return;
}
}
@@ -227,7 +253,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
is_short_read = short_read::yes;
break;
}
if (partition_count >= _max_partitions) {
if (row_count >= _max_rows || partition_count >= _max_partitions) {
break;
}
}

View File

@@ -30,10 +30,12 @@ namespace query {
// Implements @Reducer concept from distributed.hh
class result_merger {
std::vector<foreign_ptr<lw_shared_ptr<query::result>>> _partial;
const uint32_t _max_rows;
const uint32_t _max_partitions;
public:
explicit result_merger(uint32_t max_partitions)
: _max_partitions(max_partitions)
explicit result_merger(uint32_t max_rows, uint32_t max_partitions)
: _max_rows(max_rows)
, _max_partitions(max_partitions)
{ }
void reserve(size_t size) {

View File

@@ -2671,7 +2671,7 @@ storage_proxy::query_singular(lw_shared_ptr<query::read_command> cmd, std::vecto
exec.push_back(get_read_executor(cmd, std::move(pr), cl, trace_state));
}
query::result_merger merger(cmd->partition_limit);
query::result_merger merger(cmd->row_limit, cmd->partition_limit);
merger.reserve(exec.size());
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr<abstract_read_executor>& rex) {
@@ -2754,7 +2754,7 @@ storage_proxy::query_partition_key_range_concurrent(std::chrono::steady_clock::t
exec.push_back(::make_shared<range_slice_read_executor>(schema, p, cmd, std::move(range), cl, std::move(filtered_endpoints), trace_state));
}
query::result_merger merger(cmd->partition_limit);
query::result_merger merger(cmd->row_limit, cmd->partition_limit);
merger.reserve(exec.size());
auto f = ::map_reduce(exec.begin(), exec.end(), [timeout] (::shared_ptr<abstract_read_executor>& rex) {
@@ -2824,8 +2824,8 @@ storage_proxy::query_partition_key_range(lw_shared_ptr<query::read_command> cmd,
return query_partition_key_range_concurrent(timeout, std::move(results), cmd, cl, ranges.begin(), std::move(ranges), concurrency_factor,
std::move(trace_state), cmd->row_limit, cmd->partition_limit)
.then([partition_limit = cmd->partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
query::result_merger merger(partition_limit);
.then([row_limit = cmd->row_limit, partition_limit = cmd->partition_limit](std::vector<foreign_ptr<lw_shared_ptr<query::result>>> results) {
query::result_merger merger(row_limit, partition_limit);
merger.reserve(results.size());
for (auto&& r: results) {