reader_concurrency_semaphore: admission: flip the switch
This patch flips two "switches": 1) It switches admission to be up-front. 2) It changes the admission algorithm. (1) by now all permits are obtained up-front, so this patch just yanks out the restricted reader from all reader stacks and simultaneously switches all `obtain_permit_nowait()` calls to `obtain_permit()`. By doing this admission is now waited on when creating the permit. (2) we switch to an admission algorithm that adds a new aspect to the existing resource availability: the number of used/blocked reads. Namely it only admits new reads if in addition to the necessary amount of resources being available, all currently used readers are blocked. In other words we only admit new reads if all currently admitted reads requires something other than CPU to progress. They are either waiting on I/O, a remote shard, or attention from their consumers (not used currently). We flip these two switches at the same time because up-front admission means cache reads now need to obtain a permit too. For cache reads the optimal concurrency is 1. Anything above that just increases latency (without increasing throughput). So we want to make sure that if a cache reader hits it doesn't get any competition for CPU and it can run to completion. We admit new reads only if the read misses and has to go to disk. Another change made to accommodate this switch is the replacement of the replica side read execution stages which the reader concurrency semaphore as an execution stage. This replacement is needed because with the introduction of up-front admission, reads are not independent of each other any-more. One read executed can influence whether later reads executed will be admitted or not, and execution stages require independent operations to work well. By moving the execution stage into the semaphore, we have an execution stage which is in control of both admission and running the operations in batches, avoiding the bad interaction between the two.
This commit is contained in:
32
database.cc
32
database.cc
@@ -1420,13 +1420,22 @@ database::query(schema_ptr s, const query::read_command& cmd, query::result_opti
|
||||
querier_opt = _querier_cache.lookup_data_querier(cmd.query_uuid, *s, ranges.front(), cmd.slice, trace_state);
|
||||
}
|
||||
|
||||
auto permit = querier_opt ? querier_opt->permit() : semaphore.make_permit(s.get(), "data-query");
|
||||
auto read_func = [&, this] (reader_permit permit) {
|
||||
reader_permit::used_guard ug{permit};
|
||||
return cf.query(std::move(s), std::move(permit), cmd, class_config, opts, ranges, trace_state, get_result_memory_limiter(),
|
||||
timeout, &querier_opt).then([&result, ug = std::move(ug)] (lw_shared_ptr<query::result> res) {
|
||||
result = std::move(res);
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
auto op = cf.read_in_progress();
|
||||
|
||||
result = co_await _data_query_stage(&cf, std::move(s), std::move(permit), seastar::cref(cmd), class_config, opts, seastar::cref(ranges),
|
||||
std::move(trace_state), seastar::ref(get_result_memory_limiter()), timeout, &querier_opt);
|
||||
if (querier_opt) {
|
||||
co_await semaphore.with_ready_permit(querier_opt->permit(), read_func);
|
||||
} else {
|
||||
co_await semaphore.with_permit(s.get(), "data-query", cf.estimate_read_memory_cost(), timeout, read_func);
|
||||
}
|
||||
|
||||
if (cmd.query_uuid != utils::UUID{} && querier_opt) {
|
||||
_querier_cache.insert(cmd.query_uuid, std::move(*querier_opt), std::move(trace_state));
|
||||
@@ -1466,13 +1475,22 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
querier_opt = _querier_cache.lookup_mutation_querier(cmd.query_uuid, *s, range, cmd.slice, trace_state);
|
||||
}
|
||||
|
||||
auto permit = querier_opt ? querier_opt->permit() : semaphore.make_permit(s.get(), "mutation-query");
|
||||
auto read_func = [&, this] (reader_permit permit) {
|
||||
reader_permit::used_guard ug{permit};
|
||||
return cf.mutation_query(std::move(s), std::move(permit), cmd, class_config, range,
|
||||
std::move(trace_state), std::move(accounter), timeout, &querier_opt).then([&result, ug = std::move(ug)] (reconcilable_result res) {
|
||||
result = std::move(res);
|
||||
});
|
||||
};
|
||||
|
||||
try {
|
||||
auto op = cf.read_in_progress();
|
||||
|
||||
result = co_await _mutation_query_stage(&cf, std::move(s), std::move(permit), seastar::cref(cmd), class_config, seastar::cref(range),
|
||||
std::move(trace_state), std::move(accounter), timeout, &querier_opt);
|
||||
if (querier_opt) {
|
||||
co_await semaphore.with_ready_permit(querier_opt->permit(), read_func);
|
||||
} else {
|
||||
co_await semaphore.with_permit(s.get(), "mutation-query", cf.estimate_read_memory_cost(), timeout, read_func);
|
||||
}
|
||||
|
||||
if (cmd.query_uuid != utils::UUID{} && querier_opt) {
|
||||
_querier_cache.insert(cmd.query_uuid, std::move(*querier_opt), std::move(trace_state));
|
||||
@@ -1595,7 +1613,7 @@ reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() {
|
||||
}
|
||||
|
||||
future<reader_permit> database::obtain_reader_permit(table& tbl, const char* const op_name, db::timeout_clock::time_point timeout) {
|
||||
return get_reader_concurrency_semaphore().obtain_permit_nowait(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout);
|
||||
return get_reader_concurrency_semaphore().obtain_permit(tbl.schema().get(), op_name, tbl.estimate_read_memory_cost(), timeout);
|
||||
}
|
||||
|
||||
future<reader_permit> database::obtain_reader_permit(schema_ptr schema, const char* const op_name, db::timeout_clock::time_point timeout) {
|
||||
|
||||
@@ -83,7 +83,7 @@ future<> view_update_generator::start() {
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
return ssts->make_range_sstable_reader(s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
|
||||
@@ -89,7 +89,7 @@ mutation_source memtable_filling_virtual_table::as_mutation_source() {
|
||||
};
|
||||
|
||||
return execute(mutation_sink, timeout).then([this, mt, s, units, &range, &slice, &pc, &trace_state, &fwd, &fwd_mr] () {
|
||||
auto rd = make_restricted_flat_reader(mt->as_data_source(), s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
auto rd = mt->as_data_source().make_reader(s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
|
||||
if (!_shard_aware) {
|
||||
rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool {
|
||||
|
||||
@@ -1264,6 +1264,7 @@ future<flat_mutation_reader> evictable_reader::resume_or_create_reader(db::timeo
|
||||
if (auto reader_opt = try_resume()) {
|
||||
co_return std::move(*reader_opt);
|
||||
}
|
||||
co_await _permit.maybe_wait_readmission(timeout);
|
||||
co_return recreate_reader();
|
||||
}
|
||||
|
||||
|
||||
@@ -790,6 +790,10 @@ bool reader_concurrency_semaphore::has_available_units(const resources& r) const
|
||||
return (bool(_resources) && _resources >= r) || _resources.count == _initial_resources.count;
|
||||
}
|
||||
|
||||
bool reader_concurrency_semaphore::all_used_permits_are_stalled() const {
|
||||
return _permit_list->stats.used_permits == _permit_list->stats.blocked_permits;
|
||||
}
|
||||
|
||||
std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) {
|
||||
if ((_wait_list.size() + _ready_list.size()) >= _max_queue_length) {
|
||||
_stats.total_reads_shed_due_to_overload++;
|
||||
@@ -827,27 +831,31 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, d
|
||||
if (!_execution_loop_future) {
|
||||
_execution_loop_future.emplace(execution_loop());
|
||||
}
|
||||
auto first = _wait_list.empty() && _ready_list.empty();
|
||||
if (!_wait_list.empty() || !_ready_list.empty()) {
|
||||
return enqueue_waiter(std::move(permit), timeout, std::move(func));
|
||||
}
|
||||
|
||||
if (first && has_available_units(permit.base_resources())) {
|
||||
permit.on_admission();
|
||||
if (func) {
|
||||
return with_ready_permit(std::move(permit), std::move(func));
|
||||
if (!has_available_units(permit.base_resources())) {
|
||||
auto fut = enqueue_waiter(std::move(permit), timeout, std::move(func));
|
||||
if (!_inactive_reads.empty()) {
|
||||
evict_readers_in_background();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
return fut;
|
||||
}
|
||||
|
||||
auto fut = enqueue_waiter(std::move(permit), timeout, std::move(func));
|
||||
|
||||
if (first && !_inactive_reads.empty()) {
|
||||
evict_readers_in_background();
|
||||
if (!all_used_permits_are_stalled()) {
|
||||
return enqueue_waiter(std::move(permit), timeout, std::move(func));
|
||||
}
|
||||
|
||||
return fut;
|
||||
permit.on_admission();
|
||||
if (func) {
|
||||
return with_ready_permit(std::move(permit), std::move(func));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources())) {
|
||||
while (!_wait_list.empty() && _ready_list.empty() && has_available_units(_wait_list.front().permit.base_resources()) && all_used_permits_are_stalled()) {
|
||||
auto& x = _wait_list.front();
|
||||
try {
|
||||
x.permit.on_admission();
|
||||
@@ -888,11 +896,13 @@ void reader_concurrency_semaphore::on_permit_unused() noexcept {
|
||||
assert(_permit_list->stats.used_permits);
|
||||
--_permit_list->stats.used_permits;
|
||||
assert(_permit_list->stats.used_permits >= _permit_list->stats.blocked_permits);
|
||||
maybe_admit_waiters();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_blocked() noexcept {
|
||||
++_permit_list->stats.blocked_permits;
|
||||
assert(_permit_list->stats.used_permits >= _permit_list->stats.blocked_permits);
|
||||
maybe_admit_waiters();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_unblocked() noexcept {
|
||||
|
||||
@@ -47,7 +47,7 @@ using namespace seastar;
|
||||
/// It's also possible to specify the maximum allowed number of waiting
|
||||
/// readers by the `max_queue_length` constructor parameter. When the
|
||||
/// number of waiting readers becomes equal or greater than
|
||||
/// `max_queue_length` (upon calling `wait_admission()`) an exception of
|
||||
/// `max_queue_length` (upon calling `obtain_permit()`) an exception of
|
||||
/// type `std::runtime_error` is thrown. Optionally, some additional
|
||||
/// code can be executed just before throwing (`prethrow_action`
|
||||
/// constructor parameter).
|
||||
@@ -199,6 +199,8 @@ private:
|
||||
|
||||
bool has_available_units(const resources& r) const;
|
||||
|
||||
bool all_used_permits_are_stalled() const;
|
||||
|
||||
[[nodiscard]] std::exception_ptr check_queue_size(std::string_view queue_name);
|
||||
|
||||
// Add the permit to the wait queue and return the future which resolves when
|
||||
|
||||
@@ -81,12 +81,9 @@ class reader_concurrency_semaphore;
|
||||
|
||||
/// A permit for a specific read.
|
||||
///
|
||||
/// Used to track the read's resource consumption and wait for admission to read
|
||||
/// from the disk.
|
||||
/// Use `consume_memory()` to register memory usage. Use `wait_admission()` to
|
||||
/// wait for admission, before reading from the disk. Both methods return a
|
||||
/// `resource_units` RAII object that should be held onto while the respective
|
||||
/// resources are in use.
|
||||
/// Used to track the read's resource consumption. Use `consume_memory()` to
|
||||
/// register memory usage, which returns a `resource_units` RAII object that
|
||||
/// should be held onto while the respective resources are in use.
|
||||
class reader_permit {
|
||||
friend class reader_concurrency_semaphore;
|
||||
|
||||
|
||||
2
table.cc
2
table.cc
@@ -117,7 +117,7 @@ table::make_sstable_reader(schema_ptr s,
|
||||
}
|
||||
}();
|
||||
|
||||
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return ms.make_reader(std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
lw_shared_ptr<sstables::sstable_set> table::make_compound_sstable_set() {
|
||||
|
||||
@@ -832,26 +832,15 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
return less(a.decorated_key(), b.decorated_key());
|
||||
});
|
||||
|
||||
auto permit = sem.obtain_permit_nowait(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0();
|
||||
auto permit = sem.obtain_permit(schema.get(), get_name(), new_reader_base_cost, db::no_timeout).get0();
|
||||
|
||||
auto mt = make_lw_shared<memtable>(schema);
|
||||
for (const auto& mut : muts) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
|
||||
auto ms = mutation_source([mt] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto p = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
mt->as_data_source(),
|
||||
schema,
|
||||
permit,
|
||||
query::full_partition_range,
|
||||
|
||||
@@ -103,7 +103,7 @@ public:
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
virtual future<reader_permit> obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) override {
|
||||
return semaphore().obtain_permit_nowait(schema.get(), description, 128 * 1024, timeout);
|
||||
return semaphore().obtain_permit(schema.get(), description, 128 * 1024, timeout);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user