Merge 'schema_tables: turn view schema fixing code into a sanity check' from Kamil Braun

The purpose of `maybe_fix_legacy_secondary_index_mv_schema` was to deal
with legacy materialized view schemas used for secondary indexes,
schemas which were created before the notion of "computed columns" was
introduced. Back then, secondary index schemas would use a regular
"token" column. Later it became a computed column and old schemas would
be migrated during rolling upgrade.

The migration code was introduced in 2019
(db8d4a0cc6) and then fixed in 2020
(d473bc9b06).

The fix was present in Enterprise 2022.1 and in OSS 4.5. So, assuming
that users don't try crazy things like upgrading from 2021.X to 2023.X
(which we do not support), all clusters will have already executed the
migration code once they upgrade to 2023.X, meaning we can get rid of
it.

The main motivation of this PR is to get rid of the
`db::schema_tables::merge_schema` call in `parse_schema_tables`. In Raft
mode this was the only call to `merge_schema` outside "group 0 code" and
in fact it is unsafe -- it uses locally generated mutations with locally
generated timestamp (`api::new_timestamp()`), so if we actually did it,
we would permanently diverge the group 0 state machine across nodes
(the schema pulling code is disabled in Raft mode). Fortunately, this
should be dead code by now, as explained in the previous paragraph.

The migration code is now turned into a sanity check, if the users
try something crazy, they will get an error instead of silent data
corruption.

Closes scylladb/scylladb#15695

* github.com:scylladb/scylladb:
  view: remove unused `_backing_secondary_index`
  schema_tables: turn view schema fixing code into a sanity check
  schema_tables: make comment more precise
  feature_service: make COMPUTED_COLUMNS feature unconditionally true
This commit is contained in:
Avi Kivity
2023-10-31 13:23:19 +02:00
9 changed files with 44 additions and 83 deletions

View File

