Merge: move rest of internal ddl users to use raft from Gleb

The patch series moves the rest of internal ddl users to do schema
change over raft (if enabled). After that series only tests are left
using old API.

* 'gleb/raft-schema-rest-v6' of github.com:scylladb/scylla-dev: (33 commits)
  migration_manager: drop no longer used functions
  system_distributed_keyspace: move schema creation code to use raft
  auth: move table creation code to use raft
  auth: move keyspace creation code to use raft
  table_helper: move schema creation code to use raft
  cql3: make query_processor inherit from peering_sharded_service
  table_helper: make setup_table() static
  table_helper: co-routinize setup_keyspace()
  redis: move schema creation code to go through raft
  thrift: move system_update_column_family() to raft
  thrift: authenticate a statement before verifying in system_update_column_family()
  thrift: co-routinize system_update_column_family()
  thrift: move system_update_keyspace() to raft
  thrift: authenticate a statement before verifying in system_update_keyspace()
  thrift: co-routinize system_update_keyspace()
  thrift: move system_drop_keyspace() to raft
  thrift: authenticate a statement before verifying in system_drop_keyspace()
  thrift: co-routinize system_drop_keyspace()
  thrift: move system_add_keyspace() to raft
  thrift: co-routinize system_add_keyspace()
  ...
This commit is contained in:
Nadav Har'El
2022-01-12 17:56:25 +02:00
16 changed files with 436 additions and 300 deletions

View File

