Merge 'service: migration_manager: change the prepare_ methods to functions' from Patryk Jędrzejczak

The `migration_manager` service is responsible for schema convergence in
the cluster - pushing schema changes to other nodes and pulling schema
when a version mismatch is observed. However, there is also a part of
`migration_manager` that doesn't really belong there - creating
mutations for schema updates. These are the functions with `prepare_`
prefix. They don't modify any state and don't exchange any messages.
They only need to read the local database.

We take these functions out of `migration_manager` and make them
separate functions to reduce the dependency of other modules (especially
`query_processor` and CQL statements) on `migration_manager`. Since all
of these functions only need access to `storage_proxy` (or even only
`replica::database`), doing such a refactor is not complicated. We just
have to add one parameter, either `storage_proxy` or `database` and both
of them are easily accessible in the places where these functions are
called.

This refactor makes `migration_manager` unneeded in a few functions:
- `alternator::executor::create_keyspace`,
- `cql3::statements::alter_type_statement::prepare_announcement_mutations`,
- `cql3::statements::schema_altering_statement::prepare_schema_mutations`,
- `cql3::query_processor::execute_thrift_schema_command:`,
- `thrift::handler::execute_schema_command`.

We remove the `migration_manager&` parameter from all these functions.

Fixes #14339

Closes #14875

* github.com:scylladb/scylladb:
  cql3: query_processor::execute_thrift_schema_command: remove an unused parameter
  cql3: schema_altering_statement::prepare_schema_mutations: remove an unused parameter
  cql3: alter_type_statement::prepare_announcement_mutations: change parameters
  alternator: executor::create_keyspace: remove an unused parameter
  service: migration_manager: change the prepare_ methods to functions
This commit is contained in:
Kamil Braun
2023-08-01 11:56:56 +02:00
56 changed files with 218 additions and 227 deletions

View File

@@ -81,7 +81,7 @@ static sstring_view table_status_to_sstring(table_status tbl_status) {
return "UKNOWN";
}
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type);
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type);
static map_type attrs_type() {
static thread_local auto t = map_type_impl::get_instance(utf8_type, bytes_type, true);
@@ -573,8 +573,8 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
}
auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, group0_guard.write_timestamp(), service::migration_manager::drop_views::yes);
auto m2 = co_await mm.prepare_keyspace_drop_announcement(keyspace_name, group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_drop_announcement(_proxy, keyspace_name, table_name, group0_guard.write_timestamp(), service::drop_views::yes);
auto m2 = co_await service::prepare_keyspace_drop_announcement(_proxy.local_db(), keyspace_name, group0_guard.write_timestamp());
std::move(m2.begin(), m2.end(), std::back_inserter(m));
@@ -1120,7 +1120,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
auto ts = group0_guard.write_timestamp();
std::vector<mutation> schema_mutations;
try {
schema_mutations = co_await create_keyspace(keyspace_name, sp, mm, gossiper, ts);
schema_mutations = co_await create_keyspace(keyspace_name, sp, gossiper, ts);
} catch (exceptions::already_exists_exception&) {
if (sp.data_dictionary().has_schema(keyspace_name, table_name)) {
co_return api_error::resource_in_use(format("Table {} already exists", table_name));
@@ -1208,7 +1208,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
auto schema = builder.build();
auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector<view_ptr>(), group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_update_announcement(p.local(), schema, false, std::vector<view_ptr>(), group0_guard.write_timestamp());
co_await mm.announce(std::move(m), std::move(group0_guard));
@@ -4468,7 +4468,7 @@ future<executor::request_return_type> executor::describe_continuous_backups(clie
// of nodes in the cluster: A cluster with 3 or more live nodes, gets RF=3.
// A smaller cluster (presumably, a test only), gets RF=1. The user may
// manually create the keyspace to override this predefined behavior.
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, service::migration_manager& mm, gms::gossiper& gossiper, api::timestamp_type ts) {
static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_name, service::storage_proxy& sp, gms::gossiper& gossiper, api::timestamp_type ts) {
sstring keyspace_name_str(keyspace_name);
int endpoint_count = gossiper.get_endpoint_states().size();
int rf = 3;
@@ -4480,7 +4480,7 @@ static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_n
auto opts = get_network_topology_options(sp, gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);
co_return mm.prepare_new_keyspace_announcement(ksm, ts);
co_return service::prepare_new_keyspace_announcement(sp.local_db(), ksm, ts);
}
future<> executor::start() {

View File

@@ -71,7 +71,7 @@ static future<> create_metadata_table_if_missing_impl(
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table, ts), std::move(group0_guard));
co_return co_await mm.announce(co_await ::service::prepare_new_column_family_announcement(qp.proxy(), table, ts), std::move(group0_guard));
} catch (exceptions::already_exists_exception&) {}
}
}

View File

@@ -178,7 +178,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c
opts,
true);
co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard));
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts), std::move(group0_guard));
}
}
}

View File

