Merge 'sstables_manager: trigger reclaim/reload on components_memory_reclaim_threshold update' from Lakshmi Narayanan Sreethar

The config variable `components_memory_reclaim_threshold` limits the
memory available to the sstable bloom filters. Any change to its value
is not immediately propagated to the sstable manager, despite it being
a LiveUpdate variable. The updated value takes effect only when a new
sstable is created or deleted.

This PR first refactors the reclaim and reload logic into a single
background fiber. It then updates the sstable manager to subscribe to
changes in the `components_memory_reclaim_threshold` configuration value
and immediately triggers the reclaim/reload fiber when a change is
detected.

Fixes #21947

This is an improvement and does not need to be backported.

Closes scylladb/scylladb#22725

* github.com:scylladb/scylladb:
  sstables_manager: trigger reclaim/reload on `components_memory_reclaim_threshold` update
  sstables_manager: maybe_reclaim_components: yield between iterations
  sstables_manager: rename `increment_total_reclaimable_memory_and_maybe_reclaim()`
  sstables_manager: move reclaim logic into `components_reclaim_reload_fiber()`
  sstables_manager: rename `_sstable_deleted_event` condition variable
  sstables_manager: rename `components_reloader_fiber()`
  sstables_manager: fix `maybe_reclaim_components()` indentation
  sstables_manager: reclaim components memory until usage falls below threshold
  sstables_manager: introduce `get_components_memory_reclaim_threshold()`
  sstables_manager: extract `maybe_reclaim_components()`
  sstables_manager: fix `maybe_reload_components()` indentation
  sstables_manager: extract out `maybe_reload_components()`
This commit is contained in:
Avi Kivity
2025-02-17 22:33:33 +02:00
5 changed files with 167 additions and 85 deletions

View File

