From 7383013f43643282d318cf379285f82d45b960e2 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Mon, 2 Dec 2024 19:51:03 +0100 Subject: [PATCH] replica/database: add reader concurrency semaphore groups Replace the reader concurrency semaphores for user reads and view updates with the newly introduced reader concurrency semaphore group, which assigns a semaphore for each service level. Each group is statically assigned to some pool of memory on startup and dynamically distribute this memory between the semaphores, relative to the number of shares of the corresponding scheduling group. The intent of having a separate reader concurrency semaphore for each scheduling group is to prevent priority inversion issues due to reads with different priorities waiting on the same semaphore, as well as make memory allocation more fair between service levels due to the adjusted number of shares. --- CMakeLists.txt | 1 + configure.py | 1 + main.cc | 30 +-- reader_concurrency_semaphore.hh | 2 + reader_concurrency_semaphore_group.cc | 112 ++++++++++ reader_concurrency_semaphore_group.hh | 90 ++++++++ replica/database.cc | 192 +++++++++++++++--- replica/database.hh | 33 ++- scylla-gdb.py | 23 ++- service/qos/service_level_controller.hh | 17 ++ test/boost/database_test.cc | 99 ++++++++- .../reader_concurrency_semaphore_test.cc | 108 ++++++++++ test/lib/cql_test_env.cc | 13 +- test/lib/cql_test_env.hh | 2 + 14 files changed, 662 insertions(+), 61 deletions(-) create mode 100644 reader_concurrency_semaphore_group.cc create mode 100644 reader_concurrency_semaphore_group.hh diff --git a/CMakeLists.txt b/CMakeLists.txt index 9c2b90bab2..1a19edda5e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -199,6 +199,7 @@ target_sources(scylla-main tombstone_gc_options.cc tombstone_gc.cc reader_concurrency_semaphore.cc + reader_concurrency_semaphore_group.cc row_cache.cc schema_mutations.cc serializer.cc diff --git a/configure.py b/configure.py index f8e8fd407a..5960768e4f 100755 --- a/configure.py +++ b/configure.py @@ -1160,6 +1160,7 @@ scylla_core = (['message/messaging_service.cc', 'service/topology_coordinator.cc', 'node_ops/node_ops_ctl.cc', 'node_ops/task_manager_module.cc', + 'reader_concurrency_semaphore_group.cc', ] + [Antlr3Grammar('cql3/Cql.g')] \ + scylla_raft_core ) diff --git a/main.cc b/main.cc index b0fe95778e..0b982a60e6 100644 --- a/main.cc +++ b/main.cc @@ -1211,6 +1211,20 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl sstm.stop().get(); }); + static sharded auth_service; + static sharded maintenance_auth_service; + static sharded sl_controller; + debug::the_sl_controller = &sl_controller; + + //starting service level controller + qos::service_level_options default_service_level_configuration; + default_service_level_configuration.shares = 1000; + sl_controller.start(std::ref(auth_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source()), default_service_level_configuration, dbcfg.statement_scheduling_group).get(); + sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] { + sl_controller.stop().get(); + }); + lang::manager::config lang_config; lang_config.lua.max_bytes = cfg->user_defined_function_allocation_limit_bytes(); lang_config.lua.max_contiguous = cfg->user_defined_function_contiguous_allocation_limit_bytes(); @@ -1247,7 +1261,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // because it obtains the list of pre-existing segments for replay, which must // not include reserve segments created by active commitlogs. db.local().init_commitlog().get(); - db.invoke_on_all(&replica::database::start).get(); + db.invoke_on_all(&replica::database::start, std::ref(sl_controller)).get(); ::sigquit_handler sigquit_handler(db); @@ -1339,20 +1353,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl api::unset_server_config(ctx).get(); }); - static sharded auth_service; - static sharded maintenance_auth_service; - static sharded sl_controller; - debug::the_sl_controller = &sl_controller; - - //starting service level controller - qos::service_level_options default_service_level_configuration; - default_service_level_configuration.shares = 1000; - sl_controller.start(std::ref(auth_service), std::ref(token_metadata), std::ref(stop_signal.as_sharded_abort_source()), default_service_level_configuration, dbcfg.statement_scheduling_group).get(); - sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); - auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] { - sl_controller.stop().get(); - }); - static sharded sys_dist_ks; static sharded sys_ks; static sharded view_update_generator; diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 44aba5437b..b52afefc9a 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -124,6 +124,8 @@ public: uint64_t sstables_read = 0; // Permits waiting on something: admission, memory or execution uint64_t waiters = 0; + + friend auto operator<=>(const stats&, const stats&) = default; }; using permit_list_type = bi::list< diff --git a/reader_concurrency_semaphore_group.cc b/reader_concurrency_semaphore_group.cc new file mode 100644 index 0000000000..fcc5ea7894 --- /dev/null +++ b/reader_concurrency_semaphore_group.cc @@ -0,0 +1,112 @@ +/* + * Copyright (C) 2021-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "reader_concurrency_semaphore_group.hh" + +// Calling adjust is serialized since 2 adjustments can't happen simultaneosly, +// if they did the behaviour would be undefined. +future<> reader_concurrency_semaphore_group::adjust() { + return with_semaphore(_operations_serializer, 1, [this] () { + ssize_t distributed_memory = 0; + for (auto& [sg, wsem] : _semaphores) { + const ssize_t memory_share = std::floor((double(wsem.weight) / double(_total_weight)) * _total_memory); + wsem.sem.set_resources({_max_concurrent_reads, memory_share}); + distributed_memory += memory_share; + } + // Slap the remainder on one of the semaphores. + // This will be a few bytes, doesn't matter where we add it. + auto& sem = _semaphores.begin()->second.sem; + sem.set_resources(sem.initial_resources() + reader_resources{0, _total_memory - distributed_memory}); + }); +} + +// The call to change_weight is serialized as a consequence of the call to adjust. +future<> reader_concurrency_semaphore_group::change_weight(weighted_reader_concurrency_semaphore& sem, size_t new_weight) { + auto diff = new_weight - sem.weight; + if (diff) { + sem.weight += diff; + _total_weight += diff; + return adjust(); + } + return make_ready_future<>(); +} + +future<> reader_concurrency_semaphore_group::wait_adjust_complete() { + return with_semaphore(_operations_serializer, 1, [] { + return make_ready_future<>(); + }); +} + +future<> reader_concurrency_semaphore_group::stop() noexcept { + return parallel_for_each(_semaphores, [] (auto&& item) { + return item.second.sem.stop(); + }).then([this] { + _semaphores.clear(); + }); +} + +reader_concurrency_semaphore& reader_concurrency_semaphore_group::get(scheduling_group sg) { + return _semaphores.at(sg).sem; +} +reader_concurrency_semaphore* reader_concurrency_semaphore_group::get_or_null(scheduling_group sg) { + auto it = _semaphores.find(sg); + if (it == _semaphores.end()) { + return nullptr; + } else { + return &(it->second.sem); + } +} +reader_concurrency_semaphore& reader_concurrency_semaphore_group::add_or_update(scheduling_group sg, size_t shares) { + auto result = _semaphores.try_emplace( + sg, + 0, + _max_concurrent_reads, + _name_prefix ? format("{}_{}", *_name_prefix, sg.name()) : sg.name(), + _max_queue_length, + _serialize_limit_multiplier, + _kill_limit_multiplier, + _cpu_concurrency + ); + auto&& it = result.first; + // since we serialize all group changes this change wait will be queues and no further operations + // will be executed until this adjustment ends. + (void)change_weight(it->second, shares); + return it->second.sem; +} + +future<> reader_concurrency_semaphore_group::remove(scheduling_group sg) { + auto node_handle = _semaphores.extract(sg); + if (!node_handle.empty()) { + weighted_reader_concurrency_semaphore& sem = node_handle.mapped(); + return sem.sem.stop().then([this, &sem] { + return change_weight(sem, 0); + }).finally([node_handle = std::move(node_handle)] () { + // this holds on to the node handle until we destroy it only after the semaphore + // is stopped properly. + }); + } + return make_ready_future(); +} + +size_t reader_concurrency_semaphore_group::size() { + return _semaphores.size(); +} + +void reader_concurrency_semaphore_group::foreach_semaphore(std::function func) { + for (auto& [sg, wsem] : _semaphores) { + func(sg, wsem.sem); + } +} + +future<> +reader_concurrency_semaphore_group::foreach_semaphore_async(std::function (scheduling_group, reader_concurrency_semaphore&)> func) { + auto units = co_await get_units(_operations_serializer, 1); + for (auto& [sg, wsem] : _semaphores) { + co_await func(sg, wsem.sem); + } +} diff --git a/reader_concurrency_semaphore_group.hh b/reader_concurrency_semaphore_group.hh new file mode 100644 index 0000000000..6d073ef472 --- /dev/null +++ b/reader_concurrency_semaphore_group.hh @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +/* + * Copyright (C) 2021-present ScyllaDB + */ + +#pragma once + +#include +#include +#include "reader_concurrency_semaphore.hh" +#include +#include + +// The reader_concurrency_semaphore_group is a group of semaphores that shares a common pool of memory, +// the memory is dynamically divided between them according to a relative slice of shares each semaphore +// is given. +// All of the mutating operations on the group are asynchronic and serialized. The semaphores are created +// and managed by the group. + +class reader_concurrency_semaphore_group { + size_t _total_memory; + size_t _total_weight; + size_t _max_concurrent_reads; + size_t _max_queue_length; + utils::updateable_value _serialize_limit_multiplier; + utils::updateable_value _kill_limit_multiplier; + utils::updateable_value _cpu_concurrency; + + friend class database_test_wrapper; + + struct weighted_reader_concurrency_semaphore { + size_t weight; + ssize_t memory_share; + reader_concurrency_semaphore sem; + weighted_reader_concurrency_semaphore(size_t shares, int count, sstring name, size_t max_queue_length, + utils::updateable_value serialize_limit_multiplier, + utils::updateable_value kill_limit_multiplier, + utils::updateable_value cpu_concurrency) + : weight(shares) + , memory_share(0) + , sem(utils::updateable_value(count), 0, name, max_queue_length, std::move(serialize_limit_multiplier), std::move(kill_limit_multiplier), + std::move(cpu_concurrency), reader_concurrency_semaphore::register_metrics::yes) {} + }; + + std::unordered_map _semaphores; + seastar::semaphore _operations_serializer; + std::optional _name_prefix; + + future<> change_weight(weighted_reader_concurrency_semaphore& sem, size_t new_weight); + +public: + reader_concurrency_semaphore_group(size_t memory, size_t max_concurrent_reads, size_t max_queue_length, + utils::updateable_value serialize_limit_multiplier, + utils::updateable_value kill_limit_multiplier, + utils::updateable_value cpu_concurrency, + std::optional name_prefix = std::nullopt) + : _total_memory(memory) + , _total_weight(0) + , _max_concurrent_reads(max_concurrent_reads) + , _max_queue_length(max_queue_length) + , _serialize_limit_multiplier(std::move(serialize_limit_multiplier)) + , _kill_limit_multiplier(std::move(kill_limit_multiplier)) + , _cpu_concurrency(std::move(cpu_concurrency)) + , _operations_serializer(1) + , _name_prefix(std::move(name_prefix)) { } + + ~reader_concurrency_semaphore_group() { + assert(_semaphores.empty()); + } + future<> adjust(); + future<> wait_adjust_complete(); + + future<> stop() noexcept; + reader_concurrency_semaphore& get(scheduling_group sg); + reader_concurrency_semaphore* get_or_null(scheduling_group sg); + reader_concurrency_semaphore& add_or_update(scheduling_group sg, size_t shares); + future<> remove(scheduling_group sg); + size_t size(); + void foreach_semaphore(std::function func); + + future<> foreach_semaphore_async(std::function (scheduling_group, reader_concurrency_semaphore&)> func); + + auto sum_read_concurrency_sem_var(std::invocable auto member) { + using ret_type = std::invoke_result_t; + return boost::accumulate(_semaphores | boost::adaptors::map_values | boost::adaptors::transformed([=] (weighted_reader_concurrency_semaphore& wrcs) { return std::invoke(member, wrcs.sem); }), ret_type(0)); + } +}; diff --git a/replica/database.cc b/replica/database.cc index 87da7731d3..234f2991be 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -67,6 +67,7 @@ #include "locator/abstract_replication_strategy.hh" #include "timeout_config.hh" #include "tombstone_gc.hh" +#include "service/qos/service_level_controller.hh" #include "replica/data_dictionary_impl.hh" #include "replica/global_table_ptr.hh" @@ -220,14 +221,8 @@ void database::setup_scylla_memory_diagnostics_producer() { writeln("Replica:\n"); writeln(" Read Concurrency Semaphores:\n"); - const std::pair semaphores[] = { - {"user", _read_concurrency_sem}, - {"streaming", _streaming_concurrency_sem}, - {"system", _system_read_concurrency_sem}, - {"compaction", _compaction_concurrency_sem}, - {"view update", _view_update_read_concurrency_sem}, - }; - for (const auto& [name, sem] : semaphores) { + + static auto semaphore_dump = [&writeln] (const sstring& name, const reader_concurrency_semaphore& sem) { const auto initial_res = sem.initial_resources(); const auto available_res = sem.available_resources(); if (sem.is_unlimited()) { @@ -245,7 +240,17 @@ void database::setup_scylla_memory_diagnostics_producer() { utils::to_hr_size(initial_res.memory), sem.get_stats().waiters); } - } + }; + + semaphore_dump("streaming", _streaming_concurrency_sem); + semaphore_dump("system", _system_read_concurrency_sem); + semaphore_dump("compaction", _compaction_concurrency_sem); + _reader_concurrency_semaphores_group.foreach_semaphore([] (scheduling_group sg, reader_concurrency_semaphore& sem) { + semaphore_dump(sg.name(), sem); + }); + _view_update_read_concurrency_semaphores_group.foreach_semaphore([] (scheduling_group sg, reader_concurrency_semaphore& sem) { + semaphore_dump(sg.name(), sem); + }); writeln(" Execution Stages:\n"); const std::pair execution_stage_summaries[] = { @@ -311,6 +316,42 @@ public: } }; +reader_concurrency_semaphore& +database::read_concurrency_sem() { + reader_concurrency_semaphore* sem = _reader_concurrency_semaphores_group.get_or_null(current_scheduling_group()); + if (!sem) { + // this line is commented out, however we shouldn't get here because it means that a user query or even worse, + // some random query was triggered from an unanticipated scheduling groups and this violates the isolation we are trying to achieve. + // It is commented out for two reasons: + // 1. So we will be able to ease into this new system, first testing functionality and effect and only then mix in exceptions and asserts. + // 2. So the series containing those changes will be backportable without causing too harsh regressions (aborts) on one hand and without forcing + // extensive changes on the other hand. + // Follow Up: uncomment this line and run extensive testing. Handle every case of abort. + // seastar::on_internal_error(dblog, format("Tried to run a user query in a wrong scheduling group (scheduling group: '{}')", current_scheduling_group().name())); + sem = _reader_concurrency_semaphores_group.get_or_null(_default_read_concurrency_group); + if (!sem) { + // If we got here - the initialization went very wrong and we can't do anything about it. + // This can only happen if someone touched the initialization code which is assumed to initialize at least + // this default semaphore. + seastar::on_internal_error(dblog, "Default read concurrency semaphore wasn't found, something probably went wrong during database::start"); + } + } + return *sem; +} + +// With same concerns as read_concurrency_sem(). +reader_concurrency_semaphore& +database::view_update_read_concurrency_sem() { + reader_concurrency_semaphore* sem = _view_update_read_concurrency_semaphores_group.get_or_null(current_scheduling_group()); + if (!sem) { + sem = _view_update_read_concurrency_semaphores_group.get_or_null(_default_read_concurrency_group); + if (!sem) { + seastar::on_internal_error(dblog, "Default view update read concurrency semaphore wasn't found, something probably went wrong during database::start"); + } + } + return *sem; +} + database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn, gms::feature_service& feat, const locator::shared_token_metadata& stm, compaction_manager& cm, sstables::storage_manager& sstm, lang::manager& langm, sstables::directory_semaphore& sst_dir_sem, const abort_source& abort, utils::cross_shard_barrier barrier) : _stats(make_lw_shared()) @@ -329,15 +370,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat } return backlog; })) - , _read_concurrency_sem( - utils::updateable_value(max_count_concurrent_reads), - max_memory_concurrent_reads(), - "user", - max_inactive_queue_length(), - _cfg.reader_concurrency_semaphore_serialize_limit_multiplier, - _cfg.reader_concurrency_semaphore_kill_limit_multiplier, - _cfg.reader_concurrency_semaphore_cpu_concurrency, - reader_concurrency_semaphore::register_metrics::yes) // No timeouts or queue length limits - a failure here can kill an entire repair. // Trust the caller to limit concurrency. , _streaming_concurrency_sem( @@ -360,15 +392,14 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat utils::updateable_value(std::numeric_limits::max()), utils::updateable_value(std::numeric_limits::max()), reader_concurrency_semaphore::register_metrics::yes) - , _view_update_read_concurrency_sem( - utils::updateable_value(max_count_concurrent_view_update_reads), + , _view_update_read_concurrency_semaphores_group( max_memory_concurrent_view_update_reads(), - "view_update", + utils::updateable_value(max_count_concurrent_view_update_reads), max_inactive_view_update_queue_length(), _cfg.view_update_reader_concurrency_semaphore_serialize_limit_multiplier, _cfg.view_update_reader_concurrency_semaphore_kill_limit_multiplier, _cfg.view_update_reader_concurrency_semaphore_cpu_concurrency, - reader_concurrency_semaphore::register_metrics::yes) + "view_update") , _row_cache_tracker(_cfg.index_cache_fraction.operator utils::updateable_value(), cache_tracker::register_metrics::yes) , _apply_stage("db_apply", &database::do_apply) , _version(empty_version) @@ -392,6 +423,10 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat , _feat(feat) , _shared_token_metadata(stm) , _lang_manager(langm) + , _reader_concurrency_semaphores_group(max_memory_concurrent_reads(), max_count_concurrent_reads, max_inactive_queue_length(), + _cfg.reader_concurrency_semaphore_serialize_limit_multiplier, + _cfg.reader_concurrency_semaphore_kill_limit_multiplier, + _cfg.reader_concurrency_semaphore_cpu_concurrency) , _stop_barrier(std::move(barrier)) , _update_memtable_flush_static_shares_action([this, &cfg] { return _memtable_controller.update_static_shares(cfg.memtable_flush_static_shares()); }) , _memtable_flush_static_shares_observer(cfg.memtable_flush_static_shares.observe(_update_memtable_flush_static_shares_action.make_observer())) @@ -485,6 +520,12 @@ namespace replica { static const metrics::label class_label("class"); + +auto +database::sum_read_concurrency_sem_stat(std::invocable auto stats_member) { + return _reader_concurrency_semaphores_group.sum_read_concurrency_sem_var([&] (reader_concurrency_semaphore& rcs) { return std::invoke(stats_member, rcs.get_stats()); }); +} + void database::setup_metrics() { _dirty_memory_manager.setup_collectd("regular"); @@ -1605,7 +1646,7 @@ query::max_result_size database::get_query_max_result_size() const { reader_concurrency_semaphore& database::get_reader_concurrency_semaphore() { switch (classify_request(_dbcfg)) { - case request_class::user: return _read_concurrency_sem; + case request_class::user: return read_concurrency_sem(); case request_class::system: return _system_read_concurrency_sem; case request_class::maintenance: return _streaming_concurrency_sem; } @@ -1634,9 +1675,15 @@ future<> database::clear_inactive_reads_for_tablet(table_id table, dht::token_ra } future<> database::foreach_reader_concurrency_semaphore(std::function(reader_concurrency_semaphore&)> func) { - for (auto* sem : {&_read_concurrency_sem, &_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem, &_view_update_read_concurrency_sem}) { + for (auto* sem : {&_streaming_concurrency_sem, &_compaction_concurrency_sem, &_system_read_concurrency_sem}) { co_await func(*sem); } + co_await _reader_concurrency_semaphores_group.foreach_semaphore_async([&] (scheduling_group sg, reader_concurrency_semaphore& sem) -> future<> { + co_await func(sem); + }); + co_await _view_update_read_concurrency_semaphores_group.foreach_semaphore_async([&] (scheduling_group sg, reader_concurrency_semaphore& sem) -> future<> { + co_await func(sem); + }); } std::ostream& operator<<(std::ostream& out, const column_family& cf) { @@ -1935,7 +1982,7 @@ future<> database::do_apply(schema_ptr s, const frozen_mutation& m, tracing::tra co_await coroutine::return_exception(std::runtime_error("view update generator not plugged to push updates")); } - auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(_view_update_generator, s, m, timeout, std::move(tr_state), _view_update_read_concurrency_sem)); + auto lock_f = co_await coroutine::as_future(cf.push_view_replica_updates(_view_update_generator, s, m, timeout, std::move(tr_state), view_update_read_concurrency_sem())); if (lock_f.failed()) { auto ex = lock_f.get_exception(); if (is_timeout_exception(ex)) { @@ -2182,7 +2229,54 @@ void database::revert_initial_system_read_concurrency_boost() { dblog.debug("Reverted system read concurrency from initial {} to normal {}", database::max_count_concurrent_reads, database::max_count_system_concurrent_reads); } -future<> database::start() { +future<> database::start(sharded& sl_controller) { + sl_controller.local().register_subscriber(this); + _unsubscribe_qos_configuration_change = [this, &sl_controller] () { + return sl_controller.local().unregister_subscriber(this); + }; + qos::service_level default_service_level = sl_controller.local().get_service_level(qos::service_level_controller::default_service_level_name); + int32_t default_shares = 1000; + if (int32_t* default_shares_p = std::get_if(&(default_service_level.slo.shares))) { + default_shares = *default_shares_p; + } else { + on_internal_error(dblog, "The default service_level should always contain shares value"); + } + + // The former _dbcfg.statement_scheduling_group and the later can be the same group, so we want + // the later to be the accurate one. + _default_read_concurrency_group = default_service_level.sg; + _reader_concurrency_semaphores_group.add_or_update(default_service_level.sg, default_shares); + _view_update_read_concurrency_semaphores_group.add_or_update(default_service_level.sg, default_shares); + + // lets insert the statement scheduling group only if we haven't reused it in sl_controller, + // but it shouldn't happen + if (!_reader_concurrency_semaphores_group.get_or_null(_dbcfg.statement_scheduling_group)) { + // This is super ugly, we need to either force the database to use system scheduling group for non-user queries + // or, if we have user queries running on this scheduling group make it's definition more robust (what runs in it). + // Another ugly thing here is that we have to have a pre-existing knowladge about the shares ammount this group was + // built with. I think we should have a followup that makes this more robust. + _reader_concurrency_semaphores_group.add_or_update(_dbcfg.statement_scheduling_group, 1000); + _view_update_read_concurrency_semaphores_group.add_or_update(_dbcfg.statement_scheduling_group, 1000); + } + + // This will wait for the semaphores to be given some memory. + // We need this since the below statements (get_distributed_service_levels in particular) will need + // to run queries and for this they will need to admit some memory. + co_await _reader_concurrency_semaphores_group.wait_adjust_complete(); + co_await _view_update_read_concurrency_semaphores_group.wait_adjust_complete(); + + auto service_levels = co_await sl_controller.local().get_distributed_service_levels(qos::query_context::group0); + for (auto&& service_level_record : service_levels) { + auto service_level = sl_controller.local().get_service_level(service_level_record.first); + if (service_level.slo.shares_name && *service_level.slo.shares_name != qos::service_level_controller::default_service_level_name) { + // We know slo.shares is valid becuse we know that slo.shares_name is valid + _reader_concurrency_semaphores_group.add_or_update(service_level.sg, std::get(service_level.slo.shares)); + _view_update_read_concurrency_semaphores_group.add_or_update(service_level.sg, std::get(service_level.slo.shares)); + } + } + + co_await _reader_concurrency_semaphores_group.adjust(); + co_await _view_update_read_concurrency_semaphores_group.adjust(); _large_data_handler->start(); // We need the compaction manager ready early so we can reshard. _compaction_manager.enable(); @@ -2215,10 +2309,12 @@ future<> database::shutdown() { } future<> database::stop() { + if (_unsubscribe_qos_configuration_change) { + co_await std::exchange(_unsubscribe_qos_configuration_change, {})(); + } if (!_shutdown) { co_await shutdown(); } - // try to ensure that CL has done disk flushing if (_commitlog) { dblog.info("Shutting down commitlog"); @@ -2250,11 +2346,11 @@ future<> database::stop() { dblog.info("Stopping querier cache"); co_await _querier_cache.stop(); dblog.info("Stopping concurrency semaphores"); - co_await _read_concurrency_sem.stop(); + co_await _reader_concurrency_semaphores_group.stop(); + co_await _view_update_read_concurrency_semaphores_group.stop(); co_await _streaming_concurrency_sem.stop(); co_await _compaction_concurrency_sem.stop(); co_await _system_read_concurrency_sem.stop(); - co_await _view_update_read_concurrency_sem.stop(); dblog.info("Joining memtable update action"); co_await _update_memtable_flush_static_shares_action.join(); } @@ -3029,3 +3125,41 @@ future>> query_data( } } // namespace replica + +namespace replica { + +/** This callback is going to be called just before the service level is available **/ +future<> database::on_before_service_level_add(qos::service_level_options slo, qos::service_level_info sl_info) { + if (auto shares_p = std::get_if(&slo.shares)) { + _reader_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + _view_update_read_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + // the call to add_or_update_read_concurrency_sem will take the semaphore until the adjustment + // is completed, we need to wait for the operation to complete. + co_await _reader_concurrency_semaphores_group.wait_adjust_complete(); + co_await _view_update_read_concurrency_semaphores_group.wait_adjust_complete(); + } +} +/** This callback is going to be called just after the service level is removed **/ +future<> database::on_after_service_level_remove(qos::service_level_info sl_info) { + co_await _reader_concurrency_semaphores_group.remove(sl_info.sg); + co_await _view_update_read_concurrency_semaphores_group.remove(sl_info.sg); +} +/** This callback is going to be called just before the service level is changed **/ +future<> database::on_before_service_level_change(qos::service_level_options slo_before, qos::service_level_options slo_after, + qos::service_level_info sl_info) { + if (auto shares_p = std::get_if(&slo_after.shares)) { + _reader_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + _view_update_read_concurrency_semaphores_group.add_or_update(sl_info.sg, *shares_p); + // the call to add_or_update_read_concurrency_sem will take the semaphore until the adjustment + // is completed, we need to wait for the operation to complete. + co_await _reader_concurrency_semaphores_group.wait_adjust_complete(); + co_await _view_update_read_concurrency_semaphores_group.wait_adjust_complete(); + } +} + +future<> +database::on_effective_service_levels_cache_reloaded() { + co_return; +} + +} diff --git a/replica/database.hh b/replica/database.hh index d0e2534384..94fc42e372 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -47,7 +47,7 @@ #include "utils/phased_barrier.hh" #include "backlog_controller.hh" #include "dirty_memory_manager.hh" -#include "reader_concurrency_semaphore.hh" +#include "reader_concurrency_semaphore_group.hh" #include "db/timeout_clock.hh" #include "querier.hh" #include "cache_temperature.hh" @@ -67,6 +67,7 @@ #include "utils/serialized_action.hh" #include "compaction/compaction_fwd.hh" #include "compaction_group.hh" +#include "service/qos/qos_configuration_change_subscriber.hh" class cell_locker; class cell_locker_stats; @@ -137,6 +138,10 @@ class view_update_generator; } +namespace qos { + class service_level_controller; +} + class mutation_reordered_with_truncate_exception : public std::exception {}; class column_family_test; @@ -1383,7 +1388,7 @@ class db_user_types_storage; // local metadata reads // use table::shard_for_reads()/table::shard_for_writes() for data -class database : public peering_sharded_service { +class database : public peering_sharded_service, qos::qos_configuration_change_subscriber { friend class ::database_test_wrapper; public: enum class table_kind { @@ -1487,13 +1492,13 @@ private: flush_controller _memtable_controller; drain_progress _drain_progress {}; - reader_concurrency_semaphore _read_concurrency_sem; + reader_concurrency_semaphore _streaming_concurrency_sem; reader_concurrency_semaphore _compaction_concurrency_sem; reader_concurrency_semaphore _system_read_concurrency_sem; - // The view update read concurrency semaphore used for view updates coming from user writes. - reader_concurrency_semaphore _view_update_read_concurrency_sem; + // The view update read concurrency semaphores used for view updates coming from user writes. + reader_concurrency_semaphore_group _view_update_read_concurrency_semaphores_group; db::timeout_semaphore _view_update_concurrency_sem{max_memory_pending_view_updates()}; cache_tracker _row_cache_tracker; @@ -1540,6 +1545,10 @@ private: const locator::shared_token_metadata& _shared_token_metadata; lang::manager& _lang_manager; + reader_concurrency_semaphore_group _reader_concurrency_semaphores_group; + scheduling_group _default_read_concurrency_group; + noncopyable_function()> _unsubscribe_qos_configuration_change; + utils::cross_shard_barrier _stop_barrier; db::rate_limiter _rate_limiter; @@ -1579,6 +1588,10 @@ private: future<> create_in_memory_keyspace(const lw_shared_ptr& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system); void setup_metrics(); void setup_scylla_memory_diagnostics_producer(); + reader_concurrency_semaphore& read_concurrency_sem(); + reader_concurrency_semaphore& view_update_read_concurrency_sem(); + auto sum_read_concurrency_sem_var(std::invocable auto member); + auto sum_read_concurrency_sem_stat(std::invocable auto stats_member); future<> do_apply(schema_ptr, const frozen_mutation&, tracing::trace_state_ptr tr_state, db::timeout_clock::time_point timeout, db::commitlog_force_sync sync, db::per_partition_rate_limit::info rate_limit_info); future<> do_apply_many(const std::vector&, db::timeout_clock::time_point timeout); @@ -1714,7 +1727,7 @@ public: /// reads, to speed up startup. After startup this should be reverted to /// the normal concurrency. void revert_initial_system_read_concurrency_boost(); - future<> start(); + future<> start(sharded&); future<> shutdown(); future<> stop(); future<> close_tables(table_kind kind_to_close); @@ -1906,6 +1919,14 @@ public: } future<> clear_inactive_reads_for_tablet(table_id table, dht::token_range tablet_range); + + /** This callback is going to be called just before the service level is available **/ + virtual future<> on_before_service_level_add(qos::service_level_options slo, qos::service_level_info sl_info) override; + /** This callback is going to be called just after the service level is removed **/ + virtual future<> on_after_service_level_remove(qos::service_level_info sl_info) override; + /** This callback is going to be called just before the service level is changed **/ + virtual future<> on_before_service_level_change(qos::service_level_options slo_before, qos::service_level_options slo_after, qos::service_level_info sl_info) override; + virtual future<> on_effective_service_levels_cache_reloaded() override; }; // A helper function to parse the directory name back diff --git a/scylla-gdb.py b/scylla-gdb.py index 92082943a9..d098c5be6e 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -2311,13 +2311,21 @@ class scylla_memory(gdb.Command): if not db: return + per_service_level_sem = [] + for sg, sem in unordered_map(db["_reader_concurrency_semaphores_group"]["_semaphores"]): + per_service_level_sem.append(scylla_memory.format_semaphore_stats(sem["sem"])) + + per_service_level_vu_sem = [] + for sg, sem in unordered_map(db["_view_update_read_concurrency_semaphores_group"]["_semaphores"]): + per_service_level_vu_sem.append(scylla_memory.format_semaphore_stats(sem["sem"])) + database_typename = lookup_type(['replica::database', 'database'])[1].name gdb.write('Replica:\n') gdb.write(' Read Concurrency Semaphores:\n {}\n {}\n {}\n {}\n'.format( - scylla_memory.format_semaphore_stats(db['_read_concurrency_sem']), + '\n '.join(per_service_level_sem), scylla_memory.format_semaphore_stats(db['_streaming_concurrency_sem']), scylla_memory.format_semaphore_stats(db['_system_read_concurrency_sem']), - scylla_memory.format_semaphore_stats(db['_view_update_read_concurrency_sem']))) + '\n '.join(per_service_level_vu_sem))) gdb.write(' Execution Stages:\n') for es_path in [('_apply_stage',)]: @@ -5809,12 +5817,17 @@ class scylla_read_stats(gdb.Command): semaphores = [gdb.parse_and_eval(arg) for arg in args.split(' ')] else: db = find_db() - semaphores = [db["_read_concurrency_sem"], db["_streaming_concurrency_sem"], db["_system_read_concurrency_sem"]] + semaphores = [db["_streaming_concurrency_sem"], db["_system_read_concurrency_sem"]] semaphores.append(db["_compaction_concurrency_sem"]) try: - semaphores.append(db["_view_update_read_concurrency_sem"]) + semaphores += [weighted_sem["sem"] for (_, weighted_sem) in unordered_map(db["_reader_concurrency_semaphores_group"]["_semaphores"])] except gdb.error: - # 6.2 compatibility + # compatibility with code before per-scheduling-group semaphore + pass + try: + semaphores += [weighted_sem["sem"] for (_, weighted_sem) in unordered_map(db["_view_update_read_concurrency_semaphores_group"]["_semaphores"])] + except gdb.error: + # 2024.2 compatibility pass for semaphore in semaphores: diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh index 3534b62aaa..402113b2d5 100644 --- a/service/qos/service_level_controller.hh +++ b/service/qos/service_level_controller.hh @@ -205,6 +205,23 @@ public: void abort_group0_operations(); + /** + * this is an executor of a function with arguments under a specific + * service level. + * @param service_level_name + * @param func - the function to be executed + * @param args - the arguments to pass to the function. + * @return a future that is resolved when the function's operation is resolved + * (if it returns a future). or a ready future containing the returned value + * from the function/ + */ + template > + requires std::invocable + futurize_t with_service_level(sstring service_level_name, Func&& func) { + service_level& sl = get_service_level(service_level_name); + return with_scheduling_group(sl.sg, std::move(func)); + } + /** * @return the default service level scheduling group (see service_level_controller::initialize). */ diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 10a2381d67..647c44da5c 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -61,7 +61,7 @@ public: explicit database_test_wrapper(replica::database& db) : _db(db) { } reader_concurrency_semaphore& get_user_read_concurrency_semaphore() { - return _db._read_concurrency_sem; + return _db.read_concurrency_sem(); } reader_concurrency_semaphore& get_streaming_read_concurrency_semaphore() { return _db._streaming_concurrency_sem; @@ -69,6 +69,14 @@ public: reader_concurrency_semaphore& get_system_read_concurrency_semaphore() { return _db._system_read_concurrency_sem; } + + size_t get_total_user_reader_concurrency_semaphore_memory() { + return _db._reader_concurrency_semaphores_group._total_memory; + } + + size_t get_total_user_reader_concurrency_semaphore_weight() { + return _db._reader_concurrency_semaphores_group._total_weight; + } }; static future<> apply_mutation(sharded& sharded_db, table_id uuid, const mutation& m, bool do_flush = false, @@ -1151,7 +1159,8 @@ SEASTAR_THREAD_TEST_CASE(reader_concurrency_semaphore_selection_test) { auto& db = e.local_db(); database_test_wrapper tdb(db); for (const auto& [sched_group, expected_sem_getter] : scheduling_group_and_expected_semaphore) { - with_scheduling_group(sched_group, [&db, sched_group = sched_group, expected_sem_ptr = &expected_sem_getter(tdb)] { + with_scheduling_group(sched_group, [&db, sched_group = sched_group, &tdb, &expected_sem_getter = expected_sem_getter] { + auto expected_sem_ptr = &expected_sem_getter(tdb); auto& sem = db.get_reader_concurrency_semaphore(); if (&sem != expected_sem_ptr) { BOOST_FAIL(fmt::format("Unexpected semaphore for scheduling group {}, expected {}, got {}", sched_group.name(), expected_sem_ptr->name(), sem.name())); @@ -1296,6 +1305,92 @@ SEASTAR_TEST_CASE(upgrade_sstables) { }); } +SEASTAR_THREAD_TEST_CASE(per_service_level_reader_concurrency_semaphore_test) { + cql_test_config cfg; + do_with_cql_env_thread([] (cql_test_env& e) { + const size_t num_service_levels = 3; + const size_t num_keys_to_insert = 10; + const size_t num_individual_reads_to_test = 50; + auto& db = e.local_db(); + database_test_wrapper dbt(db); + size_t total_memory = dbt.get_total_user_reader_concurrency_semaphore_memory(); + sharded& sl_controller = e.service_level_controller_service(); + std::array sl_names; + qos::service_level_options slo; + size_t expected_total_weight = 0; + auto index_to_weight = [] (size_t i) -> size_t { + return (i + 1)*100; + }; + + // make the default service level take as little memory as possible + slo.shares.emplace(1); + expected_total_weight += 1; + sl_controller.local().add_service_level(qos::service_level_controller::default_service_level_name, slo).get(); + + // Just to make the code more readable. + auto get_reader_concurrency_semaphore_for_sl = [&] (sstring sl_name) -> reader_concurrency_semaphore& { + return *sl_controller.local().with_service_level(sl_name, noncopyable_function([&] { + return &db.get_reader_concurrency_semaphore(); + })).get(); + }; + + for (unsigned i = 0; i < num_service_levels; i++) { + sstring sl_name = format("sl{}", i); + slo.shares.emplace(index_to_weight(i)); + sl_controller.local().add_service_level(sl_name, slo).get(); + expected_total_weight += index_to_weight(i); + // Make sure that the total weight is tracked correctly in the semaphore group + BOOST_REQUIRE_EQUAL(expected_total_weight, dbt.get_total_user_reader_concurrency_semaphore_weight()); + sl_names[i] = sl_name; + size_t total_distributed_memory = 0; + for (unsigned j = 0 ; j <= i ; j++) { + reader_concurrency_semaphore& sem = get_reader_concurrency_semaphore_for_sl(sl_names[j]); + // Make sure that all semaphores that has been created until now - have the right amount of available memory + // after the operation has ended. + // We allow for a small delta of up to num_service_levels. This allows an off-by-one for each semaphore, + // the remainder being added to one of the semaphores. + // We make sure this didn't leak/create memory by checking the total below. + const auto delta = std::abs(ssize_t((index_to_weight(j) * total_memory) / expected_total_weight) - sem.available_resources().memory); + BOOST_REQUIRE_LE(delta, num_service_levels); + total_distributed_memory += sem.available_resources().memory; + } + total_distributed_memory += get_reader_concurrency_semaphore_for_sl(qos::service_level_controller::default_service_level_name).available_resources().memory; + BOOST_REQUIRE_EQUAL(total_distributed_memory, total_memory); + } + + auto get_semaphores_stats_snapshot = [&] () { + std::unordered_map snapshot; + for (auto&& sl_name : sl_names) { + snapshot[sl_name] = get_reader_concurrency_semaphore_for_sl(sl_name).get_stats(); + } + return snapshot; + }; + e.execute_cql("CREATE TABLE tbl (a int, b int, PRIMARY KEY (a));").get(); + + for (unsigned i = 0; i < num_keys_to_insert; i++) { + for (unsigned j = 0; j < num_keys_to_insert; j++) { + e.execute_cql(format("INSERT INTO tbl(a, b) VALUES ({}, {});", i, j)).get(); + } + } + + for (unsigned i = 0; i < num_individual_reads_to_test; i++) { + int random_service_level = tests::random::get_int(num_service_levels - 1); + auto snapshot_before = get_semaphores_stats_snapshot(); + + sl_controller.local().with_service_level(sl_names[random_service_level], noncopyable_function()> ([&] { + return e.execute_cql("SELECT * FROM tbl;").discard_result(); + })).get(); + auto snapshot_after = get_semaphores_stats_snapshot(); + for (auto& [sl_name, stats] : snapshot_before) { + // Make sure that the only semaphore that experienced any activity (at least measured activity) is + // the semaphore that belongs to the current service level. + BOOST_REQUIRE((stats == snapshot_after[sl_name] && sl_name != sl_names[random_service_level]) || + (stats != snapshot_after[sl_name] && sl_name == sl_names[random_service_level])); + } + } + }, std::move(cfg)).get(); +} + SEASTAR_TEST_CASE(populate_from_quarantine_works) { auto tmpdir_for_data = make_lw_shared(); auto db_cfg_ptr = make_shared(); diff --git a/test/boost/reader_concurrency_semaphore_test.cc b/test/boost/reader_concurrency_semaphore_test.cc index cb39f0990f..be0283916d 100644 --- a/test/boost/reader_concurrency_semaphore_test.cc +++ b/test/boost/reader_concurrency_semaphore_test.cc @@ -11,6 +11,7 @@ #include #include "reader_concurrency_semaphore.hh" #include "sstables/sstables_manager.hh" +#include "reader_concurrency_semaphore_group.hh" #include "test/lib/log.hh" #include "test/lib/simple_schema.hh" #include "test/lib/cql_assertions.hh" @@ -1219,6 +1220,113 @@ SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_set_resources) { } // namespace reader_concurrency_semaphore_test +SEASTAR_THREAD_TEST_CASE(test_reader_concurrency_semaphore_group) { + const auto initial_resources = reader_resources{100, 100 * 1024}; + auto serialize_multiplier = utils::updateable_value_source(2); + auto kill_multiplier = utils::updateable_value_source(3); + auto cpu_concurrency = utils::updateable_value_source(1); + + reader_concurrency_semaphore_group sem_group(initial_resources.memory, initial_resources.count, 1000, + utils::updateable_value(serialize_multiplier), + utils::updateable_value(kill_multiplier), + utils::updateable_value(cpu_concurrency)); + auto stop_sem = deferred_stop(sem_group); + + circular_buffer recycle_bin; + + const auto initial_shares = 1000; + struct scheduling_group_with_shares { + scheduling_group sg; + size_t shares; + + scheduling_group_with_shares(scheduling_group sg, size_t shares) : sg(sg), shares(shares) { } + }; + std::vector scheduling_groups; + const auto max_sched_groups = 8; + + auto check_sem_group = [&] { + const auto total_shares = boost::accumulate(scheduling_groups + | boost::adaptors::transformed([] (const scheduling_group_with_shares& sgs) { return sgs.shares; }), size_t(0)); + ssize_t total_memory = 0; + sem_group.foreach_semaphore([&] (scheduling_group sg, reader_concurrency_semaphore& sem) { + const auto res = sem.available_resources(); + BOOST_CHECK_EQUAL(res.count, initial_resources.count); // currently count is not partitioned among the semaphores + auto it = std::find_if(scheduling_groups.begin(), scheduling_groups.end(), [sg] (const scheduling_group_with_shares& sgs) { return sgs.sg == sg; }); + BOOST_REQUIRE(it != scheduling_groups.end()); + const auto shares = it->shares; + const ssize_t expected_memory = std::floor((double(shares) / double(total_shares)) * initial_resources.memory); + const auto memory_diff = std::abs(res.memory - expected_memory); + testlog.trace("{}: {}/{} (shares) -> {}/{} (memory) | res.memory: {}", sg.name(), shares, total_shares, expected_memory, initial_resources.memory, res.memory); + BOOST_CHECK_LE(memory_diff, scheduling_groups.size()); // due to integer division, we allow for ceil/floor (off-by-one), the remainder being added to any semaphore + total_memory += res.memory; + }); + BOOST_CHECK_EQUAL(total_memory, initial_resources.memory); // no off-by-one allowed on the total + }; + + auto add_sg = [&, sgi = 0] () mutable { + if (scheduling_groups.size() >= max_sched_groups) { + return false; + } + testlog.debug("create sg{}", sgi); + scheduling_group sg; + const auto sg_name = format("sg{}", sgi++); + if (recycle_bin.empty()) { + sg = create_scheduling_group(sg_name, initial_shares).get(); + } else { + sg = recycle_bin.front(); + recycle_bin.pop_front(); + rename_scheduling_group(sg, sg_name).get(); + } + scheduling_groups.emplace_back(sg, initial_shares); + sem_group.add_or_update(sg, initial_shares); + sem_group.wait_adjust_complete().get(); + return true; + }; + + while (add_sg()) { + check_sem_group(); + } + + for (size_t i = 0; i < 32; ++i) { + testlog.debug("iteration {}", i); + std::shuffle(scheduling_groups.begin(), scheduling_groups.end(), tests::random::gen()); + switch (tests::random::get_int(0, 3)) { + case 0: // add + { + testlog.debug("maybe add sg"); + if (add_sg()) { + break; + } + [[fallthrough]]; + } + case 1: //remove + { + const auto& sgs = scheduling_groups.back(); + testlog.debug("maybe remove {}", sgs.sg.name()); + if (scheduling_groups.size() > 1) { + testlog.debug("remove {}", sgs.sg.name()); + sem_group.remove(sgs.sg).get(); + recycle_bin.push_back(sgs.sg); + scheduling_groups.pop_back(); + break; + } + [[fallthrough]]; + } + default: //update + { + auto& sgs = scheduling_groups.back(); + const auto new_shares = tests::random::get_int(100, 1000); + sgs.shares = new_shares; + testlog.debug("update {}: {}->{}", sgs.sg.name(), sgs.shares, new_shares); + sem_group.add_or_update(sgs.sg, new_shares); + sem_group.wait_adjust_complete().get(); + break; + } + } + check_sem_group(); + } +} + namespace { class allocating_reader { diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 47f2002c0a..5116083576 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -593,6 +593,10 @@ private: _sstm.start(std::ref(*cfg), sstables::storage_manager::config{}).get(); auto stop_sstm = deferred_stop(_sstm); + _sl_controller.start(std::ref(_auth_service), std::ref(_token_metadata), std::ref(abort_sources), qos::service_level_options{.shares = 1000}, scheduling_groups.statement_scheduling_group).get(); + auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); }); + _sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + lang::manager::config lang_config; lang_config.lua.max_bytes = cfg->user_defined_function_allocation_limit_bytes(); lang_config.lua.max_contiguous = cfg->user_defined_function_contiguous_allocation_limit_bytes(); @@ -618,7 +622,7 @@ private: _db.stop().get(); }); - _db.invoke_on_all(&replica::database::start).get(); + _db.invoke_on_all(&replica::database::start, std::ref(_sl_controller)).get(); smp::invoke_on_all([blocked_reactor_notify_ms] { engine().update_blocked_reactor_notify_ms(blocked_reactor_notify_ms); @@ -659,9 +663,6 @@ private: set_abort_on_internal_error(true); const gms::inet_address listen("127.0.0.1"); - _sl_controller.start(std::ref(_auth_service), std::ref(_token_metadata), std::ref(abort_sources), qos::service_level_options{.shares = 1000}, scheduling_groups.statement_scheduling_group).get(); - auto stop_sl_controller = defer([this] { _sl_controller.stop().get(); }); - _sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); _sys_ks.start(std::ref(_qp), std::ref(_db)).get(); auto stop_sys_kd = defer([this] { @@ -1105,6 +1106,10 @@ public: return cql_transport::messages::propagate_exception_as_future(std::move(msg)); }); } + + virtual sharded& service_level_controller_service() override { + return _sl_controller; + } }; std::atomic single_node_cql_env::active = { false }; diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index 040684ca82..0779da7b26 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -184,6 +184,8 @@ public: virtual sharded& get_task_manager() = 0; data_dictionary::database data_dictionary(); + + virtual sharded& service_level_controller_service() = 0; }; future<> do_with_cql_env(std::function(cql_test_env&)> func, cql_test_config = {}, std::optional = {});