view_info: set base info on construction
Currently, the base_info may or may not be set in view schemas.
Even when it's set, it may be modified. This necessitates extra
checks when handling view schemas, as well as potentially causing
errors when we forget to set it at some point.
Instead, we want to make the base info an immutable member of view
schemas (inside view_info). The first step towards that is making
sure that all newly created schemas have the base info set.
We achieve that by requiring a base schema when constructing a view
schema. Unfortunately, this adds complexity each time we're making
a view schema - we need to get the base schema as well.
In most cases, the base schema is already available. The most
problematic scenario is when we create a schema from mutations:
- when parsing system tables we can get the schema from the
database, as regular tables are parsed before views
- when loading a view schema using the schema loader tool, we need
to load the base additionally to the view schema, effectively
doubling the work
- when pulling the schema from another node - in this case we can
only get the current version of the base schema from the local
database
Additionally, we need to consider the base schema version - when
we generate view updates the version of the base schema used for
reads should match the version of the base schema in view's base
info.
This is achieved by selecting the correct (old or new) schema in
`db::schema_tables::merge_tables_and_views` and using the stored
base schema in the schema_registry.
(cherry picked from commit 900687c818)
This commit is contained in:
@@ -1482,7 +1482,7 @@ static future<executor::request_return_type> create_table_on_shard0(service::cli
|
||||
}
|
||||
}
|
||||
const bool include_all_columns = true;
|
||||
view_builder.with_view_info(*schema, include_all_columns, ""/*where clause*/);
|
||||
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
|
||||
}
|
||||
|
||||
// FIXME: the following needs to be in a loop. If mm.announce() below
|
||||
@@ -1758,7 +1758,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien
|
||||
}
|
||||
}
|
||||
const bool include_all_columns = true;
|
||||
view_builder.with_view_info(*schema, include_all_columns, ""/*where clause*/);
|
||||
view_builder.with_view_info(schema, include_all_columns, ""/*where clause*/);
|
||||
new_views.emplace_back(view_builder.build());
|
||||
} else if (op == "Delete") {
|
||||
elogger.trace("Deleting GSI {}", index_name);
|
||||
|
||||
@@ -276,7 +276,7 @@ void alter_table_statement::drop_column(const query_options& options, const sche
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_schema_update(data_dictionary::database db, const query_options& options) const {
|
||||
std::pair<schema_ptr, std::vector<view_ptr>> alter_table_statement::prepare_schema_update(data_dictionary::database db, const query_options& options) const {
|
||||
auto s = validation::validate_column_family(db, keyspace(), column_family());
|
||||
if (s->is_view()) {
|
||||
throw exceptions::invalid_request_exception("Cannot use ALTER TABLE on Materialized View");
|
||||
@@ -370,6 +370,9 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
|
||||
validate_column_rename(db, *s, *from, *to);
|
||||
cfm.rename_column(from->name(), to->name());
|
||||
}
|
||||
// New view schemas contain the new column names, so we need to base them on the
|
||||
// new base schema.
|
||||
schema_ptr new_base_schema = cfm.build();
|
||||
// If the view includes a renamed column, it must be renamed in
|
||||
// the view table and the definition.
|
||||
for (auto&& view : cf.views()) {
|
||||
@@ -390,22 +393,21 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
|
||||
view->view_info()->where_clause(),
|
||||
view_renames,
|
||||
cql3::dialect{});
|
||||
builder.with_view_info(view->view_info()->base_id(), view->view_info()->base_name(),
|
||||
view->view_info()->include_all_columns(), std::move(new_where));
|
||||
builder.with_view_info(new_base_schema, view->view_info()->include_all_columns(), std::move(new_where));
|
||||
view_updates.push_back(view_ptr(builder.build()));
|
||||
}
|
||||
}
|
||||
break;
|
||||
return make_pair(std::move(new_base_schema), std::move(view_updates));
|
||||
}
|
||||
|
||||
return make_pair(std::move(cfm), std::move(view_updates));
|
||||
return make_pair(cfm.build(), std::move(view_updates));
|
||||
}
|
||||
|
||||
future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
|
||||
alter_table_statement::prepare_schema_mutations(query_processor& qp, const query_options& options, api::timestamp_type ts) const {
|
||||
data_dictionary::database db = qp.db();
|
||||
auto [cfm, view_updates] = prepare_schema_update(db, options);
|
||||
auto m = co_await service::prepare_column_family_update_announcement(qp.proxy(), cfm.build(), std::move(view_updates), ts);
|
||||
auto [s, view_updates] = prepare_schema_update(db, options);
|
||||
auto m = co_await service::prepare_column_family_update_announcement(qp.proxy(), std::move(s), std::move(view_updates), ts);
|
||||
|
||||
using namespace cql_transport;
|
||||
auto ret = ::make_shared<event::schema_change>(
|
||||
|
||||
@@ -69,7 +69,7 @@ private:
|
||||
void add_column(const query_options& options, const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
|
||||
void alter_column(const query_options& options, const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
|
||||
void drop_column(const query_options& options, const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
|
||||
std::pair<schema_builder, std::vector<view_ptr>> prepare_schema_update(data_dictionary::database db, const query_options& options) const;
|
||||
std::pair<schema_ptr, std::vector<view_ptr>> prepare_schema_update(data_dictionary::database db, const query_options& options) const;
|
||||
};
|
||||
|
||||
class alter_table_statement::raw_statement : public raw::cf_statement {
|
||||
|
||||
@@ -367,7 +367,7 @@ std::pair<view_ptr, cql3::cql_warnings_vec> create_view_statement::prepare_view(
|
||||
}
|
||||
|
||||
auto where_clause_text = util::relations_to_where_clause(_where_clause);
|
||||
builder.with_view_info(schema->id(), schema->cf_name(), included.empty(), std::move(where_clause_text));
|
||||
builder.with_view_info(schema, included.empty(), std::move(where_clause_text));
|
||||
|
||||
return std::make_pair(view_ptr(builder.build()), std::move(warnings));
|
||||
}
|
||||
|
||||
@@ -579,19 +579,23 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
// 2. The table was just created - the table is guaranteed to be published with the view in that case.
|
||||
// 3. The view itself was altered - in that case we already know the base table so we can take it from
|
||||
// the database object.
|
||||
view_ptr vp = create_view_from_mutations(proxy, std::move(sm));
|
||||
query::result_set rs(sm.columnfamilies_mutation());
|
||||
const query::result_set_row& view_row = rs.row(0);
|
||||
auto ks_name = view_row.get_nonnull<sstring>("keyspace_name");
|
||||
auto base_name = view_row.get_nonnull<sstring>("base_table_name");
|
||||
|
||||
schema_ptr base_schema;
|
||||
for (auto&& altered : tables_diff.altered) {
|
||||
// Chose the appropriate version of the base table schema: old -> old, new -> new.
|
||||
schema_ptr s = side == schema_diff_side::left ? altered.old_schema : altered.new_schema;
|
||||
if (s->ks_name() == vp->ks_name() && s->cf_name() == vp->view_info()->base_name() ) {
|
||||
if (s->ks_name() == ks_name && s->cf_name() == base_name) {
|
||||
base_schema = s;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!base_schema) {
|
||||
for (auto&& s : tables_diff.created) {
|
||||
if (s.get()->ks_name() == vp->ks_name() && s.get()->cf_name() == vp->view_info()->base_name() ) {
|
||||
if (s.get()->ks_name() == ks_name && s.get()->cf_name() == base_name) {
|
||||
base_schema = s;
|
||||
break;
|
||||
}
|
||||
@@ -599,8 +603,9 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
|
||||
}
|
||||
|
||||
if (!base_schema) {
|
||||
base_schema = proxy.local().local_db().find_schema(vp->ks_name(), vp->view_info()->base_name());
|
||||
base_schema = proxy.local().local_db().find_schema(ks_name, base_name);
|
||||
}
|
||||
view_ptr vp = create_view_from_mutations(proxy, std::move(sm), base_schema);
|
||||
|
||||
// Now when we have a referenced base - sanity check that we're not registering an old view
|
||||
// (this could happen when we skip multiple major versions in upgrade, which is unsupported.)
|
||||
|
||||
@@ -2432,9 +2432,11 @@ static index_metadata create_index_from_index_row(const query::result_set_row& r
|
||||
|
||||
/*
|
||||
* View metadata serialization/deserialization.
|
||||
* If the base schema is not provided, the schema context must have a reference to the database,
|
||||
* and the most up-to-date base schema will be pulled from there.
|
||||
*/
|
||||
|
||||
view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<table_schema_version> version) {
|
||||
view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<schema_ptr> base_schema, std::optional<table_schema_version> version) {
|
||||
auto table_rs = query::result_set(sm.columnfamilies_mutation());
|
||||
query::result_set_row row = table_rs.row(0);
|
||||
|
||||
@@ -2463,12 +2465,19 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
|
||||
builder.with_version(sm.digest(ctxt.features().cluster_schema_features()));
|
||||
}
|
||||
|
||||
auto base_id = table_id(row.get_nonnull<utils::UUID>("base_table_id"));
|
||||
auto base_name = row.get_nonnull<sstring>("base_table_name");
|
||||
if (!base_schema) {
|
||||
if (!ctxt.get_db()) {
|
||||
on_internal_error(slogger, format("No database reference with missing base schema when creating view {}.{} from mutations",
|
||||
ks_name, cf_name));
|
||||
}
|
||||
auto base_id = table_id(row.get_nonnull<utils::UUID>("base_table_id"));
|
||||
base_schema = ctxt.get_db()->find_schema(base_id);
|
||||
}
|
||||
|
||||
auto include_all_columns = row.get_nonnull<bool>("include_all_columns");
|
||||
auto where_clause = row.get_nonnull<sstring>("where_clause");
|
||||
|
||||
builder.with_view_info(std::move(base_id), std::move(base_name), include_all_columns, std::move(where_clause));
|
||||
builder.with_view_info(*base_schema, include_all_columns, std::move(where_clause));
|
||||
return view_ptr(builder.build());
|
||||
}
|
||||
|
||||
|
||||
@@ -287,7 +287,7 @@ std::vector<mutation> make_drop_table_mutations(lw_shared_ptr<keyspace_metadata>
|
||||
|
||||
schema_ptr create_table_from_mutations(const schema_ctxt&, schema_mutations, std::optional<table_schema_version> version = {});
|
||||
|
||||
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, std::optional<table_schema_version> version = {});
|
||||
view_ptr create_view_from_mutations(const schema_ctxt&, schema_mutations, std::optional<schema_ptr> = {}, std::optional<table_schema_version> version = {});
|
||||
|
||||
future<std::vector<view_ptr>> create_views_from_schema_partition(distributed<service::storage_proxy>& proxy, const schema_result::mapped_type& result);
|
||||
|
||||
|
||||
@@ -78,9 +78,10 @@ static inline void inject_failure(std::string_view operation) {
|
||||
[operation] { throw std::runtime_error(std::string(operation)); });
|
||||
}
|
||||
|
||||
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info)
|
||||
view_info::view_info(const schema& schema, const raw_view_info& raw_view_info, schema_ptr base_schema)
|
||||
: _schema(schema)
|
||||
, _raw(raw_view_info)
|
||||
, _base_info(make_base_dependent_view_info(*base_schema))
|
||||
, _has_computed_column_depending_on_base_non_primary_key(false)
|
||||
{ }
|
||||
|
||||
|
||||
@@ -25,12 +25,15 @@ frozen_schema::frozen_schema(const schema_ptr& s)
|
||||
}())
|
||||
{ }
|
||||
|
||||
schema_ptr frozen_schema::unfreeze(const db::schema_ctxt& ctxt) const {
|
||||
schema_ptr frozen_schema::unfreeze(const db::schema_ctxt& ctxt, std::optional<schema_ptr> base_schema) const {
|
||||
auto in = ser::as_input_stream(_data);
|
||||
auto sv = ser::deserialize(in, std::type_identity<ser::schema_view>());
|
||||
return sv.mutations().is_view()
|
||||
? db::schema_tables::create_view_from_mutations(ctxt, sv.mutations(), sv.version())
|
||||
: db::schema_tables::create_table_from_mutations(ctxt, sv.mutations(), sv.version());
|
||||
auto sm = sv.mutations();
|
||||
if (sm.is_view()) {
|
||||
return db::schema_tables::create_view_from_mutations(ctxt, std::move(sm), base_schema, sv.version());
|
||||
} else {
|
||||
return db::schema_tables::create_table_from_mutations(ctxt, std::move(sm), sv.version());
|
||||
}
|
||||
}
|
||||
|
||||
frozen_schema::frozen_schema(bytes_ostream b)
|
||||
|
||||
@@ -27,6 +27,6 @@ public:
|
||||
frozen_schema(const frozen_schema&) = default;
|
||||
frozen_schema& operator=(const frozen_schema&) = default;
|
||||
frozen_schema& operator=(frozen_schema&&) = default;
|
||||
schema_ptr unfreeze(const db::schema_ctxt&) const;
|
||||
schema_ptr unfreeze(const db::schema_ctxt&, std::optional<schema_ptr> base = std::nullopt) const;
|
||||
const bytes_ostream& representation() const;
|
||||
};
|
||||
|
||||
@@ -305,7 +305,7 @@ view_ptr secondary_index_manager::create_view_for_index(const index_metadata& im
|
||||
(target_type == cql3::statements::index_target::target_type::regular_values) ?
|
||||
format("{} IS NOT NULL", index_target->name_as_cql_string()) :
|
||||
"";
|
||||
builder.with_view_info(*schema, false, where_clause);
|
||||
builder.with_view_info(schema, false, where_clause);
|
||||
// A local secondary index should be backed by a *synchronous* view,
|
||||
// see #16371. A view is marked synchronous with a tag. Non-local indexes
|
||||
// do not need the tags schema extension at all.
|
||||
|
||||
@@ -406,7 +406,7 @@ schema::raw_schema::raw_schema(table_id id)
|
||||
, _sharder(::get_sharder(smp::count, default_partitioner_ignore_msb))
|
||||
{ }
|
||||
|
||||
schema::schema(private_tag, const raw_schema& raw, const schema_static_props& props)
|
||||
schema::schema(private_tag, const raw_schema& raw, const schema_static_props& props, std::optional<schema_ptr> base_schema)
|
||||
: _raw(raw)
|
||||
, _static_props(props)
|
||||
, _offsets([this] {
|
||||
@@ -490,7 +490,14 @@ schema::schema(private_tag, const raw_schema& raw, const schema_static_props& pr
|
||||
|
||||
rebuild();
|
||||
if (_raw._view_info) {
|
||||
_view_info = std::make_unique<::view_info>(*this, *_raw._view_info);
|
||||
if (!base_schema) {
|
||||
on_internal_error(dblog, format("Tried to create schema for view {}.{} without schema of the base {}",
|
||||
_raw._ks_name, _raw._cf_name, _raw._view_info->base_name()));
|
||||
}
|
||||
_view_info = std::make_unique<::view_info>(*this, *_raw._view_info, *base_schema);
|
||||
} else if (base_schema) {
|
||||
on_internal_error(dblog, format("Tried to create schema for view {}.{} of base {} without view info",
|
||||
_raw._ks_name, _raw._cf_name, base_schema.value()->cf_name()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -507,7 +514,7 @@ schema::schema(const schema& o, const std::function<void(schema&)>& transform)
|
||||
|
||||
rebuild();
|
||||
if (o.is_view()) {
|
||||
_view_info = std::make_unique<::view_info>(*this, o.view_info()->raw());
|
||||
_view_info = std::make_unique<::view_info>(*this, o.view_info()->raw(), o.view_info()->base_info()->base_schema());
|
||||
if (o.view_info()->base_info()) {
|
||||
_view_info->set_base_info(o.view_info()->base_info());
|
||||
}
|
||||
@@ -1241,6 +1248,7 @@ schema_builder::schema_builder(const schema_ptr s)
|
||||
: schema_builder(s->_raw)
|
||||
{
|
||||
if (s->is_view()) {
|
||||
_base_schema = s->view_info()->base_info()->base_schema();
|
||||
_view_info = s->view_info()->raw();
|
||||
}
|
||||
}
|
||||
@@ -1478,8 +1486,9 @@ void schema_builder::prepare_dense_schema(schema::raw_schema& raw) {
|
||||
}
|
||||
}
|
||||
|
||||
schema_builder& schema_builder::with_view_info(table_id base_id, sstring base_name, bool include_all_columns, sstring where_clause) {
|
||||
_raw._view_info = raw_view_info(std::move(base_id), std::move(base_name), include_all_columns, std::move(where_clause));
|
||||
schema_builder& schema_builder::with_view_info(schema_ptr base, bool include_all_columns, sstring where_clause) {
|
||||
_base_schema = std::move(base);
|
||||
_raw._view_info = raw_view_info(_base_schema.value()->id(), _base_schema.value()->cf_name(), include_all_columns, std::move(where_clause));
|
||||
return *this;
|
||||
}
|
||||
|
||||
@@ -1584,7 +1593,7 @@ schema_ptr schema_builder::build(schema::raw_schema& new_raw) {
|
||||
}
|
||||
), _version);
|
||||
|
||||
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props);
|
||||
return make_lw_shared<schema>(schema::private_tag{}, new_raw, static_props, _base_schema);
|
||||
}
|
||||
|
||||
auto schema_builder::static_configurators() -> std::vector<static_configurator>& {
|
||||
|
||||
@@ -623,7 +623,7 @@ private:
|
||||
schema(const schema&, const std::function<void(schema&)>&);
|
||||
class private_tag{};
|
||||
public:
|
||||
schema(private_tag, const raw_schema&, const schema_static_props& props);
|
||||
schema(private_tag, const raw_schema&, const schema_static_props& props, std::optional<schema_ptr> base_schema);
|
||||
schema(const schema&);
|
||||
// See \ref make_reversed().
|
||||
schema(reversed_tag, const schema&);
|
||||
|
||||
@@ -29,6 +29,7 @@ private:
|
||||
std::optional<compact_storage> _compact_storage;
|
||||
std::variant<from_time, from_hash, table_schema_version> _version = from_time{};
|
||||
std::optional<raw_view_info> _view_info;
|
||||
std::optional<schema_ptr> _base_schema;
|
||||
schema_builder(const schema::raw_schema&);
|
||||
static std::vector<static_configurator>& static_configurators();
|
||||
public:
|
||||
@@ -271,10 +272,7 @@ public:
|
||||
// of table_schema_version (even in ABA changes).
|
||||
schema_builder& with_hash_version();
|
||||
|
||||
schema_builder& with_view_info(table_id base_id, sstring base_name, bool include_all_columns, sstring where_clause);
|
||||
schema_builder& with_view_info(const schema& base_schema, bool include_all_columns, sstring where_clause) {
|
||||
return with_view_info(base_schema.id(), base_schema.cf_name(), include_all_columns, where_clause);
|
||||
}
|
||||
schema_builder& with_view_info(schema_ptr base_schema, bool include_all_columns, sstring where_clause);
|
||||
|
||||
schema_builder& with_index(const index_metadata& im);
|
||||
schema_builder& without_index(const sstring& name);
|
||||
|
||||
@@ -234,7 +234,7 @@ future<schema_ptr> schema_registry_entry::start_loading(async_schema_loader load
|
||||
schema_ptr schema_registry_entry::get_schema() {
|
||||
if (!_schema) {
|
||||
slogger.trace("Activating {}", _version);
|
||||
auto s = _frozen_schema->unfreeze(*_registry._ctxt);
|
||||
schema_ptr s = _frozen_schema->unfreeze(*_registry._ctxt, _base_schema);
|
||||
if (s->version() != _version) {
|
||||
throw std::runtime_error(format("Unfrozen schema version doesn't match entry version ({}): {}", _version, *s));
|
||||
}
|
||||
|
||||
@@ -4004,7 +4004,7 @@ SEASTAR_TEST_CASE(test_view_with_two_regular_base_columns_in_key) {
|
||||
.with_column(to_bytes("v2"), int32_type, column_kind::clustering_key)
|
||||
.with_column(to_bytes("p"), int32_type, column_kind::clustering_key)
|
||||
.with_column(to_bytes("c"), int32_type, column_kind::clustering_key)
|
||||
.with_view_info(*schema, false, "v1 IS NOT NULL AND v2 IS NOT NULL AND p IS NOT NULL AND c IS NOT NULL");
|
||||
.with_view_info(schema, false, "v1 IS NOT NULL AND v2 IS NOT NULL AND p IS NOT NULL AND c IS NOT NULL");
|
||||
|
||||
schema_ptr view_schema = view_builder.build();
|
||||
auto& mm = e.migration_manager().local();
|
||||
|
||||
@@ -288,7 +288,7 @@ SEASTAR_THREAD_TEST_CASE(test_view_info_is_recovered_after_dying) {
|
||||
auto view_schema = schema_builder("ks", "cf_view")
|
||||
.with_column("v", int32_type, column_kind::partition_key)
|
||||
.with_column("pk", int32_type)
|
||||
.with_view_info(*base_schema, false, "pk IS NOT NULL AND v IS NOT NULL")
|
||||
.with_view_info(base_schema, false, "pk IS NOT NULL AND v IS NOT NULL")
|
||||
.build();
|
||||
view_schema->view_info()->set_base_info(view_schema->view_info()->make_base_dependent_view_info(*base_schema));
|
||||
local_schema_registry().get_or_load(view_schema->version(),
|
||||
|
||||
@@ -477,7 +477,11 @@ schema_ptr do_load_schema_from_schema_tables(const db::config& dbcfg, std::files
|
||||
schema_mutations muts(std::move(*tables), std::move(*columns), std::move(view_virtual_columns), std::move(computed_columns), std::move(indexes),
|
||||
std::move(dropped_columns), std::move(scylla_tables));
|
||||
if (muts.is_view()) {
|
||||
return db::schema_tables::create_view_from_mutations(ctxt, muts);
|
||||
query::result_set rs(muts.columnfamilies_mutation());
|
||||
const query::result_set_row& view_row = rs.row(0);
|
||||
auto base_name = view_row.get_nonnull<sstring>("base_table_name");
|
||||
auto base_schema = do_load_schema_from_schema_tables(dbcfg, scylla_data_path, keyspace, base_name);
|
||||
return db::schema_tables::create_view_from_mutations(ctxt, muts, std::move(base_schema));
|
||||
} else {
|
||||
return db::schema_tables::create_table_from_mutations(ctxt, muts);
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ class view_info final {
|
||||
// partition key columns of the base, maybe in a different order.
|
||||
mutable bool _is_partition_key_permutation_of_base_partition_key;
|
||||
public:
|
||||
view_info(const schema& schema, const raw_view_info& raw_view_info);
|
||||
view_info(const schema& schema, const raw_view_info& raw_view_info, schema_ptr base_schema);
|
||||
|
||||
const raw_view_info& raw() const {
|
||||
return _raw;
|
||||
|
||||
Reference in New Issue
Block a user