reader_concurrency_semaphore: add protection against negative count resource leaks
The semaphore has detection and protection against regular resource leaks, where some resources go unaccounted for and are not released by the time the semaphore is destroyed. There is no detection or protection against negative leaks: where resources are "made up" out of thin air. This kind of leak looks benign at first sight: a few extra resources won't hurt anyone, so long as the amount is small. But it turns out that even a single extra count resource can defeat a very important anti-deadlock protection in can_admit_read(): the special case which admits a new permit regardless of memory resources, when all original count resources are available. This check uses ==, so if resource > original, the protection is defeated indefinitely. Instead of just changing == to >=, we add detection of such negative leaks to signal(), via on_internal_error_noexcept(). At this time I still don't know how this negative leak happens (the code doesn't confess); with this detection, hopefully we'll get a clue from tests or the field. Note that on_internal_error_noexcept() will not generate a coredump, unless ScyllaDB is explicitly configured to do so. In production, it will just generate an error log with a backtrace. The detection also clamps the _resources to _initial_resources, to prevent any damage from the negative leak. I just noticed that there is no unit test for the deadlock protection described above, so one is added in this PR, even if only loosely related to the rest of the patch. Fixes: SCYLLADB-163 Closes scylladb/scylladb#27764
This commit is contained in:
@@ -1020,6 +1020,13 @@ void reader_concurrency_semaphore::consume(reader_permit::impl& permit, resource
|
||||
|
||||
void reader_concurrency_semaphore::signal(const resources& r) noexcept {
|
||||
_resources += r;
|
||||
if (_resources.count > _initial_resources.count || _resources.memory > _initial_resources.memory) [[unlikely]] {
|
||||
on_internal_error_noexcept(rcslog,
|
||||
format("reader_concurrency_semaphore::signal(): semaphore {} detected resource leak, available {} exceeds initial {}", _name,
|
||||
_resources, _initial_resources));
|
||||
_resources.count = std::max(_resources.count, _initial_resources.count);
|
||||
_resources.memory = std::max(_resources.memory, _initial_resources.memory);
|
||||
}
|
||||
maybe_wake_execution_loop();
|
||||
}
|
||||
|
||||
|
||||
@@ -691,7 +691,8 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_waits_on_permits
|
||||
|
||||
static void require_can_admit(schema_ptr schema, reader_concurrency_semaphore& semaphore, bool expected_can_admit, const char* description,
|
||||
std::source_location sl = std::source_location::current()) {
|
||||
testlog.trace("Running admission scenario {}, with exepcted_can_admit={}", description, expected_can_admit);
|
||||
testlog.trace("Running admission scenario {}, with expected_can_admit={}, available resources on the semaphore: {}", description,
|
||||
expected_can_admit, semaphore.available_resources());
|
||||
const auto stats_before = semaphore.get_stats();
|
||||
|
||||
auto admit_fut = semaphore.obtain_permit(schema, "require_can_admit", 1024, db::timeout_clock::now(), {});
|
||||
@@ -2373,4 +2374,44 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_double_permit_abort)
|
||||
BOOST_REQUIRE_THROW(requested_memory2_fut.get(), named_semaphore_timed_out);
|
||||
}
|
||||
|
||||
/// Test that if no count resources are currently used, a single permit is always admitted regardless of available memory.
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_always_admit_one_permit) {
|
||||
simple_schema s;
|
||||
const auto schema = s.schema();
|
||||
|
||||
const std::string test_name = get_name();
|
||||
|
||||
reader_concurrency_semaphore semaphore(
|
||||
utils::updateable_value<int>(2),
|
||||
2048,
|
||||
test_name + " semaphore",
|
||||
std::numeric_limits<size_t>::max(),
|
||||
utils::updateable_value<uint32_t>(200),
|
||||
utils::updateable_value<uint32_t>(400),
|
||||
utils::updateable_value<uint32_t>(1),
|
||||
reader_concurrency_semaphore::register_metrics::no);
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
// Scenario1: all memory use used by tracking permit (not consuming count resources)
|
||||
{
|
||||
auto permit = semaphore.make_tracking_only_permit(schema, test_name, db::no_timeout, {});
|
||||
auto res = permit.consume_memory(4096);
|
||||
|
||||
require_can_admit(schema, semaphore, true, "all memory used, but one permit should always be admitted");
|
||||
}
|
||||
|
||||
// Scenario2: all memory use used by evicted permit (recouped count resource)
|
||||
{
|
||||
auto permit = semaphore.obtain_permit(schema, test_name, 1024, db::no_timeout, {}).get();
|
||||
auto res = permit.consume_memory(4096);
|
||||
|
||||
require_can_admit(schema, semaphore, false, "all memory used, cannot admit");
|
||||
|
||||
auto irh = semaphore.register_inactive_read(make_empty_mutation_reader(s.schema(), permit));
|
||||
BOOST_REQUIRE(!irh);
|
||||
|
||||
require_can_admit(schema, semaphore, true, "all memory used, but one permit should always be admitted");
|
||||
}
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -38,7 +38,7 @@ custom_args:
|
||||
cql_query_test:
|
||||
- '-c2 -m2G --fail-on-abandoned-failed-futures=true'
|
||||
reader_concurrency_semaphore_test:
|
||||
- '-c1 -m256M'
|
||||
- '-c1 -m256M --logger-log-level testlog=trace:reader_concurrency_semaphore=trace'
|
||||
multishard_query_test:
|
||||
- '-c2 -m3G'
|
||||
cache_algorithm_test:
|
||||
|
||||
Reference in New Issue
Block a user