logalloc, dirty_memory_manager: move region_group and associated code

region_group is an abstraction that allows accounting for groups of
regions, but the cost/benefit ratio of maintaining the abstraction
is poor. Each time we need to change the decision algorithm for memtable
flushing (admittedly rarely), we need to distill that into an abstraction
for region_groups and then use it. An example is virtual region groups:
we wanted to account for the partially flushed memtables and had to
invent region groups to stand in their place.

Rather than continuing to invest in the abstraction, break it now
and move it to the memtable dirty memory manager which is responsible
for making those decisions. The relevant code is moved to
dirty_memory_manager.hh and dirty_memory_manager.cc (new file), and
a new unit test file is added as well.

A downside of the change is that unit testing will be more difficult.
This commit is contained in:
Avi Kivity
2022-06-21 13:38:19 +03:00
parent bffee2540f
commit fbe8ea7727
9 changed files with 1392 additions and 1319 deletions

View File

@@ -399,6 +399,7 @@ scylla_tests = set([
'test/boost/crc_test',
'test/boost/data_listeners_test',
'test/boost/database_test',
'test/boost/dirty_memory_manager_test',
'test/boost/double_decker_test',
'test/boost/duration_test',
'test/boost/dynamic_bitset_test',
@@ -674,6 +675,7 @@ scylla_core = (['replica/database.cc',
'replica/distributed_loader.cc',
'replica/memtable.cc',
'replica/exceptions.cc',
'dirty_memory_manager.cc',
'absl-flat_hash_map.cc',
'atomic_cell.cc',
'caching_options.cc',

176
dirty_memory_manager.cc Normal file
View File

@@ -0,0 +1,176 @@
// Copyright (C) 2012-present ScyllaDB
// SPDX-License-Identifier: AGPL-3.0-or-later
#include "dirty_memory_manager.hh"
#include <seastar/util/later.hh>
#include <seastar/core/with_scheduling_group.hh>
#include "seastarx.hh"
// Code previously under logalloc namespace
namespace dirty_memory_manager_logalloc {
using namespace logalloc;
// Debug-only consistency check of a region heap.
//
// Walks the heap in order and verifies that the evictable total space of each
// region never exceeds that of the region visited before it. On the first
// violation the whole heap is dumped and the process is aborted via assert.
// Compiles to an empty function unless SEASTAR_DEBUG is defined.
inline void
region_group_binomial_group_sanity_check(const region_group::region_heap& bh) {
#ifdef SEASTAR_DEBUG
    bool ordered = true;
    size_t previous = std::numeric_limits<size_t>::max();
    for (auto it = bh.ordered_begin(); it != bh.ordered_end(); ++it) {
        auto current = region_impl_to_region(*it)->evictable_occupancy().total_space();
        if (current <= previous) {
            previous = current;
            continue;
        }
        ordered = false;
        break;
    }
    if (ordered) {
        return;
    }

    // The ordering is broken: print every region so the failure can be diagnosed.
    fmt::print("Sanity checking FAILED, size {}\n", bh.size());
    for (auto it = bh.ordered_begin(); it != bh.ordered_end(); ++it) {
        auto reg = region_impl_to_region(*it);
        auto space = reg->evictable_occupancy().total_space();
        fmt::print(" r = {} (id={}), occupancy = {}\n", fmt::ptr(reg), reg->id(), space);
    }
    assert(0);
#endif
}
// Out-of-line definition of the static region_group::no_reclaimer member.
// It is default-constructed.
region_group_reclaimer region_group::no_reclaimer;
// Returns the evictable total space of the region at the top of this group's
// own region heap, or 0 when the group directly owns no regions.
uint64_t region_group::top_region_evictable_space() const {
    if (_regions.empty()) {
        return 0;
    }
    return region_impl_to_region(_regions.top())->evictable_occupancy().total_space();
}
// Returns the region at the top of the heap of the group recorded in
// _maximal_rg, or nullptr when no such group is recorded or it has no regions.
region* region_group::get_largest_region() {
    if (_maximal_rg && !_maximal_rg->_regions.empty()) {
        return region_impl_to_region(_maximal_rg->_regions.top());
    }
    return nullptr;
}
// Registers a child group: pushes it onto the subgroup heap, remembers the
// resulting heap handle in the child, and then accounts for the child's
// current memory in this group via update().
void
region_group::add(region_group* child) {
    auto handle = _subgroups.push(child);
    child->_subgroup_heap_handle = handle;
    update(child->_total_memory);
}
// Unregisters a child group: removes it from the subgroup heap using the handle
// stored in the child, then subtracts the child's memory via update().
void
region_group::del(region_group* child) {
    const auto& handle = child->_subgroup_heap_handle;
    _subgroups.erase(handle);
    update(-child->_total_memory);
}
// region_listener hook: starts tracking a region in this group.
// The region is pushed onto the region heap, its heap handle is stored back in
// the region, the heap is sanity-checked (debug builds only), and the region's
// total occupancy is added via update().
void
region_group::add(region* child_r) {
    auto impl = region_to_region_impl(child_r);
    auto handle = _regions.push(impl);
    child_r->region_heap_handle() = handle;
    region_group_binomial_group_sanity_check(_regions);
    update(child_r->occupancy().total_space());
}
// region_listener hook: stops tracking a region in this group.
// The region is erased from the region heap through its stored handle, the heap
// is sanity-checked (debug builds only), and the region's total occupancy is
// subtracted via update().
void
region_group::del(region* child_r) {
    auto& handle = child_r->region_heap_handle();
    _regions.erase(handle);
    region_group_binomial_group_sanity_check(_regions);
    update(-child_r->occupancy().total_space());
}
// region_listener callback receiving a region's old and new address.
// region_group takes no action here: the body is empty and both arguments
// are ignored.
void
region_group::moved(region* old_address, region* new_address) {
}
bool
region_group::execution_permitted() noexcept {
return do_for_each_parent(this, [] (auto rg) {
return rg->under_pressure() ? stop_iteration::yes : stop_iteration::no;
}) == nullptr;
}
// Starts the loop that drains _blocked_requests and returns the future of that loop.
//
// The loop runs under the scheduling group `deferred_work_sg`. On every iteration it:
//  - stops when _shutdown_requested is set;
//  - otherwise, if there is a queued request and execution_permitted() holds, pops the
//    oldest request and runs it synchronously via allocate();
//  - otherwise waits on the _relief condition variable before trying again.
future<>
region_group::start_releaser(scheduling_group deferred_work_sg) {
return with_scheduling_group(deferred_work_sg, [this] {
// Yield once before entering the loop. NOTE(review): this function is invoked from the
// constructor's member-initializer list, so the yield presumably lets construction finish
// before the loop touches other members - confirm.
return yield().then([this] {
return repeat([this] () noexcept {
if (_shutdown_requested) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
if (!_blocked_requests.empty() && execution_permitted()) {
// Take ownership of the front request before popping it, then execute it.
auto req = std::move(_blocked_requests.front());
_blocked_requests.pop_front();
req->allocate();
return make_ready_future<stop_iteration>(stop_iteration::no);
} else {
// Block reclaiming to prevent signal() from being called by reclaimer inside wait()
// FIXME: handle allocation failures (not very likely) like allocating_section does
tracker_reclaimer_lock rl;
return _relief.wait().then([] {
return stop_iteration::no;
});
}
});
});
});
}
// Constructs a region group.
//
//  name             - moved into the on_request_expiry handler of _blocked_requests, which
//                     uses it when failing expired requests.
//  parent           - enclosing group, or nullptr; when non-null this group registers itself
//                     with the parent at the end of construction.
//  reclaimer        - stored by reference in _reclaimer.
//  deferred_work_sg - scheduling group passed to start_releaser().
//
// The releaser loop is started only when reclaimer_can_block() is true; otherwise _releaser
// is an already-resolved future. Note that the _releaser initializer reads _reclaimer, so it
// relies on _reclaimer having been initialized before it.
region_group::region_group(sstring name, region_group *parent,
region_group_reclaimer& reclaimer, scheduling_group deferred_work_sg)
: _parent(parent)
, _reclaimer(reclaimer)
, _blocked_requests(on_request_expiry{std::move(name)})
, _releaser(reclaimer_can_block() ? start_releaser(deferred_work_sg) : make_ready_future<>())
{
if (_parent) {
_parent->add(this);
}
}
// Tells whether the reclaimer has a finite throttle threshold, i.e. anything
// other than the maximum size_t value.
bool region_group::reclaimer_can_block() const {
    constexpr auto no_limit = std::numeric_limits<size_t>::max();
    return _reclaimer.throttle_threshold() != no_limit;
}
void region_group::notify_relief() {
_relief.signal();
for (region_group* child : _subgroups) {
child->notify_relief();
}
}
// Applies a memory usage change of `delta` bytes to this group and to every
// ancestor, refreshing each group's maximal-region bookkeeping and its
// reclaimer's soft/hard pressure state along the way.
//
// If any group on the path leaves the hard-pressure state, the last such group
// visited (the most-enclosing one) has notify_relief() called on it afterwards.
void region_group::update(ssize_t delta) {
    // Most-enclosing group which was relieved.
    region_group* outermost_relieved = nullptr;

    const auto apply_delta = [&outermost_relieved, delta] (region_group* rg) {
        rg->update_maximal_rg();
        rg->_total_memory += delta;

        auto& reclaimer = rg->_reclaimer;

        // Soft limit: flips reclaiming on or off.
        if (rg->_total_memory < reclaimer.soft_limit_threshold()) {
            reclaimer.notify_soft_relief();
        } else {
            reclaimer.notify_soft_pressure();
        }

        // Hard (throttle) limit: tracks whether new requests must be blocked.
        if (rg->_total_memory > reclaimer.throttle_threshold()) {
            reclaimer.notify_pressure();
        } else if (reclaimer.under_pressure()) {
            reclaimer.notify_relief();
            outermost_relieved = rg;
        }
        return stop_iteration::no;
    };
    do_for_each_parent(this, apply_delta);

    if (outermost_relieved != nullptr) {
        outermost_relieved->notify_relief();
    }
}
// Expiry handler for queued requests: fails the request with a
// blocked_requests_timed_out_error built from this handler's name.
void region_group::on_request_expiry::operator()(std::unique_ptr<allocating_function>& func) noexcept {
    auto timeout_error = std::make_exception_ptr(blocked_requests_timed_out_error{_name});
    func->fail(timeout_error);
}
}

View File

@@ -16,6 +16,379 @@
#include "replica/database_fwd.hh"
#include "utils/logalloc.hh"
// Code previously under logalloc namespace
namespace dirty_memory_manager_logalloc {
using namespace logalloc;
//
// Users of a region_group can pass an instance of the class region_group_reclaimer, and override
// its methods start_reclaiming() and stop_reclaiming(). start_reclaiming() is invoked when the
// soft-pressure state is entered and stop_reclaiming() when it is left. By overriding those
// methods - which are a nop by default - the callers can take action to aid the LSA in
// alleviating pressure.
class region_group_reclaimer {
protected:
    size_t _threshold;
    size_t _soft_limit;
    bool _under_pressure = false;
    bool _under_soft_pressure = false;

    // The following restrictions apply to implementations of start_reclaiming() and stop_reclaiming():
    //
    //  - must not use any region or region_group objects, because they're invoked synchronously
    //    with operations on those.
    //
    //  - must be noexcept, because they're called on the free path.
    //
    //  - the implementation may be called synchronously with any operation
    //    which allocates memory, because these are called by memory reclaimer.
    //    In particular, the implementation should not depend on memory allocation
    //    because that may fail when in reclaiming context.
    //
    virtual void start_reclaiming() noexcept {}
    virtual void stop_reclaiming() noexcept {}
public:
    // No limits: both thresholds are set to the maximum size_t value.
    region_group_reclaimer()
        : region_group_reclaimer(std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::max()) {}
    // The soft limit equals the throttle threshold.
    region_group_reclaimer(size_t threshold)
        : region_group_reclaimer(threshold, threshold) {}
    // Separate throttle threshold and soft limit; the soft limit must not exceed the threshold.
    region_group_reclaimer(size_t threshold, size_t soft)
        : _threshold(threshold), _soft_limit(soft) {
        assert(_soft_limit <= _threshold);
    }
    virtual ~region_group_reclaimer() = default;

    size_t throttle_threshold() const {
        return _threshold;
    }
    size_t soft_limit_threshold() const {
        return _soft_limit;
    }

    bool under_pressure() const {
        return _under_pressure;
    }
    bool over_soft_limit() const {
        return _under_soft_pressure;
    }

    // Enters the soft-pressure state; start_reclaiming() fires only on the transition.
    void notify_soft_pressure() noexcept {
        if (_under_soft_pressure) {
            return;
        }
        _under_soft_pressure = true;
        start_reclaiming();
    }
    // Leaves the soft-pressure state; stop_reclaiming() fires only on the transition.
    void notify_soft_relief() noexcept {
        if (!_under_soft_pressure) {
            return;
        }
        _under_soft_pressure = false;
        stop_reclaiming();
    }
    // The hard-pressure flag is a plain state bit; no hook is invoked when it changes.
    void notify_pressure() noexcept {
        _under_pressure = true;
    }
    void notify_relief() noexcept {
        _under_pressure = false;
    }
};
// Groups regions for the purpose of statistics. Can be nested.
// Interfaces to regions via region_listener
class region_group : public region_listener {
    // Reclaimer used when the caller does not supply one.
    static region_group_reclaimer no_reclaimer;

    using region_evictable_occupancy_ascending_less_comparator = logalloc::region_evictable_occupancy_ascending_less_comparator;

    // We want to sort the subgroups so that we can easily find the one that holds the biggest
    // region for freeing purposes. Please note that this is not the biggest of the region groups,
    // since a big region group can have a big collection of very small regions, and freeing them
    // won't achieve anything. An example of such scenario is a ScyllaDB region with a lot of very
    // small memtables that add up, versus one with a very big memtable. The small memtables are
    // likely still growing, and freeing the big memtable will guarantee that the most memory is
    // freed up, while maximizing disk throughput.
    //
    // As asynchronous reclaim will likely involve disk operation, and those tend to be more
    // efficient when bulk done, this behavior is not ScyllaDB memtable specific.
    //
    // The maximal score is recursively defined as:
    //
    //      max(our_biggest_region, our_subtree_biggest_region)
    struct subgroup_maximal_region_ascending_less_comparator {
        bool operator()(region_group* rg1, region_group* rg2) const {
            return rg1->maximal_score() < rg2->maximal_score();
        }
    };
    friend struct subgroup_maximal_region_ascending_less_comparator;

    using region_heap = logalloc::region_heap;
    using subgroup_heap = boost::heap::binomial_heap<region_group*,
          boost::heap::compare<subgroup_maximal_region_ascending_less_comparator>,
          boost::heap::allocator<std::allocator<region_group*>>,
          //constant_time_size<true> causes corruption with boost < 1.60
          boost::heap::constant_time_size<false>>;

    region_group* _parent = nullptr;
    size_t _total_memory = 0;
    region_group_reclaimer& _reclaimer;

    subgroup_heap _subgroups;
    // Handle of this group inside its parent's _subgroups heap.
    subgroup_heap::handle_type _subgroup_heap_handle;
    region_heap _regions;
    region_group* _maximal_rg = nullptr;
    // We need to store the score separately, otherwise we'd have to have an extra pass
    // before we update the region occupancy.
    size_t _maximal_score = 0;

    // Type-erased deferred request: either run it (allocate) or fail it with an exception.
    struct allocating_function {
        virtual ~allocating_function() = default;
        virtual void allocate() = 0;
        virtual void fail(std::exception_ptr) = 0;
    };

    // Binds a callable to the promise that reports its outcome to the caller of
    // run_when_memory_available().
    template <typename Func>
    struct concrete_allocating_function : public allocating_function {
        // std::invoke_result_t replaces std::result_of_t, which was deprecated in C++17
        // and removed in C++20.
        using futurator = futurize<std::invoke_result_t<Func>>;
        typename futurator::promise_type pr;
        Func func;
    public:
        void allocate() override {
            futurator::invoke(func).forward_to(std::move(pr));
        }
        void fail(std::exception_ptr e) override {
            pr.set_exception(e);
        }
        concrete_allocating_function(Func&& func) : func(std::forward<Func>(func)) {}
        typename futurator::type get_future() {
            return pr.get_future();
        }
    };

    // Expiry handler for _blocked_requests: fails a timed-out request with an error
    // whose message is "<name>: timed out".
    class on_request_expiry {
        class blocked_requests_timed_out_error : public timed_out_error {
            const sstring _msg;
        public:
            explicit blocked_requests_timed_out_error(sstring name)
                : _msg(std::move(name) + ": timed out") {}
            virtual const char* what() const noexcept override {
                return _msg.c_str();
            }
        };

        sstring _name;
    public:
        explicit on_request_expiry(sstring name) : _name(std::move(name)) {}
        void operator()(std::unique_ptr<allocating_function>&) noexcept;
    };

    // It is a more common idiom to just hold the promises in the circular buffer and make them
    // ready. However, in the time between the promise being made ready and the function execution,
    // it could be that our memory usage went up again. To protect against that, we have to recheck
    // if memory is still available after the future resolves.
    //
    // But we can greatly simplify it if we store the function itself in the circular_buffer, and
    // execute it synchronously in release_requests() when we are sure memory is available.
    //
    // This allows us to easily provide strong execution guarantees while keeping all re-check
    // complication in release_requests and keep the main request execution path simpler.
    expiring_fifo<std::unique_ptr<allocating_function>, on_request_expiry, db::timeout_clock> _blocked_requests;

    // Total number of requests that were ever queued (never decremented).
    uint64_t _blocked_requests_counter = 0;

    condition_variable _relief;
    future<> _releaser;
    bool _shutdown_requested = false;

    bool reclaimer_can_block() const;
    future<> start_releaser(scheduling_group deferred_work_sg);
    void notify_relief();
    friend void region_group_binomial_group_sanity_check(const region_group::region_heap& bh);
private: // from region_listener
    virtual void moved(region* old_address, region* new_address) override;
public:
    // When creating a region_group, one can specify an optional throttle_threshold parameter. This
    // parameter won't affect normal allocations, but an API is provided, through the region_group's
    // method run_when_memory_available(), to make sure that a given function is only executed when
    // the total memory for the region group (and all of its parents) is lower or equal to the
    // region_group's throttle_threshold (and respectively for its parents).
    //
    // The deferred_work_sg parameter specifies a scheduling group in which to run allocations
    // (given to run_when_memory_available()) when they must be deferred due to lack of memory
    // at the time the call to run_when_memory_available() was made.
    region_group(sstring name = "(unnamed region_group)",
            region_group_reclaimer& reclaimer = no_reclaimer,
            scheduling_group deferred_work_sg = default_scheduling_group())
        : region_group(name, nullptr, reclaimer, deferred_work_sg) {}
    region_group(sstring name, region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer,
            scheduling_group deferred_work_sg = default_scheduling_group());
    region_group(region_group&& o) = delete;
    region_group(const region_group&) = delete;
    ~region_group() {
        // If we set a throttle threshold, we'd be postponing many operations. So shutdown must be
        // called.
        if (reclaimer_can_block()) {
            assert(_shutdown_requested);
        }
        if (_parent) {
            _parent->del(this);
        }
    }
    region_group& operator=(const region_group&) = delete;
    region_group& operator=(region_group&&) = delete;

    size_t memory_used() const {
        return _total_memory;
    }
    void update(ssize_t delta);

    // It would be easier to call update, but it is unfortunately broken in boost versions up to at
    // least 1.59.
    //
    // One possibility would be to just test for delta signedness, but we adopt an explicit call for
    // two reasons:
    //
    // 1) it saves us a branch
    // 2) some callers would like to pass delta = 0. For instance, when we are making a region
    //    evictable / non-evictable. Because the evictable occupancy changes, we would like to call
    //    the full update cycle even then.
    virtual void increase_usage(region_heap::handle_type& r_handle, ssize_t delta) override { // From region_listener
        _regions.increase(r_handle);
        update(delta);
    }

    virtual void decrease_evictable_usage(region_heap::handle_type& r_handle) override { // From region_listener
        _regions.decrease(r_handle);
    }

    virtual void decrease_usage(region_heap::handle_type& r_handle, ssize_t delta) override { // From region_listener
        decrease_evictable_usage(r_handle);
        update(delta);
    }

    //
    // Make sure that the function specified by the parameter func only runs when this region_group,
    // as well as each of its ancestors have a memory_used() amount of memory that is lesser or
    // equal the throttle_threshold, as specified in the region_group's constructor.
    //
    // region_groups that did not specify a throttle_threshold will always allow for execution.
    //
    // In case current memory_used() is over the threshold, a non-ready future is returned and it
    // will be made ready at some point in the future, at which memory usage in the offending
    // region_group (either this or an ancestor) falls below the threshold.
    //
    // Requests that are not allowed for execution are queued and released in FIFO order within the
    // same region_group, but no guarantees are made regarding release ordering across different
    // region_groups.
    //
    // When timeout is reached first, the returned future is resolved with timed_out_error exception.
    template <typename Func>
    // We disallow future-returning functions here, because otherwise memory may be available
    // when we start executing it, but no longer available in the middle of the execution.
    requires (!is_future<std::invoke_result_t<Func>>::value)
    futurize_t<std::invoke_result_t<Func>> run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout) {
        // A request may run immediately only if no group on the path to the root is under
        // pressure or already has queued requests (which preserves FIFO ordering).
        auto blocked_at = do_for_each_parent(this, [] (auto rg) {
            return (rg->_blocked_requests.empty() && !rg->under_pressure()) ? stop_iteration::no : stop_iteration::yes;
        });

        if (!blocked_at) {
            return futurize_invoke(func);
        }

        auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
        auto fut = fn->get_future();
        _blocked_requests.push_back(std::move(fn), timeout);
        ++_blocked_requests_counter;

        return fut;
    }

    // returns a pointer to the largest region (in terms of memory usage) that sits below this
    // region group. This includes the regions owned by this region group as well as all of its
    // children.
    region* get_largest_region();

    // Shutdown is mandatory for every user who has set a threshold
    // Can be called at most once.
    future<> shutdown() {
        _shutdown_requested = true;
        _relief.signal();
        return std::move(_releaser);
    }

    // Number of requests currently queued.
    size_t blocked_requests() {
        return _blocked_requests.size();
    }

    // Number of requests queued since this group was created.
    uint64_t blocked_requests_counter() const {
        return _blocked_requests_counter;
    }
private:
    // Returns true if and only if constraints of this group are not violated.
    // That's taking into account any constraints imposed by enclosing (parent) groups.
    bool execution_permitted() noexcept;

    // Executes the function func for each region_group upwards in the hierarchy, starting with the
    // parameter node. The function func may return stop_iteration::no, in which case it proceeds to
    // the next ancestor in the hierarchy, or stop_iteration::yes, in which case it stops at this
    // level.
    //
    // This method returns a pointer to the region_group that was processed last, or nullptr if the
    // root was reached.
    template <typename Func>
    static region_group* do_for_each_parent(region_group *node, Func&& func) {
        auto rg = node;
        while (rg) {
            if (func(rg) == stop_iteration::yes) {
                return rg;
            }
            rg = rg->_parent;
        }
        return nullptr;
    }

    inline bool under_pressure() const {
        return _reclaimer.under_pressure();
    }

    uint64_t top_region_evictable_space() const;

    uint64_t maximal_score() const {
        return _maximal_score;
    }

    // Recomputes _maximal_rg and _maximal_score from this group's own top region and the
    // top-scoring subgroup, then repositions this group in the parent's heap if the score changed.
    void update_maximal_rg() {
        auto my_score = top_region_evictable_space();
        auto children_score = _subgroups.empty() ? 0 : _subgroups.top()->maximal_score();
        auto old_maximal_score = _maximal_score;
        if (children_score > my_score) {
            _maximal_rg = _subgroups.top()->_maximal_rg;
        } else {
            _maximal_rg = this;
        }

        _maximal_score = _maximal_rg->top_region_evictable_space();
        if (_parent) {
            // binomial heap update boost bug.
            if (_maximal_score > old_maximal_score) {
                _parent->_subgroups.increase(_subgroup_heap_handle);
            } else if (_maximal_score < old_maximal_score) {
                _parent->_subgroups.decrease(_subgroup_heap_handle);
            }
        }
    }

    void add(region_group* child);
    void del(region_group* child);
    virtual void add(region* child) override; // from region_listener
    virtual void del(region* child) override; // from region_listener
};
}
class dirty_memory_manager;
class sstable_write_permit final {
@@ -58,8 +431,8 @@ public:
future<flush_permit> reacquire_sstable_write_permit() &&;
};
class dirty_memory_manager: public logalloc::region_group_reclaimer {
logalloc::region_group_reclaimer _real_dirty_reclaimer;
class dirty_memory_manager: public dirty_memory_manager_logalloc::region_group_reclaimer {
dirty_memory_manager_logalloc::region_group_reclaimer _real_dirty_reclaimer;
// We need a separate boolean, because from the LSA point of view, pressure may still be
// mounting, in which case the pressure flag could be set back on if we force it off.
bool _db_shutdown_requested = false;
@@ -67,10 +440,10 @@ class dirty_memory_manager: public logalloc::region_group_reclaimer {
replica::database* _db;
// The _real_region_group protects against actual dirty memory usage hitting the maximum. Usage
// for this group is the real dirty memory usage of the system.
logalloc::region_group _real_region_group;
dirty_memory_manager_logalloc::region_group _real_region_group;
// The _virtual_region_group accounts for virtual memory usage. It is defined as the real dirty
// memory usage minus bytes that were already written to disk.
logalloc::region_group _virtual_region_group;
dirty_memory_manager_logalloc::region_group _virtual_region_group;
// We would like to serialize the flushing of memtables. While flushing many memtables
// simultaneously can sustain high levels of throughput, the memory is not freed until the
@@ -139,22 +512,22 @@ public:
// We then set the soft limit to 80 % of the virtual dirty hard limit, which is equal to 40 % of
// the user-supplied threshold.
dirty_memory_manager(replica::database& db, size_t threshold, double soft_limit, scheduling_group deferred_work_sg);
dirty_memory_manager() : logalloc::region_group_reclaimer()
dirty_memory_manager() : dirty_memory_manager_logalloc::region_group_reclaimer()
, _db(nullptr)
, _real_region_group("memtable", _real_dirty_reclaimer)
, _virtual_region_group("memtable (virtual)", &_real_region_group, *this)
, _flush_serializer(1)
, _waiting_flush(make_ready_future<>()) {}
static dirty_memory_manager& from_region_group(logalloc::region_group *rg) {
static dirty_memory_manager& from_region_group(dirty_memory_manager_logalloc::region_group *rg) {
return *(boost::intrusive::get_parent_from_member(rg, &dirty_memory_manager::_virtual_region_group));
}
logalloc::region_group& region_group() {
dirty_memory_manager_logalloc::region_group& region_group() {
return _virtual_region_group;
}
const logalloc::region_group& region_group() const {
const dirty_memory_manager_logalloc::region_group& region_group() const {
return _virtual_region_group;
}

View File

@@ -458,7 +458,7 @@ void backlog_controller::update_controller(float shares) {
dirty_memory_manager::dirty_memory_manager(replica::database& db, size_t threshold, double soft_limit, scheduling_group deferred_work_sg)
: logalloc::region_group_reclaimer(threshold / 2, threshold * soft_limit / 2)
: dirty_memory_manager_logalloc::region_group_reclaimer(threshold / 2, threshold * soft_limit / 2)
, _real_dirty_reclaimer(threshold)
, _db(&db)
, _real_region_group("memtable", _real_dirty_reclaimer, deferred_work_sg)

View File

@@ -262,7 +262,7 @@ public:
_memtables.emplace_back(new_memtable());
}
logalloc::region_group& region_group() {
dirty_memory_manager_logalloc::region_group& region_group() {
return _dirty_memory_manager->region_group();
}
// This is used for explicit flushes. Will queue the memtable for flushing and proceed when the
@@ -634,7 +634,7 @@ private:
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
void do_trigger_compaction();
logalloc::region_group& dirty_memory_region_group() const {
dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
return _config.dirty_memory_manager->region_group();
}
@@ -1653,7 +1653,7 @@ public:
// drops the table on all shards and removes the table directory if there are no snapshots
static future<> drop_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name, timestamp_func, bool with_snapshot = true);
const logalloc::region_group& dirty_memory_region_group() const {
const dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
return _dirty_memory_manager.region_group();
}

View File

@@ -0,0 +1,828 @@
/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <boost/test/unit_test.hpp>
#include <boost/intrusive/parent_from_member.hpp>
#include <algorithm>
#include <chrono>
#include <random>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/print.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread_cputime_clock.hh>
#include <seastar/core/when_all.hh>
#include <seastar/core/with_timeout.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/defer.hh>
#include <deque>
#include "utils/lsa/weak_ptr.hh"
#include "utils/phased_barrier.hh"
#include "utils/logalloc.hh"
#include "dirty_memory_manager.hh"
#include "utils/managed_ref.hh"
#include "utils/managed_bytes.hh"
#include "utils/chunked_vector.hh"
#include "test/lib/log.hh"
#include "log.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/make_random_string.hh"
// Runs once during static initialization: raises every registered logger to
// debug level for this test binary. The variable itself is otherwise unused.
[[gnu::unused]]
static auto x = [] {
    auto&& registry = logging::logger_registry();
    registry.set_all_loggers_level(logging::log_level::debug);
    return 0;
}();
using namespace logalloc;
using namespace dirty_memory_manager_logalloc;
// Checks that region_group::memory_used() tracks the total space of the regions
// attached to a group and to its nested groups, across allocation, region
// merging and region destruction.
SEASTAR_TEST_CASE(test_region_groups) {
    return seastar::async([] {
        region_group just_four;
        region_group all;
        region_group one_and_two("one_and_two", &all);

        auto one = std::make_unique<logalloc::region>(one_and_two);
        auto two = std::make_unique<logalloc::region>(one_and_two);
        auto three = std::make_unique<logalloc::region>(all);
        auto four = std::make_unique<logalloc::region>(just_four);
        auto five = std::make_unique<logalloc::region>();

        constexpr size_t base_count = 16 * 1024;

        // Allocates `count` managed ints inside `target`, keeping them alive in `objs`.
        const auto populate = [] (logalloc::region& target, std::vector<managed_ref<int>>& objs, size_t count) {
            with_allocator(target.allocator(), [&] {
                for (size_t n = 0; n < count; n++) {
                    objs.emplace_back(make_managed<int>());
                }
            });
        };

        // Region one: accounted in one_and_two and, through it, in all.
        constexpr size_t one_count = 16 * base_count;
        std::vector<managed_ref<int>> one_objs;
        populate(*one, one_objs, one_count);
        BOOST_REQUIRE_GE(ssize_t(one->occupancy().used_space()), ssize_t(one_count * sizeof(int)));
        BOOST_REQUIRE_GE(ssize_t(one->occupancy().total_space()), ssize_t(one->occupancy().used_space()));
        BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space());
        BOOST_REQUIRE_EQUAL(all.memory_used(), one->occupancy().total_space());

        // Region two: same groups as region one.
        constexpr size_t two_count = 8 * base_count;
        std::vector<managed_ref<int>> two_objs;
        populate(*two, two_objs, two_count);
        BOOST_REQUIRE_GE(ssize_t(two->occupancy().used_space()), ssize_t(two_count * sizeof(int)));
        BOOST_REQUIRE_GE(ssize_t(two->occupancy().total_space()), ssize_t(two->occupancy().used_space()));
        BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space() + two->occupancy().total_space());
        BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used());

        // Region three: attached directly to all.
        constexpr size_t three_count = 32 * base_count;
        std::vector<managed_ref<int>> three_objs;
        populate(*three, three_objs, three_count);
        BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t(three_count * sizeof(int)));
        BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
        BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());

        // Region four: lives in the unrelated just_four group.
        constexpr size_t four_count = 4 * base_count;
        std::vector<managed_ref<int>> four_objs;
        populate(*four, four_objs, four_count);
        BOOST_REQUIRE_GE(ssize_t(four->occupancy().used_space()), ssize_t(four_count * sizeof(int)));
        BOOST_REQUIRE_GE(ssize_t(four->occupancy().total_space()), ssize_t(four->occupancy().used_space()));
        BOOST_REQUIRE_EQUAL(just_four.memory_used(), four->occupancy().total_space());

        // Region five belongs to no group; its objects are released again when the
        // local vector goes out of scope inside the lambda.
        with_allocator(five->allocator(), [] {
            constexpr size_t five_count = base_count;
            std::vector<managed_ref<int>> five_objs;
            for (size_t n = 0; n < five_count; n++) {
                five_objs.emplace_back(make_managed<int>());
            }
        });

        // Merging four into three moves its memory from just_four to all.
        three->merge(*four);
        BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t((three_count + four_count)* sizeof(int)));
        BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
        BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
        BOOST_REQUIRE_EQUAL(just_four.memory_used(), 0);

        three->merge(*five);
        BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t((three_count + four_count)* sizeof(int)));
        BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
        BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());

        // Tear regions down one by one and watch the accounting shrink.
        with_allocator(two->allocator(), [&] {
            two_objs.clear();
        });
        two.reset();
        BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space());
        BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());

        with_allocator(one->allocator(), [&] {
            one_objs.clear();
        });
        one.reset();
        BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), 0);
        BOOST_REQUIRE_EQUAL(all.memory_used(), three->occupancy().total_space());

        with_allocator(three->allocator(), [&] {
            three_objs.clear();
            four_objs.clear();
        });
        three.reset();
        four.reset();
        five.reset();
        BOOST_REQUIRE_EQUAL(all.memory_used(), 0);
    });
}
using namespace std::chrono_literals;
// Waits for `fut` to resolve, giving up after two seconds on the lowres clock.
template <typename FutureType>
inline void quiesce(FutureType&& fut) {
    // Unfortunately seastar::thread::yield is not enough here, because the process of releasing
    // a request may be broken into many continuations. While we could just yield many times, the
    // exact amount needed to guarantee execution would be dependent on the internals of the
    // implementation, we want to avoid that.
    const auto deadline = lowres_clock::now() + 2s;
    auto bounded = with_timeout(deadline, std::move(fut));
    bounded.get();
}
// RAII wrapper around a region_group for tests: the destructor performs the
// shutdown and waits for it to complete.
// defer() is not used because tests usually employ many region groups.
struct test_region_group: public region_group {
    test_region_group(region_group* parent, region_group_reclaimer& reclaimer)
        : region_group("test_region_group", parent, reclaimer) {}
    test_region_group(region_group_reclaimer& reclaimer)
        : test_region_group(nullptr, reclaimer) {}

