query: put live row count into query::result

The patch calculates row count during result building and while merging.
If one of results that are being merged does not have row count the
merged result will not have one either.
This commit is contained in:
Gleb Natapov
2016-05-02 13:35:47 +03:00
parent 41c586313a
commit db322d8f74
4 changed files with 29 additions and 7 deletions

View File

@@ -706,6 +706,7 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
|| !has_any_live_data(s, column_kind::static_column, static_row()))) {
pw.retract();
} else {
pw.row_count() = row_count ? : 1;
std::move(rows_wr).end_rows().end_qr_partition();
}
}

View File

@@ -50,6 +50,7 @@ class result::partition_writer {
bool _static_row_added = false;
md5_hasher& _digest;
md5_hasher _digest_pos;
uint32_t& _row_count;
public:
partition_writer(
result_request request,
@@ -58,7 +59,8 @@ public:
ser::query_result__partitions& pw,
ser::vector_position pos,
ser::after_qr_partition__key w,
md5_hasher& digest)
md5_hasher& digest,
uint32_t& row_count)
: _request(request)
, _w(std::move(w))
, _slice(slice)
@@ -67,6 +69,7 @@ public:
, _pos(std::move(pos))
, _digest(digest)
, _digest_pos(digest)
, _row_count(row_count)
{ }
bool requested_digest() const {
@@ -98,6 +101,9 @@ public:
md5_hasher& digest() {
return _digest;
}
uint32_t& row_count() {
return _row_count;
}
};
class result::builder {
@@ -106,6 +112,7 @@ class result::builder {
const partition_slice& _slice;
ser::query_result__partitions _w;
result_request _request;
uint32_t _row_count = 0;
public:
builder(const partition_slice& slice, result_request request)
: _slice(slice)
@@ -130,21 +137,21 @@ public:
if (_request != result_request::only_result) {
key.feed_hash(_digest, s);
}
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest);
return partition_writer(_request, _slice, ranges, _w, std::move(pos), std::move(after_key), _digest, _row_count);
}
result build() {
std::move(_w).end_partitions().end_query_result();
switch (_request) {
case result_request::only_result:
return result(std::move(_out));
return result(std::move(_out), _row_count);
case result_request::only_digest: {
bytes_ostream buf;
ser::writer_of_query_result(buf).start_partitions().end_partitions().end_query_result();
return result(std::move(buf), result_digest(_digest.finalize_array()));
}
case result_request::result_and_digest:
return result(std::move(_out), result_digest(_digest.finalize_array()));
return result(std::move(_out), result_digest(_digest.finalize_array()), _row_count);
}
abort();
}

View File

@@ -96,14 +96,16 @@ public:
class result {
bytes_ostream _w;
stdx::optional<result_digest> _digest;
stdx::optional<uint32_t> _row_count;
public:
class builder;
class partition_writer;
friend class result_merger;
result();
result(bytes_ostream&& w) : _w(std::move(w)) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d) : _w(std::move(w)), _digest(d) {}
result(bytes_ostream&& w, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _row_count(c) {}
result(bytes_ostream&& w, stdx::optional<result_digest> d, stdx::optional<uint32_t> c = {}) : _w(std::move(w)), _digest(d), _row_count(c) {}
result(result&&) = default;
result(const result&) = default;
result& operator=(result&&) = default;
@@ -117,6 +119,10 @@ public:
return _digest;
}
const stdx::optional<uint32_t>& row_count() const {
return _row_count;
}
uint32_t calculate_row_count(const query::partition_slice&);
struct printer {

View File

@@ -213,8 +213,16 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
bytes_ostream w;
auto partitions = ser::writer_of_query_result(w).start_partitions();
std::experimental::optional<uint32_t> row_count = 0;
for (auto&& r : _partial) {
if (row_count) {
if (r->row_count()) {
row_count = row_count.value() + r->row_count().value();
} else {
row_count = std::experimental::nullopt;
}
}
result_view::do_with(*r, [&] (result_view rv) {
for (auto&& pv : rv._v.partitions()) {
partitions.add(pv);
@@ -224,7 +232,7 @@ foreign_ptr<lw_shared_ptr<query::result>> result_merger::get() {
std::move(partitions).end_partitions().end_query_result();
return make_foreign(make_lw_shared<query::result>(std::move(w)));
return make_foreign(make_lw_shared<query::result>(std::move(w), row_count));
}
}