diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c2b90bab2..1a19edda5e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -199,6 +199,7 @@ target_sources(scylla-main tombstone_gc_options.cc tombstone_gc.cc reader_concurrency_semaphore.cc + reader_concurrency_semaphore_group.cc row_cache.cc schema_mutations.cc serializer.cc diff --git a/configure.py b/configure.py index f8e8fd407a..5960768e4f 100755 --- a/configure.py +++ b/configure.py @@ -1160,6 +1160,7 @@ scylla_core = (['message/messaging_service.cc', 'service/topology_coordinator.cc', 'node_ops/node_ops_ctl.cc', 'node_ops/task_manager_module.cc', + 'reader_concurrency_semaphore_group.cc', ] + [Antlr3Grammar('cql3/Cql.g')] \ + scylla_raft_core ) diff --git a/main.cc b/main.cc index b0fe95778e..0b982a60e6 100644 --- a/main.cc +++ b/main.cc @@ -1211,6 +1211,20 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl sstm.stop().get(); }); + static sharded auth_service; + static sharded maintenance_auth_service; + static sharded sl_controller; + debug::the_sl_controller = &sl_controller; + + //starting service level controller + qos::service_level_options default_service_level_configuration; + default_service_level_configuration.shares = 1000; + sl_controller.start(std::ref(auth_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source()), default_service_level_configuration, dbcfg.statement_scheduling_group).get(); + sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] { + sl_controller.stop().get(); + }); + lang::manager::config lang_config; lang_config.lua.max_bytes = cfg->user_defined_function_allocation_limit_bytes(); lang_config.lua.max_contiguous = cfg->user_defined_function_contiguous_allocation_limit_bytes(); @@ -1247,7 +1261,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // because it obtains the list of pre-existing segments for replay, which must // not include reserve segments created by active commitlogs. db.local().init_commitlog().get(); - db.invoke_on_all(&replica::database::start).get(); + db.invoke_on_all(&replica::database::start, std::ref(sl_controller)).get(); ::sigquit_handler sigquit_handler(db); @@ -1339,20 +1353,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl api::unset_server_config(ctx).get(); }); - static sharded auth_service; - static sharded maintenance_auth_service; - static sharded sl_controller; - debug::the_sl_controller = &sl_controller; - - //starting service level controller - qos::service_level_options default_service_level_configuration; - default_service_level_configuration.shares = 1000; - sl_controller.start(std::ref(auth_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source()), default_service_level_configuration, dbcfg.statement_scheduling_group).get(); - sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); - auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] { - sl_controller.stop().get(); - }); - static sharded sys_dist_ks; static sharded sys_ks; static sharded view_update_generator; diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 44aba5437b..b52afefc9a 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -124,6 +124,8 @@ public: uint64_t sstables_read = 0; // Permits waiting on something: admission, memory or execution uint64_t waiters = 0; + + friend auto operator<=>(const stats&, const stats&) = default; }; using permit_list_type = bi::list< diff --git a/reader_concurrency_semaphore_group.cc b/reader_concurrency_semaphore_group.cc new file mode 100644 index 0000000000..fcc5ea7894 --- /dev/null +++ b/reader_concurrency_semaphore_group.cc @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "reader_concurrency_semaphore_group.hh" + +// Calling adjust is serialized since 2 adjustments can't happen simultaneosly, +// if they did the behaviour would be undefined. +future<> reader_concurrency_semaphore_group::adjust() { + return with_semaphore(_operations_serializer, 1, [this] () { + ssize_t distributed_memory = 0; + for (auto& [sg, wsem] : _semaphores) { + const ssize_t memory_share = std::floor((double(wsem.weight) / double(_total_weight)) * _total_memory); + wsem.sem.set_resources({_max_concurrent_reads, memory_share}); + distributed_memory += memory_share; + } + // Slap the remainder on one of the semaphores. + // This will be a few bytes, doesn't matter where we add it. + auto& sem = _semaphores.begin()->second.sem; + sem.set_resources(sem.initial_resources() + reader_resources{0, _total_memory - distributed_memory}); + }); +} + +// The call to change_weight is serialized as a consequence of the call to adjust. +future<> reader_concurrency_semaphore_group::change_weight(weighted_reader_concurrency_semaphore& sem, size_t new_weight) { + auto diff = new_weight - sem.weight; + if (diff) { + sem.weight += diff; + _total_weight += diff; + return adjust(); + } + return make_ready_future<>(); +} + +future<> reader_concurrency_semaphore_group::wait_adjust_complete() { + return with_semaphore(_operations_serializer, 1, [] { + return make_ready_future<>(); + }); +} + +future<> reader_concurrency_semaphore_group::stop() noexcept { + return parallel_for_each(_semaphores, [] (auto&& item) { + return item.second.sem.stop(); + }).then([this] { + _semaphores.clear(); + }); +} + +reader_concurrency_semaphore& reader_concurrency_semaphore_group::get(scheduling_group sg) { + return _semaphores.at(sg).sem; +} +reader_concurrency_semaphore* reader_concurrency_semaphore_group::get_or_null(scheduling_group sg) { + auto it = _semaphores.find(sg); + if (it == _semaphores.end()) { + return nullptr; + } else { + return &(it->second.sem); + } +} +reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update(scheduling_group sg, size_t shares) { + auto result = _semaphores.try_emplace( + sg, + 0, + _max_concurrent_reads, + _name_prefix ? format("{}_{}", *_name_prefix, sg.name()) : sg.name(), + _max_queue_length, + _serialize_limit_multiplier, + _kill_limit_multiplier, + _cpu_concurrency + ); + auto&& it = result.first; + // since we serialize all group changes this change wait will be queues and no further operations + // will be executed until this adjustment ends. + (void)change_weight(it->second, shares); + return it->second.sem; +} + +future<> reader_concurrency_semaphore_group::remove(scheduling_group sg) { + auto node_handle = _semaphores.extract(sg); + if (!node_handle.empty()) { + weighted_reader_concurrency_semaphore& sem = node_handle.mapped(); + return sem.sem.stop().then([this, &sem] { + return change_weight(sem, 0); + }).finally([node_handle = std::move(node_handle)] () { + // this holds on to the node handle until we destroy it only after the semaphore + // is stopped properly. + }); + } + return make_ready_future(); +} + +size_t reader_concurrency_semaphore_group::size() { + return _semaphores.size(); +} + +void reader_concurrency_semaphore_group::foreach_semaphore(std::function func) { + for (auto& [sg, wsem] : _semaphores) { + func(sg, wsem.sem); + } +} + +future<> +reader_concurrency_semaphore_group::foreach_semaphore_async(std::function (scheduling_group, reader_concurrency_semaphore&)> func) { + auto units = co_await get_units(_operations_serializer, 1); + for (auto& [sg, wsem] : _semaphores) { + co_await func(sg, wsem.sem); + } +} diff --git a/reader_concurrency_semaphore_group.hh b/reader_concurrency_semaphore_group.hh new file mode 100644 index 0000000000..6d073ef472 --- /dev/null +++ b/reader_concurrency_semaphore_group.hh @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +/* + * Copyright (C) 2021-present ScyllaDB + */ + +#pragma once + +#include +#include +#include "reader_concurrency_semaphore.hh" +#include +#include + +// The reader_concurrency_semaphore_group is a group of semaphores that shares a common pool of memory, +// the memory is dynamically divided between them according to a relative slice of shares each semaphore +// is given. +// All of the mutating operations on the group are asynchronic and serialized. The semaphores are created +// and managed by the group. + +class reader_concurrency_semaphore_group { + size_t _total_memory; + size_t _total_weight; + size_t _max_concurrent_reads; + size_t _max_queue_length; + utils::updateable_value _serialize_limit_multiplier; + utils::updateable_value _kill_limit_multiplier; + utils::updateable_value _cpu_concurrency; + + friend class database_test_wrapper; + + struct weighted_reader_concurrency_semaphore { + size_t weight; + ssize_t memory_share; + reader_concurrency_semaphore sem; + weighted_reader_concurrency_semaphore(size_t shares, int count, sstring name, size_t max_queue_length, + utils::updateable_value serialize_limit_multiplier, + utils::updateable_value kill_limit_multiplier, + utils::updateable_value cpu_concurrency) + : weight(shares) + , memory_share(0) + , sem(utils::updateable_value(count), 0, name, max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), + std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {} + }; + + std::unordered_map _semaphores; + seastar::semaphore _operations_serializer; + std::optional _name_prefix; + + future<> change_weight(weighted_reader_concurrency_semaphore& sem, size_t new_weight); + +public: + reader_concurrency_semaphore_group(size_t memory, size_t max_concurrent_reads, size_t max_queue_length, + utils::updateable_value serialize_limit_multiplier, + utils::updateable_value kill_limit_multiplier, + utils::updateable_value cpu_concurrency, + std::optional name_prefix = std::nullopt) + : _total_memory(memory) + , _total_weight(0) + , _max_concurrent_reads(max_concurrent_reads) + , _max_queue_length(max_queue_length) + , _serialize_limit_multiplier(std::move(serialize_limit_multiplier)) + , _kill_limit_multiplier(std::move(kill_limit_multiplier)) + , _cpu_concurrency(std::move(cpu_concurrency)) + , _operations_serializer(1) + , _name_prefix(std::move(name_prefix)) { } + + ~reader_concurrency_semaphore_group() { + assert(_semaphores.empty()); + } + future<> adjust(); + future<> wait_adjust_complete(); + + future<> stop() noexcept; + reader_concurrency_semaphore& get(scheduling_group sg); + reader_concurrency_semaphore* get_or_null(scheduling_group sg); + reader_concurrency_semaphore& add_or_update(scheduling_group sg, size_t shares); + future<> remove(scheduling_group sg); + size_t size(); + void foreach_semaphore(std::function func); + + future<> foreach_semaphore_async(std::function (scheduling_group, reader_concurrency_semaphore&)> func); + + auto sum_read_concurrency_sem_var(std::invocable auto member) { + using ret_type = std::invoke_result_t; + return boost::accumulate(_semaphores | boost::adaptors::map_values | boost::adaptors::transformed([=] (weighted_reader_concurrency_semaphore& wrcs) { return std::invoke(member, wrcs.sem); }), ret_type(0)); + } +}; diff --git a/replica/database.cc b/replica/database.cc index 87da7731d3..234f2991be 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -67,6 +67,7 @@ #include "locator/abstract_replication_strategy.hh" #include "timeout_config.hh" #include "tombstone_gc.hh" +#include "service/qos/service_level_controller.hh" #include "replica/data_dictionary_impl.hh" #include "replica/global_table_ptr.hh" @@ -220,14 +221,8 @@ void database::setup_scylla_memory_diagnostics_producer() { writeln("Replica:\n"); writeln(" Read Concurrency Semaphores:\n"); - const std::pair semaphores[] = { - {"user", _read_concurrency_sem}, - {"streaming", _streaming_concurrency_sem}, - {"system", _system_read_concurrency_sem}, - {"compaction", _compaction_concurrency_sem}, - {"view update", _view_update_read_concurrency_sem}, - }; - for (const auto& [name, sem] : semaphores) { + + static auto semaphore_dump = [&writeln] (const sstring& name, const reader_concurrency_semaphore& sem) { const auto initial_res = sem.initial_resources(); const auto available_res = sem.available_resources(); if (sem.is_unlimited()) { @@ -245,7 +240,17 @@ void database::setup_scylla_memory_diagnostics_producer() { utils::to_hr_size(initial_res.memory), sem.get_stats().waiters); } - } + }; + + semaphore_dump("streaming", _streaming_concurrency_sem); + semaphore_dump("system", _system_read_concurrency_sem); + semaphore_dump("compaction", _compaction_concurrency_sem); + _reader_concurrency_semaphores_group.foreach_semaphore([] (scheduling_group sg, reader_concurrency_semaphore& sem) { + semaphore_dump(sg.name(), sem); + }); + _view_update_read_concurrency_semaphores_group.foreach_semaphore([] (scheduling_group sg, reader_concurrency_semaphore& sem) { + semaphore_dump(sg.name(), sem); + }); writeln(" Execution Stages:\n"); const std::pair execution_stage_summaries[] = { @@ -311,6 +316,42 @@ public: } }; +reader_concurrency_semaphore& +database::read_concurrency_sem() { + reader_concurrency_semaphore* sem = _reader_concurrency_semaphores_group.get_or_null(current_scheduling_group()); + if (!sem) { + // this line is commented out, however we shouldn't get here because it means that a user query or even worse, + // some random query was triggered from an unanticipated scheduling groups and this violates the isolation we are trying to achieve. + // It is commented out for two reasons: + // 1. So we will be able to ease into this new system, first testing functionality and effect and only then mix in exceptions and asserts. + // 2. So the series containing those changes will be backportable without causing too harsh regressions (aborts) on one hand and without forcing + // extensive changes on the other hand. + // Follow Up: uncomment this line and run extensive testing. Handle every case of abort. + // seastar::on_internal_error(dblog, format("Tried to run a user query in a wrong scheduling group (scheduling group: '{}')", current_scheduling_group().name())); + sem = _reader_concurrency_semaphores_group.get_or_null(_default_read_concurrency_group); + if (!sem) { + // If we got here - the initialization went very wrong and we can't do anything about it. + // This can only happen if someone touched the initialization code which is assumed to initialize at least + // this default semaphore. + seastar::on_internal_error(dblog, "Default read concurrency semaphore wasn't found, something probably went wrong during database::start"); + } + } + return *sem; +} + +// With same concerns as read_concurrency_sem(). +reader_concurrency_semaphore& +database::view_update_read_concurrency_sem() { + reader_concurrency_semaphore* sem = _view_update_read_concurrency_semaphores_group.get_or_null(current_scheduling_group()); + if (!sem) { + sem = _view_update_read_concurrency_semaphores_group.get_or_null(_default_read_concurrency_group); + if (!sem) { + seastar::on_internal_error(dblog, "Default view update read concurrency semaphore wasn't found, something probably went wrong during database::start"); + } + } + return *sem; +} + database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, const abort_source& abort, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) @@ -329,15 +370,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat } return backlog; })) - , _read_concurrency_sem( - utils::updateable_value(max_count_concurrent_reads), - max_memory_concurrent_reads(), - "user", - max_inactive_queue_length(), - _cfg.reader_concurrency_semaphore_serialize_limit_multiplier, - _cfg.reader_concurrency_semaphore_kill_limit_multiplier, - _cfg.reader_concurrency_semaphore_cpu_concurrency, - reader_concurrency_semaphore::register_metrics::yes) // No timeouts or queue length limits - a failure here can kill an entire repair. // Trust the caller to limit concurrency. , _streaming_concurrency_sem( @@ -360,15 +392,14 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), reader_concurrency_semaphore::register_metrics::yes) - , _view_update_read_concurrency_sem( - utils::updateable_value(max_count_concurrent_view_update_reads), + , _view_update_read_concurrency_semaphores_group( max_memory_concurrent_view_update_reads(), - "view_update", + utils::updateable_value(max_count_concurrent_view_update_reads), max_inactive_view_update_queue_length(), _cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier, _cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier, _cfg.view_update_reader_concurrency_semaphore_cpu_concurrency, - reader_concurrency_semaphore::register_metrics::yes) + "view_update") , _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value(), cache_tracker::register_metrics::yes) , _apply_stage("db_apply", &database::do_apply) , _version(empty_version) @@ -392,6 +423,10 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _feat(feat) , _shared_token_metadata(stm) , _lang_manager(langm) + , _reader_concurrency_semaphores_group(max_memory_concurrent_reads(), max_count_concurrent_reads, max_inactive_queue_length(), + _cfg.reader_concurrency_semaphore_serialize_limit_multiplier, + _cfg.reader_concurrency_semaphore_kill_limit_multiplier, + _cfg.reader_concurrency_semaphore_cpu_concurrency) , _stop_barrier(std::move(barrier)) , _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); }) , _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer())) @@ -485,6 +520,12 @@ namespace replica { static const metrics::label class_label("class"); + +auto +database::sum_read_concurrency_sem_stat(std::invocable auto stats_member) { + return _reader_concurrency_semaphores_group.sum_read_concurrency_sem_var([&] (reader_concurrency_semaphore& rcs) { return std::invoke(stats_member, rcs.get_stats()); }); +} + void database::setup_metrics() { _dirty_memory_manager.setup_collectd("regular"); @@ -1605,7 +1646,7 @@ query::max_result_size database::get_query_max_result_size() const { reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { switch (classify_request(_dbcfg)) { - case request_class::user: return _read_concurrency_sem; + case request_class::user: return read_concurrency_sem(); case request_class::system: return _system_read_concurrency_sem; case request_class::maintenance: return _streaming_concurrency_sem; } @@ -1634,9 +1675,15 @@ future<> database::clear_inactive_reads_for_tablet(table_id table, dht::token_ra } future<> database::foreach_reader_concurrency_semaphore(std::function(reader_concurrency_semaphore&)> func) { - for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem, &_view_update_read_concurrency_sem}) { + for (auto* sem : {&_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) { co_await func(*sem); } + co_await _reader_concurrency_semaphores_group.foreach_semaphore_async([&] (scheduling_group sg, reader_concurrency_semaphore& sem) -> future<> { + co_await func(sem); + }); + co_await _view_update_read_concurrency_semaphores_group.foreach_semaphore_async([&] (scheduling_group sg, reader_concurrency_semaphore& sem) -> future<> { + co_await func(sem); + }); } std::ostream& operator<<(std::ostream& out, const column_family& cf) { @@ -1935,7 +1982,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra co_await coroutine::return_exception(std::runtime_error("view update generator not plugged to push updates")); } - auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(_view_update_generator, s, m, timeout, std::move(tr_state), _view_update_read_concurrency_sem)); + auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(_view_update_generator, s, m, timeout, std::move(tr_state), view_update_read_concurrency_sem())); if (lock_f.failed()) { auto ex = lock_f.get_exception(); if (is_timeout_exception(ex)) { @@ -2182,7 +2229,54 @@ void database::revert_initial_system_read_concurrency_boost() { dblog.debug("Reverted system read concurrency from initial {} to normal {}", database::max_count_concurrent_reads, database::max_count_system_concurrent_reads); } -future<> database::start() { +future<> database::start(sharded& sl_controller) { + sl_controller.local().register_subscriber(this); + _unsubscribe_qos_configuration_change = [this, &sl_controller] () { + return sl_controller.local().unregister_subscriber(this); + }; + qos::service_level default_service_level = sl_controller.local().get_service_level(qos::service_level_controller::default_service_level_name); + int32_t default_shares = 1000; + if (int32_t* default_shares_p = std::get_if(&(default_service_level.slo.shares))) { + default_shares = *default_shares_p; + } else { + on_internal_error(dblog, "The default service_level should always contain shares value"); + } + + // The former _dbcfg.statement_scheduling_group and the later can be the same group, so we want + // the later to be the accurate one. + _default_read_concurrency_group = default_service_level.sg; + _reader_concurrency_semaphores_group.add_or_update(default_service_level.sg, default_shares); + _view_update_read_concurrency_semaphores_group.add_or_update(default_service_level.sg, default_shares); + + // lets insert the statement scheduling group only if we haven't reused it in sl_controller, + // but it shouldn't happen + if (!_reader_concurrency_semaphores_group.get_or_null(_dbcfg.statement_scheduling_group)) { + // This is super ugly, we need to either force the database to use system scheduling group for non-user queries + // or, if we have user queries running on this scheduling group make it's definition more robust (what runs in it). + // Another ugly thing here is that we have to have a pre-existing knowladge about the shares ammount this group was + // built with. I think we should have a followup that makes this more robust. + _reader_concurrency_semaphores_group.add_or_update(_dbcfg.statement_scheduling_group, 1000); + _view_update_read_concurrency_semaphores_group.add_or_update(_dbcfg.statement_scheduling_group, 1000); + } + + // This will wait for the semaphores to be given some memory. + // We need this since the below statements (get_distributed_service_levels in particular) will need + // to run queries and for this they will need to admit some memory. + co_await _reader_concurrency_semaphores_group.wait_adjust_complete(); + co_await _view_update_read_concurrency_semaphores_group.wait_adjust_complete(); + + auto service_levels = co_await sl_controller.local().get_distributed_service_levels(qos::query_context::group0); + for (auto&& service_level_record : service_levels) { + auto service_level = sl_controller.local().get_service_level(service_level_record.first); + if (service_level.slo.shares_name && *service_level.slo.shares_name != qos::service_level_controller::default_service_level_name) { + // We know slo.shares is valid becuse we know that slo.shares_name is valid + _reader_concurrency_semaphores_group.add_or_update(service_level.sg, std::get(service_level.slo.shares)); + _view_update_read_concurrency_semaphores_group.add_or_update(service_level.sg, std::get(service_level.slo.shares)); + } + } + + co_await _reader_concurrency_semaphores_group.adjust(); + co_await _view_update_read_concurrency_semaphores_group.adjust(); _large_data_handler->start(); // We need the compaction manager ready early so we can reshard. _compaction_manager.enable(); @@ -2215,10 +2309,12 @@ future<> database::shutdown() { } future<> database::stop() { + if (_unsubscribe_qos_configuration_change) { + co_await std::exchange(_unsubscribe_qos_configuration_change, {})(); + } if (!_shutdown) { co_await shutdown(); } - // try to ensure that CL has done disk flushing if (_commitlog) { dblog.info("Shutting down commitlog"); @@ -2250,11 +2346,11 @@ future<> database::stop() { dblog.info("Stopping querier cache"); co_await _querier_cache.stop(); dblog.info("Stopping concurrency semaphores"); - co_await _read_concurrency_sem.stop(); + co_await _reader_concurrency_semaphores_group.stop(); + co_await _view_update_read_concurrency_semaphores_group.stop(); co_await _streaming_concurrency_sem.stop(); co_await _compaction_concurrency_sem.stop(); co_await _system_read_concurrency_sem.stop(); - co_await _view_update_read_concurrency_sem.stop(); dblog.info("Joining memtable update action"); co_await _update_memtable_flush_static_shares_action.join(); } @@ -3029,3 +3125,41 @@ future>> query_data( } } // namespace replica + +namespace replica { + +/** This callback is going to be called just before the service level is available **/ +future<> database::on_before_service_level_add(qos::service_level_options slo, qos::service_level_info sl_info) { + if (auto shares_p = std::get_if(&slo.shares)) { + _reader_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + _view_update_read_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + // the call to add_or_update_read_concurrency_sem will take the semaphore until the adjustment + // is completed, we need to wait for the operation to complete. + co_await _reader_concurrency_semaphores_group.wait_adjust_complete(); + co_await _view_update_read_concurrency_semaphores_group.wait_adjust_complete(); + } +} +/** This callback is going to be called just after the service level is removed **/ +future<> database::on_after_service_level_remove(qos::service_level_info sl_info) { + co_await _reader_concurrency_semaphores_group.remove(sl_info.sg); + co_await _view_update_read_concurrency_semaphores_group.remove(sl_info.sg); +} +/** This callback is going to be called just before the service level is changed **/ +future<> database::on_before_service_level_change(qos::service_level_options slo_before, qos::service_level_options slo_after, + qos::service_level_info sl_info) { + if (auto shares_p = std::get_if(&slo_after.shares)) { + _reader_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + _view_update_read_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + // the call to add_or_update_read_concurrency_sem will take the semaphore until the adjustment + // is completed, we need to wait for the operation to complete. + co_await _reader_concurrency_semaphores_group.wait_adjust_complete(); + co_await _view_update_read_concurrency_semaphores_group.wait_adjust_complete(); + } +} + +future<> +database::on_effective_service_levels_cache_reloaded() { + co_return; +} + +} diff --git a/replica/database.hh b/replica/database.hh index d0e2534384..94fc42e372 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -47,7 +47,7 @@ #include "utils/phased_barrier.hh" #include "backlog_controller.hh" #include "dirty_memory_manager.hh" -#include "reader_concurrency_semaphore.hh" +#include "reader_concurrency_semaphore_group.hh" #include "db/timeout_clock.hh" #include "querier.hh" #include "cache_temperature.hh" @@ -67,6 +67,7 @@ #include "utils/serialized_action.hh" #include "compaction/compaction_fwd.hh" #include "compaction_group.hh" +#include "service/qos/qos_configuration_change_subscriber.hh" class cell_locker; class cell_locker_stats; @@ -137,6 +138,10 @@ class view_update_generator; } +namespace qos { + class service_level_controller; +} + class mutation_reordered_with_truncate_exception : public std::exception {}; class column_family_test; @@ -1383,7 +1388,7 @@ class db_user_types_storage; // local metadata reads // use table::shard_for_reads()/table::shard_for_writes() for data -class database : public peering_sharded_service { +class database : public peering_sharded_service, qos::qos_configuration_change_subscriber { friend class ::database_test_wrapper; public: enum class table_kind { @@ -1487,13 +1492,13 @@ private: flush_controller _memtable_controller; drain_progress _drain_progress {}; - reader_concurrency_semaphore _read_concurrency_sem; + reader_concurrency_semaphore _streaming_concurrency_sem; reader_concurrency_semaphore _compaction_concurrency_sem; reader_concurrency_semaphore _system_read_concurrency_sem; - // The view update read concurrency semaphore used for view updates coming from user writes. - reader_concurrency_semaphore _view_update_read_concurrency_sem; + // The view update read concurrency semaphores used for view updates coming from user writes. + reader_concurrency_semaphore_group _view_update_read_concurrency_semaphores_group; db::timeout_semaphore _view_update_concurrency_sem{max_memory_pending_view_updates()}; cache_tracker _row_cache_tracker; @@ -1540,6 +1545,10 @@ private: const locator::shared_token_metadata& _shared_token_metadata; lang::manager& _lang_manager; + reader_concurrency_semaphore_group _reader_concurrency_semaphores_group; + scheduling_group _default_read_concurrency_group; + noncopyable_function()> _unsubscribe_qos_configuration_change; + utils::cross_shard_barrier _stop_barrier; db::rate_limiter _rate_limiter; @@ -1579,6 +1588,10 @@ private: future<> create_in_memory_keyspace(const lw_shared_ptr& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system); void setup_metrics(); void setup_scylla_memory_diagnostics_producer(); + reader_concurrency_semaphore& read_concurrency_sem(); + reader_concurrency_semaphore& view_update_read_concurrency_sem(); + auto sum_read_concurrency_sem_var(std::invocable auto member); + auto sum_read_concurrency_sem_stat(std::invocable auto stats_member); future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info); future<> do_apply_many(const std::vector&, db::timeout_clock::time_point timeout); @@ -1714,7 +1727,7 @@ public: /// reads, to speed up startup. After startup this should be reverted to /// the normal concurrency. void revert_initial_system_read_concurrency_boost(); - future<> start(); + future<> start(sharded&); future<> shutdown(); future<> stop(); future<> close_tables(table_kind kind_to_close); @@ -1906,6 +1919,14 @@ public: } future<> clear_inactive_reads_for_tablet(table_id table, dht::token_range tablet_range); + + /** This callback is going to be called just before the service level is available **/ + virtual future<> on_before_service_level_add(qos::service_level_options slo, qos::service_level_info sl_info) override; + /** This callback is going to be called just after the service level is removed **/ + virtual future<> on_after_service_level_remove(qos::service_level_info sl_info) override; + /** This callback is going to be called just before the service level is changed **/ + virtual future<> on_before_service_level_change(qos::service_level_options slo_before, qos::service_level_options slo_after, qos::service_level_info sl_info) override; + virtual future<> on_effective_service_levels_cache_reloaded() override; }; // A helper function to parse the directory name back diff --git a/scylla-gdb.py b/scylla-gdb.py index 92082943a9..d098c5be6e 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -2311,13 +2311,21 @@ class scylla_memory(gdb.Command): if not db: return + per_service_level_sem = [] + for sg, sem in unordered_map(db["_reader_concurrency_semaphores_group"]["_semaphores"]): + per_service_level_sem.append(scylla_memory.format_semaphore_stats(sem["sem"])) + + per_service_level_vu_sem = [] + for sg, sem in unordered_map(db["_view_update_read_concurrency_semaphores_group"]["_semaphores"]): + per_service_level_vu_sem.append(scylla_memory.format_semaphore_stats(sem["sem"])) + database_typename = lookup_type(['replica::database', 'database'])[1].name gdb.write('Replica:\n') gdb.write(' Read Concurrency Semaphores:\n {}\n {}\n {}\n {}\n'.format( - scylla_memory.format_semaphore_stats(db['_read_concurrency_sem']), + '\n '.join(per_service_level_sem), scylla_memory.format_semaphore_stats(db['_streaming_concurrency_sem']), scylla_memory.format_semaphore_stats(db['_system_read_concurrency_sem']), - scylla_memory.format_semaphore_stats(db['_view_update_read_concurrency_sem']))) + '\n '.join(per_service_level_vu_sem))) gdb.write(' Execution Stages:\n') for es_path in [('_apply_stage',)]: @@ -5809,12 +5817,17 @@ class scylla_read_stats(gdb.Command): semaphores = [gdb.parse_and_eval(arg) for arg in args.split(' ')] else: db = find_db() - semaphores = [db["_read_concurrency_sem"], db["_streaming_concurrency_sem"], db["_system_read_concurrency_sem"]] + semaphores = [db["_streaming_concurrency_sem"], db["_system_read_concurrency_sem"]] semaphores.append(db["_compaction_concurrency_sem"]) try: - semaphores.append(db["_view_update_read_concurrency_sem"]) + semaphores += [weighted_sem["sem"] for (_, weighted_sem) in unordered_map(db["_reader_concurrency_semaphores_group"]["_semaphores"])] except gdb.error: - # 6.2 compatibility + # compatibility with code before per-scheduling-group semaphore + pass + try: + semaphores += [weighted_sem["sem"] for (_, weighted_sem) in unordered_map(db["_view_update_read_concurrency_semaphores_group"]["_semaphores"])] + except gdb.error: + # 2024.2 compatibility pass for semaphore in semaphores: diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh index 3534b62aaa..402113b2d5 100644 --- a/service/qos/service_level_controller.hh +++ b/service/qos/service_level_controller.hh @@ -205,6 +205,23 @@ public: void abort_group0_operations(); + /** + * this is an executor of a function with arguments under a specific + * service level. + * @param service_level_name + * @param func - the function to be executed + * @param args - the arguments to pass to the function. + * @return a future that is resolved when the function's operation is resolved + * (if it returns a future). or a ready future containing the returned value + * from the function/ + */ + template > + requires std::invocable + futurize_t with_service_level(sstring service_level_name, Func&& func) { + service_level& sl = get_service_level(service_level_name); + return with_scheduling_group(sl.sg, std::move(func)); + } + /** * @return the default service level scheduling group (see service_level_controller::initialize). */ diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 10a2381d67..647c44da5c 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -61,7 +61,7 @@ public: explicit database_test_wrapper(replica::database& db) : _db(db) { } reader_concurrency_semaphore& get_user_read_concurrency_semaphore() { - return _db._read_concurrency_sem; + return _db.read_concurrency_sem(); } reader_concurrency_semaphore& get_streaming_read_concurrency_semaphore() { return _db._streaming_concurrency_sem; @@ -69,6 +69,14 @@ public: reader_concurrency_semaphore& get_system_read_concurrency_semaphore() { return _db._system_read_concurrency_sem; } + + size_t get_total_user_reader_concurrency_semaphore_memory() { + return _db._reader_concurrency_semaphores_group._total_memory; + } + + size_t get_total_user_reader_concurrency_semaphore_weight() { + return _db._reader_concurrency_semaphores_group._total_weight; + } }; static future<> apply_mutation(sharded& sharded_db, table_id uuid, const mutation& m, bool do_flush = false, @@ -1151,7 +1159,8 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_selection_test) { auto& db = e.local_db(); database_test_wrapper tdb(db); for (const auto& [sched_group, expected_sem_getter] : scheduling_group_and_expected_semaphore) { - with_scheduling_group(sched_group, [&db, sched_group = sched_group, expected_sem_ptr = &expected_sem_getter(tdb)] { + with_scheduling_group(sched_group, [&db, sched_group = sched_group, &tdb, &expected_sem_getter = expected_sem_getter] { + auto expected_sem_ptr = &expected_sem_getter(tdb); auto& sem = db.get_reader_concurrency_semaphore(); if (&sem != expected_sem_ptr) { BOOST_FAIL(fmt::format("Unexpected semaphore for scheduling group {}, expected {}, got {}", sched_group.name(), expected_sem_ptr->name(), sem.name())); @@ -1296,6 +1305,92 @@ SEASTAR_TEST_CASE(upgrade_sstables) { }); } +SEASTAR_THREAD_TEST_CASE(per_service_level_reader_concurrency_semaphore_test) { + cql_test_config cfg; + do_with_cql_env_thread([] (cql_test_env& e) { + const size_t num_service_levels = 3; + const size_t num_keys_to_insert = 10; + const size_t num_individual_reads_to_test = 50; + auto& db = e.local_db(); + database_test_wrapper dbt(db); + size_t total_memory = dbt.get_total_user_reader_concurrency_semaphore_memory(); + sharded& sl_controller = e.service_level_controller_service(); + std::array sl_names; + qos::service_level_options slo; + size_t expected_total_weight = 0; + auto index_to_weight = [] (size_t i) -> size_t { + return (i + 1)*100; + }; + + // make the default service level take as little memory as possible + slo.shares.emplace(1); + expected_total_weight += 1; + sl_controller.local().add_service_level(qos::service_level_controller::default_service_level_name, slo).get(); + + // Just to make the code more readable. + auto get_reader_concurrency_semaphore_for_sl = [&] (sstring sl_name) -> reader_concurrency_semaphore& { + return *sl_controller.local().with_service_level(sl_name, noncopyable_function([&] { + return &db.get_reader_concurrency_semaphore(); + })).get(); + }; + + for (unsigned i = 0; i < num_service_levels; i++) { + sstring sl_name = format("sl{}", i); + slo.shares.emplace(index_to_weight(i)); + sl_controller.local().add_service_level(sl_name, slo).get(); + expected_total_weight += index_to_weight(i); + // Make sure that the total weight is tracked correctly in the semaphore group + BOOST_REQUIRE_EQUAL(expected_total_weight, dbt.get_total_user_reader_concurrency_semaphore_weight()); + sl_names[i] = sl_name; + size_t total_distributed_memory = 0; + for (unsigned j = 0 ; j <= i ; j++) { + reader_concurrency_semaphore& sem = get_reader_concurrency_semaphore_for_sl(sl_names[j]); + // Make sure that all semaphores that has been created until now - have the right amount of available memory + // after the operation has ended. + // We allow for a small delta of up to num_service_levels. This allows an off-by-one for each semaphore, + // the remainder being added to one of the semaphores. + // We make sure this didn't leak/create memory by checking the total below. + const auto delta = std::abs(ssize_t((index_to_weight(j) * total_memory) / expected_total_weight) - sem.available_resources().memory); + BOOST_REQUIRE_LE(delta, num_service_levels); + total_distributed_memory += sem.available_resources().memory; + } + total_distributed_memory += get_reader_concurrency_semaphore_for_sl(qos::service_level_controller::default_service_level_name).available_resources().memory; + BOOST_REQUIRE_EQUAL(total_distributed_memory, total_memory); + } + + auto get_semaphores_stats_snapshot = [&] () { + std::unordered_map snapshot; + for (auto&& sl_name : sl_names) { + snapshot[sl_name] = get_reader_concurrency_semaphore_for_sl(sl_name).get_stats(); + } + return snapshot; + }; + e.execute_cql("CREATE TABLE tbl (a int, b int, PRIMARY KEY (a));").get(); + + for (unsigned i = 0; i < num_keys_to_insert; i++) { + for (unsigned j = 0; j < num_keys_to_insert; j++) { + e.execute_cql(format("INSERT INTO tbl(a, b) VALUES ({}, {});", i, j)).get(); + } + } + + for (unsigned i = 0; i < num_individual_reads_to_test; i++) { + int random_service_level = tests::random::get_int(num_service_levels - 1); + auto snapshot_before = get_semaphores_stats_snapshot(); + + sl_controller.local().with_service_level(sl_names[random_service_level], noncopyable_function()> ([&] { + return e.execute_cql("SELECT * FROM tbl;").discard_result(); + })).get(); + auto snapshot_after = get_semaphores_stats_snapshot(); + for (auto& [sl_name, stats] : snapshot_before) { + // Make sure that the only semaphore that experienced any activity (at least measured activity) is + // the semaphore that belongs to the current service level. + BOOST_REQUIRE((stats == snapshot_after[sl_name] && sl_name != sl_names[random_service_level]) || + (stats != snapshot_after[sl_name] && sl_name == sl_names[random_service_level])); + } + } + }, std::move(cfg)).get(); +} + SEASTAR_TEST_CASE(populate_from_quarantine_works) { auto tmpdir_for_data = make_lw_shared(); auto db_cfg_ptr = make_shared(); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index cb39f0990f..be0283916d 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -11,6 +11,7 @@ #include #include "reader_concurrency_semaphore.hh" #include "sstables/sstables_manager.hh" +#include "reader_concurrency_semaphore_group.hh" #include "test/lib/log.hh" #include "test/lib/simple_schema.hh" #include "test/lib/cql_assertions.hh" @@ -1219,6 +1220,113 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { } // namespace reader_concurrency_semaphore_test +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_group) { + const auto initial_resources = reader_resources{100, 100 * 1024}; + auto serialize_multiplier = utils::updateable_value_source(2); + auto kill_multiplier = utils::updateable_value_source(3); + auto cpu_concurrency = utils::updateable_value_source(1); + + reader_concurrency_semaphore_group sem_group(initial_resources.memory, initial_resources.count, 1000, + utils::updateable_value(serialize_multiplier), + utils::updateable_value(kill_multiplier), + utils::updateable_value(cpu_concurrency)); + auto stop_sem = deferred_stop(sem_group); + + circular_buffer recycle_bin; + + const auto initial_shares = 1000; + struct scheduling_group_with_shares { + scheduling_group sg; + size_t shares; + + scheduling_group_with_shares(scheduling_group sg, size_t shares) : sg(sg), shares(shares) { } + }; + std::vector scheduling_groups; + const auto max_sched_groups = 8; + + auto check_sem_group = [&] { + const auto total_shares = boost::accumulate(scheduling_groups + | boost::adaptors::transformed([] (const scheduling_group_with_shares& sgs) { return sgs.shares; }), size_t(0)); + ssize_t total_memory = 0; + sem_group.foreach_semaphore([&] (scheduling_group sg, reader_concurrency_semaphore& sem) { + const auto res = sem.available_resources(); + BOOST_CHECK_EQUAL(res.count, initial_resources.count); // currently count is not partitioned among the semaphores + auto it = std::find_if(scheduling_groups.begin(), scheduling_groups.end(), [sg] (const scheduling_group_with_shares& sgs) { return sgs.sg == sg; }); + BOOST_REQUIRE(it != scheduling_groups.end()); + const auto shares = it->shares; + const ssize_t expected_memory = std::floor((double(shares) / double(total_shares)) * initial_resources.memory); + const auto memory_diff = std::abs(res.memory - expected_memory); + testlog.trace("{}: {}/{} (shares) -> {}/{} (memory) | res.memory: {}", sg.name(), shares, total_shares, expected_memory, initial_resources.memory, res.memory); + BOOST_CHECK_LE(memory_diff, scheduling_groups.size()); // due to integer division, we allow for ceil/floor (off-by-one), the remainder being added to any semaphore + total_memory += res.memory; + }); + BOOST_CHECK_EQUAL(total_memory, initial_resources.memory); // no off-by-one allowed on the total + }; + + auto add_sg = [&, sgi = 0] () mutable { + if (scheduling_groups.size() >= max_sched_groups) { + return false; + } + testlog.debug("create sg{}", sgi); + scheduling_group sg; + const auto sg_name = format("sg{}", sgi++); + if (recycle_bin.empty()) { + sg = create_scheduling_group(sg_name, initial_shares).get(); + } else { + sg = recycle_bin.front(); + recycle_bin.pop_front(); + rename_scheduling_group(sg, sg_name).get(); + } + scheduling_groups.emplace_back(sg, initial_shares); + sem_group.add_or_update(sg, initial_shares); + sem_group.wait_adjust_complete().get(); + return true; + }; + + while (add_sg()) { + check_sem_group(); + } + + for (size_t i = 0; i < 32; ++i) { + testlog.debug("iteration {}", i); + std::shuffle(scheduling_groups.begin(), scheduling_groups.end(), tests::random::gen()); + switch (tests::random::get_int(0, 3)) { + case 0: // add + { + testlog.debug("maybe add sg"); + if (add_sg()) { + break; + } + [[fallthrough]]; + } + case 1: //remove + { + const auto& sgs = scheduling_groups.back(); + testlog.debug("maybe remove {}", sgs.sg.name()); + if (scheduling_groups.size() > 1) { + testlog.debug("remove {}", sgs.sg.name()); + sem_group.remove(sgs.sg).get(); + recycle_bin.push_back(sgs.sg); + scheduling_groups.pop_back(); + break; + } + [[fallthrough]]; + } + default: //update + { + auto& sgs = scheduling_groups.back(); + const auto new_shares = tests::random::get_int(100, 1000); + sgs.shares = new_shares; + testlog.debug("update {}: {}->{}", sgs.sg.name(), sgs.shares, new_shares); + sem_group.add_or_update(sgs.sg, new_shares); + sem_group.wait_adjust_complete().get(); + break; + } + } + check_sem_group(); + } +} + namespace { class allocating_reader { diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 47f2002c0a..5116083576 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -593,6 +593,10 @@ private: _sstm.start(std::ref(*cfg), sstables::storage_manager::config{}).get(); auto stop_sstm = deferred_stop(_sstm); + _sl_controller.start(std::ref(_auth_service), std::ref(_token_metadata), std::ref(abort_sources), qos::service_level_options{.shares = 1000}, scheduling_groups.statement_scheduling_group).get(); + auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); }); + _sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + lang::manager::config lang_config; lang_config.lua.max_bytes = cfg->user_defined_function_allocation_limit_bytes(); lang_config.lua.max_contiguous = cfg->user_defined_function_contiguous_allocation_limit_bytes(); @@ -618,7 +622,7 @@ private: _db.stop().get(); }); - _db.invoke_on_all(&replica::database::start).get(); + _db.invoke_on_all(&replica::database::start, std::ref(_sl_controller)).get(); smp::invoke_on_all([blocked_reactor_notify_ms] { engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms); @@ -659,9 +663,6 @@ private: set_abort_on_internal_error(true); const gms::inet_address listen("127.0.0.1"); - _sl_controller.start(std::ref(_auth_service), std::ref(_token_metadata), std::ref(abort_sources), qos::service_level_options{.shares = 1000}, scheduling_groups.statement_scheduling_group).get(); - auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); }); - _sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); _sys_ks.start(std::ref(_qp), std::ref(_db)).get(); auto stop_sys_kd = defer([this] { @@ -1105,6 +1106,10 @@ public: return cql_transport::messages::propagate_exception_as_future(std::move(msg)); }); } + + virtual sharded& service_level_controller_service() override { + return _sl_controller; + } }; std::atomic single_node_cql_env::active = { false }; diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index 040684ca82..0779da7b26 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -184,6 +184,8 @@ public: virtual sharded& get_task_manager() = 0; data_dictionary::database data_dictionary(); + + virtual sharded& service_level_controller_service() = 0; }; future<> do_with_cql_env(std::function(cql_test_env&)> func, cql_test_config = {}, std::optional = {});