diff --git a/idl/reconcilable_result.idl.hh b/idl/reconcilable_result.idl.hh index ffe0bc4eb0..26dd445528 100644 --- a/idl/reconcilable_result.idl.hh +++ b/idl/reconcilable_result.idl.hh @@ -27,4 +27,5 @@ class partition { class reconcilable_result { uint32_t row_count(); std::vector partitions(); + query::short_read is_short_read() [[version 1.7]] = query::short_read::no; }; diff --git a/mutation_partition.cc b/mutation_partition.cc index 542d88f338..fb3a77693e 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1877,7 +1877,7 @@ public: } reconcilable_result consume_end_of_stream() { - return reconcilable_result(_total_live_rows, std::move(_result)); + return reconcilable_result(_total_live_rows, std::move(_result), query::short_read::no); } }; diff --git a/mutation_query.cc b/mutation_query.cc index b04f26ac5a..a3160fcf34 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -31,8 +31,9 @@ reconcilable_result::reconcilable_result() : _row_count(0) { } -reconcilable_result::reconcilable_result(uint32_t row_count, std::vector p) +reconcilable_result::reconcilable_result(uint32_t row_count, std::vector p, query::short_read short_read) : _row_count(row_count) + , _short_read(short_read) , _partitions(std::move(p)) { } @@ -62,11 +63,15 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa } p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), query::max_rows); } + if (r.is_short_read()) { + builder.mark_as_short_read(); + } return builder.build(); } std::ostream& operator<<(std::ostream& out, const reconcilable_result::printer& pr) { - out << "{rows=" << pr.self.row_count() << ", ["; + out << "{rows=" << pr.self.row_count() << ", short_read=" + << pr.self.is_short_read() << ", ["; bool first = true; for (const partition& p : pr.self.partitions()) { if (!first) { diff --git a/mutation_query.hh b/mutation_query.hh index 631208ed57..f778c8ba84 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -67,13 +67,14 @@ struct partition { // Can be read by other cores after publishing. class reconcilable_result { uint32_t _row_count; + query::short_read _short_read; std::vector _partitions; public: ~reconcilable_result(); reconcilable_result(); reconcilable_result(reconcilable_result&&) = default; reconcilable_result& operator=(reconcilable_result&&) = default; - reconcilable_result(uint32_t row_count, std::vector partitions); + reconcilable_result(uint32_t row_count, std::vector partitions, query::short_read short_read); const std::vector& partitions() const; std::vector& partitions(); @@ -82,6 +83,10 @@ public: return _row_count; } + query::short_read is_short_read() const { + return _short_read; + } + bool operator==(const reconcilable_result& other) const; bool operator!=(const reconcilable_result& other) const; diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 686c2a6967..177c2d919a 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2079,7 +2079,7 @@ public: return std::ref(a); }); - return reconcilable_result(_total_live_count, std::move(r.get())); + return reconcilable_result(_total_live_count, std::move(r.get()), query::short_read::no); } auto total_live_count() const { return _total_live_count; @@ -3421,6 +3421,7 @@ void storage_proxy::uninit_messaging_service() { class mutation_result_merger { unsigned _row_count = 0; unsigned _partition_count = 0; + query::short_read _short_read; std::vector _partitions; public: void add_result(foreign_ptr> partial_result) { @@ -3430,9 +3431,13 @@ public: _row_count += p._row_count; _partition_count += p._row_count > 0; } + _short_read = partial_result->is_short_read(); } reconcilable_result get() && { - return reconcilable_result(_row_count, std::move(_partitions)); + return reconcilable_result(_row_count, std::move(_partitions), _short_read); + } + bool short_read() const { + return bool(_short_read); } unsigned partition_count() const { return _partition_count; @@ -3495,7 +3500,7 @@ storage_proxy::query_nonsingular_mutations_locally(schema_ptr s, lw_shared_ptr> rr) -> stdx::optional { mrm.add_result(std::move(rr)); - if (mrm.partition_count() >= cmd->partition_limit || mrm.row_count() >= cmd->row_limit) { + if (mrm.short_read() || mrm.partition_count() >= cmd->partition_limit || mrm.row_count() >= cmd->row_limit) { return std::move(mrm).get(); } return stdx::nullopt;