Merge "restrict background writers with scheduling groups" from Glauber

"This patchset restricts background writers - such as compactions,
streaming flushes and memtable flushes to a maximum amount of CPU usage
through a seastar::thread_scheduling_group.

The said maximum is recommended to be set  50 % - it is default
disabled, but can be adjusted through a configuration option until we
are able to auto-tune this.

The second patch in this series provides a preview on how such auto-tune
would look like. By implementing a simple controller we automatically
adjust the quota for the memtable writer processes, so that the rate at
which bytes come in is equal to the rates at which bytes are flushed.

Tail latencies are greatly reduced by this series, and heavy spikes that
previously appeared on CPU-bound workloads are no more."

* 'memtable-controller-v5' of https://github.com/glommer/scylla:
  simple controller for memtable/streaming writer shares.
  restrict background writers to 50 % of CPU.

(cherry picked from commit c5ee62a6a4)
This commit is contained in:
Avi Kivity
2017-07-20 10:58:53 +03:00
parent 83cc640c6a
commit 0291a4491e
9 changed files with 200 additions and 22 deletions

89
cpu_controller.hh Normal file
View File

@@ -0,0 +1,89 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/thread.hh>
#include <seastar/core/timer.hh>
#include <chrono>
// Simple proportional controller to adjust shares of memtable/streaming flushes.
//
// Goal is to flush as fast as we can, but not so fast that we steal all the CPU from incoming
// requests, and at the same time minimize user-visible fluctuations in the flush quota.
//
// What that translates to is we'll try to keep virtual dirty's firt derivative at 0 (IOW, we keep
// virtual dirty constant), which means that the rate of incoming writes is equal to the rate of
// flushed bytes.
//
// The exact point at which the controller stops determines the desired flush CPU usage. As we
// approach the hard dirty limit, we need to be more aggressive. We will therefore define two
// thresholds, and increase the constant as we cross them.
//
// 1) the soft limit line
// 2) halfway between soft limit and dirty limit
//
// The constants q1 and q2 are used to determine the proportional factor at each stage.
//
// Below the soft limit, we are in no particular hurry to flush, since it means we're set to
// complete flushing before we a new memtable is ready. The quota is dirty * q1, and q1 is set to a
// low number.
//
// The first half of the virtual dirty region is where we expect to be usually, so we have a low
// slope corresponding to a sluggish response between q1 * soft_limit and q2.
//
// In the second half, we're getting close to the hard dirty limit so we increase the slope and
// become more responsive, up to a maximum quota of qmax.
//
// For now we'll just set them in the structure not to complicate the constructor. But q1, q2 and
// qmax can easily become parameters if we find another user.
class flush_cpu_controller {
static constexpr float hard_dirty_limit = 0.50;
static constexpr float q1 = 0.01;
static constexpr float q2 = 0.2;
static constexpr float qmax = 1;
float _current_quota = 0.0f;
float _goal;
std::function<float()> _current_dirty;
std::chrono::milliseconds _interval;
timer<> _update_timer;
seastar::thread_scheduling_group _scheduling_group;
seastar::thread_scheduling_group *_current_scheduling_group = nullptr;
void adjust();
public:
seastar::thread_scheduling_group* scheduling_group() {
return _current_scheduling_group;
}
float current_quota() const {
return _current_quota;
}
struct disabled {
seastar::thread_scheduling_group *backup;
};
flush_cpu_controller(disabled d) : _scheduling_group(std::chrono::nanoseconds(0), 0), _current_scheduling_group(d.backup) {}
flush_cpu_controller(std::chrono::milliseconds interval, float soft_limit, std::function<float()> current_dirty);
flush_cpu_controller(flush_cpu_controller&&) = default;
};

View File

