diff --git a/alternator/executor.cc b/alternator/executor.cc index 26322e5a9f..0cc8912651 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -61,11 +61,14 @@ #include #include "service/storage_proxy.hh" #include "gms/gossiper.hh" +#include "schema_registry.hh" logging::logger elogger("alternator-executor"); namespace alternator { +static future> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper); + static map_type attrs_type() { static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true); return t; @@ -116,13 +119,13 @@ std::string json_string::to_json() const { return _value; } -void executor::supplement_table_info(rjson::value& descr, const schema& schema) const { +void executor::supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp) { rjson::add(descr, "CreationDateTime", rjson::value(std::chrono::duration_cast(gc_clock::now().time_since_epoch()).count())); rjson::add(descr, "TableStatus", "ACTIVE"); auto schema_id_str = schema.id().to_sstring(); rjson::add(descr, "TableId", rjson::from_string(schema_id_str)); - executor::supplement_table_stream_info(descr, schema); + executor::supplement_table_stream_info(descr, schema, sp); } // We would have liked to support table names up to 255 bytes, like DynamoDB. 
@@ -483,7 +486,7 @@ future executor::describe_table(client_state& cli } rjson::add(table_description, "AttributeDefinitions", std::move(attribute_definitions)); - supplement_table_stream_info(table_description, *schema); + supplement_table_stream_info(table_description, *schema, _proxy); // FIXME: still missing some response fields (issue #5026) @@ -500,23 +503,31 @@ future executor::delete_table(client_state& clien std::string table_name = get_table_name(request); std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name; tracing::add_table_name(trace_state, keyspace_name, table_name); + auto& p = _proxy.container(); - if (!_proxy.get_db().local().has_schema(keyspace_name, table_name)) { - return make_ready_future(api_error::resource_not_found( - format("Requested resource not found: Table: {} not found", table_name))); - } - return _mm.announce_column_family_drop(keyspace_name, table_name, service::migration_manager::drop_views::yes).then([this, keyspace_name] { - return _mm.announce_keyspace_drop(keyspace_name); - }).then([table_name = std::move(table_name)] { - // FIXME: need more attributes? 
- rjson::value table_description = rjson::empty_object(); - rjson::add(table_description, "TableName", rjson::from_string(table_name)); - rjson::add(table_description, "TableStatus", "DELETING"); - rjson::value response = rjson::empty_object(); - rjson::add(response, "TableDescription", std::move(table_description)); - elogger.trace("returning {}", response); - return make_ready_future(make_jsonable(std::move(response))); + co_await _mm.container().invoke_on(0, [&] (service::migration_manager& mm) -> future<> { + co_await mm.schema_read_barrier(); + + if (!p.local().get_db().local().has_schema(keyspace_name, table_name)) { + throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name)); + } + + auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, service::migration_manager::drop_views::yes); + auto m2 = mm.prepare_keyspace_drop_announcement(keyspace_name); + + std::move(m2.begin(), m2.end(), std::back_inserter(m)); + + co_await mm.announce(std::move(m)); }); + + // FIXME: need more attributes? + rjson::value table_description = rjson::empty_object(); + rjson::add(table_description, "TableName", rjson::from_string(table_name)); + rjson::add(table_description, "TableStatus", "DELETING"); + rjson::value response = rjson::empty_object(); + rjson::add(response, "TableDescription", std::move(table_description)); + elogger.trace("returning {}", response); + co_return make_jsonable(std::move(response)); } static data_type parse_key_type(const std::string& type) { @@ -750,9 +761,16 @@ static void update_tags_map(const rjson::value& tags, std::map // to races during concurrent updates of the same table. Once Scylla schema updates // are fixed, this issue will automatically get fixed as well. 
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map&& tags_map) { - schema_builder builder(schema); - builder.add_extension(tags_extension::NAME, ::make_shared(std::move(tags_map))); - return mm.announce_column_family_update(builder.build(), false, std::nullopt); + co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> { + co_await mm.schema_read_barrier(); + + schema_builder builder(s); + builder.add_extension(tags_extension::NAME, ::make_shared(tags_map)); + + auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector(), std::nullopt); + + co_await mm.announce(std::move(m)); + }); } future executor::tag_resource(client_state& client_state, service_permit permit, rjson::value request) { @@ -847,13 +865,12 @@ static void verify_billing_mode(const rjson::value& request) { } } -future executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) { - _stats.api_operations.create_table++; - elogger.trace("Creating table {}", request); +static future create_table_on_shard0(tracing::trace_state_ptr trace_state, rjson::value request, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper) { + assert(this_shard_id() == 0); + std::string table_name = get_table_name(request); - if (table_name.find(INTERNAL_TABLE_PREFIX) == 0) { - return make_ready_future(api_error::validation( - format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX))); + if (table_name.find(executor::INTERNAL_TABLE_PREFIX) == 0) { + co_return api_error::validation(format("Prefix {} is reserved for accessing internal tables", executor::INTERNAL_TABLE_PREFIX)); } std::string keyspace_name = executor::KEYSPACE_NAME_PREFIX + table_name; const rjson::value& attribute_definitions = request["AttributeDefinitions"]; @@ 
-866,10 +883,12 @@ future executor::create_table(client_state& clien if (!range_key.empty()) { add_column(builder, range_key, attribute_definitions, column_kind::clustering_key); } - builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column); + builder.with_column(bytes(executor::ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column); verify_billing_mode(request); + co_await mm.schema_read_barrier(); + schema_ptr partial_schema = builder.build(); // Parse GlobalSecondaryIndexes parameters before creating the base @@ -880,12 +899,12 @@ future executor::create_table(client_state& clien std::vector where_clauses; if (gsi) { if (!gsi->IsArray()) { - return make_ready_future(api_error::validation("GlobalSecondaryIndexes must be an array.")); + co_return api_error::validation("GlobalSecondaryIndexes must be an array."); } for (const rjson::value& g : gsi->GetArray()) { const rjson::value* index_name = rjson::find(g, "IndexName"); if (!index_name || !index_name->IsString()) { - return make_ready_future(api_error::validation("GlobalSecondaryIndexes IndexName must be a string.")); + co_return api_error::validation("GlobalSecondaryIndexes IndexName must be a string."); } std::string vname(view_name(table_name, index_name->GetString())); elogger.trace("Adding GSI {}", index_name->GetString()); @@ -943,23 +962,21 @@ future executor::create_table(client_state& clien std::string vname(lsi_name(table_name, index_name->GetString())); elogger.trace("Adding LSI {}", index_name->GetString()); if (range_key.empty()) { - return make_ready_future(api_error::validation( - "LocalSecondaryIndex requires that the base table have a range key")); + co_return api_error::validation("LocalSecondaryIndex requires that the base table have a range key"); } // FIXME: read and handle "Projection" parameter. This will // require the MV code to copy just parts of the attrs map. 
schema_builder view_builder(keyspace_name, vname); auto [view_hash_key, view_range_key] = parse_key_schema(l); if (view_hash_key != hash_key) { - return make_ready_future(api_error::validation( - "LocalSecondaryIndex hash key must match the base table hash key")); + co_return api_error::validation("LocalSecondaryIndex hash key must match the base table hash key"); } add_column(view_builder, view_hash_key, attribute_definitions, column_kind::partition_key); if (view_range_key.empty()) { - return make_ready_future(api_error::validation("LocalSecondaryIndex must specify a sort key")); + co_return api_error::validation("LocalSecondaryIndex must specify a sort key"); } if (view_range_key == hash_key) { - return make_ready_future(api_error::validation("LocalSecondaryIndex sort key cannot be the same as hash key")); + co_return api_error::validation("LocalSecondaryIndex sort key cannot be the same as hash key"); } if (view_range_key != range_key) { add_column(builder, view_range_key, attribute_definitions, column_kind::regular_column); @@ -971,7 +988,7 @@ future executor::create_table(client_state& clien if (!range_key.empty() && view_range_key != range_key) { add_column(view_builder, range_key, attribute_definitions, column_kind::clustering_key); } - view_builder.with_column(bytes(ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column); + view_builder.with_column(bytes(executor::ATTRS_COLUMN_NAME), attrs_type(), column_kind::regular_column); // Note above we don't need to add virtual columns, as all // base columns were copied to view. TODO: reconsider the need // for virtual columns when we support Projection. 
@@ -991,17 +1008,17 @@ future executor::create_table(client_state& clien if (sse_specification && sse_specification->IsObject()) { rjson::value* enabled = rjson::find(*sse_specification, "Enabled"); if (!enabled || !enabled->IsBool()) { - return make_ready_future(api_error("ValidationException", "SSESpecification needs boolean Enabled")); + co_return api_error("ValidationException", "SSESpecification needs boolean Enabled"); } if (enabled->GetBool()) { // TODO: full support for SSESpecification - return make_ready_future(api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported.")); + co_return api_error("ValidationException", "SSESpecification: configuring encryption-at-rest is not yet supported."); } } rjson::value* stream_specification = rjson::find(request, "StreamSpecification"); if (stream_specification && stream_specification->IsObject()) { - add_stream_options(*stream_specification, builder); + executor::add_stream_options(*stream_specification, builder, sp); } // Parse the "Tags" parameter early, so we can avoid creating the table @@ -1030,30 +1047,52 @@ future executor::create_table(client_state& clien ++where_clause_it; } - return create_keyspace(keyspace_name).handle_exception_type([] (exceptions::already_exists_exception&) { - // Ignore the fact that the keyspace may already exist. 
See discussion in #6340 - }).then([this, table_name, request = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable { - return futurize_invoke([&] { return _mm.announce_new_column_family(schema); }).then([this, table_info = std::move(request), schema, view_builders = std::move(view_builders), tags_map = std::move(tags_map)] () mutable { - return parallel_for_each(std::move(view_builders), [this, schema] (schema_builder builder) { - return _mm.announce_new_view(view_ptr(builder.build())); - }).then([this, table_info = std::move(table_info), schema, tags_map = std::move(tags_map)] () mutable { - future<> f = make_ready_future<>(); - if (!tags_map.empty()) { - f = update_tags(_mm, schema, std::move(tags_map)); - } - return f.then([this] { - return wait_for_schema_agreement(_mm, db::timeout_clock::now() + 10s); - }).then([this, table_info = std::move(table_info), schema] () mutable { - rjson::value status = rjson::empty_object(); - supplement_table_info(table_info, *schema); - rjson::add(status, "TableDescription", std::move(table_info)); - return make_ready_future(make_jsonable(std::move(status))); - }); - }); - }).handle_exception_type([table_name = std::move(table_name)] (exceptions::already_exists_exception&) { - return make_exception_future( - api_error::resource_in_use(format("Table {} already exists", table_name))); + try { + co_await mm.announce(co_await create_keyspace(keyspace_name, mm, gossiper)); + } catch (exceptions::already_exists_exception&) { + // Ignore the fact that the keyspace may already exist. See discussion in #6340 + } + + try { + // The code should be rewritten in a way that allows creating mutations + // for all the changes in a single mutation array before announcing. 
+ // See https://github.com/scylladb/scylla/issues/9868 + co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema)); + + std::vector m; + + co_await parallel_for_each(std::move(view_builders), [&mm, schema, &m] (schema_builder builder) -> future<> { + auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build())); + std::move(vm.begin(), vm.end(), std::back_inserter(m)); }); + + co_await mm.announce(std::move(m)); + + if (!tags_map.empty()) { + schema_builder builder(schema); + builder.add_extension(tags_extension::NAME, ::make_shared(tags_map)); + + co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector(), std::nullopt)); + } + + co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); + + rjson::value status = rjson::empty_object(); + executor::supplement_table_info(request, *schema, sp); + rjson::add(status, "TableDescription", std::move(request)); + co_return make_jsonable(std::move(status)); + } catch(exceptions::already_exists_exception&) { + co_return api_error::resource_in_use(format("Table {} already exists", table_name)); + } +} + +future executor::create_table(client_state& client_state, tracing::trace_state_ptr trace_state, service_permit permit, rjson::value request) { + _stats.api_operations.create_table++; + elogger.trace("Creating table {}", request); + + co_return co_await _mm.container().invoke_on(0, [&, tr = tracing::global_trace_state_ptr(trace_state), request = std::move(request), &sp = _proxy.container(), &g = _gossiper.container()] + (service::migration_manager& mm) mutable -> future { + co_return co_await create_table_on_shard0(tr, std::move(request), sp.local(), mm, g.local()); }); } @@ -1061,22 +1100,6 @@ future executor::update_table(client_state& clien _stats.api_operations.update_table++; elogger.trace("Updating table {}", request); - schema_ptr tab = get_table(_proxy, request); - // the ugly but harmless conversion to 
string_view here is because - // Seastar's sstring is missing a find(std::string_view) :-() - if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) { - return make_ready_future(api_error::validation( - format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX))); - } - tracing::add_table_name(trace_state, tab->ks_name(), tab->cf_name()); - - schema_builder builder(tab); - - rjson::value* stream_specification = rjson::find(request, "StreamSpecification"); - if (stream_specification && stream_specification->IsObject()) { - add_stream_options(*stream_specification, builder); - } - static const std::vector unsupported = { "AttributeDefinitions", "GlobalSecondaryIndexUpdates", @@ -1087,7 +1110,7 @@ future executor::update_table(client_state& clien for (auto& s : unsupported) { if (rjson::find(request, s)) { - throw api_error::validation(s + " not supported"); + co_return coroutine::make_exception(api_error::validation(s + " not supported")); } } @@ -1095,15 +1118,39 @@ future executor::update_table(client_state& clien verify_billing_mode(request); } - auto schema = builder.build(); + co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state))] + (service::migration_manager& mm) mutable -> future { + co_await mm.schema_read_barrier(); + + schema_ptr tab = get_table(p.local(), request); + + tracing::add_table_name(gt, tab->ks_name(), tab->cf_name()); + + // the ugly but harmless conversion to string_view here is because + // Seastar's sstring is missing a find(std::string_view) :-() + if (std::string_view(tab->cf_name()).find(INTERNAL_TABLE_PREFIX) == 0) { + co_return coroutine::make_exception(api_error::validation(format("Prefix {} is reserved for accessing internal tables", INTERNAL_TABLE_PREFIX))); + } + + schema_builder builder(tab); + + rjson::value* stream_specification = rjson::find(request, "StreamSpecification"); + if 
(stream_specification && stream_specification->IsObject()) { + add_stream_options(*stream_specification, builder, p.local()); + } + + auto schema = builder.build(); + + auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector(), std::nullopt); + + co_await mm.announce(std::move(m)); + + co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); - return _mm.announce_column_family_update(schema, false, std::nullopt).then([this] { - return wait_for_schema_agreement(_mm, db::timeout_clock::now() + 10s); - }).then([this, table_info = std::move(request), schema] () mutable { rjson::value status = rjson::empty_object(); - supplement_table_info(table_info, *schema); - rjson::add(status, "TableDescription", std::move(table_info)); - return make_ready_future(make_jsonable(std::move(status))); + supplement_table_info(request, *schema, p.local()); + rjson::add(status, "TableDescription", std::move(request)); + co_return make_jsonable(std::move(status)); }); } @@ -4069,19 +4116,19 @@ static std::map get_network_topology_options(gms::gossiper& go // of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3. // A smaller cluster (presumably, a test only), gets RF=1. The user may // manually create the keyspace to override this predefined behavior. 
-future<> executor::create_keyspace(std::string_view keyspace_name) { +static future> create_keyspace(std::string_view keyspace_name, service::migration_manager& mm, gms::gossiper& gossiper) { sstring keyspace_name_str(keyspace_name); - return gms::get_all_endpoint_count(_gossiper).then([this, keyspace_name_str = std::move(keyspace_name_str)] (int endpoint_count) { - int rf = 3; - if (endpoint_count < rf) { - rf = 1; - elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.", - keyspace_name_str, rf, endpoint_count); - } - auto opts = get_network_topology_options(_gossiper, rf); - auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true); - return _mm.announce_new_keyspace(ksm, api::new_timestamp()); - }); + int endpoint_count = co_await gms::get_all_endpoint_count(gossiper); + int rf = 3; + if (endpoint_count < rf) { + rf = 1; + elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.", + keyspace_name_str, rf, endpoint_count); + } + auto opts = get_network_topology_options(gossiper, rf); + auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true); + + co_return mm.prepare_new_keyspace_announcement(ksm); } future<> executor::start() { diff --git a/alternator/executor.hh b/alternator/executor.hh index 71c432d2d8..5c6a917524 100644 --- a/alternator/executor.hh +++ b/alternator/executor.hh @@ -208,8 +208,6 @@ public: future<> start(); future<> stop() { return make_ready_future<>(); } - future<> create_keyspace(std::string_view keyspace_name); - static sstring table_name(const schema&); static db::timeout_clock::time_point default_timeout(); static void set_default_timeout(db::timeout_clock::duration timeout); @@ -239,9 +237,9 @@ public: rjson::value&, bool = false); - void add_stream_options(const 
rjson::value& stream_spec, schema_builder&) const; - void supplement_table_info(rjson::value& descr, const schema& schema) const; - void supplement_table_stream_info(rjson::value& descr, const schema& schema) const; + static void add_stream_options(const rjson::value& stream_spec, schema_builder&, service::storage_proxy& sp); + static void supplement_table_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp); + static void supplement_table_stream_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp); }; } diff --git a/alternator/streams.cc b/alternator/streams.cc index 43d73e9a71..e120a3be93 100644 --- a/alternator/streams.cc +++ b/alternator/streams.cc @@ -1050,14 +1050,14 @@ future executor::get_records(client_state& client }); } -void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder) const { +void executor::add_stream_options(const rjson::value& stream_specification, schema_builder& builder, service::storage_proxy& sp) { auto stream_enabled = rjson::find(stream_specification, "StreamEnabled"); if (!stream_enabled || !stream_enabled->IsBool()) { throw api_error::validation("StreamSpecification needs boolean StreamEnabled"); } if (stream_enabled->GetBool()) { - auto& db = _proxy.get_db().local(); + auto& db = sp.get_db().local(); if (!db.features().cluster_supports_cdc()) { throw api_error::validation("StreamSpecification: streams (CDC) feature not enabled in cluster."); @@ -1094,10 +1094,10 @@ void executor::add_stream_options(const rjson::value& stream_specification, sche } } -void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema) const { +void executor::supplement_table_stream_info(rjson::value& descr, const schema& schema, service::storage_proxy& sp) { auto& opts = schema.cdc_options(); if (opts.enabled()) { - auto& db = _proxy.get_db().local(); + auto& db = sp.get_db().local(); auto& cf = db.find_column_family(schema.ks_name(), 
cdc::log_name(schema.cf_name())); stream_arn arn(cf.schema()->id()); rjson::add(descr, "LatestStreamArn", arn); diff --git a/auth/common.cc b/auth/common.cc index a14bd18709..0badc0c18c 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -19,6 +19,7 @@ * along with Scylla. If not, see . */ +#include #include "auth/common.hh" #include @@ -61,9 +62,8 @@ static future<> create_metadata_table_if_missing_impl( cql3::query_processor& qp, std::string_view cql, ::service::migration_manager& mm) { - static auto ignore_existing = [] (seastar::noncopyable_function()> func) { - return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); - }; + assert(this_shard_id() == 0); // once_among_shards makes sure a function is executed on shard 0 only + auto db = qp.db(); auto parsed_statement = cql3::query_processor::parse_statement(cql); auto& parsed_cf_statement = static_cast(*parsed_statement); @@ -79,9 +79,13 @@ static future<> create_metadata_table_if_missing_impl( schema_builder b(schema); b.set_uuid(uuid); schema_ptr table = b.build(); - return ignore_existing([&mm, table = std::move(table)] () { - return mm.announce_new_column_family(table); - }); + + if (!db.has_schema(table->ks_name(), table->cf_name())) { + co_await mm.schema_read_barrier(); + try { + co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table)); + } catch (exceptions::already_exists_exception&) {} + } } future<> create_metadata_table_if_missing( diff --git a/auth/service.cc b/auth/service.cc index 5c611296a4..d37a225dd9 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -19,6 +19,7 @@ * along with Scylla. If not, see . 
*/ +#include #include "auth/service.hh" #include @@ -41,6 +42,7 @@ #include "utils/class_registrator.hh" #include "locator/abstract_replication_strategy.hh" #include "data_dictionary/keyspace_metadata.hh" +#include "mutation.hh" namespace auth { @@ -142,23 +144,24 @@ service::service( } future<> service::create_keyspace_if_missing(::service::migration_manager& mm) const { + assert(this_shard_id() == 0); // once_among_shards makes sure a function is executed on shard 0 only auto db = _qp.db(); if (!db.has_keyspace(meta::AUTH_KS)) { - locator::replication_strategy_config_options opts{{"replication_factor", "1"}}; + co_await mm.schema_read_barrier(); - auto ksm = data_dictionary::keyspace_metadata::new_keyspace( - meta::AUTH_KS, - "org.apache.cassandra.locator.SimpleStrategy", - opts, - true); + if (!db.has_keyspace(meta::AUTH_KS)) { + locator::replication_strategy_config_options opts{{"replication_factor", "1"}}; - // We use min_timestamp so that default keyspace metadata will loose with any manual adjustments. - // See issue #2129. 
- return mm.announce_new_keyspace(ksm, api::min_timestamp); + auto ksm = data_dictionary::keyspace_metadata::new_keyspace( + meta::AUTH_KS, + "org.apache.cassandra.locator.SimpleStrategy", + opts, + true); + + co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm)); + } } - - return make_ready_future<>(); } future<> service::start(::service::migration_manager& mm) { diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 94cbb2a93b..4b5f3b1280 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -108,7 +108,7 @@ class cql_config; class query_options; class cql_statement; -class query_processor { +class query_processor : public seastar::peering_sharded_service { public: class migration_subscriber; struct memory_config { diff --git a/cql3/statements/create_keyspace_statement.cc b/cql3/statements/create_keyspace_statement.cc index 4593ac875d..8f607df3f3 100644 --- a/cql3/statements/create_keyspace_statement.cc +++ b/cql3/statements/create_keyspace_statement.cc @@ -120,7 +120,7 @@ future, std::vector< std::vector m; try { - m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm), api::new_timestamp()); + m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm)); ret = ::make_shared( event::schema_change::change_type::CREATED, diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 6b23516e64..f88ab7e298 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -200,11 +200,33 @@ system_distributed_keyspace::system_distributed_keyspace(cql3::query_processor& , _sp(sp) { } +static thread_local std::pair new_columns[] { + {"timeout", duration_type}, + {"workload_type", utf8_type} +}; + +static bool has_missing_columns(data_dictionary::database db) noexcept { + assert(this_shard_id() == 0); + try { + auto schema = db.find_schema(system_distributed_keyspace::NAME, 
system_distributed_keyspace::SERVICE_LEVELS); + for (const auto& col : new_columns) { + auto& [col_name, col_type] = col; + bytes options_name = to_bytes(col_name.data()); + if (schema->get_column_definition(options_name)) { + continue; + } + return true; + } + } catch (...) { + dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception()); + return true; + } + + return false; +} + static future<> add_new_columns_if_missing(replica::database& db, ::service::migration_manager& mm) noexcept { - static thread_local std::pair new_columns[] { - {"timeout", duration_type}, - {"workload_type", utf8_type} - }; + assert(this_shard_id() == 0); try { auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS); schema_builder b(schema); @@ -218,14 +240,14 @@ static future<> add_new_columns_if_missing(replica::database& db, ::service::mig updated = true; b.with_column(options_name, col_type, column_kind::regular_column); } - if (!updated) { - return make_ready_future<>(); + if (updated) { + schema_ptr table = b.build(); + try { + co_return co_await mm.announce(co_await mm.prepare_column_family_update_announcement(table, false, std::vector(), api::timestamp_type(1))); + } catch (...) {} } - schema_ptr table = b.build(); - return mm.announce_column_family_update(table, false, api::timestamp_type(1)).handle_exception([] (const std::exception_ptr&) {}); } catch (...) 
{ dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception()); - return make_ready_future<>(); } } @@ -235,38 +257,69 @@ future<> system_distributed_keyspace::start() { co_return; } - static auto ignore_existing = [] (seastar::noncopyable_function()> func) { - return futurize_invoke(std::move(func)).handle_exception_type([] (exceptions::already_exists_exception& ignored) { }); - }; + if (!_sp.get_db().local().has_keyspace(NAME)) { + co_await _mm.schema_read_barrier(); - // We use min_timestamp so that the default keyspace metadata will lose with any manual adjustments. - // See issue #2129. - co_await ignore_existing([this] { - auto ksm = keyspace_metadata::new_keyspace( - NAME, - "org.apache.cassandra.locator.SimpleStrategy", - {{"replication_factor", "3"}}, - true /* durable_writes */); - return _mm.announce_new_keyspace(ksm, api::min_timestamp); + try { + auto ksm = keyspace_metadata::new_keyspace( + NAME, + "org.apache.cassandra.locator.SimpleStrategy", + {{"replication_factor", "3"}}, + true /* durable_writes */); + co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm)); + } catch (exceptions::already_exists_exception&) {} + } else { + dlogger.info("{} keyspase is already present. Not creating", NAME); + } + + if (!_sp.get_db().local().has_keyspace(NAME_EVERYWHERE)) { + co_await _mm.schema_read_barrier(); + + try { + auto ksm = keyspace_metadata::new_keyspace( + NAME_EVERYWHERE, + "org.apache.cassandra.locator.EverywhereStrategy", + {}, + true /* durable_writes */); + co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm)); + } catch (exceptions::already_exists_exception&) {} + } else { + dlogger.info("{} keyspase is already present. 
Not creating", NAME_EVERYWHERE); + } + + auto tables = ensured_tables(); + bool exist = std::all_of(tables.begin(), tables.end(), [this] (schema_ptr s) { + return _sp.get_db().local().has_schema(s->ks_name(), s->cf_name()); }); - co_await ignore_existing([this] { - auto ksm = keyspace_metadata::new_keyspace( - NAME_EVERYWHERE, - "org.apache.cassandra.locator.EverywhereStrategy", - {}, - true /* durable_writes */); - return _mm.announce_new_keyspace(ksm, api::min_timestamp); - }); + if (!exist) { + co_await _mm.schema_read_barrier(); - for (auto&& table : ensured_tables()) { - co_await ignore_existing([this, table = std::move(table)] { - return _mm.announce_new_column_family(std::move(table), api::min_timestamp); + auto m = co_await map_reduce(tables, + /* Mapper */ [this] (auto&& table) -> future> { + try { + co_return co_await _mm.prepare_new_column_family_announcement(std::move(table), api::min_timestamp); + } catch (exceptions::already_exists_exception&) { + co_return std::vector(); + } + }, + /* Initial value*/ std::vector(), + /* Reducer */ [] (std::vector m1, std::vector m2) { + std::move(m2.begin(), m2.end(), std::back_inserter(m1)); + return m1; }); + co_await _mm.announce(std::move(m)); + } else { + dlogger.info("All tables are present on start"); } _started = true; - co_await add_new_columns_if_missing(_qp.db().real_database(), _mm); + if (has_missing_columns(_qp.db())) { + co_await _mm.schema_read_barrier(); + co_await add_new_columns_if_missing(_qp.db().real_database(), _mm); + } else { + dlogger.info("All schemas are uptodate on start"); + } } future<> system_distributed_keyspace::stop() { diff --git a/redis/keyspace_utils.cc b/redis/keyspace_utils.cc index 0cebea22ac..dd087775e2 100644 --- a/redis/keyspace_utils.cc +++ b/redis/keyspace_utils.cc @@ -19,6 +19,7 @@ * along with Scylla. If not, see . 
*/ +#include #include "redis/keyspace_utils.hh" #include "schema_builder.hh" #include "types.hh" @@ -151,15 +152,17 @@ schema_ptr zsets_schema(sstring ks_name) { } future<> create_keyspace_if_not_exists_impl(seastar::sharded& mm, db::config& config, int default_replication_factor) { + assert(this_shard_id() == 0); auto keyspace_replication_strategy_options = config.redis_keyspace_replication_strategy_options(); if (!keyspace_replication_strategy_options.contains("class")) { keyspace_replication_strategy_options["class"] = "SimpleStrategy"; keyspace_replication_strategy_options["replication_factor"] = fmt::format("{}", default_replication_factor); } - auto keyspace_gen = [&mm, &config, keyspace_replication_strategy_options = std::move(keyspace_replication_strategy_options)] (sstring name) { + auto keyspace_gen = [&mm, &config, keyspace_replication_strategy_options = std::move(keyspace_replication_strategy_options)] (sstring name) -> future<> { + auto& mml = mm.local(); auto& proxy = service::get_local_storage_proxy(); if (proxy.get_db().local().has_keyspace(name)) { - return make_ready_future<>(); + co_return; } auto attrs = make_shared(); attrs->add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true"); @@ -170,18 +173,22 @@ future<> create_keyspace_if_not_exists_impl(seastar::shardedadd_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties); attrs->validate(); const auto& tm = *proxy.get_token_metadata_ptr(); - return mm.local().announce_new_keyspace(attrs->as_ks_metadata(name, tm)); + co_return co_await mml.announce(mml.prepare_new_keyspace_announcement(attrs->as_ks_metadata(name, tm))); }; - auto table_gen = [&mm] (sstring ks_name, sstring cf_name, schema_ptr schema) { + auto table_gen = [&mm] (sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> { + auto& mml= mm.local(); auto& proxy = service::get_local_storage_proxy(); if (proxy.get_db().local().has_schema(ks_name, cf_name)) { - return make_ready_future<>(); 
+ co_return; } logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name); - return mm.local().announce_new_column_family(schema); + co_return co_await mml.announce(co_await mml.prepare_new_column_family_announcement(schema)); }; + + co_await mm.local().schema_read_barrier(); + // create default databases for redis. - return parallel_for_each(boost::irange(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) { + co_return co_await parallel_for_each(boost::irange(0, config.redis_database_count()), [keyspace_gen = std::move(keyspace_gen), table_gen = std::move(table_gen)] (auto c) { auto ks_name = fmt::format("REDIS_{}", c); return keyspace_gen(ks_name).then([ks_name, table_gen] { return when_all_succeed( diff --git a/service/client_state.cc b/service/client_state.cc index b50925ebe8..b6b0ab799c 100644 --- a/service/client_state.cc +++ b/service/client_state.cc @@ -128,6 +128,11 @@ future<> service::client_state::has_schema_access(const replica::database& db, c co_return co_await has_access(db, s.ks_name(), {p, r}); } +future<> service::client_state::has_schema_access(const replica::database& db, const sstring& ks_name, const sstring& cf_name, auth::permission p) const { + auth::resource r = auth::make_data_resource(ks_name, cf_name); + co_return co_await has_access(db, ks_name, {p, r}); +} + future<> service::client_state::has_access(const replica::database& db, const sstring& ks, auth::command_desc cmd) const { if (ks.empty()) { return make_exception_future<>(exceptions::invalid_request_exception("You have not set a keyspace for this session")); diff --git a/service/client_state.hh b/service/client_state.hh index 183d9fab8b..a4af627202 100644 --- a/service/client_state.hh +++ b/service/client_state.hh @@ -351,6 +351,7 @@ public: future<> has_column_family_access(const replica::database& db, const sstring&, const sstring&, auth::permission, auth::command_desc::type = 
auth::command_desc::type::OTHER) const; future<> has_schema_access(const replica::database& db, const schema& s, auth::permission p) const; + future<> has_schema_access(const replica::database& db, const sstring&, const sstring&, auth::permission p) const; private: future<> has_access(const replica::database& db, const sstring& keyspace, auth::command_desc) const; diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 8b1ddbdcec..2a11c4d568 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -629,26 +629,13 @@ std::vector migration_manager::prepare_keyspace_update_announcement(lw return db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp()); } -future<> migration_manager::announce_keyspace_update(lw_shared_ptr ksm) { - return announce(prepare_keyspace_update_announcement(std::move(ksm))); -} - -future<>migration_manager::announce_new_keyspace(lw_shared_ptr ksm) -{ - return announce_new_keyspace(ksm, api::new_timestamp()); -} - -std::vector migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr ksm, api::timestamp_type timestamp) { +std::vector migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr ksm) { auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_new_keyspace(*ksm); mlogger.info("Create new Keyspace: {}", ksm); - return db::schema_tables::make_create_keyspace_mutations(ksm, timestamp); -} - -future<> migration_manager::announce_new_keyspace(lw_shared_ptr ksm, api::timestamp_type timestamp) { - return announce(prepare_new_keyspace_announcement(std::move(ksm), timestamp)); + return db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp()); } future<> migration_manager::announce_new_column_family(schema_ptr cfm) @@ -836,10 +823,6 @@ std::vector migration_manager::prepare_keyspace_drop_announcement(cons return db::schema_tables::make_drop_keyspace_mutations(keyspace.metadata(), api::new_timestamp()); } -future<> 
migration_manager::announce_keyspace_drop(const sstring& ks_name) { - return announce(prepare_keyspace_drop_announcement(ks_name)); -} - future> migration_manager::prepare_column_family_drop_announcement(const sstring& ks_name, const sstring& cf_name, drop_views drop_views) { try { diff --git a/service/migration_manager.hh b/service/migration_manager.hh index 5943f2d1c8..5d6ea6e054 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -129,14 +129,9 @@ public: bool should_pull_schema_from(const gms::inet_address& endpoint); bool has_compatible_schema_tables_version(const gms::inet_address& endpoint); - future<> announce_keyspace_update(lw_shared_ptr ksm); std::vector prepare_keyspace_update_announcement(lw_shared_ptr ksm); - future<> announce_new_keyspace(lw_shared_ptr ksm); - - future<> announce_new_keyspace(lw_shared_ptr ksm, api::timestamp_type timestamp); - - std::vector prepare_new_keyspace_announcement(lw_shared_ptr ksm, api::timestamp_type timestamp); + std::vector prepare_new_keyspace_announcement(lw_shared_ptr ksm); // The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas @@ -162,7 +157,6 @@ public: future> prepare_update_type_announcement(user_type updated_type); - future<> announce_keyspace_drop(const sstring& ks_name); std::vector prepare_keyspace_drop_announcement(const sstring& ks_name); class drop_views_tag; diff --git a/table_helper.cc b/table_helper.cc index 7859db1771..af8f480aec 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -20,6 +20,7 @@ * along with Scylla. If not, see . 
*/ +#include #include "table_helper.hh" #include "cql3/query_processor.hh" #include "cql3/statements/create_table_statement.hh" @@ -27,22 +28,30 @@ #include "replica/database.hh" #include "service/migration_manager.hh" -future<> table_helper::setup_table(cql3::query_processor& qp) const { +future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& create_cql) { auto db = qp.db(); - if (db.has_schema(_keyspace, _name)) { - return make_ready_future<>(); - } - - auto parsed = cql3::query_processor::parse_statement(_create_cql); + auto parsed = cql3::query_processor::parse_statement(create_cql); cql3::statements::raw::cf_statement* parsed_cf_stmt = static_cast(parsed.get()); - parsed_cf_stmt->prepare_keyspace(_keyspace); + (void)parsed_cf_stmt->keyspace(); // This will assert if cql statement did not contain keyspace ::shared_ptr statement = static_pointer_cast( parsed_cf_stmt->prepare(db, qp.get_cql_stats())->statement); auto schema = statement->get_cf_meta_data(db); + if (db.has_schema(schema->ks_name(), schema->cf_name())) { + co_return; + } + + auto& mm = qp.get_migration_manager(); + + co_await mm.schema_read_barrier(); + + if (db.has_schema(schema->ks_name(), schema->cf_name())) { // re-check after read barrier + co_return; + } + // Generate the CF UUID based on its KF names. This is needed to ensure that // all Nodes that create it would create it with the same UUID and we don't // hit the #420 issue. @@ -55,7 +64,9 @@ future<> table_helper::setup_table(cql3::query_processor& qp) const { // "CREATE TABLE" invocation on different Nodes. // The important thing is that it will converge eventually (some traces may // be lost in a process but that's ok). - return qp.get_migration_manager().announce_new_column_family(b.build()).discard_result().handle_exception([this] (auto ep) {});; + try { + co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(b.build())); + } catch (...) 
{} } future<> table_helper::cache_table_info(cql3::query_processor& qp, service::query_state& qs) { @@ -92,7 +103,9 @@ future<> table_helper::cache_table_info(cql3::query_processor& qp, service::quer }).handle_exception([this, &qp] (auto eptr) { // One of the possible causes for an error here could be the table that doesn't exist. //FIXME: discarded future. - (void)this->setup_table(qp).discard_result(); + (void)qp.container().invoke_on(0, [create_cql = _create_cql] (cql3::query_processor& qp) -> future<> { + co_return co_await table_helper::setup_table(qp, create_cql); + }); // We throw the bad_column_family exception because the caller // expects and accounts this type of errors. @@ -116,35 +129,33 @@ future<> table_helper::insert(cql3::query_processor& qp, service::query_state& q } future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_view keyspace_name, sstring replication_factor, service::query_state& qs, std::vector tables) { - if (this_shard_id() == 0) { - size_t n = tables.size(); - for (size_t i = 0; i < n; ++i) { - if (tables[i]->_keyspace != keyspace_name) { - throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace"); - } - } - return seastar::async([&qp, keyspace_name, replication_factor, &qs, tables] { - data_dictionary::database db = qp.db(); - - // Create a keyspace - if (!db.has_keyspace(keyspace_name)) { - std::map opts; - opts["replication_factor"] = replication_factor; - auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true); - // We use min_timestamp so that default keyspace metadata will loose with any manual adjustments. See issue #2129. 
- qp.get_migration_manager().announce_new_keyspace(ksm, api::min_timestamp).get(); - } - - qs.get_client_state().set_keyspace(db.real_database(), keyspace_name); - - - // Create tables - size_t n = tables.size(); - for (size_t i = 0; i < n; ++i) { - tables[i]->setup_table(qp).get(); - } - }); - } else { - return make_ready_future<>(); + if (this_shard_id() != 0) { + co_return; } + + if (std::any_of(tables.begin(), tables.end(), [&] (table_helper* t) { return t->_keyspace != keyspace_name; })) { + throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace"); + } + + data_dictionary::database db = qp.db(); + auto& mm = qp.get_migration_manager(); + + if (!db.has_keyspace(keyspace_name)) { + co_await mm.schema_read_barrier(); + + // Create a keyspace + if (!db.has_keyspace(keyspace_name)) { + std::map opts; + opts["replication_factor"] = replication_factor; + auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true); + co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm)); + } + } + + qs.get_client_state().set_keyspace(db.real_database(), keyspace_name); + + // Create tables + co_await parallel_for_each(tables, [&qp] (table_helper* t) { + return table_helper::setup_table(qp, t->_create_cql); + }); } diff --git a/table_helper.hh b/table_helper.hh index b584dfcedf..978c39e1c2 100644 --- a/table_helper.hh +++ b/table_helper.hh @@ -65,7 +65,7 @@ public: * @return A future that resolves when the operation is complete. Any * possible errors are ignored. 
*/ - future<> setup_table(cql3::query_processor& qp) const; + static future<> setup_table(cql3::query_processor& qp, const sstring& create_cql); /** * @return a future that resolves when the given t_helper is ready to be used for diff --git a/thrift/handler.cc b/thrift/handler.cc index 1dd6fdf24a..348cf723c9 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -869,119 +869,149 @@ public: }); } + static future execute_schema_command(distributed& dmm, distributed& db, std::function>(service::migration_manager&, replica::database&)> ddl) { + auto func = [ddl, &dmm] (replica::database& db) -> future { + auto& mm = dmm.local(); + + co_await mm.schema_read_barrier(); + + co_await mm.announce(co_await ddl(mm, db)); + + co_return std::string(db.get_version().to_sstring()); + }; + co_return co_await db.invoke_on(0, func); + } + void system_add_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const CfDef& cf_def) { service_permit permit = obtain_permit(); - with_cob(std::move(cob), std::move(exn_cob), [&] { - if (!_db.local().has_keyspace(cf_def.keyspace)) { - throw NotFoundException(); - } - if (_db.local().has_schema(cf_def.keyspace, cf_def.name)) { - throw make_exception("Column family {} already exists", cf_def.name); - } + with_cob(std::move(cob), std::move(exn_cob), [this, def = cf_def] () -> future { + auto& t = *this; + auto cf_def = def; - auto s = schema_from_thrift(cf_def, cf_def.keyspace); - return _query_state.get_client_state().has_keyspace_access(_db.local(), cf_def.keyspace, auth::permission::CREATE).then([this, s = std::move(s)] { - return _query_processor.local().get_migration_manager().announce_new_column_family(std::move(s)).then([this] { - return std::string(_db.local().get_version().to_sstring()); - }); + co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), cf_def.keyspace, auth::permission::CREATE); + + co_return co_await 
execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&cf_def] (service::migration_manager& mm, replica::database& db) -> future> { + if (!db.has_keyspace(cf_def.keyspace)) { + throw NotFoundException(); + } + if (db.has_schema(cf_def.keyspace, cf_def.name)) { + throw make_exception("Column family {} already exists", cf_def.name); + } + + auto s = schema_from_thrift(cf_def, cf_def.keyspace); + co_return co_await mm.prepare_new_column_family_announcement(std::move(s)); }); }); } void system_drop_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& column_family) { service_permit permit = obtain_permit(); - with_cob(std::move(cob), std::move(exn_cob), [&] { - return _query_state.get_client_state().has_column_family_access(_db.local(), current_keyspace(), column_family, auth::permission::DROP).then([this, column_family] { - auto& cf = _db.local().find_column_family(current_keyspace(), column_family); + with_cob(std::move(cob), std::move(exn_cob), [this, cfm = column_family] () -> future { + auto& t = *this; + auto column_family = cfm; + co_await t._query_state.get_client_state().has_column_family_access(t._db.local(), t.current_keyspace(), column_family, auth::permission::DROP); + + co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, + [&column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, replica::database& db) -> future> { + auto& cf = db.find_column_family(current_keyspace, column_family); if (cf.schema()->is_view()) { throw make_exception("Cannot drop Materialized Views from Thrift"); } if (!cf.views().empty()) { throw make_exception("Cannot drop table with Materialized Views {}", column_family); } - return _query_processor.local().get_migration_manager().announce_column_family_drop(current_keyspace(), column_family).then([this] { - return std::string(_db.local().get_version().to_sstring()); - }); + + 
co_return co_await mm.prepare_column_family_drop_announcement(current_keyspace, column_family); }); }); } void system_add_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const KsDef& ks_def) { service_permit permit = obtain_permit(); - with_cob(std::move(cob), std::move(exn_cob), [&] { - auto ksm = keyspace_from_thrift(ks_def); - return _query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE).then([this, ksm = std::move(ksm)] { - return _query_processor.local().get_migration_manager().announce_new_keyspace(std::move(ksm)).then([this] { - return std::string(_db.local().get_version().to_sstring()); - }); + with_cob(std::move(cob), std::move(exn_cob), [this, def = ks_def] () -> future { + auto& t = *this; + auto ks_def = def; + + co_await t._query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE); + + co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&ks_def] (service::migration_manager& mm, replica::database& db) -> future> { + co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def)); }); }); } void system_drop_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const std::string& keyspace) { service_permit permit = obtain_permit(); - with_cob(std::move(cob), std::move(exn_cob), [&] { - thrift_validation::validate_keyspace_not_system(keyspace); - if (!_db.local().has_keyspace(keyspace)) { - throw NotFoundException(); - } + with_cob(std::move(cob), std::move(exn_cob), [this, ks = keyspace] () -> future { + auto& t = *this; + auto keyspace = ks; - return _query_state.get_client_state().has_keyspace_access(_db.local(), keyspace, auth::permission::DROP).then([this, keyspace] { - return _query_processor.local().get_migration_manager().announce_keyspace_drop(keyspace).then([this] { - return std::string(_db.local().get_version().to_sstring()); - }); + co_await 
t._query_state.get_client_state().has_keyspace_access(t._db.local(), keyspace, auth::permission::DROP); + + co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&keyspace] (service::migration_manager& mm, replica::database& db) -> future> { + thrift_validation::validate_keyspace_not_system(keyspace); + if (!db.has_keyspace(keyspace)) { + throw NotFoundException(); + } + + co_return mm.prepare_keyspace_drop_announcement(keyspace); }); }); } void system_update_keyspace(thrift_fn::function cob, thrift_fn::function exn_cob, const KsDef& ks_def) { service_permit permit = obtain_permit(); - with_cob(std::move(cob), std::move(exn_cob), [&] { + with_cob(std::move(cob), std::move(exn_cob), [this, def = ks_def] () -> future { + auto& t = *this; + auto ks_def = def; thrift_validation::validate_keyspace_not_system(ks_def.name); - if (!_db.local().has_keyspace(ks_def.name)) { - throw NotFoundException(); - } - if (!ks_def.cf_defs.empty()) { - throw make_exception("Keyspace update must not contain any column family definitions."); - } + co_await t._query_state.get_client_state().has_keyspace_access(t._db.local(), ks_def.name, auth::permission::ALTER); - auto ksm = keyspace_from_thrift(ks_def); - return _query_state.get_client_state().has_keyspace_access(_db.local(), ks_def.name, auth::permission::ALTER).then([this, ksm = std::move(ksm)] { - return _query_processor.local().get_migration_manager().announce_keyspace_update(std::move(ksm)).then([this] { - return std::string(_db.local().get_version().to_sstring()); - }); + co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&ks_def] (service::migration_manager& mm, replica::database& db) -> future> { + if (!db.has_keyspace(ks_def.name)) { /* an update requires the keyspace to already exist, as in the removed pre-patch check */ + throw NotFoundException(); + } + if (!ks_def.cf_defs.empty()) { + throw make_exception("Keyspace update must not contain any column family definitions."); + } + + auto ksm = 
keyspace_from_thrift(ks_def); + co_return mm.prepare_keyspace_update_announcement(std::move(ksm)); }); }); } void system_update_column_family(thrift_fn::function cob, thrift_fn::function exn_cob, const CfDef& cf_def) { service_permit permit = obtain_permit(); - with_cob(std::move(cob), std::move(exn_cob), [&] { - auto& cf = _db.local().find_column_family(cf_def.keyspace, cf_def.name); - auto schema = cf.schema(); + with_cob(std::move(cob), std::move(exn_cob), [this, def = cf_def] () -> future { + auto& t = *this; + auto cf_def = def; - if (schema->is_cql3_table()) { - throw make_exception("Cannot modify CQL3 table {} as it may break the schema. You should use cqlsh to modify CQL3 tables instead.", cf_def.name); - } + co_await t._query_state.get_client_state().has_schema_access(t._db.local(), cf_def.keyspace, cf_def.name, auth::permission::ALTER); - if (schema->is_view()) { - throw make_exception("Cannot modify Materialized View table {} as it may break the schema. " - "You should use cqlsh to modify Materialized View tables instead.", cf_def.name); - } + co_return co_await execute_schema_command(t._query_processor.local().get_migration_manager().container(), _db, [&cf_def] (service::migration_manager& mm, replica::database& db) -> future> { + auto& cf = db.find_column_family(cf_def.keyspace, cf_def.name); + auto schema = cf.schema(); - if (!cf.views().empty()) { - throw make_exception("Cannot modify table with Materialized Views {} as it may break the schema. " - "You should use cqlsh to modify Materialized View tables instead.", cf_def.name); - } + if (schema->is_cql3_table()) { + throw make_exception("Cannot modify CQL3 table {} as it may break the schema. 
You should use cqlsh to modify CQL3 tables instead.", cf_def.name); + } - auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id()); - if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) { - fail(unimplemented::cause::MIXED_CF); - } - return _query_state.get_client_state().has_schema_access(_db.local(), *schema, auth::permission::ALTER).then([this, s = std::move(s)] { - return _query_processor.local().get_migration_manager().announce_column_family_update(std::move(s), true, std::nullopt).then([this] { - return std::string(_db.local().get_version().to_sstring()); - }); + if (schema->is_view()) { + throw make_exception("Cannot modify Materialized View table {} as it may break the schema. " + "You should use cqlsh to modify Materialized View tables instead.", cf_def.name); + } + + if (!cf.views().empty()) { + throw make_exception("Cannot modify table with Materialized Views {} as it may break the schema. " + "You should use cqlsh to modify Materialized View tables instead.", cf_def.name); + } + + auto s = schema_from_thrift(cf_def, cf_def.keyspace, schema->id()); + if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) { + fail(unimplemented::cause::MIXED_CF); + } + co_return co_await mm.prepare_column_family_update_announcement(std::move(s), true, std::vector(), std::nullopt); }); }); }