query: Limit number of partitions returned
This is required to implement a Thrift verb.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
11
database.cc
11
database.cc
@@ -1885,6 +1885,7 @@ struct query_state {
|
||||
, cmd(cmd)
|
||||
, builder(cmd.slice, request)
|
||||
, limit(cmd.row_limit)
|
||||
, partition_limit(cmd.partition_limit)
|
||||
, current_partition_range(ranges.begin())
|
||||
, range_end(ranges.end()){
|
||||
}
|
||||
@@ -1892,6 +1893,7 @@ struct query_state {
|
||||
const query::read_command& cmd;
|
||||
query::result::builder builder;
|
||||
uint32_t limit;
|
||||
uint32_t partition_limit;
|
||||
bool range_empty = false; // Avoid ubsan false-positive when moving after construction
|
||||
std::vector<query::partition_range>::const_iterator current_partition_range;
|
||||
std::vector<query::partition_range>::const_iterator range_end;
|
||||
@@ -1910,8 +1912,10 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result
|
||||
{
|
||||
return do_until(std::bind(&query_state::done, &qs), [this, &qs] {
|
||||
auto&& range = *qs.current_partition_range++;
|
||||
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.cmd.timestamp, qs.builder).then([&qs] (auto live_rows) {
|
||||
qs.limit -= live_rows;
|
||||
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.partition_limit,
|
||||
qs.cmd.timestamp, qs.builder).then([&qs] (auto&& r) {
|
||||
qs.limit -= r.live_rows;
|
||||
qs.partition_limit -= r.partitions;
|
||||
});
|
||||
}).then([qs_ptr = std::move(qs_ptr), &qs] {
|
||||
return make_ready_future<lw_shared_ptr<query::result>>(
|
||||
@@ -1947,7 +1951,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_requ
|
||||
future<reconcilable_result>
|
||||
database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range) {
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp).then([this, s = _stats] (auto&& res) {
|
||||
return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.partition_limit,
|
||||
cmd.timestamp).then([this, s = _stats] (auto&& res) {
|
||||
++s->total_reads;
|
||||
return std::move(res);
|
||||
});
|
||||
|
||||
@@ -47,6 +47,7 @@ class read_command {
|
||||
uint32_t row_limit;
|
||||
std::chrono::time_point<gc_clock, gc_clock::duration> timestamp;
|
||||
std::experimental::optional<tracing::trace_info> trace_info [[version 1.3]];
|
||||
uint32_t partition_limit [[version 1.3]] = std::numeric_limits<uint32_t>::max();
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -1660,6 +1660,7 @@ class compact_for_query {
|
||||
api::timestamp_type _max_purgeable = api::max_timestamp;
|
||||
const query::partition_slice& _slice;
|
||||
uint32_t _row_limit;
|
||||
uint32_t _partition_limit;
|
||||
uint32_t _partition_row_limit;
|
||||
|
||||
Consumer _consumer;
|
||||
@@ -1693,12 +1694,14 @@ private:
|
||||
return t.timestamp < _max_purgeable && t.deletion_time < _gc_before;
|
||||
};
|
||||
public:
|
||||
compact_for_query(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, Consumer consumer)
|
||||
compact_for_query(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit,
|
||||
uint32_t partition_limit, Consumer consumer)
|
||||
: _schema(s)
|
||||
, _query_time(query_time)
|
||||
, _gc_before(query_time - s.gc_grace_seconds())
|
||||
, _slice(slice)
|
||||
, _row_limit(limit)
|
||||
, _partition_limit(partition_limit)
|
||||
, _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit())
|
||||
, _consumer(std::move(consumer))
|
||||
{ }
|
||||
@@ -1792,8 +1795,9 @@ public:
|
||||
}
|
||||
|
||||
_row_limit -= _rows_in_current_partition;
|
||||
_partition_limit -= 1;
|
||||
_consumer.consume_end_of_partition();
|
||||
return _row_limit ? stop_iteration::no : stop_iteration::yes;
|
||||
return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes;
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
@@ -1931,6 +1935,7 @@ uint32_t mutation_querier::consume_end_of_stream() {
|
||||
class query_result_builder {
|
||||
const schema& _schema;
|
||||
uint32_t _live_rows = 0;
|
||||
uint32_t _partitions = 0;
|
||||
query::result::builder& _rb;
|
||||
stdx::optional<query::result::partition_writer> _pw;
|
||||
stdx::optional<mutation_querier> _mutation_consumer;
|
||||
@@ -1961,25 +1966,26 @@ public:
|
||||
|
||||
// End-of-partition hook, invoked by compact_for_query through
// _consumer.consume_end_of_partition(). Folds the per-partition result into
// the builder's running totals.
void consume_end_of_partition() {
|
||||
// mutation_querier::consume_end_of_stream() returns a uint32_t, which is
// accumulated into the live-row total.
_live_rows += _mutation_consumer->consume_end_of_stream();
|
||||
// One more partition has been handed to this builder.
_partitions += 1;
|
||||
}
|
||||
|
||||
uint32_t consume_end_of_stream() {
|
||||
return _live_rows;
|
||||
data_query_result consume_end_of_stream() {
|
||||
return {_live_rows, _partitions};
|
||||
}
|
||||
};
|
||||
|
||||
future<uint32_t> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, gc_clock::time_point query_time,
|
||||
query::result::builder& builder)
|
||||
future<data_query_result> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit,
|
||||
gc_clock::time_point query_time, query::result::builder& builder)
|
||||
{
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0) {
|
||||
return make_ready_future<uint32_t>(0);
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
|
||||
return make_ready_future<data_query_result>();
|
||||
}
|
||||
|
||||
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
auto qrb = query_result_builder(*s, builder);
|
||||
auto cfq = compact_for_query<emit_only_live_rows::yes, query_result_builder>(*s, query_time, slice, row_limit, std::move(qrb));
|
||||
auto cfq = compact_for_query<emit_only_live_rows::yes, query_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(qrb));
|
||||
|
||||
auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority());
|
||||
return consume_flattened(std::move(reader), std::move(cfq), is_reversed);
|
||||
@@ -2044,16 +2050,17 @@ mutation_query(schema_ptr s,
|
||||
const query::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time)
|
||||
{
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0) {
|
||||
if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) {
|
||||
return make_ready_future<reconcilable_result>(reconcilable_result());
|
||||
}
|
||||
|
||||
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
|
||||
|
||||
auto rrb = reconcilable_result_builder(*s, slice);
|
||||
auto cfq = compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>(*s, query_time, slice, row_limit, std::move(rrb));
|
||||
auto cfq = compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(rrb));
|
||||
|
||||
auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority());
|
||||
return consume_flattened(std::move(reader), std::move(cfq), is_reversed);
|
||||
|
||||
@@ -54,9 +54,12 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const {
|
||||
}
|
||||
|
||||
query::result
|
||||
to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
|
||||
to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_partitions) {
|
||||
query::result::builder builder(slice, query::result_request::only_result);
|
||||
for (const partition& p : r.partitions()) {
|
||||
if (!max_partitions--) {
|
||||
break;
|
||||
}
|
||||
p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), query::max_rows);
|
||||
}
|
||||
return builder.build();
|
||||
|
||||
@@ -94,7 +94,7 @@ public:
|
||||
printer pretty_printer(schema_ptr) const;
|
||||
};
|
||||
|
||||
query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&);
|
||||
// Converts a reconcilable_result into a query::result by unfreezing each
// contained partition and querying it with the given slice. Iteration stops
// once `partition_limit` partitions have been processed; the default,
// query::max_partitions, is the largest uint32_t value.
query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t partition_limit = query::max_partitions);
|
||||
|
||||
// Performs a query on given data source returning data in reconcilable form.
|
||||
//
|
||||
@@ -113,8 +113,14 @@ future<reconcilable_result> mutation_query(
|
||||
const query::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
uint32_t row_limit,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time);
|
||||
|
||||
future<uint32_t> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, gc_clock::time_point query_time,
|
||||
query::result::builder& builder);
|
||||
// Value produced by data_query(): the counters accumulated by the result
// builder while consuming partitions. Both members default to zero, so a
// default-constructed instance describes a query that returned nothing.
struct data_query_result {
|
||||
// Number of live rows counted across all consumed partitions.
uint32_t live_rows{0};
|
||||
// Number of partitions counted.
uint32_t partitions{0};
|
||||
};
|
||||
|
||||
// Reads `range` from `source`, compacts it keeping only live rows, and feeds
// the output into `builder`.
//
// The returned future resolves to the live-row and partition counts gathered
// during the read. If `row_limit`, `partition_limit` or
// `slice.partition_row_limit()` is zero, the definition returns a ready,
// zero-initialised data_query_result without creating a reader.
future<data_query_result> data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range,
|
||||
const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit,
|
||||
gc_clock::time_point query_time, query::result::builder& builder);
|
||||
@@ -155,6 +155,8 @@ public:
|
||||
friend std::ostream& operator<<(std::ostream& out, const specific_ranges& ps);
|
||||
};
|
||||
|
||||
// Largest representable partition count. Used as the default `partition_limit`
// for read_command and to_data_query_result(), i.e. effectively "no limit".
constexpr auto max_partitions = std::numeric_limits<uint32_t>::max();
|
||||
|
||||
// Full specification of a query to the database.
|
||||
// Intended for passing across replicas.
|
||||
// Can be accessed across cores.
|
||||
@@ -166,6 +168,7 @@ public:
|
||||
uint32_t row_limit;
|
||||
gc_clock::time_point timestamp;
|
||||
std::experimental::optional<tracing::trace_info> trace_info;
|
||||
uint32_t partition_limit;
|
||||
api::timestamp_type read_timestamp; // not serialized
|
||||
public:
|
||||
read_command(utils::UUID cf_id,
|
||||
@@ -174,6 +177,7 @@ public:
|
||||
uint32_t row_limit = max_rows,
|
||||
gc_clock::time_point now = gc_clock::now(),
|
||||
std::experimental::optional<tracing::trace_info> ti = std::experimental::nullopt,
|
||||
uint32_t partition_limit = max_partitions,
|
||||
api::timestamp_type rt = api::missing_timestamp)
|
||||
: cf_id(std::move(cf_id))
|
||||
, schema_version(std::move(schema_version))
|
||||
@@ -181,6 +185,7 @@ public:
|
||||
, row_limit(row_limit)
|
||||
, timestamp(now)
|
||||
, trace_info(ti)
|
||||
, partition_limit(partition_limit)
|
||||
, read_timestamp(rt)
|
||||
{ }
|
||||
|
||||
|
||||
3
query.cc
3
query.cc
@@ -57,7 +57,8 @@ std::ostream& operator<<(std::ostream& out, const read_command& r) {
|
||||
<< ", version=" << r.schema_version
|
||||
<< ", slice=" << r.slice << ""
|
||||
<< ", limit=" << r.row_limit
|
||||
<< ", timestamp=" << r.timestamp.time_since_epoch().count() << "}";
|
||||
<< ", timestamp=" << r.timestamp.time_since_epoch().count() << "}"
|
||||
<< ", partition_limit=" << r.partition_limit << "}";
|
||||
}
|
||||
|
||||
std::ostream& operator<<(std::ostream& out, const specific_ranges& s) {
|
||||
|
||||
@@ -100,7 +100,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
|
||||
// FIXME: use mutation assertions
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
@@ -123,7 +123,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -157,7 +157,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, now).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -171,7 +171,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, now + 2s).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now + 2s).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -204,7 +204,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -234,7 +234,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -262,7 +262,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 10, now).get0();
|
||||
query::full_partition_range, slice, 10, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(3)
|
||||
@@ -282,7 +282,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 1, now).get0();
|
||||
query::full_partition_range, slice, 1, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(1)
|
||||
@@ -294,7 +294,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
|
||||
{
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -321,7 +321,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 2, now).get0();
|
||||
query::full_partition_range, slice, 2, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -345,7 +345,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_size(2)
|
||||
@@ -367,7 +367,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
|
||||
.build();
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, 3, now).get0();
|
||||
query::full_partition_range, slice, 3, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.has_only(a_row()
|
||||
@@ -393,7 +393,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
|
||||
auto slice = make_full_slice(*s);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src,
|
||||
query::full_partition_range, slice, query::max_rows, now).get0();
|
||||
query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0();
|
||||
|
||||
assert_that(to_result_set(result, s, slice))
|
||||
.is_empty();
|
||||
@@ -443,7 +443,7 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
|
||||
|
||||
auto query_time = now + std::chrono::seconds(1);
|
||||
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query_time).get0();
|
||||
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time).get0();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(result.partitions().size(), 2);
|
||||
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
|
||||
@@ -462,24 +462,24 @@ SEASTAR_TEST_CASE(test_result_row_count) {
|
||||
auto src = make_source({m1});
|
||||
|
||||
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
|
||||
|
||||
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
|
||||
|
||||
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
|
||||
|
||||
mutation m2(partition_key::from_single_value(*s, "key2"), s);
|
||||
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice);
|
||||
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice);
|
||||
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user