diff --git a/database.cc b/database.cc index 15d51efbd3..abfd5ea27e 100644 --- a/database.cc +++ b/database.cc @@ -258,8 +258,8 @@ column_family::make_sstable_reader(schema_ptr s, const io_priority_class& pc) const { // restricts a reader's concurrency if the configuration specifies it auto restrict_reader = [&] (mutation_reader&& in) { - if (_config.read_concurrency_sem) { - return make_restricted_reader(*_config.read_concurrency_sem, 1, std::move(in)); + if (_config.read_concurrency_config.sem) { + return make_restricted_reader(_config.read_concurrency_config, 1, std::move(in)); } else { return std::move(in); } @@ -1373,6 +1373,13 @@ database::setup_collectd() { , scollectd::make_typed(scollectd::data_type::DERIVE, _stats->total_reads) )); + _collectd.push_back( + scollectd::add_polled_metric(scollectd::type_instance_id("database" + , scollectd::per_cpu_plugin_instance + , "total_operations", "sstable_read_queue_overloads") + , scollectd::make_typed(scollectd::data_type::COUNTER, _stats->sstable_read_queue_overloaded) + )); + _collectd.push_back( scollectd::add_polled_metric(scollectd::type_instance_id("database" , scollectd::per_cpu_plugin_instance @@ -1766,7 +1773,7 @@ keyspace::make_column_family_config(const schema& s) const { cfg.max_streaming_memtable_size = _config.max_streaming_memtable_size; cfg.dirty_memory_region_group = _config.dirty_memory_region_group; cfg.streaming_dirty_memory_region_group = _config.streaming_dirty_memory_region_group; - cfg.read_concurrency_sem = _config.read_concurreny_sem; + cfg.read_concurrency_config = _config.read_concurrency_config; cfg.cf_stats = _config.cf_stats; cfg.enable_incremental_backups = _config.enable_incremental_backups; @@ -2228,7 +2235,14 @@ database::make_keyspace_config(const keyspace_metadata& ksm) { } cfg.dirty_memory_region_group = &_dirty_memory_region_group; cfg.streaming_dirty_memory_region_group = &_streaming_dirty_memory_region_group; - cfg.read_concurreny_sem = &_read_concurrency_sem; + cfg.read_concurrency_config.sem = &_read_concurrency_sem; + cfg.read_concurrency_config.timeout = _cfg->read_request_timeout_in_ms() * 1ms; + // Assume a queued read takes up 1kB of memory, and allow 2% of memory to be filled up with such reads. + cfg.read_concurrency_config.max_queue_length = memory::stats().total_memory() * 0.02 / 1000; + cfg.read_concurrency_config.raise_queue_overloaded_exception = [this] { + ++_stats->sstable_read_queue_overloaded; + throw std::runtime_error("sstable inactive read queue overloaded"); + }; cfg.cf_stats = &_cf_stats; cfg.enable_incremental_backups = _enable_incremental_backups; return cfg; diff --git a/database.hh b/database.hh index 661cfb4320..ea34381c99 100644 --- a/database.hh +++ b/database.hh @@ -263,7 +263,7 @@ public: size_t max_streaming_memtable_size = 5'000'000; logalloc::region_group* dirty_memory_region_group = nullptr; logalloc::region_group* streaming_dirty_memory_region_group = nullptr; - semaphore* read_concurrency_sem = nullptr; + restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; struct no_commitlog {}; @@ -790,7 +790,7 @@ public: size_t max_streaming_memtable_size = 5'000'000; logalloc::region_group* dirty_memory_region_group = nullptr; logalloc::region_group* streaming_dirty_memory_region_group = nullptr; - semaphore* read_concurreny_sem = nullptr; + restricted_mutation_reader_config read_concurrency_config; ::cf_stats* cf_stats = nullptr; }; private: @@ -875,6 +875,7 @@ class database { struct db_stats { uint64_t total_writes = 0; uint64_t total_reads = 0; + uint64_t sstable_read_queue_overloaded = 0; }; lw_shared_ptr _stats; @@ -885,7 +886,9 @@ class database { logalloc::region_group _dirty_memory_region_group; logalloc::region_group _streaming_dirty_memory_region_group; semaphore _read_concurrency_sem{max_concurrent_reads()}; + restricted_mutation_reader_config _read_concurrency_config; semaphore _system_read_concurrency_sem{max_system_concurrent_reads()}; + restricted_mutation_reader_config _system_read_concurrency_config; std::unordered_map _keyspaces; std::unordered_map> _column_families; diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 411117b2f9..64a5eca3f2 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1023,7 +1023,9 @@ void make(database& db, bool durable, bool volatile_testing_only) { kscfg.enable_commitlog = !volatile_testing_only; kscfg.enable_cache = true; // don't make system keyspace reads wait for user reads - kscfg.read_concurreny_sem = &db.system_keyspace_read_concurrency_sem(); + kscfg.read_concurrency_config.sem = &db.system_keyspace_read_concurrency_sem(); + kscfg.read_concurrency_config.timeout = {}; + kscfg.read_concurrency_config.max_queue_length = std::numeric_limits::max(); keyspace _ks{ksm, std::move(kscfg)}; auto rs(locator::abstract_replication_strategy::create_replication_strategy(NAME, "LocalStrategy", service::get_local_storage_service().get_token_metadata(), ksm->strategy_options())); _ks.set_replication_strategy(std::move(rs)); diff --git a/mutation_reader.cc b/mutation_reader.cc index 9c4cfada69..0663e3518b 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -192,17 +192,20 @@ mutation_reader make_empty_reader() { class restricting_mutation_reader : public mutation_reader::impl { - semaphore& _sem; + const restricted_mutation_reader_config& _config; unsigned _weight = 0; bool _waited = false; mutation_reader _base; public: - restricting_mutation_reader(semaphore& sem, unsigned weight, mutation_reader&& base) - : _sem(sem), _weight(weight), _base(std::move(base)) { + restricting_mutation_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) + : _config(config), _weight(weight), _base(std::move(base)) { + if (_config.sem->waiters() >= _config.max_queue_length) { + _config.raise_queue_overloaded_exception(); + } } ~restricting_mutation_reader() { if (_waited) { - _sem.signal(_weight); + _config.sem->signal(_weight); } } future operator()() override { @@ -211,7 +214,10 @@ public: if (_waited) { return _base(); } - return _sem.wait(_weight).then([this] { + auto waited = _config.timeout.count() != 0 + ? _config.sem->wait(_config.timeout, _weight) + : _config.sem->wait(_weight); + return waited.then([this] { _waited = true; return _base(); }); @@ -219,6 +225,6 @@ public: }; mutation_reader -make_restricted_reader(semaphore& sem, unsigned weight, mutation_reader&& base) { - return make_mutation_reader(sem, weight, std::move(base)); +make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base) { + return make_mutation_reader(config, weight, std::move(base)); } diff --git a/mutation_reader.hh b/mutation_reader.hh index 287bd7abc7..61fdbc10b0 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -83,8 +83,23 @@ mutation_reader make_reader_returning_many(std::vector, query::clustering_key_filtering_context filter = query::no_clustering_key_filtering); mutation_reader make_reader_returning_many(std::vector); mutation_reader make_empty_reader(); -// Restricts a given `mutation_reader` to a concurrency limited by a `semaphore`. -mutation_reader make_restricted_reader(semaphore& sem, unsigned weight, mutation_reader&& base); + +struct restricted_mutation_reader_config { + semaphore* sem = nullptr; + std::chrono::nanoseconds timeout = {}; + size_t max_queue_length = std::numeric_limits::max(); + std::function raise_queue_overloaded_exception = default_raise_queue_overloaded_exception; + + static void default_raise_queue_overloaded_exception() { + throw std::runtime_error("restricted mutation reader queue overload"); + } +}; + +// Restricts a given `mutation_reader` to a concurrency limited according to settings in +// a restricted_mutation_reader_config. These settings include a semaphore for limiting the number +// of active concurrent readers, a timeout for inactive readers, and a maximum queue size for +// inactive readers. +mutation_reader make_restricted_reader(const restricted_mutation_reader_config& config, unsigned weight, mutation_reader&& base); /* template