diff --git a/auth/resource.cc b/auth/resource.cc index 529a56d340..764e296456 100644 --- a/auth/resource.cc +++ b/auth/resource.cc @@ -56,6 +56,7 @@ std::ostream& operator<<(std::ostream& os, resource_kind kind) { switch (kind) { case resource_kind::data: os << "data"; break; case resource_kind::role: os << "role"; break; + case resource_kind::service_level: os << "service_level"; break; } return os; @@ -63,11 +64,13 @@ std::ostream& operator<<(std::ostream& os, resource_kind kind) { static const std::unordered_map roots{ {resource_kind::data, "data"}, - {resource_kind::role, "roles"}}; + {resource_kind::role, "roles"}, + {resource_kind::service_level, "service_levels"}}; static const std::unordered_map max_parts{ {resource_kind::data, 2}, - {resource_kind::role, 1}}; + {resource_kind::role, 1}, + {resource_kind::service_level, 0}}; static permission_set applicable_permissions(const data_resource_view& dv) { if (dv.table()) { @@ -101,6 +104,15 @@ static permission_set applicable_permissions(const role_resource_view& rv) { permission::DESCRIBE>(); } +static permission_set applicable_permissions(const service_level_resource_view &rv) { + return permission_set::of< + permission::CREATE, + permission::ALTER, + permission::DROP, + permission::DESCRIBE, + permission::AUTHORIZE>(); +} + resource::resource(resource_kind kind) : _kind(kind) { _parts.emplace_back(roots.at(kind)); } @@ -122,6 +134,9 @@ resource::resource(role_resource_t, std::string_view role) : resource(resource_k _parts.emplace_back(role); } +resource::resource(service_level_resource_t): resource(resource_kind::service_level) { +} + sstring resource::name() const { return boost::algorithm::join(_parts, "/"); } @@ -142,6 +157,7 @@ permission_set resource::applicable_permissions() const { switch (_kind) { case resource_kind::data: ps = ::auth::applicable_permissions(data_resource_view(*this)); break; case resource_kind::role: ps = ::auth::applicable_permissions(role_resource_view(*this)); break; + case resource_kind::service_level: ps = ::auth::applicable_permissions(service_level_resource_view(*this)); break; } return ps; @@ -163,11 +179,24 @@ std::ostream& operator<<(std::ostream& os, const resource& r) { switch (r.kind()) { case resource_kind::data: return os << data_resource_view(r); case resource_kind::role: return os << role_resource_view(r); + case resource_kind::service_level: return os << service_level_resource_view(r); } return os; } +service_level_resource_view::service_level_resource_view(const resource &r) : + _resource(r) { + if (r._kind != resource_kind::service_level) { + throw resource_kind_mismatch(resource_kind::service_level, r._kind); + } +} + +std::ostream &operator<<(std::ostream &os, const service_level_resource_view &v) { + os << ""; + return os; +} + data_resource_view::data_resource_view(const resource& r) : _resource(r) { if (r._kind != resource_kind::data) { throw resource_kind_mismatch(resource_kind::data, r._kind); @@ -276,6 +305,12 @@ const resource& root_role_resource() { return the_root_role_resource; } +static const resource the_root_service_level_resource{resource_kind::service_level}; + +const resource &root_service_level_resource() { + return the_root_service_level_resource; +} + resource_set expand_resource_family(const resource& rr) { resource r = rr; resource_set rs; diff --git a/auth/resource.hh b/auth/resource.hh index da527223d9..f79ae260d1 100644 --- a/auth/resource.hh +++ b/auth/resource.hh @@ -67,7 +67,7 @@ public: }; enum class resource_kind { - data, role + data, role, service_level }; std::ostream& operator<<(std::ostream&, resource_kind); @@ -82,6 +82,11 @@ struct data_resource_t final {}; /// struct role_resource_t final {}; +/// +/// Type tag for constructing service_level resources. +/// +struct service_level_resource_t final {}; + /// /// Resources are entities that users can be granted permissions on. /// @@ -108,6 +113,7 @@ public: resource(data_resource_t, std::string_view keyspace); resource(data_resource_t, std::string_view keyspace, std::string_view table); resource(role_resource_t, std::string_view role); + resource(service_level_resource_t); resource_kind kind() const noexcept { return _kind; @@ -128,6 +134,7 @@ private: friend class std::hash; friend class data_resource_view; friend class role_resource_view; + friend class service_level_resource_view; friend bool operator<(const resource&, const resource&); friend bool operator==(const resource&, const resource&); @@ -192,6 +199,22 @@ public: std::ostream& operator<<(std::ostream&, const role_resource_view&); +/// +/// A "service_level" view of \ref resource. +/// +class service_level_resource_view final { + const resource& _resource; + +public: + /// + /// \throws \ref resource_kind_mismatch if the argument is not a "service_level" resource. + /// + explicit service_level_resource_view(const resource&); + +}; + +std::ostream& operator<<(std::ostream&, const service_level_resource_view&); + /// /// Parse a resource from its name. /// @@ -214,6 +237,12 @@ inline resource make_role_resource(std::string_view role) { return resource(role_resource_t{}, role); } +const resource& root_service_level_resource(); + +inline resource make_service_level_resource() { + return resource(service_level_resource_t{}); +} + } namespace std { @@ -228,12 +257,17 @@ struct hash { return utils::tuple_hash()(std::make_tuple(auth::resource_kind::role, rv.role())); } + static size_t hash_service_level(const auth::service_level_resource_view& rv) { + return utils::tuple_hash()(std::make_tuple(auth::resource_kind::service_level)); + } + size_t operator()(const auth::resource& r) const { std::size_t value; switch (r._kind) { case auth::resource_kind::data: value = hash_data(auth::data_resource_view(r)); break; case auth::resource_kind::role: value = hash_role(auth::role_resource_view(r)); break; + case auth::resource_kind::service_level: value = hash_service_level(auth::service_level_resource_view(r)); break; } return value; diff --git a/auth/role_manager.hh b/auth/role_manager.hh index 12a9210639..4361b1f959 100644 --- a/auth/role_manager.hh +++ b/auth/role_manager.hh @@ -101,6 +101,11 @@ enum class recursive_role_query { yes, no }; /// access-control should never be enforced in implementations. /// class role_manager { +public: + // this type represents a mapping between a role and some attribute value. + // i.e: given attribute name 'a' this map holds role name and it's assigned + // value of 'a'. + using attribute_vals = std::unordered_map; public: virtual ~role_manager() = default; @@ -164,6 +169,26 @@ public: /// \returns an exceptional future with \ref nonexistant_role if the role does not exist. /// virtual future can_login(std::string_view role_name) const = 0; -}; + /// + /// \returns the value of the named attribute, if one is set. + /// + virtual future> get_attribute(std::string_view role_name, std::string_view attribute_name) const = 0; + + /// + /// \returns a mapping of each role's value for the named attribute, if one is set for the role. + /// + virtual future query_attribute_for_all(std::string_view attribute_name) const = 0; + + /// Sets `attribute_name` with `attribute_value` for `role_name`. + /// \returns an exceptional future with nonexistant_role if the role does not exist. + /// + virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const = 0; + + /// Removes `attribute_name` for `role_name`. + /// \returns an exceptional future with nonexistant_role if the role does not exist. + /// \note: This is a no-op if the role does not have the named attribute set. + /// + virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name) const = 0; +}; } diff --git a/auth/service.cc b/auth/service.cc index 327a1e5236..b0290bcd32 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -327,6 +327,8 @@ future service::exists(const resource& r) const { return make_ready_future(true); } + case resource_kind::service_level: + return make_ready_future(true); } return make_ready_future(false); diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index 57d253e844..5dcc23a413 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -53,6 +53,26 @@ constexpr std::string_view qualified_name("system_auth.role_members"); } +namespace role_attributes_table { +constexpr std::string_view name{"role_attributes", 15}; + +static std::string_view qualified_name() noexcept { + static const sstring instance = format("{}.{}", AUTH_KS, name); + return instance; +} +static std::string_view creation_query() noexcept { + static const sstring instance = format( + "CREATE TABLE {} (" + " role text," + " name text," + " value text," + " PRIMARY KEY(role, name)" + ")", + qualified_name()); + + return instance; +} +} } static logging::logger log("standard_role_manager"); @@ -152,6 +172,11 @@ future<> standard_role_manager::create_metadata_tables_if_missing() const { meta::role_members_table::name, _qp, create_role_members_query, + _migration_manager), + create_metadata_table_if_missing( + meta::role_attributes_table::name, + _qp, + meta::role_attributes_table::creation_query(), _migration_manager)).discard_result(); } @@ -345,6 +370,12 @@ future<> standard_role_manager::drop(std::string_view role_name) const { }); }; + // Delete all attributes for that role + const auto remove_attributes_of = [this, role_name] { + static const sstring query = format("DELETE FROM {} WHERE role = ?", meta::role_attributes_table::qualified_name()); + return _qp.execute_internal(query, {sstring(role_name)}).discard_result(); + }; + // Finally, delete the role itself. auto delete_role = [this, role_name] { static const sstring query = format("DELETE FROM {} WHERE {} = ?", @@ -358,7 +389,8 @@ future<> standard_role_manager::drop(std::string_view role_name) const { {sstring(role_name)}).discard_result(); }; - return when_all_succeed(revoke_from_members(), revoke_members_of()).then_unpack([delete_role = std::move(delete_role)] { + return when_all_succeed(revoke_from_members(), revoke_members_of(), + remove_attributes_of()).then_unpack([delete_role = std::move(delete_role)] { return delete_role(); }); }); @@ -536,4 +568,56 @@ future standard_role_manager::can_login(std::string_view role_name) const }); } +future> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) const { + static const sstring query = format("SELECT name, value FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name()); + return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}).then([] (shared_ptr result_set) { + if (!result_set->empty()) { + const cql3::untyped_result_set_row &row = result_set->one(); + return std::optional(row.get_as("value")); + } + return std::optional{}; + }); +} + +future standard_role_manager::query_attribute_for_all (std::string_view attribute_name) const { + return query_all().then([this, attribute_name] (role_set roles) { + return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) { + return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) { + return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional att_val) { + if (att_val) { + role_to_att_val.emplace(std::move(role), std::move(*att_val)); + } + }); + }).then([&role_to_att_val] () { + return make_ready_future(std::move(role_to_att_val)); + }); + }); + }); +} + +future<> standard_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const { + static const sstring query = format("INSERT INTO {} (role, name, value) VALUES (?, ?, ?)", meta::role_attributes_table::qualified_name()); + return do_with(sstring(role_name), sstring(attribute_name), sstring(attribute_value), [this] (sstring& role_name, sstring &attribute_name, + sstring &attribute_value) { + return exists(role_name).then([&role_name, &attribute_name, &attribute_value, this] (bool role_exists) { + if (!role_exists) { + throw auth::nonexistant_role(role_name); + } + return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}).discard_result(); + }); + }); + +} + +future<> standard_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name) const { + static const sstring query = format("DELETE FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name()); + return do_with(sstring(role_name), sstring(attribute_name), [this] (sstring& role_name, sstring &attribute_name) { + return exists(role_name).then([&role_name, &attribute_name, this] (bool role_exists) { + if (!role_exists) { + throw auth::nonexistant_role(role_name); + } + return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}).discard_result(); + }); + }); +} } diff --git a/auth/standard_role_manager.hh b/auth/standard_role_manager.hh index de2b0f1c64..c0fc42f489 100644 --- a/auth/standard_role_manager.hh +++ b/auth/standard_role_manager.hh @@ -83,6 +83,14 @@ public: virtual future can_login(std::string_view role_name) const override; + virtual future> get_attribute(std::string_view role_name, std::string_view attribute_name) const override; + + virtual future query_attribute_for_all(std::string_view attribute_name) const override; + + virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const override; + + virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name) const override; + private: enum class membership_change { add, remove }; diff --git a/configure.py b/configure.py index 60a0ee4bca..5f5bd62b54 100755 --- a/configure.py +++ b/configure.py @@ -716,6 +716,15 @@ scylla_core = (['database.cc', 'cql3/statements/alter_type_statement.cc', 'cql3/statements/alter_keyspace_statement.cc', 'cql3/statements/role-management-statements.cc', + 'cql3/statements/service_level_statement.cc', + 'cql3/statements/create_service_level_statement.cc', + 'cql3/statements/alter_service_level_statement.cc', + 'cql3/statements/sl_prop_defs.cc', + 'cql3/statements/drop_service_level_statement.cc', + 'cql3/statements/attach_service_level_statement.cc', + 'cql3/statements/detach_service_level_statement.cc', + 'cql3/statements/list_service_level_statement.cc', + 'cql3/statements/list_service_level_attachments_statement.cc', 'cql3/update_parameters.cc', 'cql3/util.cc', 'cql3/ut_name.cc', @@ -836,6 +845,9 @@ scylla_core = (['database.cc', 'service/misc_services.cc', 'service/pager/paging_state.cc', 'service/pager/query_pagers.cc', + 'service/qos/qos_common.cc', + 'service/qos/service_level_controller.cc', + 'service/qos/standard_service_level_distributed_data_accessor.cc', 'streaming/stream_task.cc', 'streaming/stream_session.cc', 'streaming/stream_request.cc', diff --git a/cql3/Cql.g b/cql3/Cql.g index 29a01a9480..bcc991f83d 100644 --- a/cql3/Cql.g +++ b/cql3/Cql.g @@ -37,6 +37,7 @@ options { #include "cql3/statements/alter_keyspace_statement.hh" #include "cql3/statements/alter_table_statement.hh" #include "cql3/statements/alter_view_statement.hh" +#include "cql3/statements/alter_service_level_statement.hh" #include "cql3/statements/create_keyspace_statement.hh" #include "cql3/statements/drop_keyspace_statement.hh" #include "cql3/statements/create_index_statement.hh" @@ -44,6 +45,9 @@ options { #include "cql3/statements/create_view_statement.hh" #include "cql3/statements/create_type_statement.hh" #include "cql3/statements/create_function_statement.hh" +#include "cql3/statements/create_service_level_statement.hh" +#include "cql3/statements/sl_prop_defs.hh" +#include "cql3/statements/attach_service_level_statement.hh" #include "cql3/statements/drop_type_statement.hh" #include "cql3/statements/alter_type_statement.hh" #include "cql3/statements/property_definitions.hh" @@ -51,6 +55,8 @@ options { #include "cql3/statements/drop_table_statement.hh" #include "cql3/statements/drop_view_statement.hh" #include "cql3/statements/drop_function_statement.hh" +#include "cql3/statements/drop_service_level_statement.hh" +#include "cql3/statements/detach_service_level_statement.hh" #include "cql3/statements/truncate_statement.hh" #include "cql3/statements/raw/update_statement.hh" #include "cql3/statements/raw/insert_statement.hh" @@ -64,6 +70,8 @@ options { #include "cql3/statements/list_permissions_statement.hh" #include "cql3/statements/alter_role_statement.hh" #include "cql3/statements/list_roles_statement.hh" +#include "cql3/statements/list_service_level_statement.hh" +#include "cql3/statements/list_service_level_attachments_statement.hh" #include "cql3/statements/grant_role_statement.hh" #include "cql3/statements/revoke_role_statement.hh" #include "cql3/statements/drop_role_statement.hh" @@ -194,7 +202,7 @@ struct uninitialized { listener->syntax_error(*this, token_names, ex); } - void add_recognition_error(const sstring& msg) { + void add_recognition_error(const sstring& msg) { listener->syntax_error(*this, msg); } @@ -370,6 +378,14 @@ cqlStatement returns [std::unique_ptr stmt] | st38=dropRoleStatement { $stmt = std::move(st38); } | st39=createRoleStatement { $stmt = std::move(st39); } | st40=alterRoleStatement { $stmt = std::move(st40); } + | st41=createServiceLevelStatement { $stmt = std::move(st41); } + | st42=alterServiceLevelStatement { $stmt = std::move(st42); } + | st43=dropServiceLevelStatement { $stmt = std::move(st43); } + | st44=attachServiceLevelStatement { $stmt = std::move(st44); } + | st45=detachServiceLevelStatement { $stmt = std::move(st45); } + | st46=listServiceLevelStatement { $stmt = std::move(st46); } + | st47=listServiceLevelAttachStatement { $stmt = std::move(st47); } + ; /* @@ -1241,6 +1257,96 @@ roleOption[cql3::role_options& opts] | K_LOGIN '=' b=BOOLEAN { opts.can_login = convert_boolean_literal($b.text); } ; +// Introduce a more natural syntax (SERVICE LEVEL), but still allow +// the original one (SERVICE_LEVEL) +serviceLevel + : K_SERVICE_LEVEL | ( K_SERVICE K_LEVEL ) + ; +serviceLevels + : K_SERVICE_LEVELS | ( K_SERVICE K_LEVELS ) + ; +/** + * CREATE SERVICE_LEVEL [IF NOT EXISTS] [WITH = ] + */ +createServiceLevelStatement returns [std::unique_ptr stmt] + @init { + auto attrs = make_shared(); + bool if_not_exists = false; + } + : K_CREATE serviceLevel (K_IF K_NOT K_EXISTS { if_not_exists = true; })? name=serviceLevelOrRoleName (K_WITH properties[*attrs])? + { $stmt = std::make_unique(name, attrs, if_not_exists); } + ; + +/** + * ALTER SERVICE_LEVEL WITH = + */ +alterServiceLevelStatement returns [std::unique_ptr stmt] + @init { + auto attrs = make_shared(); + } + : K_ALTER serviceLevel name=serviceLevelOrRoleName K_WITH properties[*attrs] + { $stmt = std::make_unique(name, attrs); } + ; + +/** + * DROP SERVICE_LEVEL [IF EXISTS] + */ +dropServiceLevelStatement returns [std::unique_ptr stmt] + @init { + bool if_exists = false; + } + : K_DROP serviceLevel (K_IF K_EXISTS { if_exists = true; })? name=serviceLevelOrRoleName + { $stmt = std::make_unique(name, if_exists); } + ; + +/** + * ATTACH SERVICE_LEVEL TO + */ +attachServiceLevelStatement returns [std::unique_ptr stmt] + @init { + } + : K_ATTACH serviceLevel service_level_name=serviceLevelOrRoleName K_TO role_name=serviceLevelOrRoleName + { $stmt = std::make_unique(service_level_name, role_name); } + ; + +/** + * DETACH SERVICE_LEVEL FROM + */ +detachServiceLevelStatement returns [std::unique_ptr stmt] + @init { + } + : K_DETACH serviceLevel K_FROM role_name=serviceLevelOrRoleName + { $stmt = std::make_unique(role_name); } + ; + + +/** + * LIST SERVICE_LEVEL + * LIST ALL SERVICE_LEVELS + */ +listServiceLevelStatement returns [std::unique_ptr stmt] + @init { + } + : K_LIST serviceLevel service_level_name=serviceLevelOrRoleName + { $stmt = std::make_unique(service_level_name, false); } | + K_LIST K_ALL serviceLevels + { $stmt = std::make_unique("", true); } + ; + +/** + * LIST ATTACHED SERVICE_LEVEL OF + * LIST ALL ATTACHED SERVICE_LEVELS + */ +listServiceLevelAttachStatement returns [std::unique_ptr stmt] + @init { + bool allow_nonexisting_roles = false; + } + : K_LIST K_ATTACHED serviceLevel K_OF role_name=serviceLevelOrRoleName + { $stmt = std::make_unique(role_name); } | + K_LIST K_ALL K_ATTACHED serviceLevels + { $stmt = std::make_unique(); } + ; + /** DEFINITIONS **/ // Column Identifiers. These need to be treated differently from other @@ -1286,6 +1392,16 @@ userOrRoleName returns [uninitialized name] | k=unreserved_keyword { $name = cql3::role_name(k, cql3::preserve_role_case::no); } | QMARK {add_recognition_error("Bind variables cannot be used for role names");} ; + +serviceLevelOrRoleName returns [sstring name] +: t=IDENT { $name = sstring($t.text); + std::transform($name.begin(), $name.end(), $name.begin(), ::tolower); } +| t=STRING_LITERAL { $name = sstring($t.text); } +| t=QUOTED_NAME { $name = sstring($t.text); } +| k=unreserved_keyword { $name = sstring($t.text); + std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);} +| QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");} +; ksName[cql3::keyspace_element_name& name] : t=IDENT { $name.set_keyspace($t.text, false);} @@ -1767,8 +1883,17 @@ basic_unreserved_keyword returns [sstring str] | K_LIKE | K_PER | K_PARTITION + | K_SERVICE_LEVEL + | K_ATTACH + | K_DETACH + | K_SERVICE_LEVELS + | K_ATTACHED + | K_FOR | K_GROUP | K_TIMEOUT + | K_SERVICE + | K_LEVEL + | K_LEVELS ) { $str = $k.text; } ; @@ -1917,6 +2042,16 @@ K_CACHE: C A C H E; K_PER: P E R; K_PARTITION: P A R T I T I O N; +K_SERVICE_LEVEL: S E R V I C E '_' L E V E L; +K_ATTACH: A T T A C H; +K_DETACH: D E T A C H; +K_SERVICE_LEVELS: S E R V I C E '_' L E V E L S; +K_ATTACHED: A T T A C H E D; +K_FOR: F O R; +K_SERVICE: S E R V I C E; +K_LEVEL: L E V E L; +K_LEVELS: L E V E L S; + K_SCYLLA_TIMEUUID_LIST_INDEX: S C Y L L A '_' T I M E U U I D '_' L I S T '_' I N D E X; K_SCYLLA_COUNTER_SHARD_LIST: S C Y L L A '_' C O U N T E R '_' S H A R D '_' L I S T; K_SCYLLA_CLUSTERING_BOUND: S C Y L L A '_' C L U S T E R I N G '_' B O U N D; diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index d495ea4644..892a9fc95c 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -68,7 +68,7 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono class query_processor::internal_state { service::query_state _qs; public: - internal_state() : _qs(service::client_state::for_internal_calls(), empty_service_permit()) { + internal_state(qos::service_level_controller &sl_controller) : _qs(service::client_state::for_internal_calls(), empty_service_permit(), sl_controller) { } operator service::query_state&() { return _qs; @@ -84,14 +84,15 @@ public: } }; -query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg) +query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg, + sharded &sl_controller) : _migration_subscriber{std::make_unique(this)} , _proxy(proxy) , _db(db) , _mnotifier(mn) , _mm(mm) , _cql_config(cql_cfg) - , _internal_state(new internal_state()) + , _internal_state(new internal_state(sl_controller.local())) , _prepared_cache(prep_cache_log, mcfg.prepared_statment_cache_size) , _authorized_prepared_cache(std::min(std::chrono::milliseconds(_db.get_config().permissions_validity_in_ms()), std::chrono::duration_cast(prepared_statements_cache::entry_expiry)), diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 3808f4895a..5558dda085 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -154,7 +154,8 @@ public: static std::unique_ptr parse_statement(const std::string_view& query); - query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg); + query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg, + sharded &sl_controller); ~query_processor(); diff --git a/cql3/statements/alter_service_level_statement.cc b/cql3/statements/alter_service_level_statement.cc new file mode 100644 index 0000000000..2923e6a3ca --- /dev/null +++ b/cql3/statements/alter_service_level_statement.cc @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/alter_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +alter_service_level_statement::alter_service_level_statement(sstring service_level, shared_ptr attrs) + : _service_level(service_level) { + attrs->validate(); +} + +std::unique_ptr +cql3::statements::alter_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void alter_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> alter_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::ALTER, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +alter_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().alter_distributed_service_level(_service_level, _slo).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/alter_service_level_statement.hh b/cql3/statements/alter_service_level_statement.hh new file mode 100644 index 0000000000..771165ac5b --- /dev/null +++ b/cql3/statements/alter_service_level_statement.hh @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "cql3/statements/sl_prop_defs.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class alter_service_level_statement final : public service_level_statement { + sstring _service_level; + qos::service_level_options _slo; + +public: + alter_service_level_statement(sstring service_level, shared_ptr attrs); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/attach_service_level_statement.cc b/cql3/statements/attach_service_level_statement.cc new file mode 100644 index 0000000000..ac52532e25 --- /dev/null +++ b/cql3/statements/attach_service_level_statement.cc @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/attach_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "exceptions/exceptions.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +attach_service_level_statement::attach_service_level_statement(sstring service_level, sstring role_name) : + _service_level(service_level), _role_name(role_name) { +} + +std::unique_ptr +cql3::statements::attach_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void attach_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> attach_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::AUTHORIZE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +attach_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().get_distributed_service_level(_service_level).then([this] (qos::service_levels_info sli) { + if (sli.empty()) { + throw qos::nonexistant_service_level_exception(_service_level); + } + }).then([&state, this] () { + return state.get_client_state().get_auth_service()->underlying_role_manager().set_attribute(_role_name, "service_level", _service_level).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); + }); + +} +} +} diff --git a/cql3/statements/attach_service_level_statement.hh b/cql3/statements/attach_service_level_statement.hh new file mode 100644 index 0000000000..b30bcbc096 --- /dev/null +++ b/cql3/statements/attach_service_level_statement.hh @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class attach_service_level_statement final : public service_level_statement { + sstring _service_level; + sstring _role_name; + +public: + attach_service_level_statement(sstring service_level, sstring role_name); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/create_service_level_statement.cc b/cql3/statements/create_service_level_statement.cc new file mode 100644 index 0000000000..e8d1db69f8 --- /dev/null +++ b/cql3/statements/create_service_level_statement.cc @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/create_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +create_service_level_statement::create_service_level_statement(sstring service_level, shared_ptr attrs, bool if_not_exists) + : _service_level(service_level), _if_not_exists(if_not_exists) { + attrs->validate(); +} + +std::unique_ptr +cql3::statements::create_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void create_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> create_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::CREATE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +create_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().add_distributed_service_level(_service_level, _slo, _if_not_exists).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/create_service_level_statement.hh b/cql3/statements/create_service_level_statement.hh new file mode 100644 index 0000000000..3113a5f106 --- /dev/null +++ b/cql3/statements/create_service_level_statement.hh @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "cql3/statements/sl_prop_defs.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class create_service_level_statement final : public service_level_statement { + sstring _service_level; + qos::service_level_options _slo; + bool _if_not_exists; + +public: + create_service_level_statement(sstring service_level, shared_ptr attrs, bool if_not_exists); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/detach_service_level_statement.cc b/cql3/statements/detach_service_level_statement.cc new file mode 100644 index 0000000000..90bfb44fe1 --- /dev/null +++ b/cql3/statements/detach_service_level_statement.cc @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/detach_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +detach_service_level_statement::detach_service_level_statement(sstring role_name) : + _role_name(role_name) { +} + +std::unique_ptr +cql3::statements::detach_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void detach_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> detach_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::AUTHORIZE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +detach_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_client_state().get_auth_service()->underlying_role_manager().remove_attribute(_role_name, "service_level").then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/detach_service_level_statement.hh b/cql3/statements/detach_service_level_statement.hh new file mode 100644 index 0000000000..82ea7792ff --- /dev/null +++ b/cql3/statements/detach_service_level_statement.hh @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class detach_service_level_statement final : public service_level_statement { + sstring _role_name; +public: + detach_service_level_statement(sstring role_name); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/drop_service_level_statement.cc b/cql3/statements/drop_service_level_statement.cc new file mode 100644 index 0000000000..10d7a774fc --- /dev/null +++ b/cql3/statements/drop_service_level_statement.cc @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/drop_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +drop_service_level_statement::drop_service_level_statement(sstring service_level, bool if_exists) : + _service_level(service_level), _if_exists(if_exists) { +} + +std::unique_ptr +cql3::statements::drop_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void drop_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> drop_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DROP, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +drop_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + return state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists).then([] { + using void_result_msg = cql_transport::messages::result_message::void_message; + using result_msg = cql_transport::messages::result_message; + return ::static_pointer_cast(make_shared()); + }); +} +} +} diff --git a/cql3/statements/drop_service_level_statement.hh b/cql3/statements/drop_service_level_statement.hh new file mode 100644 index 0000000000..bf26887c21 --- /dev/null +++ b/cql3/statements/drop_service_level_statement.hh @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class drop_service_level_statement final : public service_level_statement { + sstring _service_level; + bool _if_exists; +public: + drop_service_level_statement(sstring service_level, bool if_exists); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/list_service_level_attachments_statement.cc b/cql3/statements/list_service_level_attachments_statement.cc new file mode 100644 index 0000000000..07672aaaed --- /dev/null +++ b/cql3/statements/list_service_level_attachments_statement.cc @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/list_service_level_attachments_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +list_service_level_attachments_statement::list_service_level_attachments_statement(sstring role_name) : + _role_name(role_name), _describe_all(false) { +} + +list_service_level_attachments_statement::list_service_level_attachments_statement() : + _role_name(), _describe_all(true) { +} + +std::unique_ptr +cql3::statements::list_service_level_attachments_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void list_service_level_attachments_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> list_service_level_attachments_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DESCRIBE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +list_service_level_attachments_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + + static auto make_column = [] (sstring name, const shared_ptr type) { + return make_lw_shared( + "QOS", + "service_levels_attachments", + ::make_shared(std::move(name), true), + type); + }; + + static thread_local const std::vector> metadata({ + make_column("role", utf8_type), make_column("service_level", utf8_type) + }); + + + return make_ready_future().then([this, &state] () { + if (_describe_all) { + return state.get_client_state().get_auth_service()->underlying_role_manager().query_attribute_for_all("service_level"); + } else { + return state.get_client_state().get_auth_service()->underlying_role_manager().get_attribute(_role_name, "service_level").then([this] (std::optional att_val) { + std::unordered_map ret; + if (att_val) { + ret.emplace(_role_name, *att_val); + } + return make_ready_future>(ret); + }); + + } + }).then([this, &state] (std::unordered_map roles_to_att_val) { + + auto rs = std::make_unique(metadata); + for (auto&& role_to_sl : roles_to_att_val) { + rs->add_row(std::vector{ + utf8_type->decompose(role_to_sl.first), + utf8_type->decompose(role_to_sl.second), + }); + } + auto rows = ::make_shared(result(std::move(std::move(rs)))); + return ::static_pointer_cast(rows); + + }); +} + + +} +} diff --git a/cql3/statements/list_service_level_attachments_statement.hh b/cql3/statements/list_service_level_attachments_statement.hh new file mode 100644 index 0000000000..4b59607cb9 --- /dev/null +++ b/cql3/statements/list_service_level_attachments_statement.hh @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class list_service_level_attachments_statement final : public service_level_statement { + sstring _role_name; + bool _describe_all; +public: + list_service_level_attachments_statement(sstring role_name); + list_service_level_attachments_statement(); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/list_service_level_statement.cc b/cql3/statements/list_service_level_statement.cc new file mode 100644 index 0000000000..6c522bd281 --- /dev/null +++ b/cql3/statements/list_service_level_statement.cc @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "seastarx.hh" +#include "cql3/statements/list_service_level_statement.hh" +#include "service/qos/service_level_controller.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +list_service_level_statement::list_service_level_statement(sstring service_level, bool describe_all) : + _service_level(service_level), _describe_all(describe_all) { +} + +std::unique_ptr +cql3::statements::list_service_level_statement::prepare( + database &db, cql_stats &stats) { + return std::make_unique(::make_shared(*this)); +} + +void list_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const { +} + +future<> list_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DESCRIBE, .resource = auth::root_service_level_resource()}); +} + +future<::shared_ptr> +list_service_level_statement::execute(query_processor& qp, + service::query_state &state, + const query_options &) const { + + static auto make_column = [] (sstring name, const shared_ptr type) { + return make_lw_shared( + "QOS", + "service_levels", + ::make_shared(std::move(name), true), + type); + }; + + static thread_local const std::vector> metadata({make_column("service_level", utf8_type)}); + + return make_ready_future().then([this, &state] () { + if (_describe_all) { + return state.get_service_level_controller().get_distributed_service_levels(); + } else { + return state.get_service_level_controller().get_distributed_service_level(_service_level); + } + }) + .then([this] (qos::service_levels_info sl_info) { + auto rs = std::make_unique(metadata); + for (auto &&sl : sl_info) { + rs->add_row(std::vector{ + utf8_type->decompose(sl.first)}); + } + + auto rows = ::make_shared(result(std::move(std::move(rs)))); + return ::static_pointer_cast(rows); + }); +} +} +} diff --git a/cql3/statements/list_service_level_statement.hh b/cql3/statements/list_service_level_statement.hh new file mode 100644 index 0000000000..8bc14aff9e --- /dev/null +++ b/cql3/statements/list_service_level_statement.hh @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "cql3/statements/service_level_statement.hh" +#include "service/qos/qos_common.hh" + +namespace cql3 { +namespace statements { + +class list_service_level_statement final : public service_level_statement { + sstring _service_level; + bool _describe_all; +public: + list_service_level_statement(sstring service_level, bool describe_all); + std::unique_ptr prepare(database &db, cql_stats &stats) override; + void validate(service::storage_proxy&, const service::client_state&) const override; + virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override; + virtual future<::shared_ptr> + execute(query_processor&, service::query_state&, const query_options&) const override; +}; + +} + +} diff --git a/cql3/statements/service_level_statement.cc b/cql3/statements/service_level_statement.cc new file mode 100644 index 0000000000..10789cc778 --- /dev/null +++ b/cql3/statements/service_level_statement.cc @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "service_level_statement.hh" +#include "transport/messages/result_message.hh" + +namespace cql3 { + +namespace statements { + +uint32_t service_level_statement::get_bound_terms() const { + return 0; +} + +bool service_level_statement::depends_on_keyspace( + const sstring &ks_name) const { + return false; +} + +bool service_level_statement::depends_on_column_family( + const sstring &cf_name) const { + return false; +} + +void service_level_statement::validate( + service::storage_proxy &, + const service::client_state &state) const { +} + +future<> service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const { + return make_ready_future<>(); +} + +} +} diff --git a/cql3/statements/service_level_statement.hh b/cql3/statements/service_level_statement.hh new file mode 100644 index 0000000000..2ed5e586ff --- /dev/null +++ b/cql3/statements/service_level_statement.hh @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "cql3/cql_statement.hh" +#include "prepared_statement.hh" +#include "raw/parsed_statement.hh" +#include "transport/messages_fwd.hh" + +namespace cql3 { + +namespace statements { + +/// +/// A logical argument error for a service_level statement operation. +/// +class service_level_argument_exception : public std::invalid_argument { +public: + using std::invalid_argument::invalid_argument; +}; + +/// +/// An exception to indicate that the service level given as parameter doesn't exist. +/// +class nonexitent_service_level_exception : public service_level_argument_exception { +public: + nonexitent_service_level_exception(sstring service_level_name) + : service_level_argument_exception(format("Service Level {} doesn't exists.", service_level_name)) { + } +}; + + + + +class service_level_statement : public raw::parsed_statement, public cql_statement_no_metadata { +public: + service_level_statement() : cql_statement_no_metadata(&timeout_config::other_timeout) {} + + uint32_t get_bound_terms() const override; + + bool depends_on_keyspace(const sstring& ks_name) const override; + + bool depends_on_column_family(const sstring& cf_name) const override; + + future<> check_access(service::storage_proxy& sp, const service::client_state& state) const override; + + void validate(service::storage_proxy&, const service::client_state& state) const override; +}; + +} +} diff --git a/cql3/statements/sl_prop_defs.cc b/cql3/statements/sl_prop_defs.cc new file mode 100644 index 0000000000..ff7ffe4071 --- /dev/null +++ b/cql3/statements/sl_prop_defs.cc @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "cql3/statements/sl_prop_defs.hh" +#include "database.hh" + + +namespace cql3 { + +namespace statements { + +void sl_prop_defs::validate() { + property_definitions::validate({}); +} + +} + +} diff --git a/cql3/statements/sl_prop_defs.hh b/cql3/statements/sl_prop_defs.hh new file mode 100644 index 0000000000..b2b9d9ad43 --- /dev/null +++ b/cql3/statements/sl_prop_defs.hh @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "cql3/statements/property_definitions.hh" + +#include +#include +#include + +class keyspace_metadata; + +namespace cql3 { + +namespace statements { + +class sl_prop_defs : public property_definitions { +public: + void validate(); +}; + +} + +} diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index e274f0aca2..4eecb77d98 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -128,12 +128,24 @@ schema_ptr cdc_timestamps() { static const sstring CDC_TIMESTAMPS_KEY = "timestamps"; +schema_ptr service_levels() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS); + return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS, std::make_optional(id)) + .with_column("service_level", utf8_type, column_kind::partition_key) + .with_version(db::system_keyspace::generate_schema_version(id)) + .build(); + }(); + return schema; +} + static std::vector all_tables() { return { view_build_status(), cdc_generations(), cdc_desc(), cdc_timestamps(), + service_levels(), }; } @@ -516,4 +528,42 @@ system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) { co_return res; } +future system_distributed_keyspace::get_service_levels() const { + static sstring prepared_query = format("SELECT * FROM {}.{};", NAME, SERVICE_LEVELS); + + return _qp.execute_internal(prepared_query, {}).then([] (shared_ptr result_set) { + qos::service_levels_info service_levels; + for (auto &&row : *result_set) { + auto service_level_name = row.get_as("service_level"); + qos::service_level_options slo{}; + service_levels.emplace(service_level_name, slo); + } + return service_levels; + }); +} + +future system_distributed_keyspace::get_service_level(sstring service_level_name) const { + static sstring prepared_query = format("SELECT * FROM {}.{} WHERE service_level = ?;", NAME, SERVICE_LEVELS); + return _qp.execute_internal(prepared_query, {service_level_name}).then([] (shared_ptr result_set) { + qos::service_levels_info service_levels; + if (!result_set->empty()) { + auto &&row = result_set->one(); + auto service_level_name = row.get_as("service_level"); + qos::service_level_options slo{}; + service_levels.emplace(service_level_name, slo); + } + return service_levels; + }); +} + +future<> system_distributed_keyspace::set_service_level(sstring service_level_name, qos::service_level_options slo) const { + static sstring prepared_puery = format("INSERT INTO {}.{} (service_level) VALUES (?);", NAME, SERVICE_LEVELS); + return _qp.execute_internal(prepared_puery, {service_level_name}).discard_result(); +} + +future<> system_distributed_keyspace::drop_service_level(sstring service_level_name) const { + static sstring prepared_query = format("DELETE FROM {}.{} WHERE service_level= ?;", NAME, SERVICE_LEVELS); + return _qp.execute_internal(prepared_query, {service_level_name}).discard_result(); +} + } diff --git a/db/system_distributed_keyspace.hh b/db/system_distributed_keyspace.hh index 14c94f83b0..3bd7f4e445 100644 --- a/db/system_distributed_keyspace.hh +++ b/db/system_distributed_keyspace.hh @@ -24,6 +24,7 @@ #include "bytes.hh" #include "schema_fwd.hh" #include "service/migration_manager.hh" +#include "service/qos/qos_common.hh" #include "utils/UUID.hh" #include "cdc/generation_id.hh" @@ -52,6 +53,7 @@ class system_distributed_keyspace { public: static constexpr auto NAME = "system_distributed"; static constexpr auto VIEW_BUILD_STATUS = "view_build_status"; + static constexpr auto SERVICE_LEVELS = "service_levels"; /* Nodes use this table to communicate new CDC stream generations to other nodes. */ static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generation_descriptions"; @@ -111,6 +113,10 @@ public: future cdc_current_generation_timestamp(context); + future get_service_levels() const; + future get_service_level(sstring service_level_name) const; + future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const; + future<> drop_service_level(sstring service_level_name) const; }; } diff --git a/docs/service_levels.md b/docs/service_levels.md new file mode 100644 index 0000000000..65b5b904dc --- /dev/null +++ b/docs/service_levels.md @@ -0,0 +1,38 @@ +## Service Level Distributed Data + +There are two system tables that are used to facilitate the service level feature. + + +### Service Level Attachment Table + +```CREATE TABLE system_auth.role_attributes ( + role text, + attribute_name text, + attribute_value text, + PRIMARY KEY (role, attribute_name)) +``` +The table was created with generality in mind, but its purpose is to record +information about roles. The table columns meaning are: +*role* - the name of the role that the attribute belongs to. +*attribute_name* - the name of the attribute for the role. +*attribute_value* - the value of the specified attribute. + +For the service level, the relevant attribute name is `service_level`. +So for example in order to find out which `service_level` is attached to role `r` +one can run the following query: + +```SELECT * FROM system_auth.role_attributes WHERE role='r' and attribute_name='service_level' +``` + +### Service Level Configuration Table + +```CREATE TABLE system_distributed.service_levels ( + service_level text PRIMARY KEY); +``` + +The table is used to store and distribute the service levels configuration. +The table column names meanings are: +*service_level* - the name of the service level. + +This table is currently a stub and does not hold any parameters yet. + ``` diff --git a/main.cc b/main.cc index c6e152e4dc..bfd27e57c1 100644 --- a/main.cc +++ b/main.cc @@ -35,6 +35,7 @@ #include "service/migration_manager.hh" #include "service/load_meter.hh" #include "service/view_update_backlog_broker.hh" +#include "service/qos/service_level_controller.hh" #include "streaming/stream_session.hh" #include "db/system_keyspace.hh" #include "db/system_distributed_keyspace.hh" @@ -87,6 +88,7 @@ #include "alternator/tags_extension.hh" #include "alternator/rmw_operation.hh" #include "db/paxos_grace_seconds_extension.hh" +#include "service/qos/standard_service_level_distributed_data_accessor.hh" #include "service/raft/raft_services.hh" @@ -795,6 +797,16 @@ int main(int ac, char** av) { mscfg.tcp_nodelay = netw::messaging_service::tcp_nodelay_what::local; } + static sharded auth_service; + static sharded sl_controller; + + //starting service level controller + qos::service_level_options default_service_level_configuration; + sl_controller.start(std::ref(auth_service), default_service_level_configuration).get(); + sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + //This starts the update loop - but no real update happens until the data accessor is not initialized. + sl_controller.local().update_from_distributed_data(std::chrono::seconds(10)); + netw::messaging_service::scheduling_config scfg; scfg.statement_tenants = { {dbcfg.statement_scheduling_group, "$user"}, {default_scheduling_group(), "$system"} }; scfg.streaming = dbcfg.streaming_scheduling_group; @@ -806,7 +818,6 @@ int main(int ac, char** av) { netw::uninit_messaging_service(messaging).get(); }); - static sharded auth_service; static sharded sys_dist_ks; static sharded view_update_generator; static sharded cql_config; @@ -934,7 +945,7 @@ int main(int ac, char** av) { supervisor::notify("starting query processor"); cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560}; debug::the_query_processor = &qp; - qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config)).get(); + qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get(); // #293 - do not stop anything // engine().at_exit([&qp] { return qp.stop(); }); supervisor::notify("initializing batchlog manager"); @@ -1170,6 +1181,11 @@ int main(int ac, char** av) { return ss.join_cluster(); }).get(); + sl_controller.invoke_on_all([] (qos::service_level_controller& controller) { + controller.set_distributed_data_accessor(::static_pointer_cast( + ::make_shared(sys_dist_ks.local()))); + }).get(); + supervisor::notify("starting tracing"); tracing::tracing::start_tracing(qp).get(); auto stop_tracing = defer_verbose_shutdown("tracing", [] { @@ -1276,7 +1292,7 @@ int main(int ac, char** av) { db.revert_initial_system_read_concurrency_boost(); }).get(); - cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter); + cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller); ss.register_client_shutdown_hook("native transport", [&cql_server_ctl] { cql_server_ctl.stop().get(); @@ -1429,6 +1445,10 @@ int main(int ac, char** av) { repair_shutdown(service::get_local_storage_service().db()).get(); }); + auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] { + sl_controller.stop().get(); + }); + auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [] { view_update_generator.stop().get(); }); diff --git a/service/qos/qos_common.cc b/service/qos/qos_common.cc new file mode 100644 index 0000000000..d055307d36 --- /dev/null +++ b/service/qos/qos_common.cc @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "qos_common.hh" +namespace qos { +} diff --git a/service/qos/qos_common.hh b/service/qos/qos_common.hh new file mode 100644 index 0000000000..cb9bceeac3 --- /dev/null +++ b/service/qos/qos_common.hh @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "seastarx.hh" +#include +#include +#include +#include + + + +namespace qos { + +/** + * a structure that holds the configuration for + * a service level. + */ +struct service_level_options { + bool operator==(const service_level_options& other) const = default; + bool operator!=(const service_level_options& other) const = default; +}; + +using service_levels_info = std::map; + +/// +/// A logical argument error for a service_level statement operation. +/// +class service_level_argument_exception : public std::invalid_argument { +public: + using std::invalid_argument::invalid_argument; +}; + +/// +/// An exception to indicate that the service level given as parameter doesn't exist. +/// +class nonexistant_service_level_exception : public service_level_argument_exception { +public: + nonexistant_service_level_exception(sstring service_level_name) + : service_level_argument_exception(format("Service Level {} doesn't exists.", service_level_name)) { + } +}; + +} diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc new file mode 100644 index 0000000000..e554610594 --- /dev/null +++ b/service/qos/service_level_controller.cc @@ -0,0 +1,360 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include +#include "service_level_controller.hh" +#include "service/storage_service.hh" +#include "service/priority_manager.hh" +#include "message/messaging_service.hh" +#include "db/system_distributed_keyspace.hh" + +namespace qos { + +sstring service_level_controller::default_service_level_name = "default"; + + + +service_level_controller::service_level_controller(sharded& auth_service, service_level_options default_service_level_config): + _sl_data_accessor(nullptr), + _auth_service(auth_service) +{ + if (this_shard_id() == global_controller) { + _global_controller_db = std::make_unique(); + _global_controller_db->default_service_level_config = default_service_level_config; + } +} + +future<> service_level_controller::add_service_level(sstring name, service_level_options slo, bool is_static) { + return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) { + return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, slo, is_static] () { + return sl_controller.do_add_service_level(name, slo, is_static); + }); + }); +} + +future<> service_level_controller::remove_service_level(sstring name, bool remove_static) { + return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) { + return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, remove_static] () { + return sl_controller.do_remove_service_level(name, remove_static); + }); + }); +} + +future<> service_level_controller::start() { + if (this_shard_id() != global_controller) { + return make_ready_future(); + } + return with_semaphore(_global_controller_db->notifications_serializer, 1, [this] () { + return do_add_service_level(default_service_level_name, _global_controller_db->default_service_level_config, true).then([this] () { + return container().invoke_on_all([] (service_level_controller& sl) { + sl._default_service_level = sl.get_service_level(default_service_level_name); + }); + }); + }); +} + + +void service_level_controller::set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor) { + // unregistering the accessor is always legal + if (!sl_data_accessor) { + _sl_data_accessor = nullptr; + } + + // Registration of a new accessor can be done only when the _sl_data_accessor is not already set. + // This behavior is intended to allow to unit testing debug to set this value without having + // overriden by storage_proxy + if (!_sl_data_accessor) { + _sl_data_accessor = sl_data_accessor; + } +} + +future<> service_level_controller::stop() { + // unregister from the service level distributed data accessor. + _sl_data_accessor = nullptr; + if (this_shard_id() == global_controller) { + // abort the loop of the distributed data checking if it is running + _global_controller_db->dist_data_update_aborter.request_abort(); + _global_controller_db->notifications_serializer.broken(); + } + return std::exchange(_distributed_data_updater, make_ready_future<>()); +} + +future<> service_level_controller::update_service_levels_from_distributed_data() { + + if (!_sl_data_accessor) { + return make_ready_future(); + } + + return container().invoke_on(global_controller, [] (service_level_controller& sl_controller) { + return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller] () { + return async([&sl_controller] () { + service_levels_info service_levels = sl_controller._sl_data_accessor->get_service_levels().get0(); + service_levels_info service_levels_for_add_or_update; + service_levels_info service_levels_for_delete; + + auto current_it = sl_controller._service_levels_db.begin(); + auto new_state_it = service_levels.begin(); + + // we want to detect 3 kinds of objects in one pass - + // 1. new service levels that have been added to the distributed keyspace + // 2. existing service levels that have changed + // 3. removed service levels + // this loop is batching together add/update operation and remove operation + // then they are all executed together.The reason for this is to allow for + // firstly delete all that there is to be deleted and only then adding new + // service levels. + while (current_it != sl_controller._service_levels_db.end() && new_state_it != service_levels.end()) { + if (current_it->first == new_state_it->first) { + //the service level exists on both the cureent and new state. + if (current_it->second.slo != new_state_it->second) { + // The service level configuration is different + // in the new state and the old state, meaning it needs to be updated. + service_levels_for_add_or_update.insert(*new_state_it); + } + current_it++; + new_state_it++; + } else if (current_it->first < new_state_it->first) { + //The service level does not exists in the new state so it needs to be + //removed, but only if it is not static since static configurations dont + //come from the distributed keyspace but from code. + if (!current_it->second.is_static) { + service_levels_for_delete.emplace(current_it->first, current_it->second.slo); + } + current_it++; + } else { /*new_it->first < current_it->first */ + // The service level exits in the new state but not in the old state + // so it needs to be added. + service_levels_for_add_or_update.insert(*new_state_it); + new_state_it++; + } + } + + for (; current_it != sl_controller._service_levels_db.end(); current_it++) { + service_levels_for_delete.emplace(current_it->first, current_it->second.slo); + } + std::copy(new_state_it, service_levels.end(), std::inserter(service_levels_for_add_or_update, + service_levels_for_add_or_update.end())); + + for (auto&& sl : service_levels_for_delete) { + sl_controller.do_remove_service_level(sl.first, false).get(); + } + for (auto&& sl : service_levels_for_add_or_update) { + sl_controller.do_add_service_level(sl.first, sl.second).get(); + } + }); + }); + }); +} + +future service_level_controller::find_service_level(auth::role_set roles) { + // FIXME: this arbitrary order based on names is OK for now, because the service levels do not yet + // have any parameters. Once they do, this comparison could be based on comparing the parameters. + static auto sl_compare = std::less(); + auto& role_manager = _auth_service.local().underlying_role_manager(); + + // converts a list of roles into the chosen service level. + return ::map_reduce(roles.begin(), roles.end(), [&role_manager, this] (const sstring& role) { + return role_manager.get_attribute(role, "service_level").then_wrapped([this, role] (future> sl_name_fut) { + try { + std::optional sl_name = sl_name_fut.get0(); + if (! sl_name) { + return sl_name; + } + auto sl_it = _service_levels_db.find(*sl_name); + if ( sl_it == _service_levels_db.end()) { + return std::optional{}; + } else { + return sl_name; + } + } catch(...) { // when we fail, we act as if the attribute does not exist so the node + // will not be brought down. + return std::optional{}; + } + }); + }, std::optional{}, [this] (std::optional first, std::optional second) { + if (!second) { + return first; + } else if (!first) { + return second; + } else { + return std::optional{ sl_compare(*first, *second) ? second : first }; + } + }).then([] (std::optional sl) { + return sl? *sl:default_service_level_name; + }); +} + +future<> service_level_controller::notify_service_level_added(sstring name, service_level sl_data) { + _service_levels_db.emplace(name, sl_data); + return make_ready_future(); +} + +future<> service_level_controller::notify_service_level_updated(sstring name, service_level_options slo) { + auto sl_it = _service_levels_db.find(name); + future<> f = make_ready_future(); + if (sl_it != _service_levels_db.end()) { + sl_it->second.slo = slo; + } + return f; +} + +future<> service_level_controller::notify_service_level_removed(sstring name) { + auto sl_it = _service_levels_db.find(name); + if (sl_it != _service_levels_db.end()) { + _service_levels_db.erase(sl_it); + } + return make_ready_future(); +} + +void service_level_controller::update_from_distributed_data(std::chrono::duration interval) { + _distributed_data_updater = container().invoke_on(global_controller, [interval] (service_level_controller& global_sl) { + if (global_sl._global_controller_db->distributed_data_update.available()) { + global_sl._global_controller_db->distributed_data_update = repeat([interval, &global_sl] { + return sleep_abortable(std::chrono::duration_cast(interval), + global_sl._global_controller_db->dist_data_update_aborter).then_wrapped([&global_sl] (future<>&& f) { + try { + f.get(); + return global_sl.update_service_levels_from_distributed_data().then([] { + return stop_iteration::no; + }); + } + catch (const sleep_aborted& e) { + return make_ready_future>(stop_iteration::yes); + } + }); + }); + } + }); +} + +future<> service_level_controller::add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exists) { + set_service_level_op_type add_type = if_not_exists ? set_service_level_op_type::add_if_not_exists : set_service_level_op_type::add; + return set_distributed_service_level(name, slo, add_type); +} + +future<> service_level_controller::alter_distributed_service_level(sstring name, service_level_options slo) { + return set_distributed_service_level(name, slo, set_service_level_op_type::alter); +} + +future<> service_level_controller::drop_distributed_service_level(sstring name, bool if_exists) { + return _sl_data_accessor->get_service_levels().then([this, name, if_exists] (qos::service_levels_info sl_info) { + auto it = sl_info.find(name); + if (it == sl_info.end()) { + if (if_exists) { + return make_ready_future(); + } else { + return make_exception_future(nonexistant_service_level_exception(name)); + } + } else { + auto& role_manager = _auth_service.local().underlying_role_manager(); + return role_manager.query_attribute_for_all("service_level").then( [&role_manager, name] (auth::role_manager::attribute_vals attributes) { + return parallel_for_each(attributes.begin(), attributes.end(), [&role_manager, name] (auto&& attr) { + if (attr.second == name) { + return role_manager.remove_attribute(attr.first, "service_level"); + } else { + return make_ready_future(); + } + }); + }).then([this, name] { + return _sl_data_accessor->drop_service_level(name); + }); + } + }); +} + +future service_level_controller::get_distributed_service_levels() { + return _sl_data_accessor->get_service_levels(); +} + +future service_level_controller::get_distributed_service_level(sstring service_level_name) { + return _sl_data_accessor->get_service_level(service_level_name); +} + +future<> service_level_controller::set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type) { + return _sl_data_accessor->get_service_levels().then([this, name, slo, op_type] (qos::service_levels_info sl_info) { + auto it = sl_info.find(name); + // test for illegal requests or requests that should terminate without any action + if (it == sl_info.end()) { + if (op_type == set_service_level_op_type::alter) { + return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' desn't exist.", name))); + } + } else { + if (op_type == set_service_level_op_type::add) { + return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' already exists.", name))); + } else if (op_type == set_service_level_op_type::add_if_not_exists) { + return make_ready_future(); + } + } + return _sl_data_accessor->set_service_level(name, slo); + }); +} + +future<> service_level_controller::do_add_service_level(sstring name, service_level_options slo, bool is_static) { + auto service_level_it = _service_levels_db.find(name); + if (is_static) { + _global_controller_db->static_configurations[name] = slo; + } + if (service_level_it != _service_levels_db.end()) { + if ((is_static && service_level_it->second.is_static) || !is_static) { + if ((service_level_it->second.is_static) && (!is_static)) { + service_level_it->second.is_static = false; + } + return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, slo); + } else { + // this means we set static layer when the the service level + // is running of the non static configuration. so we have nothing + // else to do since we already saved the static configuration. + return make_ready_future(); + } + } else { + return do_with(service_level{.slo = slo, .is_static = is_static}, std::move(name), [this] (service_level& sl, sstring& name) { + return container().invoke_on_all(&service_level_controller::notify_service_level_added, name, sl); + }); + } + return make_ready_future(); +} + +future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) { + auto service_level_it = _service_levels_db.find(name); + if (service_level_it != _service_levels_db.end()) { + auto static_conf_it = _global_controller_db->static_configurations.end(); + bool static_exists = false; + if (remove_static) { + _global_controller_db->static_configurations.erase(name); + } else { + static_conf_it = _global_controller_db->static_configurations.find(name); + static_exists = static_conf_it != _global_controller_db->static_configurations.end(); + } + if (remove_static && service_level_it->second.is_static) { + return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name); + } else if (!remove_static && !service_level_it->second.is_static) { + if (static_exists) { + service_level_it->second.is_static = true; + return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, static_conf_it->second); + } else { + return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name); + } + } + } + return make_ready_future(); +} + +} diff --git a/service/qos/service_level_controller.hh b/service/qos/service_level_controller.hh new file mode 100644 index 0000000000..1831bfa2bc --- /dev/null +++ b/service/qos/service_level_controller.hh @@ -0,0 +1,242 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "seastarx.hh" +#include "log.hh" +#include "auth/role_manager.hh" +#include "auth/authenticated_user.hh" +#include +#include +#include "auth/service.hh" +#include "service/storage_service.hh" +#include +#include +#include "qos_common.hh" + +namespace db { + class system_distributed_keyspace; +} +namespace qos { +/** + * a structure to hold a service level + * data and configuration. + */ +struct service_level { + service_level_options slo; + bool marked_for_deletion; + bool is_static; +}; + +/** + * The service_level_controller class is an implementation of the service level + * controller design. + * It is logically divided into 2 parts: + * 1. Global controller which is responsible for all of the data and plumbing + * manipulation. + * 2. Local controllers that act upon the data and facilitates execution in + * the service level context + */ +class service_level_controller : public peering_sharded_service { +public: + class service_level_distributed_data_accessor { + public: + virtual future get_service_levels() const = 0; + virtual future get_service_level(sstring service_level_name) const = 0; + virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const = 0; + virtual future<> drop_service_level(sstring service_level_name) const = 0; + }; + using service_level_distributed_data_accessor_ptr = ::shared_ptr; +private: + struct global_controller_data { + service_levels_info static_configurations{}; + int schedg_group_cnt = 0; + int io_priority_cnt = 0; + service_level_options default_service_level_config; + // The below future is used to serialize work so no reordering can occur. + // This is needed so for example: delete(x), add(x) will not reverse yielding + // a completely different result than the one intended. + future<> work_future = make_ready_future(); + semaphore notifications_serializer = semaphore(1); + future<> distributed_data_update = make_ready_future(); + abort_source dist_data_update_aborter; + }; + + std::unique_ptr _global_controller_db; + + static constexpr shard_id global_controller = 0; + + std::unordered_map _service_levels_db; + std::unordered_map _role_to_service_level; + service_level _default_service_level; + service_level_distributed_data_accessor_ptr _sl_data_accessor; + sharded& _auth_service; + future<> _distributed_data_updater = make_ready_future<>(); +public: + service_level_controller(sharded& auth_service, service_level_options default_service_level_config); + + /** + * this function must be called *once* from any shard before any other functions are called. + * No other function should be called before the future returned by the function is resolved. + * @return a future that resolves when the initialization is over. + */ + future<> start(); + + void set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor); + + /** + * Adds a service level configuration if it doesn't exists, and updates + * an the existing one if it does exist. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param slo - the service level configuration + * @param is_static - is this configuration static or not + */ + future<> add_service_level(sstring name, service_level_options slo, bool is_static = false); + + /** + * Removes a service level configuration if it exists. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param remove_static - indicates if it is a removal of a static configuration + * or not. + */ + future<> remove_service_level(sstring name, bool remove_static); + + /** + * stops all ongoing operations if exists + * @return a future that is resolved when all operations has stopped + */ + future<> stop(); + + /** + * this is an executor of a function with arguments under a service level + * that corresponds to a given user. + * @param usr - the user for determining the service level + * @param func - the function to be executed + * @return a future that is resolved when the function's operation is resolved + * (if it returns a future). or a ready future containing the returned value + * from the function/ + */ + template + futurize_t with_user_service_level(shared_ptr usr, noncopyable_function func) { + if (usr) { + auth::service& ser = _auth_service.local(); + return auth::get_roles(ser, *usr).then([this] (auth::role_set roles) { + return find_service_level(roles); + }).then([usr, this, func = std::move(func)] (sstring service_level_name) mutable { + return with_service_level(service_level_name, std::move(func)); + }); + } else { + return with_service_level(default_service_level_name, std::move(func)); + } + } + + /** + * Chack the distributed data for changes in a constant interval and updates + * the service_levels configuration in accordance (adds, removes, or updates + * service levels as necessairy). + * @param interval - the interval is seconds to check the distributed data. + * @return a future that is resolved when the update loop stops. + */ + void update_from_distributed_data(std::chrono::duration interval); + + /** + * Updates the service level data from the distributed data store. + * @return a future that is resolved when the update is done + */ + future<> update_service_levels_from_distributed_data(); + + + future<> add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exsists); + future<> alter_distributed_service_level(sstring name, service_level_options slo); + future<> drop_distributed_service_level(sstring name, bool if_exists); + future get_distributed_service_levels(); + future get_distributed_service_level(sstring service_level_name); + +private: + /** + * Adds a service level configuration if it doesn't exists, and updates + * an the existing one if it does exist. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param slo - the service level configuration + * @param is_static - is this configuration static or not + */ + future<> do_add_service_level(sstring name, service_level_options slo, bool is_static = false); + + /** + * Removes a service level configuration if it exists. + * Handles both, static and non static service level configurations. + * @param name - the service level name. + * @param remove_static - indicates if it is a removal of a static configuration + * or not. + */ + future<> do_remove_service_level(sstring name, bool remove_static); + + + + /** + * Returns the service level **in effect** for a user having the given + * collection of roles. + * @param roles - the collection of roles to consider + * @return the name of the service level in effect. + */ + future find_service_level(auth::role_set roles); + + /** + * The notify functions are used by the global service level controller + * to propagate configuration changes to the local controllers. + * the returned future is resolved when the local controller is done acting + * on the notification. updates are done in sequence so their meaning will not + * change due to execution reordering. + */ + future<> notify_service_level_added(sstring name, service_level sl_data); + future<> notify_service_level_updated(sstring name, service_level_options slo); + future<> notify_service_level_removed(sstring name); + + /** + * Gets the service level data by name. + * @param service_level_name - the name of the requested service level + * @return the service level data if it exists (in the local controller) or + * get_service_level("default") otherwise. + */ + service_level& get_service_level(sstring service_level_name) { + auto sl_it = _service_levels_db.find(service_level_name); + if (sl_it == _service_levels_db.end() || sl_it->second.marked_for_deletion) { + sl_it = _service_levels_db.find(default_service_level_name); + } + return sl_it->second; + } + + enum class set_service_level_op_type { + add_if_not_exists, + add, + alter + }; + + future<> set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type); + +public: + static sstring default_service_level_name; +}; +} diff --git a/service/qos/standard_service_level_distributed_data_accessor.cc b/service/qos/standard_service_level_distributed_data_accessor.cc new file mode 100644 index 0000000000..2beb5b1b93 --- /dev/null +++ b/service/qos/standard_service_level_distributed_data_accessor.cc @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "standard_service_level_distributed_data_accessor.hh" +#include "db/system_distributed_keyspace.hh" + +namespace qos { + +standard_service_level_distributed_data_accessor::standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks): +_sys_dist_ks(sys_dist_ks) { +} + +future standard_service_level_distributed_data_accessor::get_service_levels() const { + return _sys_dist_ks.get_service_levels(); +} + +future standard_service_level_distributed_data_accessor::get_service_level(sstring service_level_name) const { + return _sys_dist_ks.get_service_level(service_level_name); +} + +future<> standard_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo) const { + return _sys_dist_ks.set_service_level(service_level_name, slo); +} + +future<> standard_service_level_distributed_data_accessor::drop_service_level(sstring service_level_name) const { + return _sys_dist_ks.drop_service_level(service_level_name); +} + +} diff --git a/service/qos/standard_service_level_distributed_data_accessor.hh b/service/qos/standard_service_level_distributed_data_accessor.hh new file mode 100644 index 0000000000..3b648618ec --- /dev/null +++ b/service/qos/standard_service_level_distributed_data_accessor.hh @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include "seastarx.hh" +#include "service_level_controller.hh" + + +namespace db { + class system_distributed_keyspace; +} +namespace qos { +class standard_service_level_distributed_data_accessor : public service_level_controller::service_level_distributed_data_accessor, + public ::enable_shared_from_this { +private: + db::system_distributed_keyspace& _sys_dist_ks; +public: + standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks); + virtual future get_service_levels() const override; + virtual future get_service_level(sstring service_level_name) const override; + virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const override; + virtual future<> drop_service_level(sstring service_level_name) const override; +}; +} diff --git a/service/query_state.hh b/service/query_state.hh index 0e9e3a5452..df4e4e486b 100644 --- a/service/query_state.hh +++ b/service/query_state.hh @@ -27,6 +27,9 @@ #include "tracing/tracing.hh" #include "service_permit.hh" +namespace qos { +class service_level_controller; +} namespace service { class query_state final { @@ -34,13 +37,21 @@ private: client_state& _client_state; tracing::trace_state_ptr _trace_state_ptr; service_permit _permit; + std::optional> _sl_controller; public: query_state(client_state& client_state, service_permit permit) + : _client_state(client_state) + , _trace_state_ptr(tracing::trace_state_ptr()) + , _permit(std::move(permit)) + {} + + query_state(client_state& client_state, service_permit permit, qos::service_level_controller &sl_controller) : _client_state(client_state) , _trace_state_ptr(tracing::trace_state_ptr()) , _permit(std::move(permit)) - { } + , _sl_controller(sl_controller) + {} query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit) : _client_state(client_state) @@ -48,6 +59,13 @@ public: , _permit(std::move(permit)) { } + query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller) + : _client_state(client_state) + , _trace_state_ptr(std::move(trace_state_ptr)) + , _permit(std::move(permit)) + , _sl_controller(sl_controller) + { } + const tracing::trace_state_ptr& get_trace_state() const { return _trace_state_ptr; } @@ -75,6 +93,10 @@ public: return std::move(_permit); } + qos::service_level_controller& get_service_level_controller() const { + return _sl_controller->get(); + } + }; } diff --git a/test/boost/auth_resource_test.cc b/test/boost/auth_resource_test.cc index 3be904b3cd..1dc25cad56 100644 --- a/test/boost/auth_resource_test.cc +++ b/test/boost/auth_resource_test.cc @@ -50,6 +50,13 @@ BOOST_AUTO_TEST_CASE(root_of) { const auto rv = auth::role_resource_view(rr); BOOST_REQUIRE(!rv.role()); + + // + // service_level + // + + const auto slr = auth::resource(auth::resource_kind::service_level); + BOOST_REQUIRE_EQUAL(slr.kind(), auth::resource_kind::service_level); } BOOST_AUTO_TEST_CASE(data) { @@ -76,6 +83,11 @@ BOOST_AUTO_TEST_CASE(role) { BOOST_REQUIRE_EQUAL(*v.role(), "joe"); } +BOOST_AUTO_TEST_CASE(service_level) { + const auto r = auth::make_service_level_resource(); + BOOST_REQUIRE_EQUAL(r.kind(), auth::resource_kind::service_level); +} + BOOST_AUTO_TEST_CASE(from_name) { // // data @@ -104,6 +116,15 @@ BOOST_AUTO_TEST_CASE(from_name) { BOOST_REQUIRE_THROW(auth::parse_resource("roles/joe/smith"), auth::invalid_resource_name); + // + //service_level + // + + const auto slr1 = auth::parse_resource("service_levels"); + BOOST_REQUIRE_EQUAL(slr1, auth::root_service_level_resource()); + + BOOST_REQUIRE_THROW(auth::parse_resource("service_levels/low_priority"), auth::invalid_resource_name); + // // Generic errors. // @@ -127,6 +148,12 @@ BOOST_AUTO_TEST_CASE(name) { BOOST_REQUIRE_EQUAL(auth::root_role_resource().name(), "roles"); BOOST_REQUIRE_EQUAL(auth::make_role_resource("joe").name(), "roles/joe"); + + // + // service_level + // + + BOOST_REQUIRE_EQUAL(auth::root_service_level_resource().name(), "service_levels"); } BOOST_AUTO_TEST_CASE(parent) { @@ -162,6 +189,12 @@ BOOST_AUTO_TEST_CASE(output) { BOOST_REQUIRE_EQUAL(format("{}", auth::root_role_resource()), ""); BOOST_REQUIRE_EQUAL(format("{}", auth::make_role_resource("joe")), ""); + + // + // service_level + // + + BOOST_REQUIRE_EQUAL(format("{}", auth::root_service_level_resource()), ""); } BOOST_AUTO_TEST_CASE(expand) { diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 9b12b52f1f..1223784c94 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -54,6 +54,7 @@ #include #include "gms/feature.hh" #include "db/query_context.hh" +#include "service/qos/qos_common.hh" using namespace std::literals::chrono_literals; @@ -4881,3 +4882,81 @@ SEASTAR_THREAD_TEST_CASE(test_query_unselected_columns) { } }, std::move(cfg), thread_attributes{.sched_group = statement_sched_group}).get(); } + +SEASTAR_TEST_CASE(test_user_based_sla_queries) { + return do_with_cql_env_thread([] (cql_test_env& e) { + // test create service level with defaults + e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get(); + auto msg = e.execute_cql("LIST SERVICE_LEVEL sl_1;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("sl_1")}, + }); + e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(); + //drop service levels + e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(); + msg = e.execute_cql("LIST ALL SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("sl_2")}, + }); + + // validate exceptions (illegal requests) + BOOST_REQUIRE_THROW(e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(), qos::nonexistant_service_level_exception); + e.execute_cql("DROP SERVICE_LEVEL IF EXISTS sl_1;").get(); + + BOOST_REQUIRE_THROW(e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(), exceptions::invalid_request_exception); + BOOST_REQUIRE_THROW(e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(), exceptions::invalid_request_exception); + e.execute_cql("CREATE SERVICE_LEVEL IF NOT EXISTS sl_2;").get(); + + // test attach role + e.execute_cql("ATTACH SERVICE_LEVEL sl_2 TO tester").get(); + msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester"), utf8_type->decompose("sl_2")}, + }); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester"), utf8_type->decompose("sl_2")}, + }); + + // test attachment illegal request + BOOST_REQUIRE_THROW(e.execute_cql("ATTACH SERVICE_LEVEL sl_2 TO tester2;").get(), auth::nonexistant_role); + BOOST_REQUIRE_THROW(e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester;").get(), qos::nonexistant_service_level_exception); + BOOST_CHECK(true); + // tests detaching service levels + e.execute_cql("CREATE ROLE tester2;").get(); + e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get(); + e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester2;").get(); + e.execute_cql("DETACH SERVICE_LEVEL FROM tester;").get(); + msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester2;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester2"), utf8_type->decompose("sl_1")}, + }); + BOOST_CHECK(true); + msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester;").get0(); + assert_that(msg).is_rows().with_rows({ + }); + BOOST_CHECK(true); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester2"), utf8_type->decompose("sl_1")}, + }); + BOOST_CHECK(true); + //test implicit detach when removing role + e.execute_cql("DROP ROLE tester2;").get(); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + }); + BOOST_CHECK(true); + e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester;").get(); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + {utf8_type->decompose("tester"), utf8_type->decompose("sl_1")}, + }); + BOOST_CHECK(true); + //test implicit detach when removing service level + e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(); + msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0(); + assert_that(msg).is_rows().with_rows({ + }); + }); +} diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index ae7eb464a7..29a6ed32c2 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -51,6 +51,7 @@ #include "test/lib/reader_permit.hh" #include "db/query_context.hh" #include "test/lib/test_services.hh" +#include "unit_test_service_levels_accessor.hh" #include "db/view/view_builder.hh" #include "db/view/node_view_update_backlog.hh" #include "distributed_loader.hh" @@ -123,6 +124,7 @@ private: sharded& _view_update_generator; sharded& _mnotifier; sharded& _cdc_generation_service; + sharded& _sl_controller; private: struct core_local_state { service::client_state client_state; @@ -143,7 +145,7 @@ private: if (_db.local().has_keyspace(ks_name)) { _core_local.local().client_state.set_keyspace(_db.local(), ks_name); } - return ::make_shared(_core_local.local().client_state, empty_service_permit()); + return ::make_shared(_core_local.local().client_state, empty_service_permit(), _sl_controller.local()); } public: single_node_cql_env( @@ -154,7 +156,8 @@ public: sharded& view_builder, sharded& view_update_generator, sharded& mnotifier, - sharded& cdc_generation_service) + sharded& cdc_generation_service, + sharded &sl_controller) : _feature_service(feature_service) , _db(db) , _qp(qp) @@ -163,6 +166,7 @@ public: , _view_update_generator(view_update_generator) , _mnotifier(mnotifier) , _cdc_generation_service(cdc_generation_service) + , _sl_controller(sl_controller) { } virtual future<::shared_ptr> execute_cql(sstring_view text) override { @@ -438,15 +442,27 @@ public: mm_notif.start().get(); auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); }); + sharded auth_service; + set_abort_on_internal_error(true); const gms::inet_address listen("127.0.0.1"); + auto sys_dist_ks = seastar::sharded(); + auto sl_controller = sharded(); + sl_controller.start(std::ref(auth_service), qos::service_level_options{}).get(); + auto stop_sl_controller = defer([&sl_controller] { sl_controller.stop().get(); }); + sl_controller.invoke_on_all(&qos::service_level_controller::start).get(); + sl_controller.invoke_on_all([&sys_dist_ks, &sl_controller] (qos::service_level_controller& service) { + qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor = + ::static_pointer_cast( + make_shared(sl_controller,sys_dist_ks)); + return service.set_distributed_data_accessor(std::move(service_level_data_accessor)); + }).get(); sharded ms; // don't start listening so tests can be run in parallel ms.start(listen, std::move(7000)).get(); auto stop_ms = defer([&ms] { ms.stop().get(); }); - sharded auth_service; // Normally the auth server is already stopped in here, // but if there is an initialization failure we have to // make sure to stop it now or ~sharded will assert. @@ -454,7 +470,6 @@ public: auth_service.stop().get(); }); - auto sys_dist_ks = seastar::sharded(); auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); }); gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features); @@ -529,7 +544,7 @@ public: sharded qp; cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560}; - qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get(); + qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get(); auto stop_qp = defer([&qp] { qp.stop().get(); }); // In main.cc we call db::system_keyspace::setup which calls @@ -643,7 +658,7 @@ public: // The default user may already exist if this `cql_test_env` is starting with previously populated data. } - single_node_cql_env env(feature_service, db, qp, auth_service, view_builder, view_update_generator, mm_notif, cdc_generation_service); + single_node_cql_env env(feature_service, db, qp, auth_service, view_builder, view_update_generator, mm_notif, cdc_generation_service, std::ref(sl_controller)); env.start().get(); auto stop_env = defer([&env] { env.stop().get(); }); diff --git a/test/lib/unit_test_service_levels_accessor.hh b/test/lib/unit_test_service_levels_accessor.hh new file mode 100644 index 0000000000..34a885db1e --- /dev/null +++ b/test/lib/unit_test_service_levels_accessor.hh @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2021 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include "service/qos/service_level_controller.hh" +#include "service/qos/qos_common.hh" +#include "db/system_distributed_keyspace.hh" +#pragma once + +namespace qos { + +/** + * This class is a helper for unit testing. It implements the service level distributed + * accessor interface in order to be used in the unit testing environment. The advantage + * of this class over the standard implementation is that it makes sure that updates are + * Immediately propagated to the underlying service level controller. + */ +class unit_test_service_levels_accessor : public service_level_controller::service_level_distributed_data_accessor { + sharded &_sl_controller; + sharded &_sys_dist_ks; +public: + unit_test_service_levels_accessor(sharded& sl_controller, sharded &sys_dist_ks) + : _sl_controller(sl_controller) + , _sys_dist_ks(sys_dist_ks) + {} + virtual future get_service_levels() const { + return _sys_dist_ks.local().get_service_levels(); + } + virtual future get_service_level(sstring service_level_name) const { + return _sys_dist_ks.local().get_service_level(service_level_name); + } + virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const { + return _sys_dist_ks.local().set_service_level(service_level_name, slo).then([this] () { + return _sl_controller.invoke_on_all(&service_level_controller::update_service_levels_from_distributed_data); + }); + + } + virtual future<> drop_service_level(sstring service_level_name) const { + return _sys_dist_ks.local().drop_service_level(service_level_name).then([this] () { + return _sl_controller.invoke_on_all(&service_level_controller::update_service_levels_from_distributed_data); + }); + } +}; + +} diff --git a/transport/controller.cc b/transport/controller.cc index b1b3e6bcc8..22cd9a01f9 100644 --- a/transport/controller.cc +++ b/transport/controller.cc @@ -33,14 +33,16 @@ namespace cql_transport { static logging::logger logger("cql_server_controller"); -controller::controller(distributed& db, sharded& auth, sharded& mn, gms::gossiper& gossiper, sharded& qp, sharded& ml) +controller::controller(distributed& db, sharded& auth, sharded& mn, gms::gossiper& gossiper, sharded& qp, sharded& ml, + sharded& sl_controller) : _ops_sem(1) , _db(db) , _auth_service(auth) , _mnotifier(mn) , _gossiper(gossiper) , _qp(qp) - , _mem_limiter(ml) { + , _mem_limiter(ml) + , _sl_controller(sl_controller) { } future<> controller::start_server() { @@ -147,7 +149,7 @@ future<> controller::do_start_server() { } } - cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config).get(); + cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config, std::ref(_sl_controller)).get(); try { parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) { diff --git a/transport/controller.hh b/transport/controller.hh index 801a4db05c..03808e82a5 100644 --- a/transport/controller.hh +++ b/transport/controller.hh @@ -24,6 +24,7 @@ #include #include #include +#include "service/qos/service_level_controller.hh" using namespace seastar; @@ -50,13 +51,14 @@ class controller { gms::gossiper& _gossiper; sharded& _qp; sharded& _mem_limiter; + sharded& _sl_controller; future<> set_cql_ready(bool ready); future<> do_start_server(); future<> do_stop_server(); public: - controller(distributed&, sharded&, sharded&, gms::gossiper&, sharded&, sharded&); + controller(distributed&, sharded&, sharded&, gms::gossiper&, sharded&, sharded&, sharded&); future<> start_server(); future<> stop_server(); future<> stop(); diff --git a/transport/server.cc b/transport/server.cc index acb2cc2926..2fe7777517 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -160,7 +160,7 @@ event::event_type parse_event_type(const sstring& value) } cql_server::cql_server(distributed& qp, auth::service& auth_service, - service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config) + service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config, qos::service_level_controller& sl_controller) : _query_processor(qp) , _config(config) , _max_request_size(config.max_request_size) @@ -168,6 +168,7 @@ cql_server::cql_server(distributed& qp, auth::service& au , _memory_available(ml.get_semaphore()) , _notifier(std::make_unique(mn)) , _auth_service(auth_service) + , _sl_controller(sl_controller) { namespace sm = seastar::metrics; @@ -952,7 +953,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme (bytes_ostream& linearization_buffer, service::client_state& client_state) mutable { request_reader in(is, linearization_buffer); return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format, - /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) { + /* FIXME */empty_service_permit(), std::move(trace_state), false, _server._sl_controller).then([] (auto msg) { // result here has to be foreign ptr return std::get>>(std::move(msg)); }); @@ -967,7 +968,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli fragmented_temporary_buffer::istream is = in.get_stream(); return process_fn(client_state, _server._query_processor, in, stream, - _version, _cql_serialization_format, permit, trace_state, true) + _version, _cql_serialization_format, permit, trace_state, true, _server._sl_controller) .then([stream, &client_state, this, is, permit, process_fn, trace_state] (std::variant>, unsigned> msg) mutable { unsigned* shard = std::get_if(&msg); @@ -981,9 +982,10 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli static future>, unsigned>> process_query_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, + qos::service_level_controller& sl_controller) { auto query = in.read_long_string_view(); - auto q_state = std::make_unique(client_state, trace_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit), sl_controller); auto& query_state = q_state->query_state; q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config()); auto& options = *q_state->options; @@ -1048,7 +1050,8 @@ future> cql_server::connection::process_pr static future>, unsigned>> process_execute_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, + qos::service_level_controller& sl_controller) { cql3::prepared_cache_key_type cache_key(in.read_short_bytes()); auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); bool needs_authorization = false; @@ -1065,7 +1068,7 @@ process_execute_internal(service::client_state& client_state, distributed(client_state, trace_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit), sl_controller); auto& query_state = q_state->query_state; if (version == 1) { std::vector values; @@ -1127,7 +1130,8 @@ future>> cql_server::connectio static future>, unsigned>> process_batch_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace, + qos::service_level_controller& sl_controller) { if (version == 1) { throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol"); } @@ -1212,7 +1216,7 @@ process_batch_internal(service::client_state& client_state, distributed(client_state, trace_state, std::move(permit)); + auto q_state = std::make_unique(client_state, trace_state, std::move(permit), sl_controller); auto& query_state = q_state->query_state; // #563. CQL v2 encodes query_options in v1 format for batch requests. q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format, diff --git a/transport/server.hh b/transport/server.hh index 55fe13ed0c..47cd88e788 100644 --- a/transport/server.hh +++ b/transport/server.hh @@ -40,6 +40,7 @@ #include "service_permit.hh" #include #include "utils/updateable_value.hh" +#include "service/qos/service_level_controller.hh" namespace scollectd { @@ -102,8 +103,8 @@ struct cql_query_state { service::query_state query_state; std::unique_ptr options; - cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit) - : query_state(client_state, std::move(trace_state_ptr), std::move(permit)) + cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller) + : query_state(client_state, std::move(trace_state_ptr), std::move(permit), sl_controller) { } }; @@ -157,10 +158,12 @@ private: private: transport_stats _stats = {}; auth::service& _auth_service; + qos::service_level_controller& _sl_controller; public: cql_server(distributed& qp, auth::service&, service::migration_notifier& mn, database& db, service::memory_limiter& ml, - cql_server_config config); + cql_server_config config, + qos::service_level_controller& sl_controller); future<> listen(socket_address addr, std::shared_ptr = {}, bool is_shard_aware = false, bool keepalive = false); future<> do_accepts(int which, bool keepalive, socket_address server_addr); future<> stop();