service: migration_manager: change the prepare_ methods to functions

The migration_manager service is responsible for schema convergence
in the cluster - pushing schema changes to other nodes and pulling
schema when a version mismatch is observed. However, there is also
a part of migration_manager that doesn't really belong there -
creating mutations for schema updates. These are the functions with
prepare_ prefix. They don't modify any state and don't exchange any
messages. They only need to read the local database.

We take these functions out of migration_manager and make them
separate functions to reduce the dependency of other modules
(especially query_processor and CQL statements) on
migration_manager. Since all of these functions only need access
to storage_proxy (or even only replica::database), doing such a
refactor is not complicated. We just have to add one parameter,
either storage_proxy or database and both of them are easily
accessible in the places where these functions are called.
This commit is contained in:
Patryk Jędrzejczak
2023-07-25 17:41:10 +02:00
parent 7351c8424d
commit 3468cbd66b
35 changed files with 165 additions and 170 deletions

View File

@@ -573,8 +573,8 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
}
auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, group0_guard.write_timestamp(), service::migration_manager::drop_views::yes);
auto m2 = co_await mm.prepare_keyspace_drop_announcement(keyspace_name, group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_drop_announcement(_proxy, keyspace_name, table_name, group0_guard.write_timestamp(), service::drop_views::yes);
auto m2 = co_await service::prepare_keyspace_drop_announcement(_proxy.local_db(), keyspace_name, group0_guard.write_timestamp());
std::move(m2.begin(), m2.end(), std::back_inserter(m));
@@ -1208,7 +1208,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
auto schema = builder.build();
auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector<view_ptr>(), group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, false, std::vector<view_ptr>(), group0_guard.write_timestamp());
co_await mm.announce(std::move(m), std::move(group0_guard));
@@ -4480,7 +4480,7 @@ static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_n
auto opts = get_network_topology_options(sp, gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
co_return mm.prepare_new_keyspace_announcement(ksm, ts);
co_return service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
}
future<> executor::start() {

View File

@@ -71,7 +71,7 @@ static future<> create_metadata_table_if_missing_impl(
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table, ts), std::move(group0_guard));
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts), std::move(group0_guard));
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -178,7 +178,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c
opts,
true);
co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard));
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts), std::move(group0_guard));
}
}
}

View File