    ~test_region_group() {
        auto stopped = shutdown();
        stopped.get();
    }
};
// Test helper: a logalloc::region attached to a test_region_group that owns the
// objects allocated in it, so they can be released in bulk with clear() (which
// the destructor also calls).
struct test_region: public logalloc::region {
    test_region(test_region_group& rg) : logalloc::region(rg) {}
    ~test_region() {
        clear();
    }

    // Releases every object allocated through alloc() and alloc_small().
    // The swap with empty vectors happens under this region's allocator.
    void clear() {
        with_allocator(allocator(), [this] {
            std::vector<managed_bytes>().swap(_alloc);
            std::vector<managed_ref<uint64_t>>().swap(_alloc_simple);
        });
    }
    // Allocates one blob of `size` bytes (one LSA segment by default).
    void alloc(size_t size = logalloc::segment_size) {
        with_allocator(allocator(), [this, size] {
            _alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), size)));
        });
    }

    // Allocates `nr` small (uint64_t-sized) objects; one by default.
    // Previously `nr` was ignored and exactly one object was always allocated.
    void alloc_small(size_t nr = 1) {
        with_allocator(allocator(), [this, nr] {
            for (size_t i = 0; i < nr; ++i) {
                _alloc_simple.emplace_back(make_managed<uint64_t>());
            }
        });
    }
