diff --git a/alternator/executor.cc b/alternator/executor.cc index c7cd484461..b2adb6d814 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -494,7 +494,7 @@ future executor::delete_table(client_state& clien auto& p = _proxy.container(); co_await _mm.container().invoke_on(0, [&] (service::migration_manager& mm) -> future<> { - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); if (!p.local().data_dictionary().has_schema(keyspace_name, table_name)) { throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name)); @@ -750,7 +750,7 @@ static void update_tags_map(const rjson::value& tags, std::map // are fixed, this issue will automatically get fixed as well. future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map&& tags_map) { co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> { - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); schema_builder builder(s); builder.add_extension(tags_extension::NAME, ::make_shared(tags_map)); @@ -875,7 +875,7 @@ static future create_table_on_shard0(tracing::tra verify_billing_mode(request); - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); schema_ptr partial_schema = builder.build(); @@ -1108,7 +1108,7 @@ future executor::update_table(client_state& clien co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state))] (service::migration_manager& mm) mutable -> future { - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); schema_ptr tab = get_table(p.local(), request); diff --git a/auth/common.cc b/auth/common.cc index be9fedfa5d..6c5d56e8dc 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -68,7 +68,7 @@ static future<> create_metadata_table_if_missing_impl( schema_ptr table = b.build(); if (!db.has_schema(table->ks_name(), table->cf_name())) { - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); try { co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table, api::new_timestamp())); } catch (exceptions::already_exists_exception&) {} diff --git a/auth/service.cc b/auth/service.cc index fcbc5efd15..1ae15800b5 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -135,7 +135,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c auto db = _qp.db(); if (!db.has_keyspace(meta::AUTH_KS)) { - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); if (!db.has_keyspace(meta::AUTH_KS)) { locator::replication_strategy_config_options opts{{"replication_factor", "1"}}; diff --git a/cql3/statements/schema_altering_statement.cc b/cql3/statements/schema_altering_statement.cc index a460265b12..9408539d01 100644 --- a/cql3/statements/schema_altering_statement.cc +++ b/cql3/statements/schema_altering_statement.cc @@ -76,7 +76,7 @@ schema_altering_statement::execute0(query_processor& qp, service::query_state& s std::move(const_cast(options).take_cached_pk_function_calls())); } - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); auto [ret, m] = co_await prepare_schema_mutations(qp, api::new_timestamp()); diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index f2cc560c7b..7a0f7d4fd1 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -245,7 +245,7 @@ future<> system_distributed_keyspace::start() { } if (!_sp.get_db().local().has_keyspace(NAME)) { - co_await _mm.schema_read_barrier(); + co_await _mm.start_group0_operation(); try { auto ksm = keyspace_metadata::new_keyspace( @@ -260,7 +260,7 @@ future<> system_distributed_keyspace::start() { } if (!_sp.get_db().local().has_keyspace(NAME_EVERYWHERE)) { - co_await _mm.schema_read_barrier(); + co_await _mm.start_group0_operation(); try { auto ksm = keyspace_metadata::new_keyspace( @@ -280,7 +280,7 @@ future<> system_distributed_keyspace::start() { }); if (!exist) { - co_await _mm.schema_read_barrier(); + co_await _mm.start_group0_operation(); auto m = co_await map_reduce(tables, /* Mapper */ [this] (auto&& table) -> future> { @@ -302,7 +302,7 @@ future<> system_distributed_keyspace::start() { _started = true; if (has_missing_columns(_qp.db())) { - co_await _mm.schema_read_barrier(); + co_await _mm.start_group0_operation(); co_await add_new_columns_if_missing(_qp.db().real_database(), _mm); } else { dlogger.info("All schemas are uptodate on start"); diff --git a/redis/keyspace_utils.cc b/redis/keyspace_utils.cc index da35cdcf18..f336d88c4d 100644 --- a/redis/keyspace_utils.cc +++ b/redis/keyspace_utils.cc @@ -179,7 +179,7 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded ks_mutations; for (auto& ks_name: ks_names) { diff --git a/service/migration_manager.cc b/service/migration_manager.cc index 6a28c742fc..82c2f414e3 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -908,7 +908,7 @@ future<> migration_manager::announce(std::vector schema) { return announce_without_raft(std::move(schema)); } -future<> migration_manager::schema_read_barrier() { +future<> migration_manager::start_group0_operation() { if (_raft_gr.is_enabled()) { assert(this_shard_id() == 0); return _raft_gr.group0().read_barrier(); diff --git a/service/migration_manager.hh b/service/migration_manager.hh index c17f4053b5..2942173afb 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -138,8 +138,11 @@ public: future> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type); - // the function need to be called if a user wants to access most up-to-date schema state - future<> schema_read_barrier(); + // The function needs to be called if the user wants to read most up-to-date group0 state (including schema state) + // (the function ensures that all previously finished group0 operations are visible on this node) or to write it. + // Call ONLY on shard 0. + // Requires a quorum of nodes to be available. + future<> start_group0_operation(); // used to check if raft is enabled on the cluster bool is_raft_enabled() { return _raft_gr.is_enabled(); } diff --git a/table_helper.cc b/table_helper.cc index cdd35598e5..0ad197627b 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -33,7 +33,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& cre auto& mm = qp.get_migration_manager(); - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); if (db.has_schema(schema->ks_name(), schema->cf_name())) { // re-check after read barrier co_return; @@ -128,7 +128,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_vie auto& mm = qp.get_migration_manager(); if (!db.has_keyspace(keyspace_name)) { - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); // Create a keyspace if (!db.has_keyspace(keyspace_name)) { diff --git a/thrift/handler.cc b/thrift/handler.cc index 14d8868295..b54a26047e 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -863,7 +863,7 @@ public: auto func = [ddl, &dmm] (cql3::query_processor& qp) -> future { auto& mm = dmm.local(); - co_await mm.schema_read_barrier(); + co_await mm.start_group0_operation(); co_await mm.announce(co_await ddl(mm, qp.db()));