@@ -1003,16 +1003,12 @@ static void append_base_key_to_index_ck(std::vector<managed_bytes_view>& explode
bytes indexed_table_select_statement::compute_idx_token(const partition_key& key) const {
const column_definition& cdef = *_view_schema->clustering_key_columns().begin();
clustering_row empty_row(clustering_key_prefix::make_empty());
bytes computed_value;
if (!cdef.is_computed()) {
// FIXME(pgrabowski): this legacy code is here for backward compatibility and should be removed
// once "computed_columns feature" is supported by every node
computed_value = legacy_token_column_computation().compute_value(*_schema, key);
} else {
computed_value = cdef.get_computation().compute_value(*_schema, key);
throw std::logic_error{format(
"Detected legacy non-computed token column {} in table {}.{}",
cdef.name_as_text(), _schema->ks_name(), _schema->cf_name())};
}
return computed_value;
return cdef.get_computation().compute_value(*_schema, key);
}
lw_shared_ptr<const service::pager::paging_state> indexed_table_select_statement::generate_view_paging_state_from_base_query_results(lw_shared_ptr<const service::pager::paging_state> paging_state,

View File

@@ -829,13 +829,6 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
std::vector<mutation>
adjust_schema_for_schema_features(std::vector<mutation> schema, schema_features features) {
//Don't send the `computed_columns` table mutations to nodes that doesn't know it.
if (!features.contains(schema_feature::COMPUTED_COLUMNS)) {
schema.erase(std::remove_if(schema.begin(), schema.end(), [] (const mutation& m) {
return m.schema()->cf_name() == COMPUTED_COLUMNS;
}) , schema.end());
}
for (auto& m : schema) {
m = redact_columns_for_missing_features(m, features);
}
@@ -1489,12 +1482,10 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
base_schema = proxy.local().local_db().find_schema(vp->ks_name(), vp->view_info()->base_name());
}
// Now when we have a referenced base - just in case we are registering an old view (this can happen in a mixed cluster)
// lets make it write enabled by updating it's compute columns.
view_ptr fixed_vp = maybe_fix_legacy_secondary_index_mv_schema(proxy.local().get_db().local(), vp, base_schema, preserve_version::yes);
if(fixed_vp) {
vp = fixed_vp;
}
// Now when we have a referenced base - sanity check that we're not registering an old view
// (this could happen when we skip multiple major versions in upgrade, which is unsupported.)
check_no_legacy_secondary_index_mv_schema(proxy.local().get_db().local(), vp, base_schema);
vp->view_info()->set_base_info(vp->view_info()->make_base_dependent_view_info(*base_schema));
return vp;
});
@@ -3666,20 +3657,20 @@ std::vector<sstring> all_table_names(schema_features features) {
boost::adaptors::transformed([] (auto schema) { return schema->cf_name(); }));
}
view_ptr maybe_fix_legacy_secondary_index_mv_schema(replica::database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version) {
void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view_ptr& v, schema_ptr base_schema) {
// Legacy format for a secondary index used a hardcoded "token" column, which ensured a proper
// order for indexed queries. This "token" column is now implemented as a computed column,
// but for the sake of compatibility we assume that there might be indexes created in the legacy
// format, where "token" is not marked as computed. Once we're sure that all indexes have their
// columns marked as computed (because they were either created on a node that supports computed
// columns or were fixed by this utility function), it's safe to remove this function altogether.
// order for indexed queries. This "token" column is has been implemented as a computed column
// for a long time now, and migration code has been / will be executed on all reasonable Scylla
// deployments (which don't do unsupported upgrades).
//
// This function is now used as a sanity check that we're not dealing with the legacy format anymore.
if (v->clustering_key_size() == 0) {
return view_ptr(nullptr);
return;
}
const auto ck_cols = v->clustering_key_columns();
const column_definition& first_view_ck = ck_cols.front();
if (first_view_ck.is_computed()) {
return view_ptr(nullptr);
return;
}
if (!base_schema) {
@@ -3687,17 +3678,16 @@ view_ptr maybe_fix_legacy_secondary_index_mv_schema(replica::database& db, const
}
// If the first clustering key part of a view is a column with name not found in base schema,
// it implies it might be backing an index created before computed columns were introduced,
// and as such it must be recreated properly.
// and the column is not computed (which we checked above), then it must be backing an index
// created before computed columns were introduced.
if (!base_schema->columns_by_name().contains(first_view_ck.name())) {
schema_builder builder{schema_ptr(v)};
builder.mark_column_computed(first_view_ck.name(), std::make_unique<legacy_token_column_computation>());
if (preserve_version) {
builder.with_version(v->version());
on_fatal_internal_error(slogger, format(
"Materialized view {}.{}: first clustering key column ({}) is not computed and does not have a corresponding"
" column in the base table. This materialized view must therefore be a secondary index created"
" using legacy method (without computed columns) that wasn't migrated properly to new method."
" Make sure that you perform rolling upgrade according to documented procedure without skipping"
" major Scylla versions.", v->ks_name(), v->cf_name(), first_view_ck.name_as_text()));
}
return view_ptr(builder.build());
}
return view_ptr(nullptr);
}

View File

@@ -264,9 +264,7 @@ std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata
std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);
class preserve_version_tag {};
using preserve_version = bool_class<preserve_version_tag>;
view_ptr maybe_fix_legacy_secondary_index_mv_schema(replica::database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version);
void check_no_legacy_secondary_index_mv_schema(replica::database& db, const view_ptr& v, schema_ptr base_schema);
sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);

View File

