From 23e889d710af936375d256d7f53925aff4854cb2 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 28 Jan 2019 16:19:30 +0200 Subject: [PATCH 01/16] auth: add support for role attributes In the general case roles might come with attributes attached to them these attributes can originate in mechanisms such as LDAP where in the undelying directory each entity can have a key:value data structure. This patch add support for such attributes in the role manager interface, it also implements the attribute support in the standard role manager in the form of a table with an attribute map in the distributed system keyspace. Message-Id: --- auth/role_manager.hh | 27 ++++++++++- auth/standard_role_manager.cc | 86 ++++++++++++++++++++++++++++++++++- auth/standard_role_manager.hh | 8 ++++ 3 files changed, 119 insertions(+), 2 deletions(-) diff --git a/auth/role_manager.hh b/auth/role_manager.hh index 12a9210639..4361b1f959 100644 --- a/auth/role_manager.hh +++ b/auth/role_manager.hh @@ -101,6 +101,11 @@ enum class recursive_role_query { yes, no }; /// access-control should never be enforced in implementations. /// class role_manager { +public: + // this type represents a mapping between a role and some attribute value. + // i.e: given attribute name 'a' this map holds role name and it's assigned + // value of 'a'. + using attribute_vals = std::unordered_map; public: virtual ~role_manager() = default; @@ -164,6 +169,26 @@ public: /// \returns an exceptional future with \ref nonexistant_role if the role does not exist. /// virtual future can_login(std::string_view role_name) const = 0; -}; + /// + /// \returns the value of the named attribute, if one is set. + /// + virtual future> get_attribute(std::string_view role_name, std::string_view attribute_name) const = 0; + + /// + /// \returns a mapping of each role's value for the named attribute, if one is set for the role. + /// + virtual future query_attribute_for_all(std::string_view attribute_name) const = 0; + + /// Sets `attribute_name` with `attribute_value` for `role_name`. + /// \returns an exceptional future with nonexistant_role if the role does not exist. + /// + virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const = 0; + + /// Removes `attribute_name` for `role_name`. + /// \returns an exceptional future with nonexistant_role if the role does not exist. + /// \note: This is a no-op if the role does not have the named attribute set. + /// + virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name) const = 0; +}; } diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index 57d253e844..5dcc23a413 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -53,6 +53,26 @@ constexpr std::string_view qualified_name("system_auth.role_members"); } +namespace role_attributes_table { +constexpr std::string_view name{"role_attributes", 15}; + +static std::string_view qualified_name() noexcept { + static const sstring instance = format("{}.{}", AUTH_KS, name); + return instance; +} +static std::string_view creation_query() noexcept { + static const sstring instance = format( + "CREATE TABLE {} (" + " role text," + " name text," + " value text," + " PRIMARY KEY(role, name)" + ")", + qualified_name()); + + return instance; +} +} } static logging::logger log("standard_role_manager"); @@ -152,6 +172,11 @@ future<> standard_role_manager::create_metadata_tables_if_missing() const { meta::role_members_table::name, _qp, create_role_members_query, + _migration_manager), + create_metadata_table_if_missing( + meta::role_attributes_table::name, + _qp, + meta::role_attributes_table::creation_query(), _migration_manager)).discard_result(); } @@ -345,6 +370,12 @@ future<> standard_role_manager::drop(std::string_view role_name) const { }); }; + // Delete all attributes for that role + const auto remove_attributes_of = [this, role_name] { + static const sstring query = format("DELETE FROM {} WHERE role = ?", meta::role_attributes_table::qualified_name()); + return _qp.execute_internal(query, {sstring(role_name)}).discard_result(); + }; + // Finally, delete the role itself. auto delete_role = [this, role_name] { static const sstring query = format("DELETE FROM {} WHERE {} = ?", @@ -358,7 +389,8 @@ future<> standard_role_manager::drop(std::string_view role_name) const { {sstring(role_name)}).discard_result(); }; - return when_all_succeed(revoke_from_members(), revoke_members_of()).then_unpack([delete_role = std::move(delete_role)] { + return when_all_succeed(revoke_from_members(), revoke_members_of(), + remove_attributes_of()).then_unpack([delete_role = std::move(delete_role)] { return delete_role(); }); }); @@ -536,4 +568,56 @@ future standard_role_manager::can_login(std::string_view role_name) const }); } +future> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) const { + static const sstring query = format("SELECT name, value FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name()); + return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}).then([] (shared_ptr result_set) { + if (!result_set->empty()) { + const cql3::untyped_result_set_row &row = result_set->one(); + return std::optional(row.get_as("value")); + } + return std::optional{}; + }); +} + +future standard_role_manager::query_attribute_for_all (std::string_view attribute_name) const { + return query_all().then([this, attribute_name] (role_set roles) { + return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) { + return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) { + return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional att_val) { + if (att_val) { + role_to_att_val.emplace(std::move(role), std::move(*att_val)); + } + }); + }).then([&role_to_att_val] () { + return make_ready_future(std::move(role_to_att_val)); + }); + }); + }); +} + +future<> standard_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const { + static const sstring query = format("INSERT INTO {} (role, name, value) VALUES (?, ?, ?)", meta::role_attributes_table::qualified_name()); + return do_with(sstring(role_name), sstring(attribute_name), sstring(attribute_value), [this] (sstring& role_name, sstring &attribute_name, + sstring &attribute_value) { + return exists(role_name).then([&role_name, &attribute_name, &attribute_value, this] (bool role_exists) { + if (!role_exists) { + throw auth::nonexistant_role(role_name); + } + return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}).discard_result(); + }); + }); + +} + +future<> standard_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name) const { + static const sstring query = format("DELETE FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name()); + return do_with(sstring(role_name), sstring(attribute_name), [this] (sstring& role_name, sstring &attribute_name) { + return exists(role_name).then([&role_name, &attribute_name, this] (bool role_exists) { + if (!role_exists) { + throw auth::nonexistant_role(role_name); + } + return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}).discard_result(); + }); + }); +} } diff --git a/auth/standard_role_manager.hh b/auth/standard_role_manager.hh index de2b0f1c64..c0fc42f489 100644 --- a/auth/standard_role_manager.hh +++ b/auth/standard_role_manager.hh @@ -83,6 +83,14 @@ public: virtual future can_login(std::string_view role_name) const override; + virtual future> get_attribute(std::string_view role_name, std::string_view attribute_name) const override; + + virtual future query_attribute_for_all(std::string_view attribute_name) const override; + + virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const override; + + virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name) const override; + private: enum class membership_change { add, remove }; From 4fea0762c2a4be024dd0566e8bfc669761d7884a Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 28 Jan 2019 16:26:02 +0200 Subject: [PATCH 02/16] service/qos: add common definitions Adding common definitions that will be used by the performance isolation classes. Mainly defines the common ground for configuring a service level through the service level options structure. Message-Id: <12476f4a8e21af3a4c7a892683940698f3beacce.1609160860.git.sarna@scylladb.com> --- configure.py | 1 + service/qos/qos_common.cc | 24 +++++++++++++++ service/qos/qos_common.hh | 63 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 service/qos/qos_common.cc create mode 100644 service/qos/qos_common.hh diff --git a/configure.py b/configure.py index 5ca9e6d567..ba7be61120 100755 --- a/configure.py +++ b/configure.py @@ -836,6 +836,7 @@ scylla_core = (['database.cc', 'service/misc_services.cc', 'service/pager/paging_state.cc', 'service/pager/query_pagers.cc', + 'service/qos/qos_common.cc', 'streaming/stream_task.cc', 'streaming/stream_session.cc', 'streaming/stream_request.cc', diff --git a/service/qos/qos_common.cc b/service/qos/qos_common.cc new file mode 100644 index 0000000000..d055307d36 --- /dev/null +++ b/service/qos/qos_common.cc @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "qos_common.hh" +namespace qos { +} diff --git a/service/qos/qos_common.hh b/service/qos/qos_common.hh new file mode 100644 index 0000000000..cb9bceeac3 --- /dev/null +++ b/service/qos/qos_common.hh @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "seastarx.hh" +#include +#include +#include +#include + + + +namespace qos { + +/** + * a structure that holds the configuration for + * a service level. + */ +struct service_level_options { + bool operator==(const service_level_options& other) const = default; + bool operator!=(const service_level_options& other) const = default; +}; + +using service_levels_info = std::map; + +/// +/// A logical argument error for a service_level statement operation. +/// +class service_level_argument_exception : public std::invalid_argument { +public: + using std::invalid_argument::invalid_argument; +}; + +/// +/// An exception to indicate that the service level given as parameter doesn't exist. +/// +class nonexistant_service_level_exception : public service_level_argument_exception { +public: + nonexistant_service_level_exception(sstring service_level_name) + : service_level_argument_exception(format("Service Level {} doesn't exists.", service_level_name)) { + } +}; + +} From dd74556ad951080ce8fb183eeb2b7f205f9b3d94 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 28 Jan 2019 18:05:24 +0200 Subject: [PATCH 03/16] service/qos: adding service level table to the distributed keyspace This patch adds the service level table and functions to manipulate it to the distributed keyspace. Message-Id: --- db/system_distributed_keyspace.cc | 50 +++++++++++++++++++++++++++++++ db/system_distributed_keyspace.hh | 6 ++++ 2 files changed, 56 insertions(+) diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index e274f0aca2..4eecb77d98 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -128,12 +128,24 @@ schema_ptr cdc_timestamps() { static const sstring CDC_TIMESTAMPS_KEY = "timestamps"; +schema_ptr service_levels() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS); + return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS, std::make_optional(id)) + .with_column("service_level", utf8_type, column_kind::partition_key) + .with_version(db::system_keyspace::generate_schema_version(id)) + .build(); + }(); + return schema; +} + static std::vector all_tables() { return { view_build_status(), cdc_generations(), cdc_desc(), cdc_timestamps(), + service_levels(), }; } @@ -516,4 +528,42 @@ system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) { co_return res; } +future system_distributed_keyspace::get_service_levels() const { + static sstring prepared_query = format("SELECT * FROM {}.{};", NAME, SERVICE_LEVELS); + + return _qp.execute_internal(prepared_query, {}).then([] (shared_ptr result_set) { + qos::service_levels_info service_levels; + for (auto &&row : *result_set) { + auto service_level_name = row.get_as("service_level"); + qos::service_level_options slo{}; + service_levels.emplace(service_level_name, slo); + } + return service_levels; + }); +} + +future system_distributed_keyspace::get_service_level(sstring service_level_name) const { + static sstring prepared_query = format("SELECT * FROM {}.{} WHERE service_level = ?;", NAME, SERVICE_LEVELS); + return _qp.execute_internal(prepared_query, {service_level_name}).then([] (shared_ptr result_set) { + qos::service_levels_info service_levels; + if (!result_set->empty()) { + auto &&row = result_set->one(); + auto service_level_name = row.get_as("service_level"); + qos::service_level_options slo{}; + service_levels.emplace(service_level_name, slo); + } + return service_levels; + }); +} + +future<> system_distributed_keyspace::set_service_level(sstring service_level_name, qos::service_level_options slo) const { + static sstring prepared_puery = format("INSERT INTO {}.{} (service_level) VALUES (?);", NAME, SERVICE_LEVELS); + return _qp.execute_internal(prepared_puery, {service_level_name}).discard_result(); +} + +future<> system_distributed_keyspace::drop_service_level(sstring service_level_name) const { + static sstring prepared_query = format("DELETE FROM {}.{} WHERE service_level= ?;", NAME, SERVICE_LEVELS); + return _qp.execute_internal(prepared_query, {service_level_name}).discard_result(); +} + } diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh index 14c94f83b0..3bd7f4e445 100644 --- a/db/system_distributed_keyspace.hh +++ b/db/system_distributed_keyspace.hh @@ -24,6 +24,7 @@ #include "bytes.hh" #include "schema_fwd.hh" #include "service/migration_manager.hh" +#include "service/qos/qos_common.hh" #include "utils/UUID.hh" #include "cdc/generation_id.hh" @@ -52,6 +53,7 @@ class system_distributed_keyspace { public: static constexpr auto NAME = "system_distributed"; static constexpr auto VIEW_BUILD_STATUS = "view_build_status"; + static constexpr auto SERVICE_LEVELS = "service_levels"; /* Nodes use this table to communicate new CDC stream generations to other nodes. */ static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generation_descriptions"; @@ -111,6 +113,10 @@ public: future cdc_current_generation_timestamp(context); + future get_service_levels() const; + future get_service_level(sstring service_level_name) const; + future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const; + future<> drop_service_level(sstring service_level_name) const; }; } From 3ecdab30a1291f19e1feeb77ee4c6b4f31287176 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Tue, 29 Jan 2019 18:10:15 +0200 Subject: [PATCH 04/16] service_levels: Add documentation for distributed tables This patch adds documentation for the distributed tables used for service_level feature and their meaning and usage. Message-Id: <5b7d2be166c2381ed33094b4545fafe0f142583f.1609170862.git.sarna@scylladb.com> --- docs/service_levels.md | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 docs/service_levels.md diff --git a/docs/service_levels.md b/docs/service_levels.md new file mode 100644 index 0000000000..65b5b904dc --- /dev/null +++ b/docs/service_levels.md @@ -0,0 +1,38 @@ +## Service Level Distributed Data + +There are two system tables that are used to facilitate the service level feature. + + +### Service Level Attachment Table + +```CREATE TABLE system_auth.role_attributes ( + role text, + attribute_name text, + attribute_value text, + PRIMARY KEY (role, attribute_name)) +``` +The table was created with generality in mind, but its purpose is to record +information about roles. The table columns meaning are: +*role* - the name of the role that the attribute belongs to. +*attribute_name* - the name of the attribute for the role. +*attribute_value* - the value of the specified attribute. + +For the service level, the relevant attribute name is `service_level`. +So for example in order to find out which `service_level` is attached to role `r` +one can run the following query: + +```SELECT * FROM system_auth.role_attributes WHERE role='r' and attribute_name='service_level' +``` + +### Service Level Configuration Table + +```CREATE TABLE system_distributed.service_levels ( + service_level text PRIMARY KEY); +``` + +The table is used to store and distribute the service levels configuration. +The table column names meanings are: +*service_level* - the name of the service level. + +This table is currently a stub and does not hold any parameters yet. + ``` From a54ea4667bf07b4787a09dbc7f4630501b282bf2 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Tue, 29 Jan 2019 09:22:16 +0200 Subject: [PATCH 05/16] service/qos: adding service level controller adding the service level controller implementation. The implementation follows the design in: https://docs.google.com/document/d/1RrSTZ3ZX86-YDt2POwAVwFeKN9uX8frEvATJda5n1FU/edit?usp=sharing Some interfaces were added for registration with system componnents. The method of registration is chosen over a constructor parameter, due to the componnets being initialized prior to the service level controller being created. Message-Id: --- configure.py | 1 + service/qos/service_level_controller.cc | 360 ++++++++++++++++++++++++ service/qos/service_level_controller.hh | 241 ++++++++++++++++ 3 files changed, 602 insertions(+) create mode 100644 service/qos/service_level_controller.cc create mode 100644 service/qos/service_level_controller.hh diff --git a/configure.py b/configure.py index ba7be61120..b7e3f1a670 100755 --- a/configure.py +++ b/configure.py @@ -837,6 +837,7 @@ scylla_core = (['database.cc', 'service/pager/paging_state.cc', 'service/pager/query_pagers.cc', 'service/qos/qos_common.cc', + 'service/qos/service_level_controller.cc', 'streaming/stream_task.cc', 'streaming/stream_session.cc', 'streaming/stream_request.cc', diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc new file mode 100644 index 0000000000..549be2de26 --- /dev/null +++ b/service/qos/service_level_controller.cc @@ -0,0 +1,360 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include "service_level_controller.hh" +#include "service/storage_service.hh" +#include "service/priority_manager.hh" +#include "message/messaging_service.hh" +#include "db/system_distributed_keyspace.hh" + +namespace qos { + +sstring service_level_controller::default_service_level_name = "default"; + + + +service_level_controller::service_level_controller(sharded& auth_service, service_level_options default_service_level_config): + _sl_data_accessor(nullptr), + _auth_service(auth_service) +{ + if (this_shard_id() == global_controller) { + _global_controller_db = std::make_unique(); + _global_controller_db->default_service_level_config = default_service_level_config; + } +} + +future<> service_level_controller::add_service_level(sstring name, service_level_options slo, bool is_static) { + return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) { + return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, slo, is_static] () { + return sl_controller.do_add_service_level(name, slo, is_static); + }); + }); +} + +future<> service_level_controller::remove_service_level(sstring name, bool remove_static) { + return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) { + return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, remove_static] () { + return sl_controller.do_remove_service_level(name, remove_static); + }); + }); +} + +future<> service_level_controller::start() { + if (this_shard_id() != global_controller) { + return make_ready_future(); + } + return with_semaphore(_global_controller_db->notifications_serializer, 1, [this] () { + return do_add_service_level(default_service_level_name, _global_controller_db->default_service_level_config, true).then([this] () { + return container().invoke_on_all([] (service_level_controller& sl) { + sl._default_service_level = sl.get_service_level(default_service_level_name); + }); + }); + }); +} + + +void service_level_controller::set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor) { + // unregistering the accessor is always legal + if (!sl_data_accessor) { + _sl_data_accessor = nullptr; + } + + // Registration of a new accessor can be done only when the _sl_data_accessor is not already set. + // This behavior is intended to allow to unit testing debug to set this value without having + // overriden by storage_proxy + if (!_sl_data_accessor) { + _sl_data_accessor = sl_data_accessor; + } +} + +future<> service_level_controller::stop() { + // unregister from the service level distributed data accessor. + _sl_data_accessor = nullptr; + if (this_shard_id() == global_controller) { + // abort the loop of the distributed data checking if it is running + _global_controller_db->dist_data_update_aborter.request_abort(); + _global_controller_db->notifications_serializer.broken(); + } + return make_ready_future(); +} + +future<> service_level_controller::update_service_levels_from_distributed_data() { + + if (!_sl_data_accessor) { + return make_ready_future(); + } + + return container().invoke_on(global_controller, [] (service_level_controller& sl_controller) { + return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller] () { + return async([&sl_controller] () { + service_levels_info service_levels = sl_controller._sl_data_accessor->get_service_levels().get0(); + service_levels_info service_levels_for_add_or_update; + service_levels_info service_levels_for_delete; + + auto current_it = sl_controller._service_levels_db.begin(); + auto new_state_it = service_levels.begin(); + + // we want to detect 3 kinds of objects in one pass - + // 1. new service levels that have been added to the distributed keyspace + // 2. existing service levels that have changed + // 3. removed service levels + // this loop is batching together add/update operation and remove operation + // then they are all executed together.The reason for this is to allow for + // firstly delete all that there is to be deleted and only then adding new + // service levels. + while (current_it != sl_controller._service_levels_db.end() && new_state_it != service_levels.end()) { + if (current_it->first == new_state_it->first) { + //the service level exists on both the cureent and new state. + if (current_it->second.slo != new_state_it->second) { + // The service level configuration is different + // in the new state and the old state, meaning it needs to be updated. + service_levels_for_add_or_update.insert(*new_state_it); + } + current_it++; + new_state_it++; + } else if (current_it->first < new_state_it->first) { + //The service level does not exists in the new state so it needs to be + //removed, but only if it is not static since static configurations dont + //come from the distributed keyspace but from code. + if (!current_it->second.is_static) { + service_levels_for_delete.emplace(current_it->first, current_it->second.slo); + } + current_it++; + } else { /*new_it->first < current_it->first */ + // The service level exits in the new state but not in the old state + // so it needs to be added. + service_levels_for_add_or_update.insert(*new_state_it); + new_state_it++; + } + } + + for (; current_it != sl_controller._service_levels_db.end(); current_it++) { + service_levels_for_delete.emplace(current_it->first, current_it->second.slo); + } + std::copy(new_state_it, service_levels.end(), std::inserter(service_levels_for_add_or_update, + service_levels_for_add_or_update.end())); + + for (auto&& sl : service_levels_for_delete) { + sl_controller.do_remove_service_level(sl.first, false).get(); + } + for (auto&& sl : service_levels_for_add_or_update) { + sl_controller.do_add_service_level(sl.first, sl.second).get(); + } + }); + }); + }); +} + +future service_level_controller::find_service_level(auth::role_set roles) { + // FIXME: this arbitrary order based on names is OK for now, because the service levels do not yet + // have any parameters. Once they do, this comparison could be based on comparing the parameters. + static auto sl_compare = std::less(); + auto& role_manager = _auth_service.local().underlying_role_manager(); + + // converts a list of roles into the chosen service level. + return ::map_reduce(roles.begin(), roles.end(), [&role_manager, this] (const sstring& role) { + return role_manager.get_attribute(role, "service_level").then_wrapped([this, role] (future> sl_name_fut) { + try { + std::optional sl_name = sl_name_fut.get0(); + if (! sl_name) { + return sl_name; + } + auto sl_it = _service_levels_db.find(*sl_name); + if ( sl_it == _service_levels_db.end()) { + return std::optional{}; + } else { + return sl_name; + } + } catch(...) { // when we fail, we act as if the attribute does not exist so the node + // will not be brought down. + return std::optional{}; + } + }); + }, std::optional{}, [this] (std::optional first, std::optional second) { + if (!second) { + return first; + } else if (!first) { + return second; + } else { + return std::optional{ sl_compare(*first, *second) ? second : first }; + } + }).then([] (std::optional sl) { + return sl? *sl:default_service_level_name; + }); +} + +future<> service_level_controller::notify_service_level_added(sstring name, service_level sl_data) { + _service_levels_db.emplace(name, sl_data); + return make_ready_future(); +} + +future<> service_level_controller::notify_service_level_updated(sstring name, service_level_options slo) { + auto sl_it = _service_levels_db.find(name); + future<> f = make_ready_future(); + if (sl_it != _service_levels_db.end()) { + sl_it->second.slo = slo; + } + return f; +} + +future<> service_level_controller::notify_service_level_removed(sstring name) { + auto sl_it = _service_levels_db.find(name); + if (sl_it != _service_levels_db.end()) { + _service_levels_db.erase(sl_it); + } + return make_ready_future(); +} + +void service_level_controller::update_from_distributed_data(std::chrono::duration interval) { + (void)container().invoke_on(global_controller, [interval] (service_level_controller& global_sl) { + if (global_sl._global_controller_db->distributed_data_update.available()) { + global_sl._global_controller_db->distributed_data_update = repeat([interval, &global_sl] { + return sleep_abortable(std::chrono::duration_cast(interval), + global_sl._global_controller_db->dist_data_update_aborter).then_wrapped([&global_sl] (future<>&& f) { + try { + f.get(); + return global_sl.update_service_levels_from_distributed_data().then([] { + return stop_iteration::no; + }); + } + catch (const sleep_aborted& e) { + return make_ready_future>(stop_iteration::yes); + } + }); + }); + } + }); +} + +future<> service_level_controller::add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exists) { + set_service_level_op_type add_type = if_not_exists ? set_service_level_op_type::add_if_not_exists : set_service_level_op_type::add; + return set_distributed_service_level(name, slo, add_type); +} + +future<> service_level_controller::alter_distributed_service_level(sstring name, service_level_options slo) { + return set_distributed_service_level(name, slo, set_service_level_op_type::alter); +} + +future<> service_level_controller::drop_distributed_service_level(sstring name, bool if_exists) { + return _sl_data_accessor->get_service_levels().then([this, name, if_exists] (qos::service_levels_info sl_info) { + auto it = sl_info.find(name); + if (it == sl_info.end()) { + if (if_exists) { + return make_ready_future(); + } else { + return make_exception_future(nonexistant_service_level_exception(name)); + } + } else { + auto& role_manager = _auth_service.local().underlying_role_manager(); + return role_manager.query_attribute_for_all("service_level").then( [&role_manager, name] (auth::role_manager::attribute_vals attributes) { + return parallel_for_each(attributes.begin(), attributes.end(), [&role_manager, name] (auto&& attr) { + if (attr.second == name) { + return role_manager.remove_attribute(attr.first, "service_level"); + } else { + return make_ready_future(); + } + }); + }).then([this, name] { + return _sl_data_accessor->drop_service_level(name); + }); + } + }); +} + +future service_level_controller::get_distributed_service_levels() { + return _sl_data_accessor->get_service_levels(); +} + +future service_level_controller::get_distributed_service_level(sstring service_level_name) { + return _sl_data_accessor->get_service_level(service_level_name); +} + +future<> service_level_controller::set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type) { + return _sl_data_accessor->get_service_levels().then([this, name, slo, op_type] (qos::service_levels_info sl_info) { + auto it = sl_info.find(name); + // test for illegal requests or requests that should terminate without any action + if (it == sl_info.end()) { + if (op_type == set_service_level_op_type::alter) { + return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' desn't exist.", name))); + } + } else { + if (op_type == set_service_level_op_type::add) { + return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' already exists.", name))); + } else if (op_type == set_service_level_op_type::add_if_not_exists) { + return make_ready_future(); + } + } + return _sl_data_accessor->set_service_level(name, slo); + }); +} + +future<> service_level_controller::do_add_service_level(sstring name, service_level_options slo, bool is_static) { + auto service_level_it = _service_levels_db.find(name); + if (is_static) { + _global_controller_db->static_configurations[name] = slo; + } + if (service_level_it != _service_levels_db.end()) { + if ((is_static && service_level_it->second.is_static) || !is_static) { + if ((service_level_it->second.is_static) && (!is_static)) { + service_level_it->second.is_static = false; + } + return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, slo); + } else { + // this means we set static layer when the the service level + // is running of the non static configuration. so we have nothing + // else to do since we already saved the static configuration. + return make_ready_future(); + } + } else { + return do_with(service_level{.slo = slo, .is_static = is_static}, std::move(name), [this] (service_level& sl, sstring& name) { + return container().invoke_on_all(&service_level_controller::notify_service_level_added, name, sl); + }); + } + return make_ready_future(); +} + +future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) { + auto service_level_it = _service_levels_db.find(name); + if (service_level_it != _service_levels_db.end()) { + auto static_conf_it = _global_controller_db->static_configurations.end(); + bool static_exists = false; + if (remove_static) { + _global_controller_db->static_configurations.erase(name); + } else { + static_conf_it = _global_controller_db->static_configurations.find(name); + static_exists = static_conf_it != _global_controller_db->static_configurations.end(); + } + if (remove_static && service_level_it->second.is_static) { + return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name); + } else if (!remove_static && !service_level_it->second.is_static) { + if (static_exists) { + service_level_it->second.is_static = true; + return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, static_conf_it->second); + } else { + return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name); + } + } + } + return make_ready_future(); +} + +} diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh new file mode 100644 index 0000000000..d4885d09d5 --- /dev/null +++ b/service/qos/service_level_controller.hh @@ -0,0 +1,241 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "seastarx.hh" +#include "log.hh" +#include "auth/role_manager.hh" +#include "auth/authenticated_user.hh" +#include +#include +#include "auth/service.hh" +#include "service/storage_service.hh" +#include +#include +#include "qos_common.hh" + +namespace db { + class system_distributed_keyspace; +} +namespace qos { +/** + * a structure to hold a service level + * data and configuration. + */ +struct service_level { + service_level_options slo; + bool marked_for_deletion; + bool is_static; +}; + +/** + * The service_level_controller class is an implementation of the service level + * controller design. + * It is logically divided into 2 parts: + * 1. Global controller which is responsible for all of the data and plumbing + * manipulation. + * 2. Local controllers that act upon the data and facilitates execution in + * the service level context + */ +class service_level_controller : public peering_sharded_service { +public: + class service_level_distributed_data_accessor { + public: + virtual future get_service_levels() const = 0; + virtual future get_service_level(sstring service_level_name) const = 0; + virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const = 0; + virtual future<> drop_service_level(sstring service_level_name) const = 0; + }; + using service_level_distributed_data_accessor_ptr = ::shared_ptr; +private: + struct global_controller_data { + service_levels_info static_configurations{}; + int schedg_group_cnt = 0; + int io_priority_cnt = 0; + service_level_options default_service_level_config; + // The below future is used to serialize work so no reordering can occur. + // This is needed so for example: delete(x), add(x) will not reverse yielding + // a completely different result than the one intended. + future<> work_future = make_ready_future(); + semaphore notifications_serializer = semaphore(1); + future<> distributed_data_update = make_ready_future(); + abort_source dist_data_update_aborter; + }; + + std::unique_ptr _global_controller_db; + + static constexpr shard_id global_controller = 0; + + std::unordered_map _service_levels_db; + std::unordered_map _role_to_service_level; + service_level _default_service_level; + service_level_distributed_data_accessor_ptr _sl_data_accessor; + sharded& _auth_service; +public: + service_level_controller(sharded& auth_service, service_level_options default_service_level_config); + + /** + * this function must be called *once* from any shard before any other functions are called. + * No other function should be called before the future returned by the function is resolved. + * @return a future that resolves when the initialization is over. + */ + future<> start(); + + void set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor); + + /** + * Adds a service level configuration if it doesn't exists, and updates + * an the existing one if it does exist. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param slo - the service level configuration + * @param is_static - is this configuration static or not + */ + future<> add_service_level(sstring name, service_level_options slo, bool is_static = false); + + /** + * Removes a service level configuration if it exists. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param remove_static - indicates if it is a removal of a static configuration + * or not. + */ + future<> remove_service_level(sstring name, bool remove_static); + + /** + * stops all ongoing operations if exists + * @return a future that is resolved when all operations has stopped + */ + future<> stop(); + + /** + * this is an executor of a function with arguments under a service level + * that corresponds to a given user. + * @param usr - the user for determining the service level + * @param func - the function to be executed + * @return a future that is resolved when the function's operation is resolved + * (if it returns a future). or a ready future containing the returned value + * from the function/ + */ + template + futurize_t with_user_service_level(shared_ptr usr, noncopyable_function func) { + if (usr) { + auth::service& ser = _auth_service.local(); + return auth::get_roles(ser, *usr).then([this] (auth::role_set roles) { + return find_service_level(roles); + }).then([usr, this, func = std::move(func)] (sstring service_level_name) mutable { + return with_service_level(service_level_name, std::move(func)); + }); + } else { + return with_service_level(default_service_level_name, std::move(func)); + } + } + + /** + * Chack the distributed data for changes in a constant interval and updates + * the service_levels configuration in accordance (adds, removes, or updates + * service levels as necessairy). + * @param interval - the interval is seconds to check the distributed data. + * @return a future that is resolved when the update loop stops. + */ + void update_from_distributed_data(std::chrono::duration interval); + + /** + * Updates the service level data from the distributed data store. + * @return a future that is resolved when the update is done + */ + future<> update_service_levels_from_distributed_data(); + + + future<> add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exsists); + future<> alter_distributed_service_level(sstring name, service_level_options slo); + future<> drop_distributed_service_level(sstring name, bool if_exists); + future get_distributed_service_levels(); + future get_distributed_service_level(sstring service_level_name); + +private: + /** + * Adds a service level configuration if it doesn't exists, and updates + * an the existing one if it does exist. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param slo - the service level configuration + * @param is_static - is this configuration static or not + */ + future<> do_add_service_level(sstring name, service_level_options slo, bool is_static = false); + + /** + * Removes a service level configuration if it exists. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param remove_static - indicates if it is a removal of a static configuration + * or not. + */ + future<> do_remove_service_level(sstring name, bool remove_static); + + + + /** + * Returns the service level **in effect** for a user having the given + * collection of roles. + * @param roles - the collection of roles to consider + * @return the name of the service level in effect. + */ + future find_service_level(auth::role_set roles); + + /** + * The notify functions are used by the global service level controller + * to propagate configuration changes to the local controllers. + * the returned future is resolved when the local controller is done acting + * on the notification. updates are done in sequence so their meaning will not + * change due to execution reordering. + */ + future<> notify_service_level_added(sstring name, service_level sl_data); + future<> notify_service_level_updated(sstring name, service_level_options slo); + future<> notify_service_level_removed(sstring name); + + /** + * Gets the service level data by name. + * @param service_level_name - the name of the requested service level + * @return the service level data if it exists (in the local controller) or + * get_service_level("default") otherwise. + */ + service_level& get_service_level(sstring service_level_name) { + auto sl_it = _service_levels_db.find(service_level_name); + if (sl_it == _service_levels_db.end() || sl_it->second.marked_for_deletion) { + sl_it = _service_levels_db.find(default_service_level_name); + } + return sl_it->second; + } + + enum class set_service_level_op_type { + add_if_not_exists, + add, + alter + }; + + future<> set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type); + +public: + static sstring default_service_level_name; +}; +} From 41951d34ad77b2e6fce76c381cf48a1024061a7c Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Mon, 28 Dec 2020 18:05:45 +0100 Subject: [PATCH 06/16] qos: add waiting for the updater future The distributed data updated used to spawn a future without waiting for it. It was quite safe, since the future had its own abort source, but it's better to remember it and wait for it during stop() anyway. --- service/qos/service_level_controller.cc | 4 ++-- service/qos/service_level_controller.hh | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc index 549be2de26..e554610594 100644 --- a/service/qos/service_level_controller.cc +++ b/service/qos/service_level_controller.cc @@ -94,7 +94,7 @@ future<> service_level_controller::stop() { _global_controller_db->dist_data_update_aborter.request_abort(); _global_controller_db->notifications_serializer.broken(); } - return make_ready_future(); + return std::exchange(_distributed_data_updater, make_ready_future<>()); } future<> service_level_controller::update_service_levels_from_distributed_data() { @@ -225,7 +225,7 @@ future<> service_level_controller::notify_service_level_removed(sstring name) { } void service_level_controller::update_from_distributed_data(std::chrono::duration interval) { - (void)container().invoke_on(global_controller, [interval] (service_level_controller& global_sl) { + _distributed_data_updater = container().invoke_on(global_controller, [interval] (service_level_controller& global_sl) { if (global_sl._global_controller_db->distributed_data_update.available()) { global_sl._global_controller_db->distributed_data_update = repeat([interval, &global_sl] { return sleep_abortable(std::chrono::duration_cast(interval), diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh index d4885d09d5..1831bfa2bc 100644 --- a/service/qos/service_level_controller.hh +++ b/service/qos/service_level_controller.hh @@ -90,6 +90,7 @@ private: service_level _default_service_level; service_level_distributed_data_accessor_ptr _sl_data_accessor; sharded& _auth_service; + future<> _distributed_data_updater = make_ready_future<>(); public: service_level_controller(sharded& auth_service, service_level_options default_service_level_config); From 8493e19840545be518581b322b6b4887ffe56100 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Sun, 3 Feb 2019 16:48:24 +0200 Subject: [PATCH 07/16] qos: Add a standard implementation for service level data accessor service_level_controller defines an interface for accessing the service level distributed data, this patch implements a standard implementation of the interface that delegates to the system distributed keyspace. Message-Id: <25e68302f6f4d4fe5fcb66ea19159ad68506ba64.1609175314.git.sarna@scylladb.com> --- configure.py | 1 + ...service_level_distributed_data_accessor.cc | 47 +++++++++++++++++++ ...service_level_distributed_data_accessor.hh | 43 +++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 service/qos/standard_service_level_distributed_data_accessor.cc create mode 100644 service/qos/standard_service_level_distributed_data_accessor.hh diff --git a/configure.py b/configure.py index b7e3f1a670..02ff21efb9 100755 --- a/configure.py +++ b/configure.py @@ -838,6 +838,7 @@ scylla_core = (['database.cc', 'service/pager/query_pagers.cc', 'service/qos/qos_common.cc', 'service/qos/service_level_controller.cc', + 'service/qos/standard_service_level_distributed_data_accessor.cc', 'streaming/stream_task.cc', 'streaming/stream_session.cc', 'streaming/stream_request.cc', diff --git a/service/qos/standard_service_level_distributed_data_accessor.cc b/service/qos/standard_service_level_distributed_data_accessor.cc new file mode 100644 index 0000000000..2beb5b1b93 --- /dev/null +++ b/service/qos/standard_service_level_distributed_data_accessor.cc @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "standard_service_level_distributed_data_accessor.hh" +#include "db/system_distributed_keyspace.hh" + +namespace qos { + +standard_service_level_distributed_data_accessor::standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks): +_sys_dist_ks(sys_dist_ks) { +} + +future standard_service_level_distributed_data_accessor::get_service_levels() const { + return _sys_dist_ks.get_service_levels(); +} + +future standard_service_level_distributed_data_accessor::get_service_level(sstring service_level_name) const { + return _sys_dist_ks.get_service_level(service_level_name); +} + +future<> standard_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo) const { + return _sys_dist_ks.set_service_level(service_level_name, slo); +} + +future<> standard_service_level_distributed_data_accessor::drop_service_level(sstring service_level_name) const { + return _sys_dist_ks.drop_service_level(service_level_name); +} + +} diff --git a/service/qos/standard_service_level_distributed_data_accessor.hh b/service/qos/standard_service_level_distributed_data_accessor.hh new file mode 100644 index 0000000000..89bc89c80a --- /dev/null +++ b/service/qos/standard_service_level_distributed_data_accessor.hh @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "seastarx.hh" +#include "service_level_controller.hh" + + +namespace db { + class system_distributed_keyspace; +} +namespace qos { +class standard_service_level_distributed_data_accessor : public service_level_controller::service_level_distributed_data_accessor, + ::enable_shared_from_this { +private: + db::system_distributed_keyspace& _sys_dist_ks; +public: + standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks); + virtual future get_service_levels() const override; + virtual future get_service_level(sstring service_level_name) const override; + virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const override; + virtual future<> drop_service_level(sstring service_level_name) const override; +}; +} From e173eaa03297a9b3feea271a8928283bc57f1e40 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Tue, 29 Jan 2019 09:57:45 +0200 Subject: [PATCH 08/16] instantiate and initialize the service_level_controller This patch adds the initialization of service_level_controller. It constructs the distributed service and start the watch loop for distributed data changes. Message-Id: --- main.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/main.cc b/main.cc index c6e152e4dc..f0ee0813e7 100644 --- a/main.cc +++ b/main.cc @@ -35,6 +35,7 @@ #include "service/migration_manager.hh" #include "service/load_meter.hh" #include "service/view_update_backlog_broker.hh" +#include "service/qos/service_level_controller.hh" #include "streaming/stream_session.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" @@ -795,6 +796,16 @@ int main(int ac, char** av) { mscfg.tcp_nodelay = netw::messaging_service::tcp_nodelay_what::local; } + static sharded auth_service; + static sharded sl_controller; + + //starting service level controller + qos::service_level_options default_service_level_configuration; + sl_controller.start(std::ref(auth_service), default_service_level_configuration).get(); + sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + //This starts the update loop - but no real update happens until the data accessor is not initialized. + sl_controller.local().update_from_distributed_data(std::chrono::seconds(10)); + netw::messaging_service::scheduling_config scfg; scfg.statement_tenants = { {dbcfg.statement_scheduling_group, "$user"}, {default_scheduling_group(), "$system"} }; scfg.streaming = dbcfg.streaming_scheduling_group; @@ -806,7 +817,6 @@ int main(int ac, char** av) { netw::uninit_messaging_service(messaging).get(); }); - static sharded auth_service; static sharded sys_dist_ks; static sharded view_update_generator; static sharded cql_config; From f78707d3fbb92eeb2045714327e76ad95e25bb33 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Wed, 20 Feb 2019 00:31:24 +0200 Subject: [PATCH 09/16] cql: Support accessing service_level_controller from query state In order to implement service level cql queries, the queries objects needs access to the service_level_controller object when processing. This patch adds this access by embedding it into the query state object. In order to accomplish the above the query processor object needs an access to service_level_controller in order to instantiate the query state. Message-Id: <68f5a7796068a49d9cd004f1cbf34bdf93b418bc.1609234193.git.sarna@scylladb.com> --- cql3/query_processor.cc | 7 ++++--- cql3/query_processor.hh | 3 ++- main.cc | 2 +- service/query_state.hh | 24 +++++++++++++++++++++++- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index d495ea4644..892a9fc95c 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -68,7 +68,7 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono class query_processor::internal_state { service::query_state _qs; public: - internal_state() : _qs(service::client_state::for_internal_calls(), empty_service_permit()) { + internal_state(qos::service_level_controller &sl_controller) : _qs(service::client_state::for_internal_calls(), empty_service_permit(), sl_controller) { } operator service::query_state&() { return _qs; @@ -84,14 +84,15 @@ public: } }; -query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg) +query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg, + sharded &sl_controller) : _migration_subscriber{std::make_unique(this)} , _proxy(proxy) , _db(db) , _mnotifier(mn) , _mm(mm) , _cql_config(cql_cfg) - , _internal_state(new internal_state()) + , _internal_state(new internal_state(sl_controller.local())) , _prepared_cache(prep_cache_log, mcfg.prepared_statment_cache_size) , _authorized_prepared_cache(std::min(std::chrono::milliseconds(_db.get_config().permissions_validity_in_ms()), std::chrono::duration_cast(prepared_statements_cache::entry_expiry)), diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 3808f4895a..5558dda085 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -154,7 +154,8 @@ public: static std::unique_ptr parse_statement(const std::string_view& query); - query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg); + query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg, + sharded &sl_controller); ~query_processor(); diff --git a/main.cc b/main.cc index f0ee0813e7..d62cb0ac59 100644 --- a/main.cc +++ b/main.cc @@ -944,7 +944,7 @@ int main(int ac, char** av) { supervisor::notify("starting query processor"); cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560}; debug::the_query_processor = &qp; - qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config)).get(); + qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get(); // #293 - do not stop anything // engine().at_exit([&qp] { return qp.stop(); }); supervisor::notify("initializing batchlog manager"); diff --git a/service/query_state.hh b/service/query_state.hh index 0e9e3a5452..df4e4e486b 100644 --- a/service/query_state.hh +++ b/service/query_state.hh @@ -27,6 +27,9 @@ #include "tracing/tracing.hh" #include "service_permit.hh" +namespace qos { +class service_level_controller; +} namespace service { class query_state final { @@ -34,13 +37,21 @@ private: client_state& _client_state; tracing::trace_state_ptr _trace_state_ptr; service_permit _permit; + std::optional> _sl_controller; public: query_state(client_state& client_state, service_permit permit) + : _client_state(client_state) + , _trace_state_ptr(tracing::trace_state_ptr()) + , _permit(std::move(permit)) + {} + + query_state(client_state& client_state, service_permit permit, qos::service_level_controller &sl_controller) : _client_state(client_state) , _trace_state_ptr(tracing::trace_state_ptr()) , _permit(std::move(permit)) - { } + , _sl_controller(sl_controller) + {} query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit) : _client_state(client_state) @@ -48,6 +59,13 @@ public: , _permit(std::move(permit)) { } + query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller) + : _client_state(client_state) + , _trace_state_ptr(std::move(trace_state_ptr)) + , _permit(std::move(permit)) + , _sl_controller(sl_controller) + { } + const tracing::trace_state_ptr& get_trace_state() const { return _trace_state_ptr; } @@ -75,6 +93,10 @@ public: return std::move(_permit); } + qos::service_level_controller& get_service_level_controller() const { + return _sl_controller->get(); + } + }; } From a88929da15b213908c7a8bfe8f311e23335e94a8 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Wed, 20 Feb 2019 00:33:03 +0200 Subject: [PATCH 10/16] auth: Add service_level resource for supporting in authorization of cql service_level queries In order to be able to manage service_level configuration one must be authorized to do so, or to be a superuser. This commit adds the support for service_levels resource. Since service_levels are relative, reconfiguring one service level is not locallized only to that service level and will affect the QOS for all of the service levels, so there is not much sense of granting permissions to manage individual service_levels. This is why only root resource named service_levels that represents all service levels is used. This commit also implements the unit test additions for the newly introduced resource. Message-Id: <81ab16fa813b61be117155feea405da6266921e3.1609237687.git.sarna@scylladb.com> --- auth/resource.cc | 39 ++++++++++++++++++++++++++++++-- auth/resource.hh | 36 ++++++++++++++++++++++++++++- auth/service.cc | 2 ++ test/boost/auth_resource_test.cc | 33 +++++++++++++++++++++++++++ 4 files changed, 107 insertions(+), 3 deletions(-) diff --git a/auth/resource.cc b/auth/resource.cc index 529a56d340..764e296456 100644 --- a/auth/resource.cc +++ b/auth/resource.cc @@ -56,6 +56,7 @@ std::ostream& operator<<(std::ostream& os, resource_kind kind) { switch (kind) { case resource_kind::data: os << "data"; break; case resource_kind::role: os << "role"; break; + case resource_kind::service_level: os << "service_level"; break; } return os; @@ -63,11 +64,13 @@ std::ostream& operator<<(std::ostream& os, resource_kind kind) { static const std::unordered_map roots{ {resource_kind::data, "data"}, - {resource_kind::role, "roles"}}; + {resource_kind::role, "roles"}, + {resource_kind::service_level, "service_levels"}}; static const std::unordered_map max_parts{ {resource_kind::data, 2}, - {resource_kind::role, 1}}; + {resource_kind::role, 1}, + {resource_kind::service_level, 0}}; static permission_set applicable_permissions(const data_resource_view& dv) { if (dv.table()) { @@ -101,6 +104,15 @@ static permission_set applicable_permissions(const role_resource_view& rv) { permission::DESCRIBE>(); } +static permission_set applicable_permissions(const service_level_resource_view &rv) { + return permission_set::of< + permission::CREATE, + permission::ALTER, + permission::DROP, + permission::DESCRIBE, + permission::AUTHORIZE>(); +} + resource::resource(resource_kind kind) : _kind(kind) { _parts.emplace_back(roots.at(kind)); } @@ -122,6 +134,9 @@ resource::resource(role_resource_t, std::string_view role) : resource(resource_k _parts.emplace_back(role); } +resource::resource(service_level_resource_t): resource(resource_kind::service_level) { +} + sstring resource::name() const { return boost::algorithm::join(_parts, "/"); } @@ -142,6 +157,7 @@ permission_set resource::applicable_permissions() const { switch (_kind) { case resource_kind::data: ps = ::auth::applicable_permissions(data_resource_view(*this)); break; case resource_kind::role: ps = ::auth::applicable_permissions(role_resource_view(*this)); break; + case resource_kind::service_level: ps = ::auth::applicable_permissions(service_level_resource_view(*this)); break; } return ps; @@ -163,11 +179,24 @@ std::ostream& operator<<(std::ostream& os, const resource& r) { switch (r.kind()) { case resource_kind::data: return os << data_resource_view(r); case resource_kind::role: return os << role_resource_view(r); + case resource_kind::service_level: return os << service_level_resource_view(r); } return os; } +service_level_resource_view::service_level_resource_view(const resource &r) : + _resource(r) { + if (r._kind != resource_kind::service_level) { + throw resource_kind_mismatch(resource_kind::service_level, r._kind); + } +} + +std::ostream &operator<<(std::ostream &os, const service_level_resource_view &v) { + os << ""; + return os; +} + data_resource_view::data_resource_view(const resource& r) : _resource(r) { if (r._kind != resource_kind::data) { throw resource_kind_mismatch(resource_kind::data, r._kind); @@ -276,6 +305,12 @@ const resource& root_role_resource() { return the_root_role_resource; } +static const resource the_root_service_level_resource{resource_kind::service_level}; + +const resource &root_service_level_resource() { + return the_root_service_level_resource; +} + resource_set expand_resource_family(const resource& rr) { resource r = rr; resource_set rs; diff --git a/auth/resource.hh b/auth/resource.hh index da527223d9..f79ae260d1 100644 --- a/auth/resource.hh +++ b/auth/resource.hh @@ -67,7 +67,7 @@ public: }; enum class resource_kind { - data, role + data, role, service_level }; std::ostream& operator<<(std::ostream&, resource_kind); @@ -82,6 +82,11 @@ struct data_resource_t final {}; /// struct role_resource_t final {}; +/// +/// Type tag for constructing service_level resources. +/// +struct service_level_resource_t final {}; + /// /// Resources are entities that users can be granted permissions on. /// @@ -108,6 +113,7 @@ public: resource(data_resource_t, std::string_view keyspace); resource(data_resource_t, std::string_view keyspace, std::string_view table); resource(role_resource_t, std::string_view role); + resource(service_level_resource_t); resource_kind kind() const noexcept { return _kind; @@ -128,6 +134,7 @@ private: friend class std::hash; friend class data_resource_view; friend class role_resource_view; + friend class service_level_resource_view; friend bool operator<(const resource&, const resource&); friend bool operator==(const resource&, const resource&); @@ -192,6 +199,22 @@ public: std::ostream& operator<<(std::ostream&, const role_resource_view&); +/// +/// A "service_level" view of \ref resource. +/// +class service_level_resource_view final { + const resource& _resource; + +public: + /// + /// \throws \ref resource_kind_mismatch if the argument is not a "service_level" resource. + /// + explicit service_level_resource_view(const resource&); + +}; + +std::ostream& operator<<(std::ostream&, const service_level_resource_view&); + /// /// Parse a resource from its name. /// @@ -214,6 +237,12 @@ inline resource make_role_resource(std::string_view role) { return resource(role_resource_t{}, role); } +const resource& root_service_level_resource(); + +inline resource make_service_level_resource() { + return resource(service_level_resource_t{}); +} + } namespace std { @@ -228,12 +257,17 @@ struct hash { return utils::tuple_hash()(std::make_tuple(auth::resource_kind::role, rv.role())); } + static size_t hash_service_level(const auth::service_level_resource_view& rv) { + return utils::tuple_hash()(std::make_tuple(auth::resource_kind::service_level)); + } + size_t operator()(const auth::resource& r) const { std::size_t value; switch (r._kind) { case auth::resource_kind::data: value = hash_data(auth::data_resource_view(r)); break; case auth::resource_kind::role: value = hash_role(auth::role_resource_view(r)); break; + case auth::resource_kind::service_level: value = hash_service_level(auth::service_level_resource_view(r)); break; } return value; diff --git a/auth/service.cc b/auth/service.cc index 327a1e5236..b0290bcd32 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -327,6 +327,8 @@ future service::exists(const resource& r) const { return make_ready_future(true); } + case resource_kind::service_level: + return make_ready_future(true); } return make_ready_future(false); diff --git a/test/boost/auth_resource_test.cc b/test/boost/auth_resource_test.cc index 3be904b3cd..1dc25cad56 100644 --- a/test/boost/auth_resource_test.cc +++ b/test/boost/auth_resource_test.cc @@ -50,6 +50,13 @@ BOOST_AUTO_TEST_CASE(root_of) { const auto rv = auth::role_resource_view(rr); BOOST_REQUIRE(!rv.role()); + + // + // service_level + // + + const auto slr = auth::resource(auth::resource_kind::service_level); + BOOST_REQUIRE_EQUAL(slr.kind(), auth::resource_kind::service_level); } BOOST_AUTO_TEST_CASE(data) { @@ -76,6 +83,11 @@ BOOST_AUTO_TEST_CASE(role) { BOOST_REQUIRE_EQUAL(*v.role(), "joe"); } +BOOST_AUTO_TEST_CASE(service_level) { + const auto r = auth::make_service_level_resource(); + BOOST_REQUIRE_EQUAL(r.kind(), auth::resource_kind::service_level); +} + BOOST_AUTO_TEST_CASE(from_name) { // // data @@ -104,6 +116,15 @@ BOOST_AUTO_TEST_CASE(from_name) { BOOST_REQUIRE_THROW(auth::parse_resource("roles/joe/smith"), auth::invalid_resource_name); + // + //service_level + // + + const auto slr1 = auth::parse_resource("service_levels"); + BOOST_REQUIRE_EQUAL(slr1, auth::root_service_level_resource()); + + BOOST_REQUIRE_THROW(auth::parse_resource("service_levels/low_priority"), auth::invalid_resource_name); + // // Generic errors. // @@ -127,6 +148,12 @@ BOOST_AUTO_TEST_CASE(name) { BOOST_REQUIRE_EQUAL(auth::root_role_resource().name(), "roles"); BOOST_REQUIRE_EQUAL(auth::make_role_resource("joe").name(), "roles/joe"); + + // + // service_level + // + + BOOST_REQUIRE_EQUAL(auth::root_service_level_resource().name(), "service_levels"); } BOOST_AUTO_TEST_CASE(parent) { @@ -162,6 +189,12 @@ BOOST_AUTO_TEST_CASE(output) { BOOST_REQUIRE_EQUAL(format("{}", auth::root_role_resource()), ""); BOOST_REQUIRE_EQUAL(format("{}", auth::make_role_resource("joe")), ""); + + // + // service_level + // + + BOOST_REQUIRE_EQUAL(format("{}", auth::root_service_level_resource()), ""); } BOOST_AUTO_TEST_CASE(expand) { From 2701481cbce7c94e9d1e29ba7ff1232b7e37000c Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Wed, 20 Feb 2019 00:12:27 +0200 Subject: [PATCH 11/16] cql: Add support for service level cql queries This patch adds support for new service level cql queries. The queries implemented are: CREATE SERVICE_LEVEL [IF NOT EXISTS] ALTER SERVICE_LEVEL WITH param = DROP SERVICE_LEVEL [IF EXISTS] ATTACH SERVICE_LEVEL TO DETACH SERVICE_LEVEL FROM LIST SERVICE_LEVEL LIST ALL SERVICE_LEVELS LIST ATTACHED SERVICE_LEVEL OF LIST ALL ATTACHED SERVICE_LEVELS --- configure.py | 9 ++ cql3/Cql.g | 123 +++++++++++++++++- .../alter_service_level_statement.cc | 60 +++++++++ .../alter_service_level_statement.hh | 48 +++++++ .../attach_service_level_statement.cc | 67 ++++++++++ .../attach_service_level_statement.hh | 47 +++++++ .../create_service_level_statement.cc | 60 +++++++++ .../create_service_level_statement.hh | 49 +++++++ .../detach_service_level_statement.cc | 59 +++++++++ .../detach_service_level_statement.hh | 45 +++++++ .../drop_service_level_statement.cc | 59 +++++++++ .../drop_service_level_statement.hh | 46 +++++++ ...ist_service_level_attachments_statement.cc | 100 ++++++++++++++ ...ist_service_level_attachments_statement.hh | 47 +++++++ .../list_service_level_statement.cc | 82 ++++++++++++ .../list_service_level_statement.hh | 46 +++++++ cql3/statements/service_level_statement.cc | 53 ++++++++ cql3/statements/service_level_statement.hh | 70 ++++++++++ cql3/statements/sl_prop_defs.cc | 36 +++++ cql3/statements/sl_prop_defs.hh | 43 ++++++ 20 files changed, 1148 insertions(+), 1 deletion(-) create mode 100644 cql3/statements/alter_service_level_statement.cc create mode 100644 cql3/statements/alter_service_level_statement.hh create mode 100644 cql3/statements/attach_service_level_statement.cc create mode 100644 cql3/statements/attach_service_level_statement.hh create mode 100644 cql3/statements/create_service_level_statement.cc create mode 100644 cql3/statements/create_service_level_statement.hh create mode 100644 cql3/statements/detach_service_level_statement.cc create mode 100644 cql3/statements/detach_service_level_statement.hh create mode 100644 cql3/statements/drop_service_level_statement.cc create mode 100644 cql3/statements/drop_service_level_statement.hh create mode 100644 cql3/statements/list_service_level_attachments_statement.cc create mode 100644 cql3/statements/list_service_level_attachments_statement.hh create mode 100644 cql3/statements/list_service_level_statement.cc create mode 100644 cql3/statements/list_service_level_statement.hh create mode 100644 cql3/statements/service_level_statement.cc create mode 100644 cql3/statements/service_level_statement.hh create mode 100644 cql3/statements/sl_prop_defs.cc create mode 100644 cql3/statements/sl_prop_defs.hh diff --git a/configure.py b/configure.py index 02ff21efb9..885ffd5bc2 100755 --- a/configure.py +++ b/configure.py @@ -716,6 +716,15 @@ scylla_core = (['database.cc', 'cql3/statements/alter_type_statement.cc', 'cql3/statements/alter_keyspace_statement.cc', 'cql3/statements/role-management-statements.cc', + 'cql3/statements/service_level_statement.cc', + 'cql3/statements/create_service_level_statement.cc', + 'cql3/statements/alter_service_level_statement.cc', + 'cql3/statements/sl_prop_defs.cc', + 'cql3/statements/drop_service_level_statement.cc', + 'cql3/statements/attach_service_level_statement.cc', + 'cql3/statements/detach_service_level_statement.cc', + 'cql3/statements/list_service_level_statement.cc', + 'cql3/statements/list_service_level_attachments_statement.cc', 'cql3/update_parameters.cc', 'cql3/util.cc', 'cql3/ut_name.cc', diff --git a/cql3/Cql.g b/cql3/Cql.g index 29a01a9480..03d70be829 100644 --- a/cql3/Cql.g +++ b/cql3/Cql.g @@ -37,6 +37,7 @@ options { #include "cql3/statements/alter_keyspace_statement.hh" #include "cql3/statements/alter_table_statement.hh" #include "cql3/statements/alter_view_statement.hh" +#include "cql3/statements/alter_service_level_statement.hh" #include "cql3/statements/create_keyspace_statement.hh" #include "cql3/statements/drop_keyspace_statement.hh" #include "cql3/statements/create_index_statement.hh" @@ -44,6 +45,9 @@ options { #include "cql3/statements/create_view_statement.hh" #include "cql3/statements/create_type_statement.hh" #include "cql3/statements/create_function_statement.hh" +#include "cql3/statements/create_service_level_statement.hh" +#include "cql3/statements/sl_prop_defs.hh" +#include "cql3/statements/attach_service_level_statement.hh" #include "cql3/statements/drop_type_statement.hh" #include "cql3/statements/alter_type_statement.hh" #include "cql3/statements/property_definitions.hh" @@ -51,6 +55,8 @@ options { #include "cql3/statements/drop_table_statement.hh" #include "cql3/statements/drop_view_statement.hh" #include "cql3/statements/drop_function_statement.hh" +#include "cql3/statements/drop_service_level_statement.hh" +#include "cql3/statements/detach_service_level_statement.hh" #include "cql3/statements/truncate_statement.hh" #include "cql3/statements/raw/update_statement.hh" #include "cql3/statements/raw/insert_statement.hh" @@ -64,6 +70,8 @@ options { #include "cql3/statements/list_permissions_statement.hh" #include "cql3/statements/alter_role_statement.hh" #include "cql3/statements/list_roles_statement.hh" +#include "cql3/statements/list_service_level_statement.hh" +#include "cql3/statements/list_service_level_attachments_statement.hh" #include "cql3/statements/grant_role_statement.hh" #include "cql3/statements/revoke_role_statement.hh" #include "cql3/statements/drop_role_statement.hh" @@ -194,7 +202,7 @@ struct uninitialized { listener->syntax_error(*this, token_names, ex); } - void add_recognition_error(const sstring& msg) { + void add_recognition_error(const sstring& msg) { listener->syntax_error(*this, msg); } @@ -370,6 +378,14 @@ cqlStatement returns [std::unique_ptr stmt] | st38=dropRoleStatement { $stmt = std::move(st38); } | st39=createRoleStatement { $stmt = std::move(st39); } | st40=alterRoleStatement { $stmt = std::move(st40); } + | st41=createServiceLevelStatement { $stmt = std::move(st41); } + | st42=alterServiceLevelStatement { $stmt = std::move(st42); } + | st43=dropServiceLevelStatement { $stmt = std::move(st43); } + | st44=attachServiceLevelStatement { $stmt = std::move(st44); } + | st45=detachServiceLevelStatement { $stmt = std::move(st45); } + | st46=listServiceLevelStatement { $stmt = std::move(st46); } + | st47=listServiceLevelAttachStatement { $stmt = std::move(st47); } + ; /* @@ -1241,6 +1257,88 @@ roleOption[cql3::role_options& opts] | K_LOGIN '=' b=BOOLEAN { opts.can_login = convert_boolean_literal($b.text); } ; +/** + * CREATE SERVICE_LEVEL [IF NOT EXISTS] [WITH = ] + */ +createServiceLevelStatement returns [std::unique_ptr stmt] + @init { + auto attrs = make_shared(); + bool if_not_exists = false; + } + : K_CREATE K_SERVICE_LEVEL (K_IF K_NOT K_EXISTS { if_not_exists = true; })? name=serviceLevelOrRoleName (K_WITH properties[*attrs])? + { $stmt = std::make_unique(name, attrs, if_not_exists); } + ; + +/** + * ALTER SERVICE_LEVEL WITH = + */ +alterServiceLevelStatement returns [std::unique_ptr stmt] + @init { + auto attrs = make_shared(); + } + : K_ALTER K_SERVICE_LEVEL name=serviceLevelOrRoleName K_WITH properties[*attrs] + { $stmt = std::make_unique(name, attrs); } + ; + +/** + * DROP SERVICE_LEVEL [IF EXISTS] + */ +dropServiceLevelStatement returns [std::unique_ptr stmt] + @init { + bool if_exists = false; + } + : K_DROP K_SERVICE_LEVEL (K_IF K_EXISTS { if_exists = true; })? name=serviceLevelOrRoleName + { $stmt = std::make_unique(name, if_exists); } + ; + +/** + * ATTACH SERVICE_LEVEL TO + */ +attachServiceLevelStatement returns [std::unique_ptr stmt] + @init { + } + : K_ATTACH K_SERVICE_LEVEL service_level_name=serviceLevelOrRoleName K_TO role_name=serviceLevelOrRoleName + { $stmt = std::make_unique(service_level_name, role_name); } + ; + +/** + * DETACH SERVICE_LEVEL FROM + */ +detachServiceLevelStatement returns [std::unique_ptr stmt] + @init { + } + : K_DETACH K_SERVICE_LEVEL K_FROM role_name=serviceLevelOrRoleName + { $stmt = std::make_unique(role_name); } + ; + + +/** + * LIST SERVICE_LEVEL + * LIST ALL SERVICE_LEVELS + */ +listServiceLevelStatement returns [std::unique_ptr stmt] + @init { + } + : K_LIST K_SERVICE_LEVEL service_level_name=serviceLevelOrRoleName + { $stmt = std::make_unique(service_level_name, false); } | + K_LIST K_ALL K_SERVICE_LEVELS + { $stmt = std::make_unique("", true); } + ; + +/** + * LIST ATTACHED SERVICE_LEVEL OF + * LIST ALL ATTACHED SERVICE_LEVELS + */ +listServiceLevelAttachStatement returns [std::unique_ptr stmt] + @init { + bool allow_nonexisting_roles = false; + } + : K_LIST K_ATTACHED K_SERVICE_LEVEL K_OF role_name=serviceLevelOrRoleName + { $stmt = std::make_unique(role_name); } | + K_LIST K_ALL K_ATTACHED K_SERVICE_LEVELS + { $stmt = std::make_unique(); } + ; + /** DEFINITIONS **/ // Column Identifiers. These need to be treated differently from other @@ -1286,6 +1384,16 @@ userOrRoleName returns [uninitialized name] | k=unreserved_keyword { $name = cql3::role_name(k, cql3::preserve_role_case::no); } | QMARK {add_recognition_error("Bind variables cannot be used for role names");} ; + +serviceLevelOrRoleName returns [sstring name] +: t=IDENT { $name = sstring($t.text); + std::transform($name.begin(), $name.end(), $name.begin(), ::tolower); } +| t=STRING_LITERAL { $name = sstring($t.text); } +| t=QUOTED_NAME { $name = sstring($t.text); } +| k=unreserved_keyword { $name = sstring($t.text); + std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);} +| QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");} +; ksName[cql3::keyspace_element_name& name] : t=IDENT { $name.set_keyspace($t.text, false);} @@ -1767,6 +1875,12 @@ basic_unreserved_keyword returns [sstring str] | K_LIKE | K_PER | K_PARTITION + | K_SERVICE_LEVEL + | K_ATTACH + | K_DETACH + | K_SERVICE_LEVELS + | K_ATTACHED + | K_FOR | K_GROUP | K_TIMEOUT ) { $str = $k.text; } @@ -1917,6 +2031,13 @@ K_CACHE: C A C H E; K_PER: P E R; K_PARTITION: P A R T I T I O N; +K_SERVICE_LEVEL: S E R V I C E '_' L E V E L; +K_ATTACH: A T T A C H; +K_DETACH: D E T A C H; +K_SERVICE_LEVELS: S E R V I C E '_' L E V E L S; +K_ATTACHED: A T T A C H E D; +K_FOR: F O R; + K_SCYLLA_TIMEUUID_LIST_INDEX: S C Y L L A '_' T I M E U U I D '_' L I S T '_' I N D E X; K_SCYLLA_COUNTER_SHARD_LIST: S C Y L L A '_' C O U N T E R '_' S H A R D '_' L I S T; K_SCYLLA_CLUSTERING_BOUND: S C Y L L A '_' C L U S T E R I N G '_' B O U N D; diff --git a/cql3/statements/alter_service_level_statement.cc b/cql3/statements/alter_service_level_statement.cc new file mode 100644 index 0000000000..2923e6a3ca --- /dev/null +++ b/cql3/statements/alter_service_level_statement.cc @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/alter_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +alter_service_level_statement::alter_service_level_statement(sstring service_level, shared_ptr attrs) + : _service_level(service_level) { + attrs->validate(); +} + +std::unique_ptr +cql3::statements::alter_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void alter_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> alter_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::ALTER, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +alter_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().alter_distributed_service_level(_service_level, _slo).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/alter_service_level_statement.hh b/cql3/statements/alter_service_level_statement.hh new file mode 100644 index 0000000000..771165ac5b --- /dev/null +++ b/cql3/statements/alter_service_level_statement.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "cql3/statements/sl_prop_defs.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class alter_service_level_statement final : public service_level_statement { + sstring _service_level; + qos::service_level_options _slo; + +public: + alter_service_level_statement(sstring service_level, shared_ptr attrs); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/attach_service_level_statement.cc b/cql3/statements/attach_service_level_statement.cc new file mode 100644 index 0000000000..ac52532e25 --- /dev/null +++ b/cql3/statements/attach_service_level_statement.cc @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/attach_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "exceptions/exceptions.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +attach_service_level_statement::attach_service_level_statement(sstring service_level, sstring role_name) : + _service_level(service_level), _role_name(role_name) { +} + +std::unique_ptr +cql3::statements::attach_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void attach_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> attach_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::AUTHORIZE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +attach_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().get_distributed_service_level(_service_level).then([this] (qos::service_levels_info sli) { + if (sli.empty()) { + throw qos::nonexistant_service_level_exception(_service_level); + } + }).then([&state, this] () { + return state.get_client_state().get_auth_service()->underlying_role_manager().set_attribute(_role_name, "service_level", _service_level).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); + }); + +} +} +} diff --git a/cql3/statements/attach_service_level_statement.hh b/cql3/statements/attach_service_level_statement.hh new file mode 100644 index 0000000000..b30bcbc096 --- /dev/null +++ b/cql3/statements/attach_service_level_statement.hh @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class attach_service_level_statement final : public service_level_statement { + sstring _service_level; + sstring _role_name; + +public: + attach_service_level_statement(sstring service_level, sstring role_name); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/create_service_level_statement.cc b/cql3/statements/create_service_level_statement.cc new file mode 100644 index 0000000000..e8d1db69f8 --- /dev/null +++ b/cql3/statements/create_service_level_statement.cc @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/create_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +create_service_level_statement::create_service_level_statement(sstring service_level, shared_ptr attrs, bool if_not_exists) + : _service_level(service_level), _if_not_exists(if_not_exists) { + attrs->validate(); +} + +std::unique_ptr +cql3::statements::create_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void create_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> create_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::CREATE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +create_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().add_distributed_service_level(_service_level, _slo, _if_not_exists).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/create_service_level_statement.hh b/cql3/statements/create_service_level_statement.hh new file mode 100644 index 0000000000..3113a5f106 --- /dev/null +++ b/cql3/statements/create_service_level_statement.hh @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "cql3/statements/sl_prop_defs.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class create_service_level_statement final : public service_level_statement { + sstring _service_level; + qos::service_level_options _slo; + bool _if_not_exists; + +public: + create_service_level_statement(sstring service_level, shared_ptr attrs, bool if_not_exists); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/detach_service_level_statement.cc b/cql3/statements/detach_service_level_statement.cc new file mode 100644 index 0000000000..90bfb44fe1 --- /dev/null +++ b/cql3/statements/detach_service_level_statement.cc @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/detach_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +detach_service_level_statement::detach_service_level_statement(sstring role_name) : + _role_name(role_name) { +} + +std::unique_ptr +cql3::statements::detach_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void detach_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> detach_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::AUTHORIZE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +detach_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_client_state().get_auth_service()->underlying_role_manager().remove_attribute(_role_name, "service_level").then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/detach_service_level_statement.hh b/cql3/statements/detach_service_level_statement.hh new file mode 100644 index 0000000000..82ea7792ff --- /dev/null +++ b/cql3/statements/detach_service_level_statement.hh @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class detach_service_level_statement final : public service_level_statement { + sstring _role_name; +public: + detach_service_level_statement(sstring role_name); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/drop_service_level_statement.cc b/cql3/statements/drop_service_level_statement.cc new file mode 100644 index 0000000000..10d7a774fc --- /dev/null +++ b/cql3/statements/drop_service_level_statement.cc @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/drop_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +drop_service_level_statement::drop_service_level_statement(sstring service_level, bool if_exists) : + _service_level(service_level), _if_exists(if_exists) { +} + +std::unique_ptr +cql3::statements::drop_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void drop_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> drop_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DROP, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +drop_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/drop_service_level_statement.hh b/cql3/statements/drop_service_level_statement.hh new file mode 100644 index 0000000000..bf26887c21 --- /dev/null +++ b/cql3/statements/drop_service_level_statement.hh @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class drop_service_level_statement final : public service_level_statement { + sstring _service_level; + bool _if_exists; +public: + drop_service_level_statement(sstring service_level, bool if_exists); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/list_service_level_attachments_statement.cc b/cql3/statements/list_service_level_attachments_statement.cc new file mode 100644 index 0000000000..07672aaaed --- /dev/null +++ b/cql3/statements/list_service_level_attachments_statement.cc @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/list_service_level_attachments_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +list_service_level_attachments_statement::list_service_level_attachments_statement(sstring role_name) : + _role_name(role_name), _describe_all(false) { +} + +list_service_level_attachments_statement::list_service_level_attachments_statement() : + _role_name(), _describe_all(true) { +} + +std::unique_ptr +cql3::statements::list_service_level_attachments_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void list_service_level_attachments_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> list_service_level_attachments_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DESCRIBE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +list_service_level_attachments_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + + static auto make_column = [] (sstring name, const shared_ptr type) { + return make_lw_shared( + "QOS", + "service_levels_attachments", + ::make_shared(std::move(name), true), + type); + }; + + static thread_local const std::vector> metadata({ + make_column("role", utf8_type), make_column("service_level", utf8_type) + }); + + + return make_ready_future().then([this, &state] () { + if (_describe_all) { + return state.get_client_state().get_auth_service()->underlying_role_manager().query_attribute_for_all("service_level"); + } else { + return state.get_client_state().get_auth_service()->underlying_role_manager().get_attribute(_role_name, "service_level").then([this] (std::optional att_val) { + std::unordered_map ret; + if (att_val) { + ret.emplace(_role_name, *att_val); + } + return make_ready_future>(ret); + }); + + } + }).then([this, &state] (std::unordered_map roles_to_att_val) { + + auto rs = std::make_unique(metadata); + for (auto&& role_to_sl : roles_to_att_val) { + rs->add_row(std::vector{ + utf8_type->decompose(role_to_sl.first), + utf8_type->decompose(role_to_sl.second), + }); + } + auto rows = ::make_shared(result(std::move(std::move(rs)))); + return ::static_pointer_cast(rows); + + }); +} + + +} +} diff --git a/cql3/statements/list_service_level_attachments_statement.hh b/cql3/statements/list_service_level_attachments_statement.hh new file mode 100644 index 0000000000..4b59607cb9 --- /dev/null +++ b/cql3/statements/list_service_level_attachments_statement.hh @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class list_service_level_attachments_statement final : public service_level_statement { + sstring _role_name; + bool _describe_all; +public: + list_service_level_attachments_statement(sstring role_name); + list_service_level_attachments_statement(); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/list_service_level_statement.cc b/cql3/statements/list_service_level_statement.cc new file mode 100644 index 0000000000..6c522bd281 --- /dev/null +++ b/cql3/statements/list_service_level_statement.cc @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/list_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +list_service_level_statement::list_service_level_statement(sstring service_level, bool describe_all) : + _service_level(service_level), _describe_all(describe_all) { +} + +std::unique_ptr +cql3::statements::list_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void list_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> list_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DESCRIBE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +list_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + + static auto make_column = [] (sstring name, const shared_ptr type) { + return make_lw_shared( + "QOS", + "service_levels", + ::make_shared(std::move(name), true), + type); + }; + + static thread_local const std::vector> metadata({make_column("service_level", utf8_type)}); + + return make_ready_future().then([this, &state] () { + if (_describe_all) { + return state.get_service_level_controller().get_distributed_service_levels(); + } else { + return state.get_service_level_controller().get_distributed_service_level(_service_level); + } + }) + .then([this] (qos::service_levels_info sl_info) { + auto rs = std::make_unique(metadata); + for (auto &&sl : sl_info) { + rs->add_row(std::vector{ + utf8_type->decompose(sl.first)}); + } + + auto rows = ::make_shared(result(std::move(std::move(rs)))); + return ::static_pointer_cast(rows); + }); +} +} +} diff --git a/cql3/statements/list_service_level_statement.hh b/cql3/statements/list_service_level_statement.hh new file mode 100644 index 0000000000..8bc14aff9e --- /dev/null +++ b/cql3/statements/list_service_level_statement.hh @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class list_service_level_statement final : public service_level_statement { + sstring _service_level; + bool _describe_all; +public: + list_service_level_statement(sstring service_level, bool describe_all); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/service_level_statement.cc b/cql3/statements/service_level_statement.cc new file mode 100644 index 0000000000..10789cc778 --- /dev/null +++ b/cql3/statements/service_level_statement.cc @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "service_level_statement.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +uint32_t service_level_statement::get_bound_terms() const { + return 0; +} + +bool service_level_statement::depends_on_keyspace( + const sstring &ks_name) const { + return false; +} + +bool service_level_statement::depends_on_column_family( + const sstring &cf_name) const { + return false; +} + +void service_level_statement::validate( + service::storage_proxy &, + const service::client_state &state) const { +} + +future<> service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return make_ready_future<>(); +} + +} +} diff --git a/cql3/statements/service_level_statement.hh b/cql3/statements/service_level_statement.hh new file mode 100644 index 0000000000..2ed5e586ff --- /dev/null +++ b/cql3/statements/service_level_statement.hh @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "cql3/cql_statement.hh" +#include "prepared_statement.hh" +#include "raw/parsed_statement.hh" +#include "transport/messages_fwd.hh" + +namespace cql3 { + +namespace statements { + +/// +/// A logical argument error for a service_level statement operation. +/// +class service_level_argument_exception : public std::invalid_argument { +public: + using std::invalid_argument::invalid_argument; +}; + +/// +/// An exception to indicate that the service level given as parameter doesn't exist. +/// +class nonexitent_service_level_exception : public service_level_argument_exception { +public: + nonexitent_service_level_exception(sstring service_level_name) + : service_level_argument_exception(format("Service Level {} doesn't exists.", service_level_name)) { + } +}; + + + + +class service_level_statement : public raw::parsed_statement, public cql_statement_no_metadata { +public: + service_level_statement() : cql_statement_no_metadata(&timeout_config::other_timeout) {} + + uint32_t get_bound_terms() const override; + + bool depends_on_keyspace(const sstring& ks_name) const override; + + bool depends_on_column_family(const sstring& cf_name) const override; + + future<> check_access(service::storage_proxy& sp, const service::client_state& state) const override; + + void validate(service::storage_proxy&, const service::client_state& state) const override; +}; + +} +} diff --git a/cql3/statements/sl_prop_defs.cc b/cql3/statements/sl_prop_defs.cc new file mode 100644 index 0000000000..ff7ffe4071 --- /dev/null +++ b/cql3/statements/sl_prop_defs.cc @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "cql3/statements/sl_prop_defs.hh" +#include "database.hh" + + +namespace cql3 { + +namespace statements { + +void sl_prop_defs::validate() { + property_definitions::validate({}); +} + +} + +} diff --git a/cql3/statements/sl_prop_defs.hh b/cql3/statements/sl_prop_defs.hh new file mode 100644 index 0000000000..b2b9d9ad43 --- /dev/null +++ b/cql3/statements/sl_prop_defs.hh @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "cql3/statements/property_definitions.hh" + +#include +#include +#include + +class keyspace_metadata; + +namespace cql3 { + +namespace statements { + +class sl_prop_defs : public property_definitions { +public: + void validate(); +}; + +} + +} From 144fe02c232ea1b16c7bf87737b1bdcacf129f3c Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Wed, 20 Feb 2019 00:06:31 +0200 Subject: [PATCH 12/16] unit test: Add unit test for per user sla syntax This commit adds the infrastructure needed to test per user sla, more specificaly, a service level accessor that triggers the update_service_levels_from_distributed_data function uppon any change to the dystributed sla data. A test was added that indirectly consumes this infrastructure by changing the distributed service level data with cql queries. Message-Id: <23b2211e409446c4f4e3e57b00f78d9ff75fc978.1609249294.git.sarna@scylladb.com> --- test/boost/cql_query_test.cc | 79 +++++++++++++++++++ test/lib/cql_test_env.cc | 27 +++++-- test/lib/unit_test_service_levels_accessor.hh | 62 +++++++++++++++ 3 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 test/lib/unit_test_service_levels_accessor.hh diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 9b12b52f1f..1223784c94 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -54,6 +54,7 @@ #include #include "gms/feature.hh" #include "db/query_context.hh" +#include "service/qos/qos_common.hh" using namespace std::literals::chrono_literals; @@ -4881,3 +4882,81 @@ SEASTAR_THREAD_TEST_CASE(test_query_unselected_columns) { } }, std::move(cfg), thread_attributes{.sched_group = statement_sched_group}).get(); } + +SEASTAR_TEST_CASE(test_user_based_sla_queries) { + return do_with_cql_env_thread([] (cql_test_env& e) { + // test create service level with defaults + e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get(); + auto msg = e.execute_cql("LIST SERVICE_LEVEL sl_1;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("sl_1")}, + }); + e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(); + //drop service levels + e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(); + msg = e.execute_cql("LIST ALL SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("sl_2")}, + }); + + // validate exceptions (illegal requests) + BOOST_REQUIRE_THROW(e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(), qos::nonexistant_service_level_exception); + e.execute_cql("DROP SERVICE_LEVEL IF EXISTS sl_1;").get(); + + BOOST_REQUIRE_THROW(e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(), exceptions::invalid_request_exception); + BOOST_REQUIRE_THROW(e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(), exceptions::invalid_request_exception); + e.execute_cql("CREATE SERVICE_LEVEL IF NOT EXISTS sl_2;").get(); + + // test attach role + e.execute_cql("ATTACH SERVICE_LEVEL sl_2 TO tester").get(); + msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester"), utf8_type->decompose("sl_2")}, + }); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester"), utf8_type->decompose("sl_2")}, + }); + + // test attachment illegal request + BOOST_REQUIRE_THROW(e.execute_cql("ATTACH SERVICE_LEVEL sl_2 TO tester2;").get(), auth::nonexistant_role); + BOOST_REQUIRE_THROW(e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester;").get(), qos::nonexistant_service_level_exception); + BOOST_CHECK(true); + // tests detaching service levels + e.execute_cql("CREATE ROLE tester2;").get(); + e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get(); + e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester2;").get(); + e.execute_cql("DETACH SERVICE_LEVEL FROM tester;").get(); + msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester2;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester2"), utf8_type->decompose("sl_1")}, + }); + BOOST_CHECK(true); + msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester;").get0(); + assert_that(msg).is_rows().with_rows({ + }); + BOOST_CHECK(true); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester2"), utf8_type->decompose("sl_1")}, + }); + BOOST_CHECK(true); + //test implicit detach when removing role + e.execute_cql("DROP ROLE tester2;").get(); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + }); + BOOST_CHECK(true); + e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester;").get(); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester"), utf8_type->decompose("sl_1")}, + }); + BOOST_CHECK(true); + //test implicit detach when removing service level + e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + }); + }); +} diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index ae7eb464a7..29a6ed32c2 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -51,6 +51,7 @@ #include "test/lib/reader_permit.hh" #include "db/query_context.hh" #include "test/lib/test_services.hh" +#include "unit_test_service_levels_accessor.hh" #include "db/view/view_builder.hh" #include "db/view/node_view_update_backlog.hh" #include "distributed_loader.hh" @@ -123,6 +124,7 @@ private: sharded& _view_update_generator; sharded& _mnotifier; sharded& _cdc_generation_service; + sharded& _sl_controller; private: struct core_local_state { service::client_state client_state; @@ -143,7 +145,7 @@ private: if (_db.local().has_keyspace(ks_name)) { _core_local.local().client_state.set_keyspace(_db.local(), ks_name); } - return ::make_shared(_core_local.local().client_state, empty_service_permit()); + return ::make_shared(_core_local.local().client_state, empty_service_permit(), _sl_controller.local()); } public: single_node_cql_env( @@ -154,7 +156,8 @@ public: sharded& view_builder, sharded& view_update_generator, sharded& mnotifier, - sharded& cdc_generation_service) + sharded& cdc_generation_service, + sharded &sl_controller) : _feature_service(feature_service) , _db(db) , _qp(qp) @@ -163,6 +166,7 @@ public: , _view_update_generator(view_update_generator) , _mnotifier(mnotifier) , _cdc_generation_service(cdc_generation_service) + , _sl_controller(sl_controller) { } virtual future<::shared_ptr> execute_cql(sstring_view text) override { @@ -438,15 +442,27 @@ public: mm_notif.start().get(); auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); }); + sharded auth_service; + set_abort_on_internal_error(true); const gms::inet_address listen("127.0.0.1"); + auto sys_dist_ks = seastar::sharded(); + auto sl_controller = sharded(); + sl_controller.start(std::ref(auth_service), qos::service_level_options{}).get(); + auto stop_sl_controller = defer([&sl_controller] { sl_controller.stop().get(); }); + sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + sl_controller.invoke_on_all([&sys_dist_ks, &sl_controller] (qos::service_level_controller& service) { + qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor = + ::static_pointer_cast( + make_shared(sl_controller,sys_dist_ks)); + return service.set_distributed_data_accessor(std::move(service_level_data_accessor)); + }).get(); sharded ms; // don't start listening so tests can be run in parallel ms.start(listen, std::move(7000)).get(); auto stop_ms = defer([&ms] { ms.stop().get(); }); - sharded auth_service; // Normally the auth server is already stopped in here, // but if there is an initialization failure we have to // make sure to stop it now or ~sharded will assert. @@ -454,7 +470,6 @@ public: auth_service.stop().get(); }); - auto sys_dist_ks = seastar::sharded(); auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); }); gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features); @@ -529,7 +544,7 @@ public: sharded qp; cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560}; - qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get(); + qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get(); auto stop_qp = defer([&qp] { qp.stop().get(); }); // In main.cc we call db::system_keyspace::setup which calls @@ -643,7 +658,7 @@ public: // The default user may already exist if this `cql_test_env` is starting with previously populated data. } - single_node_cql_env env(feature_service, db, qp, auth_service, view_builder, view_update_generator, mm_notif, cdc_generation_service); + single_node_cql_env env(feature_service, db, qp, auth_service, view_builder, view_update_generator, mm_notif, cdc_generation_service, std::ref(sl_controller)); env.start().get(); auto stop_env = defer([&env] { env.stop().get(); }); diff --git a/test/lib/unit_test_service_levels_accessor.hh b/test/lib/unit_test_service_levels_accessor.hh new file mode 100644 index 0000000000..34a885db1e --- /dev/null +++ b/test/lib/unit_test_service_levels_accessor.hh @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "service/qos/service_level_controller.hh" +#include "service/qos/qos_common.hh" +#include "db/system_distributed_keyspace.hh" +#pragma once + +namespace qos { + +/** + * This class is a helper for unit testing. It implements the service level distributed + * accessor interface in order to be used in the unit testing environment. The advantage + * of this class over the standard implementation is that it makes sure that updates are + * Immediately propagated to the underlying service level controller. + */ +class unit_test_service_levels_accessor : public service_level_controller::service_level_distributed_data_accessor { + sharded &_sl_controller; + sharded &_sys_dist_ks; +public: + unit_test_service_levels_accessor(sharded& sl_controller, sharded &sys_dist_ks) + : _sl_controller(sl_controller) + , _sys_dist_ks(sys_dist_ks) + {} + virtual future get_service_levels() const { + return _sys_dist_ks.local().get_service_levels(); + } + virtual future get_service_level(sstring service_level_name) const { + return _sys_dist_ks.local().get_service_level(service_level_name); + } + virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const { + return _sys_dist_ks.local().set_service_level(service_level_name, slo).then([this] () { + return _sl_controller.invoke_on_all(&service_level_controller::update_service_levels_from_distributed_data); + }); + + } + virtual future<> drop_service_level(sstring service_level_name) const { + return _sys_dist_ks.local().drop_service_level(service_level_name).then([this] () { + return _sl_controller.invoke_on_all(&service_level_controller::update_service_levels_from_distributed_data); + }); + } +}; + +} From c7f66d6fdd2ad828b6ce72d09a958613aa8494d5 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 8 Jan 2021 10:41:15 +0100 Subject: [PATCH 13/16] cql3: add SERVICE LEVEL syntax (without an underscore) In order for the syntax to be more natural, it's now possible to use SERVICE LEVEL instead of SERVICE_LEVEL in all appropriate places. The old syntax is supported as well. --- cql3/Cql.g | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/cql3/Cql.g b/cql3/Cql.g index 03d70be829..bcc991f83d 100644 --- a/cql3/Cql.g +++ b/cql3/Cql.g @@ -1257,6 +1257,14 @@ roleOption[cql3::role_options& opts] | K_LOGIN '=' b=BOOLEAN { opts.can_login = convert_boolean_literal($b.text); } ; +// Introduce a more natural syntax (SERVICE LEVEL), but still allow +// the original one (SERVICE_LEVEL) +serviceLevel + : K_SERVICE_LEVEL | ( K_SERVICE K_LEVEL ) + ; +serviceLevels + : K_SERVICE_LEVELS | ( K_SERVICE K_LEVELS ) + ; /** * CREATE SERVICE_LEVEL [IF NOT EXISTS] [WITH = ] */ @@ -1265,7 +1273,7 @@ createServiceLevelStatement returns [std::unique_ptr(); bool if_not_exists = false; } - : K_CREATE K_SERVICE_LEVEL (K_IF K_NOT K_EXISTS { if_not_exists = true; })? name=serviceLevelOrRoleName (K_WITH properties[*attrs])? + : K_CREATE serviceLevel (K_IF K_NOT K_EXISTS { if_not_exists = true; })? name=serviceLevelOrRoleName (K_WITH properties[*attrs])? { $stmt = std::make_unique(name, attrs, if_not_exists); } ; @@ -1276,7 +1284,7 @@ alterServiceLevelStatement returns [std::unique_ptr(); } - : K_ALTER K_SERVICE_LEVEL name=serviceLevelOrRoleName K_WITH properties[*attrs] + : K_ALTER serviceLevel name=serviceLevelOrRoleName K_WITH properties[*attrs] { $stmt = std::make_unique(name, attrs); } ; @@ -1287,7 +1295,7 @@ dropServiceLevelStatement returns [std::unique_ptr @init { bool if_exists = false; } - : K_DROP K_SERVICE_LEVEL (K_IF K_EXISTS { if_exists = true; })? name=serviceLevelOrRoleName + : K_DROP serviceLevel (K_IF K_EXISTS { if_exists = true; })? name=serviceLevelOrRoleName { $stmt = std::make_unique(name, if_exists); } ; @@ -1297,7 +1305,7 @@ dropServiceLevelStatement returns [std::unique_ptr attachServiceLevelStatement returns [std::unique_ptr stmt] @init { } - : K_ATTACH K_SERVICE_LEVEL service_level_name=serviceLevelOrRoleName K_TO role_name=serviceLevelOrRoleName + : K_ATTACH serviceLevel service_level_name=serviceLevelOrRoleName K_TO role_name=serviceLevelOrRoleName { $stmt = std::make_unique(service_level_name, role_name); } ; @@ -1307,7 +1315,7 @@ attachServiceLevelStatement returns [std::unique_ptr stmt] @init { } - : K_DETACH K_SERVICE_LEVEL K_FROM role_name=serviceLevelOrRoleName + : K_DETACH serviceLevel K_FROM role_name=serviceLevelOrRoleName { $stmt = std::make_unique(role_name); } ; @@ -1319,9 +1327,9 @@ detachServiceLevelStatement returns [std::unique_ptr stmt] @init { } - : K_LIST K_SERVICE_LEVEL service_level_name=serviceLevelOrRoleName + : K_LIST serviceLevel service_level_name=serviceLevelOrRoleName { $stmt = std::make_unique(service_level_name, false); } | - K_LIST K_ALL K_SERVICE_LEVELS + K_LIST K_ALL serviceLevels { $stmt = std::make_unique("", true); } ; @@ -1333,9 +1341,9 @@ listServiceLevelAttachStatement returns [std::unique_ptr(role_name); } | - K_LIST K_ALL K_ATTACHED K_SERVICE_LEVELS + K_LIST K_ALL K_ATTACHED serviceLevels { $stmt = std::make_unique(); } ; @@ -1883,6 +1891,9 @@ basic_unreserved_keyword returns [sstring str] | K_FOR | K_GROUP | K_TIMEOUT + | K_SERVICE + | K_LEVEL + | K_LEVELS ) { $str = $k.text; } ; @@ -2037,6 +2048,9 @@ K_DETACH: D E T A C H; K_SERVICE_LEVELS: S E R V I C E '_' L E V E L S; K_ATTACHED: A T T A C H E D; K_FOR: F O R; +K_SERVICE: S E R V I C E; +K_LEVEL: L E V E L; +K_LEVELS: L E V E L S; K_SCYLLA_TIMEUUID_LIST_INDEX: S C Y L L A '_' T I M E U U I D '_' L I S T '_' I N D E X; K_SCYLLA_COUNTER_SHARD_LIST: S C Y L L A '_' C O U N T E R '_' S H A R D '_' L I S T; From 3626bc253d35ddac1e7b229dd0dad74fdb5e6f77 Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 8 Jan 2021 11:14:58 +0100 Subject: [PATCH 14/16] service: make enable_shared_from_this inheritance public Without being public, making shared pointer from the service level accessor is not accessible outside of the class. --- service/qos/standard_service_level_distributed_data_accessor.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/qos/standard_service_level_distributed_data_accessor.hh b/service/qos/standard_service_level_distributed_data_accessor.hh index 89bc89c80a..3b648618ec 100644 --- a/service/qos/standard_service_level_distributed_data_accessor.hh +++ b/service/qos/standard_service_level_distributed_data_accessor.hh @@ -30,7 +30,7 @@ namespace db { } namespace qos { class standard_service_level_distributed_data_accessor : public service_level_controller::service_level_distributed_data_accessor, - ::enable_shared_from_this { + public ::enable_shared_from_this { private: db::system_distributed_keyspace& _sys_dist_ks; public: From 32bcbe59ad9f1f1990c4fa25b4dd6f7eda1742da Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 8 Jan 2021 11:15:59 +0100 Subject: [PATCH 15/16] main: add initializing service level data accessor The accessor must be set up in order to be able to use statement related to service level management. --- main.cc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/main.cc b/main.cc index d62cb0ac59..a363cd528a 100644 --- a/main.cc +++ b/main.cc @@ -88,6 +88,7 @@ #include "alternator/tags_extension.hh" #include "alternator/rmw_operation.hh" #include "db/paxos_grace_seconds_extension.hh" +#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "service/raft/raft_services.hh" @@ -1180,6 +1181,11 @@ int main(int ac, char** av) { return ss.join_cluster(); }).get(); + sl_controller.invoke_on_all([] (qos::service_level_controller& controller) { + controller.set_distributed_data_accessor(::static_pointer_cast( + ::make_shared(sys_dist_ks.local()))); + }).get(); + supervisor::notify("starting tracing"); tracing::tracing::start_tracing(qp).get(); auto stop_tracing = defer_verbose_shutdown("tracing", [] { @@ -1439,6 +1445,10 @@ int main(int ac, char** av) { repair_shutdown(service::get_local_storage_service().db()).get(); }); + auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] { + sl_controller.stop().get(); + }); + auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [] { view_update_generator.stop().get(); }); From 26ee6aa1e9f26f90cf7298e7a992ede6d139d1cc Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Fri, 8 Jan 2021 12:45:34 +0100 Subject: [PATCH 16/16] transport: initialize query state with service level controller Query state should be aware of the service level controller in order to properly serve service-level-related CQL queries. --- main.cc | 2 +- transport/controller.cc | 8 +++++--- transport/controller.hh | 4 +++- transport/server.cc | 22 +++++++++++++--------- transport/server.hh | 9 ++++++--- 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/main.cc b/main.cc index a363cd528a..bfd27e57c1 100644 --- a/main.cc +++ b/main.cc @@ -1292,7 +1292,7 @@ int main(int ac, char** av) { db.revert_initial_system_read_concurrency_boost(); }).get(); - cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter); + cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller); ss.register_client_shutdown_hook("native transport", [&cql_server_ctl] { cql_server_ctl.stop().get(); diff --git a/transport/controller.cc b/transport/controller.cc index b1b3e6bcc8..22cd9a01f9 100644 --- a/transport/controller.cc +++ b/transport/controller.cc @@ -33,14 +33,16 @@ namespace cql_transport { static logging::logger logger("cql_server_controller"); -controller::controller(distributed& db, sharded& auth, sharded& mn, gms::gossiper& gossiper, sharded& qp, sharded& ml) +controller::controller(distributed& db, sharded& auth, sharded& mn, gms::gossiper& gossiper, sharded& qp, sharded& ml, + sharded& sl_controller) : _ops_sem(1) , _db(db) , _auth_service(auth) , _mnotifier(mn) , _gossiper(gossiper) , _qp(qp) - , _mem_limiter(ml) { + , _mem_limiter(ml) + , _sl_controller(sl_controller) { } future<> controller::start_server() { @@ -147,7 +149,7 @@ future<> controller::do_start_server() { } } - cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config).get(); + cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config, std::ref(_sl_controller)).get(); try { parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) { diff --git a/transport/controller.hh b/transport/controller.hh index 801a4db05c..03808e82a5 100644 --- a/transport/controller.hh +++ b/transport/controller.hh @@ -24,6 +24,7 @@ #include #include #include +#include "service/qos/service_level_controller.hh" using namespace seastar; @@ -50,13 +51,14 @@ class controller { gms::gossiper& _gossiper; sharded& _qp; sharded& _mem_limiter; + sharded& _sl_controller; future<> set_cql_ready(bool ready); future<> do_start_server(); future<> do_stop_server(); public: - controller(distributed&, sharded&, sharded&, gms::gossiper&, sharded&, sharded&); + controller(distributed&, sharded&, sharded&, gms::gossiper&, sharded&, sharded&, sharded&); future<> start_server(); future<> stop_server(); future<> stop(); diff --git a/transport/server.cc b/transport/server.cc index acb2cc2926..2fe7777517 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -160,7 +160,7 @@ event::event_type parse_event_type(const sstring& value) } cql_server::cql_server(distributed& qp, auth::service& auth_service, - service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config) + service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config, qos::service_level_controller& sl_controller) : _query_processor(qp) , _config(config) , _max_request_size(config.max_request_size) @@ -168,6 +168,7 @@ cql_server::cql_server(distributed& qp, auth::service& au , _memory_available(ml.get_semaphore()) , _notifier(std::make_unique(mn)) , _auth_service(auth_service) + , _sl_controller(sl_controller) { namespace sm = seastar::metrics; @@ -952,7 +953,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme (bytes_ostream& linearization_buffer, service::client_state& client_state) mutable { request_reader in(is, linearization_buffer); return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format, - /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) { + /* FIXME */empty_service_permit(), std::move(trace_state), false, _server._sl_controller).then([] (auto msg) { // result here has to be foreign ptr return std::get>>(std::move(msg)); }); @@ -967,7 +968,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli fragmented_temporary_buffer::istream is = in.get_stream(); return process_fn(client_state, _server._query_processor, in, stream, - _version, _cql_serialization_format, permit, trace_state, true) + _version, _cql_serialization_format, permit, trace_state, true, _server._sl_controller) .then([stream, &client_state, this, is, permit, process_fn, trace_state] (std::variant>, unsigned> msg) mutable { unsigned* shard = std::get_if(&msg); @@ -981,9 +982,10 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli static future>, unsigned>> process_query_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, + qos::service_level_controller& sl_controller) { auto query = in.read_long_string_view(); - auto q_state = std::make_unique(client_state, trace_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit), sl_controller); auto& query_state = q_state->query_state; q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config()); auto& options = *q_state->options; @@ -1048,7 +1050,8 @@ future> cql_server::connection::process_pr static future>, unsigned>> process_execute_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, + qos::service_level_controller& sl_controller) { cql3::prepared_cache_key_type cache_key(in.read_short_bytes()); auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); bool needs_authorization = false; @@ -1065,7 +1068,7 @@ process_execute_internal(service::client_state& client_state, distributed(client_state, trace_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit), sl_controller); auto& query_state = q_state->query_state; if (version == 1) { std::vector values; @@ -1127,7 +1130,8 @@ future>> cql_server::connectio static future>, unsigned>> process_batch_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, + qos::service_level_controller& sl_controller) { if (version == 1) { throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol"); } @@ -1212,7 +1216,7 @@ process_batch_internal(service::client_state& client_state, distributed(client_state, trace_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit), sl_controller); auto& query_state = q_state->query_state; // #563. CQL v2 encodes query_options in v1 format for batch requests. q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format, diff --git a/transport/server.hh b/transport/server.hh index 55fe13ed0c..47cd88e788 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -40,6 +40,7 @@ #include "service_permit.hh" #include #include "utils/updateable_value.hh" +#include "service/qos/service_level_controller.hh" namespace scollectd { @@ -102,8 +103,8 @@ struct cql_query_state { service::query_state query_state; std::unique_ptr options; - cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit) - : query_state(client_state, std::move(trace_state_ptr), std::move(permit)) + cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller) + : query_state(client_state, std::move(trace_state_ptr), std::move(permit), sl_controller) { } }; @@ -157,10 +158,12 @@ private: private: transport_stats _stats = {}; auth::service& _auth_service; + qos::service_level_controller& _sl_controller; public: cql_server(distributed& qp, auth::service&, service::migration_notifier& mn, database& db, service::memory_limiter& ml, - cql_server_config config); + cql_server_config config, + qos::service_level_controller& sl_controller); future<> listen(socket_address addr, std::shared_ptr = {}, bool is_shard_aware = false, bool keepalive = false); future<> do_accepts(int which, bool keepalive, socket_address server_addr); future<> stop();