@@ -890,7 +890,7 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat
try {
auto group0_guard = co_await mm.start_group0_operation();
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, mm, group0_guard.write_timestamp());
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, group0_guard.write_timestamp());
warnings = std::move(cql_warnings);
if (!m.empty()) {
@@ -929,7 +929,7 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat
future<std::string>
query_processor::execute_thrift_schema_command(
std::function<future<std::vector<mutation>>(
service::migration_manager&, data_dictionary::database, api::timestamp_type)
data_dictionary::database, api::timestamp_type)
> prepare_schema_mutations) {
assert(this_shard_id() == 0);
@@ -938,7 +938,7 @@ query_processor::execute_thrift_schema_command(
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
co_await mm.announce(co_await prepare_schema_mutations(mm, db(), ts), std::move(group0_guard));
co_await mm.announce(co_await prepare_schema_mutations(db(), ts), std::move(group0_guard));
co_return std::string(db().get_version().to_sstring());
}

View File

@@ -408,7 +408,7 @@ public:
future<std::string>
execute_thrift_schema_command(
std::function<future<std::vector<mutation>>(
service::migration_manager&, data_dictionary::database, api::timestamp_type)
data_dictionary::database, api::timestamp_type)
> prepare_schema_mutations);
std::unique_ptr<statements::prepared_statement> get_statement(

View File

@@ -75,12 +75,12 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
try {
auto old_ksm = qp.db().find_keyspace(_name).metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();
auto m = mm.prepare_keyspace_update_announcement(_attrs->as_ks_metadata_update(old_ksm, tm), ts);
auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -33,7 +33,7 @@ public:
future<> check_access(query_processor& qp, const service::client_state& state) const override;
void validate(query_processor& qp, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
};

View File

@@ -381,10 +381,10 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
alter_table_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
alter_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
data_dictionary::database db = qp.db();
auto [cfm, view_updates] = prepare_schema_update(db);
auto m = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, std::move(view_updates), ts);
auto m = co_await service::prepare_column_family_update_announcement(qp.proxy(), cfm.build(), false, std::move(view_updates), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -54,7 +54,7 @@ public:
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
private:
void add_column(const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
void alter_column(const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;

View File

@@ -47,9 +47,9 @@ const sstring& alter_type_statement::keyspace() const
return _name.get_keyspace();
}
future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm, api::timestamp_type ts) const {
future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutations(service::storage_proxy& sp, api::timestamp_type ts) const {
std::vector<mutation> m;
auto&& ks = db.find_keyspace(keyspace());
auto&& ks = sp.data_dictionary().find_keyspace(keyspace());
auto&& all_types = ks.metadata()->user_types().get_all_types();
auto to_update = all_types.find(_name.get_user_type_name());
// Shouldn't happen, unless we race with a drop
@@ -66,10 +66,10 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
}
}
auto&& updated = make_updated_type(db, to_update->second);
auto&& updated = make_updated_type(sp.data_dictionary(), to_update->second);
// Now, we need to announce the type update to basically change it for new tables using this type,
// but we also need to find all existing user types and CF using it and change them.
auto res = co_await mm.prepare_update_type_announcement(updated, ts);
auto res = co_await service::prepare_update_type_announcement(sp, updated, ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
for (auto&& schema : ks.metadata()->cf_meta_data() | boost::adaptors::map_values) {
@@ -85,10 +85,10 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
}
if (modified) {
if (schema->is_view()) {
auto res = co_await mm.prepare_view_update_announcement(view_ptr(cfm.build()), ts);
auto res = co_await service::prepare_view_update_announcement(sp, view_ptr(cfm.build()), ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
} else {
auto res = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, {}, ts);
auto res = co_await service::prepare_column_family_update_announcement(sp, cfm.build(), false, {}, ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
}
}
@@ -98,9 +98,9 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
alter_type_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
alter_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
try {
auto m = co_await prepare_announcement_mutations(qp.db(), mm, ts);
auto m = co_await prepare_announcement_mutations(qp.proxy(), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -35,7 +35,7 @@ public:
virtual const sstring& keyspace() const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
class add_or_alter;
class renames;
@@ -48,7 +48,7 @@ private:
virtual future<> operator()(schema_ptr cfm, bool from_thrift, std::vector<view_ptr>&& view_updates, std::optional<api::timestamp_type> ts_opt) = 0;
};
future<std::vector<mutation>> prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm, api::timestamp_type) const;
future<std::vector<mutation>> prepare_announcement_mutations(service::storage_proxy& sp, api::timestamp_type) const;
};
class alter_type_statement::add_or_alter : public alter_type_statement {

View File

@@ -76,8 +76,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
return view_ptr(builder.build());
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> alter_view_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
auto m = co_await mm.prepare_view_update_announcement(prepare_view(qp.db()), ts);
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> alter_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
auto m = co_await service::prepare_view_update_announcement(qp.proxy(), prepare_view(qp.db()), ts);
using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(

View File

@@ -33,7 +33,7 @@ public:
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
};

View File

@@ -84,13 +84,13 @@ std::unique_ptr<prepared_statement> create_aggregate_statement::prepare(data_dic
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
create_aggregate_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
create_aggregate_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
auto aggregate = dynamic_pointer_cast<functions::user_aggregate>(co_await validate_while_executing(qp));
if (aggregate) {
m = co_await mm.prepare_new_aggregate_announcement(aggregate, ts);
m = co_await service::prepare_new_aggregate_announcement(qp.proxy(), aggregate, ts);
ret = create_schema_change(*aggregate, true);
}

View File

@@ -25,7 +25,7 @@ namespace statements {
class create_aggregate_statement final : public create_function_statement_base {
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
virtual seastar::future<shared_ptr<db::functions::function>> create(query_processor& qp, db::functions::function* old) const override;

View File

@@ -66,14 +66,14 @@ std::unique_ptr<prepared_statement> create_function_statement::prepare(data_dict
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
create_function_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
create_function_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
auto func = dynamic_pointer_cast<functions::user_function>(co_await validate_while_executing(qp));
if (func) {
m = co_await mm.prepare_new_function_announcement(func, ts);
m = co_await service::prepare_new_function_announcement(qp.proxy(), func, ts);
ret = create_schema_change(*func, true);
}

View File

@@ -23,7 +23,7 @@ namespace statements {
class create_function_statement final : public create_function_statement_base {
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual seastar::future<shared_ptr<db::functions::function>> create(query_processor& qp, db::functions::function* old) const override;
sstring _language;

View File

@@ -376,7 +376,7 @@ schema_ptr create_index_statement::build_index_schema(query_processor& qp) const
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
create_index_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
create_index_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
using namespace cql_transport;
auto schema = build_index_schema(qp);
@@ -384,7 +384,7 @@ create_index_statement::prepare_schema_mutations(query_processor& qp, service::m
std::vector<mutation> m;
if (schema) {
m = co_await mm.prepare_column_family_update_announcement(std::move(schema), false, {}, ts);
m = co_await service::prepare_column_family_update_announcement(qp.proxy(), std::move(schema), false, {}, ts);
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,

View File

@@ -47,7 +47,7 @@ public:
future<> check_access(query_processor& qp, const service::client_state& state) const override;
void validate(query_processor&, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -93,14 +93,14 @@ void create_keyspace_statement::validate(query_processor& qp, const service::cli
#endif
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> create_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> create_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
using namespace cql_transport;
const auto& tm = *qp.proxy().get_token_metadata_ptr();
::shared_ptr<event::schema_change> ret;
std::vector<mutation> m;
try {
m = mm.prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm), ts);
m = service::prepare_new_keyspace_announcement(qp.db().real_database(), _attrs->as_ks_metadata(_name, tm), ts);
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,

View File

@@ -64,7 +64,7 @@ public:
virtual void validate(query_processor&, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -71,12 +71,12 @@ std::vector<column_definition> create_table_statement::get_columns() const
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
create_table_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
create_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
try {
m = co_await mm.prepare_new_column_family_announcement(get_cf_meta_data(qp.db()), ts);
m = co_await service::prepare_new_column_family_announcement(qp.proxy(), get_cf_meta_data(qp.db()), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -70,7 +70,7 @@ public:
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -118,13 +118,13 @@ std::optional<user_type> create_type_statement::make_type(query_processor& qp) c
return type;
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> create_type_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> create_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
try {
auto t = make_type(qp);
if (t) {
m = co_await mm.prepare_new_type_announcement(*t, ts);
m = co_await service::prepare_new_type_announcement(qp.proxy(), *t, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -37,7 +37,7 @@ public:
virtual const sstring& keyspace() const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -362,12 +362,12 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
create_view_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
create_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
auto [definition, warnings] = prepare_view(qp.db());
try {
m = co_await mm.prepare_new_view_announcement(std::move(definition), ts);
m = co_await service::prepare_new_view_announcement(qp.proxy(), std::move(definition), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,

View File

@@ -57,7 +57,7 @@ public:
// Functions we need to override to subclass schema_altering_statement
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -24,7 +24,7 @@ std::unique_ptr<prepared_statement> drop_aggregate_statement::prepare(data_dicti
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_aggregate_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_aggregate_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
@@ -34,7 +34,7 @@ drop_aggregate_statement::prepare_schema_mutations(query_processor& qp, service:
if (!user_aggr) {
throw exceptions::invalid_request_exception(format("'{}' is not a user defined aggregate", func));
}
m = co_await mm.prepare_aggregate_drop_announcement(user_aggr, ts);
m = co_await service::prepare_aggregate_drop_announcement(qp.proxy(), user_aggr, ts);
ret = create_schema_change(*func, false);
}

View File

@@ -15,7 +15,7 @@ class query_processor;
namespace statements {
class drop_aggregate_statement final : public drop_function_statement_base {
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
public:
drop_aggregate_statement(functions::function_name name, std::vector<shared_ptr<cql3_type::raw>> arg_types,

View File

@@ -24,7 +24,7 @@ std::unique_ptr<prepared_statement> drop_function_statement::prepare(data_dictio
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_function_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_function_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
@@ -38,7 +38,7 @@ drop_function_statement::prepare_schema_mutations(query_processor& qp, service::
if (auto aggregate = functions::functions::used_by_user_aggregate(user_func)) {
throw exceptions::invalid_request_exception(format("Cannot delete function {}, as it is used by user-defined aggregate {}", func, *aggregate));
}
m = co_await mm.prepare_function_drop_announcement(user_func, ts);
m = co_await service::prepare_function_drop_announcement(qp.proxy(), user_func, ts);
ret = create_schema_change(*func, false);
}

View File

@@ -15,7 +15,7 @@ class query_processor;
namespace statements {
class drop_function_statement final : public drop_function_statement_base {
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
public:
drop_function_statement(functions::function_name name, std::vector<shared_ptr<cql3_type::raw>> arg_types,

View File

@@ -73,13 +73,13 @@ schema_ptr drop_index_statement::make_drop_idex_schema(query_processor& qp) cons
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_index_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_index_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
auto cfm = make_drop_idex_schema(qp);
if (cfm) {
m = co_await mm.prepare_column_family_update_announcement(cfm, false, {}, ts);
m = co_await service::prepare_column_family_update_announcement(qp.proxy(), cfm, false, {}, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(event::schema_change::change_type::UPDATED,

View File

@@ -44,7 +44,7 @@ public:
virtual void validate(query_processor&, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
private:

View File

@@ -47,12 +47,12 @@ const sstring& drop_keyspace_statement::keyspace() const
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_keyspace_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
std::vector<mutation> m;
::shared_ptr<cql_transport::event::schema_change> ret;
try {
m = co_await mm.prepare_keyspace_drop_announcement(_keyspace, ts);
m = co_await service::prepare_keyspace_drop_announcement(qp.db().real_database(), _keyspace, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -30,7 +30,7 @@ public:
virtual const sstring& keyspace() const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
};

View File

@@ -33,7 +33,7 @@ future<> drop_table_statement::check_access(query_processor& qp, const service::
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_table_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
@@ -45,7 +45,7 @@ drop_table_statement::prepare_schema_mutations(query_processor& qp, service::mig
}
try {
m = co_await mm.prepare_column_family_drop_announcement(keyspace(), column_family(), ts);
m = co_await service::prepare_column_family_drop_announcement(qp.proxy(), keyspace(), column_family(), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -26,7 +26,7 @@ public:
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
};

View File

@@ -121,7 +121,7 @@ const sstring& drop_type_statement::keyspace() const
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_type_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
validate_while_executing(qp);
data_dictionary::database db = qp.db();
@@ -137,7 +137,7 @@ drop_type_statement::prepare_schema_mutations(query_processor& qp, service::migr
// Can happen with if_exists
if (to_drop != all_types.end()) {
m = co_await mm.prepare_type_drop_announcement(to_drop->second, ts);
m = co_await service::prepare_type_drop_announcement(qp.proxy(), to_drop->second, ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -29,7 +29,7 @@ public:
virtual const sstring& keyspace() const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

View File

@@ -42,12 +42,12 @@ future<> drop_view_statement::check_access(query_processor& qp, const service::c
}
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
drop_view_statement::prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type ts) const {
drop_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;
try {
m = co_await mm.prepare_view_drop_announcement(keyspace(), column_family(), ts);
m = co_await service::prepare_view_drop_announcement(qp.proxy(), keyspace(), column_family(), ts);
using namespace cql_transport;
ret = ::make_shared<event::schema_change>(

View File

@@ -32,7 +32,7 @@ public:
virtual future<> check_access(query_processor& qp, const service::client_state& state) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const override;
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
};

View File

@@ -20,10 +20,6 @@
class mutation;
namespace service {
class migration_manager;
}
namespace cql3 {
class query_processor;
@@ -62,7 +58,7 @@ protected:
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
public:
virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const = 0;
virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const = 0;
};
}

View File

@@ -253,8 +253,8 @@ static future<> add_new_columns_if_missing(replica::database& db, ::service::mig
schema_ptr table = b.build();
try {
auto ts = group0_guard.write_timestamp();
co_return co_await mm.announce(co_await mm.prepare_column_family_update_announcement(table, false, std::vector<view_ptr>(), ts),
std::move(group0_guard), "Add new columns to system_distributed.service_levels");
co_return co_await mm.announce(co_await service::prepare_column_family_update_announcement(mm.get_storage_proxy(), table, false,
std::vector<view_ptr>(), ts), std::move(group0_guard), "Add new columns to system_distributed.service_levels");
} catch (...) {}
}
} catch (...) {
@@ -282,7 +282,7 @@ future<> system_distributed_keyspace::start() {
"org.apache.cassandra.locator.SimpleStrategy",
{{"replication_factor", "3"}},
true /* durable_writes */);
co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard),
co_await _mm.announce(service::prepare_new_keyspace_announcement(_sp.local_db(), ksm, ts), std::move(group0_guard),
"Create system_distributed keyspace");
} catch (exceptions::already_exists_exception&) {}
} else {
@@ -299,7 +299,7 @@ future<> system_distributed_keyspace::start() {
"org.apache.cassandra.locator.EverywhereStrategy",
{},
true /* durable_writes */);
co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard),
co_await _mm.announce(service::prepare_new_keyspace_announcement(_sp.local_db(), ksm, ts), std::move(group0_guard),
"Create system_distributed_everywhere keyspace");
} catch (exceptions::already_exists_exception&) {}
} else {
@@ -318,7 +318,7 @@ future<> system_distributed_keyspace::start() {
auto m = co_await map_reduce(tables,
/* Mapper */ [this, ts] (auto&& table) -> future<std::vector<mutation>> {
try {
co_return co_await _mm.prepare_new_column_family_announcement(std::move(table), ts);
co_return co_await service::prepare_new_column_family_announcement(_sp, std::move(table), ts);
} catch (exceptions::already_exists_exception&) {
co_return std::vector<mutation>();
}

View File

@@ -62,7 +62,8 @@ future<> modify_tags(service::migration_manager& mm, sstring ks, sstring cf,
schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags));
auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());
auto m = co_await service::prepare_column_family_update_announcement(mm.get_storage_proxy(),
builder.build(), false, std::vector<view_ptr>(), group0_guard.write_timestamp());
co_await mm.announce(std::move(m), std::move(group0_guard));
});

View File

@@ -198,7 +198,7 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
attrs.add_property(cql3::statements::ks_prop_defs::KW_REPLICATION, replication_properties);
attrs.validate();
auto muts = mml.prepare_new_keyspace_announcement(attrs.as_ks_metadata(ks_name, *tm), ts);
auto muts = service::prepare_new_keyspace_announcement(db.real_database(), attrs.as_ks_metadata(ks_name, *tm), ts);
std::move(muts.begin(), muts.end(), std::back_inserter(ks_mutations));
}
@@ -210,15 +210,15 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded<service::storage_pr
auto group0_guard = co_await mml.start_group0_operation();
std::vector<mutation> table_mutations;
auto table_gen = std::bind_front(
[] (data_dictionary::database db, service::migration_manager& mml, std::vector<mutation>& table_mutations,
[] (data_dictionary::database db, service::storage_proxy& sp, std::vector<mutation>& table_mutations,
api::timestamp_type ts, sstring ks_name, sstring cf_name, schema_ptr schema) -> future<> {
if (db.has_schema(ks_name, cf_name)) {
co_return;
}
logger.info("Create keyspace: {}, table: {} for redis.", ks_name, cf_name);
auto muts = co_await mml.prepare_new_column_family_announcement(schema, ts);
auto muts = co_await service::prepare_new_column_family_announcement(sp, schema, ts);
std::move(muts.begin(), muts.end(), std::back_inserter(table_mutations));
}, db, std::ref(mml), std::ref(table_mutations), group0_guard.write_timestamp());
}, db, std::ref(proxy.local()), std::ref(table_mutations), group0_guard.write_timestamp());
co_await coroutine::parallel_for_each(ks_names, [table_gen = std::move(table_gen)] (const sstring& ks_name) mutable {
return parallel_for_each(tables, [ks_name, table_gen = std::move(table_gen)] (table t) {

View File

@@ -658,38 +658,32 @@ public void notifyDropAggregate(UDAggregate udf)
}
#endif
std::vector<mutation> migration_manager::prepare_keyspace_update_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
std::vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts) {
db.validate_keyspace_update(*ksm);
mlogger.info("Update Keyspace: {}", ksm);
return db::schema_tables::make_create_keyspace_mutations(db.features().cluster_schema_features(), ksm, ts);
}
std::vector<mutation> migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
auto& proxy = _storage_proxy;
auto& db = proxy.get_db().local();
std::vector<mutation> prepare_new_keyspace_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp) {
db.validate_new_keyspace(*ksm);
mlogger.info("Create new Keyspace: {}", ksm);
return db::schema_tables::make_create_keyspace_mutations(db.features().cluster_schema_features(), ksm, timestamp);
}
future<std::vector<mutation>> migration_manager::include_keyspace(
const keyspace_metadata& keyspace, std::vector<mutation> mutations) {
static future<std::vector<mutation>> include_keyspace(
storage_proxy& sp, const keyspace_metadata& keyspace, std::vector<mutation> mutations) {
// Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
mutation m = co_await db::schema_tables::read_keyspace_mutation(_storage_proxy.container(), keyspace.name());
mutation m = co_await db::schema_tables::read_keyspace_mutation(sp.container(), keyspace.name());
mutations.push_back(std::move(m));
co_return std::move(mutations);
}
future<std::vector<mutation>> migration_manager::prepare_new_column_family_announcement(schema_ptr cfm, api::timestamp_type timestamp) {
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp) {
#if 0
cfm.validate();
#endif
try {
auto& db = _storage_proxy.get_db().local();
auto& db = sp.get_db().local();
auto&& keyspace = db.find_keyspace(cfm->ks_name());
if (db.has_schema(cfm->ks_name(), cfm->cf_name())) {
throw exceptions::already_exists_exception(cfm->ks_name(), cfm->cf_name());
@@ -701,25 +695,26 @@ future<std::vector<mutation>> migration_manager::prepare_new_column_family_annou
mlogger.info("Create new ColumnFamily: {}", cfm);
auto ksm = keyspace.metadata();
return seastar::async([this, cfm, timestamp, ksm] {
return seastar::async([&db, cfm, timestamp, ksm] {
auto mutations = db::schema_tables::make_create_table_mutations(cfm, timestamp);
get_notifier().before_create_column_family(*cfm, mutations, timestamp);
db.get_notifier().before_create_column_family(*cfm, mutations, timestamp);
return mutations;
}).then([this, ksm](std::vector<mutation> mutations) {
return include_keyspace(*ksm, std::move(mutations));
}).then([&sp, ksm](std::vector<mutation> mutations) {
return include_keyspace(sp, *ksm, std::move(mutations));
});
} catch (const replica::no_such_keyspace& e) {
throw exceptions::configuration_exception(format("Cannot add table '{}' to non existing keyspace '{}'.", cfm->cf_name(), cfm->ks_name()));
}
}
future<std::vector<mutation>> migration_manager::prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts) {
warn(unimplemented::cause::VALIDATION);
#if 0
cfm.validate();
#endif
try {
auto& db = _storage_proxy.get_db().local();
auto& db = sp.local_db();
auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id?
#if 0
oldCfm.validateCompatility(cfm);
@@ -736,9 +731,9 @@ future<std::vector<mutation>> migration_manager::prepare_column_family_update_an
co_await coroutine::maybe_yield();
}
co_await seastar::async([&] {
get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
db.get_notifier().before_update_column_family(*cfm, *old_schema, mutations, ts);
});
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing table '{}' in keyspace '{}'.",
cfm->cf_name(), cfm->ks_name())));
@@ -746,69 +741,68 @@ future<std::vector<mutation>> migration_manager::prepare_column_family_update_an
}
}
future<std::vector<mutation>> migration_manager::do_prepare_new_type_announcement(user_type new_type, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
static future<std::vector<mutation>> do_prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(new_type->_keyspace);
auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_new_type_announcement(user_type new_type, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts) {
mlogger.info("Prepare Create new User Type: {}", new_type->get_name_as_string());
return do_prepare_new_type_announcement(std::move(new_type), ts);
return do_prepare_new_type_announcement(sp, std::move(new_type), ts);
}
future<std::vector<mutation>> migration_manager::prepare_update_type_announcement(user_type updated_type, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_update_type_announcement(storage_proxy& sp, user_type updated_type, api::timestamp_type ts) {
mlogger.info("Prepare Update User Type: {}", updated_type->get_name_as_string());
return do_prepare_new_type_announcement(updated_type, ts);
return do_prepare_new_type_announcement(sp, updated_type, ts);
}
future<std::vector<mutation>> migration_manager::prepare_new_function_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_new_function_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(func->name().keyspace);
auto mutations = db::schema_tables::make_create_function_mutations(func, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_function_drop_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_function_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(func->name().keyspace);
auto mutations = db::schema_tables::make_drop_function_mutations(func, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_new_aggregate_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_new_aggregate_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(aggregate->name().keyspace);
auto mutations = db::schema_tables::make_create_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_aggregate_drop_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_aggregate_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(aggregate->name().keyspace);
auto mutations = db::schema_tables::make_drop_aggregate_mutations(db.features().cluster_schema_features(), aggregate, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_keyspace_drop_announcement(const sstring& ks_name, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_keyspace_drop_announcement(replica::database& db, const sstring& ks_name, api::timestamp_type ts) {
if (!db.has_keyspace(ks_name)) {
throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name));
}
auto& keyspace = db.find_keyspace(ks_name);
mlogger.info("Drop Keyspace '{}'", ks_name);
return seastar::async([this, &db, &keyspace, ts, ks_name] {
return seastar::async([&db, &keyspace, ts, ks_name] {
auto mutations = db::schema_tables::make_drop_keyspace_mutations(db.features().cluster_schema_features(), keyspace.metadata(), ts);
get_notifier().before_drop_keyspace(ks_name, mutations, ts);
db.get_notifier().before_drop_keyspace(ks_name, mutations, ts);
return mutations;
});
}
future<std::vector<mutation>> migration_manager::prepare_column_family_drop_announcement(const sstring& ks_name,
const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) {
future<std::vector<mutation>> prepare_column_family_drop_announcement(storage_proxy& sp,
const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) {
try {
auto& db = _storage_proxy.get_db().local();
auto& db = sp.local_db();
auto& old_cfm = db.find_column_family(ks_name, cf_name);
auto& schema = old_cfm.schema();
if (schema->is_view()) {
@@ -846,29 +840,29 @@ future<std::vector<mutation>> migration_manager::prepare_column_family_drop_anno
// notifiers must run in seastar thread
co_await seastar::async([&] {
get_notifier().before_drop_column_family(*schema, mutations, ts);
db.get_notifier().before_drop_column_family(*schema, mutations, ts);
});
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot drop non existing table '{}' in keyspace '{}'.", cf_name, ks_name)));
co_return coroutine::exception(std::move(ex));
}
}
future<std::vector<mutation>> migration_manager::prepare_type_drop_announcement(user_type dropped_type, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp, user_type dropped_type, api::timestamp_type ts) {
auto& db = sp.local_db();
auto&& keyspace = db.find_keyspace(dropped_type->_keyspace);
mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string());
auto mutations =
db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, ts);
return include_keyspace(*keyspace.metadata(), std::move(mutations));
return include_keyspace(sp, *keyspace.metadata(), std::move(mutations));
}
future<std::vector<mutation>> migration_manager::prepare_new_view_announcement(view_ptr view, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
#if 0
view.metadata.validate();
#endif
auto& db = _storage_proxy.get_db().local();
auto& db = sp.local_db();
try {
auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
if (keyspace->cf_meta_data().contains(view->cf_name())) {
@@ -876,18 +870,18 @@ future<std::vector<mutation>> migration_manager::prepare_new_view_announcement(v
}
mlogger.info("Create new view: {}", view);
auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), ts);
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_keyspace& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.", view->cf_name(), view->ks_name())));
co_return coroutine::exception(std::move(ex));
}
}
future<std::vector<mutation>> migration_manager::prepare_view_update_announcement(view_ptr view, api::timestamp_type ts) {
future<std::vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts) {
#if 0
view.metadata.validate();
#endif
auto db = _storage_proxy.data_dictionary();
auto db = sp.data_dictionary();
try {
auto&& keyspace = db.find_keyspace(view->ks_name()).metadata();
auto& old_view = keyspace->cf_meta_data().at(view->cf_name());
@@ -899,7 +893,7 @@ future<std::vector<mutation>> migration_manager::prepare_view_update_announcemen
#endif
mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view);
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, true);
co_return co_await include_keyspace(*keyspace, std::move(mutations));
co_return co_await include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const std::out_of_range& e) {
auto&& ex = std::make_exception_ptr(exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.",
view->cf_name(), view->ks_name())));
@@ -907,8 +901,8 @@ future<std::vector<mutation>> migration_manager::prepare_view_update_announcemen
}
}
future<std::vector<mutation>> migration_manager::prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
auto& db = _storage_proxy.get_db().local();
future<std::vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) {
auto& db = sp.local_db();
try {
auto& view = db.find_column_family(ks_name, cf_name).schema();
if (!view->is_view()) {
@@ -920,7 +914,7 @@ future<std::vector<mutation>> migration_manager::prepare_view_drop_announcement(
auto keyspace = db.find_keyspace(ks_name).metadata();
mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name());
auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), ts);
return include_keyspace(*keyspace, std::move(mutations));
return include_keyspace(sp, *keyspace, std::move(mutations));
} catch (const replica::no_such_column_family& e) {
throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.",
cf_name, ks_name));

View File

@@ -133,43 +133,6 @@ public:
bool should_pull_schema_from(const gms::inet_address& endpoint);
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);
std::vector<mutation> prepare_keyspace_update_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type);
std::vector<mutation> prepare_new_keyspace_announcement(lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type);
// The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas
// with identical timestamps, which can prevent an undeeded schema exchange
future<std::vector<mutation>> prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_column_family_announcement(schema_ptr cfm, api::timestamp_type timestamp);
future<std::vector<mutation>> prepare_new_type_announcement(user_type new_type, api::timestamp_type);
future<std::vector<mutation>> prepare_new_function_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type);
future<std::vector<mutation>> prepare_new_aggregate_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type);
future<std::vector<mutation>> prepare_function_drop_announcement(shared_ptr<cql3::functions::user_function> func, api::timestamp_type);
future<std::vector<mutation>> prepare_aggregate_drop_announcement(shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type);
future<std::vector<mutation>> prepare_update_type_announcement(user_type updated_type, api::timestamp_type);
future<std::vector<mutation>> prepare_keyspace_drop_announcement(const sstring& ks_name, api::timestamp_type);
class drop_views_tag;
using drop_views = bool_class<drop_views_tag>;
future<std::vector<mutation>> prepare_column_family_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type, drop_views drop_views = drop_views::no);
future<std::vector<mutation>> prepare_type_drop_announcement(user_type dropped_type, api::timestamp_type);
future<std::vector<mutation>> prepare_new_view_announcement(view_ptr view, api::timestamp_type);
future<std::vector<mutation>> prepare_view_update_announcement(view_ptr view, api::timestamp_type);
future<std::vector<mutation>> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type);
// The function needs to be called if the user wants to read most up-to-date group 0 state (including schema state)
// (the function ensures that all previously finished group0 operations are visible on this node) or to write it.
//
@@ -203,9 +166,6 @@ public:
private:
future<> uninit_messaging_service();
future<std::vector<mutation>> include_keyspace(const keyspace_metadata& keyspace, std::vector<mutation> mutations);
future<std::vector<mutation>> do_prepare_new_type_announcement(user_type new_type, api::timestamp_type);
future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector<mutation>& schema);
future<> passive_announce();
@@ -246,4 +206,42 @@ public:
future<column_mapping> get_column_mapping(db::system_keyspace& sys_ks, table_id, table_schema_version v);
std::vector<mutation> prepare_keyspace_update_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type ts);
std::vector<mutation> prepare_new_keyspace_announcement(replica::database& db, lw_shared_ptr<keyspace_metadata> ksm, api::timestamp_type timestamp);
// The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas
// with identical timestamps, which can prevent an undeeded schema exchange
future<std::vector<mutation>> prepare_column_family_update_announcement(storage_proxy& sp,
schema_ptr cfm, bool from_thrift, std::vector<view_ptr> view_updates, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_column_family_announcement(storage_proxy& sp, schema_ptr cfm, api::timestamp_type timestamp);
future<std::vector<mutation>> prepare_new_type_announcement(storage_proxy& sp, user_type new_type, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_function_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_aggregate_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts);
future<std::vector<mutation>> prepare_function_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_function> func, api::timestamp_type ts);
future<std::vector<mutation>> prepare_aggregate_drop_announcement(storage_proxy& sp, shared_ptr<cql3::functions::user_aggregate> aggregate, api::timestamp_type ts);
future<std::vector<mutation>> prepare_update_type_announcement(storage_proxy& sp, user_type updated_type, api::timestamp_type ts);
future<std::vector<mutation>> prepare_keyspace_drop_announcement(replica::database& db, const sstring& ks_name, api::timestamp_type ts);
class drop_views_tag;
using drop_views = bool_class<drop_views_tag>;
future<std::vector<mutation>> prepare_column_family_drop_announcement(storage_proxy& sp,
const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts, drop_views drop_views = drop_views::no);
future<std::vector<mutation>> prepare_type_drop_announcement(storage_proxy& sp, user_type dropped_type, api::timestamp_type ts);
future<std::vector<mutation>> prepare_new_view_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts);
future<std::vector<mutation>> prepare_view_update_announcement(storage_proxy& sp, view_ptr view, api::timestamp_type ts);
future<std::vector<mutation>> prepare_view_drop_announcement(storage_proxy& sp, const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts);
}

View File

@@ -52,7 +52,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp, service::migration
// The important thing is that it will converge eventually (some traces may
// be lost in a process but that's ok).
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(b.build(), ts), std::move(group0_guard));
co_return co_await mm.announce(co_await service::prepare_new_column_family_announcement(qp.proxy(), b.build(), ts), std::move(group0_guard));
} catch (...) {}
}
@@ -136,7 +136,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, service::migrat
std::map<sstring, sstring> opts;
opts["replication_factor"] = replication_factor;
auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true);
co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, ts), std::move(group0_guard));
co_await mm.announce(service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts), std::move(group0_guard));
}
}

View File

@@ -4046,7 +4046,7 @@ SEASTAR_TEST_CASE(test_view_with_two_regular_base_columns_in_key) {
auto& mm = e.migration_manager().local();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_view_announcement(view_ptr(view_schema), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_view_announcement(mm.get_storage_proxy(), view_ptr(view_schema), ts).get(), std::move(group0_guard)).get();
// Verify that deleting and restoring columns behaves as expected - i.e. the row is deleted and regenerated
cquery_nofail(e, "INSERT INTO t (p, c, v1, v2) VALUES (1, 2, 3, 4)");

View File

@@ -266,13 +266,13 @@ static void test_database(void (*run_tests)(populate_fn_ex, bool), unsigned cgs)
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
e.local_db().find_column_family(s->ks_name(), s->cf_name());
mm.announce(mm.prepare_column_family_drop_announcement(s->ks_name(), s->cf_name(), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), s->ks_name(), s->cf_name(), ts).get(), std::move(group0_guard)).get();
} catch (const replica::no_such_column_family&) {
// expected
}
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard)).get();
replica::column_family& cf = e.local_db().find_column_family(s);
auto uuid = cf.schema()->id();
for (auto&& m : partitions) {

View File

@@ -87,7 +87,7 @@ SEASTAR_TEST_CASE(test_group0_cmd_merge) {
};
std::vector<canonical_mutation> cms;
size_t size = 0;
auto muts = mm.prepare_keyspace_drop_announcement("ks", api::new_timestamp()).get0();
auto muts = service::prepare_keyspace_drop_announcement(env.local_db(), "ks", api::new_timestamp()).get0();
// Maximum mutation size is 1/3 of commitlog segment size wich we set
// to 1M. Make one command a little bit larger than third of the max size.
while (size < 150*1024) {

View File

@@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) {
.build();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard)).get();
replica::table& t = db.find_column_family(ks_name, table_name);
@@ -1066,7 +1066,7 @@ SEASTAR_TEST_CASE(failed_flush_prevents_writes) {
schema_ptr s = ss.schema();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts).get(), std::move(group0_guard)).get();
replica::table& t = db.find_column_family("ks", "cf");
auto memtables = t.active_memtables();
@@ -1134,7 +1134,7 @@ SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
return env.migration_manager().invoke_on(0, [s = global_schema_ptr(std::move(s))] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await mm.prepare_new_column_family_announcement(s, ts);
auto announcement = co_await service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard));
});
}
@@ -1143,7 +1143,7 @@ SEASTAR_TEST_CASE(flushing_rate_is_reduced_if_compaction_doesnt_keep_up) {
return env.migration_manager().invoke_on(0, [shard = this_shard_id()] (service::migration_manager& mm) -> future<> {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
auto announcement = co_await mm.prepare_column_family_drop_announcement(ks_name(), cf_name(shard), ts);
auto announcement = co_await service::prepare_column_family_drop_announcement(mm.get_storage_proxy(), ks_name(), cf_name(shard), ts);
co_await mm.announce(std::move(announcement), std::move(group0_guard));
});
}

View File

@@ -4168,7 +4168,7 @@ SEASTAR_TEST_CASE(row_cache_is_populated_using_compacting_sstable_reader) {
.with_column(to_bytes("id"), int32_type)
.build();
mm.announce(
mm.prepare_new_column_family_announcement(s, api::new_timestamp()).get(),
service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s, api::new_timestamp()).get(),
mm.start_group0_operation().get()
).get();

View File

@@ -47,7 +47,7 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(old_schema, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), old_schema, ts).get(), std::move(group0_guard)).get();
}
auto old_table_version = e.db().local().find_schema(old_schema->id())->version();
@@ -58,7 +58,8 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) {
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_column_family_update_announcement(mm.get_storage_proxy(),
new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
BOOST_REQUIRE_NE(e.db().local().find_schema(old_schema->id())->version(), old_table_version);
BOOST_REQUIRE_NE(e.db().local().get_version(), old_node_version);
@@ -81,7 +82,7 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(old_schema, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), old_schema, ts).get(), std::move(group0_guard)).get();
}
auto s = e.local_db().find_schema(old_schema->id());
@@ -94,7 +95,8 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) {
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_column_family_update_announcement(mm.get_storage_proxy(),
new_schema, false, std::vector<view_ptr>(), ts).get(), std::move(group0_guard)).get();
s = e.local_db().find_schema(old_schema->id());
BOOST_REQUIRE_NE(*old_schema, *s);
@@ -118,7 +120,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_ignored_in_version_calculation) {
auto& mm = e.migration_manager().local();
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(table_schema, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), table_schema, ts).get(), std::move(group0_guard)).get();
auto old_table_version = e.db().local().find_schema(table_schema->id())->version();
auto old_node_version = e.db().local().get_version();
@@ -169,7 +171,7 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s1, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s1, ts).get(), std::move(group0_guard)).get();
}
auto old_version = e.db().local().find_schema(s1->id())->version();
@@ -320,7 +322,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) {
{
auto group0_guard = mm.start_group0_operation().get();
auto ts = group0_guard.write_timestamp();
mm.announce(mm.prepare_new_column_family_announcement(s1, ts).get(), std::move(group0_guard)).get();
mm.announce(service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s1, ts).get(), std::move(group0_guard)).get();
}
auto&& keyspace = e.db().local().find_keyspace(s1->ks_name()).metadata();
@@ -386,8 +388,8 @@ SEASTAR_TEST_CASE(test_concurrent_table_creation_with_different_schema) {
.with_column("v1", bytes_type)
.build();
auto ann1 = mm.prepare_new_column_family_announcement(s1, api::new_timestamp()).get();
auto ann2 = mm.prepare_new_column_family_announcement(s2, api::new_timestamp()).get();
auto ann1 = service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s1, api::new_timestamp()).get();
auto ann2 = service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s2, api::new_timestamp()).get();
{
auto group0_guard = mm.start_group0_operation().get();
@@ -477,7 +479,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
{
auto group0_guard = mm.start_group0_operation().get();
const auto ts = group0_guard.write_timestamp();
auto muts = e.migration_manager().local().prepare_keyspace_drop_announcement("ks", ts).get0();
auto muts = service::prepare_keyspace_drop_announcement(e.local_db(), "ks", ts).get0();
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(muts, std::move(group0_guard)).get();
}
@@ -487,7 +489,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
const auto ts = group0_guard.write_timestamp();
// all_muts contains keyspace drop.
auto muts = e.migration_manager().local().prepare_new_keyspace_announcement(keyspace, ts);
auto muts = service::prepare_new_keyspace_announcement(e.db().local(), keyspace, ts);
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(muts, std::move(group0_guard)).get();
}
@@ -496,7 +498,7 @@ SEASTAR_TEST_CASE(test_merging_creates_a_table_even_if_keyspace_was_recreated) {
auto group0_guard = mm.start_group0_operation().get();
const auto ts = group0_guard.write_timestamp();
auto muts = e.migration_manager().local().prepare_new_column_family_announcement(s0, ts).get0();
auto muts = service::prepare_new_column_family_announcement(mm.get_storage_proxy(), s0, ts).get0();
boost::copy(muts, std::back_inserter(all_muts));
mm.announce(all_muts, std::move(group0_guard)).get();

View File

@@ -310,7 +310,7 @@ public:
auto s = builder.build(schema_builder::compact_storage::no);
auto group0_guard = co_await _mm.local().start_group0_operation();
auto ts = group0_guard.write_timestamp();
co_return co_await _mm.local().announce(co_await _mm.local().prepare_new_column_family_announcement(s, ts), std::move(group0_guard));
co_return co_await _mm.local().announce(co_await service::prepare_new_column_family_announcement(_proxy.local(), s, ts), std::move(group0_guard));
}
virtual future<> require_keyspace_exists(const sstring& ks_name) override {

View File

@@ -871,7 +871,7 @@ public:
});
}
future<std::string> execute_schema_command(std::function<future<std::vector<mutation>>(service::migration_manager&, data_dictionary::database, api::timestamp_type)> ddl) {
future<std::string> execute_schema_command(std::function<future<std::vector<mutation>>(data_dictionary::database, api::timestamp_type)> ddl) {
return _query_processor.invoke_on(0, [ddl = std::move(ddl)] (cql3::query_processor& qp) mutable {
return qp.execute_thrift_schema_command(std::move(ddl));
});
@@ -885,7 +885,7 @@ public:
co_await t._query_state.get_client_state().has_keyspace_access(cf_def.keyspace, auth::permission::CREATE);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&p = t._proxy.local(), &cf_def] (data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
if (!db.has_keyspace(cf_def.keyspace)) {
throw NotFoundException();
}
@@ -894,7 +894,7 @@ public:
}
auto s = schema_from_thrift(cf_def, cf_def.keyspace);
co_return co_await mm.prepare_new_column_family_announcement(std::move(s), ts);
co_return co_await service::prepare_new_column_family_announcement(p, std::move(s), ts);
});
});
}
@@ -906,7 +906,7 @@ public:
co_await t._query_state.get_client_state().has_column_family_access(t.current_keyspace(), column_family, auth::permission::DROP);
co_return co_await t.execute_schema_command(
[&column_family, &current_keyspace = t.current_keyspace()] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
[&p = t._proxy.local(), &column_family, &current_keyspace = t.current_keyspace()] (data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
auto cf = db.find_table(current_keyspace, column_family);
if (cf.schema()->is_view()) {
throw make_exception<InvalidRequestException>("Cannot drop Materialized Views from Thrift");
@@ -915,7 +915,7 @@ public:
throw make_exception<InvalidRequestException>("Cannot drop table with Materialized Views {}", column_family);
}
co_return co_await mm.prepare_column_family_drop_announcement(current_keyspace, column_family, ts);
co_return co_await service::prepare_column_family_drop_announcement(p, current_keyspace, column_family, ts);
});
});
}
@@ -928,8 +928,8 @@ public:
co_await t._query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE);
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def), ts);
co_return co_await t.execute_schema_command([&ks_def] (data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return service::prepare_new_keyspace_announcement(db.real_database(), keyspace_from_thrift(ks_def), ts);
});
});
}
@@ -942,13 +942,13 @@ public:
co_await t._query_state.get_client_state().has_keyspace_access(keyspace, auth::permission::DROP);
co_return co_await t.execute_schema_command([&keyspace] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&keyspace] (data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
thrift_validation::validate_keyspace_not_system(keyspace);
if (!db.has_keyspace(keyspace)) {
throw NotFoundException();
}
co_return co_await mm.prepare_keyspace_drop_announcement(keyspace, ts);
co_return co_await service::prepare_keyspace_drop_announcement(db.real_database(), keyspace, ts);
});
});
}
@@ -962,7 +962,7 @@ public:
co_await t._query_state.get_client_state().has_keyspace_access(ks_def.name, auth::permission::ALTER);
co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&ks_def] (data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
if (!db.has_keyspace(ks_def.name)) {
throw NotFoundException();
}
@@ -971,7 +971,7 @@ public:
}
auto ksm = keyspace_from_thrift(ks_def);
co_return mm.prepare_keyspace_update_announcement(std::move(ksm), ts);
co_return service::prepare_keyspace_update_announcement(db.real_database(), std::move(ksm), ts);
});
});
}
@@ -984,7 +984,7 @@ public:
co_await t._query_state.get_client_state().has_schema_access(cf_def.keyspace, cf_def.name, auth::permission::ALTER);
co_return co_await t.execute_schema_command([&cf_def] (service::migration_manager& mm, data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
co_return co_await t.execute_schema_command([&p = t._proxy.local(), &cf_def] (data_dictionary::database db, api::timestamp_type ts) -> future<std::vector<mutation>> {
auto cf = db.find_table(cf_def.keyspace, cf_def.name);
auto schema = cf.schema();
@@ -1006,7 +1006,7 @@ public:
if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) {
fail(unimplemented::cause::MIXED_CF);
}
co_return co_await mm.prepare_column_family_update_announcement(std::move(s), true, std::vector<view_ptr>(), ts);
co_return co_await service::prepare_column_family_update_announcement(p, std::move(s), true, std::vector<view_ptr>(), ts);
});
});
}