Merge 'reader_concurrency_semaphore: add set_resources()' from Botond Dénes
Allowing to change the total or initial resources the semaphore has. After calling `set_resources()` the semaphore will look like as if it was created with the specified amount of resources when created. Use the new method in `replica::database::revert_initial_system_read_concurrency_boost()` so it doesn't lead to strange semaphore diagnostics output. Currently the system semaphore has 90/100 count units when there are no reads against it, which has led to some confusion. I also plan on using the new facility in enterprise. Closes #11772 * github.com:scylladb/scylladb: replica/database: revert initial boost to system semaphore with set_resources() reader_concurrency_semaphore: add set_resources()
This commit is contained in:
@@ -1021,6 +1021,13 @@ future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, r
|
||||
return fut;
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::set_resources(resources r) {
|
||||
auto delta = r - _initial_resources;
|
||||
_initial_resources = r;
|
||||
_resources += delta;
|
||||
maybe_admit_waiters();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
|
||||
if (!ex) {
|
||||
ex = std::make_exception_ptr(broken_semaphore{});
|
||||
|
||||
@@ -170,7 +170,7 @@ public:
|
||||
};
|
||||
|
||||
private:
|
||||
const resources _initial_resources;
|
||||
resources _initial_resources;
|
||||
resources _resources;
|
||||
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _wait_list;
|
||||
@@ -401,6 +401,12 @@ public:
|
||||
/// optimal then just using \ref with_permit().
|
||||
future<> with_ready_permit(reader_permit permit, read_func func);
|
||||
|
||||
/// Set the total resources of the semaphore to \p r.
|
||||
///
|
||||
/// After this call, \ref initial_resources() will reflect the new value.
|
||||
/// Available resources will be adjusted by the delta.
|
||||
void set_resources(resources r);
|
||||
|
||||
const resources initial_resources() const {
|
||||
return _initial_resources;
|
||||
}
|
||||
|
||||
@@ -2247,7 +2247,7 @@ future<> database::close_tables(table_kind kind_to_close) {
|
||||
}
|
||||
|
||||
void database::revert_initial_system_read_concurrency_boost() {
|
||||
_system_read_concurrency_sem.consume({database::max_count_concurrent_reads - database::max_count_system_concurrent_reads, 0});
|
||||
_system_read_concurrency_sem.set_resources({database::max_count_system_concurrent_reads, max_memory_system_concurrent_reads()});
|
||||
dblog.debug("Reverted system read concurrency from initial {} to normal {}", database::max_count_concurrent_reads, database::max_count_system_concurrent_reads);
|
||||
}
|
||||
|
||||
|
||||
@@ -1036,3 +1036,40 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_evict_inactive_reads_
|
||||
rd2.set_read_done();
|
||||
fut2.get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{4, 4 * 1024};
|
||||
reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::for_tests{}, get_name(), initial_resources.count, initial_resources.memory);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(2, 2 * 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024));
|
||||
|
||||
semaphore.set_resources({8, 8 * 1024});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(6, 6 * 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(8, 8 * 1024));
|
||||
|
||||
semaphore.set_resources({2, 2 * 1024});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(0, 0));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(2, 2 * 1024));
|
||||
|
||||
semaphore.set_resources({3, 128});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(1, 128 - 2 * 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(3, 128));
|
||||
|
||||
semaphore.set_resources({1, 3 * 1024});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(-1, 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(1, 3 * 1024));
|
||||
|
||||
auto permit3_fut = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 1);
|
||||
|
||||
semaphore.set_resources({4, 4 * 1024});
|
||||
BOOST_REQUIRE_EQUAL(semaphore.waiters(), 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), reader_resources(1, 1024));
|
||||
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(4, 4 * 1024));
|
||||
permit3_fut.get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user