transport: initialize query state with service level controller
Query state should be aware of the service level controller in order to properly serve service-level-related CQL queries.
This commit is contained in:
2
main.cc
2
main.cc
@@ -1292,7 +1292,7 @@ int main(int ac, char** av) {
|
||||
db.revert_initial_system_read_concurrency_boost();
|
||||
}).get();
|
||||
|
||||
cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter);
|
||||
cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller);
|
||||
|
||||
ss.register_client_shutdown_hook("native transport", [&cql_server_ctl] {
|
||||
cql_server_ctl.stop().get();
|
||||
|
||||
@@ -33,14 +33,16 @@ namespace cql_transport {
|
||||
|
||||
static logging::logger logger("cql_server_controller");
|
||||
|
||||
controller::controller(distributed<database>& db, sharded<auth::service>& auth, sharded<service::migration_notifier>& mn, gms::gossiper& gossiper, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml)
|
||||
controller::controller(distributed<database>& db, sharded<auth::service>& auth, sharded<service::migration_notifier>& mn, gms::gossiper& gossiper, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml,
|
||||
sharded<qos::service_level_controller>& sl_controller)
|
||||
: _ops_sem(1)
|
||||
, _db(db)
|
||||
, _auth_service(auth)
|
||||
, _mnotifier(mn)
|
||||
, _gossiper(gossiper)
|
||||
, _qp(qp)
|
||||
, _mem_limiter(ml) {
|
||||
, _mem_limiter(ml)
|
||||
, _sl_controller(sl_controller) {
|
||||
}
|
||||
|
||||
future<> controller::start_server() {
|
||||
@@ -147,7 +149,7 @@ future<> controller::do_start_server() {
|
||||
}
|
||||
}
|
||||
|
||||
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config).get();
|
||||
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config, std::ref(_sl_controller)).get();
|
||||
|
||||
try {
|
||||
parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include <seastar/core/semaphore.hh>
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
@@ -50,13 +51,14 @@ class controller {
|
||||
gms::gossiper& _gossiper;
|
||||
sharded<cql3::query_processor>& _qp;
|
||||
sharded<service::memory_limiter>& _mem_limiter;
|
||||
sharded<qos::service_level_controller>& _sl_controller;
|
||||
|
||||
future<> set_cql_ready(bool ready);
|
||||
future<> do_start_server();
|
||||
future<> do_stop_server();
|
||||
|
||||
public:
|
||||
controller(distributed<database>&, sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&);
|
||||
controller(distributed<database>&, sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&, sharded<qos::service_level_controller>&);
|
||||
future<> start_server();
|
||||
future<> stop_server();
|
||||
future<> stop();
|
||||
|
||||
@@ -160,7 +160,7 @@ event::event_type parse_event_type(const sstring& value)
|
||||
}
|
||||
|
||||
cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& auth_service,
|
||||
service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config)
|
||||
service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config, qos::service_level_controller& sl_controller)
|
||||
: _query_processor(qp)
|
||||
, _config(config)
|
||||
, _max_request_size(config.max_request_size)
|
||||
@@ -168,6 +168,7 @@ cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& au
|
||||
, _memory_available(ml.get_semaphore())
|
||||
, _notifier(std::make_unique<event_notifier>(mn))
|
||||
, _auth_service(auth_service)
|
||||
, _sl_controller(sl_controller)
|
||||
{
|
||||
namespace sm = seastar::metrics;
|
||||
|
||||
@@ -952,7 +953,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme
|
||||
(bytes_ostream& linearization_buffer, service::client_state& client_state) mutable {
|
||||
request_reader in(is, linearization_buffer);
|
||||
return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
|
||||
/* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
|
||||
/* FIXME */empty_service_permit(), std::move(trace_state), false, _server._sl_controller).then([] (auto msg) {
|
||||
// result here has to be foreign ptr
|
||||
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
|
||||
});
|
||||
@@ -967,7 +968,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
|
||||
fragmented_temporary_buffer::istream is = in.get_stream();
|
||||
|
||||
return process_fn(client_state, _server._query_processor, in, stream,
|
||||
_version, _cql_serialization_format, permit, trace_state, true)
|
||||
_version, _cql_serialization_format, permit, trace_state, true, _server._sl_controller)
|
||||
.then([stream, &client_state, this, is, permit, process_fn, trace_state]
|
||||
(std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
|
||||
unsigned* shard = std::get_if<unsigned>(&msg);
|
||||
@@ -981,9 +982,10 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace,
|
||||
qos::service_level_controller& sl_controller) {
|
||||
auto query = in.read_long_string_view();
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit), sl_controller);
|
||||
auto& query_state = q_state->query_state;
|
||||
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
|
||||
auto& options = *q_state->options;
|
||||
@@ -1048,7 +1050,8 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace,
|
||||
qos::service_level_controller& sl_controller) {
|
||||
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
|
||||
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
|
||||
bool needs_authorization = false;
|
||||
@@ -1065,7 +1068,7 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
|
||||
throw exceptions::prepared_query_not_found_exception(id);
|
||||
}
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit), sl_controller);
|
||||
auto& query_state = q_state->query_state;
|
||||
if (version == 1) {
|
||||
std::vector<cql3::raw_value_view> values;
|
||||
@@ -1127,7 +1130,8 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connectio
|
||||
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
|
||||
process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
|
||||
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
|
||||
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace,
|
||||
qos::service_level_controller& sl_controller) {
|
||||
if (version == 1) {
|
||||
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
|
||||
}
|
||||
@@ -1212,7 +1216,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
|
||||
values.emplace_back(std::move(tmp));
|
||||
}
|
||||
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
|
||||
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit), sl_controller);
|
||||
auto& query_state = q_state->query_state;
|
||||
// #563. CQL v2 encodes query_options in v1 format for batch requests.
|
||||
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format,
|
||||
|
||||
@@ -40,6 +40,7 @@
|
||||
#include "service_permit.hh"
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "utils/updateable_value.hh"
|
||||
#include "service/qos/service_level_controller.hh"
|
||||
|
||||
namespace scollectd {
|
||||
|
||||
@@ -102,8 +103,8 @@ struct cql_query_state {
|
||||
service::query_state query_state;
|
||||
std::unique_ptr<cql3::query_options> options;
|
||||
|
||||
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
|
||||
: query_state(client_state, std::move(trace_state_ptr), std::move(permit))
|
||||
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller)
|
||||
: query_state(client_state, std::move(trace_state_ptr), std::move(permit), sl_controller)
|
||||
{ }
|
||||
};
|
||||
|
||||
@@ -157,10 +158,12 @@ private:
|
||||
private:
|
||||
transport_stats _stats = {};
|
||||
auth::service& _auth_service;
|
||||
qos::service_level_controller& _sl_controller;
|
||||
public:
|
||||
cql_server(distributed<cql3::query_processor>& qp, auth::service&,
|
||||
service::migration_notifier& mn, database& db, service::memory_limiter& ml,
|
||||
cql_server_config config);
|
||||
cql_server_config config,
|
||||
qos::service_level_controller& sl_controller);
|
||||
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool is_shard_aware = false, bool keepalive = false);
|
||||
future<> do_accepts(int which, bool keepalive, socket_address server_addr);
|
||||
future<> stop();
|
||||
|
||||
Reference in New Issue
Block a user