utils/logalloc: move segment pool into tracker

Instead of a separate global segment pool instance, make it a member of
the already global tracker. Most users are inside the tracker instance
anyway. Outside users can access the pool through the global tracker
instance.
This commit is contained in:
Botond Dénes
2022-04-22 16:34:18 +03:00
parent 5b86dfc35a
commit 3bd94e41bf
2 changed files with 132 additions and 105 deletions

View File

@@ -1073,6 +1073,15 @@ class sharded:
return self.instance()
def get_lsa_segment_pool():
try:
tracker = gdb.parse_and_eval('\'logalloc::tracker_instance\'')
tracker_impl = std_unique_ptr(tracker["_impl"]).get().dereference()
return std_unique_ptr(tracker_impl["_segment_pool"]).get().dereference()
except gdb.error:
return gdb.parse_and_eval('\'logalloc::shard_segment_pool\'')
def find_db(shard=None):
try:
db = gdb.parse_and_eval('::debug::the_database')
@@ -2028,7 +2037,7 @@ class scylla_memory(gdb.Command):
gdb.write('Used memory: {used_mem:>13}\nFree memory: {free_mem:>13}\nTotal memory: {total_mem:>12}\n\n'
.format(used_mem=total_mem - free_mem, free_mem=free_mem, total_mem=total_mem))
lsa = gdb.parse_and_eval('\'logalloc::shard_segment_pool\'')
lsa = get_lsa_segment_pool()
segment_size = int(gdb.parse_and_eval('\'logalloc::segment::size\''))
lsa_free = int(lsa['_free_segments']) * segment_size
non_lsa_mem = int(lsa['_non_lsa_memory_in_use'])
@@ -2492,11 +2501,11 @@ class scylla_ptr(gdb.Command):
ptr_meta.offset_in_object = ptr - span.start
# FIXME: handle debug-mode build
try:
index = gdb.parse_and_eval('(%d - \'logalloc::shard_segment_pool\'._store._segments_base) / \'logalloc::segment\'::size' % (ptr))
except gdb.error:
index = gdb.parse_and_eval('(%d - \'logalloc::shard_segment_pool\'._segments_base) / \'logalloc::segment\'::size' % (ptr)) # Scylla 3.0 compatibility
desc = gdb.parse_and_eval('\'logalloc::shard_segment_pool\'._segments._M_impl._M_start[%d]' % (index))
segment_pool = get_lsa_segment_pool()
segments_base = int(segment_pool["_store"]["_segments_base"])
segment_size = int(gdb.parse_and_eval('\'logalloc::segment\'::size'))
index = int((int(ptr) - segments_base) / segment_size)
desc = std_vector(segment_pool["_segments"])[index]
ptr_meta.is_lsa = bool(desc['_region'])
return ptr_meta
@@ -2533,11 +2542,12 @@ class scylla_segment_descs(gdb.Command):
def invoke(self, arg, from_tty):
segment_size = int(gdb.parse_and_eval('\'logalloc\'::segment::size'))
segment_pool = get_lsa_segment_pool()
try:
base = int(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._store._segments_base'))
base = int(segment_pool["_store"]["_segments_base"])
except gdb.error:
try:
base = int(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._segments_base'))
base = int(segment_pool["_segments_base"])
except gdb.error:
base = None
@@ -2551,15 +2561,15 @@ class scylla_segment_descs(gdb.Command):
gdb.write('0x%x: std\n' % (seg_addr))
if base is None: # debug mode build
segs = std_vector(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._store._segments'))
descs = std_vector(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._segments'))
segs = std_vector(segment_pool["_store"]["_segments"])
descs = std_vector(segment_pool["_segments"])
for seg, desc_ref in zip(segs, descs):
desc = segment_descriptor(desc_ref)
print_desc(int(seg), desc)
else:
addr = base
for desc in std_vector(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._segments')):
for desc in std_vector(segment_pool["_segments"]):
desc = segment_descriptor(desc)
print_desc(addr, desc)
addr += segment_size
@@ -2594,10 +2604,11 @@ class scylla_lsa_check(gdb.Command):
# Scan shard's segment_descriptor:s for anomalies:
# - detect segments owned by cache which are not in cache region's _segment_descs
# - compute segment occupancy statistics for comparison with region's stored ones
base = int(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._store._segments_base'))
segment_pool = get_lsa_segment_pool()
base = int(segment_pool["_store"]["_segments_base"])
desc_free_space = 0
desc_total_space = 0
for desc in std_vector(gdb.parse_and_eval('\'logalloc\'::shard_segment_pool._segments')):
for desc in std_vector(segment_pool["_segments"]):
desc = segment_descriptor(desc)
if desc.is_lsa() and desc.region() == cache_region.impl():
if not int(desc.address) in in_buckets:
@@ -2628,7 +2639,7 @@ class scylla_lsa(gdb.Command):
gdb.Command.__init__(self, 'scylla lsa', gdb.COMMAND_USER, gdb.COMPLETE_COMMAND)
def invoke(self, arg, from_tty):
lsa = gdb.parse_and_eval('\'logalloc::shard_segment_pool\'')
lsa = get_lsa_segment_pool()
segment_size = int(gdb.parse_and_eval('\'logalloc::segment::size\''))
lsa_mem = int(lsa['_segments_in_use']) * segment_size

View File

@@ -431,7 +431,10 @@ public:
}
};
class segment_pool;
class tracker::impl {
std::unique_ptr<segment_pool> _segment_pool;
std::optional<background_reclaimer> _background_reclaimer;
std::vector<region::impl*> _regions;
seastar::metrics::metric_groups _metrics;
@@ -470,6 +473,9 @@ public:
void enable_reclaim() noexcept {
--_reclaiming_disabled_depth;
}
segment_pool& segment_pool() {
return *_segment_pool;
}
void register_region(region::impl*);
void unregister_region(region::impl*) noexcept;
size_t reclaim(size_t bytes, is_preemptible p);
@@ -968,6 +974,7 @@ private:
const size_t _segments_to_release;
const size_t _reserve_goal, _reserve_max;
tracker::impl* _tracker;
segment_pool& _segment_pool;
extra_logger _extra_logs;
const bool _debug_enabled;
@@ -980,10 +987,15 @@ private:
clock::duration _duration;
inline reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, tracker::impl* tracker, segment_pool& segment_pool, extra_logger extra_logs);
public:
inline reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, tracker::impl* tracker, extra_logger extra_logs = [](log_level){});
inline reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, extra_logger extra_logs = [](log_level){})
: reclaim_timer(name, preemptible, memory_to_release, segments_to_release, nullptr, std::move(extra_logs))
inline reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, tracker::impl& tracker, extra_logger extra_logs = [](log_level){})
: reclaim_timer(name, preemptible, memory_to_release, segments_to_release, &tracker, tracker.segment_pool(), std::move(extra_logs))
{}
inline reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, segment_pool& segment_pool, extra_logger extra_logs = [](log_level){})
: reclaim_timer(name, preemptible, memory_to_release, segments_to_release, nullptr, segment_pool, std::move(extra_logs))
{}
inline ~reclaim_timer();
@@ -1024,7 +1036,7 @@ size_t segment_pool::reclaim_segments(size_t target, is_preemptible preempt) {
// Reclamation. Migrate segments to higher addresses and shrink segment pool.
size_t reclaimed_segments = 0;
reclaim_timer timing_guard("reclaim_segments", preempt, target * segment::size, target, [&] (log_level level) {
reclaim_timer timing_guard("reclaim_segments", preempt, target * segment::size, target, *this, [&] (log_level level) {
timing_logger.log(level, "- reclaimed {} out of requested {} segments", reclaimed_segments, target);
});
@@ -1245,25 +1257,18 @@ size_t segment_pool::segments_in_use() const noexcept {
return _segments_in_use;
}
static segment_pool& get_shard_segment_pool() noexcept {
memory::scoped_critical_alloc_section dfg;
static thread_local segment_pool obj;
return obj;
}
static thread_local segment_pool& shard_segment_pool = get_shard_segment_pool();
thread_local reclaim_timer* reclaim_timer::_active_timer;
thread_local clock::duration reclaim_timer::_duration_threshold = clock::duration::zero();
reclaim_timer::reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, tracker::impl* tracker, extra_logger extra_logs)
reclaim_timer::reclaim_timer(const char* name, is_preemptible preemptible, size_t memory_to_release, size_t segments_to_release, tracker::impl* tracker, segment_pool& segment_pool, extra_logger extra_logs)
: _name(name)
, _preemptible(preemptible)
, _memory_to_release(memory_to_release)
, _segments_to_release(segments_to_release)
, _reserve_goal(shard_segment_pool.current_emergency_reserve_goal())
, _reserve_max(shard_segment_pool.emergency_reserve_max())
, _reserve_goal(segment_pool.current_emergency_reserve_goal())
, _reserve_max(segment_pool.emergency_reserve_max())
, _tracker(tracker)
, _segment_pool(segment_pool)
, _extra_logs(std::move(extra_logs))
, _debug_enabled(timing_logger.is_enabled(logging::log_level::debug))
{
@@ -1300,7 +1305,7 @@ void reclaim_timer::sample_stats(stats& data) {
if (_debug_enabled && _tracker) {
data.region_occupancy = _tracker->region_occupancy();
}
data.pool_stats = shard_segment_pool.statistics();
data.pool_stats = _segment_pool.statistics();
}
void reclaim_timer::report() const noexcept {
@@ -1510,7 +1515,7 @@ private:
// Align the end of the value so that the next descriptor is aligned
_active_offset = align_up_for_asan(_active_offset);
shard_segment_pool.descriptor(_active).record_alloc(_active_offset - old_active_offset);
segment_pool().descriptor(_active).record_alloc(_active_offset - old_active_offset);
return pos;
}
@@ -1542,7 +1547,7 @@ private:
auto pos =_active->at<char>(_active_offset);
desc.encode(pos);
}
auto& desc = shard_segment_pool.descriptor(_active);
auto& desc = segment_pool().descriptor(_active);
llogger.trace("Closing segment {}, used={}, waste={} [B]", fmt::ptr(_active), desc.occupancy(), segment::size - _active_offset);
_closed_occupancy += desc.occupancy();
@@ -1554,7 +1559,7 @@ private:
if (!_buf_active) {
return;
}
auto& desc = shard_segment_pool.descriptor(_buf_active);
auto& desc = segment_pool().descriptor(_buf_active);
llogger.trace("Closing buf segment {}, used={}, waste={} [B]", fmt::ptr(_buf_active), desc.occupancy(), segment::size - _buf_active_offset);
_closed_occupancy += desc.occupancy();
@@ -1563,15 +1568,15 @@ private:
}
void free_segment(segment_descriptor& desc) noexcept {
free_segment(shard_segment_pool.segment_from(desc), desc);
free_segment(segment_pool().segment_from(desc), desc);
}
void free_segment(segment* seg) noexcept {
free_segment(seg, shard_segment_pool.descriptor(seg));
free_segment(seg, segment_pool().descriptor(seg));
}
void free_segment(segment* seg, segment_descriptor& desc) noexcept {
shard_segment_pool.free_segment(seg, desc);
segment_pool().free_segment(seg, desc);
if (_listener) {
_evictable_space -= segment_size;
_listener->decrease_usage(_region, -segment::size);
@@ -1579,7 +1584,7 @@ private:
}
segment* new_segment() {
segment* seg = shard_segment_pool.new_segment(this);
segment* seg = segment_pool().new_segment(this);
if (_listener) {
_evictable_space += segment_size;
_listener->increase_usage(_region, segment::size);
@@ -1608,7 +1613,7 @@ private:
ptr._size = buf_size;
unpoison(ptr._buf, buf_size);
segment_descriptor& desc = shard_segment_pool.descriptor(_buf_active);
segment_descriptor& desc = segment_pool().descriptor(_buf_active);
ptr._desc = &desc;
desc._buf_pointers.emplace_back(entangled::make_paired_with(ptr._link));
auto alloc_size = align_up(buf_size, buf_align);
@@ -1620,7 +1625,7 @@ private:
void free_buf(lsa_buffer& buf) noexcept {
segment_descriptor &desc = *buf._desc;
segment *seg = shard_segment_pool.segment_from(desc);
segment *seg = segment_pool().segment_from(desc);
if (seg != _buf_active) {
_closed_occupancy -= desc.occupancy();
@@ -1685,7 +1690,7 @@ private:
}
free_segment(seg, desc);
shard_segment_pool.on_segment_compaction(seg_occupancy.used_space());
segment_pool().on_segment_compaction(seg_occupancy.used_space());
}
void close_and_open() {
@@ -1704,7 +1709,7 @@ private:
close_buf_active();
}
assert((uintptr_t)new_active->at(0) % buf_align == 0);
segment_descriptor& desc = shard_segment_pool.descriptor(new_active);
segment_descriptor& desc = segment_pool().descriptor(new_active);
desc._buf_pointers = std::move(ptrs);
desc.set_kind(segment_kind::bufs);
_buf_active = new_active;
@@ -1753,12 +1758,12 @@ public:
}
_closed_occupancy = {};
if (_active) {
assert(shard_segment_pool.descriptor(_active).is_empty());
assert(segment_pool().descriptor(_active).is_empty());
free_segment(_active);
_active = nullptr;
}
if (_buf_active) {
assert(shard_segment_pool.descriptor(_buf_active).is_empty());
assert(segment_pool().descriptor(_buf_active).is_empty());
free_segment(_buf_active);
_buf_active = nullptr;
}
@@ -1767,6 +1772,10 @@ public:
region_impl(region_impl&&) = delete;
region_impl(const region_impl&) = delete;
logalloc::segment_pool& segment_pool() const {
return _tracker.get_impl().segment_pool();
}
bool empty() const noexcept {
return occupancy().used_space() == 0;
}
@@ -1800,10 +1809,10 @@ public:
occupancy_stats total = _non_lsa_occupancy;
total += _closed_occupancy;
if (_active) {
total += shard_segment_pool.descriptor(_active).occupancy();
total += segment_pool().descriptor(_active).occupancy();
}
if (_buf_active) {
total += shard_segment_pool.descriptor(_buf_active).occupancy();
total += segment_pool().descriptor(_buf_active).occupancy();
}
return total;
}
@@ -1844,7 +1853,8 @@ public:
virtual void* alloc(allocation_strategy::migrate_fn migrator, size_t size, size_t alignment) override {
compaction_lock _(*this);
memory::on_alloc_point();
shard_segment_pool.on_memory_allocation(size);
auto& pool = segment_pool();
pool.on_memory_allocation(size);
if (size > max_managed_object_size) {
auto ptr = standard_allocator().alloc(migrator, size + sizeof(non_lsa_object_cookie), alignment);
// This isn't very acurrate, the correct free_space value would be
@@ -1857,7 +1867,7 @@ public:
_evictable_space += allocated_size;
_listener->increase_usage(_region, allocated_size);
}
shard_segment_pool.add_non_lsa_memory_in_use(allocated_size);
pool.add_non_lsa_memory_in_use(allocated_size);
return ptr;
} else {
auto ptr = alloc_small(object_descriptor(migrator), (segment::size_type) size, alignment);
@@ -1876,12 +1886,12 @@ private:
_evictable_space -= allocated_size;
_listener->decrease_usage(_region, allocated_size);
}
shard_segment_pool.subtract_non_lsa_memory_in_use(allocated_size);
segment_pool().subtract_non_lsa_memory_in_use(allocated_size);
}
public:
virtual void free(void* obj) noexcept override {
compaction_lock _(*this);
segment* seg = shard_segment_pool.containing_segment(obj);
segment* seg = segment_pool().containing_segment(obj);
if (!seg) {
on_non_lsa_free(obj);
standard_allocator().free(obj);
@@ -1895,7 +1905,8 @@ public:
virtual void free(void* obj, size_t size) noexcept override {
compaction_lock _(*this);
segment* seg = shard_segment_pool.containing_segment(obj);
auto& pool = segment_pool();
segment* seg = pool.containing_segment(obj);
if (!seg) {
on_non_lsa_free(obj);
@@ -1905,7 +1916,7 @@ public:
_sanitizer.on_free(obj, size);
segment_descriptor& seg_desc = shard_segment_pool.descriptor(seg);
segment_descriptor& seg_desc = pool.descriptor(seg);
auto pos = reinterpret_cast<const char*>(obj);
auto old_pos = pos;
@@ -1921,7 +1932,7 @@ public:
}
seg_desc.record_free(dead_size);
shard_segment_pool.on_memory_deallocation(dead_size);
pool.on_memory_deallocation(dead_size);
if (seg != _active) {
if (seg_desc.is_empty()) {
@@ -1935,7 +1946,7 @@ public:
}
virtual size_t object_memory_size_in_allocator(const void* obj) const noexcept override {
segment* seg = shard_segment_pool.containing_segment(obj);
segment* seg = segment_pool().containing_segment(obj);
if (!seg) {
return standard_allocator().object_memory_size_in_allocator(obj);
@@ -1960,8 +1971,10 @@ public:
unlisten_temporarily ult1(this);
unlisten_temporarily ult2(&other);
if (_active && shard_segment_pool.descriptor(_active).is_empty()) {
shard_segment_pool.free_segment(_active);
auto& pool = segment_pool();
if (_active && pool.descriptor(_active).is_empty()) {
pool.free_segment(_active);
_active = nullptr;
}
if (!_active) {
@@ -1969,7 +1982,7 @@ public:
other._active = nullptr;
_active_offset = other._active_offset;
if (_active) {
shard_segment_pool.set_region(_active, this);
pool.set_region(_active, this);
}
} else {
other.close_active();
@@ -1977,7 +1990,7 @@ public:
other.close_buf_active();
for (auto& desc : other._segment_descs) {
shard_segment_pool.set_region(desc, this);
pool.set_region(desc, this);
}
_segment_descs.merge(other._segment_descs);
@@ -2008,7 +2021,7 @@ public:
auto& desc = _segment_descs.one_of_largest();
_segment_descs.pop_one_of_largest();
_closed_occupancy -= desc.occupancy();
segment* seg = shard_segment_pool.segment_from(desc);
segment* seg = segment_pool().segment_from(desc);
compact_segment_locked(seg, desc);
}
@@ -2025,7 +2038,7 @@ public:
while (!all.empty()) {
auto& desc = all.one_of_largest();
all.pop_one_of_largest();
compact_segment_locked(shard_segment_pool.segment_from(desc), desc);
compact_segment_locked(segment_pool().segment_from(desc), desc);
}
llogger.debug("Done, {}", occupancy());
}
@@ -2057,9 +2070,10 @@ public:
memory::reclaiming_result evict_some() {
++_invalidate_counter;
auto freed = shard_segment_pool.statistics().memory_freed;
auto& pool = segment_pool();
auto freed = pool.statistics().memory_freed;
auto ret = _eviction_fn();
shard_segment_pool.on_memory_eviction(shard_segment_pool.statistics().memory_freed - freed);
pool.on_memory_eviction(pool.statistics().memory_freed - freed);
return ret;
}
@@ -2238,7 +2252,7 @@ occupancy_stats tracker::impl::region_occupancy() const noexcept {
occupancy_stats tracker::impl::occupancy() const noexcept {
auto occ = region_occupancy();
{
auto s = shard_segment_pool.free_segments() * segment::size;
auto s = _segment_pool->free_segments() * segment::size;
occ += occupancy_stats(s, s);
}
return occ;
@@ -2248,7 +2262,7 @@ size_t tracker::impl::non_lsa_used_space() const noexcept {
#ifdef SEASTAR_DEFAULT_ALLOCATOR
return 0;
#else
auto free_space_in_lsa = shard_segment_pool.free_segments() * segment_size;
auto free_space_in_lsa = _segment_pool->free_segments() * segment_size;
return memory::stats().allocated_memory() - region_occupancy().total_space() - free_space_in_lsa;
#endif
}
@@ -2256,7 +2270,7 @@ size_t tracker::impl::non_lsa_used_space() const noexcept {
void tracker::impl::reclaim_all_free_segments()
{
llogger.debug("Reclaiming all free segments");
shard_segment_pool.reclaim_all_free_segments();
_segment_pool->reclaim_all_free_segments();
llogger.debug("Reclamation done");
}
@@ -2275,16 +2289,16 @@ void tracker::impl::full_compaction() {
}
static void reclaim_from_evictable(region::impl& r, size_t target_mem_in_use, is_preemptible preempt) {
llogger.debug("reclaim_from_evictable: total_memory_in_use={} target={}", shard_segment_pool.total_memory_in_use(), target_mem_in_use);
llogger.debug("reclaim_from_evictable: total_memory_in_use={} target={}", r.segment_pool().total_memory_in_use(), target_mem_in_use);
// Before attempting segment compaction, try to evict at least deficit and one segment more so that
// for workloads in which eviction order matches allocation order we will reclaim full segments
// without needing to perform expensive compaction.
auto deficit = shard_segment_pool.total_memory_in_use() - target_mem_in_use;
auto deficit = r.segment_pool().total_memory_in_use() - target_mem_in_use;
auto used = r.occupancy().used_space();
auto used_target = used - std::min(used, deficit + segment::size);
while (shard_segment_pool.total_memory_in_use() > target_mem_in_use) {
while (r.segment_pool().total_memory_in_use() > target_mem_in_use) {
used = r.occupancy().used_space();
if (used > used_target) {
llogger.debug("Evicting {} bytes from region {}, occupancy={} in advance",
@@ -2300,7 +2314,7 @@ static void reclaim_from_evictable(region::impl& r, size_t target_mem_in_use, is
llogger.debug("Unable to evict more, evicted {} bytes", used - r.occupancy().used_space());
return;
}
if (shard_segment_pool.total_memory_in_use() <= target_mem_in_use) {
if (r.segment_pool().total_memory_in_use() <= target_mem_in_use) {
llogger.debug("Target met after evicting {} bytes", used - r.occupancy().used_space());
return;
}
@@ -2335,7 +2349,7 @@ idle_cpu_handler_result tracker::impl::compact_on_idle(work_waiting_on_reactor c
if (_regions.empty()) {
return idle_cpu_handler_result::no_more_work;
}
segment_pool::reservation_goal open_emergency_pool(shard_segment_pool, 0);
segment_pool::reservation_goal open_emergency_pool(*_segment_pool, 0);
auto cmp = [] (region::impl* c1, region::impl* c2) {
if (c1->is_idle_compactible() != c2->is_idle_compactible()) {
@@ -2366,7 +2380,7 @@ size_t tracker::impl::reclaim(size_t memory_to_release, is_preemptible preempt)
return 0;
}
reclaiming_lock rl(*this);
reclaim_timer timing_guard("reclaim", preempt, memory_to_release, 0, this);
reclaim_timer timing_guard("reclaim", preempt, memory_to_release, 0, *this);
return timing_guard.set_memory_released(reclaim_locked(memory_to_release, preempt));
}
@@ -2377,7 +2391,7 @@ size_t tracker::impl::reclaim_locked(size_t memory_to_release, is_preemptible pr
// 2. Compact used segments and/or evict data.
constexpr auto max_bytes = std::numeric_limits<size_t>::max() - segment::size;
auto segments_to_release = align_up(std::min(max_bytes, memory_to_release), segment::size) >> segment::size_shift;
auto nr_released = shard_segment_pool.reclaim_segments(segments_to_release, preempt);
auto nr_released = _segment_pool->reclaim_segments(segments_to_release, preempt);
size_t mem_released = nr_released * segment::size;
if (mem_released >= memory_to_release) {
llogger.debug("reclaim_locked() = {}", memory_to_release);
@@ -2388,7 +2402,7 @@ size_t tracker::impl::reclaim_locked(size_t memory_to_release, is_preemptible pr
return mem_released;
}
auto compacted = compact_and_evict_locked(shard_segment_pool.current_emergency_reserve_goal(), memory_to_release - mem_released, preempt);
auto compacted = compact_and_evict_locked(_segment_pool->current_emergency_reserve_goal(), memory_to_release - mem_released, preempt);
if (compacted == 0) {
llogger.debug("reclaim_locked() = {}", mem_released);
@@ -2397,7 +2411,7 @@ size_t tracker::impl::reclaim_locked(size_t memory_to_release, is_preemptible pr
// compact_and_evict_locked() will not return segments to the standard allocator,
// so do it here:
nr_released = shard_segment_pool.reclaim_segments(compacted / segment::size, preempt);
nr_released = _segment_pool->reclaim_segments(compacted / segment::size, preempt);
mem_released += nr_released * segment::size;
llogger.debug("reclaim_locked() = {}", mem_released);
@@ -2435,15 +2449,15 @@ size_t tracker::impl::compact_and_evict_locked(size_t reserve_segments, size_t m
size_t mem_released = 0;
size_t mem_in_use = shard_segment_pool.total_memory_in_use();
memory_to_release += (reserve_segments - std::min(reserve_segments, shard_segment_pool.free_segments())) * segment::size;
size_t mem_in_use = _segment_pool->total_memory_in_use();
memory_to_release += (reserve_segments - std::min(reserve_segments, _segment_pool->free_segments())) * segment::size;
auto target_mem = mem_in_use - std::min(mem_in_use, memory_to_release - mem_released);
llogger.debug("Compacting, requested {} bytes, {} bytes in use, target is {}",
memory_to_release, mem_in_use, target_mem);
// Allow dipping into reserves while compacting
segment_pool::reservation_goal open_emergency_pool(shard_segment_pool, 0);
segment_pool::reservation_goal open_emergency_pool(*_segment_pool, 0);
auto cmp = [] (region::impl* c1, region::impl* c2) {
if (c1->is_compactible() != c2->is_compactible()) {
@@ -2463,11 +2477,11 @@ size_t tracker::impl::compact_and_evict_locked(size_t reserve_segments, size_t m
{
int regions = 0, evictable_regions = 0;
reclaim_timer timing_guard("compact", preempt, memory_to_release, reserve_segments, this, [&] (log_level level) {
reclaim_timer timing_guard("compact", preempt, memory_to_release, reserve_segments, *this, [&] (log_level level) {
timing_logger.log(level, "- processed {} regions: reclaimed from {}, compacted {}",
regions, evictable_regions, regions - evictable_regions);
});
while (shard_segment_pool.total_memory_in_use() > target_mem) {
while (_segment_pool->total_memory_in_use() > target_mem) {
boost::range::pop_heap(_regions, cmp);
region::impl* r = _regions.back();
@@ -2496,11 +2510,11 @@ size_t tracker::impl::compact_and_evict_locked(size_t reserve_segments, size_t m
}
}
auto released_during_compaction = mem_in_use - shard_segment_pool.total_memory_in_use();
auto released_during_compaction = mem_in_use - _segment_pool->total_memory_in_use();
if (shard_segment_pool.total_memory_in_use() > target_mem) {
if (_segment_pool->total_memory_in_use() > target_mem) {
int regions = 0, evictable_regions = 0;
reclaim_timer timing_guard("evict", preempt, memory_to_release, reserve_segments, this, [&] (log_level level) {
reclaim_timer timing_guard("evict", preempt, memory_to_release, reserve_segments, *this, [&] (log_level level) {
timing_logger.log(level, "- processed {} regions, reclaimed from {}", regions, evictable_regions);
});
llogger.debug("Considering evictable regions.");
@@ -2513,14 +2527,14 @@ size_t tracker::impl::compact_and_evict_locked(size_t reserve_segments, size_t m
if (r->is_evictable()) {
++evictable_regions;
reclaim_from_evictable(*r, target_mem, preempt);
if (shard_segment_pool.total_memory_in_use() <= target_mem) {
if (_segment_pool->total_memory_in_use() <= target_mem) {
break;
}
}
}
}
mem_released += mem_in_use - shard_segment_pool.total_memory_in_use();
mem_released += mem_in_use - _segment_pool->total_memory_in_use();
llogger.debug("Released {} bytes (wanted {}), {} during compaction",
mem_released, memory_to_release, released_during_compaction);
@@ -2551,7 +2565,7 @@ void tracker::impl::unregister_region(region::impl* r) noexcept {
_regions.erase(std::remove(_regions.begin(), _regions.end(), r), _regions.end());
}
tracker::impl::impl() {
tracker::impl::impl() : _segment_pool(std::make_unique<logalloc::segment_pool>()) {
namespace sm = seastar::metrics;
_metrics.add_group("lsa", {
@@ -2561,37 +2575,37 @@ tracker::impl::impl() {
sm::make_gauge("used_space_bytes", [this] { return region_occupancy().used_space(); },
sm::description("Holds a current amount of used memory in bytes.")),
sm::make_gauge("small_objects_total_space_bytes", [this] { return region_occupancy().total_space() - shard_segment_pool.non_lsa_memory_in_use(); },
sm::make_gauge("small_objects_total_space_bytes", [this] { return region_occupancy().total_space() - _segment_pool->non_lsa_memory_in_use(); },
sm::description("Holds a current size of \"small objects\" memory region in bytes.")),
sm::make_gauge("small_objects_used_space_bytes", [this] { return region_occupancy().used_space() - shard_segment_pool.non_lsa_memory_in_use(); },
sm::make_gauge("small_objects_used_space_bytes", [this] { return region_occupancy().used_space() - _segment_pool->non_lsa_memory_in_use(); },
sm::description("Holds a current amount of used \"small objects\" memory in bytes.")),
sm::make_gauge("large_objects_total_space_bytes", [] { return shard_segment_pool.non_lsa_memory_in_use(); },
sm::make_gauge("large_objects_total_space_bytes", [this] { return _segment_pool->non_lsa_memory_in_use(); },
sm::description("Holds a current size of allocated non-LSA memory.")),
sm::make_gauge("non_lsa_used_space_bytes", [this] { return non_lsa_used_space(); },
sm::description("Holds a current amount of used non-LSA memory.")),
sm::make_gauge("free_space", [] { return shard_segment_pool.unreserved_free_segments() * segment_size; },
sm::make_gauge("free_space", [this] { return _segment_pool->unreserved_free_segments() * segment_size; },
sm::description("Holds a current amount of free memory that is under lsa control.")),
sm::make_gauge("occupancy", [this] { return region_occupancy().used_fraction() * 100; },
sm::description("Holds a current portion (in percents) of the used memory.")),
sm::make_counter("segments_compacted", [] { return shard_segment_pool.statistics().segments_compacted; },
sm::make_counter("segments_compacted", [this] { return _segment_pool->statistics().segments_compacted; },
sm::description("Counts a number of compacted segments.")),
sm::make_counter("memory_compacted", [] { return shard_segment_pool.statistics().memory_compacted; },
sm::make_counter("memory_compacted", [this] { return _segment_pool->statistics().memory_compacted; },
sm::description("Counts number of bytes which were copied as part of segment compaction.")),
sm::make_counter("memory_allocated", [] { return shard_segment_pool.statistics().memory_allocated; },
sm::make_counter("memory_allocated", [this] { return _segment_pool->statistics().memory_allocated; },
sm::description("Counts number of bytes which were requested from LSA.")),
sm::make_counter("memory_evicted", [] { return shard_segment_pool.statistics().memory_evicted; },
sm::make_counter("memory_evicted", [this] { return _segment_pool->statistics().memory_evicted; },
sm::description("Counts number of bytes which were evicted.")),
sm::make_counter("memory_freed", [] { return shard_segment_pool.statistics().memory_freed; },
sm::make_counter("memory_freed", [this] { return _segment_pool->statistics().memory_freed; },
sm::description("Counts number of bytes which were requested to be freed in LSA.")),
});
}
@@ -2623,11 +2637,11 @@ bool segment_pool::compact_segment(segment* seg) {
}
allocating_section::guard::guard() noexcept
: _prev(shard_segment_pool.emergency_reserve_max())
: _prev(shard_tracker().get_impl().segment_pool().emergency_reserve_max())
{ }
allocating_section::guard::~guard() {
shard_segment_pool.set_emergency_reserve_max(_prev);
shard_tracker().get_impl().segment_pool().set_emergency_reserve_max(_prev);
}
void allocating_section::maybe_decay_reserve() noexcept {
@@ -2659,9 +2673,10 @@ void allocating_section::maybe_decay_reserve() noexcept {
}
void allocating_section::reserve() {
auto& pool = shard_tracker().get_impl().segment_pool();
try {
shard_segment_pool.set_emergency_reserve_max(std::max(_lsa_reserve, _minimum_lsa_emergency_reserve));
shard_segment_pool.refill_emergency_reserve();
pool.set_emergency_reserve_max(std::max(_lsa_reserve, _minimum_lsa_emergency_reserve));
pool.refill_emergency_reserve();
while (true) {
size_t free = memory::stats().free_memory();
@@ -2673,7 +2688,7 @@ void allocating_section::reserve() {
}
}
shard_segment_pool.clear_allocation_failure_flag();
pool.clear_allocation_failure_flag();
} catch (const std::bad_alloc&) {
if (shard_tracker().should_abort_on_bad_alloc()) {
llogger.error("Aborting due to allocation failure");
@@ -2685,7 +2700,7 @@ void allocating_section::reserve() {
void allocating_section::on_alloc_failure(logalloc::region& r) {
r.allocator().invalidate_references();
if (shard_segment_pool.allocation_failure_flag()) {
if (shard_tracker().get_impl().segment_pool().allocation_failure_flag()) {
_lsa_reserve *= 2;
llogger.debug("LSA allocation failure, increasing reserve in section {} to {} segments", fmt::ptr(this), _lsa_reserve);
} else {
@@ -2705,28 +2720,29 @@ void allocating_section::set_std_reserve(size_t reserve) noexcept {
future<> prime_segment_pool(size_t available_memory, size_t min_free_memory) {
return smp::invoke_on_all([=] {
shard_segment_pool.prime(available_memory, min_free_memory);
shard_tracker().get_impl().segment_pool().prime(available_memory, min_free_memory);
});
}
uint64_t memory_allocated() noexcept {
return shard_segment_pool.statistics().memory_allocated;
return shard_tracker().get_impl().segment_pool().statistics().memory_allocated;
}
uint64_t memory_freed() noexcept {
return shard_segment_pool.statistics().memory_freed;
return shard_tracker().get_impl().segment_pool().statistics().memory_freed;
}
uint64_t memory_compacted() noexcept {
return shard_segment_pool.statistics().memory_compacted;
return shard_tracker().get_impl().segment_pool().statistics().memory_compacted;
}
uint64_t memory_evicted() noexcept {
return shard_segment_pool.statistics().memory_evicted;
return shard_tracker().get_impl().segment_pool().statistics().memory_evicted;
}
occupancy_stats lsa_global_occupancy_stats() noexcept {
return occupancy_stats(shard_segment_pool.total_free_memory(), shard_segment_pool.total_memory_in_use());
auto& pool = shard_tracker().get_impl().segment_pool();
return occupancy_stats(pool.total_free_memory(), pool.total_memory_in_use());
}
}