Revert "Merge 'reduce announcements of the automatic schema changes ' from Patryk Jędrzejczak"

This reverts commit 4b80130b0b, reversing
changes made to a5519c7c1f. It's suspected
of causing dtest failures due to a bug in coroutine::parallel_for_each.
This commit is contained in:
Avi Kivity
2023-10-29 18:32:06 +02:00
parent f08e7aad61
commit d450a145ce
9 changed files with 205 additions and 215 deletions

View File

@@ -80,7 +80,7 @@ static sstring_view table_status_to_sstring(table_status tbl_status) {
return "UKNOWN";
}
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type);
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
@@ -1121,9 +1121,8 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
std::vector<mutation> schema_mutations;
auto ksm = create_keyspace_metadata(keyspace_name, sp, gossiper, ts);
try {
schema_mutations = service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
schema_mutations = co_await create_keyspace(keyspace_name, sp, gossiper, ts);
} catch (exceptions::already_exists_exception&) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(format("Table {} already exists", table_name));
@@ -1133,7 +1132,15 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
// This should never happen, the ID is supposed to be unique
co_return api_error::internal(format("Table with ID {} already exists", schema->id()));
}
co_await service::prepare_new_column_family_announcement(schema_mutations, sp, *ksm, schema, ts);
db::schema_tables::add_table_or_view_to_schema_mutation(schema, ts, true, schema_mutations);
// we must call before_create_column_family callbacks - which allow
// listeners to modify our schema_mutations. For example, CDC may add
// another table (the CDC log table) to the same keyspace.
// Unfortunately the convention is that this callback must be run in
// a Seastar thread.
co_await seastar::async([&] {
mm.get_notifier().before_create_column_family(*schema, schema_mutations, ts);
});
for (schema_builder& view_builder : view_builders) {
db::schema_tables::add_table_or_view_to_schema_mutation(
view_ptr(view_builder.build()), ts, true, schema_mutations);
@@ -4457,23 +4464,25 @@ future<executor::request_return_type> executor::describe_continuous_backups(clie
co_return make_jsonable(std::move(response));
}
// Create the metadata for the keyspace in which we put the alternator
// table if it doesn't already exist.
// Create the keyspace in which we put the alternator table, if it doesn't
// already exist.
// Currently, we automatically configure the keyspace based on the number
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
static lw_shared_ptr<keyspace_metadata> create_keyspace_metadata(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts) {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts) {
sstring keyspace_name_str(keyspace_name);
int endpoint_count = gossiper.num_endpoints();
int rf = 3;
if (endpoint_count < rf) {
rf = 1;
elogger.warn("Creating keyspace '{}' for Alternator with unsafe RF={} because cluster only has {} nodes.",
keyspace_name, rf, endpoint_count);
keyspace_name_str, rf, endpoint_count);
}
auto opts = get_network_topology_options(sp, gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
return keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
co_return service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
}
future<> executor::start() {

View File

@@ -160,7 +160,7 @@ public:
});
}
void on_before_create_column_family(const keyspace_metadata& ksm, const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
void on_before_create_column_family(const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) override {
if (schema.cdc_options().enabled()) {
auto& db = _ctxt._proxy.get_db().local();
auto logname = log_name(schema.cf_name());

View File

@@ -29,7 +29,6 @@
#include <seastar/core/coroutine.hh>
#include <seastar/core/future-util.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <boost/range/adaptor/transformed.hpp>
@@ -215,25 +214,54 @@ static thread_local std::pair<std::string_view, data_type> new_columns[] {
{"workload_type", utf8_type}
};
static schema_ptr get_current_service_levels(data_dictionary::database db) {
return db.has_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS)
? db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS)
: service_levels();
static bool has_missing_columns(data_dictionary::database db) noexcept {
assert(this_shard_id() == 0);
try {
auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
for (const auto& col : new_columns) {
auto& [col_name, col_type] = col;
bytes options_name = to_bytes(col_name.data());
if (schema->get_column_definition(options_name)) {
continue;
}
return true;
}
} catch (...) {
dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception());
return true;
}
return false;
}
static schema_ptr get_updated_service_levels(data_dictionary::database db) {
static future<> add_new_columns_if_missing(replica::database& db, ::service::migration_manager& mm, ::service::group0_guard group0_guard) noexcept {
assert(this_shard_id() == 0);
auto schema = get_current_service_levels(db);
schema_builder b(schema);
for (const auto& col : new_columns) {
auto& [col_name, col_type] = col;
bytes options_name = to_bytes(col_name.data());
if (schema->get_column_definition(options_name)) {
continue;
try {
auto schema = db.find_schema(system_distributed_keyspace::NAME, system_distributed_keyspace::SERVICE_LEVELS);
schema_builder b(schema);
bool updated = false;
for (const auto& col : new_columns) {
auto& [col_name, col_type] = col;
bytes options_name = to_bytes(col_name.data());
if (schema->get_column_definition(options_name)) {
continue;
}
updated = true;
b.with_column(options_name, col_type, column_kind::regular_column);
}
b.with_column(options_name, col_type, column_kind::regular_column);
if (updated) {
schema_ptr table = b.build();
try {
auto ts = group0_guard.write_timestamp();
co_return co_await mm.announce(co_await service::prepare_column_family_update_announcement(mm.get_storage_proxy(), table, false,
std::vector<view_ptr>(), ts), std::move(group0_guard), "Add new columns to system_distributed.service_levels");
} catch (...) {}
}
} catch (...) {
// FIXME: do we really want to allow the node to boot if the table fails to update?
// Will this not prevent other components from working correctly?
dlogger.warn("Failed to update options column in the role attributes table: {}", std::current_exception());
}
return b.build();
}
future<> system_distributed_keyspace::start() {
@@ -242,81 +270,79 @@ future<> system_distributed_keyspace::start() {
co_return;
}
auto db = _sp.data_dictionary();
auto tables = ensured_tables();
// FIXME: fix this code to `announce` once
// Check if there is any work to do before taking the group 0 guard.
bool keyspaces_setup = db.has_keyspace(NAME) && db.has_keyspace(NAME_EVERYWHERE);
bool tables_setup = std::all_of(tables.begin(), tables.end(), [db] (schema_ptr t) { return db.has_schema(t->ks_name(), t->cf_name()); } );
bool service_levels_up_to_date = get_current_service_levels(db)->equal_columns(*get_updated_service_levels(db));
if (keyspaces_setup && tables_setup && service_levels_up_to_date) {
dlogger.info("system_distributed(_everywhere) keyspaces and tables are up-to-date. Not creating");
_started = true;
co_return;
}
if (!_sp.get_db().local().has_keyspace(NAME)) {
auto group0_guard = co_await _mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto group0_guard = co_await _mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
std::vector<mutation> mutations;
sstring description;
auto sd_ksm = keyspace_metadata::new_keyspace(
NAME,
"org.apache.cassandra.locator.SimpleStrategy",
{{"replication_factor", "3"}},
true /* durable_writes */);
if (!db.has_keyspace(NAME)) {
mutations = service::prepare_new_keyspace_announcement(db.real_database(), sd_ksm, ts);
description += format(" create {} keyspace;", NAME);
try {
auto ksm = keyspace_metadata::new_keyspace(
NAME,
"org.apache.cassandra.locator.SimpleStrategy",
{{"replication_factor", "3"}},
true /* durable_writes */);
co_await _mm.announce(service::prepare_new_keyspace_announcement(_sp.local_db(), ksm, ts), std::move(group0_guard),
"Create system_distributed keyspace");
} catch (exceptions::already_exists_exception&) {}
} else {
dlogger.info("{} keyspace is already present. Not creating", NAME);
}
auto sde_ksm = keyspace_metadata::new_keyspace(
NAME_EVERYWHERE,
"org.apache.cassandra.locator.EverywhereStrategy",
{},
true /* durable_writes */);
if (!db.has_keyspace(NAME_EVERYWHERE)) {
auto sde_mutations = service::prepare_new_keyspace_announcement(db.real_database(), sde_ksm, ts);
std::move(sde_mutations.begin(), sde_mutations.end(), std::back_inserter(mutations));
description += format(" create {} keyspace;", NAME_EVERYWHERE);
if (!_sp.get_db().local().has_keyspace(NAME_EVERYWHERE)) {
auto group0_guard = co_await _mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
try {
auto ksm = keyspace_metadata::new_keyspace(
NAME_EVERYWHERE,
"org.apache.cassandra.locator.EverywhereStrategy",
{},
true /* durable_writes */);
co_await _mm.announce(service::prepare_new_keyspace_announcement(_sp.local_db(), ksm, ts), std::move(group0_guard),
"Create system_distributed_everywhere keyspace");
} catch (exceptions::already_exists_exception&) {}
} else {
dlogger.info("{} keyspace is already present. Not creating", NAME_EVERYWHERE);
}
// Get mutations for creating and updating tables.
auto num_keyspace_mutations = mutations.size();
co_await coroutine::parallel_for_each(ensured_tables(),
[this, &mutations, db, ts, sd_ksm, sde_ksm] (auto&& table) -> future<> {
auto ksm = table->ks_name() == NAME ? sd_ksm : sde_ksm;
// Ensure that the service_levels table contains new columns.
if (table->cf_name() == SERVICE_LEVELS) {
table = get_updated_service_levels(db);
}
if (!db.has_schema(table->ks_name(), table->cf_name())) {
co_return co_await service::prepare_new_column_family_announcement(mutations, _sp, *ksm, std::move(table), ts);
}
// The service_levels table exists. Update it if it lacks new columns.
if (table->cf_name() == SERVICE_LEVELS && !get_current_service_levels(db)->equal_columns(*table)) {
auto update_mutations = co_await service::prepare_column_family_update_announcement(_sp, table, false, std::vector<view_ptr>(), ts);
std::move(update_mutations.begin(), update_mutations.end(), std::back_inserter(mutations));
}
auto tables = ensured_tables();
bool exist = std::all_of(tables.begin(), tables.end(), [this] (schema_ptr s) {
return _sp.get_db().local().has_schema(s->ks_name(), s->cf_name());
});
if (mutations.size() > num_keyspace_mutations) {
description += " create and update system_distributed(_everywhere) tables";
} else {
dlogger.info("All tables are present and up-to-date on start");
}
if (!mutations.empty()) {
co_await _mm.announce(std::move(mutations), std::move(group0_guard), description);
if (!exist) {
auto group0_guard = co_await _mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto m = co_await map_reduce(tables,
/* Mapper */ [this, ts] (auto&& table) -> future<std::vector<mutation>> {
try {
co_return co_await service::prepare_new_column_family_announcement(_sp, std::move(table), ts);
} catch (exceptions::already_exists_exception&) {
co_return std::vector<mutation>();
}
},
/* Initial value*/ std::vector<mutation>(),
/* Reducer */ [] (std::vector<mutation> m1, std::vector<mutation> m2) {
std::move(m2.begin(), m2.end(), std::back_inserter(m1));
return m1;
});
if (m.size()) {
co_await _mm.announce(std::move(m), std::move(group0_guard),
"Create system_distributed(_everywhere) tables");
}
} else {
dlogger.info("All tables are present on start");
}
_started = true;
if (has_missing_columns(_qp.db())) {
auto group0_guard = co_await _mm.start_group0_operation();
co_await add_new_columns_if_missing(_qp.db().real_database(), _mm, std::move(group0_guard));
} else {
dlogger.info("All schemas are uptodate on start");
}
}
future<> system_distributed_keyspace::stop() {

View File

@@ -174,55 +174,60 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
co_return; // if schema is created already do nothing
}
// FIXME: fix this code to `announce` once
auto& mml = mm.local();
auto tm = proxy.local().get_token_metadata_ptr();
std::vector<lw_shared_ptr<keyspace_metadata>> ksms;
for (auto& ks_name: ks_names) {
cql3::statements::ks_prop_defs attrs;
attrs.add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true");
std::map<sstring, sstring> replication_properties;
for (auto&& option : keyspace_replication_strategy_options) {
replication_properties.emplace(option.first, option.second);
}
attrs.add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs.validate();
{
auto group0_guard = co_await mml.start_group0_operation();
auto ts = group0_guard.write_timestamp();
std::vector<mutation> ks_mutations;
for (auto& ks_name: ks_names) {
if (db.has_keyspace(ks_name)) {
continue;
}
ksms.push_back(attrs.as_ks_metadata(ks_name, *tm));
cql3::statements::ks_prop_defs attrs;
attrs.add_property(cql3::statements::ks_prop_defs::KW_DURABLE_WRITES, "true");
std::map<sstring, sstring> replication_properties;
for (auto&& option : keyspace_replication_strategy_options) {
replication_properties.emplace(option.first, option.second);
}
attrs.add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs.validate();
auto muts = service::prepare_new_keyspace_announcement(db.real_database(), attrs.as_ks_metadata(ks_name, *tm), ts);
std::move(muts.begin(), muts.end(), std::back_inserter(ks_mutations));
}
if (!ks_mutations.empty()) {
co_await mml.announce(std::move(ks_mutations), std::move(group0_guard), "keyspace-utils: create keyspaces for redis");
}
}
auto group0_guard = co_await mml.start_group0_operation();
auto ts = group0_guard.write_timestamp();
std::vector<mutation> mutations;
for (auto ksm: ksms) {
if (db.has_keyspace(ksm->name())) {
continue;
}
auto muts = service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts);
std::move(muts.begin(), muts.end(), std::back_inserter(mutations));
}
std::vector<mutation> table_mutations;
auto table_gen = std::bind_front(
[] (data_dictionary::database db, service::storage_proxy& sp, std::vector<mutation>& mutations,
api::timestamp_type ts, const keyspace_metadata& ksm, sstring cf_name, schema_ptr schema) -> future<> {
if (db.has_schema(ksm.name(), cf_name)) {
[] (data_dictionary::database db, service::storage_proxy& sp, std::vector<mutation>& table_mutations,
api::timestamp_type ts, sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> {
if (db.has_schema(ks_name, cf_name)) {
co_return;
}
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
auto muts = co_await service::prepare_new_column_family_announcement(sp, schema, ts);
std::move(muts.begin(), muts.end(), std::back_inserter(table_mutations));
}, db, std::ref(proxy.local()), std::ref(table_mutations), group0_guard.write_timestamp());
logger.info("Create keyspace: {}, table: {} for redis.", ksm.name(), cf_name);
co_await service::prepare_new_column_family_announcement(mutations, sp, ksm, schema, ts);
}, db, std::ref(proxy.local()), std::ref(mutations), ts);
co_await coroutine::parallel_for_each(ksms, [table_gen = std::move(table_gen)] (const lw_shared_ptr<keyspace_metadata> ksm) mutable {
return parallel_for_each(tables, [ksm, table_gen = std::move(table_gen)] (table t) {
return table_gen(*ksm, t.name, t.schema(ksm->name()));
co_await coroutine::parallel_for_each(ks_names, [table_gen = std::move(table_gen)] (const sstring& ks_name) mutable {
return parallel_for_each(tables, [ks_name, table_gen = std::move(table_gen)] (table t) {
return table_gen(ks_name, t.name, t.schema(ks_name));
}).discard_result();
});
if (!mutations.empty()) {
co_await mml.announce(std::move(mutations), std::move(group0_guard), "keyspace-utils: create default keyspaces and databases for redis");
// create default databases for redis.
if (!table_mutations.empty()) {
co_await mml.announce(std::move(table_mutations), std::move(group0_guard), "keyspace-utils: create default databases for redis");
}
}

View File

@@ -73,12 +73,7 @@ public:
// The callback runs inside seastar thread
// called before adding/updating/dropping column family.
// listener can add additional type altering mutations if he knows what he is doing.
//
// The `on_before_create_column_family` method is different as it doesn't assume the existence
// of the column family's keyspace. The reason for this is that we sometimes create a keyspace
// and its column families together. Therefore, listeners can't load the keyspace from the
// database. Instead, they should use the `ksm` parameter if needed.
virtual void on_before_create_column_family(const keyspace_metadata& ksm, const schema&, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_create_column_family(const schema&, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_drop_column_family(const schema&, std::vector<mutation>&, api::timestamp_type) {}
virtual void on_before_drop_keyspace(const sstring& keyspace_name, std::vector<mutation>&, api::timestamp_type) {}
@@ -144,7 +139,7 @@ public:
future<> drop_function(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
future<> drop_aggregate(const db::functions::function_name& fun_name, const std::vector<data_type>& arg_types);
void before_create_column_family(const keyspace_metadata& ksm, const schema&, std::vector<mutation>&, api::timestamp_type);
void before_create_column_family(const schema&, std::vector<mutation>&, api::timestamp_type);
void before_update_column_family(const schema& new_schema, const schema& old_schema, std::vector<mutation>&, api::timestamp_type);
void before_drop_column_family(const schema&, std::vector<mutation>&, api::timestamp_type);
void before_drop_keyspace(const sstring& keyspace_name, std::vector<mutation>&, api::timestamp_type);

View File

@@ -614,11 +614,11 @@ future<> migration_notifier::drop_aggregate(const db::functions::function_name&
});
}
void migration_notifier::before_create_column_family(const keyspace_metadata& ksm,
const schema& schema, std::vector<mutation>& mutations, api::timestamp_type timestamp) {
_listeners.thread_for_each([&ksm, &schema, &mutations, timestamp] (migration_listener* listener) {
void migration_notifier::before_create_column_family(const schema& schema,
std::vector<mutation>& mutations, api::timestamp_type timestamp) {
_listeners.thread_for_each([&mutations, &schema, timestamp] (migration_listener* listener) {
// allow exceptions. so a listener can effectively kill a create-table
listener->on_before_create_column_family(ksm, schema, mutations, timestamp);
listener->on_before_create_column_family(schema, mutations, timestamp);
});
}
@@ -679,49 +679,35 @@ static future<std::vector<mutation>> include_keyspace(
co_return std::move(mutations);
}
static future<std::vector<mutation>> do_prepare_new_column_family_announcement(storage_proxy& sp,
const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
auto& db = sp.local_db();
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
}
mlogger.info("Create new ColumnFamily: {}", cfm);
return seastar::async([&db, &ksm, cfm, timestamp] {
auto mutations = db::schema_tables::make_create_table_mutations(cfm, timestamp);
db.get_notifier().before_create_column_family(ksm, *cfm, mutations, timestamp);
return mutations;
}).then([&sp, &ksm](std::vector<mutation> mutations) {
return include_keyspace(sp, ksm, std::move(mutations));
});
}
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
#if 0
cfm.validate();
#endif
try {
auto& db = sp.get_db().local();
auto ksm = db.find_keyspace(cfm->ks_name()).metadata();
return do_prepare_new_column_family_announcement(sp, *ksm, cfm, timestamp);
auto&& keyspace = db.find_keyspace(cfm->ks_name());
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
}
if (db.column_family_exists(cfm->id())) {
throw exceptions::invalid_request_exception(format("Table with ID {} already exists: {}", cfm->id(), db.find_schema(cfm->id())));
}
mlogger.info("Create new ColumnFamily: {}", cfm);
auto ksm = keyspace.metadata();
return seastar::async([&db, cfm, timestamp, ksm] {
auto mutations = db::schema_tables::make_create_table_mutations(cfm, timestamp);
db.get_notifier().before_create_column_family(*cfm, mutations, timestamp);
return mutations;
}).then([&sp, ksm](std::vector<mutation> mutations) {
return include_keyspace(sp, *ksm, std::move(mutations));
});
} catch (const replica::no_such_keyspace& e) {
throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
}
}
future<> prepare_new_column_family_announcement(std::vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp) {
auto& db = sp.local_db();
// If the keyspace exists, ensure that we use the current metadata.
const auto& current_ksm = db.has_keyspace(ksm.name()) ? *db.find_keyspace(ksm.name()).metadata() : ksm;
auto new_mutations = co_await do_prepare_new_column_family_announcement(sp, current_ksm, cfm, timestamp);
std::move(new_mutations.begin(), new_mutations.end(), std::back_inserter(mutations));
}
future<std::vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
warn(unimplemented::cause::VALIDATION);

View File

@@ -215,10 +215,6 @@ future<std::vector<mutation>> prepare_column_family_update_announcement(storage_
schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp);
// The ksm parameter can describe a keyspace that hasn't been created yet.
// This function allows announcing a new keyspace together with its tables at once.
future<> prepare_new_column_family_announcement(std::vector<mutation>& mutations,
storage_proxy& sp, const keyspace_metadata& ksm, schema_ptr cfm, api::timestamp_type timestamp);
future<std::vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts);

View File

@@ -824,9 +824,10 @@ public:
co_return co_await lb.make_plan();
}
void on_before_create_column_family(const keyspace_metadata& ksm, const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
auto rs = abstract_replication_strategy::create_replication_strategy(ksm.strategy_name(), ksm.strategy_options());
if (auto&& tablet_rs = rs->maybe_as_tablet_aware()) {
void on_before_create_column_family(const schema& s, std::vector<mutation>& muts, api::timestamp_type ts) override {
keyspace& ks = _db.find_keyspace(s.ks_name());
auto&& rs = ks.get_replication_strategy();
if (auto&& tablet_rs = rs.maybe_as_tablet_aware()) {
auto tm = _db.get_shared_token_metadata().get();
auto map = tablet_rs->allocate_tablets_for_new_table(s.shared_from_this(), tm).get0();
muts.emplace_back(tablet_map_to_mutation(map, s.id(), s.keypace_name(), s.cf_name(), ts).get0());

View File

@@ -16,7 +16,7 @@
#include "replica/database.hh"
#include "service/migration_manager.hh"
static schema_ptr parse_new_cf_statement(cql3::query_processor& qp, const sstring& create_cql) {
future<> table_helper::setup_table(cql3::query_processor& qp, service::migration_manager& mm, const sstring& create_cql) {
auto db = qp.db();
auto parsed = cql3::query_processor::parse_statement(create_cql);
@@ -28,22 +28,6 @@ static schema_ptr parse_new_cf_statement(cql3::query_processor& qp, const sstrin
parsed_cf_stmt->prepare(db, qp.get_cql_stats())->statement);
auto schema = statement->get_cf_meta_data(db);
// Generate the CF UUID based on its KF names. This is needed to ensure that
// all Nodes that create it would create it with the same UUID and we don't
// hit the #420 issue.
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
schema_builder b(schema);
b.set_uuid(uuid);
return b.build();
}
future<> table_helper::setup_table(cql3::query_processor& qp, service::migration_manager& mm, const sstring& create_cql) {
auto db = qp.db();
auto schema = parse_new_cf_statement(qp, create_cql);
if (db.has_schema(schema->ks_name(), schema->cf_name())) {
co_return;
}
@@ -55,12 +39,20 @@ future<> table_helper::setup_table(cql3::query_processor& qp, service::migration
co_return;
}
// Generate the CF UUID based on its KF names. This is needed to ensure that
// all Nodes that create it would create it with the same UUID and we don't
// hit the #420 issue.
auto uuid = generate_legacy_id(schema->ks_name(), schema->cf_name());
schema_builder b(schema);
b.set_uuid(uuid);
// We don't care it it fails really - this may happen due to concurrent
// "CREATE TABLE" invocation on different Nodes.
// The important thing is that it will converge eventually (some traces may
// be lost in a process but that's ok).
try {
co_return co_await mm.announce(co_await service::prepare_new_column_family_announcement(qp.proxy(), schema, ts),
co_return co_await mm.announce(co_await service::prepare_new_column_family_announcement(qp.proxy(), b.build(), ts),
std::move(group0_guard), format("table_helper: create {} table", schema->cf_name()));
} catch (...) {}
}
@@ -129,11 +121,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
co_return;
}
// FIXME: call `announce` once (`announce` keyspace and tables together)
//
// Note that the CQL code in `parse_new_cf_statement` assumes that the keyspace exists.
// To solve this problem, we could, for example, use `schema_builder` instead of the
// CQL statements to create tables in `table_helper`.
// FIXME: call `announce` once (collapse the calls here and in `setup_table`)
if (std::any_of(tables.begin(), tables.end(), [&] (table_helper* t) { return t->_keyspace != keyspace_name; })) {
throw std::invalid_argument("setup_keyspace called with table_helper for different keyspace");
@@ -141,15 +129,14 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
data_dictionary::database db = qp.db();
std::map<sstring, sstring> opts;
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
if (!db.has_keyspace(keyspace_name)) {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
if (!db.has_keyspace(keyspace_name)) {
std::map<sstring, sstring> opts;
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
std::move(group0_guard), format("table_helper: create {} keyspace", keyspace_name));
}
@@ -157,23 +144,8 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
qs.get_client_state().set_keyspace(db.real_database(), keyspace_name);
if (std::all_of(tables.begin(), tables.end(), [db] (table_helper* t) { return db.has_schema(t->_keyspace, t->_name); })) {
co_return;
}
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
std::vector<mutation> table_mutations;
co_await coroutine::parallel_for_each(tables, [&] (auto&& table) -> future<> {
auto schema = parse_new_cf_statement(qp, table->_create_cql);
if (!db.has_schema(schema->ks_name(), schema->cf_name())) {
co_return co_await service::prepare_new_column_family_announcement(table_mutations, qp.proxy(), *ksm, schema, ts);
}
// Create tables
co_await coroutine::parallel_for_each(tables, [&qp, &mm] (table_helper* t) {
return table_helper::setup_table(qp, mm, t->_create_cql);
});
if (!table_mutations.empty()) {
co_await mm.announce(std::move(table_mutations), std::move(group0_guard),
format("table_helper: create tables for {} keyspace", keyspace_name));
}
}