service: migration_manager: rename schema_read_barrier to start_group0_operation
1. Generalize the name so it mentions group 0, which schema will be a strict subset of. 2. Remove the fact that it performs a "read barrier" from the name. The function will be used in general to ensure linearizability of group0 operations - both reads and writes. "Read barrier" is Raft-specific terminology, so it can be thought of as an implementation detail.
This commit is contained in:
@@ -494,7 +494,7 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
|
||||
auto& p = _proxy.container();
|
||||
|
||||
co_await _mm.container().invoke_on(0, [&] (service::migration_manager& mm) -> future<> {
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
if (!p.local().data_dictionary().has_schema(keyspace_name, table_name)) {
|
||||
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
|
||||
@@ -750,7 +750,7 @@ static void update_tags_map(const rjson::value& tags, std::map<sstring, sstring>
|
||||
// are fixed, this issue will automatically get fixed as well.
|
||||
future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map<sstring, sstring>&& tags_map) {
|
||||
co_await mm.container().invoke_on(0, [s = global_schema_ptr(std::move(schema)), tags_map = std::move(tags_map)] (service::migration_manager& mm) -> future<> {
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
schema_builder builder(s);
|
||||
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));
|
||||
@@ -875,7 +875,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
|
||||
|
||||
verify_billing_mode(request);
|
||||
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
schema_ptr partial_schema = builder.build();
|
||||
|
||||
@@ -1108,7 +1108,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
|
||||
co_return co_await _mm.container().invoke_on(0, [&p = _proxy.container(), request = std::move(request), gt = tracing::global_trace_state_ptr(std::move(trace_state))]
|
||||
(service::migration_manager& mm) mutable -> future<executor::request_return_type> {
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
schema_ptr tab = get_table(p.local(), request);
|
||||
|
||||
|
||||
@@ -68,7 +68,7 @@ static future<> create_metadata_table_if_missing_impl(
|
||||
schema_ptr table = b.build();
|
||||
|
||||
if (!db.has_schema(table->ks_name(), table->cf_name())) {
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
try {
|
||||
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table, api::new_timestamp()));
|
||||
} catch (exceptions::already_exists_exception&) {}
|
||||
|
||||
@@ -135,7 +135,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c
|
||||
auto db = _qp.db();
|
||||
|
||||
if (!db.has_keyspace(meta::AUTH_KS)) {
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
if (!db.has_keyspace(meta::AUTH_KS)) {
|
||||
locator::replication_strategy_config_options opts{{"replication_factor", "1"}};
|
||||
|
||||
@@ -76,7 +76,7 @@ schema_altering_statement::execute0(query_processor& qp, service::query_state& s
|
||||
std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()));
|
||||
}
|
||||
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
auto [ret, m] = co_await prepare_schema_mutations(qp, api::new_timestamp());
|
||||
|
||||
|
||||
@@ -245,7 +245,7 @@ future<> system_distributed_keyspace::start() {
|
||||
}
|
||||
|
||||
if (!_sp.get_db().local().has_keyspace(NAME)) {
|
||||
co_await _mm.schema_read_barrier();
|
||||
co_await _mm.start_group0_operation();
|
||||
|
||||
try {
|
||||
auto ksm = keyspace_metadata::new_keyspace(
|
||||
@@ -260,7 +260,7 @@ future<> system_distributed_keyspace::start() {
|
||||
}
|
||||
|
||||
if (!_sp.get_db().local().has_keyspace(NAME_EVERYWHERE)) {
|
||||
co_await _mm.schema_read_barrier();
|
||||
co_await _mm.start_group0_operation();
|
||||
|
||||
try {
|
||||
auto ksm = keyspace_metadata::new_keyspace(
|
||||
@@ -280,7 +280,7 @@ future<> system_distributed_keyspace::start() {
|
||||
});
|
||||
|
||||
if (!exist) {
|
||||
co_await _mm.schema_read_barrier();
|
||||
co_await _mm.start_group0_operation();
|
||||
|
||||
auto m = co_await map_reduce(tables,
|
||||
/* Mapper */ [this] (auto&& table) -> future<std::vector<mutation>> {
|
||||
@@ -302,7 +302,7 @@ future<> system_distributed_keyspace::start() {
|
||||
|
||||
_started = true;
|
||||
if (has_missing_columns(_qp.db())) {
|
||||
co_await _mm.schema_read_barrier();
|
||||
co_await _mm.start_group0_operation();
|
||||
co_await add_new_columns_if_missing(_qp.db().real_database(), _mm);
|
||||
} else {
|
||||
dlogger.info("All schemas are uptodate on start");
|
||||
|
||||
@@ -179,7 +179,7 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
|
||||
auto& mml = mm.local();
|
||||
auto tm = proxy.local().get_token_metadata_ptr();
|
||||
|
||||
co_await mml.schema_read_barrier();
|
||||
co_await mml.start_group0_operation();
|
||||
|
||||
std::vector<mutation> ks_mutations;
|
||||
for (auto& ks_name: ks_names) {
|
||||
|
||||
@@ -908,7 +908,7 @@ future<> migration_manager::announce(std::vector<mutation> schema) {
|
||||
return announce_without_raft(std::move(schema));
|
||||
}
|
||||
|
||||
future<> migration_manager::schema_read_barrier() {
|
||||
future<> migration_manager::start_group0_operation() {
|
||||
if (_raft_gr.is_enabled()) {
|
||||
assert(this_shard_id() == 0);
|
||||
return _raft_gr.group0().read_barrier();
|
||||
|
||||
@@ -138,8 +138,11 @@ public:
|
||||
|
||||
future<std::vector<mutation>> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type);
|
||||
|
||||
// the function need to be called if a user wants to access most up-to-date schema state
|
||||
future<> schema_read_barrier();
|
||||
// The function needs to be called if the user wants to read most up-to-date group0 state (including schema state)
|
||||
// (the function ensures that all previously finished group0 operations are visible on this node) or to write it.
|
||||
// Call ONLY on shard 0.
|
||||
// Requires a quorum of nodes to be available.
|
||||
future<> start_group0_operation();
|
||||
|
||||
// used to check if raft is enabled on the cluster
|
||||
bool is_raft_enabled() { return _raft_gr.is_enabled(); }
|
||||
|
||||
@@ -33,7 +33,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& cre
|
||||
|
||||
auto& mm = qp.get_migration_manager();
|
||||
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
if (db.has_schema(schema->ks_name(), schema->cf_name())) { // re-check after read barrier
|
||||
co_return;
|
||||
@@ -128,7 +128,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_vie
|
||||
auto& mm = qp.get_migration_manager();
|
||||
|
||||
if (!db.has_keyspace(keyspace_name)) {
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
// Create a keyspace
|
||||
if (!db.has_keyspace(keyspace_name)) {
|
||||
|
||||
@@ -863,7 +863,7 @@ public:
|
||||
auto func = [ddl, &dmm] (cql3::query_processor& qp) -> future<std::string> {
|
||||
auto& mm = dmm.local();
|
||||
|
||||
co_await mm.schema_read_barrier();
|
||||
co_await mm.start_group0_operation();
|
||||
|
||||
co_await mm.announce(co_await ddl(mm, qp.db()));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user