controllers: unify the I/O and CPU controllers
We have had so far an I/O controller, for compactions and memtables, and a CPU controller, for memtables only -- since the scheduling was still quota-based. Now that the CPU scheduler is fully functional, it is time to do away with the differences and integrate them both into one. We now have a memtable controller and a compaction controller, and they control both CPU and I/O. In the future, we may want to control processes that don't do one of them, like cache updates. If that ever happens, we'll try to make controlling one of them optional. But for now, since the I/O and CPU controllers for our main two processes would look exactly the same we should integrate them. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
@@ -45,90 +45,56 @@
|
||||
//
|
||||
// The constants q1 and q2 are used to determine the proportional factor at each stage.
|
||||
class backlog_controller {
|
||||
public:
|
||||
future<> shutdown() {
|
||||
return std::move(_inflight_update);
|
||||
}
|
||||
protected:
|
||||
struct control_point {
|
||||
float input;
|
||||
float output;
|
||||
};
|
||||
|
||||
seastar::scheduling_group _scheduling_group;
|
||||
const ::io_priority_class& _io_priority;
|
||||
std::chrono::milliseconds _interval;
|
||||
timer<> _update_timer;
|
||||
|
||||
std::vector<control_point> _control_points;
|
||||
|
||||
std::function<float()> _current_backlog;
|
||||
// updating shares for an I/O class may contact another shard and returns a future.
|
||||
future<> _inflight_update;
|
||||
|
||||
virtual void update_controller(float quota) = 0;
|
||||
virtual void update_controller(float quota);
|
||||
|
||||
void adjust();
|
||||
|
||||
backlog_controller(std::chrono::milliseconds interval, std::vector<control_point> control_points, std::function<float()> backlog)
|
||||
: _interval(interval)
|
||||
backlog_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, std::chrono::milliseconds interval,
|
||||
std::vector<control_point> control_points, std::function<float()> backlog)
|
||||
: _scheduling_group(sg)
|
||||
, _io_priority(iop)
|
||||
, _interval(interval)
|
||||
, _update_timer([this] { adjust(); })
|
||||
, _control_points({{0,0}})
|
||||
, _current_backlog(std::move(backlog))
|
||||
, _inflight_update(make_ready_future<>())
|
||||
{
|
||||
_control_points.insert(_control_points.end(), control_points.begin(), control_points.end());
|
||||
_update_timer.arm_periodic(_interval);
|
||||
}
|
||||
|
||||
// Used when the controllers are disabled. When we deprecate the --auto-adjust-flush-quota
|
||||
// parameter we can delete this constructor.
|
||||
backlog_controller() = default;
|
||||
virtual ~backlog_controller() {}
|
||||
};
|
||||
|
||||
|
||||
class backlog_cpu_controller : public backlog_controller {
|
||||
public:
|
||||
float current_shares() const {
|
||||
return _current_shares;
|
||||
}
|
||||
protected:
|
||||
unsigned _current_shares = 1;
|
||||
|
||||
void update_controller(float quota) override;
|
||||
|
||||
seastar::scheduling_group _scheduling_group;
|
||||
|
||||
backlog_cpu_controller(seastar::scheduling_group sg, float static_shares)
|
||||
// Used when the controllers are disabled and a static share is used
|
||||
// When that option is deprecated we should remove this.
|
||||
backlog_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, float static_shares)
|
||||
: _scheduling_group(sg)
|
||||
{
|
||||
update_controller(static_shares);
|
||||
}
|
||||
backlog_cpu_controller(seastar::scheduling_group sg, std::chrono::milliseconds interval, std::vector<backlog_controller::control_point> control_points, std::function<float()> backlog)
|
||||
: backlog_controller(interval, std::move(control_points), backlog)
|
||||
, _scheduling_group(sg)
|
||||
{}
|
||||
};
|
||||
|
||||
// Right now: the CPU controller deals with quotas, the I/O controller deals with shares.
|
||||
// The I/O Controllers will be always-enabled, the CPU controllers, conditionally. So it simplifies
|
||||
// things to keep them separate. When the work on the CPU controller is fully done, we can unify
|
||||
// them.
|
||||
class backlog_io_controller : public backlog_controller {
|
||||
const ::io_priority_class& _io_priority;
|
||||
// updating shares for an I/O class may contact another shard and returns a future.
|
||||
future<> _inflight_update;
|
||||
|
||||
public:
|
||||
backlog_io_controller(const ::io_priority_class& iop, float static_shares)
|
||||
: _io_priority(iop)
|
||||
, _inflight_update(make_ready_future<>())
|
||||
{
|
||||
update_controller(static_shares);
|
||||
}
|
||||
backlog_io_controller(const ::io_priority_class& iop, std::chrono::milliseconds interval, std::vector<backlog_controller::control_point> control_points, std::function<float()> backlog)
|
||||
: backlog_controller(interval, std::move(control_points), backlog)
|
||||
, _io_priority(iop)
|
||||
, _inflight_update(make_ready_future<>())
|
||||
{}
|
||||
|
||||
void update_controller(float shares) override;
|
||||
|
||||
future<> shutdown() {
|
||||
return std::move(_inflight_update);
|
||||
{
|
||||
update_controller(static_shares);
|
||||
}
|
||||
|
||||
virtual ~backlog_controller() {}
|
||||
};
|
||||
|
||||
// memtable flush CPU controller.
|
||||
@@ -146,37 +112,24 @@ public:
|
||||
//
|
||||
// In the second half, we're getting close to the hard dirty limit so we increase the slope and
|
||||
// become more responsive, up to a maximum quota of qmax.
|
||||
class flush_cpu_controller : public backlog_cpu_controller {
|
||||
class flush_controller : public backlog_controller {
|
||||
static constexpr float hard_dirty_limit = 1.0f;
|
||||
public:
|
||||
flush_cpu_controller(seastar::scheduling_group sg, float static_shares) : backlog_cpu_controller(sg, static_shares) {}
|
||||
flush_cpu_controller(flush_cpu_controller&&) = default;
|
||||
flush_cpu_controller(seastar::scheduling_group sg, std::chrono::milliseconds interval, float soft_limit, std::function<float()> current_dirty)
|
||||
: backlog_cpu_controller(sg, std::move(interval),
|
||||
flush_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, float static_shares) : backlog_controller(sg, iop, static_shares) {}
|
||||
flush_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, std::chrono::milliseconds interval, float soft_limit, std::function<float()> current_dirty)
|
||||
: backlog_controller(sg, iop, std::move(interval),
|
||||
std::vector<backlog_controller::control_point>({{soft_limit, 100}, {soft_limit + (hard_dirty_limit - soft_limit) / 2, 200} , {hard_dirty_limit, 1000}}),
|
||||
std::move(current_dirty)
|
||||
)
|
||||
{}
|
||||
};
|
||||
|
||||
class flush_io_controller : public backlog_io_controller {
|
||||
static constexpr float hard_dirty_limit = 1.0f;
|
||||
public:
|
||||
flush_io_controller(const ::io_priority_class& iop, float static_shares) : backlog_io_controller(iop, static_shares) {}
|
||||
flush_io_controller(const ::io_priority_class& iop, std::chrono::milliseconds interval, float soft_limit, std::function<float()> current_backlog)
|
||||
: backlog_io_controller(iop, std::move(interval),
|
||||
std::vector<backlog_controller::control_point>({{soft_limit, 100}, {soft_limit + (hard_dirty_limit - soft_limit) / 2, 200}, {hard_dirty_limit, 1000}}),
|
||||
std::move(current_backlog)
|
||||
)
|
||||
{}
|
||||
};
|
||||
|
||||
class compaction_io_controller : public backlog_io_controller {
|
||||
class compaction_controller : public backlog_controller {
|
||||
public:
|
||||
static constexpr unsigned normalization_factor = 10;
|
||||
compaction_io_controller(const ::io_priority_class& iop, float static_shares) : backlog_io_controller(iop, static_shares) {}
|
||||
compaction_io_controller(const ::io_priority_class& iop, std::chrono::milliseconds interval, std::function<float()> current_backlog)
|
||||
: backlog_io_controller(iop, std::move(interval),
|
||||
compaction_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, float static_shares) : backlog_controller(sg, iop, static_shares) {}
|
||||
compaction_controller(seastar::scheduling_group sg, const ::io_priority_class& iop, std::chrono::milliseconds interval, std::function<float()> current_backlog)
|
||||
: backlog_controller(sg, iop, std::move(interval),
|
||||
std::vector<backlog_controller::control_point>({{0.5, 10}, {1.5, 100} , {normalization_factor, 1000}}),
|
||||
std::move(current_backlog)
|
||||
)
|
||||
|
||||
46
database.cc
46
database.cc
@@ -2055,30 +2055,21 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
|
||||
}
|
||||
|
||||
inline
|
||||
flush_cpu_controller
|
||||
make_flush_controller(db::config& cfg, seastar::scheduling_group sg, std::function<double()> fn) {
|
||||
flush_controller
|
||||
make_flush_controller(db::config& cfg, seastar::scheduling_group sg, const ::io_priority_class& iop, std::function<double()> fn) {
|
||||
if (cfg.memtable_flush_static_shares() > 0) {
|
||||
return flush_cpu_controller(sg, cfg.memtable_flush_static_shares());
|
||||
return flush_controller(sg, iop, cfg.memtable_flush_static_shares());
|
||||
}
|
||||
return flush_cpu_controller(sg, 250ms, cfg.virtual_dirty_soft_limit(), std::move(fn));
|
||||
return flush_controller(sg, iop, 250ms, cfg.virtual_dirty_soft_limit(), std::move(fn));
|
||||
}
|
||||
|
||||
inline
|
||||
flush_io_controller
|
||||
make_flush_controller(db::config& cfg, const ::io_priority_class& iop, std::function<double()> fn) {
|
||||
if (cfg.memtable_flush_static_shares() > 0) {
|
||||
return flush_io_controller(iop, cfg.memtable_flush_static_shares());
|
||||
}
|
||||
return flush_io_controller(iop, 250ms, cfg.virtual_dirty_soft_limit(), std::move(fn));
|
||||
}
|
||||
|
||||
inline
|
||||
compaction_io_controller
|
||||
make_compaction_controller(db::config& cfg, const ::io_priority_class& iop, std::function<double()> fn) {
|
||||
compaction_controller
|
||||
make_compaction_controller(db::config& cfg, seastar::scheduling_group sg, const ::io_priority_class& iop, std::function<double()> fn) {
|
||||
if (cfg.compaction_static_shares() > 0) {
|
||||
return compaction_io_controller(iop, cfg.compaction_static_shares());
|
||||
return compaction_controller(sg, iop, cfg.compaction_static_shares());
|
||||
}
|
||||
return compaction_io_controller(iop, 250ms, std::move(fn));
|
||||
return compaction_controller(sg, iop, 250ms, std::move(fn));
|
||||
}
|
||||
|
||||
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
|
||||
@@ -2095,24 +2086,21 @@ database::database(const db::config& cfg, database_config dbcfg)
|
||||
, _dirty_memory_manager(*this, memory::stats().total_memory() * 0.45, cfg.virtual_dirty_soft_limit())
|
||||
, _streaming_dirty_memory_manager(*this, memory::stats().total_memory() * 0.10, cfg.virtual_dirty_soft_limit())
|
||||
, _dbcfg(dbcfg)
|
||||
, _memtable_cpu_controller(make_flush_controller(*_cfg, dbcfg.memtable_scheduling_group, [this, limit = float(_dirty_memory_manager.throttle_threshold())] {
|
||||
, _memtable_controller(make_flush_controller(*_cfg, dbcfg.memtable_scheduling_group, service::get_local_memtable_flush_priority(), [this, limit = float(_dirty_memory_manager.throttle_threshold())] {
|
||||
return (_dirty_memory_manager.virtual_dirty_memory()) / limit;
|
||||
}))
|
||||
, _data_query_stage("data_query", _dbcfg.query_scheduling_group, &column_family::query)
|
||||
, _version(empty_version)
|
||||
, _compaction_manager(std::make_unique<compaction_manager>(dbcfg.compaction_scheduling_group))
|
||||
, _enable_incremental_backups(cfg.incremental_backups())
|
||||
, _flush_io_controller(make_flush_controller(*_cfg, service::get_local_memtable_flush_priority(), [this, limit = float(_dirty_memory_manager.throttle_threshold())] {
|
||||
return _dirty_memory_manager.virtual_dirty_memory() / limit;
|
||||
}))
|
||||
, _compaction_io_controller(make_compaction_controller(*_cfg, service::get_local_compaction_priority(), [this] () -> float {
|
||||
, _compaction_controller(make_compaction_controller(*_cfg, dbcfg.compaction_scheduling_group, service::get_local_compaction_priority(), [this] () -> float {
|
||||
auto backlog = _compaction_manager->backlog();
|
||||
// This means we are using an unimplemented strategy
|
||||
if (std::isinf(backlog)) {
|
||||
// returning the normalization factor means that we'll return the maximum
|
||||
// output in the _control_points. We can get rid of this when we implement
|
||||
// all strategies.
|
||||
return compaction_io_controller::normalization_factor;
|
||||
return compaction_controller::normalization_factor;
|
||||
}
|
||||
return _compaction_manager->backlog() / memory::stats().total_memory();
|
||||
}))
|
||||
@@ -2139,12 +2127,8 @@ void backlog_controller::adjust() {
|
||||
update_controller(result);
|
||||
}
|
||||
|
||||
void backlog_cpu_controller::update_controller(float shares) {
|
||||
_current_shares = shares;
|
||||
_scheduling_group.set_shares(_current_shares);
|
||||
}
|
||||
|
||||
void backlog_io_controller::update_controller(float shares) {
|
||||
void backlog_controller::update_controller(float shares) {
|
||||
_scheduling_group.set_shares(shares);
|
||||
if (!_inflight_update.available()) {
|
||||
return; // next timer will fix it
|
||||
}
|
||||
@@ -3633,9 +3617,9 @@ database::stop() {
|
||||
}).then([this] {
|
||||
return _streaming_dirty_memory_manager.shutdown();
|
||||
}).then([this] {
|
||||
return _flush_io_controller.shutdown();
|
||||
return _memtable_controller.shutdown();
|
||||
}).then([this] {
|
||||
return _compaction_io_controller.shutdown();
|
||||
return _compaction_controller.shutdown();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1095,7 +1095,7 @@ private:
|
||||
dirty_memory_manager _streaming_dirty_memory_manager;
|
||||
|
||||
database_config _dbcfg;
|
||||
flush_cpu_controller _memtable_cpu_controller;
|
||||
flush_controller _memtable_controller;
|
||||
|
||||
db::timeout_semaphore _read_concurrency_sem{max_memory_concurrent_reads()};
|
||||
db::timeout_semaphore _streaming_concurrency_sem{max_memory_streaming_concurrent_reads()};
|
||||
@@ -1116,8 +1116,7 @@ private:
|
||||
seastar::metrics::metric_groups _metrics;
|
||||
bool _enable_incremental_backups = false;
|
||||
|
||||
flush_io_controller _flush_io_controller;
|
||||
compaction_io_controller _compaction_io_controller;
|
||||
compaction_controller _compaction_controller;
|
||||
future<> init_commitlog();
|
||||
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);
|
||||
future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout);
|
||||
|
||||
Reference in New Issue
Block a user