reconcilable_result: properly propagate short_read flag
reconcilable_result can be merged with another or transformed into query::result. Make sure that short_read information is never lost. Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -27,4 +27,5 @@ class partition {
|
||||
class reconcilable_result {
|
||||
uint32_t row_count();
|
||||
std::vector<partition> partitions();
|
||||
query::short_read is_short_read() [[version 1.7]] = query::short_read::no;
|
||||
};
|
||||
|
||||
@@ -1877,7 +1877,7 @@ public:
|
||||
}
|
||||
|
||||
reconcilable_result consume_end_of_stream() {
|
||||
return reconcilable_result(_total_live_rows, std::move(_result));
|
||||
return reconcilable_result(_total_live_rows, std::move(_result), query::short_read::no);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -31,8 +31,9 @@ reconcilable_result::reconcilable_result()
|
||||
: _row_count(0)
|
||||
{ }
|
||||
|
||||
reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partition> p)
|
||||
reconcilable_result::reconcilable_result(uint32_t row_count, std::vector<partition> p, query::short_read short_read)
|
||||
: _row_count(row_count)
|
||||
, _short_read(short_read)
|
||||
, _partitions(std::move(p))
|
||||
{ }
|
||||
|
||||
@@ -62,11 +63,15 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
|
||||
}
|
||||
p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), query::max_rows);
|
||||
}
|
||||
if (r.is_short_read()) {
|
||||
builder.mark_as_short_read();
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const reconcilable_result::printer& pr) {
|
||||
out << "{rows=" << pr.self.row_count() << ", [";
|
||||
out << "{rows=" << pr.self.row_count() << ", short_read="
|
||||
<< pr.self.is_short_read() << ", [";
|
||||
bool first = true;
|
||||
for (const partition& p : pr.self.partitions()) {
|
||||
if (!first) {
|
||||
|
||||
@@ -67,13 +67,14 @@ struct partition {
|
||||
// Can be read by other cores after publishing.
|
||||
class reconcilable_result {
|
||||
uint32_t _row_count;
|
||||
query::short_read _short_read;
|
||||
std::vector<partition> _partitions;
|
||||
public:
|
||||
~reconcilable_result();
|
||||
reconcilable_result();
|
||||
reconcilable_result(reconcilable_result&&) = default;
|
||||
reconcilable_result& operator=(reconcilable_result&&) = default;
|
||||
reconcilable_result(uint32_t row_count, std::vector<partition> partitions);
|
||||
reconcilable_result(uint32_t row_count, std::vector<partition> partitions, query::short_read short_read);
|
||||
|
||||
const std::vector<partition>& partitions() const;
|
||||
std::vector<partition>& partitions();
|
||||
@@ -82,6 +83,10 @@ public:
|
||||
return _row_count;
|
||||
}
|
||||
|
||||
query::short_read is_short_read() const {
|
||||
return _short_read;
|
||||
}
|
||||
|
||||
bool operator==(const reconcilable_result& other) const;
|
||||
bool operator!=(const reconcilable_result& other) const;
|
||||
|
||||
|
||||
@@ -2079,7 +2079,7 @@ public:
|
||||
return std::ref(a);
|
||||
});
|
||||
|
||||
return reconcilable_result(_total_live_count, std::move(r.get()));
|
||||
return reconcilable_result(_total_live_count, std::move(r.get()), query::short_read::no);
|
||||
}
|
||||
auto total_live_count() const {
|
||||
return _total_live_count;
|
||||
@@ -3421,6 +3421,7 @@ void storage_proxy::uninit_messaging_service() {
|
||||
class mutation_result_merger {
|
||||
unsigned _row_count = 0;
|
||||
unsigned _partition_count = 0;
|
||||
query::short_read _short_read;
|
||||
std::vector<partition> _partitions;
|
||||
public:
|
||||
void add_result(foreign_ptr<lw_shared_ptr<reconcilable_result>> partial_result) {
|
||||
@@ -3430,9 +3431,13 @@ public:
|
||||
_row_count += p._row_count;
|
||||
_partition_count += p._row_count > 0;
|
||||
}
|
||||
_short_read = partial_result->is_short_read();
|
||||
}
|
||||
reconcilable_result get() && {
|
||||
return reconcilable_result(_row_count, std::move(_partitions));
|
||||
return reconcilable_result(_row_count, std::move(_partitions), _short_read);
|
||||
}
|
||||
bool short_read() const {
|
||||
return bool(_short_read);
|
||||
}
|
||||
unsigned partition_count() const {
|
||||
return _partition_count;
|
||||
@@ -3495,7 +3500,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr<q
|
||||
});
|
||||
}).then([&] (foreign_ptr<lw_shared_ptr<reconcilable_result>> rr) -> stdx::optional<reconcilable_result> {
|
||||
mrm.add_result(std::move(rr));
|
||||
if (mrm.partition_count() >= cmd->partition_limit || mrm.row_count() >= cmd->row_limit) {
|
||||
if (mrm.short_read() || mrm.partition_count() >= cmd->partition_limit || mrm.row_count() >= cmd->row_limit) {
|
||||
return std::move(mrm).get();
|
||||
}
|
||||
return stdx::nullopt;
|
||||
|
||||
Reference in New Issue
Block a user