private:
    std::vector<managed_bytes> _alloc;
    // For small objects we don't want to get caught in basic_sstring's internal buffer. We know
    // which size we need to allocate to avoid that, but that's technically internal representation.
    // Better to use integers if we want something small.
    std::vector<managed_ref<uint64_t>> _alloc_simple;
};
SEASTAR_TEST_CASE(test_region_groups_basic_throttling) {
    return seastar::async([] {
        // Singleton hierarchy whose threshold is exactly one segment.
        region_group_reclaimer reclaimer(logalloc::segment_size);
        test_region_group group(reclaimer);
        auto small_region = std::make_unique<test_region>(group);

        // Small helpers for the steps this test repeats.
        const auto log_now = [] {
            testlog.info("now = {}", lowres_clock::now().time_since_epoch().count());
        };
        const auto log_used = [&group] {
            testlog.info("used = {}", group.memory_used());
        };
        const auto request_small_alloc = [&group, &small_region] {
            return group.run_when_memory_available([&small_region] { small_region->alloc_small(); }, db::no_timeout);
        };

        // After the first allocation the region holds one segment, so usage
        // equals the threshold and the request completes immediately.
        auto fut = request_small_alloc();
        BOOST_REQUIRE_EQUAL(fut.available(), true);
        BOOST_REQUIRE_EQUAL(group.memory_used(), logalloc::segment_size);

        // A second small object does not change the group's usage, so this
        // request is expected to go through as well.
        fut = request_small_alloc();
        BOOST_REQUIRE_EQUAL(fut.available(), true);
        BOOST_REQUIRE_EQUAL(group.memory_used(), logalloc::segment_size);

        // A big allocation in a second region pushes the group over the threshold.
        auto big_region = std::make_unique<test_region>(group);
        big_region->alloc();

        // A new request must now be held back.
        log_now();
        fut = request_small_alloc();
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        BOOST_REQUIRE_GT(group.memory_used(), logalloc::segment_size);
        log_now();
        log_used();
        testlog.info("Resetting");

        // Freeing the object alone is not guaranteed to return the segment
        // (that is up to internal policies), so drop the whole region.
        big_region.reset();
        log_used();
        log_now();
        try {
            quiesce(std::move(fut));
        } catch (...) {
            testlog.info("Aborting: {}", std::current_exception());
            log_now();
            log_used();
            abort();
        }
        log_now();
    });
}
SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_child_alloc) {
    return seastar::async([] {
        // The parent tolerates two segments, the child only one.
        region_group_reclaimer parent_reclaimer(2 * logalloc::segment_size);
        region_group_reclaimer child_reclaimer(logalloc::segment_size);
        test_region_group parent(parent_reclaimer);
        test_region_group child(&parent, child_reclaimer);
        auto child_rgn = std::make_unique<test_region>(child);
        auto parent_rgn = std::make_unique<test_region>(parent);

        // Issues a small allocation in the parent's region, gated on the parent group.
        const auto request_parent_alloc = [&parent, &parent_rgn] {
            return parent.run_when_memory_available([&parent_rgn] { parent_rgn->alloc_small(); }, db::no_timeout);
        };

        // The child's usage is accounted for in the parent too.
        child_rgn->alloc();
        BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);

        auto pending = request_parent_alloc();
        BOOST_REQUIRE_EQUAL(pending.available(), true);
        BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);

        // Now let the child consume all of the parent's memory. The child's own
        // limit is lower than the parent's, so this has to be a direct allocation.
        child_rgn->alloc();
        BOOST_REQUIRE_GE(child.memory_used(), 2 * logalloc::segment_size);

        pending = request_parent_alloc();
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);

        // Releasing the child's region unblocks the parent's request.
        child_rgn.reset();
        quiesce(std::move(pending));
    });
}
SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_parent_alloc) {
    return seastar::async([] {
        // Parent and child share a single reclaimer with a one-segment threshold.
        region_group_reclaimer shared_reclaimer(logalloc::segment_size);
        test_region_group parent(shared_reclaimer);
        test_region_group child(&parent, shared_reclaimer);

        // Fill the parent directly...
        auto parent_filler = std::make_unique<test_region>(parent);
        parent_filler->alloc();
        BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);

        // ...so that a request issued on the child has to wait.
        auto pending = child.run_when_memory_available([] {}, db::no_timeout);
        BOOST_REQUIRE_EQUAL(pending.available(), false);

        // Dropping the parent's region lets the request complete.
        parent_filler.reset();
        quiesce(std::move(pending));
    });
}
SEASTAR_TEST_CASE(test_region_groups_fifo_order) {
    // Requests that get queued for later execution must run in FIFO order.
    return seastar::async([] {
        region_group_reclaimer reclaimer(logalloc::segment_size);
        test_region_group rg(reclaimer);
        auto filler = std::make_unique<test_region>(rg);
        // Fill the group so that every request issued below gets queued.
        filler->alloc();
        BOOST_REQUIRE_GE(rg.memory_used(), logalloc::segment_size);

        // Shared counter: each request checks that it runs at its own position.
        auto exec_cnt = make_lw_shared<int>(0);
        std::vector<future<>> executions;
        for (int seq = 0; seq < 100; ++seq) {
            auto queued = rg.run_when_memory_available([exec_cnt, seq] {
                BOOST_REQUIRE_EQUAL(seq, (*exec_cnt)++);
            }, db::no_timeout);
            BOOST_REQUIRE_EQUAL(queued.available(), false);
            executions.emplace_back(std::move(queued));
        }

        // Release the memory and wait for all queued requests to run.
        filler.reset();
        quiesce(when_all(executions.begin(), executions.end()));
    });
}
SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_moving_restriction) {
// Hierarchy here is A -> B -> C (root -> inner -> child).
// We will fill B causing a request issued on C to be held back. We then fill A and free B.
//
// C should still be blocked, and only get released once A is freed as well.
return seastar::async([] {
// All three groups share one reclaimer with a one-segment threshold.
region_group_reclaimer simple_reclaimer(logalloc::segment_size);
test_region_group root(simple_reclaimer);
test_region_group inner(&root, simple_reclaimer);
test_region_group child(&inner, simple_reclaimer);
auto inner_region = std::make_unique<test_region>(inner);
auto root_region = std::make_unique<test_region>(root);
// fill the inner node. Try allocating at child level. Should not be allowed.
// big_alloc holds both big objects: the inner one is pushed first and the
// root one second, so each pop_front() below releases them in that order.
circular_buffer<managed_bytes> big_alloc;
with_allocator(inner_region->allocator(), [&big_alloc] {
big_alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), logalloc::segment_size)));
});
BOOST_REQUIRE_GE(inner.memory_used(), logalloc::segment_size);
auto fut = child.run_when_memory_available([] {}, db::no_timeout);
BOOST_REQUIRE_EQUAL(fut.available(), false);
// Now fill the root...
with_allocator(root_region->allocator(), [&big_alloc] {
big_alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), logalloc::segment_size)));
});
BOOST_REQUIRE_GE(root.memory_used(), logalloc::segment_size);
// And free the inner node. We will verify that
// 1) the notifications that the inner node sent the child when it was freed won't
// erroneously cause it to execute
// 2) the child is still able to receive notifications from the root
// The front element is the inner region's object, so it is freed under the inner allocator.
with_allocator(inner_region->allocator(), [&big_alloc] {
big_alloc.pop_front();
});
inner_region.reset();
// Verifying (1)
// Can't quiesce because we don't want to wait on the futures.
sleep(10ms).get();
BOOST_REQUIRE_EQUAL(fut.available(), false);
// Verifying (2)
// The remaining element is the root region's object; free it under the root allocator.
with_allocator(root_region->allocator(), [&big_alloc] {
big_alloc.pop_front();
});
root_region.reset();
quiesce(std::move(fut));
});
}
SEASTAR_TEST_CASE(test_region_groups_tree_hierarchy_throttling_leaf_alloc) {
    return seastar::async([] {
        // A leaf bundles its own reclaimer (one-segment threshold), a region
        // group hanging off the given parent, and one region in that group.
        class leaf {
            region_group_reclaimer _leaf_reclaimer;
            test_region_group _rg;
            std::unique_ptr<test_region> _region;
        public:
            leaf(test_region_group& parent)
                : _leaf_reclaimer(logalloc::segment_size)
                , _rg(&parent, _leaf_reclaimer)
                , _region(std::make_unique<test_region>(_rg))
            {}
            // Allocates directly, bypassing throttling.
            void alloc(size_t size) {
                _region->alloc(size);
            }
            // Allocates only once the leaf's group lets the request through.
            future<> try_alloc(size_t size) {
                return _rg.run_when_memory_available([this, size] {
                    alloc(size);
                }, db::no_timeout);
            }
            // Replaces the region with a fresh one, dropping the old region.
            void reset() {
                _region = std::make_unique<test_region>(_rg);
            }
        };

        region_group_reclaimer parent_reclaimer(logalloc::segment_size);
        test_region_group parent(parent_reclaimer);
        leaf first_leaf(parent);
        leaf second_leaf(parent);
        leaf third_leaf(parent);

        // One segment per leaf.
        first_leaf.alloc(logalloc::segment_size);
        second_leaf.alloc(logalloc::segment_size);
        third_leaf.alloc(logalloc::segment_size);

        auto fut_1 = first_leaf.try_alloc(sizeof(uint64_t));
        auto fut_2 = second_leaf.try_alloc(sizeof(uint64_t));
        auto fut_3 = third_leaf.try_alloc(sizeof(uint64_t));
        const auto any_ready = [&fut_1, &fut_2, &fut_3] {
            return fut_1.available() || fut_2.available() || fut_3.available();
        };
        BOOST_REQUIRE_EQUAL(any_ready(), false);

        // After dropping one leaf's memory the total is still 2 * segment_size,
        // so nobody can proceed.
        first_leaf.reset();
        // Can't quiesce because we don't want to wait on the futures.
        sleep(10ms).get();
        BOOST_REQUIRE_EQUAL(any_ready(), false);

        // Now all futures should resolve.
        first_leaf.reset();
        second_leaf.reset();
        third_leaf.reset();
        quiesce(when_all(std::move(fut_1), std::move(fut_2), std::move(fut_3)));
    });
}
// Helper shared by the async reclaim tests: a region holding a single
// allocation of a given size, which can be evicted exactly once.
class test_async_reclaim_region {
    logalloc::region _region;
    std::vector<managed_bytes> _alloc;
    size_t _alloc_size;
    // Guards against reclaiming the same region twice: it is supposed to be
    // empty after the first reclaim.
    int _reclaim_counter = 0;
    region_group& _rg;

