reader_concurrency_semaphore: make count parameter live-update

So that the amount of count resources can be changed at run-time,
triggered by a e.g. a config change.
Previous constant-count based constructor is left intact, to avoid
patching all clients, as only a small subset will want the new
functionality.
This commit is contained in:
Botond Dénes
2024-06-12 02:54:38 -04:00
parent c752bda0a2
commit ba0cc29d82
3 changed files with 44 additions and 6 deletions

View File

@@ -983,10 +983,11 @@ void reader_concurrency_semaphore::signal(const resources& r) noexcept {
namespace sm = seastar::metrics;
static const sm::label class_label("class");
reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t memory, sstring name, size_t max_queue_length,
reader_concurrency_semaphore::reader_concurrency_semaphore(utils::updateable_value<int> count, ssize_t memory, sstring name, size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier, utils::updateable_value<uint32_t> kill_limit_multiplier, register_metrics metrics)
: _initial_resources(count, memory)
, _resources(count, memory)
: _initial_resources(count(), memory)
, _resources(count(), memory)
, _count_observer(count.observe([this] (const int& new_count) { set_resources({new_count, _initial_resources.memory}); }))
, _name(std::move(name))
, _max_queue_length(max_queue_length)
, _serialize_limit_multiplier(std::move(serialize_limit_multiplier))
@@ -1049,7 +1050,7 @@ reader_concurrency_semaphore::reader_concurrency_semaphore(int count, ssize_t me
reader_concurrency_semaphore::reader_concurrency_semaphore(no_limits, sstring name, register_metrics metrics)
: reader_concurrency_semaphore(
std::numeric_limits<int>::max(),
utils::updateable_value(std::numeric_limits<int>::max()),
std::numeric_limits<ssize_t>::max(),
std::move(name),
std::numeric_limits<size_t>::max(),

View File

@@ -161,6 +161,7 @@ public:
private:
resources _initial_resources;
resources _resources;
utils::observer<int> _count_observer;
struct wait_queue {
// Stores entries for permits waiting to be admitted.
@@ -273,7 +274,8 @@ public:
/// Create a semaphore with the specified limits
///
/// The semaphore's name has to be unique!
reader_concurrency_semaphore(int count,
reader_concurrency_semaphore(
utils::updateable_value<int> count,
ssize_t memory,
sstring name,
size_t max_queue_length,
@@ -281,6 +283,18 @@ public:
utils::updateable_value<uint32_t> kill_limit_multiplier,
register_metrics metrics);
reader_concurrency_semaphore(
int count,
ssize_t memory,
sstring name,
size_t max_queue_length,
utils::updateable_value<uint32_t> serialize_limit_multiplier,
utils::updateable_value<uint32_t> kill_limit_multiplier,
register_metrics metrics)
: reader_concurrency_semaphore(utils::updateable_value(count), memory, std::move(name), max_queue_length,
std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), metrics)
{ }
/// Create a semaphore with practically unlimited count and memory.
///
/// And conversely, no queue limit either.
@@ -298,7 +312,7 @@ public:
utils::updateable_value<uint32_t> serialize_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
utils::updateable_value<uint32_t> kill_limit_multipler = utils::updateable_value(std::numeric_limits<uint32_t>::max()),
register_metrics metrics = register_metrics::no)
: reader_concurrency_semaphore(count, memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
: reader_concurrency_semaphore(utils::updateable_value<uint32_t>(count), memory, std::move(name), max_queue_length, std::move(serialize_limit_multipler),
std::move(kill_limit_multipler), register_metrics::no)
{}

View File

@@ -1986,3 +1986,26 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_execution_stage_wakeu
permit2_fut.get();
}
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_live_update_count) {
utils::updateable_value_source<int> count{1};
const uint32_t initial_memory = 4 * 1024;
const auto serialize_multiplier = std::numeric_limits<uint32_t>::max();
const auto kill_multiplier = std::numeric_limits<uint32_t>::max();
reader_concurrency_semaphore semaphore(
utils::updateable_value(count),
initial_memory,
get_name(),
100,
utils::updateable_value<uint32_t>(serialize_multiplier),
utils::updateable_value<uint32_t>(kill_multiplier),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(count(), initial_memory));
count.set(10);
BOOST_REQUIRE_EQUAL(semaphore.initial_resources(), reader_resources(count(), initial_memory));
}