query: result_memory_limiter: use the new max_result_size type
This commit is contained in:
@@ -1207,7 +1207,7 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
future<std::tuple<reconcilable_result, cache_temperature>>
|
||||
database::query_mutations(schema_ptr s, const query::read_command& cmd, const dht::partition_range& range,
|
||||
size_t result_max_size, tracing::trace_state_ptr trace_state, db::timeout_clock::time_point timeout) {
|
||||
return get_result_memory_limiter().new_mutation_read(result_max_size).then(
|
||||
return get_result_memory_limiter().new_mutation_read(query::max_result_size(result_max_size)).then(
|
||||
[&, s = std::move(s), trace_state = std::move(trace_state), timeout] (query::result_memory_accounter accounter) {
|
||||
column_family& cf = find_column_family(cmd.cf_id);
|
||||
query::querier_cache_context cache_ctx(_querier_cache, cmd.query_uuid, cmd.is_first_page);
|
||||
|
||||
@@ -668,7 +668,7 @@ future<std::tuple<foreign_ptr<lw_shared_ptr<reconcilable_result>>, cache_tempera
|
||||
db.local().find_column_family(s).get_global_cache_hit_rate()));
|
||||
}
|
||||
|
||||
return db.local().get_result_memory_limiter().new_mutation_read(max_size).then([&, s = std::move(s), trace_state = std::move(trace_state),
|
||||
return db.local().get_result_memory_limiter().new_mutation_read(query::max_result_size(max_size)).then([&, s = std::move(s), trace_state = std::move(trace_state),
|
||||
timeout] (query::result_memory_accounter accounter) mutable {
|
||||
return do_query_mutations(db, s, cmd, ranges, std::move(trace_state), timeout, std::move(accounter)).then_wrapped(
|
||||
[&db, s = std::move(s)] (future<reconcilable_result>&& f) {
|
||||
|
||||
@@ -68,18 +68,18 @@ public:
|
||||
// Reserves minimum_result_size and creates new memory accounter for
|
||||
// mutation query. Uses the specified maximum result size and may be
|
||||
// stopped before reaching it due to memory pressure on shard.
|
||||
future<result_memory_accounter> new_mutation_read(size_t max_result_size);
|
||||
future<result_memory_accounter> new_mutation_read(query::max_result_size max_result_size);
|
||||
|
||||
// Reserves minimum_result_size and creates new memory accounter for
|
||||
// data query. Uses the specified maximum result size, result will *not*
|
||||
// be stopped due to on shard memory pressure in order to avoid digest
|
||||
// mismatches.
|
||||
future<result_memory_accounter> new_data_read(size_t max_result_size);
|
||||
future<result_memory_accounter> new_data_read(query::max_result_size max_result_size);
|
||||
|
||||
// Creates a memory accounter for digest reads. Such accounter doesn't
|
||||
// contribute to the shard memory usage, but still stops producing the
|
||||
// result after individual limit has been reached.
|
||||
future<result_memory_accounter> new_digest_read(size_t max_result_size);
|
||||
future<result_memory_accounter> new_digest_read(query::max_result_size max_result_size);
|
||||
|
||||
// Checks whether the result can grow any more, takes into account only
|
||||
// the per shard limit.
|
||||
@@ -119,13 +119,13 @@ class result_memory_accounter {
|
||||
size_t _blocked_bytes = 0;
|
||||
size_t _used_memory = 0;
|
||||
size_t _total_used_memory = 0;
|
||||
size_t _maximum_result_size = 0;
|
||||
query::max_result_size _maximum_result_size;
|
||||
stop_iteration _stop_on_global_limit;
|
||||
private:
|
||||
// Mutation query accounter. Uses provided individual result size limit and
|
||||
// will stop when shard memory pressure grows too high.
|
||||
struct mutation_query_tag { };
|
||||
explicit result_memory_accounter(mutation_query_tag, result_memory_limiter& limiter, size_t max_size) noexcept
|
||||
explicit result_memory_accounter(mutation_query_tag, result_memory_limiter& limiter, query::max_result_size max_size) noexcept
|
||||
: _limiter(&limiter)
|
||||
, _blocked_bytes(result_memory_limiter::minimum_result_size)
|
||||
, _maximum_result_size(max_size)
|
||||
@@ -135,7 +135,7 @@ private:
|
||||
// Data query accounter. Uses provided individual result size limit and
|
||||
// will *not* stop even though shard memory pressure grows too high.
|
||||
struct data_query_tag { };
|
||||
explicit result_memory_accounter(data_query_tag, result_memory_limiter& limiter, size_t max_size) noexcept
|
||||
explicit result_memory_accounter(data_query_tag, result_memory_limiter& limiter, query::max_result_size max_size) noexcept
|
||||
: _limiter(&limiter)
|
||||
, _blocked_bytes(result_memory_limiter::minimum_result_size)
|
||||
, _maximum_result_size(max_size)
|
||||
@@ -145,7 +145,7 @@ private:
|
||||
// will *not* stop even though shard memory pressure grows too high. This
|
||||
// accounter does not contribute to the shard memory limits.
|
||||
struct digest_query_tag { };
|
||||
explicit result_memory_accounter(digest_query_tag, result_memory_limiter&, size_t max_size) noexcept
|
||||
explicit result_memory_accounter(digest_query_tag, result_memory_limiter&, query::max_result_size max_size) noexcept
|
||||
: _blocked_bytes(0)
|
||||
, _maximum_result_size(max_size)
|
||||
{ }
|
||||
@@ -185,7 +185,7 @@ public:
|
||||
stop_iteration update_and_check(size_t n) {
|
||||
_used_memory += n;
|
||||
_total_used_memory += n;
|
||||
auto stop = stop_iteration(_total_used_memory > _maximum_result_size);
|
||||
auto stop = stop_iteration(_total_used_memory > _maximum_result_size.hard_limit);
|
||||
if (_limiter && _used_memory > _blocked_bytes) {
|
||||
auto to_block = std::min(_used_memory - _blocked_bytes, n);
|
||||
_blocked_bytes += to_block;
|
||||
@@ -196,7 +196,7 @@ public:
|
||||
|
||||
// Checks whether the result can grow any more.
|
||||
stop_iteration check() const {
|
||||
stop_iteration stop { _total_used_memory > _maximum_result_size };
|
||||
stop_iteration stop { _total_used_memory > _maximum_result_size.hard_limit };
|
||||
if (!stop && _used_memory >= _blocked_bytes && _limiter) {
|
||||
return _limiter->check() && _stop_on_global_limit;
|
||||
}
|
||||
@@ -217,19 +217,19 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_mutation_read(size_t max_size) {
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_mutation_read(query::max_result_size max_size) {
|
||||
return _memory_limiter.wait(minimum_result_size).then([this, max_size] {
|
||||
return result_memory_accounter(result_memory_accounter::mutation_query_tag(), *this, max_size);
|
||||
});
|
||||
}
|
||||
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_data_read(size_t max_size) {
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_data_read(query::max_result_size max_size) {
|
||||
return _memory_limiter.wait(minimum_result_size).then([this, max_size] {
|
||||
return result_memory_accounter(result_memory_accounter::data_query_tag(), *this, max_size);
|
||||
});
|
||||
}
|
||||
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_digest_read(size_t max_size) {
|
||||
inline future<result_memory_accounter> result_memory_limiter::new_digest_read(query::max_result_size max_size) {
|
||||
return make_ready_future<result_memory_accounter>(result_memory_accounter(result_memory_accounter::digest_query_tag(), *this, max_size));
|
||||
}
|
||||
|
||||
|
||||
2
table.cc
2
table.cc
@@ -2043,7 +2043,7 @@ table::query(schema_ptr s,
|
||||
utils::latency_counter lc;
|
||||
_stats.reads.set_latency(lc);
|
||||
auto f = opts.request == query::result_request::only_digest
|
||||
? memory_limiter.new_digest_read(max_size) : memory_limiter.new_data_read(max_size);
|
||||
? memory_limiter.new_digest_read(query::max_result_size(max_size)) : memory_limiter.new_data_read(query::max_result_size(max_size));
|
||||
return f.then([this, lc, s = std::move(s), &cmd, class_config, opts, &partition_ranges,
|
||||
trace_state = std::move(trace_state), timeout, cache_ctx = std::move(cache_ctx)] (query::result_memory_accounter accounter) mutable {
|
||||
auto qs_ptr = std::make_unique<query_state>(std::move(s), cmd, opts, partition_ranges, std::move(accounter));
|
||||
|
||||
@@ -547,11 +547,13 @@ SEASTAR_THREAD_TEST_CASE(test_result_size_calculation) {
|
||||
query::partition_slice slice = make_full_slice(*s);
|
||||
slice.options.set<query::partition_slice::option::allow_short_read>();
|
||||
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash}, l.new_digest_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
query::result::builder digest_only_builder(slice, query::result_options{query::result_request::only_digest, query::digest_algorithm::xxHash},
|
||||
l.new_digest_read(query::max_result_size(query::result_memory_limiter::maximum_result_size)).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), digest_only_builder, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash}, l.new_data_read(query::result_memory_limiter::maximum_result_size).get0());
|
||||
query::result::builder result_and_digest_builder(slice, query::result_options{query::result_request::result_and_digest, query::digest_algorithm::xxHash},
|
||||
l.new_data_read(query::max_result_size(query::result_memory_limiter::maximum_result_size)).get0());
|
||||
data_query(s, source, query::full_partition_range, slice, std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), result_and_digest_builder, db::no_timeout, tests::make_query_class_config()).get0();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user