reader_concurrency_semaphore: add OOM killer

When the collective memory consumption of all readers goes above
$kill_limit_multiplier * $memory_limit, consume() will throw
std::bad_alloc(), instantly unwinding the read that is unlucky enough
to have requested the last bytes of memory. This should help situation
where there are some problematic partitions, either because of large
cells or because they are scattered in too many sstables. Currently
nothing prevents such reads from bringing down the entire node via OOM.
This commit is contained in:
Botond Dénes
2022-11-22 12:06:07 +02:00
parent 81e2a2be7d
commit edb32cb171
2 changed files with 33 additions and 4 deletions

View File

@@ -92,6 +92,7 @@ class reader_permit::impl
uint64_t _sstables_read = 0;
size_t _requested_memory = 0;
std::optional<shared_future<>> _memory_future;
uint64_t _oom_kills = 0;
private:
void on_permit_used() {
@@ -256,7 +257,7 @@ public:
}
void consume(reader_resources res) {
_semaphore.consume(res);
_semaphore.consume(*this, res);
_resources += res;
}
@@ -386,6 +387,10 @@ public:
--_semaphore._stats.disk_reads;
}
}
bool on_oom_kill() noexcept {
return !bool(_oom_kills++);
}
};
static_assert(std::is_nothrow_copy_constructible_v<reader_permit>);
@@ -721,13 +726,30 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept {
}
uint64_t reader_concurrency_semaphore::get_serialize_limit() const {
if (!_serialize_limit_multiplier() || _serialize_limit_multiplier() == std::numeric_limits<uint32_t>::max() || is_unlimited()) {
if (!_serialize_limit_multiplier() || _serialize_limit_multiplier() == std::numeric_limits<uint32_t>::max() || is_unlimited()) [[unlikely]] {
return std::numeric_limits<uint64_t>::max();
}
return _initial_resources.memory * _serialize_limit_multiplier();
}
void reader_concurrency_semaphore::consume(resources r) {
uint64_t reader_concurrency_semaphore::get_kill_limit() const {
if (!_kill_limit_multiplier() || _kill_limit_multiplier() == std::numeric_limits<uint32_t>::max() || is_unlimited()) [[unlikely]] {
return std::numeric_limits<uint64_t>::max();
}
return _initial_resources.memory * _kill_limit_multiplier();
}
void reader_concurrency_semaphore::consume(reader_permit::impl& permit, resources r) {
// We check whether we even reached the memory limit first.
// This is a cheap check and should be false most of the time, providing a
// cheap short-circuit.
if (_resources.memory <= 0 && (consumed_resources().memory + r.memory) >= get_kill_limit()) [[unlikely]] {
if (permit.on_oom_kill()) {
++_stats.total_reads_killed_due_to_kill_limit;
}
maybe_dump_reader_permit_diagnostics(*this, _permit_list, "kill limit triggered");
throw std::bad_alloc();
}
_resources -= r;
}
@@ -1001,6 +1023,9 @@ reader_concurrency_semaphore::can_admit
reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const noexcept {
if (_resources.memory < 0) [[unlikely]] {
const auto consumed_memory = consumed_resources().memory;
if (consumed_memory >= get_kill_limit()) {
return can_admit::no;
}
if (consumed_memory >= get_serialize_limit()) {
if (_blessed_permit) {
// blessed permit is never in the wait list

View File

@@ -72,6 +72,8 @@ public:
uint64_t total_failed_reads = 0;
// Total number of reads rejected because the admission queue reached its max capacity
uint64_t total_reads_shed_due_to_overload = 0;
// Total number of reads killed due to the memory consumption reaching the kill limit.
uint64_t total_reads_killed_due_to_kill_limit = 0;
// Total number of reads admitted, via all admission paths.
uint64_t reads_admitted = 0;
// Total number of reads enqueued to wait for admission.
@@ -287,8 +289,10 @@ private:
future<> execution_loop() noexcept;
uint64_t get_serialize_limit() const;
uint64_t get_kill_limit() const;
void consume(resources r);
// Throws std::bad_alloc if memory consumed is oom_kill_limit_multiply_threshold more than the memory limit.
void consume(reader_permit::impl& permit, resources r);
void signal(const resources& r) noexcept;
public: