diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index d495ea4644..892a9fc95c 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -68,7 +68,7 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono class query_processor::internal_state { service::query_state _qs; public: - internal_state() : _qs(service::client_state::for_internal_calls(), empty_service_permit()) { + internal_state(qos::service_level_controller &sl_controller) : _qs(service::client_state::for_internal_calls(), empty_service_permit(), sl_controller) { } operator service::query_state&() { return _qs; @@ -84,14 +84,15 @@ public: } }; -query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg) +query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg, + sharded &sl_controller) : _migration_subscriber{std::make_unique(this)} , _proxy(proxy) , _db(db) , _mnotifier(mn) , _mm(mm) , _cql_config(cql_cfg) - , _internal_state(new internal_state()) + , _internal_state(new internal_state(sl_controller.local())) , _prepared_cache(prep_cache_log, mcfg.prepared_statment_cache_size) , _authorized_prepared_cache(std::min(std::chrono::milliseconds(_db.get_config().permissions_validity_in_ms()), std::chrono::duration_cast(prepared_statements_cache::entry_expiry)), diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 3808f4895a..5558dda085 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -154,7 +154,8 @@ public: static std::unique_ptr parse_statement(const std::string_view& query); - query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg); + query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg, + sharded &sl_controller); ~query_processor(); diff --git a/main.cc b/main.cc index f0ee0813e7..d62cb0ac59 100644 --- a/main.cc +++ b/main.cc @@ -944,7 +944,7 @@ int main(int ac, char** av) { supervisor::notify("starting query processor"); cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560}; debug::the_query_processor = &qp; - qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config)).get(); + qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get(); // #293 - do not stop anything // engine().at_exit([&qp] { return qp.stop(); }); supervisor::notify("initializing batchlog manager"); diff --git a/service/query_state.hh b/service/query_state.hh index 0e9e3a5452..df4e4e486b 100644 --- a/service/query_state.hh +++ b/service/query_state.hh @@ -27,6 +27,9 @@ #include "tracing/tracing.hh" #include "service_permit.hh" +namespace qos { +class service_level_controller; +} namespace service { class query_state final { @@ -34,13 +37,21 @@ private: client_state& _client_state; tracing::trace_state_ptr _trace_state_ptr; service_permit _permit; + std::optional> _sl_controller; public: query_state(client_state& client_state, service_permit permit) + : _client_state(client_state) + , _trace_state_ptr(tracing::trace_state_ptr()) + , _permit(std::move(permit)) + {} + + query_state(client_state& client_state, service_permit permit, qos::service_level_controller &sl_controller) : _client_state(client_state) , _trace_state_ptr(tracing::trace_state_ptr()) , _permit(std::move(permit)) - { } + , _sl_controller(sl_controller) + {} query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit) : _client_state(client_state) @@ -48,6 +59,13 @@ public: , _permit(std::move(permit)) { } + query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller) + : _client_state(client_state) + , _trace_state_ptr(std::move(trace_state_ptr)) + , _permit(std::move(permit)) + , _sl_controller(sl_controller) + { } + const tracing::trace_state_ptr& get_trace_state() const { return _trace_state_ptr; } @@ -75,6 +93,10 @@ public: return std::move(_permit); } + qos::service_level_controller& get_service_level_controller() const { + return _sl_controller->get(); + } + }; }