From 39cd9dae4e2b13ba2f3d6ef192e9446c7d84a47e Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 23 Nov 2020 15:56:35 +0200 Subject: [PATCH 1/4] materialized views: Extract fix legacy schema into its own logic We extract the logic for fixing the view schema into it's own logic as we will need to use it in more places in the code. This makes 'maybe_update_legacy_secondary_index_mv_schema' redundant since it becomes a two liner wrapper for this logic. We also remove it here and replace the call to it with the equivalent code. --- database.cc | 8 +++++++- db/schema_tables.cc | 21 +++++++++++++-------- db/schema_tables.hh | 4 +++- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/database.cc b/database.cc index 07b670f84e..9fbc354ef6 100644 --- a/database.cc +++ b/database.cc @@ -848,7 +848,13 @@ future<> database::parse_system_tables(distributed& prox return create_views_from_schema_partition(proxy, v.second).then([this, &mm] (std::vector views) { return parallel_for_each(views.begin(), views.end(), [this, &mm] (auto&& v) { return this->add_column_family_and_make_directory(v).then([this, &mm, v] { - return maybe_update_legacy_secondary_index_mv_schema(mm.local(), *this, v); + // TODO: Remove once computed columns are guaranteed to be featured in the whole cluster. + view_ptr fixed_v = maybe_fix_legacy_secondary_index_mv_schema(*this, v, nullptr, preserve_version::no); + if (fixed_v) { + return mm.local().announce_view_update(view_ptr(fixed_v)); + } else { + return make_ready_future<>(); + } }); }); }); diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 941c3ab1fa..ff96b49119 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -3071,8 +3071,7 @@ std::vector all_table_names(schema_features features) { boost::adaptors::transformed([] (auto schema) { return schema->cf_name(); })); } -future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v) { - // TODO(sarna): Remove once computed columns are guaranteed to be featured in the whole cluster. +view_ptr maybe_fix_legacy_secondary_index_mv_schema(database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version) { // Legacy format for a secondary index used a hardcoded "token" column, which ensured a proper // order for indexed queries. This "token" column is now implemented as a computed column, // but for the sake of compatibility we assume that there might be indexes created in the legacy @@ -3080,26 +3079,32 @@ future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manage // columns marked as computed (because they were either created on a node that supports computed // columns or were fixed by this utility function), it's safe to remove this function altogether. if (v->clustering_key_size() == 0) { - return make_ready_future<>(); + return view_ptr(nullptr); } const column_definition& first_view_ck = v->clustering_key_columns().front(); if (first_view_ck.is_computed()) { - return make_ready_future<>(); + return view_ptr(nullptr); + } + + if (!base_schema) { + base_schema = db.find_schema(v->view_info()->base_id()); } - table& base = db.find_column_family(v->view_info()->base_id()); - schema_ptr base_schema = base.schema(); // If the first clustering key part of a view is a column with name not found in base schema, // it implies it might be backing an index created before computed columns were introduced, // and as such it must be recreated properly. if (!base_schema->columns_by_name().contains(first_view_ck.name())) { schema_builder builder{schema_ptr(v)}; builder.mark_column_computed(first_view_ck.name(), std::make_unique()); - return mm.announce_view_update(view_ptr(builder.build())); + if (preserve_version) { + builder.with_version(v->version()); + } + return view_ptr(builder.build()); } - return make_ready_future<>(); + return view_ptr(nullptr); } + namespace legacy { table_schema_version schema_mutations::digest() const { diff --git a/db/schema_tables.hh b/db/schema_tables.hh index 74d10cc9a1..0d51a88869 100644 --- a/db/schema_tables.hh +++ b/db/schema_tables.hh @@ -239,7 +239,9 @@ std::vector make_update_view_mutations(lw_shared_ptr make_drop_view_mutations(lw_shared_ptr keyspace, view_ptr view, api::timestamp_type timestamp); -future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v); +class preserve_version_tag {}; +using preserve_version = bool_class; +view_ptr maybe_fix_legacy_secondary_index_mv_schema(database& db, const view_ptr& v, schema_ptr base_schema, preserve_version preserve_version); sstring serialize_kind(column_kind kind); column_kind deserialize_kind(sstring kind); From 9162748b18e2c4f30cf6d3f489b8ea9b42328bdd Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 23 Nov 2020 15:02:31 +0200 Subject: [PATCH 2/4] materialized views: create view schemas with proper base table reference. Newly created view schemas don't always have their base info, this is bad since such schemas don't support read nor write. This leaves us vulnerable to a race condition where there is an attempt to use this schema for read or write. Here we initialize the base reference and also reconfigure the view to conform to the new computed column type, which makes it usable for write and not only reads. We do it for views created in the migration manager following announcements and also for copied schemas. --- db/schema_tables.cc | 37 ++++++++++++++++++++++++++++++++++++- schema.cc | 3 +++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index ff96b49119..1ef399fc4a 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1226,7 +1226,42 @@ static void merge_tables_and_views(distributed& proxy, return create_table_from_mutations(proxy, std::move(sm)); }); auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm) { - return create_view_from_mutations(proxy, std::move(sm)); + // The view schema mutation should be created with reference to the base table schema because we definitely know it by now. + // If we don't do it we are leaving a window where write commands to this schema are illegal. + // There are 3 possibilities: + // 1. The table was altered - in this case we want the view to correspond to this new table schema. + // 2. The table was just created - the table is guarantied to be published with the view in that case. + // 3. The view itself was altered - in that case we already know the base table so we can take it from + // the database object. + view_ptr vp = create_view_from_mutations(proxy, std::move(sm)); + schema_ptr base_schema; + for (auto&& s : tables_diff.altered) { + if (s.new_schema.get()->ks_name() == vp->ks_name() && s.new_schema.get()->cf_name() == vp->view_info()->base_name() ) { + base_schema = s.new_schema; + break; + } + } + if (!base_schema) { + for (auto&& s : tables_diff.created) { + if (s.get()->ks_name() == vp->ks_name() && s.get()->cf_name() == vp->view_info()->base_name() ) { + base_schema = s; + break; + } + } + } + + if (!base_schema) { + base_schema = proxy.local().local_db().find_schema(vp->ks_name(), vp->view_info()->base_name()); + } + + // Now when we have a referenced base - just in case we are registering an old view (this can happen in a mixed cluster) + // lets make it write enabled by updating it's compute columns. + view_ptr fixed_vp = maybe_fix_legacy_secondary_index_mv_schema(proxy.local().get_db().local(), vp, base_schema, preserve_version::yes); + if(fixed_vp) { + vp = fixed_vp; + } + vp->view_info()->set_base_info(vp->view_info()->make_base_dependent_view_info(*base_schema)); + return vp; }); proxy.local().get_db().invoke_on_all([&] (database& db) { diff --git a/schema.cc b/schema.cc index bedba9b531..1e2e5fab1f 100644 --- a/schema.cc +++ b/schema.cc @@ -456,6 +456,9 @@ schema::schema(const schema& o) rebuild(); if (o.is_view()) { _view_info = std::make_unique<::view_info>(*this, o.view_info()->raw()); + if (o.view_info()->base_info()) { + _view_info->set_base_info(o.view_info()->base_info()); + } } } From 04de7705661d3fe1bb7c0438b13f827a13f4c7e8 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 23 Nov 2020 15:11:41 +0200 Subject: [PATCH 3/4] global_schema_ptr: add support for view's base table Up until now, the global_schema_ptr object was a crack through which a view schema with an uninitialized base reference could sneak. Even if the schema itself contained a base reference, the base schema didn't carry over to shards different than the shard on which the global_schema_ptr was created. Since once the schema is in the registry it might be used for everything (reads and writes), we also need to make sure that global schemas for an incomplete view schemas will not be created. --- schema_registry.cc | 79 ++++++++++++++++++++++++++++++++++------------ schema_registry.hh | 1 + 2 files changed, 60 insertions(+), 20 deletions(-) diff --git a/schema_registry.cc b/schema_registry.cc index ea2e69c7b2..1f2b230c2c 100644 --- a/schema_registry.cc +++ b/schema_registry.cc @@ -24,6 +24,7 @@ #include "schema_registry.hh" #include "log.hh" #include "db/schema_tables.hh" +#include "view_info.hh" static logging::logger slogger("schema_registry"); @@ -274,22 +275,43 @@ global_schema_ptr::global_schema_ptr(global_schema_ptr&& o) noexcept { assert(o._cpu_of_origin == current); _ptr = std::move(o._ptr); _cpu_of_origin = current; + _base_schema = std::move(o._base_schema); } schema_ptr global_schema_ptr::get() const { if (this_shard_id() == _cpu_of_origin) { return _ptr; } else { - // 'e' points to a foreign entry, but we know it won't be evicted - // because _ptr is preventing this. - const schema_registry_entry& e = *_ptr->registry_entry(); - schema_ptr s = local_schema_registry().get_or_null(e.version()); - if (!s) { - s = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { - return e.frozen(); - }); + auto registered_schema = [](const schema_registry_entry& e) { + schema_ptr ret = local_schema_registry().get_or_null(e.version()); + if (!ret) { + ret = local_schema_registry().get_or_load(e.version(), [&e](table_schema_version) { + return e.frozen(); + }); + } + return ret; + }; + + schema_ptr registered_bs; + // the following code contains registry entry dereference of a foreign shard + // however, it is guarantied to succeed since we made sure in the constructor + // that _bs_schema and _ptr will have a registry on the foreign shard where this + // object originated so as long as this object lives the registry entries lives too + // and it is safe to reference them on foreign shards. + if (_base_schema) { + registered_bs = registered_schema(*_base_schema->registry_entry()); + if (_base_schema->registry_entry()->is_synced()) { + registered_bs->registry_entry()->mark_synced(); + } } - if (e.is_synced()) { + schema_ptr s = registered_schema(*_ptr->registry_entry()); + if (s->is_view()) { + if (!s->view_info()->base_info()) { + // we know that registered_bs is valid here because we make sure of it in the constructors. + s->view_info()->set_base_info(s->view_info()->make_base_dependent_view_info(*registered_bs)); + } + } + if (_ptr->registry_entry()->is_synced()) { s->registry_entry()->mark_synced(); } return s; @@ -297,16 +319,33 @@ schema_ptr global_schema_ptr::get() const { } global_schema_ptr::global_schema_ptr(const schema_ptr& ptr) - : _ptr([&ptr]() { - // _ptr must always have an associated registry entry, - // if ptr doesn't, we need to load it into the registry. - schema_registry_entry* e = ptr->registry_entry(); + : _cpu_of_origin(this_shard_id()) { + // _ptr must always have an associated registry entry, + // if ptr doesn't, we need to load it into the registry. + auto ensure_registry_entry = [] (const schema_ptr& s) { + schema_registry_entry* e = s->registry_entry(); if (e) { - return ptr; - } - return local_schema_registry().get_or_load(ptr->version(), [&ptr] (table_schema_version) { - return frozen_schema(ptr); + return s; + } else { + return local_schema_registry().get_or_load(s->version(), [&s] (table_schema_version) { + return frozen_schema(s); }); - }()) - , _cpu_of_origin(this_shard_id()) -{ } + } + }; + + schema_ptr s = ensure_registry_entry(ptr); + if (s->is_view()) { + if (s->view_info()->base_info()) { + _base_schema = ensure_registry_entry(s->view_info()->base_info()->base_schema()); + } else if (ptr->view_info()->base_info()) { + _base_schema = ensure_registry_entry(ptr->view_info()->base_info()->base_schema()); + } else { + on_internal_error(slogger, format("Tried to build a global schema for view {}.{} with an uninitialized base info", s->ks_name(), s->cf_name())); + } + + if (!s->view_info()->base_info() || !s->view_info()->base_info()->base_schema()->registry_entry()) { + s->view_info()->set_base_info(s->view_info()->make_base_dependent_view_info(*_base_schema)); + } + } + _ptr = s; +} diff --git a/schema_registry.hh b/schema_registry.hh index 78aa02c50e..9b58f2f793 100644 --- a/schema_registry.hh +++ b/schema_registry.hh @@ -165,6 +165,7 @@ schema_registry& local_schema_registry(); // chain will last. class global_schema_ptr { schema_ptr _ptr; + schema_ptr _base_schema; unsigned _cpu_of_origin; public: // Note: the schema_ptr must come from the current shard and can't be nullptr. From 0220786710f0d57a6f47a16239a141038b471489 Mon Sep 17 00:00:00 2001 From: Eliran Sinvani Date: Mon, 22 Feb 2021 12:55:22 +0200 Subject: [PATCH 4/4] database: Fix view schemas in place when loading On restart the view schemas are loaded and might contain old views with an unmarked computed column. We already have code to update the schema, but before we do it we load the view as is. This is not desired since once registered, this view version can be used for writes which is forbidden since we will spot a none computed column which is in the view's primary key but not in the base table at all. To solve this, in addition to altering the persistent schema, we fix the view's loaded schema in place. This is safe since computed column is just involved in generating a value for this column when creating a view update so the effect of this manipulation stays internal. The second stage of the in place fixing is to persist the changes made in the in place fixing so the view is ready for the next node restart in particular the `computed_columns` table. --- database.cc | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/database.cc b/database.cc index 9fbc354ef6..ac186b7983 100644 --- a/database.cc +++ b/database.cc @@ -845,17 +845,22 @@ future<> database::parse_system_tables(distributed& prox }); }).then([&proxy, &mm, this] { return do_parse_schema_tables(proxy, db::schema_tables::VIEWS, [this, &proxy, &mm] (schema_result_value_type &v) { - return create_views_from_schema_partition(proxy, v.second).then([this, &mm] (std::vector views) { - return parallel_for_each(views.begin(), views.end(), [this, &mm] (auto&& v) { - return this->add_column_family_and_make_directory(v).then([this, &mm, v] { - // TODO: Remove once computed columns are guaranteed to be featured in the whole cluster. - view_ptr fixed_v = maybe_fix_legacy_secondary_index_mv_schema(*this, v, nullptr, preserve_version::no); - if (fixed_v) { - return mm.local().announce_view_update(view_ptr(fixed_v)); - } else { - return make_ready_future<>(); - } - }); + return create_views_from_schema_partition(proxy, v.second).then([this, &mm, &proxy] (std::vector views) { + return parallel_for_each(views.begin(), views.end(), [this, &mm, &proxy] (auto&& v) { + // TODO: Remove once computed columns are guaranteed to be featured in the whole cluster. + // we fix here the schema in place in oreder to avoid races (write commands comming from other coordinators). + view_ptr fixed_v = maybe_fix_legacy_secondary_index_mv_schema(*this, v, nullptr, preserve_version::yes); + view_ptr v_to_add = fixed_v ? fixed_v : v; + future<> f = this->add_column_family_and_make_directory(v_to_add); + if (bool(fixed_v)) { + v_to_add = fixed_v; + auto&& keyspace = find_keyspace(v->ks_name()).metadata(); + auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(v), fixed_v, api::new_timestamp(), true); + f = f.then([this, &proxy, mutations = std::move(mutations)] { + return db::schema_tables::merge_schema(proxy, _feat, std::move(mutations)); + }); + } + return f; }); }); });