@@ -575,15 +575,13 @@ private:
const partition_key& _base_key;
const clustering_or_static_row& _update;
const std::optional<clustering_or_static_row>& _existing;
const bool _backing_secondary_index;
public:
value_getter(const schema& base, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing, bool backing_secondary_index)
value_getter(const schema& base, const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing)
: _base(base)
, _base_key(base_key)
, _update(update)
, _existing(existing)
, _backing_secondary_index(backing_secondary_index)
{
}
@@ -616,22 +614,18 @@ public:
private:
vector_type handle_computed_column(const column_definition& cdef) {
bytes computed_value;
if (!cdef.is_computed()) {
//FIXME(sarna): this legacy code is here for backward compatibility and should be removed
// once "computed_columns feature" is supported by every node
if (!_backing_secondary_index) {
throw std::logic_error(format("Column {} doesn't exist in base and this view is not backing a secondary index", cdef.name_as_text()));
throw std::logic_error{format(
"Detected legacy non-computed token column {} in view for table {}.{}",
cdef.name_as_text(), _base.ks_name(), _base.cf_name())};
}
computed_value = legacy_token_column_computation().compute_value(_base, _base_key);
} else {
auto& computation = cdef.get_computation();
if (auto* collection_computation = dynamic_cast<const collection_column_computation*>(&computation)) {
return handle_collection_column_computation(collection_computation);
}
computed_value = computation.compute_value(_base, _base_key);
}
auto computed_value = computation.compute_value(_base, _base_key);
return {managed_bytes_view(linearized_values.emplace_back(std::move(computed_value)))};
}
@@ -653,7 +647,7 @@ private:
std::vector<view_updates::view_row_entry>
view_updates::get_view_rows(const partition_key& base_key, const clustering_or_static_row& update, const std::optional<clustering_or_static_row>& existing) {
value_getter getter(*_base, base_key, update, existing, _backing_secondary_index);
value_getter getter(*_base, base_key, update, existing);
auto get_value = boost::adaptors::transformed(std::ref(getter));
@@ -1458,8 +1452,7 @@ view_update_builder make_view_update_builder(
" base schema version of the view ({}) for view {}.{} of {}.{}",
base->version(), v.base->base_schema()->version(), v.view->ks_name(), v.view->cf_name(), base->ks_name(), base->cf_name()));
}
bool is_index = base_table.get_index_manager().is_index(v.view);
return view_updates(std::move(v), is_index);
return view_updates(std::move(v));
}));
return view_update_builder(std::move(db), base_table, base, std::move(vs), std::move(updates), std::move(existings), now);
}

View File

