diff --git a/database.cc b/database.cc index 1471f6ae70..24e937896b 100644 --- a/database.cc +++ b/database.cc @@ -1282,13 +1282,14 @@ void database::register_connection_drop_notifier(netw::messaging_service& ms) { query::query_class_config database::make_query_class_config() { // Everything running in the statement group is considered a user query if (current_scheduling_group() == _dbcfg.statement_scheduling_group) { - return query::query_class_config{_read_concurrency_sem, _cfg.max_memory_for_unlimited_query_hard_limit()}; + return query::query_class_config{_read_concurrency_sem, + query::max_result_size(_cfg.max_memory_for_unlimited_query_soft_limit(), _cfg.max_memory_for_unlimited_query_hard_limit())}; // Reads done on behalf of view update generation run in the streaming group } else if (current_scheduling_group() == _dbcfg.streaming_scheduling_group) { - return query::query_class_config{_streaming_concurrency_sem, std::numeric_limits::max()}; + return query::query_class_config{_streaming_concurrency_sem, query::max_result_size(std::numeric_limits::max())}; // Everything else is considered a system query } else { - return query::query_class_config{_system_read_concurrency_sem, std::numeric_limits::max()}; + return query::query_class_config{_system_read_concurrency_sem, query::max_result_size(std::numeric_limits::max())}; } } diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 746c09e6f8..c054a80c18 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -60,14 +60,15 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() { _buffer_size = compute_buffer_size(*_schema, _buffer); } -flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption) { +flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size) { class partition_reversing_mutation_reader final : public flat_mutation_reader::impl { flat_mutation_reader* _source; range_tombstone_list _range_tombstones; std::stack _mutation_fragments; mutation_fragment_opt _partition_end; size_t _stack_size = 0; - const size_t _max_stack_size; + const query::max_result_size _max_size; + bool _below_soft_limit = true; private: stop_iteration emit_partition() { auto emit_range_tombstone = [&] { @@ -119,7 +120,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_ } else { _mutation_fragments.emplace(std::move(mf)); _stack_size += _mutation_fragments.top().memory_usage(*_schema); - if (_stack_size >= _max_stack_size) { + if (_stack_size > _max_size.hard_limit || (_stack_size > _max_size.soft_limit && _below_soft_limit)) { const partition_key* key = nullptr; auto it = buffer().end(); --it; @@ -129,21 +130,30 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_ --it; key = &it->as_partition_start().key().key(); } - throw std::runtime_error(fmt::format( - "Aborting reverse partition read because partition {} is larger than the maximum safe size of {} for reversible partitions.", - key->with_schema(*_schema), - _max_stack_size)); + + if (_stack_size > _max_size.hard_limit) { + throw std::runtime_error(fmt::format( + "Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}", + _max_size.hard_limit, + key->with_schema(*_schema))); + } else { + fmr_logger.warn( + "Memory usage of reversed read exceeds soft limit of {} (configured via max_memory_for_unlimited_query_soft_limit), while reading partition {}", + _max_size.soft_limit, + key->with_schema(*_schema)); + _below_soft_limit = false; + } } } } return make_ready_future(is_buffer_full()); } public: - explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, size_t max_stack_size) + explicit partition_reversing_mutation_reader(flat_mutation_reader& mr, query::max_result_size max_size) : flat_mutation_reader::impl(mr.schema()) , _source(&mr) , _range_tombstones(*_schema) - , _max_stack_size(max_stack_size) + , _max_size(max_size) { } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { @@ -185,7 +195,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, size_ } }; - return make_flat_mutation_reader(original, max_memory_consumption); + return make_flat_mutation_reader(original, max_size); } template diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 31373a44f6..a7b0a4ca23 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -29,6 +29,7 @@ #include "mutation_fragment.hh" #include "tracing/trace_state.hh" #include "mutation.hh" +#include "query_class_config.hh" #include #include @@ -720,15 +721,17 @@ make_generating_reader(schema_ptr s, std::function /// /// \param original the reader to be reversed, has to be kept alive while the /// reversing reader is in use. -/// \param max_memory_consumption the maximum amount of memory the reader is -/// allowed to use for reversing. The reverse reader reads entire partitions -/// into memory, before reversing them. Since partitions can be larger than -/// the available memory, we need to enforce a limit on memory consumption. -/// If the read uses more memory then this limit, the read is aborted. +/// \param max_size the maximum amount of memory the reader is allowed to use +/// for reversing and conversely the maximum size of the results. The +/// reverse reader reads entire partitions into memory, before reversing +/// them. Since partitions can be larger than the available memory, we need +/// to enforce a limit on memory consumption. When reaching the soft limit +/// a warning will be logged. When reaching the hard limit the read will be +/// aborted. /// /// FIXME: reversing should be done in the sstable layer, see #1413. flat_mutation_reader -make_reversing_reader(flat_mutation_reader& original, size_t max_memory_consumption); +make_reversing_reader(flat_mutation_reader& original, query::max_result_size max_size); /// Low level fragment stream validator. /// diff --git a/querier.hh b/querier.hh index 1e3f5691f5..04efbccc21 100644 --- a/querier.hh +++ b/querier.hh @@ -83,7 +83,7 @@ auto consume_page(flat_mutation_reader& reader, uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - size_t reverse_read_max_memory) { + query::max_result_size max_size) { return reader.peek(timeout).then([=, &reader, consumer = std::move(consumer), &slice] ( mutation_fragment* next_fragment) mutable { const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end; @@ -94,9 +94,9 @@ auto consume_page(flat_mutation_reader& reader, compaction_state, clustering_position_tracker(std::move(consumer), last_ckey)); - auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, reverse_read_max_memory] () mutable { + auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout, max_size] () mutable { if (slice.options.contains(query::partition_slice::option::reversed)) { - return do_with(make_reversing_reader(reader, reverse_read_max_memory), + return do_with(make_reversing_reader(reader, max_size), [reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable { return reversing_reader.consume(std::move(reader_consumer), timeout); }); @@ -223,9 +223,9 @@ public: uint32_t partition_limit, gc_clock::time_point query_time, db::timeout_clock::time_point timeout, - size_t reverse_read_max_memory) { + query::max_result_size max_size) { return ::query::consume_page(_reader, _compaction_state, *_slice, std::move(consumer), row_limit, partition_limit, query_time, - timeout, reverse_read_max_memory).then([this] (auto&& results) { + timeout, max_size).then([this] (auto&& results) { _last_ckey = std::get>(std::move(results)); constexpr auto size = std::tuple_size>::value; static_assert(size <= 2); diff --git a/query_class_config.hh b/query_class_config.hh index 5119934e33..168143219a 100644 --- a/query_class_config.hh +++ b/query_class_config.hh @@ -38,7 +38,7 @@ struct max_result_size { struct query_class_config { reader_concurrency_semaphore& semaphore; - uint64_t max_memory_for_unlimited_query; + max_result_size max_memory_for_unlimited_query; }; } diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 7e2ffefe2f..f6ce5426b8 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -592,7 +592,7 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti return fmr.consume_in_thread(std::move(fsc), db::no_timeout); } else { if (reversed) { - auto reverse_reader = make_reversing_reader(fmr, size_t(1) << 20); + auto reverse_reader = make_reversing_reader(fmr, query::max_result_size(size_t(1) << 20)); return reverse_reader.consume(std::move(fsc), db::no_timeout).get0(); } return fmr.consume(std::move(fsc), db::no_timeout).get0(); @@ -805,7 +805,8 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { auto test_with_partition = [&] (bool with_static_row) { testlog.info("Testing with_static_row={}", with_static_row); - auto mut = schema.new_mutation("pk1"); + const auto pk = "pk1"; + auto mut = schema.new_mutation(pk); const size_t desired_mut_size = 1 * 1024 * 1024; const size_t row_size = 10 * 1024; @@ -817,8 +818,9 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { schema.add_row(mut, schema.make_ckey(++i), sstring(row_size, '0')); } + const uint64_t hard_limit = size_t(1) << 18; auto reader = flat_mutation_reader_from_mutations({mut}); - auto reverse_reader = make_reversing_reader(reader, size_t(1) << 10); + auto reverse_reader = make_reversing_reader(reader, query::max_result_size(size_t(1) << 10, hard_limit)); try { reverse_reader.consume(phony_consumer{}, db::no_timeout).get(); @@ -826,7 +828,12 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) { } catch (const std::runtime_error& e) { testlog.info("Got exception with message: {}", e.what()); auto str = sstring(e.what()); - BOOST_REQUIRE_EQUAL(str.find("Aborting reverse partition read because partition pk1"), 0); + const auto expected_str = format( + "Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}", + hard_limit, + pk); + + BOOST_REQUIRE_EQUAL(str.find(expected_str), 0); } catch (...) { throw; } diff --git a/test/boost/querier_cache_test.cc b/test/boost/querier_cache_test.cc index 79e6203156..a393a06f35 100644 --- a/test/boost/querier_cache_test.cc +++ b/test/boost/querier_cache_test.cc @@ -214,7 +214,7 @@ public: auto querier = make_querier(range); auto [dk, ck] = querier.consume_page(dummy_result_builder{}, row_limit, std::numeric_limits::max(), - gc_clock::now(), db::no_timeout, std::numeric_limits::max()).get0(); + gc_clock::now(), db::no_timeout, query::max_result_size(std::numeric_limits::max())).get0(); const auto memory_usage = querier.memory_usage(); _cache.insert(cache_key, std::move(querier), nullptr); diff --git a/test/lib/reader_permit.cc b/test/lib/reader_permit.cc index bf292784e7..cb0e429375 100644 --- a/test/lib/reader_permit.cc +++ b/test/lib/reader_permit.cc @@ -34,7 +34,7 @@ reader_permit make_permit() { } query::query_class_config make_query_class_config() { - return query::query_class_config{the_semaphore, std::numeric_limits::max()}; + return query::query_class_config{the_semaphore, query::max_result_size(std::numeric_limits::max())}; } } // namespace tests