From 9ac730dcc985cd4083bd4705ecf1219fac4002f1 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 29 Jun 2016 16:14:15 +0300 Subject: [PATCH] mutation_reader: make restricting_mutation_reader even more restricting While limiting the number of concurrently executing sstable readers reduces our memory load, the queued readers, although consuming a small amount of memory, can still grow without bounds. To limit the damage, add two limits on the queue: - a timeout, which is equal to the read timeout - a queue length limit, which is equal to 2% of the shard memory divided by an estimate of the queued request size (1kb) Together, these limits bound the amount of memory needed by queued disk requests in case the disk can't keep up. Message-Id: <1467206055-30769-1-git-send-email-avi@scylladb.com> --- database.cc | 22 ++++++++++++++++++---- database.hh | 7 +++++-- db/system_keyspace.cc | 4 +++- mutation_reader.cc | 20 +++++++++++++------- mutation_reader.hh | 19 +++++++++++++++++-- 5 files changed, 56 insertions(+), 16 deletions(-) diff --git a/database.cc b/database.cc index 15d51efbd3..abfd5ea27e 100644 --- a/database.cc +++ b/database.cc @@ -258,8 +258,8 @@ column_family::make_sstable_reader(schema_ptr s, const io_priority_class& pc) const { // restricts a reader's concurrency if the configuration specifies it auto restrict_reader = [&] (mutation_reader&& in) { - if (_config.read_concurrency_sem) { - return make_restricted_reader(*_config.read_concurrency_sem, 1, std::move(in)); + if (_config.read_concurrency_config.sem) { + return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in)); } else { return std::move(in); } @@ -1373,6 +1373,13 @@ database::setup_collectd() { , scollectd::make_typed(scollectd::data_type::DERIVE, _stats->total_reads) )); + _collectd.push_back( + scollectd::add_polled_metric(scollectd::type_instance_id("database" + , scollectd::per_cpu_plugin_instance + , "total_operations", "sstable_read_queue_overloads") + , scollectd::make_typed(scollectd::data_type::COUNTER, _stats->sstable_read_queue_overloaded) + )); + _collectd.push_back( scollectd::add_polled_metric(scollectd::type_instance_id("database" , scollectd::per_cpu_plugin_instance @@ -1766,7 +1773,7 @@ keyspace::make_column_family_config(const schema& s) const { cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size; cfg.dirty_memory_region_group = _config.dirty_memory_region_group; cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group; - cfg.read_concurrency_sem = _config.read_concurreny_sem; + cfg.read_concurrency_config = _config.read_concurrency_config; cfg.cf_stats = _config.cf_stats; cfg.enable_incremental_backups = _config.enable_incremental_backups; @@ -2228,7 +2235,14 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { } cfg.dirty_memory_region_group = &_dirty_memory_region_group; cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group; - cfg.read_concurreny_sem = &_read_concurrency_sem; + cfg.read_concurrency_config.sem = &_read_concurrency_sem; + cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms; + // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads. + cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 1000; + cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] { + ++_stats->sstable_read_queue_overloaded; + throw std::runtime_error("sstable inactive read queue overloaded"); + }; cfg.cf_stats = &_cf_stats; cfg.enable_incremental_backups = _enable_incremental_backups; return cfg; diff --git a/database.hh b/database.hh index 661cfb4320..ea34381c99 100644 --- a/database.hh +++ b/database.hh @@ -263,7 +263,7 @@ public: size_t max_streaming_memtable_size = 5'000'000; logalloc::region_group* dirty_memory_region_group = nullptr; logalloc::region_group* streaming_dirty_memory_region_group = nullptr; - semaphore* read_concurrency_sem = nullptr; + restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; struct no_commitlog {}; @@ -790,7 +790,7 @@ public: size_t max_streaming_memtable_size = 5'000'000; logalloc::region_group* dirty_memory_region_group = nullptr; logalloc::region_group* streaming_dirty_memory_region_group = nullptr; - semaphore* read_concurreny_sem = nullptr; + restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; private: @@ -875,6 +875,7 @@ class database { struct db_stats { uint64_t total_writes = 0; uint64_t total_reads = 0; + uint64_t sstable_read_queue_overloaded = 0; }; lw_shared_ptr _stats; @@ -885,7 +886,9 @@ class database { logalloc::region_group _dirty_memory_region_group; logalloc::region_group _streaming_dirty_memory_region_group; semaphore _read_concurrency_sem{max_concurrent_reads()}; + restricted_mutation_reader_config _read_concurrency_config; semaphore _system_read_concurrency_sem{max_system_concurrent_reads()}; + restricted_mutation_reader_config _system_read_concurrency_config; std::unordered_map _keyspaces; std::unordered_map> _column_families; diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 411117b2f9..64a5eca3f2 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1023,7 +1023,9 @@ void make(database& db, bool durable, bool volatile_testing_only) { kscfg.enable_commitlog = !volatile_testing_only; kscfg.enable_cache = true; // don't make system keyspace reads wait for user reads - kscfg.read_concurreny_sem = &db.system_keyspace_read_concurrency_sem(); + kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem(); + kscfg.read_concurrency_config.timeout = {}; + kscfg.read_concurrency_config.max_queue_length = std::numeric_limits::max(); keyspace _ks{ksm, std::move(kscfg)}; auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options())); _ks.set_replication_strategy(std::move(rs)); diff --git a/mutation_reader.cc b/mutation_reader.cc index 9c4cfada69..0663e3518b 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -192,17 +192,20 @@ mutation_reader make_empty_reader() { class restricting_mutation_reader : public mutation_reader::impl { - semaphore& _sem; + const restricted_mutation_reader_config& _config; unsigned _weight = 0; bool _waited = false; mutation_reader _base; public: - restricting_mutation_reader(semaphore& sem, unsigned weight, mutation_reader&& base) - : _sem(sem), _weight(weight), _base(std::move(base)) { + restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) + : _config(config), _weight(weight), _base(std::move(base)) { + if (_config.sem->waiters() >= _config.max_queue_length) { + _config.raise_queue_overloaded_exception(); + } } ~restricting_mutation_reader() { if (_waited) { - _sem.signal(_weight); + _config.sem->signal(_weight); } } future operator()() override { @@ -211,7 +214,10 @@ public: if (_waited) { return _base(); } - return _sem.wait(_weight).then([this] { + auto waited = _config.timeout.count() != 0 + ? _config.sem->wait(_config.timeout, _weight) + : _config.sem->wait(_weight); + return waited.then([this] { _waited = true; return _base(); }); @@ -219,6 +225,6 @@ public: }; mutation_reader -make_restricted_reader(semaphore& sem, unsigned weight, mutation_reader&& base) { - return make_mutation_reader(sem, weight, std::move(base)); +make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) { + return make_mutation_reader(config, weight, std::move(base)); } diff --git a/mutation_reader.hh b/mutation_reader.hh index 287bd7abc7..61fdbc10b0 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -83,8 +83,23 @@ mutation_reader make_reader_returning_many(std::vector, query::clustering_key_filtering_context filter = query::no_clustering_key_filtering); mutation_reader make_reader_returning_many(std::vector); mutation_reader make_empty_reader(); -// Restricts a given `mutation_reader` to a concurrency limited by a `semaphore`. -mutation_reader make_restricted_reader(semaphore& sem, unsigned weight, mutation_reader&& base); + +struct restricted_mutation_reader_config { + semaphore* sem = nullptr; + std::chrono::nanoseconds timeout = {}; + size_t max_queue_length = std::numeric_limits::max(); + std::function raise_queue_overloaded_exception = default_raise_queue_overloaded_exception; + + static void default_raise_queue_overloaded_exception() { + throw std::runtime_error("restricted mutation reader queue overload"); + } +}; + +// Restricts a given `mutation_reader` to a concurrency limited according to settings in +// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number +// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for +// inactive readers. +mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base); /* template