@@ -80,7 +80,7 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
auto old_ksm = qp.db().find_keyspace(_name).metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();
auto m = mm.prepare_keyspace_update_announcement(_attrs->as_ks_metadata_update(old_ksm, tm), ts);
auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -384,7 +384,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
alter_table_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
data_dictionary::database db = qp.db();
auto [cfm, view_updates] = prepare_schema_update(db);
auto m = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, std::move(view_updates), ts);
auto m = co_await service::prepare_column_family_update_announcement(qp.proxy(), cfm.build(), false, std::move(view_updates), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -69,7 +69,7 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
auto&& updated = make_updated_type(db, to_update->second);
// Now, we need to announce the type update to basically change it for new tables using this type,
// but we also need to find all existing user types and CF using it and change them.
auto res = co_await mm.prepare_update_type_announcement(updated, ts);
auto res = co_await service::prepare_update_type_announcement(mm.get_storage_proxy(), updated, ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
for (auto&& schema : ks.metadata()->cf_meta_data() | boost::adaptors::map_values) {
@@ -85,10 +85,10 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
}
if (modified) {
if (schema->is_view()) {
auto res = co_await mm.prepare_view_update_announcement(view_ptr(cfm.build()), ts);
auto res = co_await service::prepare_view_update_announcement(mm.get_storage_proxy(), view_ptr(cfm.build()), ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
} else {
auto res = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, {}, ts);
auto res = co_await service::prepare_column_family_update_announcement(mm.get_storage_proxy(), cfm.build(), false, {}, ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
}
}

View File

@@ -77,7 +77,7 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> alter_view_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
auto m = co_await mm.prepare_view_update_announcement(prepare_view(qp.db()), ts);
auto m = co_await service::prepare_view_update_announcement(qp.proxy(), prepare_view(qp.db()), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -90,7 +90,7 @@ create_aggregate_statement::prepare_schema_mutations(query_processor& qp, servic
auto aggregate = dynamic_pointer_cast<functions::user_aggregate>(co_await validate_while_executing(qp));
if (aggregate) {
m = co_await mm.prepare_new_aggregate_announcement(aggregate, ts);
m = co_await service::prepare_new_aggregate_announcement(qp.proxy(), aggregate, ts);
ret = create_schema_change(*aggregate, true);
}

View File

@@ -73,7 +73,7 @@ create_function_statement::prepare_schema_mutations(query_processor& qp, service
auto func = dynamic_pointer_cast<functions::user_function>(co_await validate_while_executing(qp));
if (func) {
m = co_await mm.prepare_new_function_announcement(func, ts);
m = co_await service::prepare_new_function_announcement(qp.proxy(), func, ts);
ret = create_schema_change(*func, true);
}

View File

@@ -384,7 +384,7 @@ create_index_statement::prepare_schema_mutations(query_processor& qp, service::m
std::vector<mutation> m;
if (schema) {
m = co_await mm.prepare_column_family_update_announcement(std::move(schema), false, {}, ts);
m = co_await service::prepare_column_family_update_announcement(qp.proxy(), std::move(schema), false, {}, ts);
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,

View File

@@ -100,7 +100,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
std::vector<mutation> m;
try {
m = mm.prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm), ts);
m = service::prepare_new_keyspace_announcement(qp.db().real_database(), _attrs->as_ks_metadata(_name, tm), ts);
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,

View File

@@ -76,7 +76,7 @@ create_table_statement::prepare_schema_mutations(query_processor& qp, service::m
std::vector<mutation> m;
try {
m = co_await mm.prepare_new_column_family_announcement(get_cf_meta_data(qp.db()), ts);
m = co_await service::prepare_new_column_family_announcement(qp.proxy(), get_cf_meta_data(qp.db()), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -124,7 +124,7 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
try {
auto t = make_type(qp);
if (t) {
m = co_await mm.prepare_new_type_announcement(*t, ts);
m = co_await service::prepare_new_type_announcement(qp.proxy(), *t, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -367,7 +367,7 @@ create_view_statement::prepare_schema_mutations(query_processor& qp, service::mi
std::vector<mutation> m;
auto [definition, warnings] = prepare_view(qp.db());
try {
m = co_await mm.prepare_new_view_announcement(std::move(definition), ts);
m = co_await service::prepare_new_view_announcement(qp.proxy(), std::move(definition), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,

View File

@@ -34,7 +34,7 @@ drop_aggregate_statement::prepare_schema_mutations(query_processor& qp, service:
if (!user_aggr) {
throw exceptions::invalid_request_exception(format("'{}' is not a user defined aggregate", func));
}
m = co_await mm.prepare_aggregate_drop_announcement(user_aggr, ts);
m = co_await service::prepare_aggregate_drop_announcement(qp.proxy(), user_aggr, ts);
ret = create_schema_change(*func, false);
}

View File

@@ -38,7 +38,7 @@ drop_function_statement::prepare_schema_mutations(query_processor& qp, service::
if (auto aggregate = functions::functions::used_by_user_aggregate(user_func)) {
throw exceptions::invalid_request_exception(format("Cannot delete function {}, as it is used by user-defined aggregate {}", func, *aggregate));
}
m = co_await mm.prepare_function_drop_announcement(user_func, ts);
m = co_await service::prepare_function_drop_announcement(qp.proxy(), user_func, ts);
ret = create_schema_change(*func, false);
}

View File

@@ -79,7 +79,7 @@ drop_index_statement::prepare_schema_mutations(query_processor& qp, service::mig
auto cfm = make_drop_idex_schema(qp);
if (cfm) {
m = co_await mm.prepare_column_family_update_announcement(cfm, false, {}, ts);
m = co_await service::prepare_column_family_update_announcement(qp.proxy(), cfm, false, {}, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(event::schema_change::change_type::UPDATED,

View File

@@ -52,7 +52,7 @@ drop_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::
::shared_ptr<cql_transport::event::schema_change> ret;
try {
m = co_await mm.prepare_keyspace_drop_announcement(_keyspace, ts);
m = co_await service::prepare_keyspace_drop_announcement(qp.db().real_database(), _keyspace, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -45,7 +45,7 @@ drop_table_statement::prepare_schema_mutations(query_processor& qp, service::mig
}
try {
m = co_await mm.prepare_column_family_drop_announcement(keyspace(), column_family(), ts);
m = co_await service::prepare_column_family_drop_announcement(qp.proxy(), keyspace(), column_family(), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -137,7 +137,7 @@ drop_type_statement::prepare_schema_mutations(query_processor& qp, service::migr
// Can happen with if_exists
if (to_drop != all_types.end()) {
m = co_await mm.prepare_type_drop_announcement(to_drop->second, ts);
m = co_await service::prepare_type_drop_announcement(qp.proxy(), to_drop->second, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -47,7 +47,7 @@ drop_view_statement::prepare_schema_mutations(query_processor& qp, service::migr
std::vector<mutation> m;
try {
m = co_await mm.prepare_view_drop_announcement(keyspace(), column_family(), ts);
m = co_await service::prepare_view_drop_announcement(qp.proxy(), keyspace(), column_family(), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -253,8 +253,8 @@ static future<> add_new_columns_if_missing(replica::database& db, ::service::mig
schema_ptr table = b.build();
try {
auto ts = group0_guard.write_timestamp();
co_return co_await mm.announce(co_await mm.prepare_column_family_update_announcement(table, false, std::vector<view_ptr>(), ts),
std::move(group0_guard), "Add new columns to system_distributed.service_levels");
co_return co_await mm.announce(co_await service::prepare_column_family_update_announcement(mm.get_storage_proxy(), table, false,
std::vector<view_ptr>(), ts), std::move(group0_guard), "Add new columns to system_distributed.service_levels");
} catch (...) {}
}
} catch (...) {
@@ -282,7 +282,7 @@ future<> system_distributed_keyspace::start() {
"org.apache.cassandra.locator.SimpleStrategy",
{{"replication_factor", "3"}},
true /* durable_writes */);
co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard),
co_await _mm.announce(service::prepare_new_keyspace_announcement(_sp.local_db(), ksm, ts), std::move(group0_guard),
"Create system_distributed keyspace");
} catch (exceptions::already_exists_exception&) {}
} else {
@@ -299,7 +299,7 @@ future<> system_distributed_keyspace::start() {
"org.apache.cassandra.locator.EverywhereStrategy",
{},
true /* durable_writes */);
co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard),
co_await _mm.announce(service::prepare_new_keyspace_announcement(_sp.local_db(), ksm, ts), std::move(group0_guard),
"Create system_distributed_everywhere keyspace");
} catch (exceptions::already_exists_exception&) {}
} else {
@@ -318,7 +318,7 @@ future<> system_distributed_keyspace::start() {
auto m = co_await map_reduce(tables,
/* Mapper */ [this, ts] (auto&& table) -> future<std::vector<mutation>> {
try {
co_return co_await _mm.prepare_new_column_family_announcement(std::move(table), ts);
co_return co_await service::prepare_new_column_family_announcement(_sp, std::move(table), ts);
} catch (exceptions::already_exists_exception&) {
co_return std::vector<mutation>();
}

View File

@@ -62,7 +62,8 @@ future<> modify_tags(service::migration_manager& mm, sstring ks, sstring cf,
schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags));
auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_update_announcement(mm.get_storage_proxy(),
builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());
co_await mm.announce(std::move(m), std::move(group0_guard));
});

View File

@@ -198,7 +198,7 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
attrs.add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs.validate();
auto muts = mml.prepare_new_keyspace_announcement(attrs.as_ks_metadata(ks_name, *tm), ts);
auto muts = service::prepare_new_keyspace_announcement(db.real_database(), attrs.as_ks_metadata(ks_name, *tm), ts);
std::move(muts.begin(), muts.end(), std::back_inserter(ks_mutations));
}
@@ -210,15 +210,15 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
auto group0_guard = co_await mml.start_group0_operation();
std::vector<mutation> table_mutations;
auto table_gen = std::bind_front(
[] (data_dictionary::database db, service::migration_manager& mml, std::vector<mutation>& table_mutations,
[] (data_dictionary::database db, service::storage_proxy& sp, std::vector<mutation>& table_mutations,
api::timestamp_type ts, sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> {
if (db.has_schema(ks_name, cf_name)) {
co_return;
}
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
auto muts = co_await mml.prepare_new_column_family_announcement(schema, ts);
auto muts = co_await service::prepare_new_column_family_announcement(sp, schema, ts);
std::move(muts.begin(), muts.end(), std::back_inserter(table_mutations));
}, db, std::ref(mml), std::ref(table_mutations), group0_guard.write_timestamp());
}, db, std::ref(proxy.local()), std::ref(table_mutations), group0_guard.write_timestamp());
co_await coroutine::parallel_for_each(ks_names, [table_gen = std::move(table_gen)] (const sstring& ks_name) mutable {
return parallel_for_each(tables, [ks_name, table_gen = std::move(table_gen)] (table t) {

View File

@@ -644,38 +644,32 @@ public void notifyDropAggregate(UDAggregate udf)
}
#endif
std::vector<mutation> migration_manager::prepare_keyspace_update_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
std::vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
db.validate_keyspace_update(*ksm);
mlogger.info("Update Keyspace: {}", ksm);
return db::schema_tables::make_create_keyspace_mutations(db.features().cluster_schema_features(), ksm, ts);
}
std::vector<mutation> migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
std::vector<mutation> prepare_new_keyspace_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
db.validate_new_keyspace(*ksm);
mlogger.info("Create new Keyspace: {}", ksm);
return db::schema_tables::make_create_keyspace_mutations(db.features().cluster_schema_features(), ksm, timestamp);
}
future<std::vector<mutation>> migration_manager::include_keyspace(
const keyspace_metadata& keyspace, std::vector<mutation> mutations) {
static future<std::vector<mutation>> include_keyspace(
storage_proxy& sp, const keyspace_metadata& keyspace, std::vector<mutation> mutations) {
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
mutation m = co_await db::schema_tables::read_keyspace_mutation(_storage_proxy.container(), keyspace.name());
mutation m = co_await db::schema_tables::read_keyspace_mutation(sp.container(), keyspace.name());
mutations.push_back(std::move(m));
co_return std::move(mutations);
}
future<std::vector<mutation>> migration_manager::prepare_new_column_family_announcement(schema_ptr cfm, api::timestamp_type timestamp) {
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
#if 0
cfm.validate();
#endif
try {
auto& db = _storage_proxy.get_db().local();
auto& db = sp.get_db().local();
auto&& keyspace = db.find_keyspace(cfm->ks_name());
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
@@ -687,25 +681,26 @@ future<std::vector<mutation>> migration_manager::prepare_new_column_family_annou
mlogger.info("Create new ColumnFamily: {}", cfm);
auto ksm = keyspace.metadata();
return seastar::async([this, cfm, timestamp, ksm] {
return seastar::async([&db, cfm, timestamp, ksm] {
auto mutations = db::schema_tables::make_create_table_mutations(cfm, timestamp);
get_notifier().before_create_column_family(*cfm, mutations, timestamp);
db.get_notifier().before_create_column_family(*cfm, mutations, timestamp);
return mutations;
}).then([this, ksm](std::vector<mutation> mutations) {
return include_keyspace(*ksm, std::move(mutations));
}).then([&sp, ksm](std::vector<mutation> mutations) {
return include_keyspace(sp, *ksm, std::move(mutations));
});
} catch (const replica::no_such_keyspace& e) {
throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
}
}
future<std::vector<mutation>> migration_manager::prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
warn(unimplemented::cause::VALIDATION);
#if 0
cfm.validate();
#endif
try {
auto& db = _storage_proxy.get_db().local();
auto& db = sp.local_db();
auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
#if 0
oldCfm.validateCompatility(cfm);
@@ -722,9 +717,9 @@ future<std::vector<mutation>> migration_manager::prepare_column_family_update_an
co_await coroutine::maybe_yield();
}
co_await seastar::async([&] {
get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
db.get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
});
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing table '{}' in keyspace '{}'.",
cfm->cf_name(), cfm->ks_name())));
@@ -732,69 +727,68 @@ future<std::vector<mutation>> migration_manager::prepare_column_family_update_an
}
}
future<std::vector<mutation>> migration_manager::do_prepare_new_type_announcement(user_type new_type, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
static future<std::vector<mutation>> do_prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(new_type->_keyspace);
auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_new_type_announcement(user_type new_type, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
mlogger.info("Prepare Create new User Type: {}", new_type->get_name_as_string());
return do_prepare_new_type_announcement(std::move(new_type), ts);
return do_prepare_new_type_announcement(sp, std::move(new_type), ts);
}
future<std::vector<mutation>> migration_manager::prepare_update_type_announcement(user_type updated_type, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_update_type_announcement(storage_proxy& sp, user_type updated_type, api::timestamp_type ts) {
mlogger.info("Prepare Update User Type: {}", updated_type->get_name_as_string());
return do_prepare_new_type_announcement(updated_type, ts);
return do_prepare_new_type_announcement(sp, updated_type, ts);
}
future<std::vector<mutation>> migration_manager::prepare_new_function_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_new_function_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(func->name().keyspace);
auto mutations = db::schema_tables::make_create_function_mutations(func, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_function_drop_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_function_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(func->name().keyspace);
auto mutations = db::schema_tables::make_drop_function_mutations(func, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_new_aggregate_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_new_aggregate_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(aggregate->name().keyspace);
auto mutations = db::schema_tables::make_create_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_aggregate_drop_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_aggregate_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(aggregate->name().keyspace);
auto mutations = db::schema_tables::make_drop_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_keyspace_drop_announcement(const sstring& ks_name, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_keyspace_drop_announcement(replica::database& db, const sstring& ks_name, api::timestamp_type ts) {
if (!db.has_keyspace(ks_name)) {
throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name));
}
auto& keyspace = db.find_keyspace(ks_name);
mlogger.info("Drop Keyspace '{}'", ks_name);
return seastar::async([this, &db, &keyspace, ts, ks_name] {
return seastar::async([&db, &keyspace, ts, ks_name] {
auto mutations = db::schema_tables::make_drop_keyspace_mutations(db.features().cluster_schema_features(), keyspace.metadata(), ts);
get_notifier().before_drop_keyspace(ks_name, mutations, ts);
db.get_notifier().before_drop_keyspace(ks_name, mutations, ts);
return mutations;
});
}
future<std::vector<mutation>> migration_manager::prepare_column_family_drop_announcement(const sstring& ks_name,
const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) {
future<std::vector<mutation>> prepare_column_family_drop_announcement(storage_proxy& sp,
const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) {
try {
auto& db = _storage_proxy.get_db().local();
auto& db = sp.local_db();
auto& old_cfm = db.find_column_family(ks_name, cf_name);
auto& schema = old_cfm.schema();
if (schema->is_view()) {
@@ -832,29 +826,29 @@ future<std::vector<mutation>> migration_manager::prepare_column_family_drop_anno
// notifiers must run in seastar thread
co_await seastar::async([&] {
get_notifier().before_drop_column_family(*schema, mutations, ts);
db.get_notifier().before_drop_column_family(*schema, mutations, ts);
});
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot drop non existing table '{}' in keyspace '{}'.", cf_name, ks_name)));
co_return coroutine::exception(std::move(ex));
}
}
future<std::vector<mutation>> migration_manager::prepare_type_drop_announcement(user_type dropped_type, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp, user_type dropped_type, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(dropped_type->_keyspace);
mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string());
auto mutations =
db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_new_view_announcement(view_ptr view, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
#if 0
view.metadata.validate();
#endif
auto& db = _storage_proxy.get_db().local();
auto& db = sp.local_db();
try {
auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
if (keyspace->cf_meta_data().contains(view->cf_name())) {
@@ -862,18 +856,18 @@ future<std::vector<mutation>> migration_manager::prepare_new_view_announcement(v
}
mlogger.info("Create new view: {}", view);
auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), ts);
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_keyspace& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.", view->cf_name(), view->ks_name())));
co_return coroutine::exception(std::move(ex));
}
}
future<std::vector<mutation>> migration_manager::prepare_view_update_announcement(view_ptr view, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
#if 0
view.metadata.validate();
#endif
auto db = _storage_proxy.data_dictionary();
auto db = sp.data_dictionary();
try {
auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
@@ -885,7 +879,7 @@ future<std::vector<mutation>> migration_manager::prepare_view_update_announcemen
#endif
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, true);
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const std::out_of_range& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.",
view->cf_name(), view->ks_name())));
@@ -893,8 +887,8 @@ future<std::vector<mutation>> migration_manager::prepare_view_update_announcemen
}
}
future<std::vector<mutation>> migration_manager::prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
auto& db = sp.local_db();
try {
auto& view = db.find_column_family(ks_name, cf_name).schema();
if (!view->is_view()) {
@@ -906,7 +900,7 @@ future<std::vector<mutation>> migration_manager::prepare_view_drop_announcement(
auto keyspace = db.find_keyspace(ks_name).metadata();
mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name());
auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), ts);
return include_keyspace(*keyspace, std::move(mutations));
return include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.",
cf_name, ks_name));

View File

@@ -132,43 +132,6 @@ public:
bool should_pull_schema_from(const gms::inet_address& endpoint);
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);
std::vector<mutation> prepare_keyspace_update_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type);
std::vector<mutation> prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type);
// The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas
// with identical timestamps, which can prevent an undeeded schema exchange
future<std::vector<mutation>> prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_column_family_announcement(schema_ptr cfm, api::timestamp_type timestamp);
future<std::vector<mutation>> prepare_new_type_announcement(user_type new_type, api::timestamp_type);
future<std::vector<mutation>> prepare_new_function_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type);
future<std::vector<mutation>> prepare_new_aggregate_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type);
future<std::vector<mutation>> prepare_function_drop_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type);
future<std::vector<mutation>> prepare_aggregate_drop_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type);
future<std::vector<mutation>> prepare_update_type_announcement(user_type updated_type, api::timestamp_type);
future<std::vector<mutation>> prepare_keyspace_drop_announcement(const sstring& ks_name, api::timestamp_type);
class drop_views_tag;
using drop_views = bool_class<drop_views_tag>;
future<std::vector<mutation>> prepare_column_family_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type, drop_views drop_views = drop_views::no);
future<std::vector<mutation>> prepare_type_drop_announcement(user_type dropped_type, api::timestamp_type);
future<std::vector<mutation>> prepare_new_view_announcement(view_ptr view, api::timestamp_type);
future<std::vector<mutation>> prepare_view_update_announcement(view_ptr view, api::timestamp_type);
future<std::vector<mutation>> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type);
// The function needs to be called if the user wants to read most up-to-date group 0 state (including schema state)
// (the function ensures that all previously finished group0 operations are visible on this node) or to write it.
//
@@ -202,9 +165,6 @@ public:
private:
future<> uninit_messaging_service();
future<std::vector<mutation>> include_keyspace(const keyspace_metadata& keyspace, std::vector<mutation> mutations);
future<std::vector<mutation>> do_prepare_new_type_announcement(user_type new_type, api::timestamp_type);
future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema);
future<> passive_announce();
@@ -245,4 +205,42 @@ public:
future<column_mapping> get_column_mapping(table_id, table_schema_version v);
std::vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts);
std::vector<mutation> prepare_new_keyspace_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp);
// The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas
// with identical timestamps, which can prevent an undeeded schema exchange
future<std::vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp);
future<std::vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_function_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_aggregate_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts);
future<std::vector<mutation>> prepare_function_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts);
future<std::vector<mutation>> prepare_aggregate_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts);
future<std::vector<mutation>> prepare_update_type_announcement(storage_proxy& sp, user_type updated_type, api::timestamp_type ts);
future<std::vector<mutation>> prepare_keyspace_drop_announcement(replica::database& db, const sstring& ks_name, api::timestamp_type ts);
class drop_views_tag;
using drop_views = bool_class<drop_views_tag>;
future<std::vector<mutation>> prepare_column_family_drop_announcement(storage_proxy& sp,
const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts, drop_views drop_views = drop_views::no);
future<std::vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp, user_type dropped_type, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts);
future<std::vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts);
future<std::vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts);
}

View File

@@ -52,7 +52,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp, service::migration
// The important thing is that it will converge eventually (some traces may
// be lost in a process but that's ok).
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(b.build(), ts), std::move(group0_guard));
co_return co_await mm.announce(co_await service::prepare_new_column_family_announcement(qp.proxy(), b.build(), ts), std::move(group0_guard));
} catch (...) {}
}
@@ -136,7 +136,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
std::map<sstring, sstring> opts;
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard));
co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts), std::move(group0_guard));
}
}

