Merge 'Introduce service levels' from Piotr Sarna

This series introduces service level syntax borrowed from https://docs.scylladb.com/using-scylla/workload-prioritization/ , but without workload prioritization itself - just for the sake of using identical syntax to provide different parameters later. The new parameters may include:
 * per-service-level timeouts
 * oltp/olap declaration, which may change the way Scylla treats long requests - e.g. time them out (the oltp way) or keep them sustained with empty pages (the olap way)

Refs #7617

Closes #7867

* github.com:scylladb/scylla:
  transport: initialize query state with service level controller
  main: add initializing service level data accessor
  service: make enable_shared_from_this inheritance public
  cql3: add SERVICE LEVEL syntax (without an underscore)
  unit test: Add unit test for per user sla syntax
  cql: Add support for service level cql queries
  auth: Add service_level resource for supporting in authorization of cql service_level
  cql: Support accessing service_level_controller from query state
  instantiate and initialize the service_level_controller
  qos: Add a standard implementation for service level data accessor
  qos: add waiting for the updater future
  service/qos: adding service level controller
  service_levels: Add documentation for distributed tables
  service/qos: adding service level table to the distributed keyspace
  service/qos: add common definitions
  auth: add support for role attributes
This commit is contained in:
Avi Kivity
2021-04-12 17:34:43 +03:00
47 changed files with 2505 additions and 36 deletions

View File

@@ -56,6 +56,7 @@ std::ostream& operator<<(std::ostream& os, resource_kind kind) {
switch (kind) {
case resource_kind::data: os << "data"; break;
case resource_kind::role: os << "role"; break;
case resource_kind::service_level: os << "service_level"; break;
}
return os;
@@ -63,11 +64,13 @@ std::ostream& operator<<(std::ostream& os, resource_kind kind) {
static const std::unordered_map<resource_kind, std::string_view> roots{
{resource_kind::data, "data"},
{resource_kind::role, "roles"}};
{resource_kind::role, "roles"},
{resource_kind::service_level, "service_levels"}};
static const std::unordered_map<resource_kind, std::size_t> max_parts{
{resource_kind::data, 2},
{resource_kind::role, 1}};
{resource_kind::role, 1},
{resource_kind::service_level, 0}};
static permission_set applicable_permissions(const data_resource_view& dv) {
if (dv.table()) {
@@ -101,6 +104,15 @@ static permission_set applicable_permissions(const role_resource_view& rv) {
permission::DESCRIBE>();
}
static permission_set applicable_permissions(const service_level_resource_view &rv) {
return permission_set::of<
permission::CREATE,
permission::ALTER,
permission::DROP,
permission::DESCRIBE,
permission::AUTHORIZE>();
}
resource::resource(resource_kind kind) : _kind(kind) {
_parts.emplace_back(roots.at(kind));
}
@@ -122,6 +134,9 @@ resource::resource(role_resource_t, std::string_view role) : resource(resource_k
_parts.emplace_back(role);
}
resource::resource(service_level_resource_t): resource(resource_kind::service_level) {
}
sstring resource::name() const {
return boost::algorithm::join(_parts, "/");
}
@@ -142,6 +157,7 @@ permission_set resource::applicable_permissions() const {
switch (_kind) {
case resource_kind::data: ps = ::auth::applicable_permissions(data_resource_view(*this)); break;
case resource_kind::role: ps = ::auth::applicable_permissions(role_resource_view(*this)); break;
case resource_kind::service_level: ps = ::auth::applicable_permissions(service_level_resource_view(*this)); break;
}
return ps;
@@ -163,11 +179,24 @@ std::ostream& operator<<(std::ostream& os, const resource& r) {
switch (r.kind()) {
case resource_kind::data: return os << data_resource_view(r);
case resource_kind::role: return os << role_resource_view(r);
case resource_kind::service_level: return os << service_level_resource_view(r);
}
return os;
}
service_level_resource_view::service_level_resource_view(const resource &r) :
_resource(r) {
if (r._kind != resource_kind::service_level) {
throw resource_kind_mismatch(resource_kind::service_level, r._kind);
}
}
std::ostream &operator<<(std::ostream &os, const service_level_resource_view &v) {
os << "<all service levels>";
return os;
}
data_resource_view::data_resource_view(const resource& r) : _resource(r) {
if (r._kind != resource_kind::data) {
throw resource_kind_mismatch(resource_kind::data, r._kind);
@@ -276,6 +305,12 @@ const resource& root_role_resource() {
return the_root_role_resource;
}
static const resource the_root_service_level_resource{resource_kind::service_level};
const resource &root_service_level_resource() {
return the_root_service_level_resource;
}
resource_set expand_resource_family(const resource& rr) {
resource r = rr;
resource_set rs;

View File

@@ -67,7 +67,7 @@ public:
};
enum class resource_kind {
data, role
data, role, service_level
};
std::ostream& operator<<(std::ostream&, resource_kind);
@@ -82,6 +82,11 @@ struct data_resource_t final {};
///
struct role_resource_t final {};
///
/// Type tag for constructing service_level resources.
///
struct service_level_resource_t final {};
///
/// Resources are entities that users can be granted permissions on.
///
@@ -108,6 +113,7 @@ public:
resource(data_resource_t, std::string_view keyspace);
resource(data_resource_t, std::string_view keyspace, std::string_view table);
resource(role_resource_t, std::string_view role);
resource(service_level_resource_t);
resource_kind kind() const noexcept {
return _kind;
@@ -128,6 +134,7 @@ private:
friend class std::hash<resource>;
friend class data_resource_view;
friend class role_resource_view;
friend class service_level_resource_view;
friend bool operator<(const resource&, const resource&);
friend bool operator==(const resource&, const resource&);
@@ -192,6 +199,22 @@ public:
std::ostream& operator<<(std::ostream&, const role_resource_view&);
///
/// A "service_level" view of \ref resource.
///
class service_level_resource_view final {
const resource& _resource;
public:
///
/// \throws \ref resource_kind_mismatch if the argument is not a "service_level" resource.
///
explicit service_level_resource_view(const resource&);
};
std::ostream& operator<<(std::ostream&, const service_level_resource_view&);
///
/// Parse a resource from its name.
///
@@ -214,6 +237,12 @@ inline resource make_role_resource(std::string_view role) {
return resource(role_resource_t{}, role);
}
const resource& root_service_level_resource();
inline resource make_service_level_resource() {
return resource(service_level_resource_t{});
}
}
namespace std {
@@ -228,12 +257,17 @@ struct hash<auth::resource> {
return utils::tuple_hash()(std::make_tuple(auth::resource_kind::role, rv.role()));
}
static size_t hash_service_level(const auth::service_level_resource_view& rv) {
return utils::tuple_hash()(std::make_tuple(auth::resource_kind::service_level));
}
size_t operator()(const auth::resource& r) const {
std::size_t value;
switch (r._kind) {
case auth::resource_kind::data: value = hash_data(auth::data_resource_view(r)); break;
case auth::resource_kind::role: value = hash_role(auth::role_resource_view(r)); break;
case auth::resource_kind::service_level: value = hash_service_level(auth::service_level_resource_view(r)); break;
}
return value;

View File

@@ -101,6 +101,11 @@ enum class recursive_role_query { yes, no };
/// access-control should never be enforced in implementations.
///
class role_manager {
public:
// this type represents a mapping between a role and some attribute value.
// i.e: given attribute name 'a' this map holds role name and it's assigned
// value of 'a'.
using attribute_vals = std::unordered_map<sstring, sstring>;
public:
virtual ~role_manager() = default;
@@ -164,6 +169,26 @@ public:
/// \returns an exceptional future with \ref nonexistant_role if the role does not exist.
///
virtual future<bool> can_login(std::string_view role_name) const = 0;
};
///
/// \returns the value of the named attribute, if one is set.
///
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) const = 0;
///
/// \returns a mapping of each role's value for the named attribute, if one is set for the role.
///
virtual future<attribute_vals> query_attribute_for_all(std::string_view attribute_name) const = 0;
/// Sets `attribute_name` with `attribute_value` for `role_name`.
/// \returns an exceptional future with nonexistant_role if the role does not exist.
///
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const = 0;
/// Removes `attribute_name` for `role_name`.
/// \returns an exceptional future with nonexistant_role if the role does not exist.
/// \note: This is a no-op if the role does not have the named attribute set.
///
virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name) const = 0;
};
}

View File

@@ -327,6 +327,8 @@ future<bool> service::exists(const resource& r) const {
return make_ready_future<bool>(true);
}
case resource_kind::service_level:
return make_ready_future<bool>(true);
}
return make_ready_future<bool>(false);

View File

@@ -53,6 +53,26 @@ constexpr std::string_view qualified_name("system_auth.role_members");
}
namespace role_attributes_table {
constexpr std::string_view name{"role_attributes", 15};
static std::string_view qualified_name() noexcept {
static const sstring instance = format("{}.{}", AUTH_KS, name);
return instance;
}
static std::string_view creation_query() noexcept {
static const sstring instance = format(
"CREATE TABLE {} ("
" role text,"
" name text,"
" value text,"
" PRIMARY KEY(role, name)"
")",
qualified_name());
return instance;
}
}
}
static logging::logger log("standard_role_manager");
@@ -152,6 +172,11 @@ future<> standard_role_manager::create_metadata_tables_if_missing() const {
meta::role_members_table::name,
_qp,
create_role_members_query,
_migration_manager),
create_metadata_table_if_missing(
meta::role_attributes_table::name,
_qp,
meta::role_attributes_table::creation_query(),
_migration_manager)).discard_result();
}
@@ -345,6 +370,12 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
});
};
// Delete all attributes for that role
const auto remove_attributes_of = [this, role_name] {
static const sstring query = format("DELETE FROM {} WHERE role = ?", meta::role_attributes_table::qualified_name());
return _qp.execute_internal(query, {sstring(role_name)}).discard_result();
};
// Finally, delete the role itself.
auto delete_role = [this, role_name] {
static const sstring query = format("DELETE FROM {} WHERE {} = ?",
@@ -358,7 +389,8 @@ future<> standard_role_manager::drop(std::string_view role_name) const {
{sstring(role_name)}).discard_result();
};
return when_all_succeed(revoke_from_members(), revoke_members_of()).then_unpack([delete_role = std::move(delete_role)] {
return when_all_succeed(revoke_from_members(), revoke_members_of(),
remove_attributes_of()).then_unpack([delete_role = std::move(delete_role)] {
return delete_role();
});
});
@@ -536,4 +568,56 @@ future<bool> standard_role_manager::can_login(std::string_view role_name) const
});
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) const {
static const sstring query = format("SELECT name, value FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name());
return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}).then([] (shared_ptr<cql3::untyped_result_set> result_set) {
if (!result_set->empty()) {
const cql3::untyped_result_set_row &row = result_set->one();
return std::optional<sstring>(row.get_as<sstring>("value"));
}
return std::optional<sstring>{};
});
}
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name) const {
return query_all().then([this, attribute_name] (role_set roles) {
return do_with(attribute_vals{}, [this, attribute_name, roles = std::move(roles)] (attribute_vals &role_to_att_val) {
return parallel_for_each(roles.begin(), roles.end(), [this, &role_to_att_val, attribute_name] (sstring role) {
return get_attribute(role, attribute_name).then([&role_to_att_val, role] (std::optional<sstring> att_val) {
if (att_val) {
role_to_att_val.emplace(std::move(role), std::move(*att_val));
}
});
}).then([&role_to_att_val] () {
return make_ready_future<attribute_vals>(std::move(role_to_att_val));
});
});
});
}
future<> standard_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const {
static const sstring query = format("INSERT INTO {} (role, name, value) VALUES (?, ?, ?)", meta::role_attributes_table::qualified_name());
return do_with(sstring(role_name), sstring(attribute_name), sstring(attribute_value), [this] (sstring& role_name, sstring &attribute_name,
sstring &attribute_value) {
return exists(role_name).then([&role_name, &attribute_name, &attribute_value, this] (bool role_exists) {
if (!role_exists) {
throw auth::nonexistant_role(role_name);
}
return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}).discard_result();
});
});
}
future<> standard_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name) const {
static const sstring query = format("DELETE FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name());
return do_with(sstring(role_name), sstring(attribute_name), [this] (sstring& role_name, sstring &attribute_name) {
return exists(role_name).then([&role_name, &attribute_name, this] (bool role_exists) {
if (!role_exists) {
throw auth::nonexistant_role(role_name);
}
return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}).discard_result();
});
});
}
}

