db/system_distributed_keyspace: fix indentation
Broken in the previous commit.
This commit is contained in:
@@ -246,85 +246,85 @@ future<> system_distributed_keyspace::start() {
|
||||
auto tables = ensured_tables();
|
||||
|
||||
while (true) {
|
||||
// Check if there is any work to do before taking the group 0 guard.
|
||||
bool keyspaces_setup = db.has_keyspace(NAME) && db.has_keyspace(NAME_EVERYWHERE);
|
||||
bool tables_setup = std::all_of(tables.begin(), tables.end(), [db] (schema_ptr t) { return db.has_schema(t->ks_name(), t->cf_name()); } );
|
||||
bool service_levels_up_to_date = get_current_service_levels(db)->equal_columns(*get_updated_service_levels(db));
|
||||
if (keyspaces_setup && tables_setup && service_levels_up_to_date) {
|
||||
dlogger.info("system_distributed(_everywhere) keyspaces and tables are up-to-date. Not creating");
|
||||
// Check if there is any work to do before taking the group 0 guard.
|
||||
bool keyspaces_setup = db.has_keyspace(NAME) && db.has_keyspace(NAME_EVERYWHERE);
|
||||
bool tables_setup = std::all_of(tables.begin(), tables.end(), [db] (schema_ptr t) { return db.has_schema(t->ks_name(), t->cf_name()); } );
|
||||
bool service_levels_up_to_date = get_current_service_levels(db)->equal_columns(*get_updated_service_levels(db));
|
||||
if (keyspaces_setup && tables_setup && service_levels_up_to_date) {
|
||||
dlogger.info("system_distributed(_everywhere) keyspaces and tables are up-to-date. Not creating");
|
||||
_started = true;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto group0_guard = co_await _mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
std::vector<mutation> mutations;
|
||||
sstring description;
|
||||
|
||||
auto sd_ksm = keyspace_metadata::new_keyspace(
|
||||
NAME,
|
||||
"org.apache.cassandra.locator.SimpleStrategy",
|
||||
{{"replication_factor", "3"}},
|
||||
true /* durable_writes */);
|
||||
if (!db.has_keyspace(NAME)) {
|
||||
mutations = service::prepare_new_keyspace_announcement(db.real_database(), sd_ksm, ts);
|
||||
description += format(" create {} keyspace;", NAME);
|
||||
} else {
|
||||
dlogger.info("{} keyspace is already present. Not creating", NAME);
|
||||
}
|
||||
|
||||
auto sde_ksm = keyspace_metadata::new_keyspace(
|
||||
NAME_EVERYWHERE,
|
||||
"org.apache.cassandra.locator.EverywhereStrategy",
|
||||
{},
|
||||
true /* durable_writes */);
|
||||
if (!db.has_keyspace(NAME_EVERYWHERE)) {
|
||||
auto sde_mutations = service::prepare_new_keyspace_announcement(db.real_database(), sde_ksm, ts);
|
||||
std::move(sde_mutations.begin(), sde_mutations.end(), std::back_inserter(mutations));
|
||||
description += format(" create {} keyspace;", NAME_EVERYWHERE);
|
||||
} else {
|
||||
dlogger.info("{} keyspace is already present. Not creating", NAME_EVERYWHERE);
|
||||
}
|
||||
|
||||
// Get mutations for creating and updating tables.
|
||||
auto num_keyspace_mutations = mutations.size();
|
||||
co_await coroutine::parallel_for_each(ensured_tables(),
|
||||
[this, &mutations, db, ts, sd_ksm, sde_ksm] (auto&& table) -> future<> {
|
||||
auto ksm = table->ks_name() == NAME ? sd_ksm : sde_ksm;
|
||||
|
||||
// Ensure that the service_levels table contains new columns.
|
||||
if (table->cf_name() == SERVICE_LEVELS) {
|
||||
table = get_updated_service_levels(db);
|
||||
}
|
||||
|
||||
if (!db.has_schema(table->ks_name(), table->cf_name())) {
|
||||
co_return co_await service::prepare_new_column_family_announcement(mutations, _sp, *ksm, std::move(table), ts);
|
||||
}
|
||||
|
||||
// The service_levels table exists. Update it if it lacks new columns.
|
||||
if (table->cf_name() == SERVICE_LEVELS && !get_current_service_levels(db)->equal_columns(*table)) {
|
||||
auto update_mutations = co_await service::prepare_column_family_update_announcement(_sp, table, false, std::vector<view_ptr>(), ts);
|
||||
std::move(update_mutations.begin(), update_mutations.end(), std::back_inserter(mutations));
|
||||
}
|
||||
});
|
||||
if (mutations.size() > num_keyspace_mutations) {
|
||||
description += " create and update system_distributed(_everywhere) tables";
|
||||
} else {
|
||||
dlogger.info("All tables are present and up-to-date on start");
|
||||
}
|
||||
|
||||
if (!mutations.empty()) {
|
||||
try {
|
||||
co_await _mm.announce(std::move(mutations), std::move(group0_guard), description);
|
||||
} catch (service::group0_concurrent_modification&) {
|
||||
dlogger.info("Concurrent operation is detected while starting, retrying.");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
_started = true;
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto group0_guard = co_await _mm.start_group0_operation();
|
||||
auto ts = group0_guard.write_timestamp();
|
||||
std::vector<mutation> mutations;
|
||||
sstring description;
|
||||
|
||||
auto sd_ksm = keyspace_metadata::new_keyspace(
|
||||
NAME,
|
||||
"org.apache.cassandra.locator.SimpleStrategy",
|
||||
{{"replication_factor", "3"}},
|
||||
true /* durable_writes */);
|
||||
if (!db.has_keyspace(NAME)) {
|
||||
mutations = service::prepare_new_keyspace_announcement(db.real_database(), sd_ksm, ts);
|
||||
description += format(" create {} keyspace;", NAME);
|
||||
} else {
|
||||
dlogger.info("{} keyspace is already present. Not creating", NAME);
|
||||
}
|
||||
|
||||
auto sde_ksm = keyspace_metadata::new_keyspace(
|
||||
NAME_EVERYWHERE,
|
||||
"org.apache.cassandra.locator.EverywhereStrategy",
|
||||
{},
|
||||
true /* durable_writes */);
|
||||
if (!db.has_keyspace(NAME_EVERYWHERE)) {
|
||||
auto sde_mutations = service::prepare_new_keyspace_announcement(db.real_database(), sde_ksm, ts);
|
||||
std::move(sde_mutations.begin(), sde_mutations.end(), std::back_inserter(mutations));
|
||||
description += format(" create {} keyspace;", NAME_EVERYWHERE);
|
||||
} else {
|
||||
dlogger.info("{} keyspace is already present. Not creating", NAME_EVERYWHERE);
|
||||
}
|
||||
|
||||
// Get mutations for creating and updating tables.
|
||||
auto num_keyspace_mutations = mutations.size();
|
||||
co_await coroutine::parallel_for_each(ensured_tables(),
|
||||
[this, &mutations, db, ts, sd_ksm, sde_ksm] (auto&& table) -> future<> {
|
||||
auto ksm = table->ks_name() == NAME ? sd_ksm : sde_ksm;
|
||||
|
||||
// Ensure that the service_levels table contains new columns.
|
||||
if (table->cf_name() == SERVICE_LEVELS) {
|
||||
table = get_updated_service_levels(db);
|
||||
}
|
||||
|
||||
if (!db.has_schema(table->ks_name(), table->cf_name())) {
|
||||
co_return co_await service::prepare_new_column_family_announcement(mutations, _sp, *ksm, std::move(table), ts);
|
||||
}
|
||||
|
||||
// The service_levels table exists. Update it if it lacks new columns.
|
||||
if (table->cf_name() == SERVICE_LEVELS && !get_current_service_levels(db)->equal_columns(*table)) {
|
||||
auto update_mutations = co_await service::prepare_column_family_update_announcement(_sp, table, false, std::vector<view_ptr>(), ts);
|
||||
std::move(update_mutations.begin(), update_mutations.end(), std::back_inserter(mutations));
|
||||
}
|
||||
});
|
||||
if (mutations.size() > num_keyspace_mutations) {
|
||||
description += " create and update system_distributed(_everywhere) tables";
|
||||
} else {
|
||||
dlogger.info("All tables are present and up-to-date on start");
|
||||
}
|
||||
|
||||
if (!mutations.empty()) {
|
||||
try {
|
||||
co_await _mm.announce(std::move(mutations), std::move(group0_guard), description);
|
||||
} catch (service::group0_concurrent_modification&) {
|
||||
dlogger.info("Concurrent operation is detected while starting, retrying.");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
_started = true;
|
||||
co_return;
|
||||
}
|
||||
}
|
||||
|
||||
future<> system_distributed_keyspace::stop() {
|
||||
|
||||
Reference in New Issue
Block a user