Revert "Merge 'Atomic in-memory schema changes application' from Marcin Maliszkiewicz"

This reverts commit 0b516da95b, reversing
changes made to 30199552ac. It breaks
cluster.random_failures.test_random_failures.test_random_failures
in debug mode (at least).

Fixes #24513
This commit is contained in:
Avi Kivity
2025-06-16 22:38:12 +03:00
parent 5c2e5890a6
commit cd79a8fc25
42 changed files with 636 additions and 1343 deletions

View File

@@ -83,6 +83,7 @@ private:
void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) override {}
void on_drop_keyspace(const sstring& ks_name) override {
if (!legacy_mode(_qp)) {

View File

@@ -768,7 +768,6 @@ scylla_raft_core = [
scylla_core = (['message/messaging_service.cc',
'replica/database.cc',
'replica/schema_describe_helper.cc',
'replica/table.cc',
'replica/tablets.cc',
'replica/distributed_loader.cc',

View File

@@ -16,7 +16,6 @@
#include "cql3/functions/function_name.hh"
#include "schema/schema.hh"
#include <unordered_map>
#include "data_dictionary/user_types_metadata.hh"
namespace cql3 {
@@ -103,13 +102,6 @@ const functions& instance();
class change_batch : public functions {
public:
struct func_name_and_args {
function_name name;
std::vector<data_type> arg_types;
bool aggregate;
};
std::vector<func_name_and_args> removed_functions;
// Skip init as we copy data from static instance.
change_batch() : functions(skip_init{}) {
_declared = instance()._declared;
@@ -120,15 +112,6 @@ public:
// Used only by unittest.
void clear_functions() noexcept;
void remove_function(function_name& name, std::vector<data_type>& arg_types, bool aggregate = false) {
removed_functions.emplace_back(name, arg_types, aggregate);
functions::remove_function(name, arg_types);
};
void remove_aggregate(function_name& name, std::vector<data_type>& arg_types) {
remove_function(name, arg_types, true);
}
};
}

View File

@@ -1135,6 +1135,9 @@ void query_processor::migration_subscriber::on_update_view(
on_update_column_family(ks_name, view_name, columns_changed);
}
void query_processor::migration_subscriber::on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) {
}
void query_processor::migration_subscriber::on_drop_keyspace(const sstring& ks_name) {
remove_invalid_prepared_statements(ks_name, std::nullopt);
}

View File

@@ -590,6 +590,7 @@ public:
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override;
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
virtual void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) override;
virtual void on_drop_keyspace(const sstring& ks_name) override;
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override;

View File

@@ -210,7 +210,8 @@ description view(const data_dictionary::database& db, const sstring& ks, const s
throw exceptions::invalid_request_exception(format("Materialized view '{}' not found in keyspace '{}'", name, ks));
}
return view->schema()->describe(replica::make_schema_describe_helper(view->schema(), db), with_internals ? describe_option::STMTS_AND_INTERNALS : describe_option::STMTS);
replica::schema_describe_helper describe_helper{db};
return view->schema()->describe(describe_helper, with_internals ? describe_option::STMTS_AND_INTERNALS : describe_option::STMTS);
}
description index(const data_dictionary::database& db, const sstring& ks, const sstring& name, bool with_internals) {
@@ -228,7 +229,8 @@ description index(const data_dictionary::database& db, const sstring& ks, const
}
}
return (**idx).describe(replica::make_schema_describe_helper(*idx, db), with_internals ? describe_option::STMTS_AND_INTERNALS : describe_option::STMTS);
replica::schema_describe_helper describe_helper{db};
return (**idx).describe(describe_helper, with_internals ? describe_option::STMTS_AND_INTERNALS : describe_option::STMTS);
}
// `base_name` should be a table with enabled cdc
@@ -241,7 +243,7 @@ std::optional<description> describe_cdc_log_table(const data_dictionary::databas
std::ostringstream os;
auto schema = table->schema();
auto describe_helper = replica::make_schema_describe_helper(schema, db);
replica::schema_describe_helper describe_helper{db};
schema->describe_alter_with_properties(describe_helper, os);
auto schema_desc = schema->describe(describe_helper, describe_option::NO_STMTS);
@@ -266,7 +268,8 @@ future<std::vector<description>> table(const data_dictionary::database& db, cons
std::vector<description> result;
// table
auto table_desc = schema->describe(replica::make_schema_describe_helper(schema, db), with_internals ? describe_option::STMTS_AND_INTERNALS : describe_option::STMTS);
replica::schema_describe_helper describe_helper{db};
auto table_desc = schema->describe(describe_helper, with_internals ? describe_option::STMTS_AND_INTERNALS : describe_option::STMTS);
if (cdc::is_log_for_some_table(db.real_database(), ks, name)) {
// If the table the user wants to describe is a CDC log table, we want to print it as a CQL comment.
// This way, the user learns about the internals of the table, but they're also told not to execute it.
@@ -325,8 +328,9 @@ future<std::vector<description>> tables(const data_dictionary::database& db, con
co_return result;
}
co_return tables | std::views::transform([&db] (auto&& t) {
return t->describe(replica::make_schema_describe_helper(t, db), describe_option::NO_STMTS);
replica::schema_describe_helper describe_helper{db};
co_return tables | std::views::transform([&describe_helper] (auto&& t) {
return t->describe(describe_helper, describe_option::NO_STMTS);
}) | std::ranges::to<std::vector>();
}

View File

@@ -527,9 +527,9 @@ public:
future<> drop_legacy_tables() {
mlogger.info("Dropping legacy schema tables");
auto with_snapshot = !_keyspaces.empty();
for (const sstring& cfname : legacy_schema_tables) {
co_await replica::database::legacy_drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
}
return parallel_for_each(legacy_schema_tables, [this, with_snapshot](const sstring& cfname) {
return replica::database::drop_table_on_all_shards(_db, _sys_ks, db::system_keyspace::NAME, cfname, with_snapshot);
});
}
future<> store_keyspaces_in_new_schema_tables() {

File diff suppressed because it is too large Load Diff

View File

@@ -9,212 +9,18 @@
#pragma once
#include "frozen_schema.hh"
#include "mutation/mutation.hh"
#include <seastar/core/future.hh>
#include "service/storage_proxy.hh"
#include "query-result-set.hh"
#include "db/schema_tables.hh"
#include "data_dictionary/user_types_metadata.hh"
#include "schema/schema_registry.hh"
#include "service/storage_service.hh"
#include "replica/database.hh"
#include "replica/global_table_ptr.hh"
#include "replica/tables_metadata_lock.hh"
#include <seastar/core/distributed.hh>
#include <unordered_map>
namespace db {
namespace schema_tables {
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, distributed<service::storage_service>& ss, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);
enum class table_kind { table, view };
struct table_selector {
bool all_in_keyspace = false; // If true, selects all existing tables in a keyspace plus what's in "tables";
std::unordered_map<table_kind, std::unordered_set<sstring>> tables;
table_selector& operator+=(table_selector&& o);
void add(table_kind t, sstring name);
void add(sstring name);
};
struct schema_persisted_state {
schema_tables::schema_result keyspaces;
schema_tables::schema_result scylla_keyspaces;
std::map<table_id, schema_mutations> tables;
schema_tables::schema_result types;
std::map<table_id, schema_mutations> views;
schema_tables::schema_result functions;
schema_tables::schema_result aggregates;
schema_tables::schema_result scylla_aggregates;
};
struct affected_keyspaces_names {
std::set<sstring> created;
std::set<sstring> altered;
std::set<sstring> dropped;
};
// groups keyspaces based on what is happening to them during schema change
struct affected_keyspaces {
std::vector<replica::database::created_keyspace_per_shard> created;
std::vector<replica::database::keyspace_change_per_shard> altered;
// names need to be copied here as they are used multiple times and
// keyspace struct from which we obtain the name is moved when
// we commit it
affected_keyspaces_names names;
};
struct affected_user_types_per_shard {
std::vector<user_type> created;
std::vector<user_type> altered;
std::vector<user_type> dropped;
};
// groups UDTs based on what is happening to them during schema change
using affected_user_types = sharded<affected_user_types_per_shard>;
// In_progress_types_storage_per_shard contains current
// types with in-progress modifications applied.
// Important note: this storage can't be used directly in all cases,
// e.g. it's legal to drop type together with dropping other entity
// in such case we use existing storage instead so that whatever
// is being dropped can reference this type (we remove it from in_progress storage)
// in such cases get proper storage via committed_storage().
class in_progress_types_storage_per_shard : public data_dictionary::user_types_storage {
std::shared_ptr<data_dictionary::user_types_storage> _stored_user_types;
std::map<sstring, data_dictionary::user_types_metadata> _in_progress_types;
public:
in_progress_types_storage_per_shard(replica::database& db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
virtual const data_dictionary::user_types_metadata& get(const sstring& ks) const override;
std::shared_ptr<data_dictionary::user_types_storage> committed_storage();
};
class in_progress_types_storage {
// wrapped in foreign_ptr so they can be destroyed on the right shard
std::vector<foreign_ptr<shared_ptr<in_progress_types_storage_per_shard>>> shards;
public:
in_progress_types_storage() : shards(smp::count) {}
future<> init(distributed<replica::database>& sharded_db, const affected_keyspaces& affected_keyspaces, const affected_user_types& affected_types);
in_progress_types_storage_per_shard& local();
};
struct frozen_schema_diff {
struct altered_schema {
frozen_schema_with_base_info old_schema;
frozen_schema_with_base_info new_schema;
};
std::vector<frozen_schema_with_base_info> created;
std::vector<altered_schema> altered;
std::vector<frozen_schema_with_base_info> dropped;
};
// schema_diff represents what is happening with tables or views during schema merge
struct schema_diff_per_shard {
struct altered_schema {
schema_ptr old_schema;
schema_ptr new_schema;
};
std::vector<schema_ptr> created;
std::vector<altered_schema> altered;
std::vector<schema_ptr> dropped;
future<frozen_schema_diff> freeze() const;
static future<schema_diff_per_shard> copy_from(replica::database&, in_progress_types_storage&, const frozen_schema_diff& oth);
};
class new_token_metadata {
std::vector<locator::mutable_token_metadata_ptr> shards{smp::count};
public:
locator::mutable_token_metadata_ptr& local();
future<> destroy();
};
struct affected_tables_and_views_per_shard {
schema_diff_per_shard tables;
schema_diff_per_shard views;
std::vector<bool> columns_changed;
};
struct affected_tables_and_views {
sharded<affected_tables_and_views_per_shard> tables_and_views;
std::unique_ptr<replica::tables_metadata_lock_on_all_shards> locks;
std::unordered_map<table_id, replica::global_table_ptr> table_shards;
new_token_metadata new_token_metadata; // represents token metadata after updating tablets metadata, nullptr if there was no change
};
// We wrap it with pointer because change_batch needs to be constructed and destructed
// on the same shard as it's used for.
using functions_change_batch_all_shards = sharded<cql3::functions::change_batch>;
// Schema_applier encapsulates intermediate state needed to construct schema objects from
// set of rows read from system tables (see struct schema_state). It does atomic (per shard)
// application of a new schema.
class schema_applier {
using keyspace_name = sstring;
sharded<service::storage_proxy>& _proxy;
sharded<service::storage_service>& _ss;
sharded<db::system_keyspace>& _sys_ks;
const bool _reload;
std::set<sstring> _keyspaces;
std::unordered_map<keyspace_name, table_selector> _affected_tables;
locator::tablet_metadata_change_hint _tablet_hint;
schema_persisted_state _before;
schema_persisted_state _after;
in_progress_types_storage _types_storage;
affected_keyspaces _affected_keyspaces;
affected_user_types _affected_user_types;
affected_tables_and_views _affected_tables_and_views;
functions_change_batch_all_shards _functions_batch; // includes aggregates
future<schema_persisted_state> get_schema_persisted_state();
public:
schema_applier(
sharded<service::storage_proxy>& proxy,
sharded<service::storage_service>& ss,
sharded<db::system_keyspace>& sys_ks,
bool reload = false)
: _proxy(proxy), _ss(ss), _sys_ks(sys_ks), _reload(reload) {};
// Gets called before mutations are applied,
// preferably no work should be done here but subsystem
// may do some snapshot of 'before' data.
future<> prepare(std::vector<mutation>& muts);
// Update is called after mutations are applied, it should create
// all updates but not yet commit them to a subsystem (i.e. copy on write style).
// All changes should be visible only to schema_applier object but not to other subsystems.
future<> update();
// Makes updates visible. Before calling this function in memory state as observed by other
// components should not yet change. The function atomically switches current state with
// new state (the one built in update function).
future<> commit();
// Post_commit is called after commit and allows to trigger code which can't provide
// atomicity either for legacy reasons or causes side effects to an external system
// (e.g. informing client's driver).
future<> post_commit();
// Some destruction may need to be done on particular shard hence we need to run it in coroutine.
future<> destroy();
private:
void commit_tables_and_views();
future<> finalize_tables_and_views();
void commit_on_shard(replica::database& db);
};
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);
}

View File

@@ -141,9 +141,9 @@ using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
static computed_columns_map get_computed_columns(const schema_mutations& sm);
static std::vector<column_definition> create_columns_from_column_rows(
const schema_ctxt& ctxt,
const query::result_set& rows, const sstring& keyspace,
const sstring& table, bool is_super, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns,
const data_dictionary::user_types_storage& user_types);
const sstring& table, bool is_super, column_view_virtual is_view_virtual, const computed_columns_map& computed_columns);
static std::vector<index_metadata> create_indices_from_index_rows(const query::result_set& rows,
@@ -1044,19 +1044,19 @@ future<std::vector<user_type>> create_types(replica::database& db, const std::ve
co_return ret;
}
std::vector<data_type> read_arg_types(const query::result_set_row& row, const sstring& keyspace, const data_dictionary::user_types_storage& user_types) {
std::vector<data_type> read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace) {
std::vector<data_type> arg_types;
for (const auto& arg : get_list<sstring>(row, "argument_types")) {
arg_types.push_back(db::cql_type_parser::parse(keyspace, arg, user_types));
arg_types.push_back(db::cql_type_parser::parse(keyspace, arg, db.user_types()));
}
return arg_types;
}
future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row, const data_dictionary::user_types_storage& user_types) {
future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row) {
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("function_name")};
auto arg_types = read_arg_types(row, name.keyspace, user_types);
data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("return_type"), user_types);
auto arg_types = read_arg_types(db, row, name.keyspace);
data_type return_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("return_type"), db.user_types());
// FIXME: We already computed the bitcode in
// create_function_statement, but it is not clear how to get it
@@ -1078,11 +1078,11 @@ future<shared_ptr<cql3::functions::user_function>> create_func(replica::database
row.get_nonnull<bool>("called_on_null_input"), std::move(*ctx));
}
shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row, cql3::functions::change_batch& batch, const data_dictionary::user_types_storage& user_types) {
shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row, cql3::functions::change_batch& batch) {
cql3::functions::function_name name{
row.get_nonnull<sstring>("keyspace_name"), row.get_nonnull<sstring>("aggregate_name")};
auto arg_types = read_arg_types(row, name.keyspace, user_types);
data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("state_type"), user_types);
auto arg_types = read_arg_types(db, row, name.keyspace);
data_type state_type = db::cql_type_parser::parse(name.keyspace, row.get_nonnull<sstring>("state_type"), db.user_types());
sstring sfunc = row.get_nonnull<sstring>("state_func");
auto ffunc = row.get<sstring>("final_func");
auto initcond_str = row.get<sstring>("initcond");
@@ -1270,10 +1270,9 @@ std::vector<mutation> make_drop_keyspace_mutations(schema_features features, lw_
*
* @param partition Keyspace attributes in serialized form
*/
future<lw_shared_ptr<keyspace_metadata>> create_keyspace_metadata(
distributed<service::storage_proxy>& proxy,
const schema_result_value_type& result,
lw_shared_ptr<query::result_set> scylla_specific_rs)
future<lw_shared_ptr<keyspace_metadata>> create_keyspace_from_schema_partition(distributed<service::storage_proxy>& proxy,
const schema_result_value_type& result,
lw_shared_ptr<query::result_set> scylla_specific_rs)
{
auto&& rs = result.second;
if (rs->empty()) {
@@ -1336,7 +1335,7 @@ seastar::future<std::vector<shared_ptr<cql3::functions::user_function>>> create_
replica::database& db, lw_shared_ptr<query::result_set> result) {
std::vector<shared_ptr<cql3::functions::user_function>> ret;
for (const auto& row : result->rows()) {
ret.emplace_back(co_await create_func(db, row, db.user_types()));
ret.emplace_back(co_await create_func(db, row));
}
co_return ret;
}
@@ -1354,16 +1353,16 @@ std::vector<shared_ptr<cql3::functions::user_aggregate>> create_aggregates_from_
std::vector<shared_ptr<cql3::functions::user_aggregate>> ret;
for (const auto& row : result->rows()) {
auto agg_name = row.get_nonnull<sstring>("aggregate_name");
auto agg_args = read_arg_types(row, row.get_nonnull<sstring>("keyspace_name"), db.user_types());
auto agg_args = read_arg_types(db, row, row.get_nonnull<sstring>("keyspace_name"));
const query::result_set_row *scylla_row_ptr = nullptr;
for (auto [it, end] = scylla_aggs.equal_range(agg_name); it != end; ++it) {
auto scylla_agg_args = read_arg_types(*it->second, it->second->get_nonnull<sstring>("keyspace_name"), db.user_types());
auto scylla_agg_args = read_arg_types(db, *it->second, it->second->get_nonnull<sstring>("keyspace_name"));
if (agg_args == scylla_agg_args) {
scylla_row_ptr = it->second;
break;
}
}
ret.emplace_back(create_aggregate(db, row, scylla_row_ptr, batch, db.user_types()));
ret.emplace_back(create_aggregate(db, row, scylla_row_ptr, batch));
}
return ret;
}
@@ -2019,8 +2018,7 @@ future<schema_ptr> create_table_from_name(distributed<service::storage_proxy>& p
if (!sm.live()) {
co_await coroutine::return_exception(std::runtime_error(format("{}:{} not found in the schema definitions keyspace.", qn.keyspace_name, qn.table_name)));
}
const schema_ctxt& ctxt = proxy;
co_return create_table_from_mutations(ctxt, std::move(sm), ctxt.user_types());
co_return create_table_from_mutations(proxy, std::move(sm));
}
// Limit concurrency of user tables to prevent stalls.
@@ -2190,7 +2188,7 @@ static void prepare_builder_from_scylla_tables_row(const schema_ctxt& ctxt, sche
}
}
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, const data_dictionary::user_types_storage& user_types, std::optional<table_schema_version> version)
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<table_schema_version> version)
{
slogger.trace("create_table_from_mutations: version={}, {}", version, sm);
@@ -2225,14 +2223,14 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
auto computed_columns = get_computed_columns(sm);
std::vector<column_definition> column_defs = create_columns_from_column_rows(
ctxt,
query::result_set(sm.columns_mutation()),
ks_name,
cf_name,/*,
fullRawComparator, */
cf == cf_type::super,
column_view_virtual::no,
computed_columns,
user_types);
computed_columns);
builder.set_is_dense(is_dense);
@@ -2262,7 +2260,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
query::result_set dcr(*sm.dropped_columns_mutation());
for (auto& row : dcr.rows()) {
auto name = row.get_nonnull<sstring>("column_name");
auto type = cql_type_parser::parse(ks_name, row.get_nonnull<sstring>("type"), user_types);
auto type = cql_type_parser::parse(ks_name, row.get_nonnull<sstring>("type"), ctxt.user_types());
auto time = row.get_nonnull<db_clock::time_point>("dropped_time");
builder.without_column(name, type, time.time_since_epoch().count());
}
@@ -2401,20 +2399,19 @@ static computed_columns_map get_computed_columns(const schema_mutations& sm) {
}) | std::ranges::to<computed_columns_map>();
}
static std::vector<column_definition> create_columns_from_column_rows(
static std::vector<column_definition> create_columns_from_column_rows(const schema_ctxt& ctxt,
const query::result_set& rows,
const sstring& keyspace,
const sstring& table, /*,
AbstractType<?> rawComparator, */
bool is_super,
column_view_virtual is_view_virtual,
const computed_columns_map& computed_columns,
const data_dictionary::user_types_storage& user_types)
const computed_columns_map& computed_columns)
{
std::vector<column_definition> columns;
for (auto&& row : rows.rows()) {
auto kind = deserialize_kind(row.get_nonnull<sstring>("kind"));
auto type = cql_type_parser::parse(keyspace, row.get_nonnull<sstring>("type"), user_types);
auto type = cql_type_parser::parse(keyspace, row.get_nonnull<sstring>("type"), ctxt.user_types());
auto name_bytes = row.get_nonnull<bytes>("column_name_bytes");
column_id position = row.get_nonnull<int32_t>("position");
@@ -2461,11 +2458,8 @@ static index_metadata create_index_from_index_row(const query::result_set_row& r
return index_metadata{index_name, options, kind, is_local};
}
static schema_builder prepare_view_schema_builder_from_mutations(const schema_ctxt& ctxt,
const schema_mutations& sm,
const data_dictionary::user_types_storage& user_types,
std::optional<table_schema_version> version,
const query::result_set& table_rs) {
static schema_builder prepare_view_schema_builder_from_mutations(const schema_ctxt& ctxt, const schema_mutations& sm, std::optional<table_schema_version> version,
const query::result_set& table_rs) {
const query::result_set_row& row = table_rs.row(0);
auto ks_name = row.get_nonnull<sstring>("keyspace_name");
@@ -2483,12 +2477,12 @@ static schema_builder prepare_view_schema_builder_from_mutations(const schema_ct
}
auto computed_columns = get_computed_columns(sm);
auto column_defs = create_columns_from_column_rows(query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns, user_types);
auto column_defs = create_columns_from_column_rows(ctxt, query::result_set(sm.columns_mutation()), ks_name, cf_name, false, column_view_virtual::no, computed_columns);
for (auto&& cdef : column_defs) {
builder.with_column_ordered(cdef);
}
if (sm.view_virtual_columns_mutation()) {
column_defs = create_columns_from_column_rows(query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns, user_types);
column_defs = create_columns_from_column_rows(ctxt, query::result_set(*sm.view_virtual_columns_mutation()), ks_name, cf_name, false, column_view_virtual::yes, computed_columns);
for (auto&& cdef : column_defs) {
builder.with_column_ordered(cdef);
}
@@ -2507,12 +2501,9 @@ static schema_builder prepare_view_schema_builder_from_mutations(const schema_ct
* If the base info is not provided, the schema context must have a reference to the database,
* and the most up-to-date base schema will be pulled from there.
*/
view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm,
const data_dictionary::user_types_storage& user_types,
schema_ptr base_schema,
std::optional<table_schema_version> version) {
view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, schema_ptr base_schema, std::optional<table_schema_version> version) {
auto table_rs = query::result_set(sm.columnfamilies_mutation());
auto builder = prepare_view_schema_builder_from_mutations(ctxt, sm, user_types, version, table_rs);
auto builder = prepare_view_schema_builder_from_mutations(ctxt, sm, version, table_rs);
const query::result_set_row& row = table_rs.row(0);
auto include_all_columns = row.get_nonnull<bool>("include_all_columns");
auto where_clause = row.get_nonnull<sstring>("where_clause");
@@ -2521,12 +2512,9 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
return view_ptr(builder.build());
}
view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm,
const data_dictionary::user_types_storage& user_types,
std::optional<db::view::base_dependent_view_info> base_info,
std::optional<table_schema_version> version) {
view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<db::view::base_dependent_view_info> base_info, std::optional<table_schema_version> version) {
auto table_rs = query::result_set(sm.columnfamilies_mutation());
auto builder = prepare_view_schema_builder_from_mutations(ctxt, sm, user_types, version, table_rs);
auto builder = prepare_view_schema_builder_from_mutations(ctxt, sm, version, table_rs);
const query::result_set_row& row = table_rs.row(0);
auto id = table_id(row.get_nonnull<utils::UUID>("base_table_id"));
auto base_name = row.get_nonnull<sstring>("base_table_name");
@@ -2555,8 +2543,7 @@ static future<view_ptr> create_view_from_table_row(distributed<service::storage_
if (!sm.live()) {
co_await coroutine::return_exception(std::runtime_error(format("{}:{} not found in the view definitions keyspace.", qn.keyspace_name, qn.table_name)));
}
const schema_ctxt& ctxt = proxy;
co_return create_view_from_mutations(ctxt, std::move(sm), ctxt.user_types());
co_return create_view_from_mutations(proxy, std::move(sm));
}
/**

View File

@@ -238,7 +238,7 @@ std::vector<mutation> make_create_keyspace_mutations(schema_features features, l
std::vector<mutation> make_drop_keyspace_mutations(schema_features features, lw_shared_ptr<keyspace_metadata> keyspace, api::timestamp_type timestamp);
future<lw_shared_ptr<keyspace_metadata>> create_keyspace_metadata(distributed<service::storage_proxy>& proxy, const schema_result_value_type& partition, lw_shared_ptr<query::result_set> scylla_specific_rs);
future<lw_shared_ptr<keyspace_metadata>> create_keyspace_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result_value_type& partition, lw_shared_ptr<query::result_set> scylla_specific_rs = nullptr);
future<lw_shared_ptr<query::result_set>> extract_scylla_specific_keyspace_info(distributed<service::storage_proxy>& proxy, const schema_result_value_type& partition);
@@ -251,13 +251,13 @@ future<std::vector<user_type>> create_types(replica::database& db, const std::ve
future<std::vector<user_type>> create_types_from_schema_partition(keyspace_metadata& ks, lw_shared_ptr<query::result_set> result);
std::vector<data_type> read_arg_types(const query::result_set_row& row, const sstring& keyspace, const data_dictionary::user_types_storage& user_types);
std::vector<data_type> read_arg_types(replica::database& db, const query::result_set_row& row, const sstring& keyspace);
future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row, const data_dictionary::user_types_storage& user_types);
future<shared_ptr<cql3::functions::user_function>> create_func(replica::database& db, const query::result_set_row& row);
seastar::future<std::vector<shared_ptr<cql3::functions::user_function>>> create_functions_from_schema_partition(replica::database& db, lw_shared_ptr<query::result_set> result);
shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row, cql3::functions::change_batch& batch, const data_dictionary::user_types_storage& user_types);
shared_ptr<cql3::functions::user_aggregate> create_aggregate(replica::database& db, const query::result_set_row& row, const query::result_set_row* scylla_row, cql3::functions::change_batch& batch);
std::vector<shared_ptr<cql3::functions::user_aggregate>> create_aggregates_from_schema_partition(replica::database& db, lw_shared_ptr<query::result_set> result, lw_shared_ptr<query::result_set> scylla_result, cql3::functions::change_batch& batch);
@@ -286,10 +286,10 @@ future<std::map<sstring, schema_ptr>> create_tables_from_tables_partition(distri
std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata> keyspace, schema_ptr table, api::timestamp_type timestamp);
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage& user_types, std::optional<table_schema_version> version = {});
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage&, schema_ptr, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, const data_dictionary::user_types_storage&, std::optional<view::base_dependent_view_info> = {}, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, schema_ptr, std::optional<table_schema_version> version = {});
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, std::optional<view::base_dependent_view_info> = {}, std::optional<table_schema_version> version = {});
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);

View File

@@ -9,7 +9,6 @@
#include "frozen_schema.hh"
#include "db/schema_tables.hh"
#include "db/view/view.hh"
#include "view_info.hh"
#include "mutation/canonical_mutation.hh"
#include "schema_mutations.hh"
#include "idl/frozen_schema.dist.hh"
@@ -32,12 +31,12 @@ schema_ptr frozen_schema::unfreeze(const db::schema_ctxt& ctxt, std::optional<db
auto sv = ser::deserialize(in, std::type_identity<ser::schema_view>());
auto sm = sv.mutations();
if (sm.is_view()) {
return db::schema_tables::create_view_from_mutations(ctxt, std::move(sm), ctxt.user_types(), std::move(base_info), sv.version());
return db::schema_tables::create_view_from_mutations(ctxt, std::move(sm), std::move(base_info), sv.version());
} else {
if (base_info) {
throw std::runtime_error("Trying to unfreeze regular table schema with base info");
}
return db::schema_tables::create_table_from_mutations(ctxt, std::move(sm), ctxt.user_types(), sv.version());
return db::schema_tables::create_table_from_mutations(ctxt, std::move(sm), sv.version());
}
}
@@ -49,13 +48,3 @@ const bytes_ostream& frozen_schema::representation() const
{
return _data;
}
frozen_schema_with_base_info::frozen_schema_with_base_info(const schema_ptr& c) : frozen_schema(c) {
if (c->is_view()) {
base_info = c->view_info()->base_info();
}
}
schema_ptr frozen_schema_with_base_info::unfreeze(const db::schema_ctxt& ctxt) const {
return frozen_schema::unfreeze(ctxt, base_info);
}

View File

@@ -31,14 +31,3 @@ public:
schema_ptr unfreeze(const db::schema_ctxt&, std::optional<db::view::base_dependent_view_info> base_info = {}) const;
const bytes_ostream& representation() const;
};
// To unfreeze view without base table added to schema registry
// we need base_info.
class frozen_schema_with_base_info : public frozen_schema {
public:
frozen_schema_with_base_info(const schema_ptr& c);
schema_ptr unfreeze(const db::schema_ctxt& ctxt) const;
private:
// Set only for views.
std::optional<db::view::base_dependent_view_info> base_info;
};

View File

@@ -1687,7 +1687,7 @@ sharded<locator::shared_token_metadata> token_metadata;
checkpoint(stop_signal, "starting migration manager");
debug::the_migration_manager = &mm;
mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(ss), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
mm.start(std::ref(mm_notifier), std::ref(feature_service), std::ref(messaging), std::ref(proxy), std::ref(gossiper), std::ref(group0_client), std::ref(sys_ks)).get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
@@ -1869,7 +1869,7 @@ sharded<locator::shared_token_metadata> token_metadata;
checkpoint(stop_signal, "loading tablet metadata");
try {
ss.local().update_tablet_metadata({}).get();
ss.local().load_tablet_metadata({}).get();
} catch (...) {
if (!cfg->maintenance_mode()) {
throw;

View File

@@ -9,8 +9,7 @@ target_sources(replica
memtable.cc
exceptions.cc
dirty_memory_manager.cc
mutation_dump.cc
schema_describe_helper.cc)
mutation_dump.cc)
target_include_directories(replica
PUBLIC
${CMAKE_SOURCE_DIR})

View File

@@ -8,20 +8,16 @@
#include <algorithm>
#include <exception>
#include <fmt/ranges.h>
#include <fmt/std.h>
#include <seastar/core/rwlock.hh>
#include "locator/network_topology_strategy.hh"
#include "locator/tablets.hh"
#include "locator/token_metadata_fwd.hh"
#include "utils/log.hh"
#include "replica/database_fwd.hh"
#include <seastar/core/shard_id.hh>
#include "utils/assert.hh"
#include "utils/lister.hh"
#include "replica/database.hh"
#include <memory>
#include <seastar/core/future-util.hh>
#include "db/system_keyspace.hh"
#include "db/system_keyspace_sstables_registry.hh"
@@ -100,8 +96,9 @@ make_flush_controller(const db::config& cfg, backlog_controller::scheduling_grou
return flush_controller(sg, cfg.memtable_flush_static_shares(), 50ms, cfg.unspooled_dirty_soft_limit(), std::move(fn));
}
keyspace::keyspace(config cfg, locator::effective_replication_map_factory& erm_factory)
: _config(std::move(cfg))
keyspace::keyspace(lw_shared_ptr<keyspace_metadata> metadata, config cfg, locator::effective_replication_map_factory& erm_factory)
: _metadata(std::move(metadata))
, _config(std::move(cfg))
, _erm_factory(erm_factory)
{}
@@ -742,10 +739,8 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
using namespace db::schema_tables;
co_await do_parse_schema_tables(proxy, db::schema_tables::KEYSPACES, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
auto scylla_specific_rs = co_await extract_scylla_specific_keyspace_info(proxy, v);
auto ksm = co_await create_keyspace_metadata(proxy, v, scylla_specific_rs);
auto ks = co_await create_keyspace(ksm, proxy.local().get_erm_factory(), system_keyspace::no);
insert_keyspace(std::move(ks));
co_return;
auto ksm = co_await create_keyspace_from_schema_partition(proxy, v, scylla_specific_rs);
co_return co_await create_keyspace(ksm, proxy.local().get_erm_factory(), system_keyspace::no);
}));
co_await do_parse_schema_tables(proxy, db::schema_tables::TYPES, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
auto& ks = this->find_keyspace(v.first);
@@ -840,62 +835,59 @@ database::init_commitlog() {
});
}
future<> database::modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func) {
future<> database::modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func, std::function<future<>(replica::database&)> notifier) {
// Run func first on shard 0
// to allow "seeding" of the effective_replication_map
// with a new e_r_m instance.
SCYLLA_ASSERT(this_shard_id() == 0);
co_await func(sharded_db.local());
co_await sharded_db.invoke_on_others([&] (replica::database& db) {
co_await sharded_db.invoke_on(0, func);
co_await sharded_db.invoke_on_all([&] (replica::database& db) {
if (this_shard_id() == 0) {
return make_ready_future<>();
}
return func(db);
});
co_await sharded_db.invoke_on_all(notifier);
}
future<keyspace_change> database::prepare_update_keyspace(const keyspace& ks, lw_shared_ptr<keyspace_metadata> metadata) const {
auto strategy = keyspace::create_replication_strategy(metadata);
locator::vnode_effective_replication_map_ptr erm = nullptr;
if (!strategy->is_per_table()) {
erm = co_await ks.create_effective_replication_map(strategy,
get_shared_token_metadata());
}
co_return keyspace_change{
.metadata = metadata,
.strategy = std::move(strategy),
.erm = std::move(erm),
};
}
future<> database::update_keyspace(const keyspace_metadata& tmp_ksm) {
auto& ks = find_keyspace(tmp_ksm.name());
auto new_ksm = ::make_lw_shared<keyspace_metadata>(tmp_ksm.name(), tmp_ksm.strategy_name(), tmp_ksm.strategy_options(), tmp_ksm.initial_tablets(), tmp_ksm.durable_writes(),
ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to<std::vector>(), std::move(ks.metadata()->user_types()), tmp_ksm.get_storage_options());
void database::update_keyspace(std::unique_ptr<keyspace_change> change) {
auto& ks = find_keyspace(change->metadata->name());
bool old_durable_writes = ks.metadata()->durable_writes();
bool new_durable_writes = change->metadata->durable_writes();
bool new_durable_writes = new_ksm->durable_writes();
if (old_durable_writes != new_durable_writes) {
for (auto& [cf_name, cf_schema] : change->metadata->cf_meta_data()) {
for (auto& [cf_name, cf_schema] : new_ksm->cf_meta_data()) {
auto& cf = find_column_family(cf_schema);
cf.set_durable_writes(new_durable_writes);
}
}
ks.apply(*change);
co_await ks.update_from(get_shared_token_metadata(), std::move(new_ksm));
}
future<database::keyspace_change_per_shard> database::prepare_update_keyspace_on_all_shards(sharded<database>& sharded_db, const keyspace_metadata& ksm) {
keyspace_change_per_shard changes(smp::count);
co_await modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) -> future<> {
auto& ks = db.find_keyspace(ksm.name());
auto new_ksm = ::make_lw_shared<keyspace_metadata>(ksm.name(), ksm.strategy_name(), ksm.strategy_options(), ksm.initial_tablets(), ksm.durable_writes(),
ks.metadata()->cf_meta_data() | std::views::values | std::ranges::to<std::vector>(), ks.metadata()->user_types(), ksm.get_storage_options());
auto change = co_await db.prepare_update_keyspace(ks, new_ksm);
changes[this_shard_id()] = make_foreign(std::make_unique<keyspace_change>(std::move(change)));
co_return;
future<> database::update_keyspace_on_all_shards(sharded<database>& sharded_db, const keyspace_metadata& ksm) {
return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) {
return db.update_keyspace(ksm);
}, [&] (replica::database& db) {
const auto& ks = db.find_keyspace(ksm.name());
return db.get_notifier().update_keyspace(ks.metadata());
});
co_return changes;
}
void database::drop_keyspace(const sstring& name) {
_keyspaces.erase(name);
}
future<> database::drop_keyspace_on_all_shards(sharded<database>& sharded_db, const sstring& name) {
return modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) {
db.drop_keyspace(name);
return make_ready_future<>();
}, [&] (replica::database& db) {
return db.get_notifier().drop_keyspace(name);
});
}
static bool is_system_table(const schema& s) {
auto& k = s.ks_name();
return k == db::system_keyspace::NAME ||
@@ -951,8 +943,7 @@ future<> database::create_local_system_table(
std::nullopt,
durable
);
auto ks = co_await create_keyspace(ksm, erm_factory, replica::database::system_keyspace::yes);
insert_keyspace(std::move(ks));
co_await create_keyspace(ksm, erm_factory, replica::database::system_keyspace::yes);
}
auto& ks = find_keyspace(ks_name);
auto cfg = ks.make_column_family_config(*table, *this);
@@ -962,18 +953,7 @@ future<> database::create_local_system_table(
cfg.memtable_scheduling_group = default_scheduling_group();
cfg.memtable_to_cache_scheduling_group = default_scheduling_group();
}
auto lock = get_tables_metadata().hold_write_lock();
std::exception_ptr ex;
try {
add_column_family(ks, table, std::move(cfg), replica::database::is_new_cf::no);
} catch (...) {
ex = std::current_exception();
}
// cleanup
if (ex && column_family_exists(table->id())) {
auto& cf = find_column_family(table);
co_await cf.stop();
}
co_await add_column_family(ks, table, std::move(cfg), replica::database::is_new_cf::no);
}
db::commitlog* database::commitlog_for(const schema_ptr& schema) {
@@ -982,17 +962,12 @@ db::commitlog* database::commitlog_for(const schema_ptr& schema) {
: _commitlog.get();
}
void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata) {
future<> database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new) {
schema = local_schema_registry().learn(schema);
auto&& rs = ks.get_replication_strategy();
locator::effective_replication_map_ptr erm;
if (auto pt_rs = rs.maybe_as_per_table()) {
auto metadata_ptr = not_commited_new_metadata;
if (!metadata_ptr) {
// use the current one
metadata_ptr = _shared_token_metadata.get();
}
erm = pt_rs->make_replication_map(schema->id(), metadata_ptr);
erm = pt_rs->make_replication_map(schema->id(), _shared_token_metadata.get());
} else {
erm = ks.get_vnode_effective_replication_map();
}
@@ -1015,34 +990,23 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family:
throw std::invalid_argument("Column family " + schema->cf_name() + " exists");
}
cf->start();
_tables_metadata.add_table(*this, ks, *cf, schema);
auto f = co_await coroutine::as_future(_tables_metadata.add_table(*this, ks, *cf, schema));
if (f.failed()) {
co_await cf->stop();
co_await coroutine::return_exception_ptr(f.get_exception());
}
// Table must be added before entry is marked synced.
schema->registry_entry()->mark_synced();
}
future<> database::make_column_family_directory(schema_ptr schema) {
future<> database::add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new) {
auto& ks = find_keyspace(schema->ks_name());
co_await add_column_family(ks, schema, ks.make_column_family_config(*schema, *this), is_new);
auto& cf = find_column_family(schema);
cf.get_index_manager().reload();
co_await cf.init_storage();
}
future<> database::add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new) {
auto lock = co_await get_tables_metadata().hold_write_lock();
auto& ks = find_keyspace(schema->ks_name());
std::exception_ptr ex;
try {
add_column_family(ks, schema, ks.make_column_family_config(*schema, *this), is_new);
} catch (...) {
ex = std::current_exception();
}
// cleanup
if (ex && column_family_exists(schema->id())) {
auto& cf = find_column_family(schema);
co_await cf.stop();
}
co_await make_column_family_directory(schema);
}
bool database::update_column_family(schema_ptr new_schema) {
column_family& cfm = find_column_family(new_schema->id());
bool columns_changed = !cfm.schema()->equal_columns(*new_schema);
@@ -1058,55 +1022,38 @@ bool database::update_column_family(schema_ptr new_schema) {
return columns_changed;
}
void database::remove(table& cf) noexcept {
future<> database::remove(table& cf) noexcept {
cf.deregister_metrics();
_tables_metadata.remove_table(*this, cf);
return _tables_metadata.remove_table(*this, cf);
}
future<> database::detach_column_family(table& cf) {
auto uuid = cf.schema()->id();
co_await remove(cf);
cf.clear_views();
co_await cf.await_pending_ops();
co_await foreach_reader_concurrency_semaphore([uuid] (reader_concurrency_semaphore& sem) -> future<> {
co_await sem.evict_inactive_reads_for_table(uuid);
});
}
global_table_ptr::global_table_ptr() {
_p.resize(smp::count);
_views.resize(smp::count);
_base.resize(smp::count);
}
void global_table_ptr::assign(database& db, table_id uuid) {
auto& t = db.find_column_family(uuid);
global_table_ptr::global_table_ptr(global_table_ptr&& o) noexcept
: _p(std::move(o._p))
{ }
std::vector<lw_shared_ptr<replica::table>> views;
views.reserve(t.views().size());
for (const auto& v : t.views()) {
views.push_back(db.find_column_family(v).shared_from_this());
}
if (t.schema()->is_view()) {
auto& base = db.find_column_family(t.schema()->view_info()->base_id());
_base[this_shard_id()] = make_foreign(base.shared_from_this());
}
global_table_ptr::~global_table_ptr() {}
void global_table_ptr::assign(table& t) {
_p[this_shard_id()] = make_foreign(t.shared_from_this());
_views[this_shard_id()] = make_foreign(
std::make_unique<std::vector<lw_shared_ptr<table>>>(std::move(views)));
}
table* global_table_ptr::operator->() const noexcept { return &*_p[this_shard_id()]; }
table& global_table_ptr::operator*() const noexcept { return *_p[this_shard_id()]; }
void global_table_ptr::clear_views() noexcept {
_views[this_shard_id()]->clear();
}
std::vector<lw_shared_ptr<table>>& global_table_ptr::views() const noexcept {
return *_views[this_shard_id()];
}
table& global_table_ptr::base() const noexcept {
return *_base[this_shard_id()];
}
void tables_metadata_lock_on_all_shards::assign_lock(seastar::rwlock::holder&& h) {
_holders[this_shard_id()] = make_foreign(std::make_unique<seastar::rwlock::holder>(std::move(h)));
}
future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db, sstring ks_name, sstring cf_name) {
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
return get_table_on_all_shards(sharded_db, std::move(uuid));
@@ -1116,54 +1063,32 @@ future<global_table_ptr> get_table_on_all_shards(sharded<database>& sharded_db,
global_table_ptr table_shards;
co_await sharded_db.invoke_on_all([&] (auto& db) {
try {
table_shards.assign(db, uuid);
} catch (const no_such_column_family& err) {
on_internal_error(dblog, err.what());
table_shards.assign(db.find_column_family(uuid));
} catch (no_such_column_family&) {
on_internal_error(dblog, fmt::format("Table UUID={} not found", uuid));
}
});
co_return table_shards;
}
future<tables_metadata_lock_on_all_shards> database::prepare_tables_metadata_change_on_all_shards(sharded<database>& sharded_db) {
tables_metadata_lock_on_all_shards locks;
co_await sharded_db.invoke_on_all([&] (auto& db) -> future<> {
locks.assign_lock(co_await db.get_tables_metadata().hold_write_lock());
});
co_return locks;
}
future<global_table_ptr> database::prepare_drop_table_on_all_shards(sharded<database>& sharded_db, table_id uuid) {
co_return co_await get_table_on_all_shards(sharded_db, uuid);;
}
void database::drop_table(sharded<database>& sharded_db,
sstring ks_name, sstring cf_name, bool with_snapshot, global_table_ptr& table_shards) {
future<> database::drop_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
sstring ks_name, sstring cf_name, bool with_snapshot) {
auto auto_snapshot = sharded_db.local().get_config().auto_snapshot();
dblog.info("Dropping {}.{} {}snapshot", ks_name, cf_name, with_snapshot && auto_snapshot ? "with auto-" : "without ");
auto& cf = *table_shards;
sharded_db.local().remove(cf);
table_shards.clear_views();
cf.clear_views();
}
future<> database::cleanup_drop_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
bool with_snapshot, global_table_ptr& table_shards) {
co_await sharded_db.invoke_on_all([&] (database& db) -> future<> {
auto& cf = *table_shards;
auto uuid = cf.schema()->id();
co_await cf.await_pending_ops();
co_await db.foreach_reader_concurrency_semaphore([uuid] (reader_concurrency_semaphore& sem) -> future<> {
co_await sem.evict_inactive_reads_for_table(uuid);
});
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
auto table_shards = co_await get_table_on_all_shards(sharded_db, uuid);
std::optional<sstring> snapshot_name_opt;
if (with_snapshot) {
snapshot_name_opt = format("pre-drop-{}", db_clock::now().time_since_epoch().count());
}
co_await sharded_db.invoke_on_all([&] (database& db) {
return db.detach_column_family(*table_shards);
});
// Use a time point in the far future (9999-12-31T00:00:00+0000)
// to ensure all sstables are truncated,
// but be careful to stays within the client's datetime limits.
constexpr db_clock::time_point truncated_at(std::chrono::seconds(253402214400));
std::optional<sstring> snapshot_name_opt;
if (with_snapshot) {
snapshot_name_opt = format("pre-drop-{}", db_clock::now().time_since_epoch().count());
}
auto f = co_await coroutine::as_future(truncate_table_on_all_shards(sharded_db, sys_ks, table_shards, truncated_at, with_snapshot, std::move(snapshot_name_opt)));
co_await smp::invoke_on_all([&] {
return table_shards->stop();
@@ -1172,17 +1097,6 @@ future<> database::cleanup_drop_table_on_all_shards(sharded<database>& sharded_d
co_await table_shards->destroy_storage();
}
future<> database::legacy_drop_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
sstring ks_name, sstring cf_name, bool with_snapshot) {
auto locks = co_await prepare_tables_metadata_change_on_all_shards(sharded_db);
auto uuid = sharded_db.local().find_uuid(ks_name, cf_name);
auto table_shards = co_await prepare_drop_table_on_all_shards(sharded_db, uuid);
co_await sharded_db.invoke_on_all([&] (database& db) {
return db.drop_table(sharded_db, ks_name, cf_name, with_snapshot, table_shards);
});
co_await cleanup_drop_table_on_all_shards(sharded_db, sys_ks, with_snapshot, table_shards);
}
table_id database::find_uuid(std::string_view ks, std::string_view cf) const {
try {
return _tables_metadata.get_table_id(std::make_pair(ks, cf));
@@ -1335,17 +1249,19 @@ bool database::column_family_exists(const table_id& uuid) const {
return _tables_metadata.contains(uuid);
}
locator::replication_strategy_ptr
keyspace::create_replication_strategy(lw_shared_ptr<keyspace_metadata> metadata) {
future<>
keyspace::create_replication_strategy(const locator::shared_token_metadata& stm) {
using namespace locator;
replication_strategy_params params(metadata->strategy_options(), metadata->initial_tablets());
rslogger.debug("replication strategy for keyspace {} is {}, opts={}",
metadata->name(), metadata->strategy_name(), metadata->strategy_options());
return abstract_replication_strategy::create_replication_strategy(metadata->strategy_name(), params);
}
future<locator::vnode_effective_replication_map_ptr> keyspace::create_effective_replication_map(locator::replication_strategy_ptr strategy, const locator::shared_token_metadata& stm) const {
co_return co_await _erm_factory.create_effective_replication_map(strategy, stm.get());
locator::replication_strategy_params params(_metadata->strategy_options(), _metadata->initial_tablets());
_replication_strategy =
abstract_replication_strategy::create_replication_strategy(_metadata->strategy_name(), params);
rslogger.debug("replication strategy for keyspace {} is {}, opts={}",
_metadata->name(), _metadata->strategy_name(), _metadata->strategy_options());
if (!_replication_strategy->is_per_table()) {
auto erm = co_await _erm_factory.create_effective_replication_map(_replication_strategy, stm.get());
update_effective_replication_map(std::move(erm));
}
}
void
@@ -1358,10 +1274,9 @@ keyspace::get_replication_strategy() const {
return *_replication_strategy;
}
void keyspace::apply(keyspace_change kc) {
_metadata = std::move(kc.metadata);
_replication_strategy = std::move(kc.strategy);
_effective_replication_map = std::move(kc.erm);
future<> keyspace::update_from(const locator::shared_token_metadata& stm, ::lw_shared_ptr<keyspace_metadata> ksm) {
_metadata = std::move(ksm);
return create_replication_strategy(stm);
}
column_family::config
@@ -1451,36 +1366,31 @@ std::vector<view_ptr> database::get_views() const {
| std::views::transform([] (auto& cf) { return view_ptr(cf->schema()); }));
}
future<std::unique_ptr<keyspace>> database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system) {
future<> database::create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system) {
auto kscfg = make_keyspace_config(*ksm, system);
auto ks(std::make_unique<keyspace>(std::move(kscfg), erm_factory));
auto change = co_await prepare_update_keyspace(*ks, ksm);
ks->apply(std::move(change));
co_return ks;
keyspace ks(ksm, std::move(kscfg), erm_factory);
co_await ks.create_replication_strategy(get_shared_token_metadata());
_keyspaces.emplace(ksm->name(), std::move(ks));
}
future<std::unique_ptr<keyspace>>
future<>
database::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system) {
co_await get_sstables_manager(system).init_keyspace_storage(ksm->get_storage_options(), ksm->name());
co_return co_await create_in_memory_keyspace(ksm, erm_factory, system);
}
void database::insert_keyspace(std::unique_ptr<keyspace> ks) {
auto& name = ks->metadata()->name();
if (_keyspaces.contains(name)) {
return;
if (_keyspaces.contains(ksm->name())) {
co_return;
}
_keyspaces.emplace(name, std::move(*ks));
co_await create_in_memory_keyspace(ksm, erm_factory, system);
co_await get_sstables_manager(system).init_keyspace_storage(ksm->get_storage_options(), ksm->name());
}
future<database::created_keyspace_per_shard> database::prepare_create_keyspace_on_all_shards(sharded<database>& sharded_db, sharded<service::storage_proxy>& proxy, const keyspace_metadata& ks_metadata) {
created_keyspace_per_shard created(smp::count);
future<> database::create_keyspace_on_all_shards(sharded<database>& sharded_db, sharded<service::storage_proxy>& proxy, const keyspace_metadata& ks_metadata) {
co_await modify_keyspace_on_all_shards(sharded_db, [&] (replica::database& db) -> future<> {
auto ksm = keyspace_metadata::new_keyspace(ks_metadata);
auto ks = co_await db.create_keyspace(ksm, proxy.local().get_erm_factory(), system_keyspace::no);
created[this_shard_id()] = make_foreign(std::move(ks));
co_await db.create_keyspace(ksm, proxy.local().get_erm_factory(), system_keyspace::no);
}, [&] (replica::database& db) -> future<> {
const auto& ks = db.find_keyspace(ks_metadata.name());
co_await db.get_notifier().create_keyspace(ks.metadata());
});
co_return created;
}
future<>
@@ -2648,19 +2558,19 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
co_await coroutine::parallel_for_each(std::views::iota(0u, smp::count), [&] (unsigned shard) -> future<> {
table_states[shard] = co_await smp::submit_to(shard, [&] () -> future<foreign_ptr<std::unique_ptr<table_truncate_state>>> {
auto& cf = *table_shards;
auto& views = table_shards.views();
auto st = std::make_unique<table_truncate_state>();
st->holder = cf.async_gate().hold();
st->cres.reserve(1 + views.size());
st->cres.reserve(1 + cf.views().size());
auto& db = sharded_db.local();
auto& cm = db.get_compaction_manager();
co_await cf.parallel_foreach_table_state([&cm, &st] (compaction::table_state& ts) -> future<> {
st->cres.emplace_back(co_await cm.stop_and_disable_compaction(ts));
});
co_await coroutine::parallel_for_each(views, [&] (lw_shared_ptr<replica::table> v) -> future<> {
co_await v->parallel_foreach_table_state([&cm, &st] (compaction::table_state& ts) -> future<> {
co_await coroutine::parallel_for_each(cf.views(), [&] (view_ptr v) -> future<> {
auto& vcf = db.find_column_family(v);
co_await vcf.parallel_foreach_table_state([&cm, &st] (compaction::table_state& ts) -> future<> {
st->cres.emplace_back(co_await cm.stop_and_disable_compaction(ts));
});
});
@@ -2690,7 +2600,6 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
co_await sharded_db.invoke_on_all([&] (replica::database& db) -> future<> {
unsigned shard = this_shard_id();
auto& cf = *table_shards;
auto& views = table_shards.views();
auto& st = *table_states[shard];
// Force mutations coming in to re-acquire higher rp:s
@@ -2701,8 +2610,10 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
st.low_mark = cf.set_low_replay_position_mark();
co_await flush_or_clear(cf);
co_await coroutine::parallel_for_each(views, [&] (lw_shared_ptr<replica::table> v) -> future<> {
co_await flush_or_clear(*v);
co_await coroutine::parallel_for_each(cf.views(), [&] (view_ptr v) -> future<> {
auto& vcf = db.find_column_family(v);
co_await flush_or_clear(vcf);
});
// Since writes could be appended to active memtable between getting low_mark above
// and flush, the low_mark has to be adjusted to account for those writes, where
@@ -2728,15 +2639,14 @@ future<> database::truncate_table_on_all_shards(sharded<database>& sharded_db, s
co_await sharded_db.invoke_on_all([&] (database& db) {
auto shard = this_shard_id();
auto& cf = *table_shards;
auto& views = table_shards.views();
auto& st = *table_states[shard];
return db.truncate(sys_ks.local(), cf, views, st);
return db.truncate(sys_ks.local(), cf, st);
});
dblog.info("Truncated {}.{}", s->ks_name(), s->cf_name());
}
future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, std::vector<lw_shared_ptr<replica::table>>& views, const table_truncate_state& st) {
future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state& st) {
dblog.trace("Truncating {}.{} on shard", cf.schema()->ks_name(), cf.schema()->cf_name());
const auto uuid = cf.schema()->id();
@@ -2774,9 +2684,10 @@ future<> database::truncate(db::system_keyspace& sys_ks, column_family& cf, std:
rp = st.low_mark;
}
}
co_await coroutine::parallel_for_each(views, [&sys_ks, truncated_at] (lw_shared_ptr<replica::table> v) -> future<> {
db::replay_position rp = co_await v->discard_sstables(truncated_at);
co_await sys_ks.save_truncation_record(*v, truncated_at, rp);
co_await coroutine::parallel_for_each(cf.views(), [this, &sys_ks, truncated_at] (view_ptr v) -> future<> {
auto& vcf = find_column_family(v);
db::replay_position rp = co_await vcf.discard_sstables(truncated_at);
co_await sys_ks.save_truncation_record(vcf, truncated_at, rp);
});
// save_truncation_record() may actually fail after we cached the truncation time
// but this is not be worse that if failing without caching: at least the correct time
@@ -3054,18 +2965,14 @@ size_t database::tables_metadata::size() const noexcept {
return _column_families.size();
}
future<rwlock::holder> database::tables_metadata::hold_write_lock() {
co_return co_await _cf_lock.hold_write_lock();
}
void database::tables_metadata::add_table(database& db, keyspace& ks, table& cf, schema_ptr s) {
SCYLLA_ASSERT(!_cf_lock.try_write_lock()); // lock should be acquired before the call
future<> database::tables_metadata::add_table(database& db, keyspace& ks, table& cf, schema_ptr s) {
auto holder = co_await _cf_lock.hold_write_lock();
add_table_helper(db, ks, cf, s);
}
void database::tables_metadata::remove_table(database& db, table& cf) noexcept {
SCYLLA_ASSERT(!_cf_lock.try_write_lock()); // lock should be acquired before the call
future<> database::tables_metadata::remove_table(database& db, table& cf) noexcept {
try {
auto holder = co_await _cf_lock.hold_write_lock();
auto s = cf.schema();
auto& ks = db.find_keyspace(s->ks_name());
remove_table_helper(db, ks, cf, s);

View File

@@ -15,8 +15,6 @@
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/execution_stage.hh>
#include <seastar/core/when_all.hh>
#include "replica/global_table_ptr.hh"
#include "types/user.hh"
#include "utils/assert.hh"
#include "utils/hash.hh"
#include "db_clock.hh"
@@ -70,7 +68,6 @@
#include "compaction/compaction_fwd.hh"
#include "compaction_group.hh"
#include "service/qos/qos_configuration_change_subscriber.hh"
#include "replica/tables_metadata_lock.hh"
class cell_locker;
class cell_locker_stats;
@@ -1015,7 +1012,7 @@ private:
future<snapshot_file_set> take_snapshot(sstring jsondir);
// Writes the table schema and the manifest of all files in the snapshot directory.
future<> finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets);
future<> finalize_snapshot(database& db, sstring jsondir, std::vector<snapshot_file_set> file_sets);
static future<> seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets);
public:
static future<> snapshot_on_all_shards(sharded<database>& sharded_db, const global_table_ptr& table_shards, sstring name);
@@ -1039,7 +1036,7 @@ public:
* CREATE INDEX command.
* The same is true for local index and MATERIALIZED VIEW.
*/
future<> write_schema_as_cql(const global_table_ptr& table_shards, sstring dir) const;
future<> write_schema_as_cql(database& db, sstring dir) const;
bool incremental_backups_enabled() const {
return _config.enable_incremental_backups;
@@ -1334,17 +1331,6 @@ using user_types_metadata = data_dictionary::user_types_metadata;
using keyspace_metadata = data_dictionary::keyspace_metadata;
// Encapsulates objects needed to update keyspace schema
struct keyspace_change {
lw_shared_ptr<keyspace_metadata> metadata;
locator::replication_strategy_ptr strategy;
locator::vnode_effective_replication_map_ptr erm;
const sstring& keyspace_name() const {
return metadata->name();
}
};
class keyspace {
public:
struct config {
@@ -1376,14 +1362,14 @@ private:
locator::effective_replication_map_factory& _erm_factory;
public:
explicit keyspace(config cfg, locator::effective_replication_map_factory& erm_factory);
explicit keyspace(lw_shared_ptr<keyspace_metadata> metadata, config cfg, locator::effective_replication_map_factory& erm_factory);
keyspace(const keyspace&) = delete;
void operator=(const keyspace&) = delete;
keyspace(keyspace&&) = default;
future<> shutdown() noexcept;
void apply(keyspace_change kc);
future<> update_from(const locator::shared_token_metadata& stm, lw_shared_ptr<keyspace_metadata>);
future<> init_storage();
@@ -1392,12 +1378,7 @@ public:
* boom, it is replaced.
*/
lw_shared_ptr<keyspace_metadata> metadata() const;
static locator::replication_strategy_ptr create_replication_strategy(
lw_shared_ptr<keyspace_metadata> metadata);
future<locator::vnode_effective_replication_map_ptr> create_effective_replication_map(
locator::replication_strategy_ptr strategy,
const locator::shared_token_metadata& stm) const;
future<> create_replication_strategy(const locator::shared_token_metadata& stm);
void update_effective_replication_map(locator::vnode_effective_replication_map_ptr erm);
/**
@@ -1494,11 +1475,8 @@ public:
public:
size_t size() const noexcept;
// write lock is needed during adding or removing table
future<rwlock::holder> hold_write_lock();
void add_table(database& db, keyspace& ks, table& cf, schema_ptr s);
void remove_table(database& db, table& cf) noexcept;
future<> add_table(database& db, keyspace& ks, table& cf, schema_ptr s);
future<> remove_table(database& db, table& cf) noexcept;
table& get_table(table_id id) const;
table_id get_table_id(const std::pair<std::string_view, std::string_view>& kscf) const;
lw_shared_ptr<table> get_table_if_exists(table_id id) const;
@@ -1661,7 +1639,7 @@ private:
future<> flush_system_column_families();
using system_keyspace = bool_class<struct system_keyspace_tag>;
future<std::unique_ptr<keyspace>> create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system);
future<> create_in_memory_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm, locator::effective_replication_map_factory& erm_factory, system_keyspace system);
void setup_metrics();
void setup_scylla_memory_diagnostics_producer();
reader_concurrency_semaphore& read_concurrency_sem();
@@ -1679,18 +1657,15 @@ private:
template<typename Future>
Future update_write_metrics(Future&& f);
void update_write_metrics_for_timed_out_write();
future<std::unique_ptr<keyspace>> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, system_keyspace system);
void remove(table&) noexcept;
future<keyspace_change> prepare_update_keyspace(const keyspace& ks, lw_shared_ptr<keyspace_metadata> metadata) const;
static future<> modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func);
future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>&, locator::effective_replication_map_factory& erm_factory, system_keyspace system);
future<> remove(table&) noexcept;
void drop_keyspace(const sstring& name);
future<> update_keyspace(const keyspace_metadata& tmp_ksm);
static future<> modify_keyspace_on_all_shards(sharded<database>& sharded_db, std::function<future<>(replica::database&)> func, std::function<future<>(replica::database&)> notifier);
future<> foreach_reader_concurrency_semaphore(std::function<future<>(reader_concurrency_semaphore&)> func);
friend class ::sigquit_handler; // wants access to all semaphores to dump diagnostics
public:
void insert_keyspace(std::unique_ptr<keyspace> ks);
void update_keyspace(std::unique_ptr<keyspace_change> change);
void drop_keyspace(const sstring& name);
static table_schema_version empty_version;
query::result_memory_limiter& get_result_memory_limiter() {
@@ -1760,33 +1735,27 @@ public:
schema_ptr table, bool write_in_user_memory, locator::effective_replication_map_factory&);
void init_schema_commitlog();
using is_new_cf = bool_class<struct is_new_cf_tag>;
void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new, locator::token_metadata_ptr not_commited_new_metadata = nullptr);
future<> make_column_family_directory(schema_ptr schema);
future<> add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new);
/* throws no_such_column_family if missing */
table_id find_uuid(std::string_view ks, std::string_view cf) const;
table_id find_uuid(const schema_ptr&) const;
using created_keyspace_per_shard = std::vector<seastar::foreign_ptr<std::unique_ptr<keyspace>>>;
using keyspace_change_per_shard = std::vector<seastar::foreign_ptr<std::unique_ptr<keyspace_change>>>;
/**
* Creates a keyspace for a given metadata if it still doesn't exist.
*
* @return ready future when the operation is complete
*/
static future<created_keyspace_per_shard> prepare_create_keyspace_on_all_shards(sharded<database>& sharded_db, sharded<service::storage_proxy>& proxy, const keyspace_metadata& ksm);
static future<> create_keyspace_on_all_shards(sharded<database>& sharded_db, sharded<service::storage_proxy>& proxy, const keyspace_metadata& ksm);
/* below, find_keyspace throws no_such_<type> on fail */
keyspace& find_keyspace(std::string_view name);
const keyspace& find_keyspace(std::string_view name) const;
bool has_keyspace(std::string_view name) const;
void validate_keyspace_update(keyspace_metadata& ksm);
void validate_new_keyspace(keyspace_metadata& ksm);
static future<keyspace_change_per_shard> prepare_update_keyspace_on_all_shards(sharded<database>& sharded_db, const keyspace_metadata& ksm);
static future<> update_keyspace_on_all_shards(sharded<database>& sharded_db, const keyspace_metadata& ksm);
static future<> drop_keyspace_on_all_shards(sharded<database>& sharded_db, const sstring& name);
std::vector<sstring> get_non_system_keyspaces() const;
std::vector<sstring> get_user_keyspaces() const;
std::vector<sstring> get_all_keyspaces() const;
@@ -1945,31 +1914,20 @@ public:
bool update_column_family(schema_ptr s);
private:
keyspace::config make_keyspace_config(const keyspace_metadata& ksm, system_keyspace is_system);
future<> add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg, is_new_cf is_new);
future<> detach_column_family(table& cf);
struct table_truncate_state;
static future<> truncate_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, const global_table_ptr&, std::optional<db_clock::time_point> truncated_at_opt, bool with_snapshot, std::optional<sstring> snapshot_name_opt);
future<> truncate(db::system_keyspace& sys_ks, column_family& cf, std::vector<lw_shared_ptr<replica::table>>& views, const table_truncate_state&);
future<> truncate(db::system_keyspace& sys_ks, column_family& cf, const table_truncate_state&);
public:
/** Truncates the given column family */
// If truncated_at_opt is not given, it is set to db_clock::now right after flush/clear.
static future<> truncate_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, sstring ks_name, sstring cf_name, std::optional<db_clock::time_point> truncated_at_opt = {}, bool with_snapshot = true, std::optional<sstring> snapshot_name_opt = {});
static future<tables_metadata_lock_on_all_shards> prepare_tables_metadata_change_on_all_shards(sharded<database>& sharded_db);
// Drops the table and removes the table directory if there are no snapshots,
// it's executed in 3 steps: prepare, drop and cleanup so that the middle step
// (actual drop) could be atomic.
static future<global_table_ptr> prepare_drop_table_on_all_shards(sharded<database>& sharded_db, table_id uuid);
// drop_table should be called on all shards
static void drop_table(sharded<database>& sharded_db,
sstring ks_name, sstring cf_name,
bool with_snapshot, global_table_ptr& table_shards);
static future<> cleanup_drop_table_on_all_shards(sharded<database>& sharded_db,
sharded<db::system_keyspace>& sys_ks,
bool with_snapshot, global_table_ptr& table_shards);
static future<> legacy_drop_table_on_all_shards(sharded<database>& sharded_db, sharded<db::system_keyspace>& sys_ks,
sstring ks_name, sstring cf_name,
bool with_snapshot = true);
// drops the table on all shards and removes the table directory if there are no snapshots
static future<> drop_table_on_all_shards(sharded<database>& db, sharded<db::system_keyspace>& sys_ks, sstring ks_name, sstring cf_name, bool with_snapshot = true);
const dirty_memory_manager_logalloc::region_group& dirty_memory_region_group() const {
return _dirty_memory_manager.region_group();

View File

@@ -8,10 +8,6 @@
#pragma once
#include <memory>
#include <vector>
#include <seastar/core/sharded.hh>
namespace replica {
// replica/database.hh
@@ -20,7 +16,7 @@ class keyspace;
class table;
using column_family = table;
class memtable_list;
class keyspace_change;
}

View File

@@ -18,17 +18,13 @@ class table;
class global_table_ptr {
std::vector<foreign_ptr<lw_shared_ptr<table>>> _p;
std::vector<foreign_ptr<lw_shared_ptr<table>>> _base; // relevant if _p is view or index
std::vector<foreign_ptr<std::unique_ptr<std::vector<lw_shared_ptr<table>>>>> _views;
public:
global_table_ptr();
global_table_ptr(global_table_ptr&&) noexcept = default;
void assign(database& db, table_id uuid);
global_table_ptr(global_table_ptr&&) noexcept;
~global_table_ptr();
void assign(table& t);
table* operator->() const noexcept;
table& operator*() const noexcept;
std::vector<lw_shared_ptr<table>>& views() const noexcept;
void clear_views() noexcept;
table& base() const noexcept;
auto as_sharded_parameter() {
return sharded_parameter([this] { return std::ref(**this); });
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "replica/schema_describe_helper.hh"
#include "data_dictionary/data_dictionary.hh"
#include "index/secondary_index_manager.hh"
#include "schema/schema.hh"
#include "view_info.hh"
#include "replica/database.hh"
namespace replica {
::schema_describe_helper make_schema_describe_helper(schema_ptr schema, const data_dictionary::database& db) {
::schema_describe_helper h{
.type = ::schema_describe_helper::type::table,
};
if (schema->is_view()) {
auto table = db.find_column_family(schema->view_info()->base_id());
h.base_schema = table.schema();
const auto& mgr = table.get_index_manager();
if (mgr.is_index(*schema)) {
h.type = ::schema_describe_helper::type::index;
h.is_global_index = mgr.is_global_index(*schema);
h.custom_index_class = mgr.custom_index_class(*schema);
} else {
h.type = ::schema_describe_helper::type::view;
}
}
return h;
}
::schema_describe_helper make_schema_describe_helper(const global_table_ptr& table_shards) {
::schema_describe_helper h{
.type = ::schema_describe_helper::type::table,
};
auto& schema = *table_shards->schema();
if (schema.is_view()) {
h.base_schema = table_shards.base().schema();
const auto& mgr = table_shards->get_index_manager();
if (mgr.is_index(schema)) {
h.type = ::schema_describe_helper::type::index;
h.is_global_index = mgr.is_global_index(schema);
h.custom_index_class = mgr.custom_index_class(schema);
} else {
h.type = ::schema_describe_helper::type::view;
}
}
return h;
}
} // namespace replica

View File

@@ -9,14 +9,33 @@
#pragma once
#include "data_dictionary/data_dictionary.hh"
#include "index/secondary_index_manager.hh"
#include "schema/schema.hh"
#include "replica/global_table_ptr.hh"
#include <seastar/core/sstring.hh>
#include <optional>
namespace replica {
::schema_describe_helper make_schema_describe_helper(schema_ptr schema, const data_dictionary::database& db);
::schema_describe_helper make_schema_describe_helper(const global_table_ptr& table_shards);
class schema_describe_helper : public ::schema_describe_helper {
data_dictionary::database _db;
public:
explicit schema_describe_helper(data_dictionary::database db) : _db(db) { }
} // namespace replica
virtual bool is_global_index(const table_id& base_id, const schema& view_s) const override {
return _db.find_column_family(base_id).get_index_manager().is_global_index(view_s);
}
virtual bool is_index(const table_id& base_id, const schema& view_s) const override {
return _db.find_column_family(base_id).get_index_manager().is_index(view_s);
}
virtual std::optional<sstring> custom_index_class(const table_id& base_id, const schema& view_s) const override {
return _db.find_column_family(base_id).get_index_manager().custom_index_class(view_s);
}
virtual schema_ptr find_schema(const table_id& id) const override {
return _db.find_schema(id);
}
};
} // namespace data_dictionary

View File

@@ -2906,10 +2906,9 @@ table::seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets)
co_await io_check(sync_directory, std::move(jsondir));
}
future<> table::write_schema_as_cql(const global_table_ptr& table_shards, sstring dir) const {
auto schema_desc = schema()->describe(
replica::make_schema_describe_helper(table_shards),
cql3::describe_option::STMTS);
future<> table::write_schema_as_cql(database& db, sstring dir) const {
replica::schema_describe_helper describe_helper{db.as_data_dictionary()};
auto schema_desc = this->schema()->describe(describe_helper, cql3::describe_option::STMTS);
auto schema_description = std::move(*schema_desc.create_statement);
auto schema_file_name = dir + "/schema.cql";
@@ -2959,7 +2958,7 @@ future<> table::snapshot_on_all_shards(sharded<database>& sharded_db, const glob
});
co_await io_check(sync_directory, jsondir);
co_await t.finalize_snapshot(table_shards, std::move(jsondir), std::move(file_sets));
co_await t.finalize_snapshot(sharded_db.local(), std::move(jsondir), std::move(file_sets));
});
}
@@ -2980,11 +2979,11 @@ future<table::snapshot_file_set> table::take_snapshot(sstring jsondir) {
co_return make_foreign(std::move(table_names));
}
future<> table::finalize_snapshot(const global_table_ptr& table_shards, sstring jsondir, std::vector<snapshot_file_set> file_sets) {
future<> table::finalize_snapshot(database& db, sstring jsondir, std::vector<snapshot_file_set> file_sets) {
std::exception_ptr ex;
tlogger.debug("snapshot {}: writing schema.cql", jsondir);
co_await write_schema_as_cql(table_shards, jsondir).handle_exception([&] (std::exception_ptr ptr) {
co_await write_schema_as_cql(db, jsondir).handle_exception([&] (std::exception_ptr ptr) {
tlogger.error("Failed writing schema file in snapshot in {} with exception {}", jsondir, ptr);
ex = std::move(ptr);
});

View File

@@ -1,25 +0,0 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <vector>
#include <seastar/core/sharded.hh>
#include <seastar/core/rwlock.hh>
#include <seastar/core/smp.hh>
namespace replica {
class tables_metadata_lock_on_all_shards {
std::vector<seastar::foreign_ptr<std::unique_ptr<seastar::rwlock::holder>>> _holders;
public:
tables_metadata_lock_on_all_shards() : _holders(seastar::smp::count) {};
void assign_lock(seastar::rwlock::holder&& h);
};
} // namespace replica

View File

@@ -1008,9 +1008,9 @@ sstring schema::get_create_statement(const schema_describe_helper& helper, bool
int n = 0;
if (is_view()) {
if (helper.type == schema_describe_helper::type::index) {
auto is_local = !helper.is_global_index;
auto custom_index_class = helper.custom_index_class;
if (helper.is_index(view_info()->base_id(), *this)) {
auto is_local = !helper.is_global_index(view_info()->base_id(), *this);
auto custom_index_class = helper.custom_index_class(view_info()->base_id(), *this);
if (custom_index_class) {
os << "CUSTOM ";
@@ -1019,7 +1019,7 @@ sstring schema::get_create_statement(const schema_describe_helper& helper, bool
os << "INDEX " << cql3::util::maybe_quote(secondary_index::index_name_from_table_name(cf_name())) << " ON "
<< cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(view_info()->base_name());
describe_index_columns(os, is_local, *this, helper.base_schema);
describe_index_columns(os, is_local, *this, helper.find_schema(view_info()->base_id()));
if (custom_index_class) {
os << " USING '" << *custom_index_class << "'";
@@ -1142,14 +1142,12 @@ sstring schema::get_create_statement(const schema_describe_helper& helper, bool
cql3::description schema::describe(const schema_describe_helper& helper, cql3::describe_option desc_opt) const {
const sstring type = std::invoke([&] {
switch (helper.type) {
case schema_describe_helper::type::view:
return "view";
case schema_describe_helper::type::index:
return "index";
case schema_describe_helper::type::table:
return "table";
if (is_view()) {
return helper.is_index(view_info()->base_id(), *this)
? "index"
: "view";
}
return "table";
});
return cql3::description {
@@ -1191,7 +1189,7 @@ std::ostream& schema::schema_properties(const schema_describe_helper& helper, st
for (auto& [type, ext] : extensions()) {
os << "\n AND " << type << " = " << ext->options_to_string();
}
if (helper.type == schema_describe_helper::type::view) {
if (is_view() && !helper.is_index(view_info()->base_id(), *this)) {
auto is_sync_update = db::find_tag(*this, db::SYNCHRONOUS_VIEW_UPDATES_TAG_KEY);
if (is_sync_update.has_value()) {
os << "\n AND synchronous_updates = " << *is_sync_update;
@@ -1202,16 +1200,14 @@ std::ostream& schema::schema_properties(const schema_describe_helper& helper, st
std::ostream& schema::describe_alter_with_properties(const schema_describe_helper& helper, std::ostream& os) const {
os << "ALTER ";
switch (helper.type) {
case schema_describe_helper::type::view:
if (is_view()) {
if (helper.is_index(view_info()->base_id(), *this)) {
on_internal_error(dblog, "ALTER statement is not supported for index");
}
os << "MATERIALIZED VIEW ";
break;
case schema_describe_helper::type::index:
on_internal_error(dblog, "ALTER statement is not supported for index");
break;
case schema_describe_helper::type::table:
} else {
os << "TABLE ";
break;
}
os << cql3::util::maybe_quote(ks_name()) << "." << cql3::util::maybe_quote(cf_name()) << " WITH ";
schema_properties(helper, os);

View File

@@ -502,12 +502,13 @@ struct schema_static_props {
bool is_group0_table = false; // the table is a group 0 table
};
struct schema_describe_helper {
enum class type { table, view, index };
type type;
bool is_global_index;
std::optional<sstring> custom_index_class;
schema_ptr base_schema; // relevant for view and index
class schema_describe_helper {
public:
virtual bool is_global_index(const table_id& base_id, const schema& view_s) const = 0;
virtual bool is_index(const table_id& base_id, const schema& view_s) const = 0;
virtual std::optional<sstring> custom_index_class(const table_id& base_id, const schema& view_s) const = 0;
virtual schema_ptr find_schema(const table_id& id) const = 0;
virtual ~schema_describe_helper() = default;
};
/*

View File

@@ -62,6 +62,7 @@ public:
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) = 0;
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) = 0;
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) = 0;
virtual void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) = 0;
// The callback runs inside seastar thread
virtual void on_drop_keyspace(const sstring& ks_name) = 0;
@@ -101,6 +102,7 @@ public:
void on_update_user_type(const sstring& ks_name, const sstring& type_name) override {}
void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) override {}
void on_drop_keyspace(const sstring& ks_name) override {}
void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}
@@ -128,15 +130,16 @@ public:
/// Unregister a migration listener on current shard.
future<> unregister_listener(migration_listener* listener);
future<> create_keyspace(const sstring& ks_name);
future<> create_keyspace(lw_shared_ptr<keyspace_metadata> ksm);
future<> create_column_family(schema_ptr cfm);
future<> create_user_type(user_type type);
future<> create_view(view_ptr view);
future<> update_keyspace(const sstring& ks_name);
future<> update_keyspace(lw_shared_ptr<keyspace_metadata> ksm);
future<> update_column_family(schema_ptr cfm, bool columns_changed);
future<> update_user_type(user_type type);
future<> update_view(view_ptr view, bool columns_changed);
future<> drop_keyspace(const sstring& ks_name);
future<> update_tablet_metadata(locator::tablet_metadata_change_hint);
future<> drop_keyspace(sstring ks_name);
future<> drop_column_family(schema_ptr cfm);
future<> drop_user_type(user_type type);
future<> drop_view(view_ptr view);

View File

@@ -51,7 +51,7 @@ const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;
static future<schema_ptr> get_schema_definition(table_schema_version v, locator::host_id dst, unsigned shard, netw::messaging_service& ms, service::storage_proxy& sp);
migration_manager::migration_manager(migration_notifier& notifier, gms::feature_service& feat, netw::messaging_service& ms,
service::storage_proxy& storage_proxy, sharded<service::storage_service>& ss, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
service::storage_proxy& storage_proxy, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks) :
_notifier(notifier)
, _group0_barrier(this_shard_id() == 0 ?
std::function<future<>()>([this] () -> future<> {
@@ -66,7 +66,7 @@ migration_manager::migration_manager(migration_notifier& notifier, gms::feature_
})
)
, _background_tasks("migration_manager::background_tasks")
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _ss(ss), _gossiper(gossiper), _group0_client(group0_client)
, _feat(feat), _messaging(ms), _storage_proxy(storage_proxy), _gossiper(gossiper), _group0_client(group0_client)
, _sys_ks(sysks)
, _schema_push([this] { return passive_announce(); })
, _concurrent_ddl_retries{10}
@@ -400,13 +400,13 @@ future<> migration_manager::merge_schema_from(locator::host_id src, const std::v
return make_exception_future<>(std::make_exception_ptr<std::runtime_error>(
std::runtime_error(fmt::format("Error while applying schema mutations: {}", e))));
}
return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _ss, _feat, std::move(mutations));
return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _feat, std::move(mutations));
}
future<> migration_manager::reload_schema() {
mlogger.info("Reloading schema");
std::vector<mutation> mutations;
return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _ss, _feat, std::move(mutations), true);
return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, std::move(mutations), true);
}
bool migration_manager::has_compatible_schema_tables_version(const locator::host_id& endpoint) {
@@ -436,11 +436,12 @@ future<> migration_notifier::on_schema_change(std::function<void(migration_liste
});
}
future<> migration_notifier::create_keyspace(const sstring& ks_name) {
future<> migration_notifier::create_keyspace(lw_shared_ptr<keyspace_metadata> ksm) {
const auto& name = ksm->name();
co_await on_schema_change([&] (migration_listener* listener) {
listener->on_create_keyspace(ks_name);
listener->on_create_keyspace(name);
}, [&] (std::exception_ptr ex) {
return fmt::format("Create keyspace notification failed {}: {}", ks_name, ex);
return fmt::format("Create keyspace notification failed {}: {}", name, ex);
});
}
@@ -474,11 +475,12 @@ future<> migration_notifier::create_view(view_ptr view) {
});
}
future<> migration_notifier::update_keyspace(const sstring& ks_name) {
future<> migration_notifier::update_keyspace(lw_shared_ptr<keyspace_metadata> ksm) {
const auto& name = ksm->name();
co_await on_schema_change([&] (migration_listener* listener) {
listener->on_update_keyspace(ks_name);
listener->on_update_keyspace(name);
}, [&] (std::exception_ptr ex) {
return fmt::format("Update keyspace notification failed {}: {}", ks_name, ex);
return fmt::format("Update keyspace notification failed {}: {}", name, ex);
});
}
@@ -512,7 +514,15 @@ future<> migration_notifier::update_view(view_ptr view, bool columns_changed) {
});
}
future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
future<> migration_notifier::update_tablet_metadata(locator::tablet_metadata_change_hint hint) {
return seastar::async([this, hint = std::move(hint)] {
_listeners.thread_for_each([&hint] (migration_listener* listener) {
listener->on_update_tablet_metadata(hint);
});
});
}
future<> migration_notifier::drop_keyspace(sstring ks_name) {
co_await on_schema_change([&] (migration_listener* listener) {
listener->on_drop_keyspace(ks_name);
}, [&] (std::exception_ptr ex) {
@@ -914,7 +924,7 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
}
future<> migration_manager::announce_without_raft(std::vector<mutation> schema, group0_guard guard) {
auto f = db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _ss, _feat, schema);
auto f = db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, schema);
try {
using namespace std::placeholders;

View File

@@ -18,7 +18,6 @@
#include "gms/feature.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
#include "schema/schema_fwd.hh"
#include "service/storage_service.hh"
#include "utils/serialized_action.hh"
#include "service/raft/raft_group_registry.hh"
#include "service/raft/raft_group0_client.hh"
@@ -63,7 +62,6 @@ private:
gms::feature_service& _feat;
netw::messaging_service& _messaging;
service::storage_proxy& _storage_proxy;
sharded<service::storage_service>& _ss;
gms::gossiper& _gossiper;
seastar::abort_source _as;
service::raft_group0_client& _group0_client;
@@ -89,7 +87,7 @@ private:
friend class group0_state_machine; // needed for access to _messaging
size_t _concurrent_ddl_retries;
public:
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, sharded<service::storage_service>& ss, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);
migration_manager(migration_notifier&, gms::feature_service&, netw::messaging_service& ms, service::storage_proxy&, gms::gossiper& gossiper, service::raft_group0_client& group0_client, sharded<db::system_keyspace>& sysks);
migration_notifier& get_notifier() { return _notifier; }
const migration_notifier& get_notifier() const { return _notifier; }

View File

@@ -5536,26 +5536,24 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
return update_topology_change_info(reason, acquire_merge_lock::no);
}
future<locator::mutable_token_metadata_ptr> storage_service::prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
SCYLLA_ASSERT(this_shard_id() == 0);
auto tmptr = co_await get_mutable_token_metadata_ptr();
if (hint) {
co_await replica::update_tablet_metadata(_db.local(), _qp, tmptr->tablets(), hint);
} else {
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
void storage_service::on_update_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
if (this_shard_id() != 0) {
// replicate_to_all_cores() takes care of other shards.
return;
}
tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
co_return tmptr;
load_tablet_metadata(hint).get();
_topology_state_machine.event.broadcast(); // wake up load balancer.
}
future<> storage_service::commit_tablet_metadata(locator::mutable_token_metadata_ptr tmptr) {
co_await replicate_to_all_cores(std::move(tmptr));
_topology_state_machine.event.broadcast();
}
future<> storage_service::update_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
co_await commit_tablet_metadata(
co_await prepare_tablet_metadata(hint));
future<> storage_service::load_tablet_metadata(const locator::tablet_metadata_change_hint& hint) {
return mutate_token_metadata([this, &hint] (mutable_token_metadata_ptr tmptr) -> future<> {
if (hint) {
co_await replica::update_tablet_metadata(_db.local(), _qp, tmptr->tablets(), hint);
} else {
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
}
tmptr->tablets().set_balancing_enabled(_topology_state_machine._topology.tablet_balancing_enabled);
}, acquire_merge_lock::no);
}
future<> storage_service::process_tablet_split_candidate(table_id table) noexcept {

View File

@@ -249,10 +249,7 @@ public:
future<> uninit_messaging_service();
// If a hint is provided, only the changed parts of the tablet metadata will be (re)loaded.
future<locator::mutable_token_metadata_ptr> prepare_tablet_metadata(const locator::tablet_metadata_change_hint& hint);
future<> commit_tablet_metadata(locator::mutable_token_metadata_ptr tmptr);
future<> update_tablet_metadata(const locator::tablet_metadata_change_hint& hint);
future<> load_tablet_metadata(const locator::tablet_metadata_change_hint& hint);
void start_tablet_split_monitor();
private:
using acquire_merge_lock = bool_class<class acquire_merge_lock_tag>;
@@ -541,6 +538,7 @@ public:
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override {}
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override {}
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override {}
virtual void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) override;
virtual void on_drop_keyspace(const sstring& ks_name) override { keyspace_changed(ks_name).get(); }
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override {}

View File

@@ -4354,10 +4354,11 @@ SEASTAR_TEST_CASE(test_describe_simple_schema) {
"country_code int,"
"number text)"
).get();
replica::schema_describe_helper describe_helper{e.data_dictionary()};
for (auto &&ct : cql_create_tables) {
e.execute_cql(ct.second).get();
auto schema = e.local_db().find_schema("ks", ct.first);
auto schema_desc = schema->describe(replica::make_schema_describe_helper(schema, e.data_dictionary()), cql3::describe_option::STMTS);
auto schema_desc = schema->describe(describe_helper, cql3::describe_option::STMTS);
BOOST_CHECK_EQUAL(normalize_white_space(*schema_desc.create_statement), normalize_white_space(ct.second));
}
@@ -4420,15 +4421,18 @@ SEASTAR_TEST_CASE(test_describe_view_schema) {
e.execute_cql("CREATE KEYSPACE \"KS\" WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}").get();
e.execute_cql(base_table).get();
replica::schema_describe_helper describe_helper{e.data_dictionary()};
for (auto &&ct : cql_create_tables) {
e.execute_cql(ct.second).get();
auto schema = e.local_db().find_schema("KS", ct.first);
auto schema_desc = schema->describe(replica::make_schema_describe_helper(schema, e.data_dictionary()), cql3::describe_option::STMTS);
auto schema_desc = schema->describe(describe_helper, cql3::describe_option::STMTS);
BOOST_CHECK_EQUAL(normalize_white_space(*schema_desc.create_statement), normalize_white_space(ct.second));
auto base_schema = e.local_db().find_schema("KS", "cF");
auto base_schema_desc = base_schema->describe(replica::make_schema_describe_helper(base_schema, e.data_dictionary()), cql3::describe_option::STMTS);
auto base_schema_desc = base_schema->describe(describe_helper, cql3::describe_option::STMTS);
BOOST_CHECK_EQUAL(normalize_white_space(*base_schema_desc.create_statement), normalize_white_space(base_table));
}

View File

@@ -849,7 +849,7 @@ SEASTAR_TEST_CASE(clear_multiple_snapshots) {
// existing snapshots expected to remain after dropping the table
testlog.debug("Dropping table {}.{}", ks_name, table_name);
replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name).get();
replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name).get();
BOOST_REQUIRE_EQUAL(fs::exists(snapshots_dir / snapshot_name(num_snapshots)), true);
// clear all tags
@@ -1516,7 +1516,7 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
s->full_slice(),
nullptr);
auto f = replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf");
auto f = replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf");
// we add a querier to the querier cache while the drop is ongoing
auto& qc = db.get_querier_cache();
@@ -1545,7 +1545,7 @@ static future<> test_drop_table_with_auto_snapshot(bool auto_snapshot) {
// Pass `with_snapshot=true` to drop_table_on_all
// to allow auto_snapshot (based on the configuration above).
// The table directory should therefore exist after the table is dropped if auto_snapshot is disabled in the configuration.
co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, true);
co_await replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, true);
auto cf_dir_exists = co_await file_exists(cf_dir);
BOOST_REQUIRE_EQUAL(cf_dir_exists, auto_snapshot);
co_return;
@@ -1570,7 +1570,7 @@ SEASTAR_TEST_CASE(drop_table_with_no_snapshot) {
// Pass `with_snapshot=false` to drop_table_on_all
// to disallow auto_snapshot.
// The table directory should therefore not exist after the table is dropped.
co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
co_await replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
auto cf_dir_exists = co_await file_exists(cf_dir);
BOOST_REQUIRE_EQUAL(cf_dir_exists, false);
co_return;
@@ -1589,7 +1589,7 @@ SEASTAR_TEST_CASE(drop_table_with_explicit_snapshot) {
// With explicit snapshot and with_snapshot=false
// dir should still be kept, regardless of the
// with_snapshot parameter and auto_snapshot config.
co_await replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
co_await replica::database::drop_table_on_all_shards(e.db(), e.get_system_keyspace(), ks_name, table_name, false);
auto cf_dir_exists = co_await file_exists(cf_dir);
BOOST_REQUIRE_EQUAL(cf_dir_exists, true);
co_return;

View File

@@ -560,6 +560,7 @@ public:
int update_function_count = 0;
int update_aggregate_count = 0;
int update_view_count = 0;
int update_tablets = 0;
int drop_keyspace_count = 0;
int drop_column_family_count = 0;
int drop_user_type_count = 0;
@@ -582,6 +583,7 @@ public:
virtual void on_update_function(const sstring&, const sstring&) override { ++update_function_count; }
virtual void on_update_aggregate(const sstring&, const sstring&) override { ++update_aggregate_count; }
virtual void on_update_view(const sstring&, const sstring&, bool) override { ++update_view_count; }
virtual void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) override { ++update_tablets; }
virtual void on_drop_keyspace(const sstring&) override { ++drop_keyspace_count; }
virtual void on_drop_column_family(const sstring&, const sstring&) override { ++drop_column_family_count; }
virtual void on_drop_user_type(const sstring&, const sstring&) override { ++drop_user_type_count; }

View File

@@ -174,7 +174,7 @@ SEASTAR_THREAD_TEST_CASE(test_table_is_attached) {
auto sm0 = db::schema_tables::make_schema_mutations(s0, api::new_timestamp(), true);
std::vector<mutation> muts;
sm0.copy_to(muts);
db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(), e.get_storage_service(),
db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(),
e.get_feature_service().local(), muts).get();
}

View File

@@ -1424,7 +1424,7 @@ future<> handle_resize_finalize(cql_test_env& e, group0_guard& guard, const migr
if (changed) {
// Need to reload on each resize because table object expects tablet count to change by a factor of 2.
co_await save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp());
co_await e.get_storage_service().local().update_tablet_metadata({});
co_await e.get_storage_service().local().load_tablet_metadata({});
// Need a new guard to make sure later changes use later timestamp.
release_guard(std::move(guard));
@@ -1549,7 +1549,7 @@ void rebalance_tablets(cql_test_env& e,
// causing test flakiness.
auto& stm = e.shared_token_metadata().local();
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
e.get_storage_service().local().update_tablet_metadata({}).get();
e.get_storage_service().local().load_tablet_metadata({}).get();
testlog.debug("rebalance_tablets(): done");
}

View File

@@ -871,7 +871,7 @@ private:
// gropu0 client exists only on shard 0
service::raft_group0_client group0_client(_group0_registry.local(), _sys_ks.local(), _token_metadata.local(), maintenance_mode_enabled::no);
_mm.start(std::ref(_mnotifier), std::ref(_feature_service), std::ref(_ms), std::ref(_proxy), std::ref(_ss), std::ref(_gossiper), std::ref(group0_client), std::ref(_sys_ks)).get();
_mm.start(std::ref(_mnotifier), std::ref(_feature_service), std::ref(_ms), std::ref(_proxy), std::ref(_gossiper), std::ref(group0_client), std::ref(_sys_ks)).get();
auto stop_mm = defer_verbose_shutdown("migration manager", [this] { _mm.stop().get(); });
_tablet_allocator.start(service::tablet_allocator::config{}, std::ref(_mnotifier), std::ref(_db)).get();

View File

@@ -1152,9 +1152,8 @@ future<> random_schema::create_with_cql(cql_test_env& env) {
auto& db = env.local_db();
auto schema_desc = _schema->describe(
replica::make_schema_describe_helper(_schema, db.as_data_dictionary()),
cql3::describe_option::STMTS);
replica::schema_describe_helper describe_helper{db.as_data_dictionary()};
auto schema_desc = _schema->describe(describe_helper, cql3::describe_option::STMTS);
env.execute_cql(*schema_desc.create_statement).get();
auto& tbl = db.find_column_family(ks_name, tbl_name);

View File

@@ -179,8 +179,7 @@ rebalance_stats rebalance_tablets(cql_test_env& e, locator::load_stats_ptr load_
// as that may violate invariants and cause failures in later operations
// causing test flakiness.
save_tablet_metadata(e.local_db(), stm.get()->tablets(), guard.write_timestamp()).get();
e.get_storage_service().local().update_tablet_metadata({}).get();
e.get_storage_service().local().load_tablet_metadata({}).get();
testlog.info("Rebalance took {:.3f} [s] after {} iteration(s)", stats.elapsed_time.count(), i + 1);
return stats;
}

View File

@@ -481,9 +481,9 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
const query::result_set_row& view_row = rs.row(0);
auto base_name = view_row.get_nonnull<sstring>("base_table_name");
auto base_schema = do_load_schema_from_schema_tables(dbcfg, scylla_data_path, keyspace, base_name);
return db::schema_tables::create_view_from_mutations(ctxt, muts, ctxt.user_types(), std::move(base_schema));
return db::schema_tables::create_view_from_mutations(ctxt, muts, std::move(base_schema));
} else {
return db::schema_tables::create_table_from_mutations(ctxt, muts, ctxt.user_types());
return db::schema_tables::create_table_from_mutations(ctxt, muts);
}
}

View File

@@ -2977,7 +2977,7 @@ void query_operation(schema_ptr sstable_schema, reader_permit permit, const std:
const auto table_name = schema->cf_name();
schema_describe_helper describe_helper = replica::make_schema_describe_helper(schema, db.as_data_dictionary());
replica::schema_describe_helper describe_helper{db.as_data_dictionary()};
const auto original_schema_description = sstable_schema->describe(describe_helper, cql3::describe_option::STMTS_AND_INTERNALS);
const auto schema_description = schema->describe(describe_helper, cql3::describe_option::STMTS_AND_INTERNALS);

View File

@@ -155,6 +155,8 @@ void cql_server::event_notifier::on_update_aggregate(const sstring& ks_name, con
elogger.warn("%s event ignored", __func__);
}
void cql_server::event_notifier::on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) {}
void cql_server::event_notifier::on_drop_keyspace(const sstring& ks_name)
{
for (auto&& conn : _schema_change_listeners) {

View File

@@ -406,6 +406,7 @@ public:
virtual void on_update_view(const sstring& ks_name, const sstring& view_name, bool columns_changed) override;
virtual void on_update_function(const sstring& ks_name, const sstring& function_name) override;
virtual void on_update_aggregate(const sstring& ks_name, const sstring& aggregate_name) override;
virtual void on_update_tablet_metadata(const locator::tablet_metadata_change_hint&) override;
virtual void on_drop_keyspace(const sstring& ks_name) override;
virtual void on_drop_column_family(const sstring& ks_name, const sstring& cf_name) override;