reader_concurrency_semaphore: handle reader blocked on memory becoming inactive
Kill said read's memory requests with std::bad_alloc and dequeue it from the memory wait list, then evict it on the spot. Now that `_inactive_reads` just store permits, we can do this easily.
This commit is contained in:
@@ -314,7 +314,7 @@ public:
|
||||
}
|
||||
|
||||
void on_register_as_inactive() {
|
||||
assert(_state == reader_permit::state::active_unused || _state == reader_permit::state::active_used);
|
||||
assert(_state == reader_permit::state::active_unused || _state == reader_permit::state::active_used || _state == reader_permit::state::waiting_for_memory);
|
||||
on_permit_inactive(reader_permit::state::inactive);
|
||||
}
|
||||
|
||||
@@ -896,6 +896,11 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() {
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader_v2 reader) noexcept {
|
||||
auto& permit = reader.permit();
|
||||
if (permit->get_state() == reader_permit::state::waiting_for_memory) {
|
||||
// Kill all outstanding memory requests, the read is going to be evicted.
|
||||
permit->aux_data().pr.set_exception(std::make_exception_ptr(std::bad_alloc{}));
|
||||
dequeue_permit(*permit);
|
||||
}
|
||||
permit->on_register_as_inactive();
|
||||
if (_blessed_permit == &*permit) {
|
||||
_blessed_permit = nullptr;
|
||||
|
||||
@@ -1645,3 +1645,39 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_stop_with_inactive_re
|
||||
permit = {};
|
||||
stop_f.get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_permit_waiting_for_memory_goes_inactive) {
|
||||
const auto initial_resources = reader_concurrency_semaphore::resources{2, 2 * 1024};
|
||||
const auto serialize_multiplier = 2;
|
||||
const auto kill_multiplier = std::numeric_limits<uint32_t>::max(); // we don't want this to interfere with our test
|
||||
reader_concurrency_semaphore semaphore(initial_resources.count, initial_resources.memory, get_name(), 100,
|
||||
utils::updateable_value<uint32_t>(serialize_multiplier), utils::updateable_value<uint32_t>(kill_multiplier));
|
||||
auto stop_sem = deferred_stop(semaphore);
|
||||
|
||||
auto permit1 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
auto permit2 = semaphore.obtain_permit(nullptr, get_name(), 1024, db::no_timeout).get0();
|
||||
|
||||
std::vector<reader_permit::resource_units> res;
|
||||
|
||||
res.emplace_back(permit1.consume_memory(2048));
|
||||
res.emplace_back(permit2.consume_memory(2048));
|
||||
|
||||
res.emplace_back(permit1.request_memory(1024).get());
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_blessed_permit(), permit1.id());
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 0);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 0);
|
||||
|
||||
auto res_fut = permit2.request_memory(1024);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, 1);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().waiters, 1);
|
||||
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto handle = semaphore.register_inactive_read(make_empty_flat_reader_v2(s, permit2));
|
||||
|
||||
// permit2 should have been evicted, its memory requests killed with std::bad_alloc
|
||||
BOOST_REQUIRE(!handle);
|
||||
BOOST_REQUIRE_EQUAL(semaphore.get_stats().permit_based_evictions, 1);
|
||||
BOOST_REQUIRE_EQUAL(permit2.get_state(), reader_permit::state::evicted);
|
||||
BOOST_REQUIRE_THROW(res_fut.get(), std::bad_alloc);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user