backlog_controller: Unwrap scheduling_group
Some time ago (997a34bf8c) the backlog
controller was generalized to maintain some scheduling group. Back then
the group was the pair of seastar::scheduling_group and
seastar::io_priority_class. Now the latter is gone, so the controller's
notion of what sched group is can be relaxed.
Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
Closes #14266
This commit is contained in:
committed by
Botond Dénes
parent
3cf15e6ad7
commit
5412c7947a
@@ -37,9 +37,8 @@
|
||||
// The constants q1 and q2 are used to determine the proportional factor at each stage.
|
||||
class backlog_controller {
|
||||
public:
|
||||
struct scheduling_group {
|
||||
seastar::scheduling_group cpu = default_scheduling_group();
|
||||
};
|
||||
using scheduling_group = seastar::scheduling_group;
|
||||
|
||||
future<> shutdown() {
|
||||
_update_timer.cancel();
|
||||
return std::move(_inflight_update);
|
||||
|
||||
@@ -433,7 +433,7 @@ protected:
|
||||
// it cannot be the other way around, or minor compaction for this table would be
|
||||
// prevented while an ongoing major compaction doesn't release the semaphore.
|
||||
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg().cpu);
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg());
|
||||
|
||||
switch_state(state::pending);
|
||||
auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem);
|
||||
@@ -715,7 +715,7 @@ sstables::compaction_stopped_exception compaction_task_executor::make_compaction
|
||||
compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task_manager& tm)
|
||||
: _task_manager_module(make_shared<task_manager_module>(tm))
|
||||
, _cfg(std::move(cfg))
|
||||
, _compaction_submission_timer(compaction_sg().cpu, compaction_submission_callback())
|
||||
, _compaction_submission_timer(compaction_sg(), compaction_submission_callback())
|
||||
, _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), [this] () -> float {
|
||||
_last_backlog = backlog();
|
||||
auto b = _last_backlog / available_memory();
|
||||
@@ -752,7 +752,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task
|
||||
compaction_manager::compaction_manager(tasks::task_manager& tm)
|
||||
: _task_manager_module(make_shared<task_manager_module>(tm))
|
||||
, _cfg(config{ .available_memory = 1 })
|
||||
, _compaction_submission_timer(compaction_sg().cpu, compaction_submission_callback())
|
||||
, _compaction_submission_timer(compaction_sg(), compaction_submission_callback())
|
||||
, _compaction_controller(make_compaction_controller(compaction_sg(), 1, [] () -> float { return 1.0; }))
|
||||
, _backlog_manager(_compaction_controller)
|
||||
, _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); }))
|
||||
@@ -774,7 +774,7 @@ compaction_manager::~compaction_manager() {
|
||||
|
||||
future<> compaction_manager::update_throughput(uint32_t value_mbs) {
|
||||
uint64_t bps = ((uint64_t)(value_mbs != 0 ? value_mbs : std::numeric_limits<uint32_t>::max())) << 20;
|
||||
return compaction_sg().cpu.update_io_bandwidth(bps).then_wrapped([value_mbs] (auto f) {
|
||||
return compaction_sg().update_io_bandwidth(bps).then_wrapped([value_mbs] (auto f) {
|
||||
if (f.failed()) {
|
||||
cmlog.warn("Couldn't update compaction bandwidth: {}", f.get_exception());
|
||||
} else if (value_mbs != 0) {
|
||||
@@ -1011,7 +1011,7 @@ public:
|
||||
{}
|
||||
protected:
|
||||
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
|
||||
co_await coroutine::switch_to(_cm.compaction_sg().cpu);
|
||||
co_await coroutine::switch_to(_cm.compaction_sg());
|
||||
|
||||
for (;;) {
|
||||
if (!can_proceed()) {
|
||||
@@ -1243,7 +1243,7 @@ private:
|
||||
}
|
||||
protected:
|
||||
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg().cpu);
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg());
|
||||
|
||||
for (;;) {
|
||||
if (!can_proceed()) {
|
||||
@@ -1333,7 +1333,7 @@ protected:
|
||||
|
||||
private:
|
||||
future<sstables::compaction_result> rewrite_sstable(const sstables::shared_sstable sst) {
|
||||
co_await coroutine::switch_to(_cm.compaction_sg().cpu);
|
||||
co_await coroutine::switch_to(_cm.compaction_sg());
|
||||
|
||||
for (;;) {
|
||||
switch_state(state::active);
|
||||
@@ -1431,7 +1431,7 @@ protected:
|
||||
|
||||
private:
|
||||
future<sstables::compaction_result> validate_sstable(const sstables::shared_sstable& sst) {
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg().cpu);
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg());
|
||||
|
||||
switch_state(state::active);
|
||||
std::exception_ptr ex;
|
||||
@@ -1523,7 +1523,7 @@ protected:
|
||||
}
|
||||
private:
|
||||
future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
|
||||
co_await coroutine::switch_to(_cm.compaction_sg().cpu);
|
||||
co_await coroutine::switch_to(_cm.compaction_sg());
|
||||
|
||||
// Releases reference to cleaned files such that respective used disk space can be freed.
|
||||
auto release_exhausted = [this, &descriptor] (std::vector<sstables::shared_sstable> exhausted_sstables) mutable {
|
||||
|
||||
@@ -319,7 +319,7 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
|
||||
, _system_dirty_memory_manager(*this, 10 << 20, cfg.unspooled_dirty_soft_limit(), default_scheduling_group())
|
||||
, _dirty_memory_manager(*this, dbcfg.available_memory * 0.50, cfg.unspooled_dirty_soft_limit(), dbcfg.statement_scheduling_group)
|
||||
, _dbcfg(dbcfg)
|
||||
, _flush_sg(backlog_controller::scheduling_group{dbcfg.memtable_scheduling_group})
|
||||
, _flush_sg(dbcfg.memtable_scheduling_group)
|
||||
, _memtable_controller(make_flush_controller(_cfg, _flush_sg, [this, limit = float(_dirty_memory_manager.throttle_threshold())] {
|
||||
auto backlog = (_dirty_memory_manager.unspooled_dirty_memory()) / limit;
|
||||
if (_dirty_memory_manager.has_extraneous_flushes_requested()) {
|
||||
@@ -457,7 +457,7 @@ float backlog_controller::backlog_of_shares(float shares) const {
|
||||
}
|
||||
|
||||
void backlog_controller::update_controller(float shares) {
|
||||
_scheduling_group.cpu.set_shares(shares);
|
||||
_scheduling_group.set_shares(shares);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user