diff --git a/database.cc b/database.cc index e82163d29e..d09d1eead7 100644 --- a/database.cc +++ b/database.cc @@ -1420,13 +1420,22 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, trace_state); } - auto permit = querier_opt ? querier_opt->permit() : semaphore.make_permit(s.get(), "data-query"); + auto read_func = [&, this] (reader_permit permit) { + reader_permit::used_guard ug{permit}; + return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(), + timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr res) { + result = std::move(res); + }); + }; try { auto op = cf.read_in_progress(); - result = co_await _data_query_stage(&cf, std::move(s), std::move(permit), seastar::cref(cmd), class_config, opts, seastar::cref(ranges), - std::move(trace_state), seastar::ref(get_result_memory_limiter()), timeout, &querier_opt); + if (querier_opt) { + co_await semaphore.with_ready_permit(querier_opt->permit(), read_func); + } else { + co_await semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, read_func); + } if (cmd.query_uuid != utils::UUID{} && querier_opt) { _querier_cache.insert(cmd.query_uuid, std::move(*querier_opt), std::move(trace_state)); @@ -1466,13 +1475,22 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, trace_state); } - auto permit = querier_opt ? querier_opt->permit() : semaphore.make_permit(s.get(), "mutation-query"); + auto read_func = [&, this] (reader_permit permit) { + reader_permit::used_guard ug{permit}; + return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range, + std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) { + result = std::move(res); + }); + }; try { auto op = cf.read_in_progress(); - result = co_await _mutation_query_stage(&cf, std::move(s), std::move(permit), seastar::cref(cmd), class_config, seastar::cref(range), - std::move(trace_state), std::move(accounter), timeout, &querier_opt); + if (querier_opt) { + co_await semaphore.with_ready_permit(querier_opt->permit(), read_func); + } else { + co_await semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, read_func); + } if (cmd.query_uuid != utils::UUID{} && querier_opt) { _querier_cache.insert(cmd.query_uuid, std::move(*querier_opt), std::move(trace_state)); @@ -1595,7 +1613,7 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { } future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout) { - return get_reader_concurrency_semaphore().obtain_permit_nowait(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout); + return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout); } future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout) { diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 60154aa3f8..71f9529d4c 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -83,7 +83,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), diff --git a/db/virtual_table.cc b/db/virtual_table.cc index a1e9c7a056..102c7e8c8b 100644 --- a/db/virtual_table.cc +++ b/db/virtual_table.cc @@ -89,7 +89,7 @@ mutation_source memtable_filling_virtual_table::as_mutation_source() { }; return execute(mutation_sink, timeout).then([this, mt, s, units, &range, &slice, &pc, &trace_state, &fwd, &fwd_mr] () { - auto rd = make_restricted_flat_reader(mt->as_data_source(), s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr); + auto rd = mt->as_data_source().make_reader(s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr); if (!_shard_aware) { rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool { diff --git a/mutation_reader.cc b/mutation_reader.cc index 631a01c2ed..e8776161ec 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1264,6 +1264,7 @@ future evictable_reader::resume_or_create_reader(db::timeo if (auto reader_opt = try_resume()) { co_return std::move(*reader_opt); } + co_await _permit.maybe_wait_readmission(timeout); co_return recreate_reader(); } diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index b0f0917f4f..d37e3c359d 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -790,6 +790,10 @@ bool reader_concurrency_semaphore::has_available_units(const resources& r) const return (bool(_resources) && _resources >= r) || _resources.count == _initial_resources.count; } +bool reader_concurrency_semaphore::all_used_permits_are_stalled() const { + return _permit_list->stats.used_permits == _permit_list->stats.blocked_permits; +} + std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) { if ((_wait_list.size() + _ready_list.size()) >= _max_queue_length) { _stats.total_reads_shed_due_to_overload++; @@ -827,27 +831,31 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, d if (!_execution_loop_future) { _execution_loop_future.emplace(execution_loop()); } - auto first = _wait_list.empty() && _ready_list.empty(); + if (!_wait_list.empty() || !_ready_list.empty()) { + return enqueue_waiter(std::move(permit), timeout, std::move(func)); + } - if (first && has_available_units(permit.base_resources())) { - permit.on_admission(); - if (func) { - return with_ready_permit(std::move(permit), std::move(func)); + if (!has_available_units(permit.base_resources())) { + auto fut = enqueue_waiter(std::move(permit), timeout, std::move(func)); + if (!_inactive_reads.empty()) { + evict_readers_in_background(); } - return make_ready_future<>(); + return fut; } - auto fut = enqueue_waiter(std::move(permit), timeout, std::move(func)); - - if (first && !_inactive_reads.empty()) { - evict_readers_in_background(); + if (!all_used_permits_are_stalled()) { + return enqueue_waiter(std::move(permit), timeout, std::move(func)); } - return fut; + permit.on_admission(); + if (func) { + return with_ready_permit(std::move(permit), std::move(func)); + } + return make_ready_future<>(); } void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { - while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources())) { + while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources()) && all_used_permits_are_stalled()) { auto& x = _wait_list.front(); try { x.permit.on_admission(); @@ -888,11 +896,13 @@ void reader_concurrency_semaphore::on_permit_unused() noexcept { assert(_permit_list->stats.used_permits); --_permit_list->stats.used_permits; assert(_permit_list->stats.used_permits >= _permit_list->stats.blocked_permits); + maybe_admit_waiters(); } void reader_concurrency_semaphore::on_permit_blocked() noexcept { ++_permit_list->stats.blocked_permits; assert(_permit_list->stats.used_permits >= _permit_list->stats.blocked_permits); + maybe_admit_waiters(); } void reader_concurrency_semaphore::on_permit_unblocked() noexcept { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 1a24919c00..3d15011957 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -47,7 +47,7 @@ using namespace seastar; /// It's also possible to specify the maximum allowed number of waiting /// readers by the `max_queue_length` constructor parameter. When the /// number of waiting readers becomes equal or greater than -/// `max_queue_length` (upon calling `wait_admission()`) an exception of +/// `max_queue_length` (upon calling `obtain_permit()`) an exception of /// type `std::runtime_error` is thrown. Optionally, some additional /// code can be executed just before throwing (`prethrow_action` /// constructor parameter). @@ -199,6 +199,8 @@ private: bool has_available_units(const resources& r) const; + bool all_used_permits_are_stalled() const; + [[nodiscard]] std::exception_ptr check_queue_size(std::string_view queue_name); // Add the permit to the wait queue and return the future which resolves when diff --git a/reader_permit.hh b/reader_permit.hh index 0461fa2a2a..73ee578172 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -81,12 +81,9 @@ class reader_concurrency_semaphore; /// A permit for a specific read. /// -/// Used to track the read's resource consumption and wait for admission to read -/// from the disk. -/// Use `consume_memory()` to register memory usage. Use `wait_admission()` to -/// wait for admission, before reading from the disk. Both methods return a -/// `resource_units` RAII object that should be held onto while the respective -/// resources are in use. +/// Used to track the read's resource consumption. Use `consume_memory()` to +/// register memory usage, which returns a `resource_units` RAII object that +/// should be held onto while the respective resources are in use. class reader_permit { friend class reader_concurrency_semaphore; diff --git a/table.cc b/table.cc index 53d1a665e3..679ae5dbf4 100644 --- a/table.cc +++ b/table.cc @@ -117,7 +117,7 @@ table::make_sstable_reader(schema_ptr s, } }(); - return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + return ms.make_reader(std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } lw_shared_ptr table::make_compound_sstable_set() { diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 8172704a3b..ac7c436b05 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -832,26 +832,15 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { return less(a.decorated_key(), b.decorated_key()); }); - auto permit = sem.obtain_permit_nowait(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0(); + auto permit = sem.obtain_permit(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0(); auto mt = make_lw_shared(schema); for (const auto& mut : muts) { mt->apply(mut); } - auto ms = mutation_source([mt] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr ts, - streamed_mutation::forwarding fwd_ms, - mutation_reader::forwarding fwd_mr) { - return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); - }); auto p = make_manually_paused_evictable_reader( - std::move(ms), + mt->as_data_source(), schema, permit, query::full_partition_range, diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index d1fc078891..56852d7b73 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -103,7 +103,7 @@ public: return *_contexts[shard]->semaphore; } virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { - return semaphore().obtain_permit_nowait(schema.get(), description, 128 * 1024, timeout); + return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout); } };