View File

@@ -4046,7 +4046,7 @@ SEASTAR_TEST_CASE(test_view_with_two_regular_base_columns_in_key) {
auto& mm = e.migration_manager().local();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_view_announcement(view_ptr(view_schema), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_view_announcement(mm.get_storage_proxy(), view_ptr(view_schema), ts).get(), std::move(group0_guard)).get();
// Verify that deleting and restoring columns behaves as expected - i.e. the row is deleted and regenerated
cquery_nofail(e, "INSERT INTO t (p, c, v1, v2) VALUES (1, 2, 3, 4)");

View File

@@ -266,13 +266,13 @@ static void test_database(void (*run_tests)(populate_fn_ex, bool), unsigned cgs)
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
e.local_db().find_column_family(s->ks_name(), s->cf_name());
mm.announce(mm.prepare_column_family_drop_announcement(s->ks_name(), s->cf_name(), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), s->ks_name(), s->cf_name(), ts).get(), std::move(group0_guard)).get();
} catch (const replica::no_such_column_family&) {
// expected
}
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard)).get();
replica::column_family& cf = e.local_db().find_column_family(s);
auto uuid = cf.schema()->id();
for (auto&& m : partitions) {

View File

@@ -87,7 +87,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
};
std::vector<canonical_mutation> cms;
size_t size = 0;
auto muts = mm.prepare_keyspace_drop_announcement("ks", api::new_timestamp()).get0();
auto muts = service::prepare_keyspace_drop_announcement(env.local_db(), "ks", api::new_timestamp()).get0();
// Maximum mutation size is 1/3 of commitlog segment size wich we set
// to 1M. Make one command a little bit larger than third of the max size.
while (size < 150*1024) {

View File

@@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) {
.build();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard)).get();
replica::table& t = db.find_column_family(ks_name, table_name);
@@ -1066,7 +1066,7 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
schema_ptr s = ss.schema();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard)).get();
replica::table& t = db.find_column_family("ks", "cf");
auto memtables = t.active_memtables();
@@ -1134,7 +1134,7 @@ SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await mm.prepare_new_column_family_announcement(s, ts);
auto announcement = co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard));
});
}
@@ -1143,7 +1143,7 @@ SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await mm.prepare_column_family_drop_announcement(ks_name(), cf_name(shard), ts);
auto announcement = co_await service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), ks_name(), cf_name(shard), ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard));
});
}

