data_query, mutation_query: use query_class_config

We want to move away from the current practice of selecting the relevant
read concurrency semaphore inside `table` and instead want to pass it
down from `database` so that we can pass down a semaphore that is
appropriate for the class of the query. Use the recently created
`query_class_config` struct for this. This is added as a parameter to
`data_query`, `mutation_query` and propagated down to the point where we
create the `querier` to execute the read. We are already propagating
a parameter down the same route -- max_memory_reverse_query --
which also happens to be part of `query_class_config`, so simply replace
this parameter with a `query_class_config` one. As the lower layers are
not yet prepared for a semaphore passed from above, make sure this semaphore
is the same one that is selected inside `table`. After the lower layers are
prepared for a semaphore arriving from above, we will switch it to be
the appropriate one for the class of the query.
This commit is contained in:
Botond Dénes
2020-04-16 12:02:37 +03:00
parent 0ee58d1d47
commit 14743c4412
6 changed files with 46 additions and 39 deletions

View File

@@ -1176,10 +1176,12 @@ future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>>
database::query(schema_ptr s, const query::read_command& cmd, query::result_options opts, const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state, uint64_t max_result_size, db::timeout_clock::time_point timeout) {
column_family& cf = find_column_family(cmd.cf_id);
query_class_config class_config{cf.read_concurrency_semaphore(), cf.get_config().max_memory_for_unlimited_query};
query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return _data_query_stage(&cf,
std::move(s),
seastar::cref(cmd),
class_config,
opts,
seastar::cref(ranges),
std::move(trace_state),
@@ -1203,6 +1205,7 @@ future<std::tuple<reconcilable_result, cache_temperature>>
database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range,
query::result_memory_accounter&& accounter, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
column_family& cf = find_column_family(cmd.cf_id);
query_class_config class_config{cf.read_concurrency_semaphore(), cf.get_config().max_memory_for_unlimited_query};
query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
return _mutation_query_stage(std::move(s),
cf.as_mutation_source(),
@@ -1212,7 +1215,7 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
cmd.partition_limit,
cmd.timestamp,
timeout,
cf.get_config().max_memory_for_unlimited_query,
class_config,
std::move(accounter),
std::move(trace_state),
std::move(cache_ctx)).then_wrapped([this, s = _stats, hit_rate = cf.get_global_cache_hit_rate(), op = cf.read_in_progress()] (auto f) {

View File

@@ -93,6 +93,7 @@
#include "utils/disk-error-handler.hh"
#include "utils/updateable_value.hh"
#include "user_types_metadata.hh"
#include "query_class_config.hh"
class cell_locker;
class cell_locker_stats;
@@ -798,6 +799,7 @@ public:
// Returns at most "cmd.limit" rows
future<lw_shared_ptr<query::result>> query(schema_ptr,
const query::read_command& cmd,
query_class_config class_config,
query::result_options opts,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
@@ -1350,6 +1352,7 @@ private:
column_family*,
schema_ptr,
const query::read_command&,
query_class_config,
query::result_options,
const dht::partition_range_vector&,
tracing::trace_state_ptr,

View File

@@ -2167,7 +2167,7 @@ future<> data_query(
gc_clock::time_point query_time,
query::result::builder& builder,
db::timeout_clock::time_point timeout,
uint64_t max_memory_reverse_query,
query_class_config class_config,
tracing::trace_state_ptr trace_ptr,
query::querier_cache_context cache_ctx)
{
@@ -2180,10 +2180,9 @@ future<> data_query(
? std::move(*querier_opt)
: query::data_querier(source, s, range, slice, service::get_local_sstable_query_read_priority(), trace_ptr);
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr),
cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
return do_with(std::move(q), [=, &builder, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (query::data_querier& q) mutable {
auto qrb = query_result_builder(*s, builder);
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then(
return q.consume_page(std::move(qrb), row_limit, partition_limit, query_time, timeout, class_config.max_memory_for_unlimited_query).then(
[=, &builder, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] () mutable {
if (q.are_limits_reached() || builder.is_short_read()) {
cache_ctx.insert(std::move(q), std::move(trace_ptr));
@@ -2262,7 +2261,7 @@ static do_mutation_query(schema_ptr s,
uint32_t partition_limit,
gc_clock::time_point query_time,
db::timeout_clock::time_point timeout,
uint64_t max_memory_reverse_query,
query_class_config class_config,
query::result_memory_accounter&& accounter,
tracing::trace_state_ptr trace_ptr,
query::querier_cache_context cache_ctx)
@@ -2279,7 +2278,7 @@ static do_mutation_query(schema_ptr s,
return do_with(std::move(q), [=, &slice, accounter = std::move(accounter), trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (
query::mutation_querier& q) mutable {
auto rrb = reconcilable_result_builder(*s, slice, std::move(accounter));
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout, max_memory_reverse_query).then(
return q.consume_page(std::move(rrb), row_limit, partition_limit, query_time, timeout, class_config.max_memory_for_unlimited_query).then(
[=, &q, trace_ptr = std::move(trace_ptr), cache_ctx = std::move(cache_ctx)] (reconcilable_result r) mutable {
if (q.are_limits_reached() || r.is_short_read()) {
cache_ctx.insert(std::move(q), std::move(trace_ptr));
@@ -2302,13 +2301,13 @@ mutation_query(schema_ptr s,
uint32_t partition_limit,
gc_clock::time_point query_time,
db::timeout_clock::time_point timeout,
uint64_t max_memory_reverse_query,
query_class_config class_config,
query::result_memory_accounter&& accounter,
tracing::trace_state_ptr trace_ptr,
query::querier_cache_context cache_ctx)
{
return do_mutation_query(std::move(s), std::move(source), seastar::cref(range), seastar::cref(slice),
row_limit, partition_limit, query_time, timeout, max_memory_reverse_query, std::move(accounter), std::move(trace_ptr), std::move(cache_ctx));
row_limit, partition_limit, query_time, timeout, class_config, std::move(accounter), std::move(trace_ptr), std::move(cache_ctx));
}
deletable_row::deletable_row(clustering_row&& cr)

View File

@@ -28,6 +28,7 @@
#include "db/timeout_clock.hh"
#include "querier.hh"
#include "utils/chunked_vector.hh"
#include "query_class_config.hh"
#include <seastar/core/execution_stage.hh>
class reconcilable_result;
@@ -162,7 +163,7 @@ future<reconcilable_result> mutation_query(
uint32_t partition_limit,
gc_clock::time_point query_time,
db::timeout_clock::time_point timeout,
uint64_t max_memory_reverse_query,
query_class_config class_config,
query::result_memory_accounter&& accounter = { },
tracing::trace_state_ptr trace_ptr = nullptr,
query::querier_cache_context cache_ctx = { });
@@ -177,7 +178,7 @@ future<> data_query(
gc_clock::time_point query_time,
query::result::builder& builder,
db::timeout_clock::time_point timeout,
uint64_t max_memory_reverse_query,
query_class_config class_config,
tracing::trace_state_ptr trace_ptr = nullptr,
query::querier_cache_context cache_ctx = { });
@@ -192,7 +193,7 @@ class mutation_query_stage {
uint32_t,
gc_clock::time_point,
db::timeout_clock::time_point,
uint64_t,
query_class_config,
query::result_memory_accounter&&,
tracing::trace_state_ptr,
query::querier_cache_context> _execution_stage;

View File

@@ -2357,6 +2357,7 @@ struct query_state {
future<lw_shared_ptr<query::result>>
table::query(schema_ptr s,
const query::read_command& cmd,
query_class_config class_config,
query::result_options opts,
const dht::partition_range_vector& partition_ranges,
tracing::trace_state_ptr trace_state,
@@ -2368,14 +2369,14 @@ table::query(schema_ptr s,
_stats.reads.set_latency(lc);
auto f = opts.request == query::result_request::only_digest
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
return f.then([this, lc, s = std::move(s), &cmd, opts, &partition_ranges,
return f.then([this, lc, s = std::move(s), &cmd, class_config, opts, &partition_ranges,
trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] (query::result_memory_accounter accounter) mutable {
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, opts, partition_ranges, std::move(accounter));
auto& qs = *qs_ptr;
return do_until(std::bind(&query_state::done, &qs), [this, &qs, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
return do_until(std::bind(&query_state::done, &qs), [this, &qs, class_config, trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] {
auto&& range = *qs.current_partition_range++;
return data_query(qs.schema, as_mutation_source(), range, qs.cmd.slice, qs.remaining_rows(),
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, timeout, _config.max_memory_for_unlimited_query, trace_state, cache_ctx);
qs.remaining_partitions(), qs.cmd.timestamp, qs.builder, timeout, class_config, trace_state, cache_ctx);
}).then([qs_ptr = std::move(qs_ptr), &qs] {
return make_ready_future<lw_shared_ptr<query::result>>(
make_lw_shared<query::result>(qs.builder.build()));

View File

@@ -34,6 +34,7 @@
#include "test/lib/mutation_assertions.hh"
#include "test/lib/result_set_assertions.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/reader_permit.hh"
#include "mutation_query.hh"
#include <seastar/core/do_with.hh>
@@ -76,7 +77,6 @@ static query::partition_slice make_full_slice(const schema& s) {
}
static auto inf32 = std::numeric_limits<unsigned>::max();
static const uint64_t max_memory_for_reverse_query = 1 << 20;
query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, const query::partition_slice& slice) {
return query::result_set::from_raw_result(s, slice, to_data_query_result(r, s, slice, inf32, inf32));
@@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
// FIXME: use mutation assertions
assert_that(to_result_set(result, s, slice))
@@ -124,7 +124,7 @@ SEASTAR_TEST_CASE(test_reading_from_single_partition) {
.build();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_only(a_row()
@@ -160,7 +160,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_only(a_row()
@@ -174,7 +174,7 @@ SEASTAR_TEST_CASE(test_cells_are_expired_according_to_query_timestamp) {
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 1, query::max_partitions, now + 2s, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_only(a_row()
@@ -207,7 +207,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.build();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(3)
@@ -237,7 +237,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.build();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(3)
@@ -265,7 +265,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
{
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 10, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(3)
@@ -285,7 +285,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
{
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 1, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(1)
@@ -297,7 +297,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
{
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
@@ -324,7 +324,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.build();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 2, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
@@ -348,7 +348,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.build();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
@@ -370,7 +370,7 @@ SEASTAR_TEST_CASE(test_reverse_ordering_is_respected) {
.build();
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, 3, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_only(a_row()
@@ -396,7 +396,7 @@ SEASTAR_TEST_CASE(test_query_when_partition_tombstone_covers_live_cells) {
auto slice = make_full_slice(*s);
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, query::max_rows, query::max_partitions, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.is_empty();
@@ -447,7 +447,7 @@ SEASTAR_TEST_CASE(test_partitions_with_only_expired_tombstones_are_dropped) {
auto query_time = now + std::chrono::seconds(1);
reconcilable_result result = mutation_query(s, src, query::full_partition_range, slice, query::max_rows, query::max_partitions, query_time,
db::no_timeout, max_memory_for_reverse_query).get0();
db::no_timeout, tests::make_query_class_config()).get0();
BOOST_REQUIRE_EQUAL(result.partitions().size(), 2);
BOOST_REQUIRE_EQUAL(result.row_count(), 2);
@@ -466,28 +466,28 @@ SEASTAR_TEST_CASE(test_result_row_count) {
auto src = make_source({m1});
auto r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 0);
m1.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("A")), "v1", data_value(bytes("A_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 1);
m1.set_clustered_cell(clustering_key::from_single_value(*s, bytes("B")), "v1", data_value(bytes("B_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1}), query::full_partition_range, slice, 10000, query::max_partitions, now,
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 2);
mutation m2(s, partition_key::from_single_value(*s, "key2"));
m2.set_static_cell("s1", data_value(bytes("S_v1")), 1);
r = to_data_query_result(mutation_query(s, make_source({m1, m2}), query::full_partition_range, slice, 10000, query::max_partitions, now,
db::no_timeout, max_memory_for_reverse_query).get0(), s, slice, inf32, inf32);
db::no_timeout, tests::make_query_class_config()).get0(), s, slice, inf32, inf32);
BOOST_REQUIRE_EQUAL(r.row_count().value(), 3);
});
}
@@ -510,7 +510,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
{
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, query::max_rows, 10, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(2)
@@ -526,7 +526,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
{
reconcilable_result result = mutation_query(s, src,
query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, max_memory_for_reverse_query).get0();
query::full_partition_range, slice, query::max_rows, 1, now, db::no_timeout, tests::make_query_class_config()).get0();
assert_that(to_result_set(result, s, slice))
.has_size(1)
@@ -549,11 +549,11 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
gc_clock::now(), digest_only_builder, db::no_timeout, max_memory_for_reverse_query).get0();
gc_clock::now(), digest_only_builder, db::no_timeout, tests::make_query_class_config()).get0();
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
gc_clock::now(), result_and_digest_builder, db::no_timeout, max_memory_for_reverse_query).get0();
gc_clock::now(), result_and_digest_builder, db::no_timeout, tests::make_query_class_config()).get0();
BOOST_REQUIRE_EQUAL(digest_only_builder.memory_accounter().used_memory(), result_and_digest_builder.memory_accounter().used_memory());
}