mutation_reader: make restricting_mutation_reader even more restricting

While limiting the number of concurrently executing sstable readers reduces
our memory load, the queue of waiting readers, although each one consumes
only a small amount of memory, can still grow without bound.

To limit the damage, add two limits on the queue:
 - a timeout, equal to the read request timeout
 - a queue length limit, equal to 2% of the shard's memory divided
   by an estimate of the size of a queued request (1 kB)

Together, these limits bound the amount of memory needed by queued disk
requests in case the disk can't keep up.
Message-Id: <1467206055-30769-1-git-send-email-avi@scylladb.com>
This commit is contained in:
Avi Kivity
2016-06-29 16:14:15 +03:00
committed by Tomasz Grabiec
parent 85cb2a6d35
commit 9ac730dcc9
5 changed files with 56 additions and 16 deletions

View File

@@ -258,8 +258,8 @@ column_family::make_sstable_reader(schema_ptr s,
const io_priority_class& pc) const {
// restricts a reader's concurrency if the configuration specifies it
auto restrict_reader = [&] (mutation_reader&& in) {
if (_config.read_concurrency_sem) {
return make_restricted_reader(*_config.read_concurrency_sem, 1, std::move(in));
if (_config.read_concurrency_config.sem) {
return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in));
} else {
return std::move(in);
}
@@ -1373,6 +1373,13 @@ database::setup_collectd() {
, scollectd::make_typed(scollectd::data_type::DERIVE, _stats->total_reads)
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
, "total_operations", "sstable_read_queue_overloads")
, scollectd::make_typed(scollectd::data_type::COUNTER, _stats->sstable_read_queue_overloaded)
));
_collectd.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("database"
, scollectd::per_cpu_plugin_instance
@@ -1766,7 +1773,7 @@ keyspace::make_column_family_config(const schema& s) const {
cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size;
cfg.dirty_memory_region_group = _config.dirty_memory_region_group;
cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group;
cfg.read_concurrency_sem = _config.read_concurreny_sem;
cfg.read_concurrency_config = _config.read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
@@ -2228,7 +2235,14 @@ database::make_keyspace_config(const keyspace_metadata& ksm) {
}
cfg.dirty_memory_region_group = &_dirty_memory_region_group;
cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group;
cfg.read_concurreny_sem = &_read_concurrency_sem;
cfg.read_concurrency_config.sem = &_read_concurrency_sem;
cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms;
// Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads.
cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 1000;
cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] {
++_stats->sstable_read_queue_overloaded;
throw std::runtime_error("sstable inactive read queue overloaded");
};
cfg.cf_stats = &_cf_stats;
cfg.enable_incremental_backups = _enable_incremental_backups;
return cfg;

View File

@@ -263,7 +263,7 @@ public:
size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
semaphore* read_concurrency_sem = nullptr;
restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
struct no_commitlog {};
@@ -790,7 +790,7 @@ public:
size_t max_streaming_memtable_size = 5'000'000;
logalloc::region_group* dirty_memory_region_group = nullptr;
logalloc::region_group* streaming_dirty_memory_region_group = nullptr;
semaphore* read_concurreny_sem = nullptr;
restricted_mutation_reader_config read_concurrency_config;
::cf_stats* cf_stats = nullptr;
};
private:
@@ -875,6 +875,7 @@ class database {
struct db_stats {
uint64_t total_writes = 0;
uint64_t total_reads = 0;
uint64_t sstable_read_queue_overloaded = 0;
};
lw_shared_ptr<db_stats> _stats;
@@ -885,7 +886,9 @@ class database {
logalloc::region_group _dirty_memory_region_group;
logalloc::region_group _streaming_dirty_memory_region_group;
semaphore _read_concurrency_sem{max_concurrent_reads()};
restricted_mutation_reader_config _read_concurrency_config;
semaphore _system_read_concurrency_sem{max_system_concurrent_reads()};
restricted_mutation_reader_config _system_read_concurrency_config;
std::unordered_map<sstring, keyspace> _keyspaces;
std::unordered_map<utils::UUID, lw_shared_ptr<column_family>> _column_families;

View File

@@ -1023,7 +1023,9 @@ void make(database& db, bool durable, bool volatile_testing_only) {
kscfg.enable_commitlog = !volatile_testing_only;
kscfg.enable_cache = true;
// don't make system keyspace reads wait for user reads
kscfg.read_concurreny_sem = &db.system_keyspace_read_concurrency_sem();
kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem();
kscfg.read_concurrency_config.timeout = {};
kscfg.read_concurrency_config.max_queue_length = std::numeric_limits<size_t>::max();
keyspace _ks{ksm, std::move(kscfg)};
auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options()));
_ks.set_replication_strategy(std::move(rs));

View File

@@ -192,17 +192,20 @@ mutation_reader make_empty_reader() {
class restricting_mutation_reader : public mutation_reader::impl {
semaphore& _sem;
const restricted_mutation_reader_config& _config;
unsigned _weight = 0;
bool _waited = false;
mutation_reader _base;
public:
restricting_mutation_reader(semaphore& sem, unsigned weight, mutation_reader&& base)
: _sem(sem), _weight(weight), _base(std::move(base)) {
restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base)
: _config(config), _weight(weight), _base(std::move(base)) {
if (_config.sem->waiters() >= _config.max_queue_length) {
_config.raise_queue_overloaded_exception();
}
}
~restricting_mutation_reader() {
if (_waited) {
_sem.signal(_weight);
_config.sem->signal(_weight);
}
}
future<streamed_mutation_opt> operator()() override {
@@ -211,7 +214,10 @@ public:
if (_waited) {
return _base();
}
return _sem.wait(_weight).then([this] {
auto waited = _config.timeout.count() != 0
? _config.sem->wait(_config.timeout, _weight)
: _config.sem->wait(_weight);
return waited.then([this] {
_waited = true;
return _base();
});
@@ -219,6 +225,6 @@ public:
};
mutation_reader
make_restricted_reader(semaphore& sem, unsigned weight, mutation_reader&& base) {
return make_mutation_reader<restricting_mutation_reader>(sem, weight, std::move(base));
make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) {
return make_mutation_reader<restricting_mutation_reader>(config, weight, std::move(base));
}

View File

@@ -83,8 +83,23 @@ mutation_reader make_reader_returning_many(std::vector<mutation>,
query::clustering_key_filtering_context filter = query::no_clustering_key_filtering);
mutation_reader make_reader_returning_many(std::vector<streamed_mutation>);
mutation_reader make_empty_reader();
// Restricts a given `mutation_reader` to a concurrency limited by a `semaphore`.
mutation_reader make_restricted_reader(semaphore& sem, unsigned weight, mutation_reader&& base);
// Settings consumed by make_restricted_reader() to throttle a mutation_reader.
struct restricted_mutation_reader_config {
    // Fallback overload handler: reports the overload by throwing
    // std::runtime_error.
    static void default_raise_queue_overloaded_exception() {
        throw std::runtime_error("restricted mutation reader queue overload");
    }

    // Semaphore the restricted reader waits on before its first read and
    // signals again when it is destroyed.
    semaphore* sem{nullptr};
    // Upper bound on the time spent waiting for `sem`; a zero duration means
    // the reader waits without a timeout.
    std::chrono::nanoseconds timeout{};
    // A new restricted reader is rejected when the number of waiters on `sem`
    // has already reached this value.
    size_t max_queue_length{std::numeric_limits<size_t>::max()};
    // Called when a reader is rejected because the queue is full; the default
    // handler throws.
    std::function<void ()> raise_queue_overloaded_exception{default_raise_queue_overloaded_exception};
};
// Restricts a given `mutation_reader` to a concurrency limited according to settings in
// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number
// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for
// inactive readers.
mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base);
/*
template<typename T>