@@ -210,15 +210,13 @@ class view_updates final {
base_info_ptr _base_info;
std::unordered_map<partition_key, mutation_partition, partition_key::hashing, partition_key::equality> _updates;
mutable size_t _op_count = 0;
const bool _backing_secondary_index;
public:
explicit view_updates(view_and_base vab, bool backing_secondary_index)
explicit view_updates(view_and_base vab)
: _view(std::move(vab.view))
, _view_info(*_view->view_info())
, _base(vab.base->base_schema())
, _base_info(vab.base)
, _updates(8, partition_key::hashing(*_view), partition_key::equality(*_view))
, _backing_secondary_index(backing_secondary_index)
{
}

View File

@@ -138,6 +138,7 @@ std::set<std::string_view> feature_service::supported_feature_set() const {
"CORRECT_STATIC_COMPACT_IN_MC"sv,
"UNBOUNDED_RANGE_TOMBSTONES"sv,
"MC_SSTABLE_FORMAT"sv,
"COMPUTED_COLUMNS"sv,
};
if (is_test_only_feature_deprecated()) {
@@ -195,7 +196,7 @@ db::schema_features feature_service::cluster_schema_features() const {
db::schema_features f;
f.set_if<db::schema_feature::VIEW_VIRTUAL_COLUMNS>(view_virtual_columns);
f.set_if<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>(digest_insensitive_to_expiry);
f.set_if<db::schema_feature::COMPUTED_COLUMNS>(computed_columns);
f.set<db::schema_feature::COMPUTED_COLUMNS>();
f.set_if<db::schema_feature::CDC_OPTIONS>(cdc);
f.set_if<db::schema_feature::PER_TABLE_PARTITIONERS>(per_table_partitioners);
f.set_if<db::schema_feature::SCYLLA_KEYSPACES>(keyspace_storage_options);

View File

@@ -87,7 +87,6 @@ public:
gms::feature me_sstable { *this, "ME_SSTABLE_FORMAT"sv };
gms::feature view_virtual_columns { *this, "VIEW_VIRTUAL_COLUMNS"sv };
gms::feature digest_insensitive_to_expiry { *this, "DIGEST_INSENSITIVE_TO_EXPIRY"sv };
gms::feature computed_columns { *this, "COMPUTED_COLUMNS"sv };
gms::feature cdc { *this, "CDC"sv };
gms::feature nonfrozen_udts { *this, "NONFROZEN_UDTS"sv };
gms::feature hinted_handoff_separate_connection { *this, "HINTED_HANDOFF_SEPARATE_CONNECTION"sv };

View File

@@ -852,17 +852,8 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
co_await do_parse_schema_tables(proxy, db::schema_tables::VIEWS, coroutine::lambda([&] (schema_result_value_type &v) -> future<> {
std::vector<view_ptr> views = co_await create_views_from_schema_partition(proxy, v.second);
co_await coroutine::parallel_for_each(views.begin(), views.end(), [&] (auto&& v) -> future<> {
// TODO: Remove once computed columns are guaranteed to be featured in the whole cluster.
// we fix here the schema in place in oreder to avoid races (write commands comming from other coordinators).
view_ptr fixed_v = maybe_fix_legacy_secondary_index_mv_schema(*this, v, nullptr, preserve_version::yes);
view_ptr v_to_add = fixed_v ? fixed_v : v;
co_await this->add_column_family_and_make_directory(v_to_add, replica::database::is_new_cf::no);
if (bool(fixed_v)) {
v_to_add = fixed_v;
auto&& keyspace = find_keyspace(v->ks_name()).metadata();
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(v), fixed_v, api::new_timestamp(), true);
co_await db::schema_tables::merge_schema(sys_ks, proxy, _feat, std::move(mutations));
}
check_no_legacy_secondary_index_mv_schema(*this, v, nullptr);
co_await this->add_column_family_and_make_directory(v, replica::database::is_new_cf::no);
});
}));
}

View File

@@ -109,7 +109,6 @@ void migration_manager::init_messaging_service()
_feature_listeners.push_back(_feat.digest_insensitive_to_expiry.when_enabled(update_schema));
_feature_listeners.push_back(_feat.cdc.when_enabled(update_schema));
_feature_listeners.push_back(_feat.per_table_partitioners.when_enabled(update_schema));
_feature_listeners.push_back(_feat.computed_columns.when_enabled(update_schema));
if (!_feat.table_digest_insensitive_to_expiry) {
_feature_listeners.push_back(_feat.table_digest_insensitive_to_expiry.when_enabled([this] {
@@ -1106,15 +1105,11 @@ static future<schema_ptr> get_schema_definition(table_schema_version v, netw::me
// That means the column mapping for the schema should always be inserted
// with TTL (refresh TTL in case column mapping already existed prior to that).
auto us = s.unfreeze(db::schema_ctxt(proxy));
// if this is a view - we might need to fix it's schema before registering it.
// if this is a view - sanity check that its schema doesn't need fixing.
if (us->is_view()) {
auto& db = proxy.local().local_db();
schema_ptr base_schema = db.find_schema(us->view_info()->base_id());
auto fixed_view = db::schema_tables::maybe_fix_legacy_secondary_index_mv_schema(db, view_ptr(us), base_schema,
db::schema_tables::preserve_version::yes);
if (fixed_view) {
us = fixed_view;
}
db::schema_tables::check_no_legacy_secondary_index_mv_schema(db, view_ptr(us), base_schema);
}
return db::schema_tables::store_column_mapping(proxy, us, true).then([us] {
return frozen_schema{us};