reader_concurrency_semaphore: store permits directly in queues
Instead of the `entry` wrapper. In _wait_list and _ready_list, that is. Data stored in the `entry` wrapper is moved to a new `reader_permit::auxiliary_data` type. This makes the reader permit self-sufficient. This in turn prepares the ground for the ability to de-queue a permit from any queue, with nothing but a permit reference at hand: no need to have back pointer to wrappers and/or iterators.
This commit is contained in:
@@ -75,6 +75,14 @@ void reader_permit::resource_units::reset(reader_resources res) {
|
||||
class reader_permit::impl
|
||||
: public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>
|
||||
, public enable_shared_from_this<reader_permit::impl> {
|
||||
public:
|
||||
struct auxiliary_data {
|
||||
promise<> pr;
|
||||
std::optional<shared_future<>> fut;
|
||||
reader_concurrency_semaphore::read_func func;
|
||||
};
|
||||
|
||||
private:
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
const schema* _schema;
|
||||
sstring _op_name;
|
||||
@@ -92,9 +100,12 @@ class reader_permit::impl
|
||||
query::max_result_size _max_result_size{query::result_memory_limiter::unlimited_result_size};
|
||||
uint64_t _sstables_read = 0;
|
||||
size_t _requested_memory = 0;
|
||||
std::optional<shared_future<>> _memory_future;
|
||||
uint64_t _oom_kills = 0;
|
||||
|
||||
// Not strictly related to the permit.
|
||||
// Used by the semaphore to to manage the permit.
|
||||
auxiliary_data _aux_data;
|
||||
|
||||
private:
|
||||
void on_permit_used() {
|
||||
_semaphore.on_permit_used();
|
||||
@@ -217,13 +228,16 @@ public:
|
||||
return _state;
|
||||
}
|
||||
|
||||
auxiliary_data& aux_data() {
|
||||
return _aux_data;
|
||||
}
|
||||
|
||||
void on_waiting_for_admission() {
|
||||
on_permit_inactive(reader_permit::state::waiting_for_admission);
|
||||
}
|
||||
|
||||
void on_waiting_for_memory(future<> fut) {
|
||||
void on_waiting_for_memory() {
|
||||
on_permit_inactive(reader_permit::state::waiting_for_memory);
|
||||
_memory_future.emplace(std::move(fut));
|
||||
}
|
||||
|
||||
void on_admission() {
|
||||
@@ -240,10 +254,6 @@ public:
|
||||
consume({0, std::exchange(_requested_memory, 0)});
|
||||
}
|
||||
|
||||
future<> get_memory_future() {
|
||||
return _memory_future->get_future();
|
||||
}
|
||||
|
||||
void on_register_as_inactive() {
|
||||
assert(_state == reader_permit::state::active_unused || _state == reader_permit::state::active_used);
|
||||
on_permit_inactive(reader_permit::state::inactive);
|
||||
@@ -668,8 +678,8 @@ static void maybe_dump_reader_permit_diagnostics(const reader_concurrency_semaph
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
void reader_concurrency_semaphore::expiry_handler::operator()(entry& e) noexcept {
|
||||
e.pr.set_exception(named_semaphore_timed_out(_semaphore._name));
|
||||
void reader_concurrency_semaphore::expiry_handler::operator()(reader_permit& p) noexcept {
|
||||
p->aux_data().pr.set_exception(named_semaphore_timed_out(_semaphore._name));
|
||||
--_semaphore._stats.waiters;
|
||||
|
||||
maybe_dump_reader_permit_diagnostics(_semaphore, "timed out");
|
||||
@@ -709,11 +719,12 @@ future<> reader_concurrency_semaphore::execution_loop() noexcept {
|
||||
}
|
||||
|
||||
while (!_ready_list.empty()) {
|
||||
auto e = _ready_list.pop();
|
||||
auto permit = _ready_list.pop();
|
||||
auto e = std::move(permit->aux_data());
|
||||
--_stats.waiters;
|
||||
|
||||
try {
|
||||
e.func(std::move(e.permit)).forward_to(std::move(e.pr));
|
||||
e.func(std::move(permit)).forward_to(std::move(e.pr));
|
||||
} catch (...) {
|
||||
e.pr.set_exception(std::current_exception());
|
||||
}
|
||||
@@ -984,21 +995,23 @@ std::exception_ptr reader_concurrency_semaphore::check_queue_size(std::string_vi
|
||||
return {};
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read_func func, wait_on wait) {
|
||||
future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, wait_on wait) {
|
||||
if (auto ex = check_queue_size("wait")) {
|
||||
return make_exception_future<>(std::move(ex));
|
||||
}
|
||||
promise<> pr;
|
||||
auto fut = pr.get_future();
|
||||
auto& ad = permit->aux_data();
|
||||
ad.pr = {};
|
||||
auto fut = ad.pr.get_future();
|
||||
auto timeout = permit.timeout();
|
||||
if (wait == wait_on::admission) {
|
||||
permit->on_waiting_for_admission();
|
||||
_wait_list.push_to_admission_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
|
||||
_wait_list.push_to_admission_queue(std::move(permit), timeout);
|
||||
++_stats.reads_enqueued_for_admission;
|
||||
} else {
|
||||
permit->on_waiting_for_memory(std::move(fut));
|
||||
fut = permit->get_memory_future();
|
||||
_wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
|
||||
permit->on_waiting_for_memory();
|
||||
ad.fut.emplace(std::move(fut));
|
||||
fut = ad.fut->get_future();
|
||||
_wait_list.push_to_memory_queue(std::move(permit), timeout);
|
||||
++_stats.reads_enqueued_for_memory;
|
||||
}
|
||||
++_stats.waiters;
|
||||
@@ -1066,14 +1079,14 @@ reader_concurrency_semaphore::can_admit_read(const reader_permit& permit) const
|
||||
return can_admit::yes;
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, read_func func) {
|
||||
future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit) {
|
||||
if (!_execution_loop_future) {
|
||||
_execution_loop_future.emplace(execution_loop());
|
||||
}
|
||||
|
||||
const auto admit = can_admit_read(permit);
|
||||
if (admit != can_admit::yes || !_wait_list.empty()) {
|
||||
auto fut = enqueue_waiter(std::move(permit), std::move(func), wait_on::admission);
|
||||
auto fut = enqueue_waiter(std::move(permit), wait_on::admission);
|
||||
if (admit == can_admit::yes && !_wait_list.empty()) {
|
||||
// This is a contradiction: the semaphore could admit waiters yet it has waiters.
|
||||
// Normally, the semaphore should admit waiters as soon as it can.
|
||||
@@ -1089,33 +1102,33 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r
|
||||
|
||||
permit->on_admission();
|
||||
++_stats.reads_admitted;
|
||||
if (func) {
|
||||
return with_ready_permit(std::move(permit), std::move(func));
|
||||
if (permit->aux_data().func) {
|
||||
return with_ready_permit(std::move(permit));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
auto admit = can_admit::no;
|
||||
while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front().permit)) == can_admit::yes) {
|
||||
auto& x = _wait_list.front();
|
||||
while (!_wait_list.empty() && (admit = can_admit_read(_wait_list.front())) == can_admit::yes) {
|
||||
auto& permit = *_wait_list.front();
|
||||
try {
|
||||
if (x.permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
_blessed_permit = &*x.permit;
|
||||
x.permit->on_granted_memory();
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
_blessed_permit = &permit;
|
||||
permit.on_granted_memory();
|
||||
} else {
|
||||
x.permit->on_admission();
|
||||
permit.on_admission();
|
||||
++_stats.reads_admitted;
|
||||
}
|
||||
if (x.func) {
|
||||
_ready_list.push(std::move(x));
|
||||
if (permit.aux_data().func) {
|
||||
_ready_list.push(reader_permit(permit.shared_from_this()));
|
||||
// permit is just transferred to another queue, no need to update waiters counter
|
||||
} else {
|
||||
x.pr.set_value();
|
||||
permit.aux_data().pr.set_value();
|
||||
--_stats.waiters;
|
||||
}
|
||||
} catch (...) {
|
||||
x.pr.set_exception(std::current_exception());
|
||||
permit.aux_data().pr.set_exception(std::current_exception());
|
||||
}
|
||||
_wait_list.pop_front();
|
||||
}
|
||||
@@ -1128,7 +1141,7 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
|
||||
future<> reader_concurrency_semaphore::request_memory(reader_permit::impl& permit, size_t memory) {
|
||||
// Already blocked on memory?
|
||||
if (permit.get_state() == reader_permit::state::waiting_for_memory) {
|
||||
return permit.get_memory_future();
|
||||
return permit.aux_data().fut->get_future();
|
||||
}
|
||||
|
||||
if (_resources.memory > 0 || (consumed_resources().memory + memory) < get_serialize_limit()) {
|
||||
@@ -1145,7 +1158,7 @@ future<> reader_concurrency_semaphore::request_memory(reader_permit::impl& permi
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
return enqueue_waiter(reader_permit(permit.shared_from_this()), {}, wait_on::memory);
|
||||
return enqueue_waiter(reader_permit(permit.shared_from_this()), wait_on::memory);
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::on_permit_created(reader_permit::impl& permit) {
|
||||
@@ -1213,20 +1226,28 @@ reader_permit reader_concurrency_semaphore::make_tracking_only_permit(const sche
|
||||
|
||||
future<> reader_concurrency_semaphore::with_permit(const schema* const schema, const char* const op_name, size_t memory,
|
||||
db::timeout_clock::time_point timeout, read_func func) {
|
||||
return do_wait_admission(reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout), std::move(func));
|
||||
auto permit = reader_permit(*this, schema, std::string_view(op_name), {1, static_cast<ssize_t>(memory)}, timeout);
|
||||
permit->aux_data().func = std::move(func);
|
||||
return do_wait_admission(std::move(permit));
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, read_func func) {
|
||||
future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit) {
|
||||
if (auto ex = check_queue_size("ready")) {
|
||||
return make_exception_future<>(std::move(ex));
|
||||
}
|
||||
promise<> pr;
|
||||
auto fut = pr.get_future();
|
||||
_ready_list.push(entry(std::move(pr), std::move(permit), std::move(func)));
|
||||
auto& ad = permit->aux_data();
|
||||
ad.pr = {};
|
||||
auto fut = ad.pr.get_future();
|
||||
_ready_list.push(std::move(permit));
|
||||
++_stats.waiters;
|
||||
return fut;
|
||||
}
|
||||
|
||||
future<> reader_concurrency_semaphore::with_ready_permit(reader_permit permit, read_func func) {
|
||||
permit->aux_data().func = std::move(func);
|
||||
return with_ready_permit(std::move(permit));
|
||||
}
|
||||
|
||||
void reader_concurrency_semaphore::set_resources(resources r) {
|
||||
auto delta = r - _initial_resources;
|
||||
_initial_resources = r;
|
||||
@@ -1239,7 +1260,7 @@ void reader_concurrency_semaphore::broken(std::exception_ptr ex) {
|
||||
ex = std::make_exception_ptr(broken_semaphore{});
|
||||
}
|
||||
while (!_wait_list.empty()) {
|
||||
_wait_list.front().pr.set_exception(ex);
|
||||
_wait_list.front()->aux_data().pr.set_exception(ex);
|
||||
_wait_list.pop_front();
|
||||
--_stats.waiters;
|
||||
}
|
||||
|
||||
@@ -121,20 +121,12 @@ public:
|
||||
using read_func = noncopyable_function<future<>(reader_permit)>;
|
||||
|
||||
private:
|
||||
struct entry {
|
||||
promise<> pr;
|
||||
reader_permit permit;
|
||||
read_func func;
|
||||
entry(promise<>&& pr, reader_permit permit, read_func func)
|
||||
: pr(std::move(pr)), permit(std::move(permit)), func(std::move(func)) {}
|
||||
};
|
||||
|
||||
class expiry_handler {
|
||||
reader_concurrency_semaphore& _semaphore;
|
||||
public:
|
||||
explicit expiry_handler(reader_concurrency_semaphore& semaphore)
|
||||
: _semaphore(semaphore) {}
|
||||
void operator()(entry& e) noexcept;
|
||||
void operator()(reader_permit& p) noexcept;
|
||||
};
|
||||
|
||||
struct inactive_read : public bi::list_base_hook<bi::link_mode<bi::auto_unlink>> {
|
||||
@@ -201,21 +193,21 @@ private:
|
||||
|
||||
class wait_queue {
|
||||
// Stores entries for permits waiting to be admitted.
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _admission_queue;
|
||||
expiring_fifo<reader_permit, expiry_handler, db::timeout_clock> _admission_queue;
|
||||
// Stores entries for serialized permits waiting to obtain memory.
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _memory_queue;
|
||||
expiring_fifo<reader_permit, expiry_handler, db::timeout_clock> _memory_queue;
|
||||
public:
|
||||
wait_queue(expiry_handler eh) : _admission_queue(eh), _memory_queue(eh) { }
|
||||
bool empty() const {
|
||||
return _admission_queue.empty() && _memory_queue.empty();
|
||||
}
|
||||
void push_to_admission_queue(entry&& e, db::timeout_clock::time_point timeout) {
|
||||
void push_to_admission_queue(reader_permit&& e, db::timeout_clock::time_point timeout) {
|
||||
_admission_queue.push_back(std::move(e), timeout);
|
||||
}
|
||||
void push_to_memory_queue(entry&& e, db::timeout_clock::time_point timeout) {
|
||||
void push_to_memory_queue(reader_permit&& e, db::timeout_clock::time_point timeout) {
|
||||
_memory_queue.push_back(std::move(e), timeout);
|
||||
}
|
||||
entry& front() {
|
||||
reader_permit& front() {
|
||||
if (_memory_queue.empty()) {
|
||||
return _admission_queue.front();
|
||||
} else {
|
||||
@@ -232,7 +224,7 @@ private:
|
||||
};
|
||||
|
||||
wait_queue _wait_list;
|
||||
queue<entry> _ready_list;
|
||||
queue<reader_permit> _ready_list;
|
||||
|
||||
sstring _name;
|
||||
size_t _max_queue_length = std::numeric_limits<size_t>::max();
|
||||
@@ -262,9 +254,9 @@ private:
|
||||
// Add the permit to the wait queue and return the future which resolves when
|
||||
// the permit is admitted (popped from the queue).
|
||||
enum class wait_on { admission, memory };
|
||||
future<> enqueue_waiter(reader_permit permit, read_func func, wait_on wait);
|
||||
future<> enqueue_waiter(reader_permit permit, wait_on wait);
|
||||
void evict_readers_in_background();
|
||||
future<> do_wait_admission(reader_permit permit, read_func func = {});
|
||||
future<> do_wait_admission(reader_permit permit);
|
||||
|
||||
// Check whether permit can be admitted or not.
|
||||
// The wait list is not taken into consideration, this is the caller's
|
||||
@@ -309,6 +301,8 @@ private:
|
||||
void consume(reader_permit::impl& permit, resources r);
|
||||
void signal(const resources& r) noexcept;
|
||||
|
||||
future<> with_ready_permit(reader_permit permit);
|
||||
|
||||
public:
|
||||
struct no_limits { };
|
||||
|
||||
|
||||
Reference in New Issue
Block a user