    // Frees whatever is held in _alloc, under the region's allocator.
    void drop_allocations() {
        with_allocator(_region.allocator(), [this] {
            std::vector<managed_bytes>().swap(_alloc);
        });
    }
public:
    // Creates a region in `rg` and immediately allocates `alloc_size` bytes in it.
    test_async_reclaim_region(region_group& rg, size_t alloc_size)
        : _region(rg)
        , _alloc_size(alloc_size)
        , _rg(rg)
    {
        with_allocator(_region.allocator(), [this] {
            _alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), _alloc_size)));
        });
    }
    ~test_async_reclaim_region() {
        drop_allocations();
    }
    // Releases the allocation, replaces the region with a fresh one in the
    // same group and returns the size that was originally allocated.
    // Fails the test if called more than once.
    size_t evict() {
        BOOST_REQUIRE_EQUAL(_reclaim_counter++, 0);
        drop_allocations();
        _region = logalloc::region(_rg);
        return _alloc_size;
    }
    // Maps a pointer to the embedded _region member back to its owning helper.
    static test_async_reclaim_region& from_region(region* region_ptr) {
        return *boost::intrusive::get_parent_from_member(region_ptr, &test_async_reclaim_region::_region);
    }
};
// A reclaimer that owns its region group and, once "unleashed", evicts the
// group's largest region repeatedly while it is under pressure. The size of
// every evicted region is appended to the accumulator's _reclaim_sizes.
class test_reclaimer: public region_group_reclaimer {
// Where reclaimed sizes are recorded: `this` for a root reclaimer, the parent
// reclaimer for one constructed with a parent (see the constructors below).
test_reclaimer *_result_accumulator;
region_group _rg;
std::vector<size_t> _reclaim_sizes;
// Resolved by unleash(); the reclaim loop does not start before that.
shared_promise<> _unleash_reclaimer;
// Tracks the background reclaim work so the destructor can wait for it.
seastar::gate _reclaimers_done;
// Fulfilled when the reclaim loop actually starts running.
promise<> _unleashed;
public:
virtual void start_reclaiming() noexcept override {
// Future is waited on indirectly in `~test_reclaimer()` (via `_reclaimers_done`).
(void)with_gate(_reclaimers_done, [this] {
return _unleash_reclaimer.get_shared_future().then([this] {
_unleashed.set_value();
// Keep evicting the largest region of this group until the pressure is gone.
while (this->under_pressure()) {
size_t reclaimed = test_async_reclaim_region::from_region(_rg.get_largest_region()).evict();
_result_accumulator->_reclaim_sizes.push_back(reclaimed);
}
});
});
}
~test_reclaimer() {
// Wait for any background reclaim work first, then shut the group down.
_reclaimers_done.close().get();
_rg.shutdown().get();
}
// Sizes reclaimed so far, in the order the evictions happened.
std::vector<size_t>& reclaim_sizes() {
return _reclaim_sizes;
}
region_group& rg() {
return _rg;
}
// Root reclaimer: records results in itself and owns a group constructed without a parent.
test_reclaimer(size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(this), _rg("test_reclaimer RG", *this) {}
// Child reclaimer: its group hangs off the parent's group and results are recorded in the parent.
test_reclaimer(test_reclaimer& parent, size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(&parent), _rg("test_reclaimer RG", &parent._rg, *this) {}
// Allows reclaiming to start once `after` resolves. The returned future
// resolves when the reclaim loop has started.
future<> unleash(future<> after) {
// Result indirectly forwarded to _unleashed (returned below).
(void)after.then([this] { _unleash_reclaimer.set_value(); });
return _unleashed.get_future();
}
};
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
    return seastar::async([] {
        // Fill a single region up to the threshold and check that active
        // reclaim kicks in.
        test_reclaimer reclaimer(logalloc::segment_size);
        test_async_reclaim_region only_region(reclaimer.rg(), logalloc::segment_size);
        // FIXME: discarded future.
        (void)reclaimer.unleash(make_ready_future<>());

        // The request cannot run until something has been reclaimed.
        auto pending = reclaimer.rg().run_when_memory_available([] {}, db::no_timeout);
        // Initially not available
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        quiesce(std::move(pending));

        BOOST_REQUIRE_EQUAL(reclaimer.reclaim_sizes().size(), 1);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offender) {
    return seastar::async([] {
        // Three regions of three different sizes (multiples of a segment, due
        // to LSA granularity) live in one group.
        //
        // The request can only run once all three are freed, which exercises
        // continuous reclaim; they must be freed in descending order of size.
        test_reclaimer reclaimer(logalloc::segment_size);
        test_async_reclaim_region small_region(reclaimer.rg(), logalloc::segment_size);
        test_async_reclaim_region medium_region(reclaimer.rg(), 2 * logalloc::segment_size);
        test_async_reclaim_region big_region(reclaimer.rg(), 3 * logalloc::segment_size);
        // FIXME: discarded future.
        (void)reclaimer.unleash(make_ready_future<>());

        // Can't run this function until we have reclaimed
        auto pending = reclaimer.rg().run_when_memory_available([&reclaimer] {
            BOOST_REQUIRE_EQUAL(reclaimer.reclaim_sizes().size(), 3);
        }, db::no_timeout);
        // Initially not available
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        quiesce(std::move(pending));

        // Evictions must have happened biggest-first.
        BOOST_REQUIRE_EQUAL(reclaimer.reclaim_sizes()[2], logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(reclaimer.reclaim_sizes()[1], 2 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(reclaimer.reclaim_sizes()[0], 3 * logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_leaf_offender) {
    return seastar::async([] {
        // A parent region group (A) has two leaf region groups (B and C). B holds
        // the largest region, then A, then C. Freeing must happen in descending
        // order of size regardless of the topology.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer large_leaf(root, logalloc::segment_size);
        test_reclaimer small_leaf(root, logalloc::segment_size);
        test_async_reclaim_region small_region(small_leaf.rg(), logalloc::segment_size);
        test_async_reclaim_region medium_region(root.rg(), 2 * logalloc::segment_size);
        test_async_reclaim_region big_region(large_leaf.rg(), 3 * logalloc::segment_size);

        // Chain the reclaimers: each one is unleashed after the previous one started.
        auto root_started = root.unleash(make_ready_future<>());
        auto large_leaf_started = large_leaf.unleash(std::move(root_started));
        // FIXME: discarded future.
        (void)small_leaf.unleash(std::move(large_leaf_started));

        // Can't run this function until we have reclaimed. Try at the root, and
        // we'll make sure that the leaves are forced correctly.
        auto pending = root.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
        }, db::no_timeout);
        // Initially not available
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        quiesce(std::move(pending));

        // Evictions must have happened biggest-first.
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[2], logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[1], 2 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], 3 * logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_block) {
    return seastar::async([] {
        // A parent region group (A) with a leaf region group (B): active
        // reclaim must still work when the blocking happens at an ancestor.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer leaf(root, logalloc::segment_size);
        test_async_reclaim_region root_region(root.rg(), logalloc::segment_size);

        // Unleash the root first, and the leaf once the root has started.
        // FIXME: discarded future.
        (void)leaf.unleash(root.unleash(make_ready_future<>()));

        // Can't run this function until we have reclaimed. Try at the leaf, and
        // we'll make sure that the root reclaims
        auto pending = leaf.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        }, db::no_timeout);
        // Initially not available
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        quiesce(std::move(pending));

        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_big_region_goes_first) {
    return seastar::async([] {
        // A parent region group (A) with a leaf region group (B). B's total
        // usage is higher, but it is made of several smaller regions, so the
        // region in A must be reclaimed first.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer leaf(root, logalloc::segment_size);
        test_async_reclaim_region root_region(root.rg(), 4 * logalloc::segment_size);
        test_async_reclaim_region big_leaf_region(leaf.rg(), 3 * logalloc::segment_size);
        test_async_reclaim_region small_leaf_region(leaf.rg(), 2 * logalloc::segment_size);

        // Unleash the root first, and the leaf once the root has started.
        // FIXME: discarded future.
        (void)leaf.unleash(root.unleash(make_ready_future<>()));

        auto pending = root.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
        }, db::no_timeout);
        // Initially not available
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        quiesce(std::move(pending));

        // Evictions must have happened biggest-region-first.
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[2], 2 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[1], 3 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], 4 * logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_reclaim) {
    return seastar::async([] {
        // A parent region group (A) with a leaf region group (B); B goes over
        // its limit. Both A and B try to execute requests, and that must not
        // cause B's region eviction function to be called more than once. Note
        // that test_async_reclaim_region already asserts against double calls,
        // so all this test has to do is create a situation in which a double
        // call would happen.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer leaf(root, logalloc::segment_size);
        test_async_reclaim_region leaf_region(leaf.rg(), logalloc::segment_size);

        // Unleash the root first, and the leaf once the root has started.
        // FIXME: discarded future.
        (void)leaf.unleash(root.unleash(make_ready_future<>()));

        auto root_pending = root.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        }, db::no_timeout);
        auto leaf_pending = leaf.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        }, db::no_timeout);

        // Initially not available
        BOOST_REQUIRE_EQUAL(root_pending.available(), false);
        BOOST_REQUIRE_EQUAL(leaf_pending.available(), false);
        quiesce(std::move(root_pending));
        quiesce(std::move(leaf_pending));

        BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
    });
}
// Reproduces issue #2021
// Queues a large number of blocked requests, each of which changes the group's
// size when it runs, and then releases them all by freeing memory.
SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_region_group_size) {
return seastar::async([test_name = get_name()] {
#ifndef SEASTAR_DEFAULT_ALLOCATOR // Because we need memory::stats().free_memory();
logging::logger_registry().set_logger_level("lsa", seastar::log_level::debug);
auto free_space = memory::stats().free_memory();
// Both reclaimer thresholds are set to 75% of the currently free memory.
size_t threshold = size_t(0.75 * free_space);
region_group_reclaimer recl(threshold, threshold);
region_group gr(test_name, recl);
// Shut the group down on scope exit, whichever way the test ends.
auto close_gr = defer([&gr] () noexcept { gr.shutdown().get(); });
region r(gr);
with_allocator(r.allocator(), [&] {
std::vector<managed_bytes> objs;
// Eviction callback: drops one object per call until none are left.
r.make_evictable([&] {
if (objs.empty()) {
return memory::reclaiming_result::reclaimed_nothing;
}
with_allocator(r.allocator(), [&] {
objs.pop_back();
});
return memory::reclaiming_result::reclaimed_something;
});
// Allocates 1024-byte objects until the reclaimer reports pressure.
auto fill_to_pressure = [&] {
while (!recl.under_pressure()) {
objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), 1024));
}
};
// Each queued request holds an operation on this barrier, so the deferred
// action below waits for all of them before the locals go away.
utils::phased_barrier request_barrier;
auto wait_for_requests = defer([&] () noexcept { request_barrier.advance_and_await().get(); });
for (int i = 0; i < 1000000; ++i) {
// Make sure we are under pressure so that the request below gets queued.
fill_to_pressure();
future<> f = gr.run_when_memory_available([&, op = request_barrier.start()] {
// Trigger group size change (Refs issue #2021)
gr.update(-10);
gr.update(+10);
}, db::no_timeout);
BOOST_REQUIRE(!f.available());
}
// Release
while (recl.under_pressure()) {
objs.pop_back();
}
});
#endif
});
}
SEASTAR_TEST_CASE(test_reclaiming_runs_as_long_as_there_is_soft_pressure) {
    return seastar::async([test_name = get_name()] {
        size_t hard_threshold = logalloc::segment_size * 8;
        size_t soft_threshold = hard_threshold / 2;

        // Reclaimer that merely records whether reclaiming is currently active.
        class reclaimer : public region_group_reclaimer {
            bool _reclaim = false;
        protected:
            void start_reclaiming() noexcept override {
                _reclaim = true;
            }
            void stop_reclaiming() noexcept override {
                _reclaim = false;
            }
        public:
            reclaimer(size_t hard, size_t soft)
                : region_group_reclaimer(hard, soft)
            { }
            bool reclaiming() const { return _reclaim; }
        };

        reclaimer recl(hard_threshold, soft_threshold);
        region_group gr(test_name, recl);
        auto close_gr = defer([&gr] () noexcept { gr.shutdown().get(); });
        region r(gr);

        with_allocator(r.allocator(), [&] {
            std::vector<managed_bytes> objs;
            // Adds one segment-sized object to the region.
            const auto add_segment = [&objs] {
                objs.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
            };

            // Nothing allocated yet: no reclaim.
            BOOST_REQUIRE(!recl.reclaiming());

            // Crossing the soft limit starts reclaiming.
            while (!recl.over_soft_limit()) {
                add_segment();
            }
            BOOST_REQUIRE(recl.reclaiming());

            // Going all the way to hard pressure keeps it running.
            while (!recl.under_pressure()) {
                add_segment();
            }
            BOOST_REQUIRE(recl.reclaiming());

            // Dropping below the hard limit, but not the soft one, must not stop it.
            while (recl.under_pressure()) {
                objs.pop_back();
            }
            BOOST_REQUIRE(recl.over_soft_limit());
            BOOST_REQUIRE(recl.reclaiming());

            // Only once the soft pressure is gone does reclaiming stop.
            while (recl.over_soft_limit()) {
                objs.pop_back();
            }
            BOOST_REQUIRE(!recl.reclaiming());
        });
    });
}

