From be12a2bf0a8833ec4e7558f96f84221a1dd58021 Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 1 Mar 2017 17:36:30 +0100 Subject: [PATCH] db/schema_tables: Atomically publish base and view changes This patch ensures that the schema merging atomically publishes schema changes. In particular, it ensures that when a base schema and a subset of its views are modified together (i.e., upon an alter table or alter type statement), then they are published together as well, without any deferring in-between. Signed-off-by: Duarte Nunes --- db/schema_tables.cc | 134 ++++++++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 62 deletions(-) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 78c267ad8b..15abf7e995 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -64,6 +64,7 @@ #include #include +#include #include "compaction_strategy.hh" #include "utils/joinpoint.hh" @@ -113,18 +114,16 @@ struct qualified_name { static future read_table_mutations(distributed& proxy, const qualified_name& table, schema_ptr s); -static void merge_tables(distributed& proxy, - std::map&& before, - std::map&& after); +static void merge_tables_and_views(distributed& proxy, + std::map&& tables_before, + std::map&& tables_after, + std::map&& views_before, + std::map&& views_after); static void merge_types(distributed& proxy, schema_result&& before, schema_result&& after); -static void merge_views(distributed& proxy, - std::map&& before, - std::map&& after); - std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USERTYPES, VIEWS, FUNCTIONS, AGGREGATES }; using days = std::chrono::duration>; @@ -701,9 +700,10 @@ future<> do_merge_schema(distributed& proxy, std::vector #endif std::set keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0(); - merge_tables(proxy, std::move(old_column_families), std::move(new_column_families)); + merge_tables_and_views(proxy, + std::move(old_column_families), std::move(new_column_families), + std::move(old_views), std::move(new_views)); merge_types(proxy, std::move(old_types), std::move(new_types)); - merge_views(proxy, std::move(old_views), std::move(new_views)); #if 0 mergeFunctions(oldFunctions, newFunctions); mergeAggregates(oldAggregates, newAggregates); @@ -769,16 +769,7 @@ future> merge_keyspaces(distributed& p }); } -// see the comments for merge_keyspaces() -template -static void merge_schemas(distributed& proxy, - std::map&& before, - std::map&& after, - CreateSchema&& create_schema, - NotifyCreate&& notify_create, - NotifyUpdate&& notify_update, - NotifyDrop&& notify_drop) -{ +struct schema_diff { struct dropped_schema { global_schema_ptr schema; utils::joinpoint jp{[] { @@ -790,57 +781,89 @@ static void merge_schemas(distributed& proxy, std::vector altered; std::vector dropped; + size_t size() const { + return created.size() + altered.size() + dropped.size(); + } +}; + +template +static schema_diff diff_table_or_view(distributed& proxy, + std::map&& before, + std::map&& after, + CreateSchema&& create_schema) +{ + schema_diff d; auto diff = difference(before, after); for (auto&& key : diff.entries_only_on_left) { auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name); logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version()); - dropped.emplace_back(dropped_schema{s}); + d.dropped.emplace_back(schema_diff::dropped_schema{s}); } for (auto&& key : diff.entries_only_on_right) { auto s = create_schema(std::move(after.at(key))); logger.info("Creating {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version()); - created.emplace_back(s); + d.created.emplace_back(s); } for (auto&& key : diff.entries_differing) { auto s = create_schema(std::move(after.at(key))); logger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version()); - altered.emplace_back(s); + d.altered.emplace_back(s); } + return d; +} + +// see the comments for merge_keyspaces() +// Atomically publishes schema changes. In particular, this function ensures +// that when a base schema and a subset of its views are modified together (i.e., +// upon an alter table or alter type statement), then they are published together +// as well, without any deferring in-between. +static void merge_tables_and_views(distributed& proxy, + std::map&& tables_before, + std::map&& tables_after, + std::map&& views_before, + std::map&& views_after) +{ + auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), [] (auto&& sm) { + return create_table_from_mutations(std::move(sm)); + }); + auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [] (auto&& sm) { + return create_view_from_mutations(std::move(sm)); + }); proxy.local().get_db().invoke_on_all([&] (database& db) { return seastar::async([&] { - for (auto&& gs : created) { - db.add_column_family_and_make_directory(gs).get(); - db.find_column_family(gs).mark_ready_for_writes(); - notify_create(service::get_local_migration_manager(), gs).get(); - } - for (auto&& gs : altered) { - bool columns_changed = db.update_column_family(gs); - notify_update(service::get_local_migration_manager(), gs, columns_changed).get(); - } - parallel_for_each(dropped, [&] (dropped_schema& dt) { - schema_ptr s = dt.schema.get(); - return db.drop_column_family(s->ks_name(), s->cf_name(), [&] { return dt.jp.value(); }).then([s, ¬ify_drop] { - return notify_drop(service::get_local_migration_manager(), s); - }); + parallel_for_each(boost::range::join(tables_diff.created, views_diff.created), [&] (global_schema_ptr& gs) { + return db.add_column_family_and_make_directory(gs); }).get(); + for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) { + db.find_column_family(gs).mark_ready_for_writes(); + } + std::vector columns_changed; + columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size()); + for (auto&& gs : boost::range::join(tables_diff.altered, views_diff.altered)) { + columns_changed.push_back(db.update_column_family(gs)); + } + parallel_for_each(boost::range::join(tables_diff.dropped, views_diff.dropped), [&] (schema_diff::dropped_schema& dt) { + auto& s = *dt.schema.get(); + return db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); }); + }).get(); + + auto& mm = service::get_local_migration_manager(); + auto it = columns_changed.begin(); + std::vector> notifications; + notifications.reserve(tables_diff.size() + views_diff.size()); + auto notify = [&] (auto& r, auto&& f) { boost::range::transform(r, std::back_inserter(notifications), f); }; + notify(tables_diff.created, [&] (auto&& gs) { return mm.notify_create_column_family(gs); }); + notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify_update_column_family(gs, *it++); }); + notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_column_family(dt.schema); }); + notify(views_diff.created, [&] (auto&& gs) { return mm.notify_create_view(view_ptr(gs)); }); + notify(views_diff.altered, [&] (auto&& gs) { return mm.notify_update_view(view_ptr(gs), *it++); }); + notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_view(view_ptr(dt.schema)); }); + when_all(notifications.rbegin(), notifications.rend()).get(); }); }).get(); } -static void merge_tables(distributed& proxy, - std::map&& before, - std::map&& after) -{ - return merge_schemas(proxy, - std::move(before), - std::move(after), - [] (auto&& sm) { return create_table_from_mutations(std::move(sm)); }, - std::mem_fn(&service::migration_manager::notify_create_column_family), - std::mem_fn(&service::migration_manager::notify_update_column_family), - std::mem_fn(&service::migration_manager::notify_drop_column_family)); -} - static inline void collect_types(std::set& keys, schema_result& result, std::vector& to) { for (auto&& key : keys) { @@ -941,19 +964,6 @@ static void merge_types(distributed& proxy, schema_resul }).get(); } -static void merge_views(distributed& proxy, - std::map&& before, - std::map&& after) -{ - return merge_schemas(proxy, - std::move(before), - std::move(after), - [] (auto&& sm) { return create_view_from_mutations(std::move(sm)); }, - [] (auto&& mm, auto&& s) { return mm.notify_create_view(view_ptr(s)); }, - [] (auto&& mm, auto&& s, bool columns_changed) { return mm.notify_update_view(view_ptr(s), columns_changed); }, - [] (auto&& mm, auto&& s) { return mm.notify_drop_view(view_ptr(s)); }); -} - #if 0 // see the comments for mergeKeyspaces() private static void mergeFunctions(Map before, Map after)