@@ -61,11 +61,14 @@
#include <boost/range/algorithm/find_end.hpp>
#include "service/storage_proxy.hh"
#include "gms/gossiper.hh"
#include "schema_registry.hh"
logging::logger elogger("alternator-executor");
namespace alternator {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
return t;
@@ -116,13 +119,13 @@ std::string json_string::to_json() const {
return _value;
}
void executor::supplement_table_info(rjson::value& descr, const schema& schema) const {
void executor::supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp) {
rjson::add(descr, "CreationDateTime", rjson::value(std::chrono::duration_cast<std::chrono::seconds>(gc_clock::now().time_since_epoch()).count()));
rjson::add(descr, "TableStatus", "ACTIVE");
auto schema_id_str = schema.id().to_sstring();
rjson::add(descr, "TableId", rjson::from_string(schema_id_str));
executor::supplement_table_stream_info(descr, schema);
executor::supplement_table_stream_info(descr, schema, sp);
}
// We would have liked to support table names up to 255 bytes, like DynamoDB.
@@ -483,7 +486,7 @@ future<executor::request_return_type> executor::describe_table(client_state& cli
}
rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions));
supplement_table_stream_info(table_description, *schema);
supplement_table_stream_info(table_description, *schema, _proxy);
// FIXME: still missing some response fields (issue #5026)
@@ -500,23 +503,31 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
std::string table_name = get_table_name(request);
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
tracing::add_table_name(trace_state, keyspace_name, table_name);
auto& p = _proxy.container();
if (!_proxy.get_db().local().has_schema(keyspace_name, table_name)) {
return make_ready_future<request_return_type>(api_error::resource_not_found(
format("Requested resource not found: Table: {} not found", table_name)));
}
return _mm.announce_column_family_drop(keyspace_name, table_name, service::migration_manager::drop_views::yes).then([this, keyspace_name] {
return _mm.announce_keyspace_drop(keyspace_name);
}).then([table_name = std::move(table_name)] {
// FIXME: need more attributes?
rjson::value table_description = rjson::empty_object();
rjson::add(table_description, "TableName", rjson::from_string(table_name));
rjson::add(table_description, "TableStatus", "DELETING");
rjson::value response = rjson::empty_object();
rjson::add(response, "TableDescription", std::move(table_description));
elogger.trace("returning {}", response);
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(response)));
co_await _mm.container().invoke_on(0, [&] (service::migration_manager& mm) -> future<> {
co_await mm.schema_read_barrier();
if (!p.local().get_db().local().has_schema(keyspace_name, table_name)) {
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
}
auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, service::migration_manager::drop_views::yes);
auto m2 = mm.prepare_keyspace_drop_announcement(keyspace_name);
std::move(m2.begin(), m2.end(), std::back_inserter(m));
co_await mm.announce(std::move(m));
});
// FIXME: need more attributes?
rjson::value table_description = rjson::empty_object();
rjson::add(table_description, "TableName", rjson::from_string(table_name));
rjson::add(table_description, "TableStatus", "DELETING");
rjson::value response = rjson::empty_object();
rjson::add(response, "TableDescription", std::move(table_description));
elogger.trace("returning {}", response);
co_return make_jsonable(std::move(response));
}
static data_type parse_key_type(const std::string& type) {
@@ -750,9 +761,16 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
// to races during concurrent updates of the same table. Once Scylla schema updates
// are fixed, this issue will automatically get fixed as well.
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
schema_builder builder(schema);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(std::move(tags_map)));
return mm.announce_column_family_update(builder.build(), false, std::nullopt);
co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> {
co_await mm.schema_read_barrier();
schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), std::nullopt);
co_await mm.announce(std::move(m));
});
}
future<executor::request_return_type> executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) {
@@ -847,13 +865,12 @@ static void verify_billing_mode(const rjson::value& request) {
}
}
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
static future<executor::request_return_type> create_table_on_shard0(tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper) {
assert(this_shard_id() == 0);
std::string table_name = get_table_name(request);
if (table_name.find(INTERNAL_TABLE_PREFIX) == 0) {
return make_ready_future<request_return_type>(api_error::validation(
format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
if (table_name.find(executor::INTERNAL_TABLE_PREFIX) == 0) {
co_return api_error::validation(format("Prefix {} is reserved for accessing internal tables", executor::INTERNAL_TABLE_PREFIX));
}
std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name;
const rjson::value& attribute_definitions = request["AttributeDefinitions"];
@@ -866,10 +883,12 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (!range_key.empty()) {
add_column(builder, range_key, attribute_definitions, column_kind::clustering_key);
}
builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
builder.with_column(bytes(executor::ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
verify_billing_mode(request);
co_await mm.schema_read_barrier();
schema_ptr partial_schema = builder.build();
// Parse GlobalSecondaryIndexes parameters before creating the base
@@ -880,12 +899,12 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::vector<sstring> where_clauses;
if (gsi) {
if (!gsi->IsArray()) {
return make_ready_future<request_return_type>(api_error::validation("GlobalSecondaryIndexes must be an array."));
co_return api_error::validation("GlobalSecondaryIndexes must be an array.");
}
for (const rjson::value& g : gsi->GetArray()) {
const rjson::value* index_name = rjson::find(g, "IndexName");
if (!index_name || !index_name->IsString()) {
return make_ready_future<request_return_type>(api_error::validation("GlobalSecondaryIndexes IndexName must be a string."));
co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string.");
}
std::string vname(view_name(table_name, index_name->GetString()));
elogger.trace("Adding GSI {}", index_name->GetString());
@@ -943,23 +962,21 @@ future<executor::request_return_type> executor::create_table(client_state& clien
std::string vname(lsi_name(table_name, index_name->GetString()));
elogger.trace("Adding LSI {}", index_name->GetString());
if (range_key.empty()) {
return make_ready_future<request_return_type>(api_error::validation(
"LocalSecondaryIndex requires that the base table have a range key"));
co_return api_error::validation("LocalSecondaryIndex requires that the base table have a range key");
}
// FIXME: read and handle "Projection" parameter. This will
// require the MV code to copy just parts of the attrs map.
schema_builder view_builder(keyspace_name, vname);
auto [view_hash_key, view_range_key] = parse_key_schema(l);
if (view_hash_key != hash_key) {
return make_ready_future<request_return_type>(api_error::validation(
"LocalSecondaryIndex hash key must match the base table hash key"));
co_return api_error::validation("LocalSecondaryIndex hash key must match the base table hash key");
}
add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key);
if (view_range_key.empty()) {
return make_ready_future<request_return_type>(api_error::validation("LocalSecondaryIndex must specify a sort key"));
co_return api_error::validation("LocalSecondaryIndex must specify a sort key");
}
if (view_range_key == hash_key) {
return make_ready_future<request_return_type>(api_error::validation("LocalSecondaryIndex sort key cannot be the same as hash key"));
co_return api_error::validation("LocalSecondaryIndex sort key cannot be the same as hash key");
}
if (view_range_key != range_key) {
add_column(builder, view_range_key, attribute_definitions, column_kind::regular_column);
@@ -971,7 +988,7 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (!range_key.empty() && view_range_key != range_key) {
add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key);
}
view_builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
view_builder.with_column(bytes(executor::ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column);
// Note above we don't need to add virtual columns, as all
// base columns were copied to view. TODO: reconsider the need
// for virtual columns when we support Projection.
@@ -991,17 +1008,17 @@ future<executor::request_return_type> executor::create_table(client_state& clien
if (sse_specification && sse_specification->IsObject()) {
rjson::value* enabled = rjson::find(*sse_specification, "Enabled");
if (!enabled || !enabled->IsBool()) {
return make_ready_future<request_return_type>(api_error("ValidationException", "SSESpecification needs boolean Enabled"));
co_return api_error("ValidationException", "SSESpecification needs boolean Enabled");
}
if (enabled->GetBool()) {
// TODO: full support for SSESpecification
return make_ready_future<request_return_type>(api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported."));
co_return api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported.");
}
}
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
add_stream_options(*stream_specification, builder);
executor::add_stream_options(*stream_specification, builder, sp);
}
// Parse the "Tags" parameter early, so we can avoid creating the table
@@ -1030,30 +1047,52 @@ future<executor::request_return_type> executor::create_table(client_state& clien
++where_clause_it;
}
return create_keyspace(keyspace_name).handle_exception_type([] (exceptions::already_exists_exception&) {
// Ignore the fact that the keyspace may already exist. See discussion in #6340
}).then([this, table_name, request = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
return futurize_invoke([&] { return _mm.announce_new_column_family(schema); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable {
return parallel_for_each(std::move(view_builders), [this, schema] (schema_builder builder) {
return _mm.announce_new_view(view_ptr(builder.build()));
}).then([this, table_info = std::move(table_info), schema, tags_map = std::move(tags_map)] () mutable {
future<> f = make_ready_future<>();
if (!tags_map.empty()) {
f = update_tags(_mm, schema, std::move(tags_map));
}
return f.then([this] {
return wait_for_schema_agreement(_mm, db::timeout_clock::now() + 10s);
}).then([this, table_info = std::move(table_info), schema] () mutable {
rjson::value status = rjson::empty_object();
supplement_table_info(table_info, *schema);
rjson::add(status, "TableDescription", std::move(table_info));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(status)));
});
});
}).handle_exception_type([table_name = std::move(table_name)] (exceptions::already_exists_exception&) {
return make_exception_future<executor::request_return_type>(
api_error::resource_in_use(format("Table {} already exists", table_name)));
try {
co_await mm.announce(co_await create_keyspace(keyspace_name, mm, gossiper));
} catch (exceptions::already_exists_exception&) {
// Ignore the fact that the keyspace may already exist. See discussion in #6340
}
try {
// The code should be rewritten in a way that allows creating mutations
// for all the changes in a single mutation array before announcing.
// See https://github.com/scylladb/scylla/issues/9868
co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema));
std::vector<mutation> m;
co_await parallel_for_each(std::move(view_builders), [&mm, schema, &m] (schema_builder builder) -> future<> {
auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build()));
std::move(vm.begin(), vm.end(), std::back_inserter(m));
});
co_await mm.announce(std::move(m));
if (!tags_map.empty()) {
schema_builder builder(schema);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), std::nullopt));
}
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s);
rjson::value status = rjson::empty_object();
executor::supplement_table_info(request, *schema, sp);
rjson::add(status, "TableDescription", std::move(request));
co_return make_jsonable(std::move(status));
} catch(exceptions::already_exists_exception&) {
co_return api_error::resource_in_use(format("Table {} already exists", table_name));
}
}
future<executor::request_return_type> executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) {
_stats.api_operations.create_table++;
elogger.trace("Creating table {}", request);
co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container()]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
co_return co_await create_table_on_shard0(tr, std::move(request), sp.local(), mm, g.local());
});
}
@@ -1061,22 +1100,6 @@ future<executor::request_return_type> executor::update_table(client_state& clien
_stats.api_operations.update_table++;
elogger.trace("Updating table {}", request);
schema_ptr tab = get_table(_proxy, request);
// the ugly but harmless conversion to string_view here is because
// Seastar's sstring is missing a find(std::string_view) :-()
if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) {
return make_ready_future<request_return_type>(api_error::validation(
format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
}
tracing::add_table_name(trace_state, tab->ks_name(), tab->cf_name());
schema_builder builder(tab);
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
add_stream_options(*stream_specification, builder);
}
static const std::vector<sstring> unsupported = {
"AttributeDefinitions",
"GlobalSecondaryIndexUpdates",
@@ -1087,7 +1110,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
for (auto& s : unsupported) {
if (rjson::find(request, s)) {
throw api_error::validation(s + " not supported");
co_return coroutine::make_exception(api_error::validation(s + " not supported"));
}
}
@@ -1095,15 +1118,39 @@ future<executor::request_return_type> executor::update_table(client_state& clien
verify_billing_mode(request);
}
auto schema = builder.build();
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state))]
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
co_await mm.schema_read_barrier();
schema_ptr tab = get_table(p.local(), request);
tracing::add_table_name(gt, tab->ks_name(), tab->cf_name());
// the ugly but harmless conversion to string_view here is because
// Seastar's sstring is missing a find(std::string_view) :-()
if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) {
co_return coroutine::make_exception(api_error::validation(format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX)));
}
schema_builder builder(tab);
rjson::value* stream_specification = rjson::find(request, "StreamSpecification");
if (stream_specification && stream_specification->IsObject()) {
add_stream_options(*stream_specification, builder, p.local());
}
auto schema = builder.build();
auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector<view_ptr>(), std::nullopt);
co_await mm.announce(std::move(m));
co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s);
return _mm.announce_column_family_update(schema, false, std::nullopt).then([this] {
return wait_for_schema_agreement(_mm, db::timeout_clock::now() + 10s);
}).then([this, table_info = std::move(request), schema] () mutable {
rjson::value status = rjson::empty_object();
supplement_table_info(table_info, *schema);
rjson::add(status, "TableDescription", std::move(table_info));
return make_ready_future<executor::request_return_type>(make_jsonable(std::move(status)));
supplement_table_info(request, *schema, p.local());
rjson::add(status, "TableDescription", std::move(request));
co_return make_jsonable(std::move(status));
});
}
@@ -4069,19 +4116,19 @@ static std::map<sstring, sstring> get_network_topology_options(gms::gossiper& go
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
future<> executor::create_keyspace(std::string_view keyspace_name) {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper) {
sstring keyspace_name_str(keyspace_name);
return gms::get_all_endpoint_count(_gossiper).then([this, keyspace_name_str = std::move(keyspace_name_str)] (int endpoint_count) {
int rf = 3;
if (endpoint_count < rf) {
rf = 1;
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
keyspace_name_str, rf, endpoint_count);
}
auto opts = get_network_topology_options(_gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
return _mm.announce_new_keyspace(ksm, api::new_timestamp());
});
int endpoint_count = co_await gms::get_all_endpoint_count(gossiper);
int rf = 3;
if (endpoint_count < rf) {
rf = 1;
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
keyspace_name_str, rf, endpoint_count);
}
auto opts = get_network_topology_options(gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
co_return mm.prepare_new_keyspace_announcement(ksm);
}
future<> executor::start() {

View File

@@ -208,8 +208,6 @@ public:
future<> start();
future<> stop() { return make_ready_future<>(); }
future<> create_keyspace(std::string_view keyspace_name);
static sstring table_name(const schema&);
static db::timeout_clock::time_point default_timeout();
static void set_default_timeout(db::timeout_clock::duration timeout);
@@ -239,9 +237,9 @@ public:
rjson::value&,
bool = false);
void add_stream_options(const rjson::value& stream_spec, schema_builder&) const;
void supplement_table_info(rjson::value& descr, const schema& schema) const;
void supplement_table_stream_info(rjson::value& descr, const schema& schema) const;
static void add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp);
static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp);
static void supplement_table_stream_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp);
};
}