View File

@@ -83,6 +83,14 @@ public:
virtual future<bool> can_login(std::string_view role_name) const override;
virtual future<std::optional<sstring>> get_attribute(std::string_view role_name, std::string_view attribute_name) const override;
virtual future<role_manager::attribute_vals> query_attribute_for_all(std::string_view attribute_name) const override;
virtual future<> set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) const override;
virtual future<> remove_attribute(std::string_view role_name, std::string_view attribute_name) const override;
private:
enum class membership_change { add, remove };

View File

@@ -716,6 +716,15 @@ scylla_core = (['database.cc',
'cql3/statements/alter_type_statement.cc',
'cql3/statements/alter_keyspace_statement.cc',
'cql3/statements/role-management-statements.cc',
'cql3/statements/service_level_statement.cc',
'cql3/statements/create_service_level_statement.cc',
'cql3/statements/alter_service_level_statement.cc',
'cql3/statements/sl_prop_defs.cc',
'cql3/statements/drop_service_level_statement.cc',
'cql3/statements/attach_service_level_statement.cc',
'cql3/statements/detach_service_level_statement.cc',
'cql3/statements/list_service_level_statement.cc',
'cql3/statements/list_service_level_attachments_statement.cc',
'cql3/update_parameters.cc',
'cql3/util.cc',
'cql3/ut_name.cc',
@@ -836,6 +845,9 @@ scylla_core = (['database.cc',
'service/misc_services.cc',
'service/pager/paging_state.cc',
'service/pager/query_pagers.cc',
'service/qos/qos_common.cc',
'service/qos/service_level_controller.cc',
'service/qos/standard_service_level_distributed_data_accessor.cc',
'streaming/stream_task.cc',
'streaming/stream_session.cc',
'streaming/stream_request.cc',

View File

@@ -37,6 +37,7 @@ options {
#include "cql3/statements/alter_keyspace_statement.hh"
#include "cql3/statements/alter_table_statement.hh"
#include "cql3/statements/alter_view_statement.hh"
#include "cql3/statements/alter_service_level_statement.hh"
#include "cql3/statements/create_keyspace_statement.hh"
#include "cql3/statements/drop_keyspace_statement.hh"
#include "cql3/statements/create_index_statement.hh"
@@ -44,6 +45,9 @@ options {
#include "cql3/statements/create_view_statement.hh"
#include "cql3/statements/create_type_statement.hh"
#include "cql3/statements/create_function_statement.hh"
#include "cql3/statements/create_service_level_statement.hh"
#include "cql3/statements/sl_prop_defs.hh"
#include "cql3/statements/attach_service_level_statement.hh"
#include "cql3/statements/drop_type_statement.hh"
#include "cql3/statements/alter_type_statement.hh"
#include "cql3/statements/property_definitions.hh"
@@ -51,6 +55,8 @@ options {
#include "cql3/statements/drop_table_statement.hh"
#include "cql3/statements/drop_view_statement.hh"
#include "cql3/statements/drop_function_statement.hh"
#include "cql3/statements/drop_service_level_statement.hh"
#include "cql3/statements/detach_service_level_statement.hh"
#include "cql3/statements/truncate_statement.hh"
#include "cql3/statements/raw/update_statement.hh"
#include "cql3/statements/raw/insert_statement.hh"
@@ -64,6 +70,8 @@ options {
#include "cql3/statements/list_permissions_statement.hh"
#include "cql3/statements/alter_role_statement.hh"
#include "cql3/statements/list_roles_statement.hh"
#include "cql3/statements/list_service_level_statement.hh"
#include "cql3/statements/list_service_level_attachments_statement.hh"
#include "cql3/statements/grant_role_statement.hh"
#include "cql3/statements/revoke_role_statement.hh"
#include "cql3/statements/drop_role_statement.hh"
@@ -370,6 +378,14 @@ cqlStatement returns [std::unique_ptr<raw::parsed_statement> stmt]
| st38=dropRoleStatement { $stmt = std::move(st38); }
| st39=createRoleStatement { $stmt = std::move(st39); }
| st40=alterRoleStatement { $stmt = std::move(st40); }
| st41=createServiceLevelStatement { $stmt = std::move(st41); }
| st42=alterServiceLevelStatement { $stmt = std::move(st42); }
| st43=dropServiceLevelStatement { $stmt = std::move(st43); }
| st44=attachServiceLevelStatement { $stmt = std::move(st44); }
| st45=detachServiceLevelStatement { $stmt = std::move(st45); }
| st46=listServiceLevelStatement { $stmt = std::move(st46); }
| st47=listServiceLevelAttachStatement { $stmt = std::move(st47); }
;
/*
@@ -1241,6 +1257,96 @@ roleOption[cql3::role_options& opts]
| K_LOGIN '=' b=BOOLEAN { opts.can_login = convert_boolean_literal($b.text); }
;
// Introduce a more natural syntax (SERVICE LEVEL), but still allow
// the original one (SERVICE_LEVEL)
serviceLevel
: K_SERVICE_LEVEL | ( K_SERVICE K_LEVEL )
;
serviceLevels
: K_SERVICE_LEVELS | ( K_SERVICE K_LEVELS )
;
/**
* CREATE SERVICE_LEVEL [IF NOT EXISTS] <service_level_name> [WITH <param> = <value>]
*/
createServiceLevelStatement returns [std::unique_ptr<create_service_level_statement> stmt]
@init {
auto attrs = make_shared<cql3::statements::sl_prop_defs>();
bool if_not_exists = false;
}
: K_CREATE serviceLevel (K_IF K_NOT K_EXISTS { if_not_exists = true; })? name=serviceLevelOrRoleName (K_WITH properties[*attrs])?
{ $stmt = std::make_unique<create_service_level_statement>(name, attrs, if_not_exists); }
;
/**
* ALTER SERVICE_LEVEL <service_level_name> WITH <param> = <value>
*/
alterServiceLevelStatement returns [std::unique_ptr<alter_service_level_statement> stmt]
@init {
auto attrs = make_shared<cql3::statements::sl_prop_defs>();
}
: K_ALTER serviceLevel name=serviceLevelOrRoleName K_WITH properties[*attrs]
{ $stmt = std::make_unique<alter_service_level_statement>(name, attrs); }
;
/**
* DROP SERVICE_LEVEL [IF EXISTS] <service_level_name>
*/
dropServiceLevelStatement returns [std::unique_ptr<drop_service_level_statement> stmt]
@init {
bool if_exists = false;
}
: K_DROP serviceLevel (K_IF K_EXISTS { if_exists = true; })? name=serviceLevelOrRoleName
{ $stmt = std::make_unique<drop_service_level_statement>(name, if_exists); }
;
/**
* ATTACH SERVICE_LEVEL <service_level_name> TO <role_name>
*/
attachServiceLevelStatement returns [std::unique_ptr<attach_service_level_statement> stmt]
@init {
}
: K_ATTACH serviceLevel service_level_name=serviceLevelOrRoleName K_TO role_name=serviceLevelOrRoleName
{ $stmt = std::make_unique<attach_service_level_statement>(service_level_name, role_name); }
;
/**
* DETACH SERVICE_LEVEL FROM <role_name>
*/
detachServiceLevelStatement returns [std::unique_ptr<detach_service_level_statement> stmt]
@init {
}
: K_DETACH serviceLevel K_FROM role_name=serviceLevelOrRoleName
{ $stmt = std::make_unique<detach_service_level_statement>(role_name); }
;
/**
* LIST SERVICE_LEVEL <service_level_name>
* LIST ALL SERVICE_LEVELS
*/
listServiceLevelStatement returns [std::unique_ptr<list_service_level_statement> stmt]
@init {
}
: K_LIST serviceLevel service_level_name=serviceLevelOrRoleName
{ $stmt = std::make_unique<list_service_level_statement>(service_level_name, false); } |
K_LIST K_ALL serviceLevels
{ $stmt = std::make_unique<list_service_level_statement>("", true); }
;
/**
* LIST ATTACHED SERVICE_LEVEL OF <role_name>
* LIST ALL ATTACHED SERVICE_LEVELS
*/
listServiceLevelAttachStatement returns [std::unique_ptr<list_service_level_attachments_statement> stmt]
@init {
bool allow_nonexisting_roles = false;
}
: K_LIST K_ATTACHED serviceLevel K_OF role_name=serviceLevelOrRoleName
{ $stmt = std::make_unique<list_service_level_attachments_statement>(role_name); } |
K_LIST K_ALL K_ATTACHED serviceLevels
{ $stmt = std::make_unique<list_service_level_attachments_statement>(); }
;
/** DEFINITIONS **/
// Column Identifiers. These need to be treated differently from other
@@ -1287,6 +1393,16 @@ userOrRoleName returns [uninitialized<cql3::role_name> name]
| QMARK {add_recognition_error("Bind variables cannot be used for role names");}
;
serviceLevelOrRoleName returns [sstring name]
: t=IDENT { $name = sstring($t.text);
std::transform($name.begin(), $name.end(), $name.begin(), ::tolower); }
| t=STRING_LITERAL { $name = sstring($t.text); }
| t=QUOTED_NAME { $name = sstring($t.text); }
| k=unreserved_keyword { $name = sstring($t.text);
std::transform($name.begin(), $name.end(), $name.begin(), ::tolower);}
| QMARK {add_recognition_error("Bind variables cannot be used for service levels or role names");}
;
ksName[cql3::keyspace_element_name& name]
: t=IDENT { $name.set_keyspace($t.text, false);}
| t=QUOTED_NAME { $name.set_keyspace($t.text, true);}
@@ -1767,8 +1883,17 @@ basic_unreserved_keyword returns [sstring str]
| K_LIKE
| K_PER
| K_PARTITION
| K_SERVICE_LEVEL
| K_ATTACH
| K_DETACH
| K_SERVICE_LEVELS
| K_ATTACHED
| K_FOR
| K_GROUP
| K_TIMEOUT
| K_SERVICE
| K_LEVEL
| K_LEVELS
) { $str = $k.text; }
;
@@ -1917,6 +2042,16 @@ K_CACHE: C A C H E;
K_PER: P E R;
K_PARTITION: P A R T I T I O N;
K_SERVICE_LEVEL: S E R V I C E '_' L E V E L;
K_ATTACH: A T T A C H;
K_DETACH: D E T A C H;
K_SERVICE_LEVELS: S E R V I C E '_' L E V E L S;
K_ATTACHED: A T T A C H E D;
K_FOR: F O R;
K_SERVICE: S E R V I C E;
K_LEVEL: L E V E L;
K_LEVELS: L E V E L S;
K_SCYLLA_TIMEUUID_LIST_INDEX: S C Y L L A '_' T I M E U U I D '_' L I S T '_' I N D E X;
K_SCYLLA_COUNTER_SHARD_LIST: S C Y L L A '_' C O U N T E R '_' S H A R D '_' L I S T;
K_SCYLLA_CLUSTERING_BOUND: S C Y L L A '_' C L U S T E R I N G '_' B O U N D;

View File

@@ -68,7 +68,7 @@ const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono
class query_processor::internal_state {
service::query_state _qs;
public:
internal_state() : _qs(service::client_state::for_internal_calls(), empty_service_permit()) {
internal_state(qos::service_level_controller &sl_controller) : _qs(service::client_state::for_internal_calls(), empty_service_permit(), sl_controller) {
}
operator service::query_state&() {
return _qs;
@@ -84,14 +84,15 @@ public:
}
};
query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg)
query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, query_processor::memory_config mcfg, cql_config& cql_cfg,
sharded<qos::service_level_controller> &sl_controller)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
, _mnotifier(mn)
, _mm(mm)
, _cql_config(cql_cfg)
, _internal_state(new internal_state())
, _internal_state(new internal_state(sl_controller.local()))
, _prepared_cache(prep_cache_log, mcfg.prepared_statment_cache_size)
, _authorized_prepared_cache(std::min(std::chrono::milliseconds(_db.get_config().permissions_validity_in_ms()),
std::chrono::duration_cast<std::chrono::milliseconds>(prepared_statements_cache::entry_expiry)),

View File

@@ -154,7 +154,8 @@ public:
static std::unique_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query);
query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg);
query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, service::migration_manager& mm, memory_config mcfg, cql_config& cql_cfg,
sharded<qos::service_level_controller> &sl_controller);
~query_processor();

View File

@@ -0,0 +1,60 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/alter_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
alter_service_level_statement::alter_service_level_statement(sstring service_level, shared_ptr<sl_prop_defs> attrs)
: _service_level(service_level) {
attrs->validate();
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::alter_service_level_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<alter_service_level_statement>(*this));
}
void alter_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> alter_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::ALTER, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
alter_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
return state.get_service_level_controller().alter_distributed_service_level(_service_level, _slo).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
}
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "cql3/statements/sl_prop_defs.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class alter_service_level_statement final : public service_level_statement {
sstring _service_level;
qos::service_level_options _slo;
public:
alter_service_level_statement(sstring service_level, shared_ptr<sl_prop_defs> attrs);
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,67 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/attach_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "exceptions/exceptions.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
attach_service_level_statement::attach_service_level_statement(sstring service_level, sstring role_name) :
_service_level(service_level), _role_name(role_name) {
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::attach_service_level_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<attach_service_level_statement>(*this));
}
void attach_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> attach_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::AUTHORIZE, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
attach_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
return state.get_service_level_controller().get_distributed_service_level(_service_level).then([this] (qos::service_levels_info sli) {
if (sli.empty()) {
throw qos::nonexistant_service_level_exception(_service_level);
}
}).then([&state, this] () {
return state.get_client_state().get_auth_service()->underlying_role_manager().set_attribute(_role_name, "service_level", _service_level).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
});
}
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class attach_service_level_statement final : public service_level_statement {
sstring _service_level;
sstring _role_name;
public:
attach_service_level_statement(sstring service_level, sstring role_name);
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,60 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/create_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
create_service_level_statement::create_service_level_statement(sstring service_level, shared_ptr<sl_prop_defs> attrs, bool if_not_exists)
: _service_level(service_level), _if_not_exists(if_not_exists) {
attrs->validate();
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::create_service_level_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<create_service_level_statement>(*this));
}
void create_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> create_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::CREATE, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
create_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
return state.get_service_level_controller().add_distributed_service_level(_service_level, _slo, _if_not_exists).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
}
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "cql3/statements/sl_prop_defs.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class create_service_level_statement final : public service_level_statement {
sstring _service_level;
qos::service_level_options _slo;
bool _if_not_exists;
public:
create_service_level_statement(sstring service_level, shared_ptr<sl_prop_defs> attrs, bool if_not_exists);
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/detach_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
detach_service_level_statement::detach_service_level_statement(sstring role_name) :
_role_name(role_name) {
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::detach_service_level_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<detach_service_level_statement>(*this));
}
void detach_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> detach_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::AUTHORIZE, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
detach_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
return state.get_client_state().get_auth_service()->underlying_role_manager().remove_attribute(_role_name, "service_level").then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
}
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class detach_service_level_statement final : public service_level_statement {
sstring _role_name;
public:
detach_service_level_statement(sstring role_name);
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/drop_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
drop_service_level_statement::drop_service_level_statement(sstring service_level, bool if_exists) :
_service_level(service_level), _if_exists(if_exists) {
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::drop_service_level_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<drop_service_level_statement>(*this));
}
void drop_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> drop_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DROP, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
drop_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
return state.get_service_level_controller().drop_distributed_service_level(_service_level, _if_exists).then([] {
using void_result_msg = cql_transport::messages::result_message::void_message;
using result_msg = cql_transport::messages::result_message;
return ::static_pointer_cast<result_msg>(make_shared<void_result_msg>());
});
}
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class drop_service_level_statement final : public service_level_statement {
sstring _service_level;
bool _if_exists;
public:
drop_service_level_statement(sstring service_level, bool if_exists);
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,100 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/list_service_level_attachments_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
list_service_level_attachments_statement::list_service_level_attachments_statement(sstring role_name) :
_role_name(role_name), _describe_all(false) {
}
list_service_level_attachments_statement::list_service_level_attachments_statement() :
_role_name(), _describe_all(true) {
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::list_service_level_attachments_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<list_service_level_attachments_statement>(*this));
}
void list_service_level_attachments_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> list_service_level_attachments_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DESCRIBE, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
list_service_level_attachments_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
static auto make_column = [] (sstring name, const shared_ptr<const abstract_type> type) {
return make_lw_shared<column_specification>(
"QOS",
"service_levels_attachments",
::make_shared<column_identifier>(std::move(name), true),
type);
};
static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({
make_column("role", utf8_type), make_column("service_level", utf8_type)
});
return make_ready_future().then([this, &state] () {
if (_describe_all) {
return state.get_client_state().get_auth_service()->underlying_role_manager().query_attribute_for_all("service_level");
} else {
return state.get_client_state().get_auth_service()->underlying_role_manager().get_attribute(_role_name, "service_level").then([this] (std::optional<sstring> att_val) {
std::unordered_map<sstring, sstring> ret;
if (att_val) {
ret.emplace(_role_name, *att_val);
}
return make_ready_future<std::unordered_map<sstring, sstring>>(ret);
});
}
}).then([this, &state] (std::unordered_map<sstring, sstring> roles_to_att_val) {
auto rs = std::make_unique<result_set>(metadata);
for (auto&& role_to_sl : roles_to_att_val) {
rs->add_row(std::vector<bytes_opt>{
utf8_type->decompose(role_to_sl.first),
utf8_type->decompose(role_to_sl.second),
});
}
auto rows = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(std::move(rs))));
return ::static_pointer_cast<cql_transport::messages::result_message>(rows);
});
}
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class list_service_level_attachments_statement final : public service_level_statement {
sstring _role_name;
bool _describe_all;
public:
list_service_level_attachments_statement(sstring role_name);
list_service_level_attachments_statement();
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,82 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "seastarx.hh"
#include "cql3/statements/list_service_level_statement.hh"
#include "service/qos/service_level_controller.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
list_service_level_statement::list_service_level_statement(sstring service_level, bool describe_all) :
_service_level(service_level), _describe_all(describe_all) {
}
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::list_service_level_statement::prepare(
database &db, cql_stats &stats) {
return std::make_unique<prepared_statement>(::make_shared<list_service_level_statement>(*this));
}
void list_service_level_statement::validate(service::storage_proxy &, const service::client_state &) const {
}
future<> list_service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return state.ensure_has_permission(auth::command_desc{.permission = auth::permission::DESCRIBE, .resource = auth::root_service_level_resource()});
}
future<::shared_ptr<cql_transport::messages::result_message>>
list_service_level_statement::execute(query_processor& qp,
service::query_state &state,
const query_options &) const {
static auto make_column = [] (sstring name, const shared_ptr<const abstract_type> type) {
return make_lw_shared<column_specification>(
"QOS",
"service_levels",
::make_shared<column_identifier>(std::move(name), true),
type);
};
static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({make_column("service_level", utf8_type)});
return make_ready_future().then([this, &state] () {
if (_describe_all) {
return state.get_service_level_controller().get_distributed_service_levels();
} else {
return state.get_service_level_controller().get_distributed_service_level(_service_level);
}
})
.then([this] (qos::service_levels_info sl_info) {
auto rs = std::make_unique<result_set>(metadata);
for (auto &&sl : sl_info) {
rs->add_row(std::vector<bytes_opt>{
utf8_type->decompose(sl.first)});
}
auto rows = ::make_shared<cql_transport::messages::result_message::rows>(result(std::move(std::move(rs))));
return ::static_pointer_cast<cql_transport::messages::result_message>(rows);
});
}
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <seastar/core/sstring.hh>
#include "cql3/statements/service_level_statement.hh"
#include "service/qos/qos_common.hh"
namespace cql3 {
namespace statements {
class list_service_level_statement final : public service_level_statement {
sstring _service_level;
bool _describe_all;
public:
list_service_level_statement(sstring service_level, bool describe_all);
std::unique_ptr<cql3::statements::prepared_statement> prepare(database &db, cql_stats &stats) override;
void validate(service::storage_proxy&, const service::client_state&) const override;
virtual future<> check_access(service::storage_proxy& sp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&) const override;
};
}
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service_level_statement.hh"
#include "transport/messages/result_message.hh"
namespace cql3 {
namespace statements {
uint32_t service_level_statement::get_bound_terms() const {
return 0;
}
bool service_level_statement::depends_on_keyspace(
const sstring &ks_name) const {
return false;
}
bool service_level_statement::depends_on_column_family(
const sstring &cf_name) const {
return false;
}
void service_level_statement::validate(
service::storage_proxy &,
const service::client_state &state) const {
}
future<> service_level_statement::check_access(service::storage_proxy& sp, const service::client_state &state) const {
return make_ready_future<>();
}
}
}

View File

@@ -0,0 +1,70 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "cql3/cql_statement.hh"
#include "prepared_statement.hh"
#include "raw/parsed_statement.hh"
#include "transport/messages_fwd.hh"
namespace cql3 {
namespace statements {
///
/// A logical argument error for a service_level statement operation.
///
class service_level_argument_exception : public std::invalid_argument {
public:
using std::invalid_argument::invalid_argument;
};
///
/// An exception to indicate that the service level given as parameter doesn't exist.
///
class nonexitent_service_level_exception : public service_level_argument_exception {
public:
nonexitent_service_level_exception(sstring service_level_name)
: service_level_argument_exception(format("Service Level {} doesn't exists.", service_level_name)) {
}
};
class service_level_statement : public raw::parsed_statement, public cql_statement_no_metadata {
public:
service_level_statement() : cql_statement_no_metadata(&timeout_config::other_timeout) {}
uint32_t get_bound_terms() const override;
bool depends_on_keyspace(const sstring& ks_name) const override;
bool depends_on_column_family(const sstring& cf_name) const override;
future<> check_access(service::storage_proxy& sp, const service::client_state& state) const override;
void validate(service::storage_proxy&, const service::client_state& state) const override;
};
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cql3/statements/sl_prop_defs.hh"
#include "database.hh"
namespace cql3 {
namespace statements {
void sl_prop_defs::validate() {
property_definitions::validate({});
}
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "cql3/statements/property_definitions.hh"
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <optional>
class keyspace_metadata;
namespace cql3 {
namespace statements {
class sl_prop_defs : public property_definitions {
public:
void validate();
};
}
}

View File

@@ -128,12 +128,24 @@ schema_ptr cdc_timestamps() {
static const sstring CDC_TIMESTAMPS_KEY = "timestamps";
schema_ptr service_levels() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
return schema_builder(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS, std::make_optional(id))
.with_column("service_level", utf8_type, column_kind::partition_key)
.with_version(db::system_keyspace::generate_schema_version(id))
.build();
}();
return schema;
}
static std::vector<schema_ptr> all_tables() {
return {
view_build_status(),
cdc_generations(),
cdc_desc(),
cdc_timestamps(),
service_levels(),
};
}
@@ -516,4 +528,42 @@ system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) {
co_return res;
}
future<qos::service_levels_info> system_distributed_keyspace::get_service_levels() const {
static sstring prepared_query = format("SELECT * FROM {}.{};", NAME, SERVICE_LEVELS);
return _qp.execute_internal(prepared_query, {}).then([] (shared_ptr<cql3::untyped_result_set> result_set) {
qos::service_levels_info service_levels;
for (auto &&row : *result_set) {
auto service_level_name = row.get_as<sstring>("service_level");
qos::service_level_options slo{};
service_levels.emplace(service_level_name, slo);
}
return service_levels;
});
}
future<qos::service_levels_info> system_distributed_keyspace::get_service_level(sstring service_level_name) const {
static sstring prepared_query = format("SELECT * FROM {}.{} WHERE service_level = ?;", NAME, SERVICE_LEVELS);
return _qp.execute_internal(prepared_query, {service_level_name}).then([] (shared_ptr<cql3::untyped_result_set> result_set) {
qos::service_levels_info service_levels;
if (!result_set->empty()) {
auto &&row = result_set->one();
auto service_level_name = row.get_as<sstring>("service_level");
qos::service_level_options slo{};
service_levels.emplace(service_level_name, slo);
}
return service_levels;
});
}
future<> system_distributed_keyspace::set_service_level(sstring service_level_name, qos::service_level_options slo) const {
static sstring prepared_puery = format("INSERT INTO {}.{} (service_level) VALUES (?);", NAME, SERVICE_LEVELS);
return _qp.execute_internal(prepared_puery, {service_level_name}).discard_result();
}
future<> system_distributed_keyspace::drop_service_level(sstring service_level_name) const {
static sstring prepared_query = format("DELETE FROM {}.{} WHERE service_level= ?;", NAME, SERVICE_LEVELS);
return _qp.execute_internal(prepared_query, {service_level_name}).discard_result();
}
}

View File

@@ -24,6 +24,7 @@
#include "bytes.hh"
#include "schema_fwd.hh"
#include "service/migration_manager.hh"
#include "service/qos/qos_common.hh"
#include "utils/UUID.hh"
#include "cdc/generation_id.hh"
@@ -52,6 +53,7 @@ class system_distributed_keyspace {
public:
static constexpr auto NAME = "system_distributed";
static constexpr auto VIEW_BUILD_STATUS = "view_build_status";
static constexpr auto SERVICE_LEVELS = "service_levels";
/* Nodes use this table to communicate new CDC stream generations to other nodes. */
static constexpr auto CDC_TOPOLOGY_DESCRIPTION = "cdc_generation_descriptions";
@@ -111,6 +113,10 @@ public:
future<db_clock::time_point> cdc_current_generation_timestamp(context);
future<qos::service_levels_info> get_service_levels() const;
future<qos::service_levels_info> get_service_level(sstring service_level_name) const;
future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const;
future<> drop_service_level(sstring service_level_name) const;
};
}

38
docs/service_levels.md Normal file
View File

@@ -0,0 +1,38 @@
## Service Level Distributed Data
There are two system tables that are used to facilitate the service level feature.
### Service Level Attachment Table
```CREATE TABLE system_auth.role_attributes (
role text,
attribute_name text,
attribute_value text,
PRIMARY KEY (role, attribute_name))
```
The table was created with generality in mind, but its purpose is to record
information about roles. The table columns meaning are:
*role* - the name of the role that the attribute belongs to.
*attribute_name* - the name of the attribute for the role.
*attribute_value* - the value of the specified attribute.
For the service level, the relevant attribute name is `service_level`.
So for example in order to find out which `service_level` is attached to role `r`
one can run the following query:
```SELECT * FROM system_auth.role_attributes WHERE role='r' and attribute_name='service_level'
```
### Service Level Configuration Table
```CREATE TABLE system_distributed.service_levels (
service_level text PRIMARY KEY);
```
The table is used to store and distribute the service levels configuration.
The table column names meanings are:
*service_level* - the name of the service level.
This table is currently a stub and does not hold any parameters yet.
```

26
main.cc
View File

@@ -35,6 +35,7 @@
#include "service/migration_manager.hh"
#include "service/load_meter.hh"
#include "service/view_update_backlog_broker.hh"
#include "service/qos/service_level_controller.hh"
#include "streaming/stream_session.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
@@ -87,6 +88,7 @@
#include "alternator/tags_extension.hh"
#include "alternator/rmw_operation.hh"
#include "db/paxos_grace_seconds_extension.hh"
#include "service/qos/standard_service_level_distributed_data_accessor.hh"
#include "service/raft/raft_services.hh"
@@ -795,6 +797,16 @@ int main(int ac, char** av) {
mscfg.tcp_nodelay = netw::messaging_service::tcp_nodelay_what::local;
}
static sharded<auth::service> auth_service;
static sharded<qos::service_level_controller> sl_controller;
//starting service level controller
qos::service_level_options default_service_level_configuration;
sl_controller.start(std::ref(auth_service), default_service_level_configuration).get();
sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
//This starts the update loop - but no real update happens until the data accessor is not initialized.
sl_controller.local().update_from_distributed_data(std::chrono::seconds(10));
netw::messaging_service::scheduling_config scfg;
scfg.statement_tenants = { {dbcfg.statement_scheduling_group, "$user"}, {default_scheduling_group(), "$system"} };
scfg.streaming = dbcfg.streaming_scheduling_group;
@@ -806,7 +818,6 @@ int main(int ac, char** av) {
netw::uninit_messaging_service(messaging).get();
});
static sharded<auth::service> auth_service;
static sharded<db::system_distributed_keyspace> sys_dist_ks;
static sharded<db::view::view_update_generator> view_update_generator;
static sharded<cql3::cql_config> cql_config;
@@ -934,7 +945,7 @@ int main(int ac, char** av) {
supervisor::notify("starting query processor");
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
debug::the_query_processor = &qp;
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
supervisor::notify("initializing batchlog manager");
@@ -1170,6 +1181,11 @@ int main(int ac, char** av) {
return ss.join_cluster();
}).get();
sl_controller.invoke_on_all([] (qos::service_level_controller& controller) {
controller.set_distributed_data_accessor(::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
::make_shared<qos::standard_service_level_distributed_data_accessor>(sys_dist_ks.local())));
}).get();
supervisor::notify("starting tracing");
tracing::tracing::start_tracing(qp).get();
auto stop_tracing = defer_verbose_shutdown("tracing", [] {
@@ -1276,7 +1292,7 @@ int main(int ac, char** av) {
db.revert_initial_system_read_concurrency_boost();
}).get();
cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter);
cql_transport::controller cql_server_ctl(db, auth_service, mm_notifier, gossiper.local(), qp, service_memory_limiter, sl_controller);
ss.register_client_shutdown_hook("native transport", [&cql_server_ctl] {
cql_server_ctl.stop().get();
@@ -1429,6 +1445,10 @@ int main(int ac, char** av) {
repair_shutdown(service::get_local_storage_service().db()).get();
});
auto stop_sl_controller = defer_verbose_shutdown("service level controller", [] {
sl_controller.stop().get();
});
auto stop_view_update_generator = defer_verbose_shutdown("view update generator", [] {
view_update_generator.stop().get();
});

24
service/qos/qos_common.cc Normal file
View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qos_common.hh"
namespace qos {
}

63
service/qos/qos_common.hh Normal file
View File

@@ -0,0 +1,63 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "seastarx.hh"
#include <seastar/core/sstring.hh>
#include <seastar/core/print.hh>
#include <map>
#include <stdexcept>
namespace qos {
/**
* a structure that holds the configuration for
* a service level.
*/
struct service_level_options {
bool operator==(const service_level_options& other) const = default;
bool operator!=(const service_level_options& other) const = default;
};
using service_levels_info = std::map<sstring, service_level_options>;
///
/// A logical argument error for a service_level statement operation.
///
class service_level_argument_exception : public std::invalid_argument {
public:
using std::invalid_argument::invalid_argument;
};
///
/// An exception to indicate that the service level given as parameter doesn't exist.
///
class nonexistant_service_level_exception : public service_level_argument_exception {
public:
nonexistant_service_level_exception(sstring service_level_name)
: service_level_argument_exception(format("Service Level {} doesn't exists.", service_level_name)) {
}
};
}

View File

@@ -0,0 +1,360 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include "service_level_controller.hh"
#include "service/storage_service.hh"
#include "service/priority_manager.hh"
#include "message/messaging_service.hh"
#include "db/system_distributed_keyspace.hh"
namespace qos {
sstring service_level_controller::default_service_level_name = "default";
service_level_controller::service_level_controller(sharded<auth::service>& auth_service, service_level_options default_service_level_config):
_sl_data_accessor(nullptr),
_auth_service(auth_service)
{
if (this_shard_id() == global_controller) {
_global_controller_db = std::make_unique<global_controller_data>();
_global_controller_db->default_service_level_config = default_service_level_config;
}
}
future<> service_level_controller::add_service_level(sstring name, service_level_options slo, bool is_static) {
return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) {
return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, slo, is_static] () {
return sl_controller.do_add_service_level(name, slo, is_static);
});
});
}
future<> service_level_controller::remove_service_level(sstring name, bool remove_static) {
return container().invoke_on(global_controller, [=] (service_level_controller &sl_controller) {
return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller, name, remove_static] () {
return sl_controller.do_remove_service_level(name, remove_static);
});
});
}
future<> service_level_controller::start() {
if (this_shard_id() != global_controller) {
return make_ready_future();
}
return with_semaphore(_global_controller_db->notifications_serializer, 1, [this] () {
return do_add_service_level(default_service_level_name, _global_controller_db->default_service_level_config, true).then([this] () {
return container().invoke_on_all([] (service_level_controller& sl) {
sl._default_service_level = sl.get_service_level(default_service_level_name);
});
});
});
}
void service_level_controller::set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor) {
// unregistering the accessor is always legal
if (!sl_data_accessor) {
_sl_data_accessor = nullptr;
}
// Registration of a new accessor can be done only when the _sl_data_accessor is not already set.
// This behavior is intended to allow to unit testing debug to set this value without having
// overriden by storage_proxy
if (!_sl_data_accessor) {
_sl_data_accessor = sl_data_accessor;
}
}
future<> service_level_controller::stop() {
// unregister from the service level distributed data accessor.
_sl_data_accessor = nullptr;
if (this_shard_id() == global_controller) {
// abort the loop of the distributed data checking if it is running
_global_controller_db->dist_data_update_aborter.request_abort();
_global_controller_db->notifications_serializer.broken();
}
return std::exchange(_distributed_data_updater, make_ready_future<>());
}
future<> service_level_controller::update_service_levels_from_distributed_data() {
if (!_sl_data_accessor) {
return make_ready_future();
}
return container().invoke_on(global_controller, [] (service_level_controller& sl_controller) {
return with_semaphore(sl_controller._global_controller_db->notifications_serializer, 1, [&sl_controller] () {
return async([&sl_controller] () {
service_levels_info service_levels = sl_controller._sl_data_accessor->get_service_levels().get0();
service_levels_info service_levels_for_add_or_update;
service_levels_info service_levels_for_delete;
auto current_it = sl_controller._service_levels_db.begin();
auto new_state_it = service_levels.begin();
// we want to detect 3 kinds of objects in one pass -
// 1. new service levels that have been added to the distributed keyspace
// 2. existing service levels that have changed
// 3. removed service levels
// this loop is batching together add/update operation and remove operation
// then they are all executed together.The reason for this is to allow for
// firstly delete all that there is to be deleted and only then adding new
// service levels.
while (current_it != sl_controller._service_levels_db.end() && new_state_it != service_levels.end()) {
if (current_it->first == new_state_it->first) {
//the service level exists on both the cureent and new state.
if (current_it->second.slo != new_state_it->second) {
// The service level configuration is different
// in the new state and the old state, meaning it needs to be updated.
service_levels_for_add_or_update.insert(*new_state_it);
}
current_it++;
new_state_it++;
} else if (current_it->first < new_state_it->first) {
//The service level does not exists in the new state so it needs to be
//removed, but only if it is not static since static configurations dont
//come from the distributed keyspace but from code.
if (!current_it->second.is_static) {
service_levels_for_delete.emplace(current_it->first, current_it->second.slo);
}
current_it++;
} else { /*new_it->first < current_it->first */
// The service level exits in the new state but not in the old state
// so it needs to be added.
service_levels_for_add_or_update.insert(*new_state_it);
new_state_it++;
}
}
for (; current_it != sl_controller._service_levels_db.end(); current_it++) {
service_levels_for_delete.emplace(current_it->first, current_it->second.slo);
}
std::copy(new_state_it, service_levels.end(), std::inserter(service_levels_for_add_or_update,
service_levels_for_add_or_update.end()));
for (auto&& sl : service_levels_for_delete) {
sl_controller.do_remove_service_level(sl.first, false).get();
}
for (auto&& sl : service_levels_for_add_or_update) {
sl_controller.do_add_service_level(sl.first, sl.second).get();
}
});
});
});
}
future<sstring> service_level_controller::find_service_level(auth::role_set roles) {
// FIXME: this arbitrary order based on names is OK for now, because the service levels do not yet
// have any parameters. Once they do, this comparison could be based on comparing the parameters.
static auto sl_compare = std::less<sstring>();
auto& role_manager = _auth_service.local().underlying_role_manager();
// converts a list of roles into the chosen service level.
return ::map_reduce(roles.begin(), roles.end(), [&role_manager, this] (const sstring& role) {
return role_manager.get_attribute(role, "service_level").then_wrapped([this, role] (future<std::optional<sstring>> sl_name_fut) {
try {
std::optional<sstring> sl_name = sl_name_fut.get0();
if (! sl_name) {
return sl_name;
}
auto sl_it = _service_levels_db.find(*sl_name);
if ( sl_it == _service_levels_db.end()) {
return std::optional<sstring>{};
} else {
return sl_name;
}
} catch(...) { // when we fail, we act as if the attribute does not exist so the node
// will not be brought down.
return std::optional<sstring>{};
}
});
}, std::optional<sstring>{}, [this] (std::optional<sstring> first, std::optional<sstring> second) {
if (!second) {
return first;
} else if (!first) {
return second;
} else {
return std::optional<sstring>{ sl_compare(*first, *second) ? second : first };
}
}).then([] (std::optional<sstring> sl) {
return sl? *sl:default_service_level_name;
});
}
future<> service_level_controller::notify_service_level_added(sstring name, service_level sl_data) {
_service_levels_db.emplace(name, sl_data);
return make_ready_future();
}
future<> service_level_controller::notify_service_level_updated(sstring name, service_level_options slo) {
auto sl_it = _service_levels_db.find(name);
future<> f = make_ready_future();
if (sl_it != _service_levels_db.end()) {
sl_it->second.slo = slo;
}
return f;
}
future<> service_level_controller::notify_service_level_removed(sstring name) {
auto sl_it = _service_levels_db.find(name);
if (sl_it != _service_levels_db.end()) {
_service_levels_db.erase(sl_it);
}
return make_ready_future();
}
void service_level_controller::update_from_distributed_data(std::chrono::duration<float> interval) {
_distributed_data_updater = container().invoke_on(global_controller, [interval] (service_level_controller& global_sl) {
if (global_sl._global_controller_db->distributed_data_update.available()) {
global_sl._global_controller_db->distributed_data_update = repeat([interval, &global_sl] {
return sleep_abortable<steady_clock_type>(std::chrono::duration_cast<steady_clock_type::duration>(interval),
global_sl._global_controller_db->dist_data_update_aborter).then_wrapped([&global_sl] (future<>&& f) {
try {
f.get();
return global_sl.update_service_levels_from_distributed_data().then([] {
return stop_iteration::no;
});
}
catch (const sleep_aborted& e) {
return make_ready_future<seastar::bool_class<seastar::stop_iteration_tag>>(stop_iteration::yes);
}
});
});
}
});
}
future<> service_level_controller::add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exists) {
set_service_level_op_type add_type = if_not_exists ? set_service_level_op_type::add_if_not_exists : set_service_level_op_type::add;
return set_distributed_service_level(name, slo, add_type);
}
future<> service_level_controller::alter_distributed_service_level(sstring name, service_level_options slo) {
return set_distributed_service_level(name, slo, set_service_level_op_type::alter);
}
future<> service_level_controller::drop_distributed_service_level(sstring name, bool if_exists) {
return _sl_data_accessor->get_service_levels().then([this, name, if_exists] (qos::service_levels_info sl_info) {
auto it = sl_info.find(name);
if (it == sl_info.end()) {
if (if_exists) {
return make_ready_future();
} else {
return make_exception_future(nonexistant_service_level_exception(name));
}
} else {
auto& role_manager = _auth_service.local().underlying_role_manager();
return role_manager.query_attribute_for_all("service_level").then( [&role_manager, name] (auth::role_manager::attribute_vals attributes) {
return parallel_for_each(attributes.begin(), attributes.end(), [&role_manager, name] (auto&& attr) {
if (attr.second == name) {
return role_manager.remove_attribute(attr.first, "service_level");
} else {
return make_ready_future();
}
});
}).then([this, name] {
return _sl_data_accessor->drop_service_level(name);
});
}
});
}
future<service_levels_info> service_level_controller::get_distributed_service_levels() {
return _sl_data_accessor->get_service_levels();
}
future<service_levels_info> service_level_controller::get_distributed_service_level(sstring service_level_name) {
return _sl_data_accessor->get_service_level(service_level_name);
}
future<> service_level_controller::set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type) {
return _sl_data_accessor->get_service_levels().then([this, name, slo, op_type] (qos::service_levels_info sl_info) {
auto it = sl_info.find(name);
// test for illegal requests or requests that should terminate without any action
if (it == sl_info.end()) {
if (op_type == set_service_level_op_type::alter) {
return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' desn't exist.", name)));
}
} else {
if (op_type == set_service_level_op_type::add) {
return make_exception_future(exceptions::invalid_request_exception(format("The service level '{}' already exists.", name)));
} else if (op_type == set_service_level_op_type::add_if_not_exists) {
return make_ready_future();
}
}
return _sl_data_accessor->set_service_level(name, slo);
});
}
future<> service_level_controller::do_add_service_level(sstring name, service_level_options slo, bool is_static) {
auto service_level_it = _service_levels_db.find(name);
if (is_static) {
_global_controller_db->static_configurations[name] = slo;
}
if (service_level_it != _service_levels_db.end()) {
if ((is_static && service_level_it->second.is_static) || !is_static) {
if ((service_level_it->second.is_static) && (!is_static)) {
service_level_it->second.is_static = false;
}
return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, slo);
} else {
// this means we set static layer when the the service level
// is running of the non static configuration. so we have nothing
// else to do since we already saved the static configuration.
return make_ready_future();
}
} else {
return do_with(service_level{.slo = slo, .is_static = is_static}, std::move(name), [this] (service_level& sl, sstring& name) {
return container().invoke_on_all(&service_level_controller::notify_service_level_added, name, sl);
});
}
return make_ready_future();
}
future<> service_level_controller::do_remove_service_level(sstring name, bool remove_static) {
auto service_level_it = _service_levels_db.find(name);
if (service_level_it != _service_levels_db.end()) {
auto static_conf_it = _global_controller_db->static_configurations.end();
bool static_exists = false;
if (remove_static) {
_global_controller_db->static_configurations.erase(name);
} else {
static_conf_it = _global_controller_db->static_configurations.find(name);
static_exists = static_conf_it != _global_controller_db->static_configurations.end();
}
if (remove_static && service_level_it->second.is_static) {
return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name);
} else if (!remove_static && !service_level_it->second.is_static) {
if (static_exists) {
service_level_it->second.is_static = true;
return container().invoke_on_all(&service_level_controller::notify_service_level_updated, name, static_conf_it->second);
} else {
return container().invoke_on_all(&service_level_controller::notify_service_level_removed, name);
}
}
}
return make_ready_future();
}
}

View File

@@ -0,0 +1,242 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "seastarx.hh"
#include "log.hh"
#include "auth/role_manager.hh"
#include "auth/authenticated_user.hh"
#include <seastar/core/sstring.hh>
#include <seastar/core/distributed.hh>
#include "auth/service.hh"
#include "service/storage_service.hh"
#include <map>
#include <unordered_set>
#include "qos_common.hh"
namespace db {
class system_distributed_keyspace;
}
namespace qos {
/**
* a structure to hold a service level
* data and configuration.
*/
struct service_level {
service_level_options slo;
bool marked_for_deletion;
bool is_static;
};
/**
* The service_level_controller class is an implementation of the service level
* controller design.
* It is logically divided into 2 parts:
* 1. Global controller which is responsible for all of the data and plumbing
* manipulation.
* 2. Local controllers that act upon the data and facilitates execution in
* the service level context
*/
class service_level_controller : public peering_sharded_service<service_level_controller> {
public:
class service_level_distributed_data_accessor {
public:
virtual future<qos::service_levels_info> get_service_levels() const = 0;
virtual future<qos::service_levels_info> get_service_level(sstring service_level_name) const = 0;
virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const = 0;
virtual future<> drop_service_level(sstring service_level_name) const = 0;
};
using service_level_distributed_data_accessor_ptr = ::shared_ptr<service_level_distributed_data_accessor>;
private:
struct global_controller_data {
service_levels_info static_configurations{};
int schedg_group_cnt = 0;
int io_priority_cnt = 0;
service_level_options default_service_level_config;
// The below future is used to serialize work so no reordering can occur.
// This is needed so for example: delete(x), add(x) will not reverse yielding
// a completely different result than the one intended.
future<> work_future = make_ready_future();
semaphore notifications_serializer = semaphore(1);
future<> distributed_data_update = make_ready_future();
abort_source dist_data_update_aborter;
};
std::unique_ptr<global_controller_data> _global_controller_db;
static constexpr shard_id global_controller = 0;
std::unordered_map<sstring, service_level> _service_levels_db;
std::unordered_map<sstring, sstring> _role_to_service_level;
service_level _default_service_level;
service_level_distributed_data_accessor_ptr _sl_data_accessor;
sharded<auth::service>& _auth_service;
future<> _distributed_data_updater = make_ready_future<>();
public:
service_level_controller(sharded<auth::service>& auth_service, service_level_options default_service_level_config);
/**
* this function must be called *once* from any shard before any other functions are called.
* No other function should be called before the future returned by the function is resolved.
* @return a future that resolves when the initialization is over.
*/
future<> start();
void set_distributed_data_accessor(service_level_distributed_data_accessor_ptr sl_data_accessor);
/**
* Adds a service level configuration if it doesn't exists, and updates
* an the existing one if it does exist.
* Handles both, static and non static service level configurations.
* @param name - the service level name.
* @param slo - the service level configuration
* @param is_static - is this configuration static or not
*/
future<> add_service_level(sstring name, service_level_options slo, bool is_static = false);
/**
* Removes a service level configuration if it exists.
* Handles both, static and non static service level configurations.
* @param name - the service level name.
* @param remove_static - indicates if it is a removal of a static configuration
* or not.
*/
future<> remove_service_level(sstring name, bool remove_static);
/**
* stops all ongoing operations if exists
* @return a future that is resolved when all operations has stopped
*/
future<> stop();
/**
* this is an executor of a function with arguments under a service level
* that corresponds to a given user.
* @param usr - the user for determining the service level
* @param func - the function to be executed
* @return a future that is resolved when the function's operation is resolved
* (if it returns a future). or a ready future containing the returned value
* from the function/
*/
template <typename Ret>
futurize_t<Ret> with_user_service_level(shared_ptr<auth::authenticated_user> usr, noncopyable_function<Ret()> func) {
if (usr) {
auth::service& ser = _auth_service.local();
return auth::get_roles(ser, *usr).then([this] (auth::role_set roles) {
return find_service_level(roles);
}).then([usr, this, func = std::move(func)] (sstring service_level_name) mutable {
return with_service_level(service_level_name, std::move(func));
});
} else {
return with_service_level(default_service_level_name, std::move(func));
}
}
/**
* Chack the distributed data for changes in a constant interval and updates
* the service_levels configuration in accordance (adds, removes, or updates
* service levels as necessairy).
* @param interval - the interval is seconds to check the distributed data.
* @return a future that is resolved when the update loop stops.
*/
void update_from_distributed_data(std::chrono::duration<float> interval);
/**
* Updates the service level data from the distributed data store.
* @return a future that is resolved when the update is done
*/
future<> update_service_levels_from_distributed_data();
future<> add_distributed_service_level(sstring name, service_level_options slo, bool if_not_exsists);
future<> alter_distributed_service_level(sstring name, service_level_options slo);
future<> drop_distributed_service_level(sstring name, bool if_exists);
future<service_levels_info> get_distributed_service_levels();
future<service_levels_info> get_distributed_service_level(sstring service_level_name);
private:
/**
* Adds a service level configuration if it doesn't exists, and updates
* an the existing one if it does exist.
* Handles both, static and non static service level configurations.
* @param name - the service level name.
* @param slo - the service level configuration
* @param is_static - is this configuration static or not
*/
future<> do_add_service_level(sstring name, service_level_options slo, bool is_static = false);
/**
* Removes a service level configuration if it exists.
* Handles both, static and non static service level configurations.
* @param name - the service level name.
* @param remove_static - indicates if it is a removal of a static configuration
* or not.
*/
future<> do_remove_service_level(sstring name, bool remove_static);
/**
* Returns the service level **in effect** for a user having the given
* collection of roles.
* @param roles - the collection of roles to consider
* @return the name of the service level in effect.
*/
future<sstring> find_service_level(auth::role_set roles);
/**
* The notify functions are used by the global service level controller
* to propagate configuration changes to the local controllers.
* the returned future is resolved when the local controller is done acting
* on the notification. updates are done in sequence so their meaning will not
* change due to execution reordering.
*/
future<> notify_service_level_added(sstring name, service_level sl_data);
future<> notify_service_level_updated(sstring name, service_level_options slo);
future<> notify_service_level_removed(sstring name);
/**
* Gets the service level data by name.
* @param service_level_name - the name of the requested service level
* @return the service level data if it exists (in the local controller) or
* get_service_level("default") otherwise.
*/
service_level& get_service_level(sstring service_level_name) {
auto sl_it = _service_levels_db.find(service_level_name);
if (sl_it == _service_levels_db.end() || sl_it->second.marked_for_deletion) {
sl_it = _service_levels_db.find(default_service_level_name);
}
return sl_it->second;
}
enum class set_service_level_op_type {
add_if_not_exists,
add,
alter
};
future<> set_distributed_service_level(sstring name, service_level_options slo, set_service_level_op_type op_type);
public:
static sstring default_service_level_name;
};
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "standard_service_level_distributed_data_accessor.hh"
#include "db/system_distributed_keyspace.hh"
namespace qos {
standard_service_level_distributed_data_accessor::standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks):
_sys_dist_ks(sys_dist_ks) {
}
future<qos::service_levels_info> standard_service_level_distributed_data_accessor::get_service_levels() const {
return _sys_dist_ks.get_service_levels();
}
future<qos::service_levels_info> standard_service_level_distributed_data_accessor::get_service_level(sstring service_level_name) const {
return _sys_dist_ks.get_service_level(service_level_name);
}
future<> standard_service_level_distributed_data_accessor::set_service_level(sstring service_level_name, qos::service_level_options slo) const {
return _sys_dist_ks.set_service_level(service_level_name, slo);
}
future<> standard_service_level_distributed_data_accessor::drop_service_level(sstring service_level_name) const {
return _sys_dist_ks.drop_service_level(service_level_name);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (C) 2020 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include "seastarx.hh"
#include "service_level_controller.hh"
namespace db {
class system_distributed_keyspace;
}
namespace qos {
class standard_service_level_distributed_data_accessor : public service_level_controller::service_level_distributed_data_accessor,
public ::enable_shared_from_this<standard_service_level_distributed_data_accessor> {
private:
db::system_distributed_keyspace& _sys_dist_ks;
public:
standard_service_level_distributed_data_accessor(db::system_distributed_keyspace &sys_dist_ks);
virtual future<qos::service_levels_info> get_service_levels() const override;
virtual future<qos::service_levels_info> get_service_level(sstring service_level_name) const override;
virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const override;
virtual future<> drop_service_level(sstring service_level_name) const override;
};
}

View File

@@ -27,6 +27,9 @@
#include "tracing/tracing.hh"
#include "service_permit.hh"
namespace qos {
class service_level_controller;
}
namespace service {
class query_state final {
@@ -34,6 +37,7 @@ private:
client_state& _client_state;
tracing::trace_state_ptr _trace_state_ptr;
service_permit _permit;
std::optional<std::reference_wrapper<qos::service_level_controller>> _sl_controller;
public:
query_state(client_state& client_state, service_permit permit)
@@ -42,12 +46,26 @@ public:
, _permit(std::move(permit))
{}
query_state(client_state& client_state, service_permit permit, qos::service_level_controller &sl_controller)
: _client_state(client_state)
, _trace_state_ptr(tracing::trace_state_ptr())
, _permit(std::move(permit))
, _sl_controller(sl_controller)
{}
query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
: _client_state(client_state)
, _trace_state_ptr(std::move(trace_state_ptr))
, _permit(std::move(permit))
{ }
query_state(client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller)
: _client_state(client_state)
, _trace_state_ptr(std::move(trace_state_ptr))
, _permit(std::move(permit))
, _sl_controller(sl_controller)
{ }
const tracing::trace_state_ptr& get_trace_state() const {
return _trace_state_ptr;
}
@@ -75,6 +93,10 @@ public:
return std::move(_permit);
}
qos::service_level_controller& get_service_level_controller() const {
return _sl_controller->get();
}
};
}

View File

@@ -50,6 +50,13 @@ BOOST_AUTO_TEST_CASE(root_of) {
const auto rv = auth::role_resource_view(rr);
BOOST_REQUIRE(!rv.role());
//
// service_level
//
const auto slr = auth::resource(auth::resource_kind::service_level);
BOOST_REQUIRE_EQUAL(slr.kind(), auth::resource_kind::service_level);
}
BOOST_AUTO_TEST_CASE(data) {
@@ -76,6 +83,11 @@ BOOST_AUTO_TEST_CASE(role) {
BOOST_REQUIRE_EQUAL(*v.role(), "joe");
}
BOOST_AUTO_TEST_CASE(service_level) {
const auto r = auth::make_service_level_resource();
BOOST_REQUIRE_EQUAL(r.kind(), auth::resource_kind::service_level);
}
BOOST_AUTO_TEST_CASE(from_name) {
//
// data
@@ -104,6 +116,15 @@ BOOST_AUTO_TEST_CASE(from_name) {
BOOST_REQUIRE_THROW(auth::parse_resource("roles/joe/smith"), auth::invalid_resource_name);
//
//service_level
//
const auto slr1 = auth::parse_resource("service_levels");
BOOST_REQUIRE_EQUAL(slr1, auth::root_service_level_resource());
BOOST_REQUIRE_THROW(auth::parse_resource("service_levels/low_priority"), auth::invalid_resource_name);
//
// Generic errors.
//
@@ -127,6 +148,12 @@ BOOST_AUTO_TEST_CASE(name) {
BOOST_REQUIRE_EQUAL(auth::root_role_resource().name(), "roles");
BOOST_REQUIRE_EQUAL(auth::make_role_resource("joe").name(), "roles/joe");
//
// service_level
//
BOOST_REQUIRE_EQUAL(auth::root_service_level_resource().name(), "service_levels");
}
BOOST_AUTO_TEST_CASE(parent) {
@@ -162,6 +189,12 @@ BOOST_AUTO_TEST_CASE(output) {
BOOST_REQUIRE_EQUAL(format("{}", auth::root_role_resource()), "<all roles>");
BOOST_REQUIRE_EQUAL(format("{}", auth::make_role_resource("joe")), "<role joe>");
//
// service_level
//
BOOST_REQUIRE_EQUAL(format("{}", auth::root_service_level_resource()), "<all service levels>");
}
BOOST_AUTO_TEST_CASE(expand) {

View File

@@ -54,6 +54,7 @@
#include <regex>
#include "gms/feature.hh"
#include "db/query_context.hh"
#include "service/qos/qos_common.hh"
using namespace std::literals::chrono_literals;
@@ -4881,3 +4882,81 @@ SEASTAR_THREAD_TEST_CASE(test_query_unselected_columns) {
}
}, std::move(cfg), thread_attributes{.sched_group = statement_sched_group}).get();
}
SEASTAR_TEST_CASE(test_user_based_sla_queries) {
return do_with_cql_env_thread([] (cql_test_env& e) {
// test create service level with defaults
e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get();
auto msg = e.execute_cql("LIST SERVICE_LEVEL sl_1;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("sl_1")},
});
e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get();
//drop service levels
e.execute_cql("DROP SERVICE_LEVEL sl_1;").get();
msg = e.execute_cql("LIST ALL SERVICE_LEVELS;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("sl_2")},
});
// validate exceptions (illegal requests)
BOOST_REQUIRE_THROW(e.execute_cql("DROP SERVICE_LEVEL sl_1;").get(), qos::nonexistant_service_level_exception);
e.execute_cql("DROP SERVICE_LEVEL IF EXISTS sl_1;").get();
BOOST_REQUIRE_THROW(e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(), exceptions::invalid_request_exception);
BOOST_REQUIRE_THROW(e.execute_cql("CREATE SERVICE_LEVEL sl_2;").get(), exceptions::invalid_request_exception);
e.execute_cql("CREATE SERVICE_LEVEL IF NOT EXISTS sl_2;").get();
// test attach role
e.execute_cql("ATTACH SERVICE_LEVEL sl_2 TO tester").get();
msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("tester"), utf8_type->decompose("sl_2")},
});
msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("tester"), utf8_type->decompose("sl_2")},
});
// test attachment illegal request
BOOST_REQUIRE_THROW(e.execute_cql("ATTACH SERVICE_LEVEL sl_2 TO tester2;").get(), auth::nonexistant_role);
BOOST_REQUIRE_THROW(e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester;").get(), qos::nonexistant_service_level_exception);
BOOST_CHECK(true);
// tests detaching service levels
e.execute_cql("CREATE ROLE tester2;").get();
e.execute_cql("CREATE SERVICE_LEVEL sl_1;").get();
e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester2;").get();
e.execute_cql("DETACH SERVICE_LEVEL FROM tester;").get();
msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester2;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("tester2"), utf8_type->decompose("sl_1")},
});
BOOST_CHECK(true);
msg = e.execute_cql("LIST ATTACHED SERVICE_LEVEL OF tester;").get0();
assert_that(msg).is_rows().with_rows({
});
BOOST_CHECK(true);
msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("tester2"), utf8_type->decompose("sl_1")},
});
BOOST_CHECK(true);
//test implicit detach when removing role
e.execute_cql("DROP ROLE tester2;").get();
msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0();
assert_that(msg).is_rows().with_rows({
});
BOOST_CHECK(true);
e.execute_cql("ATTACH SERVICE_LEVEL sl_1 TO tester;").get();
msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0();
assert_that(msg).is_rows().with_rows({
{utf8_type->decompose("tester"), utf8_type->decompose("sl_1")},
});
BOOST_CHECK(true);
//test implicit detach when removing service level
e.execute_cql("DROP SERVICE_LEVEL sl_1;").get();
msg = e.execute_cql("LIST ALL ATTACHED SERVICE_LEVELS;").get0();
assert_that(msg).is_rows().with_rows({
});
});
}

View File

@@ -51,6 +51,7 @@
#include "test/lib/reader_permit.hh"
#include "db/query_context.hh"
#include "test/lib/test_services.hh"
#include "unit_test_service_levels_accessor.hh"
#include "db/view/view_builder.hh"
#include "db/view/node_view_update_backlog.hh"
#include "distributed_loader.hh"
@@ -123,6 +124,7 @@ private:
sharded<db::view::view_update_generator>& _view_update_generator;
sharded<service::migration_notifier>& _mnotifier;
sharded<cdc::generation_service>& _cdc_generation_service;
sharded<qos::service_level_controller>& _sl_controller;
private:
struct core_local_state {
service::client_state client_state;
@@ -143,7 +145,7 @@ private:
if (_db.local().has_keyspace(ks_name)) {
_core_local.local().client_state.set_keyspace(_db.local(), ks_name);
}
return ::make_shared<service::query_state>(_core_local.local().client_state, empty_service_permit());
return ::make_shared<service::query_state>(_core_local.local().client_state, empty_service_permit(), _sl_controller.local());
}
public:
single_node_cql_env(
@@ -154,7 +156,8 @@ public:
sharded<db::view::view_builder>& view_builder,
sharded<db::view::view_update_generator>& view_update_generator,
sharded<service::migration_notifier>& mnotifier,
sharded<cdc::generation_service>& cdc_generation_service)
sharded<cdc::generation_service>& cdc_generation_service,
sharded<qos::service_level_controller> &sl_controller)
: _feature_service(feature_service)
, _db(db)
, _qp(qp)
@@ -163,6 +166,7 @@ public:
, _view_update_generator(view_update_generator)
, _mnotifier(mnotifier)
, _cdc_generation_service(cdc_generation_service)
, _sl_controller(sl_controller)
{ }
virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(sstring_view text) override {
@@ -438,15 +442,27 @@ public:
mm_notif.start().get();
auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
sharded<auth::service> auth_service;
set_abort_on_internal_error(true);
const gms::inet_address listen("127.0.0.1");
auto sys_dist_ks = seastar::sharded<db::system_distributed_keyspace>();
auto sl_controller = sharded<qos::service_level_controller>();
sl_controller.start(std::ref(auth_service), qos::service_level_options{}).get();
auto stop_sl_controller = defer([&sl_controller] { sl_controller.stop().get(); });
sl_controller.invoke_on_all(&qos::service_level_controller::start).get();
sl_controller.invoke_on_all([&sys_dist_ks, &sl_controller] (qos::service_level_controller& service) {
qos::service_level_controller::service_level_distributed_data_accessor_ptr service_level_data_accessor =
::static_pointer_cast<qos::service_level_controller::service_level_distributed_data_accessor>(
make_shared<qos::unit_test_service_levels_accessor>(sl_controller,sys_dist_ks));
return service.set_distributed_data_accessor(std::move(service_level_data_accessor));
}).get();
sharded<netw::messaging_service> ms;
// don't start listening so tests can be run in parallel
ms.start(listen, std::move(7000)).get();
auto stop_ms = defer([&ms] { ms.stop().get(); });
sharded<auth::service> auth_service;
// Normally the auth server is already stopped in here,
// but if there is an initialization failure we have to
// make sure to stop it now or ~sharded will assert.
@@ -454,7 +470,6 @@ public:
auth_service.stop().get();
});
auto sys_dist_ks = seastar::sharded<db::system_distributed_keyspace>();
auto stop_sys_dist_ks = defer([&sys_dist_ks] { sys_dist_ks.stop().get(); });
gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg, cfg_in.disabled_features);
@@ -529,7 +544,7 @@ public:
sharded<cql3::query_processor> qp;
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config)).get();
qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notif), std::ref(mm), qp_mcfg, std::ref(cql_config), std::ref(sl_controller)).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });
// In main.cc we call db::system_keyspace::setup which calls
@@ -643,7 +658,7 @@ public:
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}
single_node_cql_env env(feature_service, db, qp, auth_service, view_builder, view_update_generator, mm_notif, cdc_generation_service);
single_node_cql_env env(feature_service, db, qp, auth_service, view_builder, view_update_generator, mm_notif, cdc_generation_service, std::ref(sl_controller));
env.start().get();
auto stop_env = defer([&env] { env.stop().get(); });

View File

@@ -0,0 +1,62 @@
/*
* Copyright (C) 2021 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include "service/qos/service_level_controller.hh"
#include "service/qos/qos_common.hh"
#include "db/system_distributed_keyspace.hh"
#pragma once
namespace qos {
/**
* This class is a helper for unit testing. It implements the service level distributed
* accessor interface in order to be used in the unit testing environment. The advantage
* of this class over the standard implementation is that it makes sure that updates are
* Immediately propagated to the underlying service level controller.
*/
class unit_test_service_levels_accessor : public service_level_controller::service_level_distributed_data_accessor {
sharded<service_level_controller> &_sl_controller;
sharded<db::system_distributed_keyspace> &_sys_dist_ks;
public:
unit_test_service_levels_accessor(sharded<service_level_controller>& sl_controller, sharded<db::system_distributed_keyspace> &sys_dist_ks)
: _sl_controller(sl_controller)
, _sys_dist_ks(sys_dist_ks)
{}
virtual future<qos::service_levels_info> get_service_levels() const {
return _sys_dist_ks.local().get_service_levels();
}
virtual future<qos::service_levels_info> get_service_level(sstring service_level_name) const {
return _sys_dist_ks.local().get_service_level(service_level_name);
}
virtual future<> set_service_level(sstring service_level_name, qos::service_level_options slo) const {
return _sys_dist_ks.local().set_service_level(service_level_name, slo).then([this] () {
return _sl_controller.invoke_on_all(&service_level_controller::update_service_levels_from_distributed_data);
});
}
virtual future<> drop_service_level(sstring service_level_name) const {
return _sys_dist_ks.local().drop_service_level(service_level_name).then([this] () {
return _sl_controller.invoke_on_all(&service_level_controller::update_service_levels_from_distributed_data);
});
}
};
}

View File

@@ -33,14 +33,16 @@ namespace cql_transport {
static logging::logger logger("cql_server_controller");
controller::controller(distributed<database>& db, sharded<auth::service>& auth, sharded<service::migration_notifier>& mn, gms::gossiper& gossiper, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml)
controller::controller(distributed<database>& db, sharded<auth::service>& auth, sharded<service::migration_notifier>& mn, gms::gossiper& gossiper, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml,
sharded<qos::service_level_controller>& sl_controller)
: _ops_sem(1)
, _db(db)
, _auth_service(auth)
, _mnotifier(mn)
, _gossiper(gossiper)
, _qp(qp)
, _mem_limiter(ml) {
, _mem_limiter(ml)
, _sl_controller(sl_controller) {
}
future<> controller::start_server() {
@@ -147,7 +149,7 @@ future<> controller::do_start_server() {
}
}
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config).get();
cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_db), std::ref(_mem_limiter), cql_server_config, std::ref(_sl_controller)).get();
try {
parallel_for_each(configs, [cserver, keepalive](const listen_cfg & cfg) {

View File

@@ -24,6 +24,7 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/distributed.hh>
#include <seastar/core/future.hh>
#include "service/qos/service_level_controller.hh"
using namespace seastar;
@@ -50,13 +51,14 @@ class controller {
gms::gossiper& _gossiper;
sharded<cql3::query_processor>& _qp;
sharded<service::memory_limiter>& _mem_limiter;
sharded<qos::service_level_controller>& _sl_controller;
future<> set_cql_ready(bool ready);
future<> do_start_server();
future<> do_stop_server();
public:
controller(distributed<database>&, sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&);
controller(distributed<database>&, sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&, sharded<qos::service_level_controller>&);
future<> start_server();
future<> stop_server();
future<> stop();

View File

@@ -160,7 +160,7 @@ event::event_type parse_event_type(const sstring& value)
}
cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& auth_service,
service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config)
service::migration_notifier& mn, database& db, service::memory_limiter& ml, cql_server_config config, qos::service_level_controller& sl_controller)
: _query_processor(qp)
, _config(config)
, _max_request_size(config.max_request_size)
@@ -168,6 +168,7 @@ cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& au
, _memory_available(ml.get_semaphore())
, _notifier(std::make_unique<event_notifier>(mn))
, _auth_service(auth_service)
, _sl_controller(sl_controller)
{
namespace sm = seastar::metrics;
@@ -952,7 +953,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme
(bytes_ostream& linearization_buffer, service::client_state& client_state) mutable {
request_reader in(is, linearization_buffer);
return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format,
/* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) {
/* FIXME */empty_service_permit(), std::move(trace_state), false, _server._sl_controller).then([] (auto msg) {
// result here has to be foreign ptr
return std::get<foreign_ptr<std::unique_ptr<cql_server::response>>>(std::move(msg));
});
@@ -967,7 +968,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
fragmented_temporary_buffer::istream is = in.get_stream();
return process_fn(client_state, _server._query_processor, in, stream,
_version, _cql_serialization_format, permit, trace_state, true)
_version, _cql_serialization_format, permit, trace_state, true, _server._sl_controller)
.then([stream, &client_state, this, is, permit, process_fn, trace_state]
(std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned> msg) mutable {
unsigned* shard = std::get_if<unsigned>(&msg);
@@ -981,9 +982,10 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_query_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace,
qos::service_level_controller& sl_controller) {
auto query = in.read_long_string_view();
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit), sl_controller);
auto& query_state = q_state->query_state;
q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config());
auto& options = *q_state->options;
@@ -1048,7 +1050,8 @@ future<std::unique_ptr<cql_server::response>> cql_server::connection::process_pr
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_execute_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace,
qos::service_level_controller& sl_controller) {
cql3::prepared_cache_key_type cache_key(in.read_short_bytes());
auto& id = cql3::prepared_cache_key_type::cql_id(cache_key);
bool needs_authorization = false;
@@ -1065,7 +1068,7 @@ process_execute_internal(service::client_state& client_state, distributed<cql3::
throw exceptions::prepared_query_not_found_exception(id);
}
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit), sl_controller);
auto& query_state = q_state->query_state;
if (version == 1) {
std::vector<cql3::raw_value_view> values;
@@ -1127,7 +1130,8 @@ future<foreign_ptr<std::unique_ptr<cql_server::response>>> cql_server::connectio
static future<std::variant<foreign_ptr<std::unique_ptr<cql_server::response>>, unsigned>>
process_batch_internal(service::client_state& client_state, distributed<cql3::query_processor>& qp, request_reader in,
uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format,
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) {
service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace,
qos::service_level_controller& sl_controller) {
if (version == 1) {
throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol");
}
@@ -1212,7 +1216,7 @@ process_batch_internal(service::client_state& client_state, distributed<cql3::qu
values.emplace_back(std::move(tmp));
}
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit));
auto q_state = std::make_unique<cql_query_state>(client_state, trace_state, std::move(permit), sl_controller);
auto& query_state = q_state->query_state;
// #563. CQL v2 encodes query_options in v1 format for batch requests.
q_state->options = std::make_unique<cql3::query_options>(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format,

View File

@@ -40,6 +40,7 @@
#include "service_permit.hh"
#include <seastar/core/sharded.hh>
#include "utils/updateable_value.hh"
#include "service/qos/service_level_controller.hh"
namespace scollectd {
@@ -102,8 +103,8 @@ struct cql_query_state {
service::query_state query_state;
std::unique_ptr<cql3::query_options> options;
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit)
: query_state(client_state, std::move(trace_state_ptr), std::move(permit))
cql_query_state(service::client_state& client_state, tracing::trace_state_ptr trace_state_ptr, service_permit permit, qos::service_level_controller& sl_controller)
: query_state(client_state, std::move(trace_state_ptr), std::move(permit), sl_controller)
{ }
};
@@ -157,10 +158,12 @@ private:
private:
transport_stats _stats = {};
auth::service& _auth_service;
qos::service_level_controller& _sl_controller;
public:
cql_server(distributed<cql3::query_processor>& qp, auth::service&,
service::migration_notifier& mn, database& db, service::memory_limiter& ml,
cql_server_config config);
cql_server_config config,
qos::service_level_controller& sl_controller);
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool is_shard_aware = false, bool keepalive = false);
future<> do_accepts(int which, bool keepalive, socket_address server_addr);
future<> stop();