@@ -1364,7 +1364,7 @@ future<> sstable::open_data(sstable_open_config cfg) noexcept {
_stats.on_open_for_reading();
_total_reclaimable_memory.reset();
_manager.increment_total_reclaimable_memory_and_maybe_reclaim(this);
_manager.increment_total_reclaimable_memory(this);
}
future<> sstable::update_info_for_opened_data(sstable_open_config cfg) {
@@ -1608,7 +1608,7 @@ future<> sstable::load(sstables::foreign_sstable_open_info info) noexcept {
validate_partitioner();
co_await update_info_for_opened_data();
_total_reclaimable_memory.reset();
_manager.increment_total_reclaimable_memory_and_maybe_reclaim(this);
_manager.increment_total_reclaimable_memory(this);
}
future<foreign_sstable_open_info> sstable::get_open_info() & {

View File

@@ -43,7 +43,7 @@ sstables_manager::sstables_manager(
, _maintenance_sg(std::move(maintenance_sg))
, _abort(abort)
{
_components_reloader_status = components_reloader_fiber();
_components_reloader_status = components_reclaim_reload_fiber();
}
sstables_manager::~sstables_manager() {
@@ -152,81 +152,101 @@ sstable_writer_config sstables_manager::configure_writer(sstring origin) const {
return cfg;
}
void sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst) {
void sstables_manager::increment_total_reclaimable_memory(sstable* sst) {
_total_reclaimable_memory += sst->total_reclaimable_memory_size();
_components_memory_change_event.signal();
}
size_t memory_reclaim_threshold = _available_memory * _db_config.components_memory_reclaim_threshold();
if (_total_reclaimable_memory <= memory_reclaim_threshold) {
// total memory used is within limit; no need to reclaim.
return;
}
future<> sstables_manager::maybe_reclaim_components() {
while(_total_reclaimable_memory > get_components_memory_reclaim_threshold()) {
// Memory consumption is above threshold. Reclaim from the SSTable that
// has the most reclaimable memory to get the total consumption under limit.
// FIXME: Take SSTable usage into account during reclaim - see https://github.com/scylladb/scylladb/issues/21897
auto sst_with_max_memory = std::max_element(_active.begin(), _active.end(), [](const sstable& sst1, const sstable& sst2) {
return sst1.total_reclaimable_memory_size() < sst2.total_reclaimable_memory_size();
});
// Memory consumption has crossed threshold. Reclaim from the SSTable that
// has the most reclaimable memory to get the total consumption under limit.
auto sst_with_max_memory = std::max_element(_active.begin(), _active.end(), [](const sstable& sst1, const sstable& sst2) {
return sst1.total_reclaimable_memory_size() < sst2.total_reclaimable_memory_size();
auto memory_reclaimed = sst_with_max_memory->reclaim_memory_from_components();
_total_memory_reclaimed += memory_reclaimed;
_total_reclaimable_memory -= memory_reclaimed;
_reclaimed.insert(*sst_with_max_memory);
// TODO: As of now only bloom filter is reclaimed. Print actual component names when adding support for more components.
smlogger.info("Reclaimed {} bytes of memory from components of {}. Total memory reclaimed so far is {} bytes",
memory_reclaimed, sst_with_max_memory->get_filename(), _total_memory_reclaimed);
}
co_await coroutine::maybe_yield();
}
// Returns the upper bound on the memory that reclaimable sstable components
// may occupy before reclaim kicks in: the memory available to this manager
// scaled by the current value of the components_memory_reclaim_threshold
// config option. The config is read on every call, so live updates to it are
// picked up immediately.
size_t sstables_manager::get_components_memory_reclaim_threshold() const {
    const auto threshold_fraction = _db_config.components_memory_reclaim_threshold();
    return _available_memory * threshold_fraction;
}
// Returns how much more memory reclaimable components may consume before the
// total reaches the reclaim threshold.
//
// Returns 0 when the current usage already meets or exceeds the threshold.
// That state is reachable (e.g. the threshold was lowered, or new sstables
// were loaded while a reload was suspended), and a plain unsigned subtraction
// would wrap around to a huge value and make callers believe that plenty of
// memory is available.
size_t sstables_manager::get_memory_available_for_reclaimable_components() const {
    const size_t memory_reclaim_threshold = get_components_memory_reclaim_threshold();
    if (_total_reclaimable_memory >= memory_reclaim_threshold) {
        return 0;
    }
    return memory_reclaim_threshold - _total_reclaimable_memory;
}
future<> sstables_manager::components_reclaim_reload_fiber() {
auto components_memory_reclaim_threshold_observer = _db_config.components_memory_reclaim_threshold.observe([&] (double) {
// any change to the components_memory_reclaim_threshold config should trigger reload/reclaim
_components_memory_change_event.signal();
});
auto memory_reclaimed = sst_with_max_memory->reclaim_memory_from_components();
_total_memory_reclaimed += memory_reclaimed;
_total_reclaimable_memory -= memory_reclaimed;
_reclaimed.insert(*sst_with_max_memory);
// TODO: As of now only bloom filter is reclaimed. Print actual component names when adding support for more components.
smlogger.info("Reclaimed {} bytes of memory from components of {}. Total memory reclaimed so far is {} bytes",
memory_reclaimed, sst_with_max_memory->get_filename(), _total_memory_reclaimed);
}
size_t sstables_manager::get_memory_available_for_reclaimable_components() {
size_t memory_reclaim_threshold = _available_memory * _db_config.components_memory_reclaim_threshold();
return memory_reclaim_threshold - _total_reclaimable_memory;
}
future<> sstables_manager::components_reloader_fiber() {
co_await coroutine::switch_to(_maintenance_sg);
sstlog.trace("components_reloader_fiber start");
while (true) {
co_await _sstable_deleted_event.when();
co_await _components_memory_change_event.when();
if (_closing) {
co_return;
}
// Reload bloom filters from the smallest to largest so as to maximize
// the number of bloom filters being reloaded.
auto memory_available = get_memory_available_for_reclaimable_components();
while (!_reclaimed.empty() && memory_available > 0) {
auto sstable_to_reload = _reclaimed.begin();
const size_t reclaimed_memory = sstable_to_reload->total_memory_reclaimed();
if (reclaimed_memory > memory_available) {
// cannot reload anymore sstables
break;
}
// Increment the total memory before reloading to prevent any parallel
// fibers from loading new bloom filters into memory.
_total_reclaimable_memory += reclaimed_memory;
_reclaimed.erase(sstable_to_reload);
// Use a lw_shared_ptr to prevent the sstable from getting deleted when
// the components are being reloaded.
auto sstable_ptr = sstable_to_reload->shared_from_this();
try {
co_await sstable_ptr->reload_reclaimed_components();
} catch (...) {
// reload failed due to some reason
sstlog.warn("Failed to reload reclaimed SSTable components : {}", std::current_exception());
// revert back changes made before the reload
_total_reclaimable_memory -= reclaimed_memory;
_reclaimed.insert(*sstable_to_reload);
break;
}
_total_memory_reclaimed -= reclaimed_memory;
memory_available = get_memory_available_for_reclaimable_components();
if (_total_reclaimable_memory > get_components_memory_reclaim_threshold()) {
// reclaim memory to bring total memory usage under threshold
co_await maybe_reclaim_components();
} else {
// memory available for reloading components of previously reclaimed SSTables
co_await maybe_reload_components();
}
}
}
// Reloads the components of previously reclaimed sstables back into memory
// for as long as the next candidate fits into the memory still available
// under the reclaim threshold.
//
// Stops when there is nothing left to reload, when the next candidate does
// not fit, or when a reload fails (the failure is logged and the bookkeeping
// for that sstable is rolled back).
future<> sstables_manager::maybe_reload_components() {
    // Reload bloom filters from the smallest to largest so as to maximize
    // the number of bloom filters being reloaded.
    auto memory_available = get_memory_available_for_reclaimable_components();
    while (!_reclaimed.empty() && memory_available > 0) {
        auto sstable_to_reload = _reclaimed.begin();
        const size_t reclaimed_memory = sstable_to_reload->total_memory_reclaimed();
        if (reclaimed_memory > memory_available) {
            // cannot reload any more sstables
            break;
        }

        // Take an owning pointer to prevent the sstable from getting deleted
        // while its components are being reloaded. This is done before the
        // erase below so that the iterator is never used after its element
        // has been removed from the set.
        auto sstable_ptr = sstable_to_reload->shared_from_this();

        // Increment the total memory before reloading to prevent any parallel
        // fibers from loading new bloom filters into memory.
        _total_reclaimable_memory += reclaimed_memory;
        _reclaimed.erase(sstable_to_reload);

        try {
            co_await sstable_ptr->reload_reclaimed_components();
        } catch (...) {
            // reload failed due to some reason
            sstlog.warn("Failed to reload reclaimed SSTable components : {}", std::current_exception());
            // revert back changes made before the reload
            _total_reclaimable_memory -= reclaimed_memory;
            _reclaimed.insert(*sstable_ptr);
            break;
        }

        _total_memory_reclaimed -= reclaimed_memory;
        // Usage may have changed while the reload was suspended; recompute.
        memory_available = get_memory_available_for_reclaimable_components();
    }
}
void sstables_manager::reclaim_memory_and_stop_tracking_sstable(sstable* sst) {
// remove the sstable from the memory tracking metrics
_total_reclaimable_memory -= sst->total_reclaimable_memory_size();
@@ -262,7 +282,7 @@ void sstables_manager::deactivate(sstable* sst) {
void sstables_manager::remove(sstable* sst) {
_undergoing_close.erase(_undergoing_close.iterator_to(*sst));
delete sst;
_sstable_deleted_event.signal();
_components_memory_change_event.signal();
maybe_done();
}
@@ -297,7 +317,7 @@ future<> sstables_manager::close() {
co_await _done.get_future();
co_await _sstable_metadata_concurrency_sem.stop();
// stop the components reload fiber
_sstable_deleted_event.signal();
_components_memory_change_event.signal();
co_await std::move(_components_reloader_status);
}

View File

@@ -108,8 +108,8 @@ private:
size_t _total_memory_reclaimed{0};
// Set of sstables from which memory has been reclaimed
set_type _reclaimed;
// Condition variable that gets notified when an sstable is deleted
seastar::condition_variable _sstable_deleted_event;
// Condition variable that needs to be notified when an sstable is created or deleted,
// or when the components_memory_reclaim_threshold config value changes
seastar::condition_variable _components_memory_change_event;
future<> _components_reloader_status = make_ready_future<>();
bool _closing = false;
@@ -212,13 +212,16 @@ private:
// Allow at most 10% of memory to be filled with such reads.
size_t max_memory_sstable_metadata_concurrent_reads(size_t available_memory) { return available_memory * 0.1; }
// Increment the _total_reclaimable_memory with the new SSTable's reclaimable
// memory and if the total memory usage exceeds the pre-defined threshold,
// reclaim it from the SSTable that has the most reclaimable memory.
void increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst);
// Increment the _total_reclaimable_memory with the new SSTable's reclaimable memory
void increment_total_reclaimable_memory(sstable* sst);
// Fiber that reclaims components when memory usage exceeds the threshold and
// reloads reclaimed components back into memory when memory becomes available.
future<> components_reloader_fiber();
size_t get_memory_available_for_reclaimable_components();
future<> components_reclaim_reload_fiber();
// Reclaims components from SSTables if total memory usage exceeds the threshold.
future<> maybe_reclaim_components();
// Reloads components from reclaimed SSTables if memory is available.
future<> maybe_reload_components();
size_t get_components_memory_reclaim_threshold() const;
size_t get_memory_available_for_reclaimable_components() const;
// Reclaim memory from the SSTable and remove it from the memory tracking metrics.
// The method is idempotent and for an sstable that is deleted, it is called both
// during unlink and during deactivation.

View File

@@ -13,6 +13,7 @@
#include "test/lib/sstable_test_env.hh"
#include "test/lib/sstable_utils.hh"
#include "db/config.hh"
#include "readers/from_mutations_v2.hh"
#include "utils/bloom_filter.hh"
#include "utils/error_injection.hh"
@@ -78,8 +79,8 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
// Verify manager reclaims from the largest sst when the total usage crosses threshold.
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 50);
// sst1 has the most reclaimable memory
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
// sst1 has the most reclaimable memory, so its filter should be reclaimed
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst1->filter_memory_size(); }, 0);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
@@ -87,10 +88,10 @@ SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_and_reload_of_bloom_filter)
// Reclaim should also work on the latest sst being added
auto [sst4, sst4_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 100);
// sst4 should have been reclaimed
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst4->filter_memory_size(); }, 0);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst4->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory + sst4_bf_memory);
// Test auto reload - disposing sst3 should trigger reload of the
@@ -152,7 +153,7 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 60);
// total memory used by the bloom filters has crossed the threshold, so sst1's
// filter, which occupies the most memory, will be discarded from memory.
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst1->filter_memory_size(); }, 0);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
@@ -164,20 +165,21 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_during_reload) {
// _total_reclaimable_memory will be updated when the reload begins; wait for it.
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst_mgr.get_total_reclaimable_memory(); }, sst1_bf_memory);
// now that the reload is midway and paused, create new sst to verify that its
// filter gets evicted immediately as the memory that became available is reserved
// for sst1's filter reload.
// now that the reload is midway and paused, create new sst;
// it will not be reclaimed immediately as another reload is in progress
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 80);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), 0);
// confirm sst1 is not reloaded yet
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst3->filter_memory_size(); }, sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory + sst3_bf_memory);
// verify sst1 is not actually reloaded yet
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory + sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
// resume reloading sst1 filter
// Resume reloading sst1 filter - it will eventually be reclaimed again
utils::get_local_injector().receive_message("reload_reclaimed_components/pause");
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst1->filter_memory_size(); }, sst1_bf_memory);
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst_mgr.get_total_memory_reclaimed(); }, sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory);
// Eventually only sst3's bloom filter will be in memory
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst_mgr.get_total_reclaimable_memory(); }, sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
utils::get_local_injector().disable("reload_reclaimed_components/pause");
}, {
@@ -346,3 +348,60 @@ SEASTAR_TEST_CASE(test_bloom_filter_reclaim_after_unlink) {
.available_memory = 100
});
};
// Verifies that live updates to the components_memory_reclaim_threshold config
// value trigger reclaim/reload of sstable bloom filters on their own, i.e.
// without any sstable being created or deleted after the update.
SEASTAR_TEST_CASE(test_components_memory_reclaim_threshold_liveupdateness) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto schema_ptr = ss.schema();
auto& sst_mgr = env.manager();
// the expectations below assume the threshold starts out at 0.2
BOOST_REQUIRE_EQUAL(env.db_config().components_memory_reclaim_threshold(), 0.2);
// create a few sstables and verify their bloom filters are still in memory
auto [sst1, sst1_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 70);
auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 50);
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 20);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory + sst2_bf_memory + sst3_bf_memory);
// reduce the threshold to 0.1 and verify that sst1's bloom filter, which occupies most memory, gets evicted
env.db_config().components_memory_reclaim_threshold.set(0.1);
// the reclaim runs in a background fiber, so wait for it instead of asserting immediately
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst1->filter_memory_size(); }, 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
// the other two ssts are untouched
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst2_bf_memory + sst3_bf_memory);
// reduce the threshold to 0 and verify that no bloom filter is in memory
env.db_config().components_memory_reclaim_threshold.set(0);
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst_mgr.get_total_memory_reclaimed(); }, sst1_bf_memory + sst2_bf_memory + sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), 0);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), 0);
// increase the threshold back to 0.1 and expect sst2 and sst3's bloom filters to be reloaded
env.db_config().components_memory_reclaim_threshold.set(0.1);
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst3->filter_memory_size(); }, sst3_bf_memory);
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst2->filter_memory_size(); }, sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst2_bf_memory + sst3_bf_memory);
// sst1's bloom filter is not reloaded yet due to lack of available memory
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst_mgr.get_total_memory_reclaimed(); }, sst1_bf_memory);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
// increase threshold back to 0.2 and expect sst1 to be reloaded
env.db_config().components_memory_reclaim_threshold.set(0.2);
REQUIRE_EVENTUALLY_EQUAL<size_t>([&] { return sst_mgr.get_total_memory_reclaimed(); }, 0);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_reclaimable_memory(), sst1_bf_memory + sst2_bf_memory + sst3_bf_memory);
}, {
// limit available memory to the sstables_manager to test reclaiming.
// with the initial threshold of 0.2 this sets the reclaim threshold to 200 bytes.
.available_memory = 1000
});
}

View File

@@ -53,7 +53,7 @@ public:
}
void increment_total_reclaimable_memory_and_maybe_reclaim(sstable *sst) {
sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(sst);
sstables_manager::increment_total_reclaimable_memory(sst);
}
size_t get_total_memory_reclaimed() {