View File

@@ -1050,14 +1050,14 @@ future<executor::request_return_type> executor::get_records(client_state& client
});
}
void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder) const {
void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) {
auto stream_enabled = rjson::find(stream_specification, "StreamEnabled");
if (!stream_enabled || !stream_enabled->IsBool()) {
throw api_error::validation("StreamSpecification needs boolean StreamEnabled");
}
if (stream_enabled->GetBool()) {
auto& db = _proxy.get_db().local();
auto& db = sp.get_db().local();
if (!db.features().cluster_supports_cdc()) {
throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster.");
@@ -1094,10 +1094,10 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche
}
}
void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema) const {
void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp) {
auto& opts = schema.cdc_options();
if (opts.enabled()) {
auto& db = _proxy.get_db().local();
auto& db = sp.get_db().local();
auto& cf = db.find_column_family(schema.ks_name(), cdc::log_name(schema.cf_name()));
stream_arn arn(cf.schema()->id());
rjson::add(descr, "LatestStreamArn", arn);

View File

@@ -19,6 +19,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/coroutine.hh>
#include "auth/common.hh"
#include <seastar/core/shared_ptr.hh>
@@ -61,9 +62,8 @@ static future<> create_metadata_table_if_missing_impl(
cql3::query_processor& qp,
std::string_view cql,
::service::migration_manager& mm) {
static auto ignore_existing = [] (seastar::noncopyable_function<future<>()> func) {
return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
};
assert(this_shard_id() == 0); // once_among_shards makes sure a function is executed on shard 0 only
auto db = qp.db();
auto parsed_statement = cql3::query_processor::parse_statement(cql);
auto& parsed_cf_statement = static_cast<cql3::statements::raw::cf_statement&>(*parsed_statement);
@@ -79,9 +79,13 @@ static future<> create_metadata_table_if_missing_impl(
schema_builder b(schema);
b.set_uuid(uuid);
schema_ptr table = b.build();
return ignore_existing([&mm, table = std::move(table)] () {
return mm.announce_new_column_family(table);
});
if (!db.has_schema(table->ks_name(), table->cf_name())) {
co_await mm.schema_read_barrier();
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table));
} catch (exceptions::already_exists_exception&) {}
}
}
future<> create_metadata_table_if_missing(

View File

@@ -19,6 +19,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/coroutine.hh>
#include "auth/service.hh"
#include <algorithm>
@@ -41,6 +42,7 @@
#include "utils/class_registrator.hh"
#include "locator/abstract_replication_strategy.hh"
#include "data_dictionary/keyspace_metadata.hh"
#include "mutation.hh"
namespace auth {
@@ -142,23 +144,24 @@ service::service(
}
future<> service::create_keyspace_if_missing(::service::migration_manager& mm) const {
assert(this_shard_id() == 0); // once_among_shards makes sure a function is executed on shard 0 only
auto db = _qp.db();
if (!db.has_keyspace(meta::AUTH_KS)) {
locator::replication_strategy_config_options opts{{"replication_factor", "1"}};
co_await mm.schema_read_barrier();
auto ksm = data_dictionary::keyspace_metadata::new_keyspace(
meta::AUTH_KS,
"org.apache.cassandra.locator.SimpleStrategy",
opts,
true);
if (!db.has_keyspace(meta::AUTH_KS)) {
locator::replication_strategy_config_options opts{{"replication_factor", "1"}};
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments.
// See issue #2129.
return mm.announce_new_keyspace(ksm, api::min_timestamp);
auto ksm = data_dictionary::keyspace_metadata::new_keyspace(
meta::AUTH_KS,
"org.apache.cassandra.locator.SimpleStrategy",
opts,
true);
co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm));
}
}
return make_ready_future<>();
}
future<> service::start(::service::migration_manager& mm) {

View File

@@ -108,7 +108,7 @@ class cql_config;
class query_options;
class cql_statement;
class query_processor {
class query_processor : public seastar::peering_sharded_service<query_processor> {
public:
class migration_subscriber;
struct memory_config {

View File

@@ -120,7 +120,7 @@ future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<
std::vector<mutation> m;
try {
m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm), api::new_timestamp());
m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm));
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,

View File

@@ -200,11 +200,33 @@ system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor&
, _sp(sp) {
}
static thread_local std::pair<std::string_view, data_type> new_columns[] {
{"timeout", duration_type},
{"workload_type", utf8_type}
};
static bool has_missing_columns(data_dictionary::database db) noexcept {
assert(this_shard_id() == 0);
try {
auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
for (const auto& col : new_columns) {
auto& [col_name, col_type] = col;
bytes options_name = to_bytes(col_name.data());
if (schema->get_column_definition(options_name)) {
continue;
}
return true;
}
} catch (...) {
dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception());
return true;
}
return false;
}
static future<> add_new_columns_if_missing(replica::database& db, ::service::migration_manager& mm) noexcept {
static thread_local std::pair<std::string_view, data_type> new_columns[] {
{"timeout", duration_type},
{"workload_type", utf8_type}
};
assert(this_shard_id() == 0);
try {
auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
schema_builder b(schema);
@@ -218,14 +240,14 @@ static future<> add_new_columns_if_missing(replica::database& db, ::service::mig
updated = true;
b.with_column(options_name, col_type, column_kind::regular_column);
}
if (!updated) {
return make_ready_future<>();
if (updated) {
schema_ptr table = b.build();
try {
co_return co_await mm.announce(co_await mm.prepare_column_family_update_announcement(table, false, std::vector<view_ptr>(), api::timestamp_type(1)));
} catch (...) {}
}
schema_ptr table = b.build();
return mm.announce_column_family_update(table, false, api::timestamp_type(1)).handle_exception([] (const std::exception_ptr&) {});
} catch (...) {
dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception());
return make_ready_future<>();
}
}
@@ -235,38 +257,69 @@ future<> system_distributed_keyspace::start() {
co_return;
}
static auto ignore_existing = [] (seastar::noncopyable_function<future<>()> func) {
return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { });
};
if (!_sp.get_db().local().has_keyspace(NAME)) {
co_await _mm.schema_read_barrier();
// We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments.
// See issue #2129.
co_await ignore_existing([this] {
auto ksm = keyspace_metadata::new_keyspace(
NAME,
"org.apache.cassandra.locator.SimpleStrategy",
{{"replication_factor", "3"}},
true /* durable_writes */);
return _mm.announce_new_keyspace(ksm, api::min_timestamp);
try {
auto ksm = keyspace_metadata::new_keyspace(
NAME,
"org.apache.cassandra.locator.SimpleStrategy",
{{"replication_factor", "3"}},
true /* durable_writes */);
co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm));
} catch (exceptions::already_exists_exception&) {}
} else {
dlogger.info("{} keyspase is already present. Not creating", NAME);
}
if (!_sp.get_db().local().has_keyspace(NAME_EVERYWHERE)) {
co_await _mm.schema_read_barrier();
try {
auto ksm = keyspace_metadata::new_keyspace(
NAME_EVERYWHERE,
"org.apache.cassandra.locator.EverywhereStrategy",
{},
true /* durable_writes */);
co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm));
} catch (exceptions::already_exists_exception&) {}
} else {
dlogger.info("{} keyspase is already present. Not creating", NAME_EVERYWHERE);
}
auto tables = ensured_tables();
bool exist = std::all_of(tables.begin(), tables.end(), [this] (schema_ptr s) {
return _sp.get_db().local().has_schema(s->ks_name(), s->cf_name());
});
co_await ignore_existing([this] {
auto ksm = keyspace_metadata::new_keyspace(
NAME_EVERYWHERE,
"org.apache.cassandra.locator.EverywhereStrategy",
{},
true /* durable_writes */);
return _mm.announce_new_keyspace(ksm, api::min_timestamp);
});
if (!exist) {
co_await _mm.schema_read_barrier();
for (auto&& table : ensured_tables()) {
co_await ignore_existing([this, table = std::move(table)] {
return _mm.announce_new_column_family(std::move(table), api::min_timestamp);
auto m = co_await map_reduce(tables,
/* Mapper */ [this] (auto&& table) -> future<std::vector<mutation>> {
try {
co_return co_await _mm.prepare_new_column_family_announcement(std::move(table), api::min_timestamp);
} catch (exceptions::already_exists_exception&) {
co_return std::vector<mutation>();
}
},
/* Initial value*/ std::vector<mutation>(),
/* Reducer */ [] (std::vector<mutation> m1, std::vector<mutation> m2) {
std::move(m2.begin(), m2.end(), std::back_inserter(m1));
return m1;
});
co_await _mm.announce(std::move(m));
} else {
dlogger.info("All tables are present on start");
}
_started = true;
co_await add_new_columns_if_missing(_qp.db().real_database(), _mm);
if (has_missing_columns(_qp.db())) {
co_await _mm.schema_read_barrier();
co_await add_new_columns_if_missing(_qp.db().real_database(), _mm);
} else {
dlogger.info("All schemas are uptodate on start");
}
}
future<> system_distributed_keyspace::stop() {

View File

@@ -19,6 +19,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/coroutine.hh>
#include "redis/keyspace_utils.hh"
#include "schema_builder.hh"
#include "types.hh"
@@ -151,15 +152,17 @@ schema_ptr zsets_schema(sstring ks_name) {
}
future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::migration_manager>& mm, db::config& config, int default_replication_factor) {
assert(this_shard_id() == 0);
auto keyspace_replication_strategy_options = config.redis_keyspace_replication_strategy_options();
if (!keyspace_replication_strategy_options.contains("class")) {
keyspace_replication_strategy_options["class"] = "SimpleStrategy";
keyspace_replication_strategy_options["replication_factor"] = fmt::format("{}", default_replication_factor);
}
auto keyspace_gen = [&mm, &config, keyspace_replication_strategy_options = std::move(keyspace_replication_strategy_options)] (sstring name) {
auto keyspace_gen = [&mm, &config, keyspace_replication_strategy_options = std::move(keyspace_replication_strategy_options)] (sstring name) -> future<> {
auto& mml = mm.local();
auto& proxy = service::get_local_storage_proxy();
if (proxy.get_db().local().has_keyspace(name)) {
return make_ready_future<>();
co_return;
}
auto attrs = make_shared<cql3::statements::ks_prop_defs>();
attrs->add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true");
@@ -170,18 +173,22 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::migration_
attrs->add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs->validate();
const auto& tm = *proxy.get_token_metadata_ptr();
return mm.local().announce_new_keyspace(attrs->as_ks_metadata(name, tm));
co_return co_await mml.announce(mml.prepare_new_keyspace_announcement(attrs->as_ks_metadata(name, tm)));
};
auto table_gen = [&mm] (sstring ks_name, sstring cf_name, schema_ptr schema) {
auto table_gen = [&mm] (sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> {
auto& mml= mm.local();
auto& proxy = service::get_local_storage_proxy();
if (proxy.get_db().local().has_schema(ks_name, cf_name)) {
return make_ready_future<>();
co_return;
}
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
return mm.local().announce_new_column_family(schema);
co_return co_await mml.announce(co_await mml.prepare_new_column_family_announcement(schema));
};
co_await mm.local().schema_read_barrier();
// create default databases for redis.
return parallel_for_each(boost::irange<unsigned>(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) {
co_return co_await parallel_for_each(boost::irange<unsigned>(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) {
auto ks_name = fmt::format("REDIS_{}", c);
return keyspace_gen(ks_name).then([ks_name, table_gen] {
return when_all_succeed(

View File

@@ -128,6 +128,11 @@ future<> service::client_state::has_schema_access(const replica::database& db, c
co_return co_await has_access(db, s.ks_name(), {p, r});
}
future<> service::client_state::has_schema_access(const replica::database& db, const sstring& ks_name, const sstring& cf_name, auth::permission p) const {
auth::resource r = auth::make_data_resource(ks_name, cf_name);
co_return co_await has_access(db, ks_name, {p, r});
}
future<> service::client_state::has_access(const replica::database& db, const sstring& ks, auth::command_desc cmd) const {
if (ks.empty()) {
return make_exception_future<>(exceptions::invalid_request_exception("You have not set a keyspace for this session"));

View File

@@ -351,6 +351,7 @@ public:
future<> has_column_family_access(const replica::database& db, const sstring&, const sstring&, auth::permission,
auth::command_desc::type = auth::command_desc::type::OTHER) const;
future<> has_schema_access(const replica::database& db, const schema& s, auth::permission p) const;
future<> has_schema_access(const replica::database& db, const sstring&, const sstring&, auth::permission p) const;
private:
future<> has_access(const replica::database& db, const sstring& keyspace, auth::command_desc) const;

View File

@@ -629,26 +629,13 @@ std::vector<mutation> migration_manager::prepare_keyspace_update_announcement(lw
return db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp());
}
future<> migration_manager::announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm) {
return announce(prepare_keyspace_update_announcement(std::move(ksm)));
}
future<>migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm)
{
return announce_new_keyspace(ksm, api::new_timestamp());
}
std::vector<mutation> migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
std::vector<mutation> migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm) {
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
db.validate_new_keyspace(*ksm);
mlogger.info("Create new Keyspace: {}", ksm);
return db::schema_tables::make_create_keyspace_mutations(ksm, timestamp);
}
future<> migration_manager::announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
return announce(prepare_new_keyspace_announcement(std::move(ksm), timestamp));
return db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp());
}
future<> migration_manager::announce_new_column_family(schema_ptr cfm)
@@ -836,10 +823,6 @@ std::vector<mutation> migration_manager::prepare_keyspace_drop_announcement(cons
return db::schema_tables::make_drop_keyspace_mutations(keyspace.metadata(), api::new_timestamp());
}
future<> migration_manager::announce_keyspace_drop(const sstring& ks_name) {
return announce(prepare_keyspace_drop_announcement(ks_name));
}
future<std::vector<mutation>> migration_manager::prepare_column_family_drop_announcement(const sstring& ks_name,
const sstring& cf_name, drop_views drop_views) {
try {

View File

@@ -129,14 +129,9 @@ public:
bool should_pull_schema_from(const gms::inet_address& endpoint);
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);
future<> announce_keyspace_update(lw_shared_ptr<keyspace_metadata> ksm);
std::vector<mutation> prepare_keyspace_update_announcement(lw_shared_ptr<keyspace_metadata> ksm);
future<> announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm);
future<> announce_new_keyspace(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp);
std::vector<mutation> prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp);
std::vector<mutation> prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm);
// The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas
@@ -162,7 +157,6 @@ public:
future<std::vector<mutation>> prepare_update_type_announcement(user_type updated_type);
future<> announce_keyspace_drop(const sstring& ks_name);
std::vector<mutation> prepare_keyspace_drop_announcement(const sstring& ks_name);
class drop_views_tag;

View File

@@ -20,6 +20,7 @@
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/coroutine.hh>
#include "table_helper.hh"
#include "cql3/query_processor.hh"
#include "cql3/statements/create_table_statement.hh"
@@ -27,22 +28,30 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
future<> table_helper::setup_table(cql3::query_processor& qp) const {
future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& create_cql) {
auto db = qp.db();
if (db.has_schema(_keyspace, _name)) {
return make_ready_future<>();
}
auto parsed = cql3::query_processor::parse_statement(_create_cql);
auto parsed = cql3::query_processor::parse_statement(create_cql);
cql3::statements::raw::cf_statement* parsed_cf_stmt = static_cast<cql3::statements::raw::cf_statement*>(parsed.get());
parsed_cf_stmt->prepare_keyspace(_keyspace);
(void)parsed_cf_stmt->keyspace(); // This will assert if cql statement did not contain keyspace
::shared_ptr<cql3::statements::create_table_statement> statement =
static_pointer_cast<cql3::statements::create_table_statement>(
parsed_cf_stmt->prepare(db, qp.get_cql_stats())->statement);
auto schema = statement->get_cf_meta_data(db);
if (db.has_schema(schema->ks_name(), schema->cf_name())) {
co_return;
}
auto& mm = qp.get_migration_manager();
co_await mm.schema_read_barrier();
if (db.has_schema(schema->ks_name(), schema->cf_name())) { // re-check after read barrier
co_return;
}
// Generate the CF UUID based on its KF names. This is needed to ensure that
// all Nodes that create it would create it with the same UUID and we don't
// hit the #420 issue.
@@ -55,7 +64,9 @@ future<> table_helper::setup_table(cql3::query_processor& qp) const {
// "CREATE TABLE" invocation on different Nodes.
// The important thing is that it will converge eventually (some traces may
// be lost in a process but that's ok).
return qp.get_migration_manager().announce_new_column_family(b.build()).discard_result().handle_exception([this] (auto ep) {});;
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(b.build()));
} catch (...) {}
}
future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) {
@@ -92,7 +103,9 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::quer
}).handle_exception([this, &qp] (auto eptr) {
// One of the possible causes for an error here could be the table that doesn't exist.
//FIXME: discarded future.
(void)this->setup_table(qp).discard_result();
(void)qp.container().invoke_on(0, [create_cql = _create_cql] (cql3::query_processor& qp) -> future<> {
co_return co_await table_helper::setup_table(qp, create_cql);
});
// We throw the bad_column_family exception because the caller
// expects and accounts this type of errors.
@@ -116,35 +129,33 @@ future<> table_helper::insert(cql3::query_processor& qp, service::query_state& q
}
future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector<table_helper*> tables) {
if (this_shard_id() == 0) {
size_t n = tables.size();
for (size_t i = 0; i < n; ++i) {
if (tables[i]->_keyspace != keyspace_name) {
throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace");
}
}
return seastar::async([&qp, keyspace_name, replication_factor, &qs, tables] {
data_dictionary::database db = qp.db();
// Create a keyspace
if (!db.has_keyspace(keyspace_name)) {
std::map<sstring, sstring> opts;
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments. See issue #2129.
qp.get_migration_manager().announce_new_keyspace(ksm, api::min_timestamp).get();
}
qs.get_client_state().set_keyspace(db.real_database(), keyspace_name);
// Create tables
size_t n = tables.size();
for (size_t i = 0; i < n; ++i) {
tables[i]->setup_table(qp).get();
}
});
} else {
return make_ready_future<>();
if (this_shard_id() != 0) {
co_return;
}
if (std::any_of(tables.begin(), tables.end(), [&] (table_helper* t) { return t->_keyspace != keyspace_name; })) {
throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace");
}
data_dictionary::database db = qp.db();
auto& mm = qp.get_migration_manager();
if (!db.has_keyspace(keyspace_name)) {
co_await mm.schema_read_barrier();
// Create a keyspace
if (!db.has_keyspace(keyspace_name)) {
std::map<sstring, sstring> opts;
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm));
}
}
qs.get_client_state().set_keyspace(db.real_database(), keyspace_name);
// Create tables
co_await parallel_for_each(tables, [&qp] (table_helper* t) {
return table_helper::setup_table(qp, t->_create_cql);
});
}

View File

@@ -65,7 +65,7 @@ public:
* @return A future that resolves when the operation is complete. Any
* possible errors are ignored.
*/
future<> setup_table(cql3::query_processor& qp) const;
static future<> setup_table(cql3::query_processor& qp, const sstring& create_cql);
/**
* @return a future that resolves when the given t_helper is ready to be used for

View File

@@ -869,119 +869,149 @@ public:
});
}
static future<std::string> execute_schema_command(distributed<service::migration_manager>& dmm, distributed<replica::database>& db, std::function<future<std::vector<mutation>>(service::migration_manager&, replica::database&)> ddl) {
auto func = [ddl, &dmm] (replica::database& db) -> future<std::string> {
auto& mm = dmm.local();
co_await mm.schema_read_barrier();
co_await mm.announce(co_await ddl(mm, db));
co_return std::string(db.get_version().to_sstring());
};
co_return co_await db.invoke_on(0, func);
}
void system_add_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
if (!_db.local().has_keyspace(cf_def.keyspace)) {
throw NotFoundException();
}
if (_db.local().has_schema(cf_def.keyspace, cf_def.name)) {
throw make_exception<InvalidRequestException>("Column family {} already exists", cf_def.name);
}
with_cob(std::move(cob), std::move(exn_cob), [this, def = cf_def] () -> future<std::string> {
auto& t = *this;
auto cf_def = def;
auto s = schema_from_thrift(cf_def, cf_def.keyspace);
return _query_state.get_client_state().has_keyspace_access(_db.local(), cf_def.keyspace, auth::permission::CREATE).then([this, s = std::move(s)] {
return _query_processor.local().get_migration_manager().announce_new_column_family(std::move(s)).then([this] {
return std::string(_db.local().get_version().to_sstring());
});
co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), cf_def.keyspace, auth::permission::CREATE);
co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&cf_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
if (!db.has_keyspace(cf_def.keyspace)) {
throw NotFoundException();
}
if (db.has_schema(cf_def.keyspace, cf_def.name)) {
throw make_exception<InvalidRequestException>("Column family {} already exists", cf_def.name);
}
auto s = schema_from_thrift(cf_def, cf_def.keyspace);
co_return co_await mm.prepare_new_column_family_announcement(std::move(s));
});
});
}
void system_drop_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& column_family) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
return _query_state.get_client_state().has_column_family_access(_db.local(), current_keyspace(), column_family, auth::permission::DROP).then([this, column_family] {
auto& cf = _db.local().find_column_family(current_keyspace(), column_family);
with_cob(std::move(cob), std::move(exn_cob), [this, cfm = column_family] () -> future<std::string> {
auto& t = *this;
auto column_family = cfm;
co_await t._query_state.get_client_state().has_column_family_access(t._db.local(), t.current_keyspace(), column_family, auth::permission::DROP);
co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db,
[&column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
auto& cf = db.find_column_family(current_keyspace, column_family);
if (cf.schema()->is_view()) {
throw make_exception<InvalidRequestException>("Cannot drop Materialized Views from Thrift");
}
if (!cf.views().empty()) {
throw make_exception<InvalidRequestException>("Cannot drop table with Materialized Views {}", column_family);
}
return _query_processor.local().get_migration_manager().announce_column_family_drop(current_keyspace(), column_family).then([this] {
return std::string(_db.local().get_version().to_sstring());
});
co_return co_await mm.prepare_column_family_drop_announcement(current_keyspace, column_family);
});
});
}
void system_add_keyspace(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto ksm = keyspace_from_thrift(ks_def);
return _query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE).then([this, ksm = std::move(ksm)] {
return _query_processor.local().get_migration_manager().announce_new_keyspace(std::move(ksm)).then([this] {
return std::string(_db.local().get_version().to_sstring());
});
with_cob(std::move(cob), std::move(exn_cob), [this, def = ks_def] () -> future<std::string> {
auto& t = *this;
auto ks_def = def;
co_await t._query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE);
co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&ks_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def));
});
});
}
void system_drop_keyspace(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const std::string& keyspace) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
thrift_validation::validate_keyspace_not_system(keyspace);
if (!_db.local().has_keyspace(keyspace)) {
throw NotFoundException();
}
with_cob(std::move(cob), std::move(exn_cob), [this, ks = keyspace] () -> future<std::string> {
auto& t = *this;
auto keyspace = ks;
return _query_state.get_client_state().has_keyspace_access(_db.local(), keyspace, auth::permission::DROP).then([this, keyspace] {
return _query_processor.local().get_migration_manager().announce_keyspace_drop(keyspace).then([this] {
return std::string(_db.local().get_version().to_sstring());
});
co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), keyspace, auth::permission::DROP);
co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&keyspace] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
thrift_validation::validate_keyspace_not_system(keyspace);
if (!db.has_keyspace(keyspace)) {
throw NotFoundException();
}
co_return mm.prepare_keyspace_drop_announcement(keyspace);
});
});
}
void system_update_keyspace(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const KsDef& ks_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
with_cob(std::move(cob), std::move(exn_cob), [this, def = ks_def] () -> future<std::string> {
auto& t = *this;
auto ks_def = def;
thrift_validation::validate_keyspace_not_system(ks_def.name);
if (!_db.local().has_keyspace(ks_def.name)) {
throw NotFoundException();
}
if (!ks_def.cf_defs.empty()) {
throw make_exception<InvalidRequestException>("Keyspace update must not contain any column family definitions.");
}
co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), ks_def.name, auth::permission::ALTER);
auto ksm = keyspace_from_thrift(ks_def);
return _query_state.get_client_state().has_keyspace_access(_db.local(), ks_def.name, auth::permission::ALTER).then([this, ksm = std::move(ksm)] {
return _query_processor.local().get_migration_manager().announce_keyspace_update(std::move(ksm)).then([this] {
return std::string(_db.local().get_version().to_sstring());
});
co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&ks_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
if (db.has_keyspace(ks_def.name)) {
throw NotFoundException();
}
if (!ks_def.cf_defs.empty()) {
throw make_exception<InvalidRequestException>("Keyspace update must not contain any column family definitions.");
}
auto ksm = keyspace_from_thrift(ks_def);
co_return mm.prepare_keyspace_update_announcement(std::move(ksm));
});
});
}
void system_update_column_family(thrift_fn::function<void(std::string const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob, const CfDef& cf_def) {
service_permit permit = obtain_permit();
with_cob(std::move(cob), std::move(exn_cob), [&] {
auto& cf = _db.local().find_column_family(cf_def.keyspace, cf_def.name);
auto schema = cf.schema();
with_cob(std::move(cob), std::move(exn_cob), [this, def = cf_def] () -> future<std::string> {
auto& t = *this;
auto cf_def = def;
if (schema->is_cql3_table()) {
throw make_exception<InvalidRequestException>("Cannot modify CQL3 table {} as it may break the schema. You should use cqlsh to modify CQL3 tables instead.", cf_def.name);
}
co_await t._query_state.get_client_state().has_schema_access(t._db.local(), cf_def.keyspace, cf_def.name, auth::permission::ALTER);
if (schema->is_view()) {
throw make_exception<InvalidRequestException>("Cannot modify Materialized View table {} as it may break the schema. "
"You should use cqlsh to modify Materialized View tables instead.", cf_def.name);
}
co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&cf_def] (service::migration_manager& mm, replica::database& db) -> future<std::vector<mutation>> {
auto& cf = db.find_column_family(cf_def.keyspace, cf_def.name);
auto schema = cf.schema();
if (!cf.views().empty()) {
throw make_exception<InvalidRequestException>("Cannot modify table with Materialized Views {} as it may break the schema. "
"You should use cqlsh to modify Materialized View tables instead.", cf_def.name);
}
if (schema->is_cql3_table()) {
throw make_exception<InvalidRequestException>("Cannot modify CQL3 table {} as it may break the schema. You should use cqlsh to modify CQL3 tables instead.", cf_def.name);
}
auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id());
if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) {
fail(unimplemented::cause::MIXED_CF);
}
return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::ALTER).then([this, s = std::move(s)] {
return _query_processor.local().get_migration_manager().announce_column_family_update(std::move(s), true, std::nullopt).then([this] {
return std::string(_db.local().get_version().to_sstring());
});
if (schema->is_view()) {
throw make_exception<InvalidRequestException>("Cannot modify Materialized View table {} as it may break the schema. "
"You should use cqlsh to modify Materialized View tables instead.", cf_def.name);
}
if (!cf.views().empty()) {
throw make_exception<InvalidRequestException>("Cannot modify table with Materialized Views {} as it may break the schema. "
"You should use cqlsh to modify Materialized View tables instead.", cf_def.name);
}
auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id());
if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) {
fail(unimplemented::cause::MIXED_CF);
}
co_return co_await mm.prepare_column_family_update_announcement(std::move(s), true, std::vector<view_ptr>(), std::nullopt);
});
});
}