Merge 'reader_permit: minor improvements to resource consume/release safety' from Botond Dénes

This PR contains some small improvements to the safety of consuming/releasing resources to/from the semaphore:
* reader_permit: make the low-level `consume()/signal()` API private, making the only user (an RAII class) a friend.
* resource_units: split `reset()` into a `noexcept` variant (`reset_to_zero()`) and a potentially throwing variant (`reset_to()`).
* resource_units::reset_to(): try harder to avoid calling `consume()` (when the new resource amount is smaller than the previous one)

Closes #13678

* github.com:scylladb/scylladb:
  reader_permit: resource_units::reset_to(): try harder to avoid calling consume()
  reader_permit: split resource_units::reset()
  reader_permit: make consume()/signal() API private
This commit is contained in:
Avi Kivity
2023-05-11 22:57:50 +03:00
7 changed files with 55 additions and 28 deletions

View File

@@ -105,7 +105,7 @@ mutation_fragment::mutation_fragment(const schema& s, reader_permit permit, part
void mutation_fragment::reset_memory(const schema& s, std::optional<reader_resources> res) {
try {
_data->_memory.reset(res ? *res : reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(res ? *res : reader_resources::with_memory(calculate_memory_usage(s)));
} catch (...) {
destroy_data();
throw;
@@ -191,7 +191,7 @@ void mutation_fragment_v2::destroy_data() noexcept
void mutation_fragment_v2::reset_memory(const schema& s, std::optional<reader_resources> res) {
try {
_data->_memory.reset(res ? *res : reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(res ? *res : reader_resources::with_memory(calculate_memory_usage(s)));
} catch (...) {
destroy_data();
throw;

View File

@@ -418,7 +418,7 @@ public:
template<typename Consumer>
requires MutationFragmentConsumer<Consumer, decltype(std::declval<Consumer>().consume(std::declval<range_tombstone>()))>
decltype(auto) consume(Consumer& consumer) && {
_data->_memory.reset();
_data->_memory.reset_to_zero();
switch (_kind) {
case kind::static_row:
return consumer.consume(std::move(_data->_static_row));

View File

@@ -177,7 +177,7 @@ public:
, _data(std::make_unique<data>(std::move(permit)))
{
new (&_data->_clustering_row) clustering_row(std::forward<Args>(args)...);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
}
mutation_fragment_v2(const schema& s, reader_permit permit, static_row&& r);
@@ -205,7 +205,7 @@ public:
new (&_data->_partition_end) partition_end(o._data->_partition_end);
break;
}
_data->_memory.reset(o._data->_memory.resources());
_data->_memory.reset_to(o._data->_memory.resources());
}
mutation_fragment_v2(mutation_fragment_v2&& other) = default;
mutation_fragment_v2& operator=(mutation_fragment_v2&& other) noexcept {
@@ -242,19 +242,19 @@ public:
void mutate_as_static_row(const schema& s, std::invocable<static_row&> auto&& fn) {
fn(_data->_static_row);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
}
void mutate_as_clustering_row(const schema& s, std::invocable<clustering_row&> auto&& fn) {
fn(_data->_clustering_row);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
}
void mutate_as_range_tombstone_change(const schema& s, std::invocable<range_tombstone_change&> auto&& fn) {
fn(_data->_range_tombstone_chg);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
}
void mutate_as_partition_start(const schema& s, std::invocable<partition_start&> auto&& fn) {
fn(_data->_partition_start);
_data->_memory.reset(reader_resources::with_memory(calculate_memory_usage(s)));
_data->_memory.reset_to(reader_resources::with_memory(calculate_memory_usage(s)));
}
static_row&& as_static_row() && { return std::move(_data->_static_row); }
@@ -275,7 +275,7 @@ public:
template<typename Consumer>
requires MutationFragmentConsumerV2<Consumer, decltype(std::declval<Consumer>().consume(std::declval<range_tombstone_change>()))>
decltype(auto) consume(Consumer& consumer) && {
_data->_memory.reset();
_data->_memory.reset_to_zero();
switch (_kind) {
case kind::static_row:
return consumer.consume(std::move(_data->_static_row));

View File

@@ -87,16 +87,14 @@ reader_permit::resource_units::resource_units(resource_units&& o) noexcept
}
reader_permit::resource_units::~resource_units() {
if (_resources.non_zero()) {
reset();
}
reset_to_zero();
}
reader_permit::resource_units& reader_permit::resource_units::operator=(resource_units&& o) noexcept {
if (&o == this) {
return *this;
}
reset();
reset_to_zero();
_permit = std::move(o._permit);
_resources = std::exchange(o._resources, {});
return *this;
@@ -107,7 +105,16 @@ void reader_permit::resource_units::add(resource_units&& o) {
_resources += std::exchange(o._resources, {});
}
void reader_permit::resource_units::reset(reader_resources res) {
void reader_permit::resource_units::reset_to(reader_resources res) {
if (res == _resources) {
return;
}
if (res.count < _resources.count && res.memory < _resources.memory) {
_permit.signal(reader_resources{_resources.count - res.count, _resources.memory - res.memory});
_resources = res;
return;
}
if (res.non_zero()) {
_permit.consume(res);
}
@@ -117,6 +124,13 @@ void reader_permit::resource_units::reset(reader_resources res) {
_resources = res;
}
// Releases everything these units currently hold: when the tracked amount is
// non-zero it is handed back through the permit's signal() and the tracked
// amount is cleared to the default (empty) value. Does nothing when no
// resources are held. Declared noexcept; it never calls consume().
void reader_permit::resource_units::reset_to_zero() noexcept {
if (_resources.non_zero()) {
_permit.signal(_resources);
_resources = {};
}
}
class reader_permit::impl
: public boost::intrusive::list_base_hook<boost::intrusive::link_mode<boost::intrusive::auto_unlink>>
, public enable_shared_from_this<reader_permit::impl> {

View File

@@ -73,6 +73,7 @@ class reader_concurrency_semaphore;
/// should be held onto while the respective resources are in use.
class reader_permit {
friend class reader_concurrency_semaphore;
friend class tracking_allocator_base;
public:
class resource_units;
@@ -118,6 +119,10 @@ private:
friend class optimized_optional<reader_permit>;
void consume(reader_resources res);
void signal(reader_resources res);
public:
~reader_permit();
@@ -142,10 +147,6 @@ public:
// Call only when needs_readmission() = true.
future<> wait_readmission();
void consume(reader_resources res);
void signal(reader_resources res);
resource_units consume_memory(size_t memory = 0);
resource_units consume_resources(reader_resources res);
@@ -200,7 +201,8 @@ public:
resource_units& operator=(const resource_units&) = delete;
resource_units& operator=(resource_units&&) noexcept;
void add(resource_units&& o);
void reset(reader_resources res = {});
void reset_to(reader_resources res);
void reset_to_zero() noexcept;
reader_permit permit() const { return _permit; }
reader_resources resources() const { return _resources; }
};
@@ -268,24 +270,35 @@ inline temporary_buffer<char> make_new_tracked_temporary_buffer(size_t size, rea
file make_tracked_file(file f, reader_permit p);
// Base class that owns a reader_permit and exposes memory-only accounting
// helpers to derived classes. Both helpers wrap the byte count with
// reader_resources::with_memory() and forward it to the permit.
class tracking_allocator_base {
reader_permit _permit;
protected:
// Takes ownership of the permit that all accounting is forwarded to.
tracking_allocator_base(reader_permit permit) noexcept : _permit(std::move(permit)) { }
// Registers `memory` bytes as consumed on the permit.
// Not noexcept; NOTE(review): presumably the permit's consume() may throw — confirm.
void consume(size_t memory) {
_permit.consume(reader_resources::with_memory(memory));
}
// Returns `memory` bytes to the permit.
void signal(size_t memory) {
_permit.signal(reader_resources::with_memory(memory));
}
};
template <typename T>
class tracking_allocator {
class tracking_allocator : public tracking_allocator_base {
public:
using value_type = T;
using propagate_on_container_move_assignment = std::true_type;
using is_always_equal = std::false_type;
private:
reader_permit _permit;
std::allocator<T> _alloc;
public:
tracking_allocator(reader_permit permit) noexcept : _permit(std::move(permit)) { }
tracking_allocator(reader_permit permit) noexcept : tracking_allocator_base(std::move(permit)) { }
T* allocate(size_t n) {
auto p = _alloc.allocate(n);
try {
_permit.consume(reader_resources::with_memory(n * sizeof(T)));
consume(n * sizeof(T));
} catch (...) {
_alloc.deallocate(p, n);
throw;
@@ -295,7 +308,7 @@ public:
void deallocate(T* p, size_t n) {
_alloc.deallocate(p, n);
if (n) {
_permit.signal(reader_resources::with_memory(n * sizeof(T)));
signal(n * sizeof(T));
}
}

View File

@@ -740,7 +740,7 @@ SEASTAR_THREAD_TEST_CASE(test_immediate_evict_on_insert) {
.no_drops()
.resource_based_evictions();
resources.reset();
resources.reset_to_zero();
fut.get();
}

View File

@@ -161,7 +161,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_readmission_preserves
}
BOOST_REQUIRE(!fut.available());
consumed_resources.reset();
consumed_resources.reset_to_zero();
fut.get();
} else {
if (permit->needs_readmission()) {
@@ -1519,7 +1519,7 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_request_memory_preser
BOOST_REQUIRE(!units2_fut.available());
BOOST_REQUIRE_EQUAL(semaphore.get_stats().reads_enqueued_for_memory, ++reads_enqueued_for_memory);
sponge_units.reset();
sponge_units.reset_to_zero();
auto units2 = units2_fut.get();
BOOST_REQUIRE_EQUAL(semaphore.get_stats().current_permits, 2);