View File

@@ -4168,7 +4168,7 @@ SEASTAR_TEST_CASE(row_cache_is_populated_using_compacting_sstable_reader) {
.with_column(to_bytes("id"), int32_type)
.build();
mm.announce(
mm.prepare_new_column_family_announcement(s, api::new_timestamp()).get(),
service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, api::new_timestamp()).get(),
mm.start_group0_operation().get()
).get();

View File

@@ -47,7 +47,7 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(old_schema, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), old_schema, ts).get(), std::move(group0_guard)).get();
}
auto old_table_version = e.db().local().find_schema(old_schema->id())->version();
@@ -58,7 +58,8 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) {
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_column_family_update_announcement(mm.get_storage_proxy(),
new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
BOOST_REQUIRE_NE(e.db().local().find_schema(old_schema->id())->version(), old_table_version);
BOOST_REQUIRE_NE(e.db().local().get_version(), old_node_version);
@@ -81,7 +82,7 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(old_schema, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), old_schema, ts).get(), std::move(group0_guard)).get();
}
auto s = e.local_db().find_schema(old_schema->id());
@@ -94,7 +95,8 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) {
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_column_family_update_announcement(mm.get_storage_proxy(),
new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
s = e.local_db().find_schema(old_schema->id());
BOOST_REQUIRE_NE(*old_schema, *s);
@@ -118,7 +120,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_ignored_in_version_calculation) {
auto& mm = e.migration_manager().local();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(table_schema, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), table_schema, ts).get(), std::move(group0_guard)).get();
auto old_table_version = e.db().local().find_schema(table_schema->id())->version();
auto old_node_version = e.db().local().get_version();
@@ -169,7 +171,7 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s1, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s1, ts).get(), std::move(group0_guard)).get();
}
auto old_version = e.db().local().find_schema(s1->id())->version();
@@ -320,7 +322,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s1, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s1, ts).get(), std::move(group0_guard)).get();
}
auto&& keyspace = e.db().local().find_keyspace(s1->ks_name()).metadata();
@@ -386,8 +388,8 @@ SEASTAR_TEST_CASE(test_concurrent_table_creation_with_different_schema) {
.with_column("v1", bytes_type)
.build();
auto ann1 = mm.prepare_new_column_family_announcement(s1, api::new_timestamp()).get();
auto ann2 = mm.prepare_new_column_family_announcement(s2, api::new_timestamp()).get();
auto ann1 = service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s1, api::new_timestamp()).get();
auto ann2 = service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s2, api::new_timestamp()).get();
{
auto group0_guard = mm.start_group0_operation().get();
@@ -477,7 +479,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
{
auto group0_guard = mm.start_group0_operation().get();
const auto ts = group0_guard.write_timestamp();
auto muts = e.migration_manager().local().prepare_keyspace_drop_announcement("ks", ts).get0();
auto muts = service::prepare_keyspace_drop_announcement(e.local_db(), "ks", ts).get0();
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(muts, std::move(group0_guard)).get();
}
@@ -487,7 +489,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
const auto ts = group0_guard.write_timestamp();
// all_muts contains keyspace drop.
auto muts = e.migration_manager().local().prepare_new_keyspace_announcement(keyspace, ts);
auto muts = service::prepare_new_keyspace_announcement(e.db().local(), keyspace, ts);
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(muts, std::move(group0_guard)).get();
}
@@ -496,7 +498,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
auto group0_guard = mm.start_group0_operation().get();
const auto ts = group0_guard.write_timestamp();
auto muts = e.migration_manager().local().prepare_new_column_family_announcement(s0, ts).get0();
auto muts = service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s0, ts).get0();
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(all_muts, std::move(group0_guard)).get();

