reader_permit: introduce (private) operator * and ->

Currently the reader_permit has some private methods that only the
semaphore's internal calls. But this method of communication is not
consistent, other times the semaphore accesses the permit impl directly,
calling methods on that.
This commit introduces operator * and -> for reader_permit. With this,
the semaphore internals always call the reader_permit::impl methods
direcly, either via a direct reference, or via the above operators.
This makes the permit internface a little narrower and reduces
boilerplate code.
This commit is contained in:
Botond Dénes
2023-02-02 08:50:29 -05:00
parent f5b80fdfd8
commit bcfb8715f9
3 changed files with 16 additions and 39 deletions

View File

@@ -420,26 +420,6 @@ reader_permit::reader_permit(reader_concurrency_semaphore& semaphore, const sche
{
}
void reader_permit::on_waiting_for_admission() {
_impl->on_waiting_for_admission();
}
void reader_permit::on_waiting_for_memory(future<> fut) {
_impl->on_waiting_for_memory(std::move(fut));
}
void reader_permit::on_admission() {
_impl->on_admission();
}
void reader_permit::on_granted_memory() {
_impl->on_granted_memory();
}
future<> reader_permit::get_memory_future() {
return _impl->get_memory_future();
}
reader_permit::~reader_permit() {
}
@@ -815,9 +795,9 @@ reader_concurrency_semaphore::~reader_concurrency_semaphore() {
}
reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore::register_inactive_read(flat_mutation_reader_v2 reader) noexcept {
auto& permit_impl = *reader.permit()._impl;
permit_impl.on_register_as_inactive();
if (_blessed_permit == &permit_impl) {
auto& permit = reader.permit();
permit->on_register_as_inactive();
if (_blessed_permit == &*permit) {
_blessed_permit = nullptr;
// No need to call maybe_admit_waiters() here.
// If there are waiters, the reader is going to be evicted below,
@@ -844,7 +824,7 @@ reader_concurrency_semaphore::inactive_read_handle reader_concurrency_semaphore:
rcslog.warn("Registering inactive read failed: {}. Ignored as if it was evicted.", std::current_exception());
}
} else {
permit_impl.on_evicted();
permit->on_evicted();
++_stats.permit_based_evictions;
}
close_reader(std::move(reader));
@@ -885,7 +865,7 @@ flat_mutation_reader_v2_opt reader_concurrency_semaphore::unregister_inactive_re
--_stats.inactive_reads;
std::unique_ptr<inactive_read> irp(irh._irp);
irp->reader.permit()._impl->on_unregister_as_inactive();
irp->reader.permit()->on_unregister_as_inactive();
return std::move(irp->reader);
}
@@ -946,7 +926,7 @@ void reader_concurrency_semaphore::do_detach_inactive_reader(inactive_read& ir,
ir.unlink();
ir.ttl_timer.cancel();
ir.detach();
ir.reader.permit()._impl->on_evicted();
ir.reader.permit()->on_evicted();
try {
if (ir.notify_handler) {
ir.notify_handler(reason);
@@ -1012,12 +992,12 @@ future<> reader_concurrency_semaphore::enqueue_waiter(reader_permit permit, read
auto fut = pr.get_future();
auto timeout = permit.timeout();
if (wait == wait_on::admission) {
permit.on_waiting_for_admission();
permit->on_waiting_for_admission();
_wait_list.push_to_admission_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
++_stats.reads_enqueued_for_admission;
} else {
permit.on_waiting_for_memory(std::move(fut));
fut = permit.get_memory_future();
permit->on_waiting_for_memory(std::move(fut));
fut = permit->get_memory_future();
_wait_list.push_to_memory_queue(entry(std::move(pr), std::move(permit), std::move(func)), timeout);
++_stats.reads_enqueued_for_memory;
}
@@ -1107,7 +1087,7 @@ future<> reader_concurrency_semaphore::do_wait_admission(reader_permit permit, r
return fut;
}
permit.on_admission();
permit->on_admission();
++_stats.reads_admitted;
if (func) {
return with_ready_permit(std::move(permit), std::move(func));
@@ -1121,10 +1101,10 @@ void reader_concurrency_semaphore::maybe_admit_waiters() noexcept {
auto& x = _wait_list.front();
try {
if (x.permit.get_state() == reader_permit::state::waiting_for_memory) {
_blessed_permit = x.permit._impl.get();
x.permit.on_granted_memory();
_blessed_permit = &*x.permit;
x.permit->on_granted_memory();
} else {
x.permit.on_admission();
x.permit->on_admission();
++_stats.reads_admitted;
}
if (x.func) {

View File

@@ -101,12 +101,8 @@ private:
explicit reader_permit(reader_concurrency_semaphore& semaphore, const schema* const schema, sstring&& op_name,
reader_resources base_resources, db::timeout_clock::time_point timeout);
void on_waiting_for_admission();
void on_waiting_for_memory(future<> f);
void on_admission();
void on_granted_memory();
future<> get_memory_future();
reader_permit::impl& operator*() { return *_impl; }
reader_permit::impl* operator->() { return _impl.get(); }
void mark_used() noexcept;

View File

@@ -580,6 +580,7 @@ public:
void unpop_mutation_fragment(mutation_fragment_v2 mf) { _impl->unpop_mutation_fragment(std::move(mf)); }
const schema_ptr& schema() const { return _impl->_schema; }
const reader_permit& permit() const { return _impl->_permit; }
reader_permit& permit() { return _impl->_permit; }
db::timeout_clock::time_point timeout() const noexcept { return _impl->timeout(); }
void set_timeout(db::timeout_clock::time_point timeout) noexcept { _impl->set_timeout(timeout); }
void set_max_buffer_size(size_t size) {