diff --git a/database.cc b/database.cc index 6eb473ab2f..4a6ebe8b22 100644 --- a/database.cc +++ b/database.cc @@ -1176,10 +1176,12 @@ future, cache_temperature>> database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, uint64_t max_result_size, db::timeout_clock::time_point timeout) { column_family& cf = find_column_family(cmd.cf_id); + query_class_config class_config{cf.read_concurrency_semaphore(), cf.get_config().max_memory_for_unlimited_query}; query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); return _data_query_stage(&cf, std::move(s), seastar::cref(cmd), + class_config, opts, seastar::cref(ranges), std::move(trace_state), @@ -1203,6 +1205,7 @@ future> database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) { column_family& cf = find_column_family(cmd.cf_id); + query_class_config class_config{cf.read_concurrency_semaphore(), cf.get_config().max_memory_for_unlimited_query}; query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page); return _mutation_query_stage(std::move(s), cf.as_mutation_source(), @@ -1212,7 +1215,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh cmd.partition_limit, cmd.timestamp, timeout, - cf.get_config().max_memory_for_unlimited_query, + class_config, std::move(accounter), std::move(trace_state), std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) { diff --git a/database.hh b/database.hh index 79be4a4ca5..c1f7a4f1e7 100644 --- a/database.hh +++ b/database.hh @@ -93,6 +93,7 @@ #include "utils/disk-error-handler.hh" #include "utils/updateable_value.hh" #include "user_types_metadata.hh" +#include "query_class_config.hh" class cell_locker; class cell_locker_stats; @@ -798,6 +799,7 @@ public: // Returns at most "cmd.limit" rows future> query(schema_ptr, const query::read_command& cmd, + query_class_config class_config, query::result_options opts, const dht::partition_range_vector& ranges, tracing::trace_state_ptr trace_state, @@ -1350,6 +1352,7 @@ private: column_family*, schema_ptr, const query::read_command&, + query_class_config, query::result_options, const dht::partition_range_vector&, tracing::trace_state_ptr, diff --git a/mutation_partition.cc b/mutation_partition.cc index 58bf6daf78..d2fdfb8d26 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -2167,7 +2167,7 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, db::timeout_clock::time_point timeout, - uint64_t max_memory_reverse_query, + query_class_config class_config, tracing::trace_state_ptr trace_ptr, query::querier_cache_context cache_ctx) { @@ -2180,10 +2180,9 @@ future<> data_query( ? std::move(*querier_opt) : query::data_querier(source, s, range, slice, service::get_local_sstable_query_read_priority(), trace_ptr); - return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), - cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable { + return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable { auto qrb = query_result_builder(*s, builder); - return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then( + return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, class_config.max_memory_for_unlimited_query).then( [=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable { if (q.are_limits_reached() || builder.is_short_read()) { cache_ctx.insert(std::move(q), std::move(trace_ptr)); @@ -2262,7 +2261,7 @@ static do_mutation_query(schema_ptr s, uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - uint64_t max_memory_reverse_query, + query_class_config class_config, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, query::querier_cache_context cache_ctx) @@ -2279,7 +2278,7 @@ static do_mutation_query(schema_ptr s, return do_with(std::move(q), [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] ( query::mutation_querier& q) mutable { auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter)); - return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then( + return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout, class_config.max_memory_for_unlimited_query).then( [=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable { if (q.are_limits_reached() || r.is_short_read()) { cache_ctx.insert(std::move(q), std::move(trace_ptr)); @@ -2302,13 +2301,13 @@ mutation_query(schema_ptr s, uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - uint64_t max_memory_reverse_query, + query_class_config class_config, query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_ptr, query::querier_cache_context cache_ctx) { return do_mutation_query(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice), - row_limit, partition_limit, query_time, timeout, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), std::move(cache_ctx)); + row_limit, partition_limit, query_time, timeout, class_config, std::move(accounter), std::move(trace_ptr), std::move(cache_ctx)); } deletable_row::deletable_row(clustering_row&& cr) diff --git a/mutation_query.hh b/mutation_query.hh index 6b5efb5a39..0e51942056 100644 --- a/mutation_query.hh +++ b/mutation_query.hh @@ -28,6 +28,7 @@ #include "db/timeout_clock.hh" #include "querier.hh" #include "utils/chunked_vector.hh" +#include "query_class_config.hh" #include class reconcilable_result; @@ -162,7 +163,7 @@ future mutation_query( uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - uint64_t max_memory_reverse_query, + query_class_config class_config, query::result_memory_accounter&& accounter = { }, tracing::trace_state_ptr trace_ptr = nullptr, query::querier_cache_context cache_ctx = { }); @@ -177,7 +178,7 @@ future<> data_query( gc_clock::time_point query_time, query::result::builder& builder, db::timeout_clock::time_point timeout, - uint64_t max_memory_reverse_query, + query_class_config class_config, tracing::trace_state_ptr trace_ptr = nullptr, query::querier_cache_context cache_ctx = { }); @@ -192,7 +193,7 @@ class mutation_query_stage { uint32_t, gc_clock::time_point, db::timeout_clock::time_point, - uint64_t, + query_class_config, query::result_memory_accounter&&, tracing::trace_state_ptr, query::querier_cache_context> _execution_stage; diff --git a/table.cc b/table.cc index bf55ce62d4..df37e02fdf 100644 --- a/table.cc +++ b/table.cc @@ -2357,6 +2357,7 @@ struct query_state { future> table::query(schema_ptr s, const query::read_command& cmd, + query_class_config class_config, query::result_options opts, const dht::partition_range_vector& partition_ranges, tracing::trace_state_ptr trace_state, @@ -2368,14 +2369,14 @@ table::query(schema_ptr s, _stats.reads.set_latency(lc); auto f = opts.request == query::result_request::only_digest ? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size); - return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges, + return f.then([this, lc, s = std::move(s), &cmd, class_config, opts, &partition_ranges, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] (query::result_memory_accounter accounter) mutable { auto qs_ptr = std::make_unique(std::move(s), cmd, opts, partition_ranges, std::move(accounter)); auto& qs = *qs_ptr; - return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] { + return do_until(std::bind(&query_state::done, &qs), [this, &qs, class_config, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] { auto&& range = *qs.current_partition_range++; return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(), - qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, timeout, _config.max_memory_for_unlimited_query, trace_state, cache_ctx); + qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, timeout, class_config, trace_state, cache_ctx); }).then([qs_ptr = std::move(qs_ptr), &qs] { return make_ready_future>( make_lw_shared(qs.builder.build())); diff --git a/test/boost/mutation_query_test.cc b/test/boost/mutation_query_test.cc index 5aeee26439..ecea6e2c76 100644 --- a/test/boost/mutation_query_test.cc +++ b/test/boost/mutation_query_test.cc @@ -34,6 +34,7 @@ #include "test/lib/mutation_assertions.hh" #include "test/lib/result_set_assertions.hh" #include "test/lib/mutation_source_test.hh" +#include "test/lib/reader_permit.hh" #include "mutation_query.hh" #include @@ -76,7 +77,6 @@ static query::partition_slice make_full_slice(const schema& s) { } static auto inf32 = std::numeric_limits::max(); -static const uint64_t max_memory_for_reverse_query = 1 << 20; query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) { return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32)); @@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); // FIXME: use mutation assertions assert_that(to_result_set(result, s, slice)) @@ -124,7 +124,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -160,7 +160,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -174,7 +174,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -207,7 +207,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -237,7 +237,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -265,7 +265,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(3) @@ -285,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(1) @@ -297,7 +297,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -324,7 +324,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -348,7 +348,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -370,7 +370,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) { .build(); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_only(a_row() @@ -396,7 +396,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) { auto slice = make_full_slice(*s); reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .is_empty(); @@ -447,7 +447,7 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) { auto query_time = now + std::chrono::seconds(1); reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time, - db::no_timeout, max_memory_for_reverse_query).get0(); + db::no_timeout, tests::make_query_class_config()).get0(); BOOST_REQUIRE_EQUAL(result.partitions().size(), 2); BOOST_REQUIRE_EQUAL(result.row_count(), 2); @@ -466,28 +466,28 @@ SEASTAR_TEST_CASE(test_result_row_count) { auto src = make_source({m1}); auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now, - db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); + db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 0); m1.set_static_cell("s1", data_value(bytes("S_v1")), 1); r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now, - db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); + db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 1); m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1); r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now, - db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); + db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 1); m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1); r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now, - db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); + db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 2); mutation m2(s, partition_key::from_single_value(*s, "key2")); m2.set_static_cell("s1", data_value(bytes("S_v1")), 1); r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now, - db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32); + db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32); BOOST_REQUIRE_EQUAL(r.row_count().value(), 3); }); } @@ -510,7 +510,7 @@ SEASTAR_TEST_CASE(test_partition_limit) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(2) @@ -526,7 +526,7 @@ SEASTAR_TEST_CASE(test_partition_limit) { { reconcilable_result result = mutation_query(s, src, - query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, max_memory_for_reverse_query).get0(); + query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, tests::make_query_class_config()).get0(); assert_that(to_result_set(result, s, slice)) .has_size(1) @@ -549,11 +549,11 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) { query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0()); data_query(s, source, query::full_partition_range, slice, std::numeric_limits::max(), std::numeric_limits::max(), - gc_clock::now(), digest_only_builder, db::no_timeout, max_memory_for_reverse_query).get0(); + gc_clock::now(), digest_only_builder, db::no_timeout, tests::make_query_class_config()).get0(); query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0()); data_query(s, source, query::full_partition_range, slice, std::numeric_limits::max(), std::numeric_limits::max(), - gc_clock::now(), result_and_digest_builder, db::no_timeout, max_memory_for_reverse_query).get0(); + gc_clock::now(), result_and_digest_builder, db::no_timeout, tests::make_query_class_config()).get0(); BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory()); }