View File

@@ -310,7 +310,7 @@ public:
auto s = builder.build(schema_builder::compact_storage::no);
auto group0_guard = co_await _mm.local().start_group0_operation();
auto ts = group0_guard.write_timestamp();
co_return co_await _mm.local().announce(co_await _mm.local().prepare_new_column_family_announcement(s, ts), std::move(group0_guard));
co_return co_await _mm.local().announce(co_await service::prepare_new_column_family_announcement(_proxy.local(), s, ts), std::move(group0_guard));
}
virtual future<> require_keyspace_exists(const sstring& ks_name) override {

View File

@@ -885,7 +885,7 @@ public:
co_await t._query_state.get_client_state().has_keyspace_access(cf_def.keyspace, auth::permission::CREATE);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&p = t._proxy.local(), &cf_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
if (!db.has_keyspace(cf_def.keyspace)) {
throw NotFoundException();
}
@@ -894,7 +894,7 @@ public:
}
auto s = schema_from_thrift(cf_def, cf_def.keyspace);
co_return co_await mm.prepare_new_column_family_announcement(std::move(s), ts);
co_return co_await service::prepare_new_column_family_announcement(p, std::move(s), ts);
});
});
}
@@ -906,7 +906,7 @@ public:
co_await t._query_state.get_client_state().has_column_family_access(t.current_keyspace(), column_family, auth::permission::DROP);
co_return co_await t.execute_schema_command(
[&column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
[&p = t._proxy.local(), &column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
auto cf = db.find_table(current_keyspace, column_family);
if (cf.schema()->is_view()) {
throw make_exception<InvalidRequestException>("Cannot drop Materialized Views from Thrift");
@@ -915,7 +915,7 @@ public:
throw make_exception<InvalidRequestException>("Cannot drop table with Materialized Views {}", column_family);
}
co_return co_await mm.prepare_column_family_drop_announcement(current_keyspace, column_family, ts);
co_return co_await service::prepare_column_family_drop_announcement(p, current_keyspace, column_family, ts);
});
});
}
@@ -929,7 +929,7 @@ public:
co_await t._query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE);
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def), ts);
co_return service::prepare_new_keyspace_announcement(db.real_database(), keyspace_from_thrift(ks_def), ts);
});
});
}
@@ -948,7 +948,7 @@ public:
throw NotFoundException();
}
co_return co_await mm.prepare_keyspace_drop_announcement(keyspace, ts);
co_return co_await service::prepare_keyspace_drop_announcement(db.real_database(), keyspace, ts);
});
});
}
@@ -971,7 +971,7 @@ public:
}
auto ksm = keyspace_from_thrift(ks_def);
co_return mm.prepare_keyspace_update_announcement(std::move(ksm), ts);
co_return service::prepare_keyspace_update_announcement(db.real_database(), std::move(ksm), ts);
});
});
}
@@ -984,7 +984,7 @@ public:
co_await t._query_state.get_client_state().has_schema_access(cf_def.keyspace, cf_def.name, auth::permission::ALTER);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&p = t._proxy.local(), &cf_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
auto cf = db.find_table(cf_def.keyspace, cf_def.name);
auto schema = cf.schema();
@@ -1006,7 +1006,7 @@ public:
if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) {
fail(unimplemented::cause::MIXED_CF);
}
co_return co_await mm.prepare_column_family_update_announcement(std::move(s), true, std::vector<view_ptr>(), ts);
co_return co_await service::prepare_column_family_update_announcement(p, std::move(s), true, std::vector<view_ptr>(), ts);
});
});
}