reader_concurrency_semaphore: improve handling of base resources

reader_permit::release_base_resources() is a soft evict for the permit:
it releases the resources aquired during admission. This is used in
cases where a single process owns multiple permits, creating a risk for
deadlock, like it is the case for repair. In this case,
release_base_resources() acts as a manual eviction mechanism to prevent
permits blockings each other from admission.

Recently we found a bad interaction between release_base_resources() and
permit eviction. Repair uses both mechanism: it marks its permits as
inactive and later it also uses release_base_resources(). This partice
might be worth reconsidering, but the fact remains that there is a bug
in the reader permit which causes the base resources to be released
twice when release_base_resources() is called on an already evicted
permit. This is incorrect and is fixed in this patch.

Improve release_base_resources():
* make _base_resources const
* move signal call into the if (_base_resources_consumed()) { }
* use reader_permit::impl::signal() instead of
  reader_concurrency_semaphore::signal()
* all places where base resources are released now call
  release_base_resources()

A reproducer unit test is added, which fails before and passes after the
fix.

Fixes: #28083

Closes scylladb/scylladb#28155

(cherry picked from commit b7bc48e7b7)

Closes scylladb/scylladb#28242
This commit is contained in:
Botond Dénes
2026-01-14 15:55:44 +02:00
parent 94607c36e0
commit cb6b20e197
2 changed files with 78 additions and 10 deletions

View File

@@ -153,7 +153,7 @@ private:
sstring _op_name;
std::string_view _op_name_view;
reader_resources _base_resources;
const reader_resources _base_resources;
bool _base_resources_consumed = false;
reader_resources _resources;
reader_permit::state _state = reader_permit::state::active;
@@ -270,9 +270,7 @@ public:
_semaphore.on_permit_created(*this);
}
~impl() {
if (_base_resources_consumed) {
signal(_base_resources);
}
release_base_resources();
if (_resources.non_zero()) {
on_internal_error_noexcept(rcslog, format("reader_permit::impl::~impl(): permit {} detected a leak of {{count={}, memory={}}} resources",
@@ -342,6 +340,11 @@ public:
void on_admission() {
SCYLLA_ASSERT(_state != reader_permit::state::active_await);
on_permit_active();
if (_base_resources_consumed) {
on_internal_error(rcslog, fmt::format("on_admission(): permit {} already has its base resources consumed", description()));
}
consume(_base_resources);
_base_resources_consumed = true;
}
@@ -370,10 +373,7 @@ public:
void on_evicted() {
SCYLLA_ASSERT(_state == reader_permit::state::inactive);
_state = reader_permit::state::evicted;
if (_base_resources_consumed) {
signal(_base_resources);
_base_resources_consumed = false;
}
release_base_resources();
}
void consume(reader_resources res) {
@@ -403,10 +403,9 @@ public:
void release_base_resources() noexcept {
if (_base_resources_consumed) {
_resources -= _base_resources;
_base_resources_consumed = false;
signal(_base_resources);
}
_semaphore.signal(std::exchange(_base_resources, {}));
}
sstring description() const {

View File

@@ -2414,4 +2414,73 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_always_admit_one_perm
}
}
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_release_base_resources) {
simple_schema s;
const auto schema = s.schema();
const std::string test_name = get_name();
const auto total_resources = reader_resources{2, 2048};
reader_concurrency_semaphore semaphore(
utils::updateable_value<int>(total_resources.count),
total_resources.memory,
test_name + " semaphore",
std::numeric_limits<size_t>::max(),
utils::updateable_value<uint32_t>(200),
utils::updateable_value<uint32_t>(400),
utils::updateable_value<uint32_t>(1),
reader_concurrency_semaphore::register_metrics::no);
auto stop_sem = deferred_stop(semaphore);
const auto expected_base_resources = reader_resources{1, 1024};
const auto expected_available_resources = total_resources - expected_base_resources;
const auto expected_consumed_resources = expected_base_resources;
{
auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get();
BOOST_REQUIRE_EQUAL(permit.base_resources(), expected_base_resources);
BOOST_REQUIRE_EQUAL(permit.consumed_resources(), permit.base_resources());
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), expected_available_resources);
BOOST_REQUIRE_EQUAL(semaphore.consumed_resources(), expected_consumed_resources);
permit.release_base_resources();
// Should be unchanged, it is just not consumed
BOOST_REQUIRE_EQUAL(permit.base_resources(), expected_base_resources);
BOOST_REQUIRE_EQUAL(permit.consumed_resources(), reader_resources{});
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), total_resources);
BOOST_REQUIRE_EQUAL(semaphore.consumed_resources(), reader_resources{});
}
{
auto permit = semaphore.obtain_permit(s.schema(), get_name(), 1024, db::no_timeout, {}).get();
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(s.schema(), permit));
BOOST_REQUIRE(irh);
BOOST_REQUIRE(semaphore.try_evict_one_inactive_read());
BOOST_REQUIRE(!irh);
BOOST_REQUIRE_EQUAL(permit.consumed_resources(), reader_resources{});
BOOST_REQUIRE_EQUAL(permit.base_resources(), expected_base_resources);
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), total_resources);
BOOST_REQUIRE_EQUAL(semaphore.consumed_resources(), reader_resources{});
// Should be no-op
permit.release_base_resources();
// Should be unchanged, it is just not consumed
BOOST_REQUIRE_EQUAL(permit.base_resources(), expected_base_resources);
BOOST_REQUIRE_EQUAL(permit.consumed_resources(), reader_resources{});
BOOST_REQUIRE_EQUAL(semaphore.available_resources(), total_resources);
BOOST_REQUIRE_EQUAL(semaphore.consumed_resources(), reader_resources{});
}
}
BOOST_AUTO_TEST_SUITE_END()