query: query_class_config: use max_result_size for the max_memory_for_unlimited_query field
We want to switch from using a single limit to a dual soft/hard limit. As a first step, we switch the limit field of `query_class_config` to the recently introduced `max_result_size` type. Since this field currently has a single user -- reverse queries -- and is not propagated widely, we update that user in this same patch to honor the soft/hard limit: warn on reaching the soft limit and abort on reaching the hard limit (the previous behaviour).
This commit is contained in:
@@ -1282,13 +1282,14 @@ void database::register_connection_drop_notifier(netw::messaging_service& ms) {
|
||||
query::query_class_config database::make_query_class_config() {
|
||||
// Everything running in the statement group is considered a user query
|
||||
if (current_scheduling_group() == _dbcfg.statement_scheduling_group) {
|
||||
return query::query_class_config{_read_concurrency_sem, _cfg.max_memory_for_unlimited_query_hard_limit()};
|
||||
return query::query_class_config{_read_concurrency_sem,
|
||||
query::max_result_size(_cfg.max_memory_for_unlimited_query_soft_limit(), _cfg.max_memory_for_unlimited_query_hard_limit())};
|
||||
// Reads done on behalf of view update generation run in the streaming group
|
||||
} else if (current_scheduling_group() == _dbcfg.streaming_scheduling_group) {
|
||||
return query::query_class_config{_streaming_concurrency_sem, std::numeric_limits<uint64_t>::max()};
|
||||
return query::query_class_config{_streaming_concurrency_sem, query::max_result_size(std::numeric_limits<uint64_t>::max())};
|
||||
// Everything else is considered a system query
|
||||
} else {
|
||||
return query::query_class_config{_system_read_concurrency_sem, std::numeric_limits<uint64_t>::max()};
|
||||
return query::query_class_config{_system_read_concurrency_sem, query::max_result_size(std::numeric_limits<uint64_t>::max())};
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -60,14 +60,15 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
|
||||
_buffer_size = compute_buffer_size(*_schema, _buffer);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption) {
|
||||
flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size) {
|
||||
class partition_reversing_mutation_reader final : public flat_mutation_reader::impl {
|
||||
flat_mutation_reader* _source;
|
||||
range_tombstone_list _range_tombstones;
|
||||
std::stack<mutation_fragment> _mutation_fragments;
|
||||
mutation_fragment_opt _partition_end;
|
||||
size_t _stack_size = 0;
|
||||
const size_t _max_stack_size;
|
||||
const query::max_result_size _max_size;
|
||||
bool _below_soft_limit = true;
|
||||
private:
|
||||
stop_iteration emit_partition() {
|
||||
auto emit_range_tombstone = [&] {
|
||||
@@ -119,7 +120,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_
|
||||
} else {
|
||||
_mutation_fragments.emplace(std::move(mf));
|
||||
_stack_size += _mutation_fragments.top().memory_usage(*_schema);
|
||||
if (_stack_size >= _max_stack_size) {
|
||||
if (_stack_size > _max_size.hard_limit || (_stack_size > _max_size.soft_limit && _below_soft_limit)) {
|
||||
const partition_key* key = nullptr;
|
||||
auto it = buffer().end();
|
||||
--it;
|
||||
@@ -129,21 +130,30 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_
|
||||
--it;
|
||||
key = &it->as_partition_start().key().key();
|
||||
}
|
||||
throw std::runtime_error(fmt::format(
|
||||
"Aborting reverse partition read because partition {} is larger than the maximum safe size of {} for reversible partitions.",
|
||||
key->with_schema(*_schema),
|
||||
_max_stack_size));
|
||||
|
||||
if (_stack_size > _max_size.hard_limit) {
|
||||
throw std::runtime_error(fmt::format(
|
||||
"Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}",
|
||||
_max_size.hard_limit,
|
||||
key->with_schema(*_schema)));
|
||||
} else {
|
||||
fmr_logger.warn(
|
||||
"Memory usage of reversed read exceeds soft limit of {} (configured via max_memory_for_unlimited_query_soft_limit), while reading partition {}",
|
||||
_max_size.soft_limit,
|
||||
key->with_schema(*_schema));
|
||||
_below_soft_limit = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return make_ready_future<stop_iteration>(is_buffer_full());
|
||||
}
|
||||
public:
|
||||
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, size_t max_stack_size)
|
||||
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size)
|
||||
: flat_mutation_reader::impl(mr.schema())
|
||||
, _source(&mr)
|
||||
, _range_tombstones(*_schema)
|
||||
, _max_stack_size(max_stack_size)
|
||||
, _max_size(max_size)
|
||||
{ }
|
||||
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
@@ -185,7 +195,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_
|
||||
}
|
||||
};
|
||||
|
||||
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original, max_memory_consumption);
|
||||
return make_flat_mutation_reader<partition_reversing_mutation_reader>(original, max_size);
|
||||
}
|
||||
|
||||
template<typename Source>
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include "mutation_fragment.hh"
|
||||
#include "tracing/trace_state.hh"
|
||||
#include "mutation.hh"
|
||||
#include "query_class_config.hh"
|
||||
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/file.hh>
|
||||
@@ -720,15 +721,17 @@ make_generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt>
|
||||
///
|
||||
/// \param original the reader to be reversed, has to be kept alive while the
|
||||
/// reversing reader is in use.
|
||||
/// \param max_memory_consumption the maximum amount of memory the reader is
|
||||
/// allowed to use for reversing. The reverse reader reads entire partitions
|
||||
/// into memory, before reversing them. Since partitions can be larger than
|
||||
/// the available memory, we need to enforce a limit on memory consumption.
|
||||
/// If the read uses more memory than this limit, the read is aborted.
|
||||
/// \param max_size the maximum amount of memory the reader is allowed to use
|
||||
/// for reversing and consequently the maximum size of the results. The
|
||||
/// reverse reader reads entire partitions into memory, before reversing
|
||||
/// them. Since partitions can be larger than the available memory, we need
|
||||
/// to enforce a limit on memory consumption. When reaching the soft limit
|
||||
/// a warning will be logged. When reaching the hard limit the read will be
|
||||
/// aborted.
|
||||
///
|
||||
/// FIXME: reversing should be done in the sstable layer, see #1413.
|
||||
flat_mutation_reader
|
||||
make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption);
|
||||
make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size);
|
||||
|
||||
/// Low level fragment stream validator.
|
||||
///
|
||||
|
||||
10
querier.hh
10
querier.hh
@@ -83,7 +83,7 @@ auto consume_page(flat_mutation_reader& reader,
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout,
|
||||
size_t reverse_read_max_memory) {
|
||||
query::max_result_size max_size) {
|
||||
return reader.peek(timeout).then([=, &reader, consumer = std::move(consumer), &slice] (
|
||||
mutation_fragment* next_fragment) mutable {
|
||||
const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end;
|
||||
@@ -94,9 +94,9 @@ auto consume_page(flat_mutation_reader& reader,
|
||||
compaction_state,
|
||||
clustering_position_tracker(std::move(consumer), last_ckey));
|
||||
|
||||
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, reverse_read_max_memory] () mutable {
|
||||
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, max_size] () mutable {
|
||||
if (slice.options.contains(query::partition_slice::option::reversed)) {
|
||||
return do_with(make_reversing_reader(reader, reverse_read_max_memory),
|
||||
return do_with(make_reversing_reader(reader, max_size),
|
||||
[reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable {
|
||||
return reversing_reader.consume(std::move(reader_consumer), timeout);
|
||||
});
|
||||
@@ -223,9 +223,9 @@ public:
|
||||
uint32_t partition_limit,
|
||||
gc_clock::time_point query_time,
|
||||
db::timeout_clock::time_point timeout,
|
||||
size_t reverse_read_max_memory) {
|
||||
query::max_result_size max_size) {
|
||||
return ::query::consume_page(_reader, _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time,
|
||||
timeout, reverse_read_max_memory).then([this] (auto&& results) {
|
||||
timeout, max_size).then([this] (auto&& results) {
|
||||
_last_ckey = std::get<std::optional<clustering_key>>(std::move(results));
|
||||
constexpr auto size = std::tuple_size<std::decay_t<decltype(results)>>::value;
|
||||
static_assert(size <= 2);
|
||||
|
||||
@@ -38,7 +38,7 @@ struct max_result_size {
|
||||
|
||||
struct query_class_config {
|
||||
reader_concurrency_semaphore& semaphore;
|
||||
uint64_t max_memory_for_unlimited_query;
|
||||
max_result_size max_memory_for_unlimited_query;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -592,7 +592,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
||||
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
|
||||
} else {
|
||||
if (reversed) {
|
||||
auto reverse_reader = make_reversing_reader(fmr, size_t(1) << 20);
|
||||
auto reverse_reader = make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20));
|
||||
return reverse_reader.consume(std::move(fsc), db::no_timeout).get0();
|
||||
}
|
||||
return fmr.consume(std::move(fsc), db::no_timeout).get0();
|
||||
@@ -805,7 +805,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
|
||||
auto test_with_partition = [&] (bool with_static_row) {
|
||||
testlog.info("Testing with_static_row={}", with_static_row);
|
||||
auto mut = schema.new_mutation("pk1");
|
||||
const auto pk = "pk1";
|
||||
auto mut = schema.new_mutation(pk);
|
||||
const size_t desired_mut_size = 1 * 1024 * 1024;
|
||||
const size_t row_size = 10 * 1024;
|
||||
|
||||
@@ -817,8 +818,9 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
schema.add_row(mut, schema.make_ckey(++i), sstring(row_size, '0'));
|
||||
}
|
||||
|
||||
const uint64_t hard_limit = size_t(1) << 18;
|
||||
auto reader = flat_mutation_reader_from_mutations({mut});
|
||||
auto reverse_reader = make_reversing_reader(reader, size_t(1) << 10);
|
||||
auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit));
|
||||
|
||||
try {
|
||||
reverse_reader.consume(phony_consumer{}, db::no_timeout).get();
|
||||
@@ -826,7 +828,12 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
|
||||
} catch (const std::runtime_error& e) {
|
||||
testlog.info("Got exception with message: {}", e.what());
|
||||
auto str = sstring(e.what());
|
||||
BOOST_REQUIRE_EQUAL(str.find("Aborting reverse partition read because partition pk1"), 0);
|
||||
const auto expected_str = format(
|
||||
"Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}",
|
||||
hard_limit,
|
||||
pk);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(str.find(expected_str), 0);
|
||||
} catch (...) {
|
||||
throw;
|
||||
}
|
||||
|
||||
@@ -214,7 +214,7 @@ public:
|
||||
|
||||
auto querier = make_querier<Querier>(range);
|
||||
auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits<uint32_t>::max(),
|
||||
gc_clock::now(), db::no_timeout, std::numeric_limits<uint64_t>::max()).get0();
|
||||
gc_clock::now(), db::no_timeout, query::max_result_size(std::numeric_limits<uint64_t>::max())).get0();
|
||||
const auto memory_usage = querier.memory_usage();
|
||||
_cache.insert(cache_key, std::move(querier), nullptr);
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ reader_permit make_permit() {
|
||||
}
|
||||
|
||||
query::query_class_config make_query_class_config() {
|
||||
return query::query_class_config{the_semaphore, std::numeric_limits<uint64_t>::max()};
|
||||
return query::query_class_config{the_semaphore, query::max_result_size(std::numeric_limits<uint64_t>::max())};
|
||||
}
|
||||
|
||||
} // namespace tests
|
||||
|
||||
Reference in New Issue
Block a user