From 69798df95e78c2574b850c367d8ae0cb12409fa8 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Sun, 12 Jun 2016 01:04:57 +0200 Subject: [PATCH] query: Limit number of partitions returned This is required to implement a thrift verb. Signed-off-by: Duarte Nunes --- database.cc | 11 ++++++++--- idl/read_command.idl.hh | 1 + mutation_partition.cc | 31 +++++++++++++++++------------ mutation_query.cc | 5 ++++- mutation_query.hh | 14 +++++++++---- query-request.hh | 5 +++++ query.cc | 3 ++- tests/mutation_query_test.cc | 38 ++++++++++++++++++------------------ 8 files changed, 68 insertions(+), 40 deletions(-) diff --git a/database.cc b/database.cc index 82f43f29de..9da82b05a8 100644 --- a/database.cc +++ b/database.cc @@ -1885,6 +1885,7 @@ struct query_state { , cmd(cmd) , builder(cmd.slice, request) , limit(cmd.row_limit) + , partition_limit(cmd.partition_limit) , current_partition_range(ranges.begin()) , range_end(ranges.end()){ } @@ -1892,6 +1893,7 @@ struct query_state { const query::read_command& cmd; query::result::builder builder; uint32_t limit; + uint32_t partition_limit; bool range_empty = false; // Avoid ubsan false-positive when moving after construction std::vector::const_iterator current_partition_range; std::vector::const_iterator range_end; @@ -1910,8 +1912,10 @@ column_family::query(schema_ptr s, const query::read_command& cmd, query::result { return do_until(std::bind(&query_state::done, &qs), [this, &qs] { auto&& range = *qs.current_partition_range++; - return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.cmd.timestamp, qs.builder).then([&qs] (auto live_rows) { - qs.limit -= live_rows; + return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.limit, qs.partition_limit, + qs.cmd.timestamp, qs.builder).then([&qs] (auto&& r) { + qs.limit -= r.live_rows; + qs.partition_limit -= r.partitions; }); }).then([qs_ptr = std::move(qs_ptr), &qs] { return make_ready_future>( @@ -1947,7 +1951,8 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_requ future database::query_mutations(schema_ptr s, const query::read_command& cmd, const query::partition_range& range) { column_family& cf = find_column_family(cmd.cf_id); - return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.timestamp).then([this, s = _stats] (auto&& res) { + return mutation_query(std::move(s), cf.as_mutation_source(), range, cmd.slice, cmd.row_limit, cmd.partition_limit, + cmd.timestamp).then([this, s = _stats] (auto&& res) { ++s->total_reads; return std::move(res); }); diff --git a/idl/read_command.idl.hh b/idl/read_command.idl.hh index 29348aa138..9165e5042c 100644 --- a/idl/read_command.idl.hh +++ b/idl/read_command.idl.hh @@ -47,6 +47,7 @@ class read_command { uint32_t row_limit; std::chrono::time_point timestamp; std::experimental::optional trace_info [[version 1.3]]; + uint32_t partition_limit [[version 1.3]] = std::numeric_limits::max(); }; } diff --git a/mutation_partition.cc b/mutation_partition.cc index 2ca08f8b15..62ecd436f5 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1660,6 +1660,7 @@ class compact_for_query { api::timestamp_type _max_purgeable = api::max_timestamp; const query::partition_slice& _slice; uint32_t _row_limit; + uint32_t _partition_limit; uint32_t _partition_row_limit; Consumer _consumer; @@ -1693,12 +1694,14 @@ private: return t.timestamp < _max_purgeable && t.deletion_time < _gc_before; }; public: - compact_for_query(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, Consumer consumer) + compact_for_query(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit, + uint32_t partition_limit, Consumer consumer) : _schema(s) , _query_time(query_time) , _gc_before(query_time - s.gc_grace_seconds()) , _slice(slice) , _row_limit(limit) + , _partition_limit(partition_limit) , _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit()) , _consumer(std::move(consumer)) { } @@ -1792,8 +1795,9 @@ public: } _row_limit -= _rows_in_current_partition; + _partition_limit -= 1; _consumer.consume_end_of_partition(); - return _row_limit ? stop_iteration::no : stop_iteration::yes; + return _row_limit && _partition_limit ? stop_iteration::no : stop_iteration::yes; } return stop_iteration::no; } @@ -1931,6 +1935,7 @@ uint32_t mutation_querier::consume_end_of_stream() { class query_result_builder { const schema& _schema; uint32_t _live_rows = 0; + uint32_t _partitions = 0; query::result::builder& _rb; stdx::optional _pw; stdx::optional _mutation_consumer; @@ -1961,25 +1966,26 @@ public: void consume_end_of_partition() { _live_rows += _mutation_consumer->consume_end_of_stream(); + _partitions += 1; } - uint32_t consume_end_of_stream() { - return _live_rows; + data_query_result consume_end_of_stream() { + return {_live_rows, _partitions}; } }; -future data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range, - const query::partition_slice& slice, uint32_t row_limit, gc_clock::time_point query_time, - query::result::builder& builder) +future data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range, + const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit, + gc_clock::time_point query_time, query::result::builder& builder) { - if (row_limit == 0 || slice.partition_row_limit() == 0) { - return make_ready_future(0); + if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) { + return make_ready_future(); } auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); auto qrb = query_result_builder(*s, builder); - auto cfq = compact_for_query(*s, query_time, slice, row_limit, std::move(qrb)); + auto cfq = compact_for_query(*s, query_time, slice, row_limit, partition_limit, std::move(qrb)); auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority()); return consume_flattened(std::move(reader), std::move(cfq), is_reversed); @@ -2044,16 +2050,17 @@ mutation_query(schema_ptr s, const query::partition_range& range, const query::partition_slice& slice, uint32_t row_limit, + uint32_t partition_limit, gc_clock::time_point query_time) { - if (row_limit == 0 || slice.partition_row_limit() == 0) { + if (row_limit == 0 || slice.partition_row_limit() == 0 || partition_limit == 0) { return make_ready_future(reconcilable_result()); } auto is_reversed = slice.options.contains(query::partition_slice::option::reversed); auto rrb = reconcilable_result_builder(*s, slice); - auto cfq = compact_for_query(*s, query_time, slice, row_limit, std::move(rrb)); + auto cfq = compact_for_query(*s, query_time, slice, row_limit, partition_limit, std::move(rrb)); auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority()); return consume_flattened(std::move(reader), std::move(cfq), is_reversed); diff --git a/mutation_query.cc b/mutation_query.cc index 4a0b584180..b04f26ac5a 100644 --- a/mutation_query.cc +++ b/mutation_query.cc @@ -54,9 +54,12 @@ bool reconcilable_result::operator!=(const reconcilable_result& other) const { } query::result -to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) { +to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice, uint32_t max_partitions) { query::result::builder builder(slice, query::result_request::only_result); for (const partition& p : r.partitions()) { + if (!max_partitions--) { + break; + } p.mut().unfreeze(s).query(builder, slice, gc_clock::time_point::min(), query::max_rows); } return builder.build(); diff --git a/mutation_query.hh b/mutation_query.hh index 230bc081f8..631208ed57 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -94,7 +94,7 @@ public: printer pretty_printer(schema_ptr) const; }; -query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&); +query::result to_data_query_result(const reconcilable_result&, schema_ptr, const query::partition_slice&, uint32_t partition_limit = query::max_partitions); // Performs a query on given data source returning data in reconcilable form. // @@ -113,8 +113,14 @@ future mutation_query( const query::partition_range& range, const query::partition_slice& slice, uint32_t row_limit, + uint32_t partition_limit, gc_clock::time_point query_time); -future data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range, - const query::partition_slice& slice, uint32_t row_limit, gc_clock::time_point query_time, - query::result::builder& builder); +struct data_query_result { + uint32_t live_rows{0}; + uint32_t partitions{0}; +}; + +future data_query(schema_ptr s, const mutation_source& source, const query::partition_range& range, + const query::partition_slice& slice, uint32_t row_limit, uint32_t partition_limit, + gc_clock::time_point query_time, query::result::builder& builder); \ No newline at end of file diff --git a/query-request.hh b/query-request.hh index 6a5b5382a5..0cd444ce67 100644 --- a/query-request.hh +++ b/query-request.hh @@ -155,6 +155,8 @@ public: friend std::ostream& operator<<(std::ostream& out, const specific_ranges& ps); }; +constexpr auto max_partitions = std::numeric_limits::max(); + // Full specification of a query to the database. // Intended for passing across replicas. // Can be accessed across cores. @@ -166,6 +168,7 @@ public: uint32_t row_limit; gc_clock::time_point timestamp; std::experimental::optional trace_info; + uint32_t partition_limit; api::timestamp_type read_timestamp; // not serialized public: read_command(utils::UUID cf_id, @@ -174,6 +177,7 @@ public: uint32_t row_limit = max_rows, gc_clock::time_point now = gc_clock::now(), std::experimental::optional ti = std::experimental::nullopt, + uint32_t partition_limit = max_partitions, api::timestamp_type rt = api::missing_timestamp) : cf_id(std::move(cf_id)) , schema_version(std::move(schema_version)) @@ -181,6 +185,7 @@ public: , row_limit(row_limit) , timestamp(now) , trace_info(ti) + , partition_limit(partition_limit) , read_timestamp(rt) { } diff --git a/query.cc b/query.cc index 047af323c2..8ab7b58d59 100644 --- a/query.cc +++ b/query.cc @@ -57,7 +57,8 @@ std::ostream& operator<<(std::ostream& out, const read_command& r) { << ", version=" << r.schema_version << ", slice=" << r.slice << "" << ", limit=" << r.row_limit - << ", timestamp=" << r.timestamp.time_since_epoch().count() << "}"; + << ", timestamp=" << r.timestamp.time_since_epoch().count() << "}" + << ", partition_limit=" << r.partition_limit << "}"; } std::ostream& operator<<(std::ostream& out, const specific_ranges& s) { diff --git a/tests/mutation_query_test.cc b/tests/mutation_query_test.cc index 00f2253118..fb2c69b824 100644 --- a/tests/mutation_query_test.cc +++ b/tests/mutation_query_test.cc @@ -100,7 +100,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, now).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now).get0(); // FIXME: use mutation assertions assert_that(to_result_set(result, s, slice)) @@ -123,7 +123,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, now).get0(); + query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -157,7 +157,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, now).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -171,7 +171,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, now + 2s).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now + 2s).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -204,7 +204,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -234,7 +234,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -262,7 +262,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 10, now).get0(); + query::full_partition_range, slice, 10, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -282,7 +282,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, now).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(1) @@ -294,7 +294,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, now).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -321,7 +321,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, now).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -345,7 +345,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -367,7 +367,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, now).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -393,7 +393,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, now).get0(); + query::full_partition_range, slice, query::max_rows, query::max_partitions, now).get0(); assert_that(to_result_set(result, s, slice)) .is_empty(); @@ -443,7 +443,7 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) { auto query_time = now + std::chrono::seconds(1); - reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query_time).get0(); + reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time).get0(); BOOST_REQUIRE_EQUAL(result.partitions().size(), 2); BOOST_REQUIRE_EQUAL(result.row_count(), 2); @@ -462,24 +462,24 @@ SEASTAR_TEST_CASE(test_result_row_count) { auto src = make_source({m1}); - auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice); + auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice); BOOST_REQUIRE_EQUAL(r.row_count().value(), 0); m1.set_static_cell("s1", data_value(bytes("S_v1")), 1); - r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice); + r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice); BOOST_REQUIRE_EQUAL(r.row_count().value(), 1); m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1); - r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice); + r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice); BOOST_REQUIRE_EQUAL(r.row_count().value(), 1); m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1); - r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, now).get0(), s, slice); + r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice); BOOST_REQUIRE_EQUAL(r.row_count().value(), 2); mutation m2(partition_key::from_single_value(*s, "key2"), s); m2.set_static_cell("s1", data_value(bytes("S_v1")), 1); - r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, now).get0(), s, slice); + r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now).get0(), s, slice); BOOST_REQUIRE_EQUAL(r.row_count().value(), 3); }); }