From 1b7eea0f523203f4e55085439c8aca904d9a0b75 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 25 Mar 2021 16:54:34 +0200 Subject: [PATCH] reader_concurrency_semaphore: admission: flip the switch This patch flips two "switches": 1) It switches admission to be up-front. 2) It changes the admission algorithm. (1) by now all permits are obtained up-front, so this patch just yanks out the restricted reader from all reader stacks and simultaneously switches all `obtain_permit_nowait()` calls to `obtain_permit()`. By doing this, admission is now waited on when creating the permit. (2) we switch to an admission algorithm that adds a new aspect to the existing resource availability: the number of used/blocked reads. Namely, it only admits new reads if, in addition to the necessary amount of resources being available, all currently used readers are blocked. In other words, we only admit new reads if all currently admitted reads require something other than CPU to progress. They are either waiting on I/O, a remote shard, or attention from their consumers (not used currently). We flip these two switches at the same time because up-front admission means cache reads now need to obtain a permit too. For cache reads the optimal concurrency is 1. Anything above that just increases latency (without increasing throughput). So we want to make sure that if a cache reader hits, it doesn't get any competition for CPU and it can run to completion. We admit new reads only if the read misses and has to go to disk. Another change made to accommodate this switch is the replacement of the replica side read execution stages with the reader concurrency semaphore acting as an execution stage. This replacement is needed because with the introduction of up-front admission, reads are not independent of each other anymore. One read executed can influence whether later reads executed will be admitted or not, and execution stages require independent operations to work well.
By moving the execution stage into the semaphore, we have an execution stage which is in control of both admission and running the operations in batches, avoiding the bad interaction between the two. --- database.cc | 32 +++++++++++++++++++++------ db/view/view_update_generator.cc | 2 +- db/virtual_table.cc | 2 +- mutation_reader.cc | 1 + reader_concurrency_semaphore.cc | 34 +++++++++++++++++++---------- reader_concurrency_semaphore.hh | 4 +++- reader_permit.hh | 9 +++----- table.cc | 2 +- test/boost/view_build_test.cc | 15 ++----------- test/lib/reader_lifecycle_policy.hh | 2 +- 10 files changed, 60 insertions(+), 43 deletions(-) diff --git a/database.cc b/database.cc index e82163d29e..d09d1eead7 100644 --- a/database.cc +++ b/database.cc @@ -1420,13 +1420,22 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, trace_state); } - auto permit = querier_opt ? querier_opt->permit() : semaphore.make_permit(s.get(), "data-query"); + auto read_func = [&, this] (reader_permit permit) { + reader_permit::used_guard ug{permit}; + return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(), + timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr res) { + result = std::move(res); + }); + }; try { auto op = cf.read_in_progress(); - result = co_await _data_query_stage(&cf, std::move(s), std::move(permit), seastar::cref(cmd), class_config, opts, seastar::cref(ranges), - std::move(trace_state), seastar::ref(get_result_memory_limiter()), timeout, &querier_opt); + if (querier_opt) { + co_await semaphore.with_ready_permit(querier_opt->permit(), read_func); + } else { + co_await semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, read_func); + } if (cmd.query_uuid != utils::UUID{} && querier_opt) { _querier_cache.insert(cmd.query_uuid, 
std::move(*querier_opt), std::move(trace_state)); @@ -1466,13 +1475,22 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, trace_state); } - auto permit = querier_opt ? querier_opt->permit() : semaphore.make_permit(s.get(), "mutation-query"); + auto read_func = [&, this] (reader_permit permit) { + reader_permit::used_guard ug{permit}; + return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range, + std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) { + result = std::move(res); + }); + }; try { auto op = cf.read_in_progress(); - result = co_await _mutation_query_stage(&cf, std::move(s), std::move(permit), seastar::cref(cmd), class_config, seastar::cref(range), - std::move(trace_state), std::move(accounter), timeout, &querier_opt); + if (querier_opt) { + co_await semaphore.with_ready_permit(querier_opt->permit(), read_func); + } else { + co_await semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, read_func); + } if (cmd.query_uuid != utils::UUID{} && querier_opt) { _querier_cache.insert(cmd.query_uuid, std::move(*querier_opt), std::move(trace_state)); @@ -1595,7 +1613,7 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { } future database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout) { - return get_reader_concurrency_semaphore().obtain_permit_nowait(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout); + return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout); } future database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout) { diff --git a/db/view/view_update_generator.cc 
b/db/view/view_update_generator.cc index 60154aa3f8..71f9529d4c 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -83,7 +83,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), diff --git a/db/virtual_table.cc b/db/virtual_table.cc index a1e9c7a056..102c7e8c8b 100644 --- a/db/virtual_table.cc +++ b/db/virtual_table.cc @@ -89,7 +89,7 @@ mutation_source memtable_filling_virtual_table::as_mutation_source() { }; return execute(mutation_sink, timeout).then([this, mt, s, units, &range, &slice, &pc, &trace_state, &fwd, &fwd_mr] () { - auto rd = make_restricted_flat_reader(mt->as_data_source(), s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr); + auto rd = mt->as_data_source().make_reader(s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr); if (!_shard_aware) { rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool { diff --git a/mutation_reader.cc b/mutation_reader.cc index 631a01c2ed..e8776161ec 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1264,6 +1264,7 @@ future evictable_reader::resume_or_create_reader(db::timeo if (auto reader_opt = try_resume()) { co_return std::move(*reader_opt); } + co_await _permit.maybe_wait_readmission(timeout); co_return recreate_reader(); } diff --git a/reader_concurrency_semaphore.cc b/reader_concurrency_semaphore.cc index b0f0917f4f..d37e3c359d 100644 --- a/reader_concurrency_semaphore.cc +++ b/reader_concurrency_semaphore.cc @@ -790,6 +790,10 @@ bool 
reader_concurrency_semaphore::has_available_units(const resources& r) const return (bool(_resources) && _resources >= r) || _resources.count == _initial_resources.count; } +bool reader_concurrency_semaphore::all_used_permits_are_stalled() const { + return _permit_list->stats.used_permits == _permit_list->stats.blocked_permits; +} + std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) { if ((_wait_list.size() + _ready_list.size()) >= _max_queue_length) { _stats.total_reads_shed_due_to_overload++; @@ -827,27 +831,31 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, d if (!_execution_loop_future) { _execution_loop_future.emplace(execution_loop()); } - auto first = _wait_list.empty() && _ready_list.empty(); + if (!_wait_list.empty() || !_ready_list.empty()) { + return enqueue_waiter(std::move(permit), timeout, std::move(func)); + } - if (first && has_available_units(permit.base_resources())) { - permit.on_admission(); - if (func) { - return with_ready_permit(std::move(permit), std::move(func)); + if (!has_available_units(permit.base_resources())) { + auto fut = enqueue_waiter(std::move(permit), timeout, std::move(func)); + if (!_inactive_reads.empty()) { + evict_readers_in_background(); } - return make_ready_future<>(); + return fut; } - auto fut = enqueue_waiter(std::move(permit), timeout, std::move(func)); - - if (first && !_inactive_reads.empty()) { - evict_readers_in_background(); + if (!all_used_permits_are_stalled()) { + return enqueue_waiter(std::move(permit), timeout, std::move(func)); } - return fut; + permit.on_admission(); + if (func) { + return with_ready_permit(std::move(permit), std::move(func)); + } + return make_ready_future<>(); } void reader_concurrency_semaphore::maybe_admit_waiters() noexcept { - while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources())) { + while (!_wait_list.empty() && _ready_list.empty() && 
has_available_units(_wait_list.front().permit.base_resources()) && all_used_permits_are_stalled()) { auto& x = _wait_list.front(); try { x.permit.on_admission(); @@ -888,11 +896,13 @@ void reader_concurrency_semaphore::on_permit_unused() noexcept { assert(_permit_list->stats.used_permits); --_permit_list->stats.used_permits; assert(_permit_list->stats.used_permits >= _permit_list->stats.blocked_permits); + maybe_admit_waiters(); } void reader_concurrency_semaphore::on_permit_blocked() noexcept { ++_permit_list->stats.blocked_permits; assert(_permit_list->stats.used_permits >= _permit_list->stats.blocked_permits); + maybe_admit_waiters(); } void reader_concurrency_semaphore::on_permit_unblocked() noexcept { diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 1a24919c00..3d15011957 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -47,7 +47,7 @@ using namespace seastar; /// It's also possible to specify the maximum allowed number of waiting /// readers by the `max_queue_length` constructor parameter. When the /// number of waiting readers becomes equal or greater than -/// `max_queue_length` (upon calling `wait_admission()`) an exception of +/// `max_queue_length` (upon calling `obtain_permit()`) an exception of /// type `std::runtime_error` is thrown. Optionally, some additional /// code can be executed just before throwing (`prethrow_action` /// constructor parameter). @@ -199,6 +199,8 @@ private: bool has_available_units(const resources& r) const; + bool all_used_permits_are_stalled() const; + [[nodiscard]] std::exception_ptr check_queue_size(std::string_view queue_name); // Add the permit to the wait queue and return the future which resolves when diff --git a/reader_permit.hh b/reader_permit.hh index 0461fa2a2a..73ee578172 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -81,12 +81,9 @@ class reader_concurrency_semaphore; /// A permit for a specific read. 
/// -/// Used to track the read's resource consumption and wait for admission to read -/// from the disk. -/// Use `consume_memory()` to register memory usage. Use `wait_admission()` to -/// wait for admission, before reading from the disk. Both methods return a -/// `resource_units` RAII object that should be held onto while the respective -/// resources are in use. +/// Used to track the read's resource consumption. Use `consume_memory()` to +/// register memory usage, which returns a `resource_units` RAII object that +/// should be held onto while the respective resources are in use. class reader_permit { friend class reader_concurrency_semaphore; diff --git a/table.cc b/table.cc index 53d1a665e3..679ae5dbf4 100644 --- a/table.cc +++ b/table.cc @@ -117,7 +117,7 @@ table::make_sstable_reader(schema_ptr s, } }(); - return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + return ms.make_reader(std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); } lw_shared_ptr table::make_compound_sstable_set() { diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 8172704a3b..ac7c436b05 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -832,26 +832,15 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { return less(a.decorated_key(), b.decorated_key()); }); - auto permit = sem.obtain_permit_nowait(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0(); + auto permit = sem.obtain_permit(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0(); auto mt = make_lw_shared(schema); for (const auto& mut : muts) { mt->apply(mut); } - auto ms = mutation_source([mt] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr ts, - streamed_mutation::forwarding 
fwd_ms, - mutation_reader::forwarding fwd_mr) { - return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); - }); auto p = make_manually_paused_evictable_reader( - std::move(ms), + mt->as_data_source(), schema, permit, query::full_partition_range, diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index d1fc078891..56852d7b73 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -103,7 +103,7 @@ public: return *_contexts[shard]->semaphore; } virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override { - return semaphore().obtain_permit_nowait(schema.get(), description, 128 * 1024, timeout); + return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout); } };