View File

@@ -44,6 +44,7 @@ static auto x = [] {
}();
using namespace logalloc;
using namespace std::chrono_literals;
// this test should be first in order to initialize logalloc for others
SEASTAR_TEST_CASE(test_prime_logalloc) {
@@ -470,786 +471,6 @@ SEASTAR_TEST_CASE(test_large_allocation) {
}
#endif
// Checks region_group memory accounting: a group's memory_used() must track the
// total space of the regions in it (and in its child groups) as objects are
// allocated, regions are merged, and regions are destroyed.
SEASTAR_TEST_CASE(test_region_groups) {
return seastar::async([] {
// Layout: `all` is the parent of `one_and_two`; `just_four` stands alone.
logalloc::region_group just_four;
logalloc::region_group all;
logalloc::region_group one_and_two("one_and_two", &all);
auto one = std::make_unique<logalloc::region>(one_and_two);
auto two = std::make_unique<logalloc::region>(one_and_two);
auto three = std::make_unique<logalloc::region>(all);
auto four = std::make_unique<logalloc::region>(just_four);
// `five` is default-constructed, i.e. created without passing a group.
auto five = std::make_unique<logalloc::region>();
constexpr size_t base_count = 16 * 1024;
// Populate region one; both its group and the parent group must account for it.
constexpr size_t one_count = 16 * base_count;
std::vector<managed_ref<int>> one_objs;
with_allocator(one->allocator(), [&] {
for (size_t i = 0; i < one_count; i++) {
one_objs.emplace_back(make_managed<int>());
}
});
BOOST_REQUIRE_GE(ssize_t(one->occupancy().used_space()), ssize_t(one_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(one->occupancy().total_space()), ssize_t(one->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space());
BOOST_REQUIRE_EQUAL(all.memory_used(), one->occupancy().total_space());
// Populate region two, which shares a group with region one.
constexpr size_t two_count = 8 * base_count;
std::vector<managed_ref<int>> two_objs;
with_allocator(two->allocator(), [&] {
for (size_t i = 0; i < two_count; i++) {
two_objs.emplace_back(make_managed<int>());
}
});
BOOST_REQUIRE_GE(ssize_t(two->occupancy().used_space()), ssize_t(two_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(two->occupancy().total_space()), ssize_t(two->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space() + two->occupancy().total_space());
BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used());
// Populate region three, which sits directly in the parent group.
constexpr size_t three_count = 32 * base_count;
std::vector<managed_ref<int>> three_objs;
with_allocator(three->allocator(), [&] {
for (size_t i = 0; i < three_count; i++) {
three_objs.emplace_back(make_managed<int>());
}
});
BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t(three_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
// Populate region four, which lives in the unrelated group.
constexpr size_t four_count = 4 * base_count;
std::vector<managed_ref<int>> four_objs;
with_allocator(four->allocator(), [&] {
for (size_t i = 0; i < four_count; i++) {
four_objs.emplace_back(make_managed<int>());
}
});
BOOST_REQUIRE_GE(ssize_t(four->occupancy().used_space()), ssize_t(four_count * sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(four->occupancy().total_space()), ssize_t(four->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(just_four.memory_used(), four->occupancy().total_space());
// Allocate in region five. five_objs is local to the lambda, so the objects
// are released again when the lambda returns.
with_allocator(five->allocator(), [] {
constexpr size_t five_count = base_count;
std::vector<managed_ref<int>> five_objs;
for (size_t i = 0; i < five_count; i++) {
five_objs.emplace_back(make_managed<int>());
}
});
// Merging four into three must move its usage from just_four over to `all`.
three->merge(*four);
BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t((three_count + four_count)* sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
BOOST_REQUIRE_EQUAL(just_four.memory_used(), 0);
// Merging five into three must keep the accounting of `all` consistent.
three->merge(*five);
BOOST_REQUIRE_GE(ssize_t(three->occupancy().used_space()), ssize_t((three_count + four_count)* sizeof(int)));
BOOST_REQUIRE_GE(ssize_t(three->occupancy().total_space()), ssize_t(three->occupancy().used_space()));
BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
// Destroy region two: only region one is left in one_and_two.
with_allocator(two->allocator(), [&] {
two_objs.clear();
});
two.reset();
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), one->occupancy().total_space());
BOOST_REQUIRE_EQUAL(all.memory_used(), one_and_two.memory_used() + three->occupancy().total_space());
// Destroy region one: one_and_two is now empty.
with_allocator(one->allocator(), [&] {
one_objs.clear();
});
one.reset();
BOOST_REQUIRE_EQUAL(one_and_two.memory_used(), 0);
BOOST_REQUIRE_EQUAL(all.memory_used(), three->occupancy().total_space());
// four's objects were merged into three, so they are freed under three's allocator.
with_allocator(three->allocator(), [&] {
three_objs.clear();
four_objs.clear();
});
three.reset();
four.reset();
five.reset();
BOOST_REQUIRE_EQUAL(all.memory_used(), 0);
});
}
using namespace std::chrono_literals;
// Waits for `fut` to resolve, giving up after two seconds.
//
// seastar::thread::yield is not enough here, because releasing a request may
// be broken into many continuations. Yielding "enough times" would tie the
// test to the internals of the implementation, which we want to avoid.
template <typename FutureType>
inline void quiesce(FutureType&& fut) {
    const auto deadline = lowres_clock::now() + 2s;
    with_timeout(deadline, std::move(fut)).get();
}
// RAII wrapper around a region_group: the group is shut down when the wrapper
// is destroyed. seastar::defer is not used because a single test usually
// creates several region groups.
struct test_region_group: public logalloc::region_group {
    // Creates a group attached to `parent`.
    test_region_group(region_group* parent, region_group_reclaimer& reclaimer)
        : logalloc::region_group("test_region_group", parent, reclaimer) {}
    // Creates a group without a parent (nullptr is passed as the parent).
    test_region_group(region_group_reclaimer& reclaimer)
        : test_region_group(nullptr, reclaimer) {}
    ~test_region_group() {
        // Blocks on the shutdown future -- presumably requires running inside
        // a seastar thread (the tests use seastar::async); TODO confirm.
        shutdown().get();
    }
};
// Test helper: a logalloc::region that keeps track of the objects allocated
// through it and frees them, under its own allocator, when destroyed.
struct test_region: public logalloc::region {
    test_region(test_region_group& rg) : logalloc::region(rg) {}
    ~test_region() {
        clear();
    }
    // Frees everything allocated so far through alloc() and alloc_small().
    void clear() {
        with_allocator(allocator(), [this] {
            // Swap with empty temporaries so that the managed objects are
            // destroyed while this region's allocator is the current one.
            std::vector<managed_bytes>().swap(_alloc);
            std::vector<managed_ref<uint64_t>>().swap(_alloc_simple);
        });
    }
    // Allocates one blob of `size` bytes (default: logalloc::segment_size).
    void alloc(size_t size = logalloc::segment_size) {
        with_allocator(allocator(), [this, size] {
            _alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), size)));
        });
    }
    // Allocates `nr` small (uint64_t) objects; one by default.
    // The count used to be accepted but ignored; it is now honoured.
    void alloc_small(size_t nr = 1) {
        with_allocator(allocator(), [this, nr] {
            for (size_t i = 0; i < nr; ++i) {
                _alloc_simple.emplace_back(make_managed<uint64_t>());
            }
        });
    }
private:
    std::vector<managed_bytes> _alloc;
    // For small objects we don't want to get caught in basic_sstring's internal buffer. We know
    // which size we need to allocate to avoid that, but that's technically internal representation.
    // Better to use integers if we want something small.
    std::vector<managed_ref<uint64_t>> _alloc_simple;
};
SEASTAR_TEST_CASE(test_region_groups_basic_throttling) {
    return seastar::async([] {
        // Singleton hierarchy whose threshold is exactly one segment.
        region_group_reclaimer reclaimer(logalloc::segment_size);
        test_region_group group(reclaimer);
        auto small_region = std::make_unique<test_region>(group);

        // Small helpers for the steps this test repeats.
        const auto log_now = [] {
            testlog.info("now = {}", lowres_clock::now().time_since_epoch().count());
        };
        const auto log_used = [&group] {
            testlog.info("used = {}", group.memory_used());
        };
        const auto request_small_alloc = [&group, &small_region] {
            return group.run_when_memory_available([&small_region] { small_region->alloc_small(); }, db::no_timeout);
        };

        // After the first allocation the region holds one segment, so usage
        // equals the threshold and the request completes immediately.
        auto fut = request_small_alloc();
        BOOST_REQUIRE_EQUAL(fut.available(), true);
        BOOST_REQUIRE_EQUAL(group.memory_used(), logalloc::segment_size);

        // A second small object does not change the group's usage, so this
        // request is expected to go through as well.
        fut = request_small_alloc();
        BOOST_REQUIRE_EQUAL(fut.available(), true);
        BOOST_REQUIRE_EQUAL(group.memory_used(), logalloc::segment_size);

        // A big allocation in a second region pushes the group over the threshold.
        auto big_region = std::make_unique<test_region>(group);
        big_region->alloc();

        // A new request must now be held back.
        log_now();
        fut = request_small_alloc();
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        BOOST_REQUIRE_GT(group.memory_used(), logalloc::segment_size);
        log_now();
        log_used();
        testlog.info("Resetting");

        // Freeing the object alone is not guaranteed to return the segment
        // (that is up to internal policies), so drop the whole region.
        big_region.reset();
        log_used();
        log_now();
        try {
            quiesce(std::move(fut));
        } catch (...) {
            testlog.info("Aborting: {}", std::current_exception());
            log_now();
            log_used();
            abort();
        }
        log_now();
    });
}
SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_child_alloc) {
    return seastar::async([] {
        // The parent tolerates two segments, the child only one.
        region_group_reclaimer parent_reclaimer(2 * logalloc::segment_size);
        region_group_reclaimer child_reclaimer(logalloc::segment_size);
        test_region_group parent(parent_reclaimer);
        test_region_group child(&parent, child_reclaimer);
        auto child_rgn = std::make_unique<test_region>(child);
        auto parent_rgn = std::make_unique<test_region>(parent);

        // Issues a small allocation in the parent's region, gated on the parent group.
        const auto request_parent_alloc = [&parent, &parent_rgn] {
            return parent.run_when_memory_available([&parent_rgn] { parent_rgn->alloc_small(); }, db::no_timeout);
        };

        // The child's usage is accounted for in the parent too.
        child_rgn->alloc();
        BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);

        auto pending = request_parent_alloc();
        BOOST_REQUIRE_EQUAL(pending.available(), true);
        BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);

        // Now let the child consume all of the parent's memory. The child's own
        // limit is lower than the parent's, so this has to be a direct allocation.
        child_rgn->alloc();
        BOOST_REQUIRE_GE(child.memory_used(), 2 * logalloc::segment_size);

        pending = request_parent_alloc();
        BOOST_REQUIRE_EQUAL(pending.available(), false);
        BOOST_REQUIRE_GE(parent.memory_used(), 2 * logalloc::segment_size);

        // Releasing the child's region unblocks the parent's request.
        child_rgn.reset();
        quiesce(std::move(pending));
    });
}
SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_parent_alloc) {
    return seastar::async([] {
        // Parent and child share a single reclaimer with a one-segment threshold.
        region_group_reclaimer shared_reclaimer(logalloc::segment_size);
        test_region_group parent(shared_reclaimer);
        test_region_group child(&parent, shared_reclaimer);

        // Fill the parent directly...
        auto parent_filler = std::make_unique<test_region>(parent);
        parent_filler->alloc();
        BOOST_REQUIRE_GE(parent.memory_used(), logalloc::segment_size);

        // ...so that a request issued on the child has to wait.
        auto pending = child.run_when_memory_available([] {}, db::no_timeout);
        BOOST_REQUIRE_EQUAL(pending.available(), false);

        // Dropping the parent's region lets the request complete.
        parent_filler.reset();
        quiesce(std::move(pending));
    });
}
SEASTAR_TEST_CASE(test_region_groups_fifo_order) {
    // Requests that get queued for later execution must run in FIFO order.
    return seastar::async([] {
        region_group_reclaimer reclaimer(logalloc::segment_size);
        test_region_group rg(reclaimer);
        auto filler = std::make_unique<test_region>(rg);
        // Fill the group so that every request issued below gets queued.
        filler->alloc();
        BOOST_REQUIRE_GE(rg.memory_used(), logalloc::segment_size);

        // Shared counter: each request checks that it runs at its own position.
        auto exec_cnt = make_lw_shared<int>(0);
        std::vector<future<>> executions;
        for (int seq = 0; seq < 100; ++seq) {
            auto queued = rg.run_when_memory_available([exec_cnt, seq] {
                BOOST_REQUIRE_EQUAL(seq, (*exec_cnt)++);
            }, db::no_timeout);
            BOOST_REQUIRE_EQUAL(queued.available(), false);
            executions.emplace_back(std::move(queued));
        }

        // Release the memory and wait for all queued requests to run.
        filler.reset();
        quiesce(when_all(executions.begin(), executions.end()));
    });
}
SEASTAR_TEST_CASE(test_region_groups_linear_hierarchy_throttling_moving_restriction) {
    // The hierarchy is root -> inner -> child.
    // inner is filled first, which blocks a request issued on child. Then
    // root is filled and inner is freed.
    //
    // The request on child must remain blocked until root is freed too.
    return seastar::async([] {
        region_group_reclaimer reclaimer(logalloc::segment_size);
        test_region_group root(reclaimer);
        test_region_group inner(&root, reclaimer);
        test_region_group child(&inner, reclaimer);
        auto inner_region = std::make_unique<test_region>(inner);
        auto root_region = std::make_unique<test_region>(root);

        // Holds the segment-sized buffers in allocation order, so that
        // pop_front() releases them in the order they were pushed.
        circular_buffer<managed_bytes> big_alloc;
        const auto push_segment_in = [&big_alloc] (test_region& owner) {
            with_allocator(owner.allocator(), [&big_alloc] {
                big_alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), logalloc::segment_size)));
            });
        };
        const auto pop_segment_in = [&big_alloc] (test_region& owner) {
            with_allocator(owner.allocator(), [&big_alloc] {
                big_alloc.pop_front();
            });
        };

        // Fill the inner node; a request at the child level must be queued.
        push_segment_in(*inner_region);
        BOOST_REQUIRE_GE(inner.memory_used(), logalloc::segment_size);
        auto fut = child.run_when_memory_available([] {}, db::no_timeout);
        BOOST_REQUIRE_EQUAL(fut.available(), false);

        // Now fill the root as well...
        push_segment_in(*root_region);
        BOOST_REQUIRE_GE(root.memory_used(), logalloc::segment_size);

        // ...and free the inner node. Two things are verified below:
        // 1) the notification sent by the inner node when it is freed does
        //    not erroneously let the child's request run
        // 2) the child still receives notifications coming from the root
        pop_segment_in(*inner_region);
        inner_region.reset();

        // Verifying (1).
        // quiesce() cannot be used here because the future must not be waited on.
        sleep(10ms).get();
        BOOST_REQUIRE_EQUAL(fut.available(), false);

        // Verifying (2).
        pop_segment_in(*root_region);
        root_region.reset();
        quiesce(std::move(fut));
    });
}
// Three leaf groups share one parent. Each leaf allocates a segment and then
// queues a small allocation; none of the queued allocations may run until
// all three leaves have been reset.
SEASTAR_TEST_CASE(test_region_groups_tree_hierarchy_throttling_leaf_alloc) {
    return seastar::async([] {
        // One leaf of the tree: a region group attached to the shared parent,
        // with its own reclaimer and a single region to allocate from.
        class leaf {
            region_group_reclaimer _leaf_reclaimer;
            test_region_group _rg;
            std::unique_ptr<test_region> _region;
        public:
            leaf(test_region_group& parent)
                : _leaf_reclaimer(logalloc::segment_size)
                , _rg(&parent, _leaf_reclaimer)
                , _region(std::make_unique<test_region>(_rg))
            {}

            // Allocates immediately, forwarding to test_region::alloc(size).
            void alloc(size_t size) {
                _region->alloc(size);
            }

            // Submits the allocation through the leaf's group, so it may be
            // queued; the returned future resolves once it has run.
            future<> try_alloc(size_t size) {
                return _rg.run_when_memory_available([this, size] {
                    alloc(size);
                }, db::no_timeout);
            }

            // Replaces the region with a freshly constructed one. As with the
            // previous reset(new ...) form, the new region is created before
            // the old one is destroyed.
            void reset() {
                _region = std::make_unique<test_region>(_rg);
            }
        };

        region_group_reclaimer simple_reclaimer(logalloc::segment_size);
        test_region_group parent(simple_reclaimer);
        leaf first_leaf(parent);
        leaf second_leaf(parent);
        leaf third_leaf(parent);

        first_leaf.alloc(logalloc::segment_size);
        second_leaf.alloc(logalloc::segment_size);
        third_leaf.alloc(logalloc::segment_size);

        auto fut_1 = first_leaf.try_alloc(sizeof(uint64_t));
        auto fut_2 = second_leaf.try_alloc(sizeof(uint64_t));
        auto fut_3 = third_leaf.try_alloc(sizeof(uint64_t));
        BOOST_REQUIRE_EQUAL(fut_1.available() || fut_2.available() || fut_3.available(), false);

        // Total memory is still 2 * segment_size, can't proceed
        first_leaf.reset();
        // Can't quiesce because we don't want to wait on the futures.
        sleep(10ms).get();
        BOOST_REQUIRE_EQUAL(fut_1.available() || fut_2.available() || fut_3.available(), false);

        // Now all futures should resolve.
        first_leaf.reset();
        second_leaf.reset();
        third_leaf.reset();
        quiesce(when_all(std::move(fut_1), std::move(fut_2), std::move(fut_3)));
    });
}
// Fixture shared by the asynchronous-reclaim tests: a region that allocates
// one buffer of the requested size on construction and can be evicted once.
class test_async_reclaim_region {
    logalloc::region _region;
    std::vector<managed_bytes> _alloc;
    size_t _alloc_size;
    // Number of evict() calls so far. A region is supposed to be empty after
    // its first reclaim, so reclaiming it a second time is an error.
    int _reclaim_counter = 0;
    region_group& _rg;