@@ -870,7 +870,7 @@ column_family::seal_active_streaming_memtable_immediate() {
//
// Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the
// memtable list, since this memtable was not available for reading up until this point.
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority).then([this, newtab, old] {
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority, false, _config.background_writer_scheduling_group).then([this, newtab, old] {
return newtab->open_data();
}).then([this, old, newtab] () {
add_sstable(newtab, {engine().cpu_id()});
@@ -917,7 +917,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi
newtab->set_unshared();
auto&& priority = service::get_local_streaming_write_priority();
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority, true).then([this, newtab, old, &smb] {
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority, true, _config.background_writer_scheduling_group).then([this, newtab, old, &smb] {
smb.sstables.emplace_back(newtab);
}).handle_exception([] (auto ep) {
dblog.error("failed to write streamed sstable: {}", ep);
@@ -989,7 +989,7 @@ column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old) {
// The code as is guarantees that we'll never partially backup a
// single sstable, so that is enough of a guarantee.
auto&& priority = service::get_local_memtable_flush_priority();
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority).then([this, newtab, old] {
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority, false, _config.memtable_scheduling_group).then([this, newtab, old] {
return newtab->open_data();
}).then_wrapped([this, old, newtab] (future<> ret) {
dblog.debug("Flushing to {} done", newtab->get_filename());
@@ -1342,7 +1342,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
return sst;
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
cleanup, _config.background_writer_scheduling_group).then([this, sstables_to_compact] (auto new_sstables) {
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
@@ -1709,7 +1709,7 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
gc_clock::now(), default_io_error_handler_gen());
return sst;
};
auto f = sstables::reshard_sstables(sstables, *cf, creator, max_sstable_bytes, level);
auto f = sstables::reshard_sstables(sstables, *cf, creator, max_sstable_bytes, level, cf->background_writer_scheduling_group());
return f.then([&cf, sstables = std::move(sstables)] (std::vector<sstables::shared_sstable> new_sstables) mutable {
// an input sstable may belong to shard 1 and 2 and only have data which
@@ -1965,6 +1965,15 @@ future<> distributed_loader::populate_column_family(distributed<database>& db, s
}
inline
flush_cpu_controller
make_flush_cpu_controller(db::config& cfg, seastar::thread_scheduling_group* backup, std::function<double()> fn) {
if (cfg.auto_adjust_flush_quota()) {
return flush_cpu_controller(250ms, cfg.virtual_dirty_soft_limit(), std::move(fn));
}
return flush_cpu_controller(flush_cpu_controller::disabled{backup});
}
utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});
database::database() : database(db::config())
@@ -1978,6 +1987,10 @@ database::database(const db::config& cfg)
, _system_dirty_memory_manager(*this, 10 << 20, cfg.virtual_dirty_soft_limit())
, _dirty_memory_manager(*this, memory::stats().total_memory() * 0.45, cfg.virtual_dirty_soft_limit())
, _streaming_dirty_memory_manager(*this, memory::stats().total_memory() * 0.10, cfg.virtual_dirty_soft_limit())
, _background_writer_scheduling_group(1ms, _cfg->background_writer_scheduling_quota())
, _memtable_cpu_controller(make_flush_cpu_controller(*_cfg, &_background_writer_scheduling_group, [this, limit = 2.0f * _dirty_memory_manager.throttle_threshold()] {
return (_dirty_memory_manager.virtual_dirty_memory()) / limit;
}))
, _version(empty_version)
, _enable_incremental_backups(cfg.incremental_backups())
{
@@ -1987,6 +2000,32 @@ database::database(const db::config& cfg)
dblog.info("Row: max_vector_size: {}, internal_count: {}", size_t(row::max_vector_size), size_t(row::internal_count));
}
void flush_cpu_controller::adjust() {
auto mid = _goal + (hard_dirty_limit - _goal) / 2;
auto dirty = _current_dirty();
if (dirty < _goal) {
_current_quota = dirty * q1 / _goal;
} else if ((dirty >= _goal) && (dirty < mid)) {
_current_quota = q1 + (dirty - _goal) * (q2 - q1)/(mid - _goal);
} else {
_current_quota = q2 + (dirty - mid) * (qmax - q2) / (hard_dirty_limit - mid);
}
dblog.trace("dirty {}, goal {}, mid {} quota {}", dirty, _goal, mid, _current_quota);
_scheduling_group.update_usage(_current_quota);
}
flush_cpu_controller::flush_cpu_controller(std::chrono::milliseconds interval, float soft_limit, std::function<float()> current_dirty)
: _goal(soft_limit / 2)
, _current_dirty(std::move(current_dirty))
, _interval(interval)
, _update_timer([this] { adjust(); })
, _scheduling_group(1ms, 0.0f)
, _current_scheduling_group(&_scheduling_group)
{
_update_timer.arm_periodic(_interval);
}
void
dirty_memory_manager::setup_collectd(sstring namestr) {
@@ -2095,6 +2134,9 @@ database::setup_metrics() {
sm::make_gauge("total_result_bytes", [this] { return get_result_memory_limiter().total_used_memory(); },
sm::description("Holds the current amount of memory used for results.")),
sm::make_gauge("cpu_flush_quota", [this] { return _memtable_cpu_controller.current_quota(); },
sm::description("The current quota for memtable CPU scheduling group")),
sm::make_derive("short_data_queries", _stats->short_data_queries,
sm::description("The rate of data queries (data or digest reads) that returned less rows than requested due to result size limiting.")),
@@ -2565,6 +2607,8 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.background_writer_scheduling_group = _config.background_writer_scheduling_group;
cfg.memtable_scheduling_group = _config.memtable_scheduling_group;
return cfg;
}
@@ -3293,6 +3337,12 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
cfg.streaming_read_concurrency_config.timeout = {};
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
if (_cfg->background_writer_scheduling_quota() < 1.0f) {
cfg.background_writer_scheduling_group = &_background_writer_scheduling_group;
cfg.memtable_scheduling_group = _memtable_cpu_controller.scheduling_group();
}
return cfg;
}
@@ -4062,11 +4112,12 @@ void column_family::drop_hit_rate(gms::inet_address addr) {
}
future<>
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, bool backup, const io_priority_class& pc, bool leave_unsealed) {
write_memtable_to_sstable(memtable& mt, sstables::shared_sstable sst, bool backup, const io_priority_class& pc, bool leave_unsealed, seastar::thread_scheduling_group *tsg) {
sstables::sstable_writer_config cfg;
cfg.replay_position = mt.replay_position();
cfg.backup = backup;
cfg.leave_unsealed = leave_unsealed;
cfg.thread_scheduling_group = tsg;
return sst->write_components(mt.make_flush_reader(mt.schema(), pc), mt.partition_count(), mt.schema(), cfg, pc);
}

View File

@@ -78,6 +78,7 @@
#include "db/view/view.hh"
#include "lister.hh"
#include "utils/phased_barrier.hh"
#include "cpu_controller.hh"
class cell_locker;
class cell_locker_stats;
@@ -430,6 +431,8 @@ public:
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
seastar::thread_scheduling_group* background_writer_scheduling_group = nullptr;
seastar::thread_scheduling_group* memtable_scheduling_group = nullptr;
};
struct no_commitlog {};
struct stats {
@@ -855,6 +858,10 @@ public:
return _config.cf_stats;
}
seastar::thread_scheduling_group* background_writer_scheduling_group() {
return _config.background_writer_scheduling_group;
}
compaction_manager& get_compaction_manager() const {
return _compaction_manager;
}
@@ -1063,6 +1070,8 @@ public:
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
seastar::thread_scheduling_group* background_writer_scheduling_group = nullptr;
seastar::thread_scheduling_group* memtable_scheduling_group = nullptr;
};
private:
std::unique_ptr<locator::abstract_replication_strategy> _replication_strategy;
@@ -1168,6 +1177,9 @@ private:
dirty_memory_manager _dirty_memory_manager;
dirty_memory_manager _streaming_dirty_memory_manager;
seastar::thread_scheduling_group _background_writer_scheduling_group;
flush_cpu_controller _memtable_cpu_controller;
semaphore _read_concurrency_sem{max_concurrent_reads()};
restricted_mutation_reader_config _read_concurrency_config;
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};

View File

@@ -166,6 +166,12 @@ public:
*/
#define _make_config_values(val) \
val(background_writer_scheduling_quota, double, 1.0, Used, \
"max cpu usage ratio (between 0 and 1) for compaction process. Not intended for setting in normal operations. Setting it to 1 or higher will disable it, recommended operational setting is 0.5." \
) \
val(auto_adjust_flush_quota, bool, false, Used, \
"true: auto-adjust quota for flush processes. false: put everyone together in the static background writer group - if background writer group is enabled. Not intended for setting in normal operations" \
) \
/* Initialization properties */ \
/* The minimal properties needed for configuring a cluster. */ \
val(cluster_name, sstring, "Test Cluster", Used, \

View File

@@ -29,11 +29,13 @@
#include "sstables/sstables.hh"
#include <seastar/core/future.hh>
#include <seastar/core/file.hh>
#include <seastar/core/thread.hh>
future<>
write_memtable_to_sstable(memtable& mt,
sstables::shared_sstable sst,
bool backup = false,
const io_priority_class& pc = default_priority_class(),
bool leave_unsealed = false);
bool leave_unsealed = false,
seastar::thread_scheduling_group* tsg = nullptr);

View File

@@ -174,12 +174,14 @@ protected:
uint64_t _estimated_partitions = 0;
std::vector<unsigned long> _ancestors;
db::replay_position _rp;
seastar::thread_scheduling_group* _tsg;
protected:
compaction(column_family& cf, std::vector<shared_sstable> sstables, uint64_t max_sstable_size, uint32_t sstable_level)
compaction(column_family& cf, std::vector<shared_sstable> sstables, uint64_t max_sstable_size, uint32_t sstable_level, seastar::thread_scheduling_group* tsg)
: _cf(cf)
, _sstables(std::move(sstables))
, _max_sstable_size(max_sstable_size)
, _sstable_level(sstable_level)
, _tsg(tsg)
{
_cf.get_compaction_manager().register_compaction(_info);
}
@@ -211,6 +213,12 @@ public:
virtual ~compaction() {
_cf.get_compaction_manager().deregister_compaction(_info);
}
seastar::thread_attributes thread_attributes() {
seastar::thread_attributes attr;
attr.scheduling_group = _tsg;
return attr;
}
private:
::mutation_reader setup() {
std::vector<::mutation_reader> readers;
@@ -339,8 +347,8 @@ class regular_compaction : public compaction {
stdx::optional<sstable_writer> _writer;
public:
regular_compaction(column_family& cf, std::vector<shared_sstable> sstables, std::function<shared_sstable()> creator,
uint64_t max_sstable_size, uint32_t sstable_level)
: compaction(cf, std::move(sstables), max_sstable_size, sstable_level)
uint64_t max_sstable_size, uint32_t sstable_level, seastar::thread_scheduling_group* tsg)
: compaction(cf, std::move(sstables), max_sstable_size, sstable_level, tsg)
, _creator(std::move(creator))
, _set(cf.get_sstable_set())
, _selector(_set.make_incremental_selector())
@@ -407,8 +415,8 @@ public:
class cleanup_compaction final : public regular_compaction {
public:
cleanup_compaction(column_family& cf, std::vector<shared_sstable> sstables, std::function<shared_sstable()> creator,
uint64_t max_sstable_size, uint32_t sstable_level)
: regular_compaction(cf, std::move(sstables), std::move(creator), max_sstable_size, sstable_level)
uint64_t max_sstable_size, uint32_t sstable_level, seastar::thread_scheduling_group* tsg)
: regular_compaction(cf, std::move(sstables), std::move(creator), max_sstable_size, sstable_level, tsg)
{
_info->type = compaction_type::Cleanup;
}
@@ -444,8 +452,8 @@ class resharding_compaction final : public compaction {
std::function<shared_sstable(shard_id)> _sstable_creator;
public:
resharding_compaction(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable(shard_id)> creator,
uint64_t max_sstable_size, uint32_t sstable_level)
: compaction(cf, std::move(sstables), max_sstable_size, sstable_level)
uint64_t max_sstable_size, uint32_t sstable_level, seastar::thread_scheduling_group* tsg)
: compaction(cf, std::move(sstables), max_sstable_size, sstable_level, tsg)
, _output_sstables(smp::count)
, _sstable_creator(std::move(creator))
{
@@ -494,7 +502,8 @@ public:
};
future<std::vector<shared_sstable>> compaction::run(std::unique_ptr<compaction> c) {
return seastar::async([c = std::move(c)] () mutable {
auto attr = c->thread_attributes();
return seastar::async(std::move(attr), [c = std::move(c)] () mutable {
auto reader = c->setup();
auto cr = c->get_compacting_sstable_writer();
@@ -527,21 +536,21 @@ static std::unique_ptr<compaction> make_compaction(bool cleanup, Params&&... par
future<std::vector<shared_sstable>>
compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable()> creator,
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup) {
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup, seastar::thread_scheduling_group *tsg) {
if (sstables.empty()) {
throw std::runtime_error(sprint("Called compaction with empty set on behalf of {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name()));
}
auto c = make_compaction(cleanup, cf, std::move(sstables), std::move(creator), max_sstable_size, sstable_level);
auto c = make_compaction(cleanup, cf, std::move(sstables), std::move(creator), max_sstable_size, sstable_level, tsg);
return compaction::run(std::move(c));
}
future<std::vector<shared_sstable>>
reshard_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::function<shared_sstable(shard_id)> creator,
uint64_t max_sstable_size, uint32_t sstable_level) {
uint64_t max_sstable_size, uint32_t sstable_level, seastar::thread_scheduling_group* tsg) {
if (sstables.empty()) {
throw std::runtime_error(sprint("Called resharding with empty set on behalf of {}.{}", cf.schema()->ks_name(), cf.schema()->cf_name()));
}
auto c = std::make_unique<resharding_compaction>(std::move(sstables), cf, std::move(creator), max_sstable_size, sstable_level);
auto c = std::make_unique<resharding_compaction>(std::move(sstables), cf, std::move(creator), max_sstable_size, sstable_level, tsg);
return compaction::run(std::move(c));
}

View File

@@ -112,13 +112,15 @@ namespace sstables {
// cleaning operation, and compaction history will not be updated.
future<std::vector<shared_sstable>> compact_sstables(std::vector<shared_sstable> sstables,
column_family& cf, std::function<shared_sstable()> creator,
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false);
uint64_t max_sstable_size, uint32_t sstable_level, bool cleanup = false,
seastar::thread_scheduling_group* tsg = nullptr);
// Compacts a set of N shared sstables into M sstables. For every shard involved,
// i.e. which owns any of the sstables, a new unshared sstable is created.
future<std::vector<shared_sstable>> reshard_sstables(std::vector<shared_sstable> sstables,
column_family& cf, std::function<shared_sstable(shard_id)> creator,
uint64_t max_sstable_size, uint32_t sstable_level);
uint64_t max_sstable_size, uint32_t sstable_level,
seastar::thread_scheduling_group* tsg = nullptr);
// Return the most interesting bucket applying the size-tiered strategy.
std::vector<sstables::shared_sstable>

View File

@@ -2090,7 +2090,9 @@ future<> sstable::write_components(::mutation_reader mr,
if (cfg.replay_position) {
_collector.set_replay_position(cfg.replay_position.value());
}
return seastar::async([this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, &pc] () mutable {
seastar::thread_attributes attr;
attr.scheduling_group = cfg.thread_scheduling_group;
return seastar::async(std::move(attr), [this, mr = std::move(mr), estimated_partitions, schema = std::move(schema), cfg, &pc] () mutable {
auto wr = get_writer(*schema, estimated_partitions, cfg, pc);
consume_flattened_in_thread(mr, wr);
});

View File

@@ -53,6 +53,10 @@
#include "sstables/shared_index_lists.hh"
#include "db/commitlog/replay_position.hh"
namespace seastar {
class thread_scheduling_group;
}
namespace sstables {
extern logging::logger sstlog;
@@ -131,6 +135,7 @@ struct sstable_writer_config {
bool backup = false;
bool leave_unsealed = false;
stdx::optional<db::replay_position> replay_position;
seastar::thread_scheduling_group* thread_scheduling_group = nullptr;
};
class sstable : public enable_lw_shared_from_this<sstable> {