diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 2cf2d1e9b8..51451b545b 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -246,85 +246,85 @@ future<> system_distributed_keyspace::start() { auto tables = ensured_tables(); while (true) { - // Check if there is any work to do before taking the group 0 guard. - bool keyspaces_setup = db.has_keyspace(NAME) && db.has_keyspace(NAME_EVERYWHERE); - bool tables_setup = std::all_of(tables.begin(), tables.end(), [db] (schema_ptr t) { return db.has_schema(t->ks_name(), t->cf_name()); } ); - bool service_levels_up_to_date = get_current_service_levels(db)->equal_columns(*get_updated_service_levels(db)); - if (keyspaces_setup && tables_setup && service_levels_up_to_date) { - dlogger.info("system_distributed(_everywhere) keyspaces and tables are up-to-date. Not creating"); + // Check if there is any work to do before taking the group 0 guard. + bool keyspaces_setup = db.has_keyspace(NAME) && db.has_keyspace(NAME_EVERYWHERE); + bool tables_setup = std::all_of(tables.begin(), tables.end(), [db] (schema_ptr t) { return db.has_schema(t->ks_name(), t->cf_name()); } ); + bool service_levels_up_to_date = get_current_service_levels(db)->equal_columns(*get_updated_service_levels(db)); + if (keyspaces_setup && tables_setup && service_levels_up_to_date) { + dlogger.info("system_distributed(_everywhere) keyspaces and tables are up-to-date. Not creating"); + _started = true; + co_return; + } + + auto group0_guard = co_await _mm.start_group0_operation(); + auto ts = group0_guard.write_timestamp(); + std::vector mutations; + sstring description; + + auto sd_ksm = keyspace_metadata::new_keyspace( + NAME, + "org.apache.cassandra.locator.SimpleStrategy", + {{"replication_factor", "3"}}, + true /* durable_writes */); + if (!db.has_keyspace(NAME)) { + mutations = service::prepare_new_keyspace_announcement(db.real_database(), sd_ksm, ts); + description += format(" create {} keyspace;", NAME); + } else { + dlogger.info("{} keyspace is already present. Not creating", NAME); + } + + auto sde_ksm = keyspace_metadata::new_keyspace( + NAME_EVERYWHERE, + "org.apache.cassandra.locator.EverywhereStrategy", + {}, + true /* durable_writes */); + if (!db.has_keyspace(NAME_EVERYWHERE)) { + auto sde_mutations = service::prepare_new_keyspace_announcement(db.real_database(), sde_ksm, ts); + std::move(sde_mutations.begin(), sde_mutations.end(), std::back_inserter(mutations)); + description += format(" create {} keyspace;", NAME_EVERYWHERE); + } else { + dlogger.info("{} keyspace is already present. Not creating", NAME_EVERYWHERE); + } + + // Get mutations for creating and updating tables. + auto num_keyspace_mutations = mutations.size(); + co_await coroutine::parallel_for_each(ensured_tables(), + [this, &mutations, db, ts, sd_ksm, sde_ksm] (auto&& table) -> future<> { + auto ksm = table->ks_name() == NAME ? sd_ksm : sde_ksm; + + // Ensure that the service_levels table contains new columns. + if (table->cf_name() == SERVICE_LEVELS) { + table = get_updated_service_levels(db); + } + + if (!db.has_schema(table->ks_name(), table->cf_name())) { + co_return co_await service::prepare_new_column_family_announcement(mutations, _sp, *ksm, std::move(table), ts); + } + + // The service_levels table exists. Update it if it lacks new columns. + if (table->cf_name() == SERVICE_LEVELS && !get_current_service_levels(db)->equal_columns(*table)) { + auto update_mutations = co_await service::prepare_column_family_update_announcement(_sp, table, false, std::vector(), ts); + std::move(update_mutations.begin(), update_mutations.end(), std::back_inserter(mutations)); + } + }); + if (mutations.size() > num_keyspace_mutations) { + description += " create and update system_distributed(_everywhere) tables"; + } else { + dlogger.info("All tables are present and up-to-date on start"); + } + + if (!mutations.empty()) { + try { + co_await _mm.announce(std::move(mutations), std::move(group0_guard), description); + } catch (service::group0_concurrent_modification&) { + dlogger.info("Concurrent operation is detected while starting, retrying."); + continue; + } + } + _started = true; co_return; } - - auto group0_guard = co_await _mm.start_group0_operation(); - auto ts = group0_guard.write_timestamp(); - std::vector mutations; - sstring description; - - auto sd_ksm = keyspace_metadata::new_keyspace( - NAME, - "org.apache.cassandra.locator.SimpleStrategy", - {{"replication_factor", "3"}}, - true /* durable_writes */); - if (!db.has_keyspace(NAME)) { - mutations = service::prepare_new_keyspace_announcement(db.real_database(), sd_ksm, ts); - description += format(" create {} keyspace;", NAME); - } else { - dlogger.info("{} keyspace is already present. Not creating", NAME); - } - - auto sde_ksm = keyspace_metadata::new_keyspace( - NAME_EVERYWHERE, - "org.apache.cassandra.locator.EverywhereStrategy", - {}, - true /* durable_writes */); - if (!db.has_keyspace(NAME_EVERYWHERE)) { - auto sde_mutations = service::prepare_new_keyspace_announcement(db.real_database(), sde_ksm, ts); - std::move(sde_mutations.begin(), sde_mutations.end(), std::back_inserter(mutations)); - description += format(" create {} keyspace;", NAME_EVERYWHERE); - } else { - dlogger.info("{} keyspace is already present. Not creating", NAME_EVERYWHERE); - } - - // Get mutations for creating and updating tables. - auto num_keyspace_mutations = mutations.size(); - co_await coroutine::parallel_for_each(ensured_tables(), - [this, &mutations, db, ts, sd_ksm, sde_ksm] (auto&& table) -> future<> { - auto ksm = table->ks_name() == NAME ? sd_ksm : sde_ksm; - - // Ensure that the service_levels table contains new columns. - if (table->cf_name() == SERVICE_LEVELS) { - table = get_updated_service_levels(db); - } - - if (!db.has_schema(table->ks_name(), table->cf_name())) { - co_return co_await service::prepare_new_column_family_announcement(mutations, _sp, *ksm, std::move(table), ts); - } - - // The service_levels table exists. Update it if it lacks new columns. - if (table->cf_name() == SERVICE_LEVELS && !get_current_service_levels(db)->equal_columns(*table)) { - auto update_mutations = co_await service::prepare_column_family_update_announcement(_sp, table, false, std::vector(), ts); - std::move(update_mutations.begin(), update_mutations.end(), std::back_inserter(mutations)); - } - }); - if (mutations.size() > num_keyspace_mutations) { - description += " create and update system_distributed(_everywhere) tables"; - } else { - dlogger.info("All tables are present and up-to-date on start"); - } - - if (!mutations.empty()) { - try { - co_await _mm.announce(std::move(mutations), std::move(group0_guard), description); - } catch (service::group0_concurrent_modification&) { - dlogger.info("Concurrent operation is detected while starting, retrying."); - continue; - } - } - - _started = true; - co_return; - } } future<> system_distributed_keyspace::stop() {