code: Pass sharded<db::system_keyspace>& to database::truncate()
The arguments goes via the db::(drop|truncate)_table_on_all_shards() pair of calls that start from - storage_proxy::remote: has its sys.ks reference already - schema_tables::merge_schema: has sys.ks argument already - legacy_schema_migrator: the reference was added by previous patch - tests: run in cql_test_env with sys.ks on board Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
@@ -51,7 +51,6 @@ public:
|
||||
|
||||
migrator(sharded<service::storage_proxy>& sp, sharded<replica::database>& db, sharded<db::system_keyspace>& sys_ks, cql3::query_processor& qp)
|
||||
: _sp(sp), _db(db), _sys_ks(sys_ks), _qp(qp) {
|
||||
(void)_sys_ks;
|
||||
}
|
||||
migrator(migrator&&) = default;
|
||||
|
||||
@@ -536,7 +535,7 @@ public:
|
||||
mlogger.info("Dropping legacy schema tables");
|
||||
auto with_snapshot = !_keyspaces.empty();
|
||||
return parallel_for_each(legacy_schema_tables, [this, with_snapshot](const sstring& cfname) {
|
||||
return replica::database::drop_table_on_all_shards(_db, db::system_keyspace::NAME, cfname, with_snapshot);
|
||||
return replica::database::drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -148,6 +148,7 @@ struct qualified_name {
|
||||
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s);
|
||||
|
||||
static future<> merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
std::map<table_id, schema_mutations>&& tables_before,
|
||||
std::map<table_id, schema_mutations>&& tables_after,
|
||||
std::map<table_id, schema_mutations>&& views_before,
|
||||
@@ -164,7 +165,7 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
|
||||
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
|
||||
static future<> merge_aggregates(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after, schema_result scylla_before, schema_result scylla_after);
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>&, std::vector<mutation>, bool do_flush);
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>&, sharded<db::system_keyspace>& sys_ks, std::vector<mutation>, bool do_flush);
|
||||
|
||||
using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
|
||||
static computed_columns_map get_computed_columns(const schema_mutations& sm);
|
||||
@@ -980,7 +981,7 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
|
||||
}
|
||||
co_await with_merge_lock([&] () mutable -> future<> {
|
||||
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
|
||||
co_await do_merge_schema(proxy, std::move(mutations), flush_schema);
|
||||
co_await do_merge_schema(proxy, sys_ks, std::move(mutations), flush_schema);
|
||||
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
|
||||
});
|
||||
}
|
||||
@@ -1219,7 +1220,7 @@ table_selector get_affected_tables(const sstring& keyspace_name, const mutation&
|
||||
return result;
|
||||
}
|
||||
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector<mutation> mutations, bool do_flush)
|
||||
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks, std::vector<mutation> mutations, bool do_flush)
|
||||
{
|
||||
slogger.trace("do_merge_schema: {}", mutations);
|
||||
schema_ptr s = keyspaces();
|
||||
@@ -1300,7 +1301,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
|
||||
|
||||
std::set<sstring> keyspaces_to_drop = co_await merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces));
|
||||
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
|
||||
co_await merge_tables_and_views(proxy,
|
||||
co_await merge_tables_and_views(proxy, sys_ks,
|
||||
std::move(old_column_families), std::move(new_column_families),
|
||||
std::move(old_views), std::move(new_views));
|
||||
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
|
||||
@@ -1435,6 +1436,7 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
|
||||
// upon an alter table or alter type statement), then they are published together
|
||||
// as well, without any deferring in-between.
|
||||
static future<> merge_tables_and_views(distributed<service::storage_proxy>& proxy,
|
||||
sharded<db::system_keyspace>& sys_ks,
|
||||
std::map<table_id, schema_mutations>&& tables_before,
|
||||
std::map<table_id, schema_mutations>&& tables_after,
|
||||
std::map<table_id, schema_mutations>&& views_before,
|
||||
@@ -1488,13 +1490,13 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
// to a mv not finding its schema when snapshoting since the main table
|
||||
// was already dropped (see https://github.com/scylladb/scylla/issues/5614)
|
||||
auto& db = proxy.local().get_db();
|
||||
co_await max_concurrent_for_each(views_diff.dropped, max_concurrent, [&db] (schema_diff::dropped_schema& dt) {
|
||||
co_await max_concurrent_for_each(views_diff.dropped, max_concurrent, [&db, &sys_ks] (schema_diff::dropped_schema& dt) {
|
||||
auto& s = *dt.schema.get();
|
||||
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
|
||||
return replica::database::drop_table_on_all_shards(db, sys_ks, s.ks_name(), s.cf_name());
|
||||
});
|
||||
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&db] (schema_diff::dropped_schema& dt) -> future<> {
|
||||
co_await max_concurrent_for_each(tables_diff.dropped, max_concurrent, [&db, &sys_ks] (schema_diff::dropped_schema& dt) -> future<> {
|
||||
auto& s = *dt.schema.get();
|
||||
return replica::database::drop_table_on_all_shards(db, s.ks_name(), s.cf_name());
|
||||
return replica::database::drop_table_on_all_shards(db, sys_ks, s.ks_name(), s.cf_name());
|
||||
});
|
||||
|
||||
co_await db.invoke_on_all([&] (replica::database& db) -> future<> {
|
||||
|
||||
@@ -1121,7 +1121,8 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,
|
||||
co_return table_shards;
|
||||
}
|
||||
|
||||
future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name, bool with_snapshot) {
|
||||
future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
|
||||
sstring ks_name, sstring cf_name, bool with_snapshot) {
|
||||
auto auto_snapshot = sharded_db.local().get_config().auto_snapshot();
|
||||
dblog.info("Dropping {}.{} {}snapshot", ks_name, cf_name, with_snapshot && auto_snapshot ? "with auto-" : "without ");
|
||||
|
||||
@@ -1138,7 +1139,7 @@ future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sstri
|
||||
// to ensure all sstables are truncated,
|
||||
// but be careful to stays within the client's datetime limits.
|
||||
constexpr db_clock::time_point truncated_at(std::chrono::seconds(253402214400));
|
||||
auto f = co_await coroutine::as_future(truncate_table_on_all_shards(sharded_db, table_shards, truncated_at, with_snapshot, std::move(snapshot_name_opt)));
|
||||
auto f = co_await coroutine::as_future(truncate_table_on_all_shards(sharded_db, sys_ks, table_shards, truncated_at, with_snapshot, std::move(snapshot_name_opt)));
|
||||
co_await smp::invoke_on_all([&] {
|
||||
return table_shards->stop();
|
||||
});
|
||||
@@ -2467,10 +2468,11 @@ future<> database::snapshot_keyspace_on_all_shards(sharded<database>& sharded_db
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
|
||||
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
|
||||
sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
|
||||
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
|
||||
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
|
||||
co_return co_await truncate_table_on_all_shards(sharded_db, table_shards, truncated_at_opt, with_snapshot, std::move(snapshot_name_opt));
|
||||
co_return co_await truncate_table_on_all_shards(sharded_db, sys_ks, table_shards, truncated_at_opt, with_snapshot, std::move(snapshot_name_opt));
|
||||
}
|
||||
|
||||
struct database::table_truncate_state {
|
||||
@@ -2481,7 +2483,8 @@ struct database::table_truncate_state {
|
||||
bool did_flush;
|
||||
};
|
||||
|
||||
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
|
||||
future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
|
||||
const global_table_ptr& table_shards, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt) {
|
||||
auto& cf = *table_shards;
|
||||
auto s = cf.schema();
|
||||
|
||||
@@ -2569,11 +2572,11 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, c
|
||||
auto& cf = *table_shards;
|
||||
auto& st = *table_states[shard];
|
||||
|
||||
return db.truncate(cf, st, truncated_at);
|
||||
return db.truncate(sys_ks.local(), cf, st, truncated_at);
|
||||
});
|
||||
}
|
||||
|
||||
future<> database::truncate(column_family& cf, const table_truncate_state& st, db_clock::time_point truncated_at) {
|
||||
future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state& st, db_clock::time_point truncated_at) {
|
||||
dblog.trace("Truncating {}.{} on shard", cf.schema()->ks_name(), cf.schema()->cf_name());
|
||||
|
||||
const auto uuid = cf.schema()->id();
|
||||
|
||||
@@ -1708,15 +1708,15 @@ private:
|
||||
|
||||
struct table_truncate_state;
|
||||
|
||||
static future<> truncate_table_on_all_shards(sharded<database>& db, const global_table_ptr&, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt);
|
||||
future<> truncate(column_family& cf, const table_truncate_state&, db_clock::time_point truncated_at);
|
||||
static future<> truncate_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, const global_table_ptr&, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt);
|
||||
future<> truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state&, db_clock::time_point truncated_at);
|
||||
public:
|
||||
/** Truncates the given column family */
|
||||
// If truncated_at_opt is not given, it is set to db_clock::now right after flush/clear.
|
||||
static future<> truncate_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt = {}, bool with_snapshot = true, std::optional<sstring> snapshot_name_opt = {});
|
||||
static future<> truncate_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt = {}, bool with_snapshot = true, std::optional<sstring> snapshot_name_opt = {});
|
||||
|
||||
// drops the table on all shards and removes the table directory if there are no snapshots
|
||||
static future<> drop_table_on_all_shards(sharded<database>& db, sstring ks_name, sstring cf_name, bool with_snapshot = true);
|
||||
static future<> drop_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, sstring ks_name, sstring cf_name, bool with_snapshot = true);
|
||||
|
||||
const dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
|
||||
return _dirty_memory_manager.region_group();
|
||||
|
||||
@@ -750,7 +750,7 @@ private:
|
||||
}
|
||||
|
||||
future<> handle_truncate(rpc::opt_time_point timeout, sstring ksname, sstring cfname) {
|
||||
return replica::database::truncate_table_on_all_shards(_sp._db, ksname, cfname);
|
||||
return replica::database::truncate_table_on_all_shards(_sp._db, _sys_ks, ksname, cfname);
|
||||
}
|
||||
|
||||
future<foreign_ptr<std::unique_ptr<service::paxos::prepare_response>>>
|
||||
|
||||
@@ -131,7 +131,7 @@ SEASTAR_TEST_CASE(test_safety_after_truncate) {
|
||||
};
|
||||
assert_query_result(keys_per_shard);
|
||||
|
||||
replica::database::truncate_table_on_all_shards(e.db(), "ks", "cf").get();
|
||||
replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf").get();
|
||||
|
||||
for (auto it = keys_per_shard.begin(); it < keys_per_shard.end(); ++it) {
|
||||
*it = 0;
|
||||
@@ -180,7 +180,7 @@ SEASTAR_TEST_CASE(test_truncate_without_snapshot_during_writes) {
|
||||
|
||||
auto f0 = insert_data(0, num_keys);
|
||||
auto f1 = do_until([&] { return std::cmp_greater_equal(count, num_keys); }, [&, ts = db_clock::now()] {
|
||||
return replica::database::truncate_table_on_all_shards(e.db(), "ks", "cf", ts, false /* with_snapshot */).then([] {
|
||||
return replica::database::truncate_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf", ts, false /* with_snapshot */).then([] {
|
||||
return yield();
|
||||
});
|
||||
});
|
||||
@@ -777,7 +777,7 @@ SEASTAR_TEST_CASE(clear_multiple_snapshots) {
|
||||
|
||||
// existing snapshots expected to remain after dropping the table
|
||||
testlog.debug("Dropping table {}.{}", ks_name, table_name);
|
||||
replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name).get();
|
||||
replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name).get();
|
||||
BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(num_snapshots)), true);
|
||||
|
||||
// clear all tags
|
||||
@@ -1350,7 +1350,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
|
||||
s->full_slice(),
|
||||
nullptr);
|
||||
|
||||
auto f = replica::database::drop_table_on_all_shards(e.db(), "ks", "cf");
|
||||
auto f = replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf");
|
||||
|
||||
// we add a querier to the querier cache while the drop is ongoing
|
||||
auto& qc = db.get_querier_cache();
|
||||
@@ -1379,7 +1379,7 @@ static future<> test_drop_table_with_auto_snapshot(bool auto_snapshot) {
|
||||
// Pass `with_snapshot=true` to drop_table_on_all
|
||||
// to allow auto_snapshot (based on the configuration above).
|
||||
// The table directory should therefore exist after the table is dropped if auto_snapshot is disabled in the configuration.
|
||||
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, true);
|
||||
co_await replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, true);
|
||||
auto cf_dir_exists = co_await file_exists(cf_dir);
|
||||
BOOST_REQUIRE_EQUAL(cf_dir_exists, auto_snapshot);
|
||||
co_return;
|
||||
@@ -1404,7 +1404,7 @@ SEASTAR_TEST_CASE(drop_table_with_no_snapshot) {
|
||||
// Pass `with_snapshot=false` to drop_table_on_all
|
||||
// to disallow auto_snapshot.
|
||||
// The table directory should therefore not exist after the table is dropped.
|
||||
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, false);
|
||||
co_await replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
|
||||
auto cf_dir_exists = co_await file_exists(cf_dir);
|
||||
BOOST_REQUIRE_EQUAL(cf_dir_exists, false);
|
||||
co_return;
|
||||
@@ -1423,7 +1423,7 @@ SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
|
||||
// With explicit snapshot and with_snapshot=false
|
||||
// dir should still be kept, regardless of the
|
||||
// with_snapshot parameter and auto_snapshot config.
|
||||
co_await replica::database::drop_table_on_all_shards(e.db(), ks_name, table_name, false);
|
||||
co_await replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
|
||||
auto cf_dir_exists = co_await file_exists(cf_dir);
|
||||
BOOST_REQUIRE_EQUAL(cf_dir_exists, true);
|
||||
co_return;
|
||||
|
||||
Reference in New Issue
Block a user