    // Frees all buffers held in _alloc, using this region's allocator.
    void release_buffers() {
        with_allocator(_region.allocator(), [this] {
            std::vector<managed_bytes>().swap(_alloc);
        });
    }
public:
    test_async_reclaim_region(region_group& rg, size_t alloc_size)
            : _region(rg)
            , _alloc_size(alloc_size)
            , _rg(rg)
    {
        with_allocator(_region.allocator(), [this] {
            _alloc.push_back(managed_bytes(bytes(bytes::initialized_later(), this->_alloc_size)));
        });
    }

    ~test_async_reclaim_region() {
        release_buffers();
    }

    // Frees the buffers, replaces the region with a new one in the same
    // group, and returns the size that was originally allocated. Fails the
    // test when called more than once.
    size_t evict() {
        BOOST_REQUIRE_EQUAL(_reclaim_counter++, 0);
        release_buffers();
        _region = logalloc::region(_rg);
        return this->_alloc_size;
    }

    // Recovers the fixture from a pointer to its _region member.
    static test_async_reclaim_region& from_region(region* region_ptr) {
        auto* owner = boost::intrusive::get_parent_from_member(region_ptr, &test_async_reclaim_region::_region);
        return *owner;
    }
};
// Reclaimer used by the active-reclaim tests. It owns a region group, and
// once unleashed it evicts the largest region for as long as it is under
// pressure, recording the evicted sizes in the accumulator reclaimer.
class test_reclaimer: public region_group_reclaimer {
    // Reclaimer whose _reclaim_sizes receives the results: this object for a
    // root reclaimer, the parent for a child reclaimer.
    test_reclaimer *_result_accumulator;
    region_group _rg;
    std::vector<size_t> _reclaim_sizes;
    shared_promise<> _unleash_reclaimer;
    seastar::gate _reclaimers_done;
    promise<> _unleashed;
public:
    test_reclaimer(size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(this), _rg("test_reclaimer RG", *this) {}
    test_reclaimer(test_reclaimer& parent, size_t threshold) : region_group_reclaimer(threshold), _result_accumulator(&parent), _rg("test_reclaimer RG", &parent._rg, *this) {}

    ~test_reclaimer() {
        _reclaimers_done.close().get();
        _rg.shutdown().get();
    }

    virtual void start_reclaiming() noexcept override {
        // The future is waited on indirectly in `~test_reclaimer()`, through
        // the `_reclaimers_done` gate.
        (void)with_gate(_reclaimers_done, [this] {
            return _unleash_reclaimer.get_shared_future().then([this] {
                _unleashed.set_value();
                while (this->under_pressure()) {
                    auto& victim = test_async_reclaim_region::from_region(_rg.get_largest_region());
                    size_t reclaimed = victim.evict();
                    _result_accumulator->_reclaim_sizes.push_back(reclaimed);
                }
            });
        });
    }

    std::vector<size_t>& reclaim_sizes() {
        return _reclaim_sizes;
    }

    region_group& rg() {
        return _rg;
    }

    // Allows reclaiming to proceed once `after` resolves. The returned future
    // resolves when the reclaim continuation has started running.
    future<> unleash(future<> after) {
        // The result is forwarded indirectly to _unleashed (returned below).
        (void)after.then([this] { _unleash_reclaimer.set_value(); });
        return _unleashed.get_future();
    }
};
// A single region fills its group; the queued request may only run after the
// reclaimer has evicted that region.
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_simple_active_reclaim) {
    return seastar::async([] {
        test_reclaimer simple(logalloc::segment_size);
        test_async_reclaim_region only_region(simple.rg(), logalloc::segment_size);

        // FIXME: discarded future.
        (void)simple.unleash(make_ready_future<>());

        // This function cannot run until something has been reclaimed.
        auto fut = simple.rg().run_when_memory_available([] {}, db::no_timeout);

        // Not available at first.
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        quiesce(std::move(fut));

        // Exactly one eviction must have been recorded.
        BOOST_REQUIRE_EQUAL(simple.reclaim_sizes().size(), 1);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_worst_offender) {
    return seastar::async([] {
        // Three regions of three different sizes are allocated in one group
        // (sizes are multiples of a segment because of LSA granularity).
        //
        // The queued function can only run once all three have been freed,
        // which exercises continuous reclaim, and they have to be freed in
        // descending order of size.
        test_reclaimer simple(logalloc::segment_size);
        test_async_reclaim_region one_segment(simple.rg(), logalloc::segment_size);
        test_async_reclaim_region two_segments(simple.rg(), 2 * logalloc::segment_size);
        test_async_reclaim_region three_segments(simple.rg(), 3 * logalloc::segment_size);

        // FIXME: discarded future.
        (void)simple.unleash(make_ready_future<>());

        // This function cannot run until reclaim has happened.
        auto fut = simple.rg().run_when_memory_available([&simple] {
            BOOST_REQUIRE_EQUAL(simple.reclaim_sizes().size(), 3);
        }, db::no_timeout);

        // Not available at first.
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        quiesce(std::move(fut));

        // The recorded order must be: biggest first, smallest last.
        BOOST_REQUIRE_EQUAL(simple.reclaim_sizes()[2], logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(simple.reclaim_sizes()[1], 2 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(simple.reclaim_sizes()[0], 3 * logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_leaf_offender) {
    return seastar::async([] {
        // A parent region group (A) has two leaf region groups (B and C),
        // sized so that B is the largest, then A, then C. Freeing must happen
        // in descending order of size, regardless of the topology.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer large_leaf(root, logalloc::segment_size);
        test_reclaimer small_leaf(root, logalloc::segment_size);

        test_async_reclaim_region one_segment(small_leaf.rg(), logalloc::segment_size);
        test_async_reclaim_region two_segments(root.rg(), 2 * logalloc::segment_size);
        test_async_reclaim_region three_segments(large_leaf.rg(), 3 * logalloc::segment_size);

        // Unleash the reclaimers one after another: root, large leaf, small leaf.
        auto root_unleashed = root.unleash(make_ready_future<>());
        auto large_leaf_unleashed = large_leaf.unleash(std::move(root_unleashed));
        // FIXME: discarded future.
        (void)small_leaf.unleash(std::move(large_leaf_unleashed));

        // This function cannot run until reclaim has happened. It is
        // submitted at the root, and the leaves have to be forced correctly.
        auto fut = root.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
        }, db::no_timeout);

        // Not available at first.
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        quiesce(std::move(fut));

        // The recorded order must be: biggest first, smallest last.
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[2], logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[1], 2 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], 3 * logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_ancestor_block) {
    return seastar::async([] {
        // A parent region group (A) has one leaf region group (B).
        // Active reclaim must still work when the blocking happens at an ancestor.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer leaf(root, logalloc::segment_size);
        test_async_reclaim_region root_region(root.rg(), logalloc::segment_size);

        auto root_unleashed = root.unleash(make_ready_future<>());
        // FIXME: discarded future.
        (void)leaf.unleash(std::move(root_unleashed));

        // This function cannot run until reclaim has happened. It is
        // submitted at the leaf, and the root is the one that has to reclaim.
        auto fut = leaf.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        }, db::no_timeout);

        // Not available at first.
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        quiesce(std::move(fut));

        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_big_region_goes_first) {
    return seastar::async([] {
        // A parent region group (A) has one leaf region group (B). B uses
        // more memory in total, but spread over several smaller regions.
        // Reclaim has to start with A's single big region.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer leaf(root, logalloc::segment_size);

        test_async_reclaim_region four_segments_in_root(root.rg(), 4 * logalloc::segment_size);
        test_async_reclaim_region three_segments_in_leaf(leaf.rg(), 3 * logalloc::segment_size);
        test_async_reclaim_region two_segments_in_leaf(leaf.rg(), 2 * logalloc::segment_size);

        auto root_unleashed = root.unleash(make_ready_future<>());
        // FIXME: discarded future.
        (void)leaf.unleash(std::move(root_unleashed));

        auto fut = root.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 3);
        }, db::no_timeout);

        // Not available at first.
        BOOST_REQUIRE_EQUAL(fut.available(), false);
        quiesce(std::move(fut));

        // Evictions are recorded biggest region first.
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[2], 2 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[1], 3 * logalloc::segment_size);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], 4 * logalloc::segment_size);
    });
}
SEASTAR_TEST_CASE(test_region_groups_basic_throttling_active_reclaim_no_double_reclaim) {
    return seastar::async([] {
        // A parent region group (A) has one leaf region group (B), and B goes
        // over its limit. Both A and B try to execute requests, and that must
        // not cause the eviction function of B's region to be called more
        // than once. Note that test_async_reclaim_region already checks for
        // double calls, so all this test has to do is set up a situation in
        // which a double call could happen.
        test_reclaimer root(logalloc::segment_size);
        test_reclaimer leaf(root, logalloc::segment_size);
        test_async_reclaim_region leaf_region(leaf.rg(), logalloc::segment_size);

        auto root_unleashed = root.unleash(make_ready_future<>());
        // FIXME: discarded future.
        (void)leaf.unleash(std::move(root_unleashed));

        // One request at each level; both expect a single recorded eviction.
        auto fut_root = root.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        }, db::no_timeout);
        auto fut_leaf = leaf.rg().run_when_memory_available([&root] {
            BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        }, db::no_timeout);

        // Not available at first.
        BOOST_REQUIRE_EQUAL(fut_root.available(), false);
        BOOST_REQUIRE_EQUAL(fut_leaf.available(), false);
        quiesce(std::move(fut_root));
        quiesce(std::move(fut_leaf));

        BOOST_REQUIRE_EQUAL(root.reclaim_sizes().size(), 1);
        BOOST_REQUIRE_EQUAL(root.reclaim_sizes()[0], logalloc::segment_size);
    });
}
// Reproduces issue #2021: a large number of queued requests, each of which
// changes the region group's size when it runs, are released together.
SEASTAR_TEST_CASE(test_no_crash_when_a_lot_of_requests_released_which_change_region_group_size) {
    return seastar::async([test_name = get_name()] {
#ifndef SEASTAR_DEFAULT_ALLOCATOR // Because we need memory::stats().free_memory();
        logging::logger_registry().set_logger_level("lsa", seastar::log_level::debug);

        // Hard and soft thresholds are both 75% of the currently free memory.
        auto free_space = memory::stats().free_memory();
        size_t threshold = static_cast<size_t>(0.75 * free_space);
        region_group_reclaimer recl(threshold, threshold);
        region_group gr(test_name, recl);
        auto close_gr = defer([&gr] () noexcept { gr.shutdown().get(); });
        region r(gr);

        with_allocator(r.allocator(), [&] {
            std::vector<managed_bytes> buffers;

            // Eviction drops one buffer at a time until none are left.
            r.make_evictable([&] {
                if (!buffers.empty()) {
                    with_allocator(r.allocator(), [&] {
                        buffers.pop_back();
                    });
                    return memory::reclaiming_result::reclaimed_something;
                }
                return memory::reclaiming_result::reclaimed_nothing;
            });

            auto fill_to_pressure = [&] {
                while (!recl.under_pressure()) {
                    buffers.emplace_back(managed_bytes(managed_bytes::initialized_later(), 1024));
                }
            };

            // Each queued request holds an operation on this barrier; the
            // deferred action waits for all of them before leaving the scope.
            utils::phased_barrier request_barrier;
            auto wait_for_requests = defer([&] () noexcept { request_barrier.advance_and_await().get(); });

            for (int i = 0; i < 1000000; ++i) {
                fill_to_pressure();
                future<> f = gr.run_when_memory_available([&, op = request_barrier.start()] {
                    // Trigger group size change (Refs issue #2021)
                    gr.update(-10);
                    gr.update(+10);
                }, db::no_timeout);
                BOOST_REQUIRE(!f.available());
            }

            // Release: drop buffers until the pressure is gone.
            while (recl.under_pressure()) {
                buffers.pop_back();
            }
        });
#endif
    });
}
// Reclaiming must start when the soft limit is crossed and must keep running,
// across the hard limit and back, until usage drops below the soft limit.
SEASTAR_TEST_CASE(test_reclaiming_runs_as_long_as_there_is_soft_pressure) {
    return seastar::async([test_name = get_name()] {
        size_t hard_threshold = logalloc::segment_size * 8;
        size_t soft_threshold = hard_threshold / 2;

        // Reclaimer that only records whether reclaiming is currently on.
        class reclaimer : public region_group_reclaimer {
            bool _reclaim = false;
        protected:
            void start_reclaiming() noexcept override {
                _reclaim = true;
            }
            void stop_reclaiming() noexcept override {
                _reclaim = false;
            }
        public:
            reclaimer(size_t hard_threshold, size_t soft_threshold)
                : region_group_reclaimer(hard_threshold, soft_threshold)
            { }
            bool reclaiming() const { return _reclaim; }
        };

        reclaimer recl(hard_threshold, soft_threshold);
        region_group gr(test_name, recl);
        auto close_gr = defer([&gr] () noexcept { gr.shutdown().get(); });
        region r(gr);

        with_allocator(r.allocator(), [&] {
            std::vector<managed_bytes> segments;
            const auto grow = [&segments] {
                segments.emplace_back(managed_bytes(managed_bytes::initialized_later(), logalloc::segment_size));
            };
            const auto shrink = [&segments] {
                segments.pop_back();
            };

            BOOST_REQUIRE(!recl.reclaiming());

            // Cross the soft limit: reclaiming starts.
            while (!recl.over_soft_limit()) {
                grow();
            }
            BOOST_REQUIRE(recl.reclaiming());

            // Cross the hard limit: reclaiming is still on.
            while (!recl.under_pressure()) {
                grow();
            }
            BOOST_REQUIRE(recl.reclaiming());

            // Back below the hard limit, but still above the soft one.
            while (recl.under_pressure()) {
                shrink();
            }
            BOOST_REQUIRE(recl.over_soft_limit());
            BOOST_REQUIRE(recl.reclaiming());

            // Below the soft limit: reclaiming stops.
            while (recl.over_soft_limit()) {
                shrink();
            }
            BOOST_REQUIRE(!recl.reclaiming());
        });
    });
}
SEASTAR_TEST_CASE(test_zone_reclaiming_preserves_free_size) {
return seastar::async([] {
region r;

View File

@@ -2073,33 +2073,6 @@ lsa_buffer::~lsa_buffer() {
}
}
// Debug-only check (compiled in under SEASTAR_DEBUG) that the heap's ordered
// iteration yields regions whose evictable total space never increases.
// On a violation the whole heap is printed and the process asserts.
inline void
region_group_binomial_group_sanity_check(const region_group::region_heap& bh) {
#ifdef SEASTAR_DEBUG
    bool in_order = true;
    size_t previous = std::numeric_limits<size_t>::max();
    for (auto it = bh.ordered_begin(); in_order && it != bh.ordered_end(); ++it) {
        auto current = region_impl_to_region(*it)->evictable_occupancy().total_space();
        in_order = current <= previous;
        previous = current;
    }
    if (in_order) {
        return;
    }

    // Dump every region in heap order to help diagnose the failure.
    fmt::print("Sanity checking FAILED, size {}\n", bh.size());
    for (auto it = bh.ordered_begin(); it != bh.ordered_end(); ++it) {
        auto r = region_impl_to_region(*it);
        auto t = r->evictable_occupancy().total_space();
        fmt::print(" r = {} (id={}), occupancy = {}\n", fmt::ptr(r), r->id(), t);
    }
    assert(0);
#endif
}
size_t tracker::reclamation_step() const {
return _impl->reclamation_step();
}
@@ -2656,136 +2629,6 @@ bool segment_pool::compact_segment(segment* seg) {
return true;
}
// Storage for the static default reclaimer of region_group. It is
// default-constructed and serves as the default `reclaimer` argument of the
// region_group constructors.
region_group_reclaimer region_group::no_reclaimer;
// Evictable total space of the region at the top of this group's region
// heap, or 0 when the group has no regions.
uint64_t region_group::top_region_evictable_space() const {
    if (_regions.empty()) {
        return 0;
    }
    return region_impl_to_region(_regions.top())->evictable_occupancy().total_space();
}
// Returns the region at the top of the heap of the group recorded in
// _maximal_rg, or nullptr when no such group is recorded or it has no regions.
region* region_group::get_largest_region() {
    const bool has_candidate = _maximal_rg && !_maximal_rg->_regions.empty();
    return has_candidate ? region_impl_to_region(_maximal_rg->_regions.top()) : nullptr;
}
// Registers `child` as a subgroup: it is pushed onto the subgroup heap (the
// handle is remembered in the child) and its memory is added to this group.
void region_group::add(region_group* child) {
    auto handle = _subgroups.push(child);
    child->_subgroup_heap_handle = handle;
    update(child->_total_memory);
}
// Unregisters `child`: it is erased from the subgroup heap through its stored
// handle, and its memory is subtracted from this group.
void region_group::del(region_group* child) {
    auto& handle = child->_subgroup_heap_handle;
    _subgroups.erase(handle);
    update(-child->_total_memory);
}
// Adds a region to this group: pushes its implementation object onto the
// region heap, stores the resulting handle in the region, and accounts for
// the region's total occupied space.
void region_group::add(region* child_r) {
    child_r->region_heap_handle() = _regions.push(region_to_region_impl(child_r));
    region_group_binomial_group_sanity_check(_regions);
    const auto occupied = child_r->occupancy().total_space();
    update(occupied);
}
// Removes a region from this group: erases it from the region heap through
// its handle and subtracts the region's total occupied space.
void region_group::del(region* child_r) {
    auto& handle = child_r->region_heap_handle();
    _regions.erase(handle);
    region_group_binomial_group_sanity_check(_regions);
    update(-child_r->occupancy().total_space());
}
// region_listener hook for a region changing address. Intentionally a no-op:
// this implementation does nothing with either address.
void region_group::moved(region* old_address, region* new_address) {
}
// True when neither this group nor any of its ancestors is under pressure.
bool region_group::execution_permitted() noexcept {
    // The walk stops at the first group under pressure; a null result means
    // the root was passed without finding one.
    auto* pressured_group = do_for_each_parent(this, [] (auto rg) {
        if (rg->under_pressure()) {
            return stop_iteration::yes;
        }
        return stop_iteration::no;
    });
    return pressured_group == nullptr;
}
// Starts the loop that releases blocked requests, running in the
// `deferred_work_sg` scheduling group. The returned future resolves once
// shutdown has been requested and the loop has exited.
future<>
region_group::start_releaser(scheduling_group deferred_work_sg) {
    // One iteration of the loop: stop on shutdown, run the oldest blocked
    // request when execution is permitted, otherwise wait for relief.
    auto release_step = [this] () noexcept {
        if (_shutdown_requested) {
            return make_ready_future<stop_iteration>(stop_iteration::yes);
        }
        if (!_blocked_requests.empty() && execution_permitted()) {
            auto req = std::move(_blocked_requests.front());
            _blocked_requests.pop_front();
            req->allocate();
            return make_ready_future<stop_iteration>(stop_iteration::no);
        }
        // Block reclaiming to prevent signal() from being called by reclaimer inside wait()
        // FIXME: handle allocation failures (not very likely) like allocating_section does
        tracker_reclaimer_lock rl;
        return _relief.wait().then([] {
            return stop_iteration::no;
        });
    };
    return with_scheduling_group(deferred_work_sg, [this, release_step] {
        return yield().then([release_step] {
            return repeat(release_step);
        });
    });
}
// Builds a region group, optionally nested under `parent`. The releaser loop
// is started only when the reclaimer can block; otherwise _releaser is an
// already-resolved future. `name` is handed to the request-expiry handler.
region_group::region_group(sstring name, region_group *parent,
        region_group_reclaimer& reclaimer, scheduling_group deferred_work_sg)
    : _parent(parent)
    , _reclaimer(reclaimer)
    , _blocked_requests(on_request_expiry{std::move(name)})
    , _releaser(reclaimer_can_block() ? start_releaser(deferred_work_sg) : make_ready_future<>())
{
    // Nested groups register themselves with their parent.
    if (_parent != nullptr) {
        _parent->add(this);
    }
}
// A reclaimer can block requests only if it has a finite throttle threshold;
// the maximum size_t value stands for "no threshold".
bool region_group::reclaimer_can_block() const {
    constexpr auto no_threshold = std::numeric_limits<size_t>::max();
    return _reclaimer.throttle_threshold() != no_threshold;
}
void region_group::notify_relief() {
_relief.signal();
for (region_group* child : _subgroups) {
child->notify_relief();
}
}
// Applies a memory usage change of `delta` bytes to this group and to every
// ancestor, updating each group's reclaimer about soft and hard pressure.
// If any group stopped being under hard pressure, relief is propagated
// downwards starting from the outermost such group.
void region_group::update(ssize_t delta) {
    // Outermost group that was relieved of hard pressure by this update.
    region_group* outermost_relieved = nullptr;
    do_for_each_parent(this, [&outermost_relieved, delta] (region_group* group) {
        group->update_maximal_rg();
        group->_total_memory += delta;

        auto& reclaimer = group->_reclaimer;

        // Soft limit: drives start/stop of reclaiming.
        const bool over_soft_limit = group->_total_memory >= reclaimer.soft_limit_threshold();
        if (over_soft_limit) {
            reclaimer.notify_soft_pressure();
        } else {
            reclaimer.notify_soft_relief();
        }

        // Hard limit: drives throttling of requests.
        if (group->_total_memory > reclaimer.throttle_threshold()) {
            reclaimer.notify_pressure();
        } else if (reclaimer.under_pressure()) {
            reclaimer.notify_relief();
            // The walk goes upwards, so the last assignment wins.
            outermost_relieved = group;
        }
        return stop_iteration::no;
    });

    if (outermost_relieved != nullptr) {
        outermost_relieved->notify_relief();
    }
}
// Maps a region implementation object back to the region stored in it.
region* region_impl_to_region(region_impl* ri) {
    region* owner = ri->_region;
    return owner;
}
@@ -2875,10 +2718,6 @@ void allocating_section::set_std_reserve(size_t reserve) {
_std_reserve = reserve;
}
// Expiry handler for queued requests: fails the request with a
// blocked_requests_timed_out_error carrying this handler's name.
void region_group::on_request_expiry::operator()(std::unique_ptr<allocating_function>& func) noexcept {
    auto timeout_error = std::make_exception_ptr(blocked_requests_timed_out_error{_name});
    func->fail(timeout_error);
}
future<> prime_segment_pool(size_t available_memory, size_t min_free_memory) {
return smp::invoke_on_all([=] {
shard_segment_pool.prime(available_memory, min_free_memory);

View File

@@ -41,82 +41,6 @@ constexpr size_t max_zone_segments = 256;
//
using eviction_fn = std::function<memory::reclaiming_result()>;
//
// A region_group can be given an instance of region_group_reclaimer whose
// start_reclaiming() and stop_reclaiming() hooks have been overridden. The
// hooks are invoked when the soft-pressure state tracked by this object
// changes. They do nothing by default; by overriding them the user can take
// action to help the LSA alleviate memory pressure.
class region_group_reclaimer {
protected:
    size_t _threshold;
    size_t _soft_limit;
    bool _under_pressure = false;
    bool _under_soft_pressure = false;

    // Implementations of start_reclaiming() and stop_reclaiming() are subject
    // to the following restrictions:
    //
    //  - they must not use any region or region_group objects, because they
    //    are invoked synchronously with operations on those.
    //
    //  - they must be noexcept, because they are called on the free path.
    //
    //  - they may be called synchronously with any operation which allocates
    //    memory, because they are called by the memory reclaimer. In
    //    particular, they should not depend on memory allocation, because
    //    that may fail when in reclaiming context.
    //
    virtual void start_reclaiming() noexcept {}
    virtual void stop_reclaiming() noexcept {}
public:
    // No limits: both thresholds are the maximum size_t value.
    region_group_reclaimer()
        : region_group_reclaimer(std::numeric_limits<size_t>::max()) {}
    // The soft limit equals the hard threshold.
    region_group_reclaimer(size_t threshold)
        : _threshold(threshold), _soft_limit(threshold) {}
    // Separate hard threshold and soft limit; the soft limit must not exceed
    // the hard threshold.
    region_group_reclaimer(size_t threshold, size_t soft)
        : _threshold(threshold), _soft_limit(soft) {
        assert(_soft_limit <= _threshold);
    }
    virtual ~region_group_reclaimer() {}

    bool under_pressure() const {
        return _under_pressure;
    }

    bool over_soft_limit() const {
        return _under_soft_pressure;
    }

    // Enters the soft-pressure state; start_reclaiming() is called only on
    // the transition into that state.
    void notify_soft_pressure() noexcept {
        if (_under_soft_pressure) {
            return;
        }
        _under_soft_pressure = true;
        start_reclaiming();
    }

    // Leaves the soft-pressure state; stop_reclaiming() is called only on the
    // transition out of that state.
    void notify_soft_relief() noexcept {
        if (!_under_soft_pressure) {
            return;
        }
        _under_soft_pressure = false;
        stop_reclaiming();
    }

    // The hard-pressure flag is simply set or cleared; no hook is called.
    void notify_pressure() noexcept {
        _under_pressure = true;
    }

    void notify_relief() noexcept {
        _under_pressure = false;
    }

    size_t throttle_threshold() const {
        return _threshold;
    }

    size_t soft_limit_threshold() const {
        return _soft_limit;
    }
};
// Comparison functor over region_impl pointers; operator() is only declared
// here and defined elsewhere.
// NOTE(review): judging by the name it orders regions by ascending evictable
// occupancy - confirm against the definition.
struct region_evictable_occupancy_ascending_less_comparator {
bool operator()(region_impl* r1, region_impl* r2) const;
};
@@ -142,296 +66,6 @@ public:
virtual void decrease_usage(region_heap::handle_type& r_handle, ssize_t delta) = 0;
};
// Groups regions for the purpose of statistics. Can be nested.
// Interfaces to regions via region_listener
class region_group : public region_listener {
static region_group_reclaimer no_reclaimer;
using region_evictable_occupancy_ascending_less_comparator = logalloc::region_evictable_occupancy_ascending_less_comparator;
// We want to sort the subgroups so that we can easily find the one that holds the biggest
// region for freeing purposes. Please note that this is not the biggest of the region groups,
// since a big region group can have a big collection of very small regions, and freeing them
// won't achieve anything. An example of such scenario is a ScyllaDB region with a lot of very
// small memtables that add up, versus one with a very big memtable. The small memtables are
// likely still growing, and freeing the big memtable will guarantee that the most memory is
// freed up, while maximizing disk throughput.
//
// As asynchronous reclaim will likely involve disk operation, and those tend to be more
// efficient when bulk done, this behavior is not ScyllaDB memtable specific.
//
// The maximal score is recursively defined as:
//
// max(our_biggest_region, our_subtree_biggest_region)
// Orders region groups by their maximal_score(), lowest score first.
struct subgroup_maximal_region_ascending_less_comparator {
    bool operator()(region_group* rg1, region_group* rg2) const {
        const auto lhs_score = rg1->maximal_score();
        const auto rhs_score = rg2->maximal_score();
        return lhs_score < rhs_score;
    }
};
friend struct subgroup_maximal_region_ascending_less_comparator;
using region_heap = logalloc::region_heap;
using subgroup_heap = boost::heap::binomial_heap<region_group*,
boost::heap::compare<subgroup_maximal_region_ascending_less_comparator>,
boost::heap::allocator<std::allocator<region_group*>>,
//constant_time_size<true> causes corruption with boost < 1.60
boost::heap::constant_time_size<false>>;
region_group* _parent = nullptr;
size_t _total_memory = 0;
region_group_reclaimer& _reclaimer;
subgroup_heap _subgroups;
subgroup_heap::handle_type _subgroup_heap_handle;
region_heap _regions;
region_group* _maximal_rg = nullptr;
// We need to store the score separately, otherwise we'd have to have an extra pass
// before we update the region occupancy.
size_t _maximal_score = 0;
// Type-erased interface for a queued request: it can either be run
// (allocate) or be failed with an exception (fail).
struct allocating_function {
virtual ~allocating_function() = default;
// Runs the stored request.
virtual void allocate() = 0;
// Completes the stored request with the given exception instead of running it.
virtual void fail(std::exception_ptr) = 0;
};
// allocating_function holding a concrete callable together with the promise
// through which its result (or failure) is reported.
template <typename Func>
struct concrete_allocating_function : public allocating_function {
    // std::invoke_result_t replaces std::result_of_t, which is deprecated in
    // C++17 and removed in C++20; both name the result type of calling Func
    // with no arguments.
    using futurator = futurize<std::invoke_result_t<Func>>;
    typename futurator::promise_type pr;
    Func func;
public:
    // Invokes the callable and forwards its result to the promise.
    void allocate() override {
        futurator::invoke(func).forward_to(std::move(pr));
    }
    // Resolves the promise with `e` without invoking the callable.
    void fail(std::exception_ptr e) override {
        pr.set_exception(e);
    }
    concrete_allocating_function(Func&& func) : func(std::forward<Func>(func)) {}
    // Future associated with the promise above.
    typename futurator::type get_future() {
        return pr.get_future();
    }
};
class on_request_expiry {
class blocked_requests_timed_out_error : public timed_out_error {
const sstring _msg;
public:
explicit blocked_requests_timed_out_error(sstring name)
: _msg(std::move(name) + ": timed out") {}
virtual const char* what() const noexcept override {
return _msg.c_str();
}
};
sstring _name;
public:
explicit on_request_expiry(sstring name) : _name(std::move(name)) {}
void operator()(std::unique_ptr<allocating_function>&) noexcept;
};
// It is a more common idiom to just hold the promises in the circular buffer and make them
// ready. However, in the time between the promise being made ready and the function execution,
// it could be that our memory usage went up again. To protect against that, we have to recheck
// if memory is still available after the future resolves.
//
// But we can greatly simplify it if we store the function itself in the circular_buffer, and
// execute it synchronously in release_requests() when we are sure memory is available.
//
// This allows us to easily provide strong execution guarantees while keeping all re-check
// complication in release_requests and keep the main request execution path simpler.
expiring_fifo<std::unique_ptr<allocating_function>, on_request_expiry, db::timeout_clock> _blocked_requests;
uint64_t _blocked_requests_counter = 0;
condition_variable _relief;
future<> _releaser;
bool _shutdown_requested = false;
bool reclaimer_can_block() const;
future<> start_releaser(scheduling_group deferered_work_sg);
void notify_relief();
friend void region_group_binomial_group_sanity_check(const region_group::region_heap& bh);
private: // from region_listener
virtual void moved(region* old_address, region* new_address) override;
public:
// When creating a region_group, one can specify an optional reclaimer, which
// carries the throttle threshold. The threshold does not affect normal
// allocations, but an API is provided, through the region_group's method
// run_when_memory_available(), to make sure that a given function is only
// executed when the total memory for the region group (and all of its
// parents) is lower than or equal to the region_group's throttle threshold
// (and respectively for its parents).
//
// The deferred_work_sg parameter specifies a scheduling group in which to run
// allocations (given to run_when_memory_available()) when they must be
// deferred due to lack of memory at the time the call to
// run_when_memory_available() was made.
//
// This overload creates a group without a parent.
region_group(sstring name = "(unnamed region_group)",
        region_group_reclaimer& reclaimer = no_reclaimer,
        scheduling_group deferred_work_sg = default_scheduling_group())
    // `name` is taken by value, so it is moved on instead of copied again.
    : region_group(std::move(name), nullptr, reclaimer, deferred_work_sg) {}
region_group(sstring name, region_group* parent, region_group_reclaimer& reclaimer = no_reclaimer,
scheduling_group deferred_work_sg = default_scheduling_group());
region_group(region_group&& o) = delete;
region_group(const region_group&) = delete;
// Detaches the group from its parent, if it has one.
~region_group() {
    // A group whose reclaimer can block may have postponed many operations,
    // so shutdown() must have been called before destruction.
    if (reclaimer_can_block()) {
        assert(_shutdown_requested);
    }
    if (_parent != nullptr) {
        _parent->del(this);
    }
}
region_group& operator=(const region_group&) = delete;
region_group& operator=(region_group&&) = delete;
// Total memory currently accounted to this group.
size_t memory_used() const { return _total_memory; }
void update(ssize_t delta);
// It would be easier to call the heap's update(), but that is unfortunately
// broken in boost versions up to at least 1.59.
//
// One possibility would be to test the sign of delta, but explicit
// increase/decrease calls are used instead, for two reasons:
//
// 1) it saves a branch
// 2) some callers would like to pass delta = 0, for instance when making a
//    region evictable / non-evictable. Because the evictable occupancy
//    changes, the full update cycle should run even then.
//
// From region_listener: the region behind `r_handle` grew by `delta`.
void increase_usage(region_heap::handle_type& r_handle, ssize_t delta) override {
    _regions.increase(r_handle);
    update(delta);
}
// From region_listener: re-positions the region behind `r_handle` in the
// heap after a decrease; the group's total memory is left untouched.
void decrease_evictable_usage(region_heap::handle_type& r_handle) override {
    _regions.decrease(r_handle);
}
// From region_listener: re-positions the region behind `r_handle` in the
// heap after a decrease, then applies `delta` to the memory accounting.
void decrease_usage(region_heap::handle_type& r_handle, ssize_t delta) override {
    decrease_evictable_usage(r_handle);
    update(delta);
}
//
// Make sure that the function specified by the parameter func only runs when
// this region_group, as well as each of its ancestors, has a memory_used()
// amount of memory that is less than or equal to the throttle threshold of
// the reclaimer given to the region_group's constructor.
//
// region_groups that did not specify a throttle threshold will always allow
// for execution.
//
// In case current memory_used() is over the threshold, a non-ready future is
// returned and it will be made ready at some point in the future, at which
// memory usage in the offending region_group (either this or an ancestor)
// falls below the threshold.
//
// Requests that are not allowed for execution are queued and released in FIFO
// order within the same region_group, but no guarantees are made regarding
// release ordering across different region_groups.
//
// When timeout is reached first, the returned future is resolved with
// timed_out_error exception.
//
// NOTE(review): when called with an lvalue callable, Func deduces to an
// lvalue reference type, so concrete_allocating_function<Func> stores a
// reference to the caller's object rather than a copy. A queued request then
// relies on the caller keeping that object alive - confirm that callers only
// pass temporaries.
template <typename Func>
// We disallow future-returning functions here, because otherwise memory may be available
// when we start executing it, but no longer available in the middle of the execution.
requires (!is_future<std::invoke_result_t<Func>>::value)
// std::invoke_result_t is used for the return type as well, matching the
// constraint above; std::result_of_t is removed in C++20.
futurize_t<std::invoke_result_t<Func>> run_when_memory_available(Func&& func, db::timeout_clock::time_point timeout) {
    // Look for the first group, walking upwards, that either already has
    // queued requests or is under pressure.
    auto blocked_at = do_for_each_parent(this, [] (auto rg) {
        return (rg->_blocked_requests.empty() && !rg->under_pressure()) ? stop_iteration::no : stop_iteration::yes;
    });
    if (!blocked_at) {
        // Nothing blocks execution: run the function right away.
        return futurize_invoke(func);
    }
    // Otherwise queue the function on this group; the expiring fifo is given
    // `timeout` for this entry.
    auto fn = std::make_unique<concrete_allocating_function<Func>>(std::forward<Func>(func));
    auto fut = fn->get_future();
    _blocked_requests.push_back(std::move(fn), timeout);
    ++_blocked_requests_counter;
    return fut;
}
// returns a pointer to the largest region (in terms of memory usage) that sits below this
// region group. This includes the regions owned by this region group as well as all of its
// children.
region* get_largest_region();
// Shutdown is mandatory for every user who has set a threshold.
// Can be called at most once: the _releaser future is moved out of this object and handed
// to the caller, so a second call would return a moved-from future.
//
// Sets _shutdown_requested and signals _relief before returning that future.
// NOTE(review): presumably the signal wakes the releaser so it can observe the flag and
// finish -- confirm against the releaser loop, which is not visible here.
future<> shutdown() {
_shutdown_requested = true;
_relief.signal();
return std::move(_releaser);
}
size_t blocked_requests() {
return _blocked_requests.size();
}
// Returns _blocked_requests_counter, which run_when_memory_available() increments every time
// it queues a request. Unlike blocked_requests(), this is not the current queue length.
// NOTE(review): no decrement is visible in this part of the file, so this looks like a
// running total -- confirm.
uint64_t blocked_requests_counter() const {
return _blocked_requests_counter;
}
private:
// Returns true if and only if constraints of this group are not violated.
// That's taking into account any constraints imposed by enclosing (parent) groups.
bool execution_permitted() noexcept;
// Walks the region_group hierarchy upwards, beginning at node and following _parent links,
// invoking func on every group visited.
//
// func receives a region_group* and returns stop_iteration::no to continue with the next
// ancestor, or stop_iteration::yes to end the walk at the current group.
//
// Returns the group at which func asked to stop, or nullptr when func never did so, that is,
// when the whole chain up to and including the root was visited (or node was nullptr).
template <typename Func>
static region_group* do_for_each_parent(region_group *node, Func&& func) {
    for (auto current = node; current; current = current->_parent) {
        if (func(current) == stop_iteration::yes) {
            return current;
        }
    }
    return nullptr;
}
// Reports whether this group is under memory pressure, as answered by
// _reclaimer.under_pressure(). Only this group is consulted; ancestors are not checked here.
inline bool under_pressure() const {
return _reclaimer.under_pressure();
}
uint64_t top_region_evictable_space() const;
// Returns the cached _maximal_score. update_maximal_rg() sets it to the
// top_region_evictable_space() of the group stored in _maximal_rg.
uint64_t maximal_score() const {
return _maximal_score;
}
void update_maximal_rg() {
auto my_score = top_region_evictable_space();
auto children_score = _subgroups.empty() ? 0 : _subgroups.top()->maximal_score();
auto old_maximal_score = _maximal_score;
if (children_score > my_score) {
_maximal_rg = _subgroups.top()->_maximal_rg;
} else {
_maximal_rg = this;
}
_maximal_score = _maximal_rg->top_region_evictable_space();
if (_parent) {
// binomial heap update boost bug.
if (_maximal_score > old_maximal_score) {
_parent->_subgroups.increase(_subgroup_heap_handle);
} else if (_maximal_score < old_maximal_score) {
_parent->_subgroups.decrease(_subgroup_heap_handle);
}
}
}
void add(region_group* child);
void del(region_group* child);
virtual void add(region* child) override; // from region_listener
virtual void del(region* child) override; // from region_listener
};
// Controller for all LSA regions. There's one per shard.
class tracker {
public: