cql: Support accessing service_level_controller from query state
In order to implement service level cql queries, the queries objects needs access to the service_level_controller object when processing. This patch adds this access by embedding it into the query state object. In order to accomplish the above the query processor object needs an access to service_level_controller in order to instantiate the query state. Message-Id: <68f5a7796068a49d9cd004f1cbf34bdf93b418bc.1609234193.git.sarna@scylladb.com>
This commit is contained in:
committed by
Piotr Sarna
parent
e173eaa032
commit
f78707d3fb
@@ -68,7 +68,7 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono
|
||||
class query_processor::internal_state {
|
||||
service::query_state _qs;
|
||||
public:
|
||||
internal_state() : _qs(service::client_state::for_internal_calls(), empty_service_permit()) {
|
||||
internal_state(qos::service_level_controller &sl_controller) : _qs(service::client_state::for_internal_calls(), empty_service_permit(), sl_controller) {
|
||||
}
|
||||
operator service::query_state&() {
|
||||
return _qs;
|
||||
@@ -84,14 +84,15 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg)
|
||||
query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg,
|
||||
sharded<qos::service_level_controller> &sl_controller)
|
||||
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
|
||||
, _proxy(proxy)
|
||||
, _db(db)
|
||||
, _mnotifier(mn)
|
||||
, _mm(mm)
|
||||
, _cql_config(cql_cfg)
|
||||
, _internal_state(new internal_state())
|
||||
, _internal_state(new internal_state(sl_controller.local()))
|
||||
, _prepared_cache(prep_cache_log, mcfg.prepared_statment_cache_size)
|
||||
, _authorized_prepared_cache(std::min(std::chrono::milliseconds(_db.get_config().permissions_validity_in_ms()),
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(prepared_statements_cache::entry_expiry)),
|
||||
|
||||
@@ -154,7 +154,8 @@ public:
|
||||
|
||||
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query);
|
||||
|
||||
query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg);
|
||||
query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg,
|
||||
sharded<qos::service_level_controller> &sl_controller);
|
||||
|
||||
~query_processor();
|
||||
|
||||
|
||||
2
main.cc
2
main.cc
@@ -944,7 +944,7 @@ int main(int ac, char** av) {
|
||||
supervisor::notify("starting query processor");
|
||||
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
|
||||
debug::the_query_processor = &qp;
|
||||
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
|
||||
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get();
|
||||
// #293 - do not stop anything
|
||||
// engine().at_exit([&qp] { return qp.stop(); });
|
||||
supervisor::notify("initializing batchlog manager");
|
||||
|
||||
@@ -27,6 +27,9 @@
|
||||
#include "tracing/tracing.hh"
|
||||
#include "service_permit.hh"
|
||||
|
||||
namespace qos {
|
||||
class service_level_controller;
|
||||
}
|
||||
namespace service {
|
||||
|
||||
class query_state final {
|
||||
@@ -34,13 +37,21 @@ private:
|
||||
client_state& _client_state;
|
||||
tracing::trace_state_ptr _trace_state_ptr;
|
||||
service_permit _permit;
|
||||
std::optional<std::reference_wrapper<qos::service_level_controller>> _sl_controller;
|
||||
|
||||
public:
|
||||
query_state(client_state& client_state, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(tracing::trace_state_ptr())
|
||||
, _permit(std::move(permit))
|
||||
{}
|
||||
|
||||
query_state(client_state& client_state, service_permit permit, qos::service_level_controller &sl_controller)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(tracing::trace_state_ptr())
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
, _sl_controller(sl_controller)
|
||||
{}
|
||||
|
||||
query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
|
||||
: _client_state(client_state)
|
||||
@@ -48,6 +59,13 @@ public:
|
||||
, _permit(std::move(permit))
|
||||
{ }
|
||||
|
||||
query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller)
|
||||
: _client_state(client_state)
|
||||
, _trace_state_ptr(std::move(trace_state_ptr))
|
||||
, _permit(std::move(permit))
|
||||
, _sl_controller(sl_controller)
|
||||
{ }
|
||||
|
||||
const tracing::trace_state_ptr& get_trace_state() const {
|
||||
return _trace_state_ptr;
|
||||
}
|
||||
@@ -75,6 +93,10 @@ public:
|
||||
return std::move(_permit);
|
||||
}
|
||||
|
||||
qos::service_level_controller& get_service_level_controller() const {
|
||||
return _sl_controller->get();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user