reader_concurrency_semaphore: propagate permit to do_dump_reader_permit_diagnostics()

Will be used in the next patch.
This commit is contained in:
Botond Dénes
2024-09-10 08:23:43 -04:00
parent 67565a5eee
commit c044904f07

View File

@@ -69,7 +69,7 @@ struct fmt::formatter<reader_concurrency_semaphore::evict_reason> : fmt::formatt
namespace {
void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaphore& semaphore, std::string_view problem) noexcept;
void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaphore& semaphore, std::string_view problem, reader_permit::impl* permit) noexcept;
}
@@ -234,13 +234,13 @@ private:
case state::waiting_for_memory:
case state::waiting_for_execution:
_aux_data.pr.set_exception(ex);
maybe_dump_reader_permit_diagnostics(_semaphore, "timed out");
maybe_dump_reader_permit_diagnostics(_semaphore, "timed out", this);
_semaphore.dequeue_permit(*this);
break;
case state::active:
case state::active_need_cpu:
case state::active_await:
maybe_dump_reader_permit_diagnostics(_semaphore, "timed out");
maybe_dump_reader_permit_diagnostics(_semaphore, "timed out", this);
break;
case state::inactive:
_semaphore.evict(*this, reader_concurrency_semaphore::evict_reason::time);
@@ -780,7 +780,8 @@ static permit_stats do_dump_reader_permit_diagnostics(std::ostream& os, const pe
return total;
}
static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_concurrency_semaphore& semaphore, std::string_view problem, unsigned max_lines = 20) {
static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_concurrency_semaphore& semaphore, std::string_view problem,
reader_permit::impl* permit, unsigned max_lines = 20) {
permit_groups permits;
semaphore.foreach_permit([&] (const reader_permit::impl& permit) {
@@ -846,12 +847,12 @@ static void do_dump_reader_permit_diagnostics(std::ostream& os, const reader_con
stats.sstables_read);
}
void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaphore& semaphore, std::string_view problem) noexcept {
void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaphore& semaphore, std::string_view problem, reader_permit::impl* permit) noexcept {
static thread_local logger::rate_limit rate_limit(std::chrono::seconds(30));
rcslog.log(log_level::info, rate_limit, "{}", value_of([&] {
std::ostringstream os;
do_dump_reader_permit_diagnostics(os, semaphore, problem);
do_dump_reader_permit_diagnostics(os, semaphore, problem, permit);
return std::move(os).str();
}));
}
@@ -984,7 +985,7 @@ void reader_concurrency_semaphore::consume(reader_permit::impl& permit, resource
if (permit.on_oom_kill()) {
++_stats.total_reads_killed_due_to_kill_limit;
}
maybe_dump_reader_permit_diagnostics(*this, "kill limit triggered");
maybe_dump_reader_permit_diagnostics(*this, "kill limit triggered", &permit);
throw utils::memory_limit_reached(format("kill limit triggered on semaphore {} by permit {}", _name, permit.description()));
}
_resources -= r;
@@ -1296,7 +1297,7 @@ bool reader_concurrency_semaphore::cpu_concurrency_limit_reached() const {
std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_view queue_name) {
if (_stats.waiters >= _max_queue_length) {
_stats.total_reads_shed_due_to_overload++;
maybe_dump_reader_permit_diagnostics(*this, fmt::format("{} queue overload", queue_name));
maybe_dump_reader_permit_diagnostics(*this, fmt::format("{} queue overload", queue_name), nullptr);
return std::make_exception_ptr(std::runtime_error(fmt::format("{}: {} queue overload", _name, queue_name)));
}
return {};
@@ -1431,7 +1432,7 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit::impl& pe
// Normally, the semaphore should admit waiters as soon as it can.
// So at any point in time, there should either be no waiters, or it
// shouldn't be able to admit new reads. Otherwise something went wrong.
maybe_dump_reader_permit_diagnostics(*this, "semaphore could admit new reads yet there are waiters");
maybe_dump_reader_permit_diagnostics(*this, "semaphore could admit new reads yet there are waiters", nullptr);
maybe_admit_waiters();
} else if (admit == can_admit::maybe) {
tracing::trace(permit.trace_state(), "[reader concurrency semaphore {}] evicting inactive reads in the background to free up resources", _name);
@@ -1637,7 +1638,7 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
std::string reader_concurrency_semaphore::dump_diagnostics(unsigned max_lines) const {
std::ostringstream os;
do_dump_reader_permit_diagnostics(os, *this, "user request", max_lines);
do_dump_reader_permit_diagnostics(os, *this, "user request", nullptr, max_lines);
return std::move(os).str();
}