db/schema_tables: Atomically publish base and view changes

This patch ensures that schema merging publishes schema changes
atomically. In particular, it ensures that when a base schema
and a subset of its views are modified together (i.e., upon an alter
table or alter type statement), they are also published together,
without any deferring in between.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-03-01 17:36:30 +01:00
parent e215f25b11
commit be12a2bf0a

View File

@@ -64,6 +64,7 @@
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/join.hpp>
#include "compaction_strategy.hh"
#include "utils/joinpoint.hh"
@@ -113,18 +114,16 @@ struct qualified_name {
// Forward declarations of file-local (static) helpers.
// The merge_* helpers receive "before" and "after" snapshots by rvalue
// reference.
static future<schema_mutations> read_table_mutations(distributed<service::storage_proxy>& proxy, const qualified_name& table, schema_ptr s);
static void merge_tables(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after);
// Takes the table snapshots and the view snapshots in a single call.
static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& tables_before,
std::map<qualified_name, schema_mutations>&& tables_after,
std::map<qualified_name, schema_mutations>&& views_before,
std::map<qualified_name, schema_mutations>&& views_after);
static void merge_types(distributed<service::storage_proxy>& proxy,
schema_result&& before,
schema_result&& after);
static void merge_views(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after);
std::vector<const char*> ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USERTYPES, VIEWS, FUNCTIONS, AGGREGATES };
using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
@@ -701,9 +700,10 @@ future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std::vector
#endif
std::set<sstring> keyspaces_to_drop = merge_keyspaces(proxy, std::move(old_keyspaces), std::move(new_keyspaces)).get0();
merge_tables(proxy, std::move(old_column_families), std::move(new_column_families));
merge_tables_and_views(proxy,
std::move(old_column_families), std::move(new_column_families),
std::move(old_views), std::move(new_views));
merge_types(proxy, std::move(old_types), std::move(new_types));
merge_views(proxy, std::move(old_views), std::move(new_views));
#if 0
mergeFunctions(oldFunctions, newFunctions);
mergeAggregates(oldAggregates, newAggregates);
@@ -769,16 +769,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
});
}
// see the comments for merge_keyspaces()
template <typename CreateSchema, typename NotifyCreate, typename NotifyUpdate, typename NotifyDrop>
static void merge_schemas(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after,
CreateSchema&& create_schema,
NotifyCreate&& notify_create,
NotifyUpdate&& notify_update,
NotifyDrop&& notify_drop)
{
struct schema_diff {
struct dropped_schema {
global_schema_ptr schema;
utils::joinpoint<db_clock::time_point> jp{[] {
@@ -790,57 +781,89 @@ static void merge_schemas(distributed<service::storage_proxy>& proxy,
std::vector<global_schema_ptr> altered;
std::vector<dropped_schema> dropped;
size_t size() const {
return created.size() + altered.size() + dropped.size();
}
};
// Classifies the entries of `before` and `after` into dropped, created and
// altered schemas and returns the result as a schema_diff.
//
//  - Keys found only in `before` are looked up in the local database with
//    find_schema() and recorded as dropped.
//  - Keys found only in `after` are passed (as rvalues) to `create_schema`
//    and the resulting schema is recorded as created.
//  - Keys whose entries differ are likewise passed to `create_schema` and
//    recorded as altered.
//
// Every classified schema is logged at info level with its keyspace, name,
// id and version.
//
// NOTE(review): `dropped`, `created`, `altered` and `dropped_schema` are not
// declared in this function as shown; each unqualified emplace_back line is
// immediately followed by a `d.`-qualified equivalent, which looks like the
// before/after pair of the same change -- confirm which line applies.
template<typename CreateSchema>
static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after,
CreateSchema&& create_schema)
{
schema_diff d;
auto diff = difference(before, after);
// Present before, absent after: dropped.
for (auto&& key : diff.entries_only_on_left) {
auto&& s = proxy.local().get_db().local().find_schema(key.keyspace_name, key.table_name);
logger.info("Dropping {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
dropped.emplace_back(dropped_schema{s});
d.dropped.emplace_back(schema_diff::dropped_schema{s});
}
// Absent before, present after: created.
for (auto&& key : diff.entries_only_on_right) {
auto s = create_schema(std::move(after.at(key)));
logger.info("Creating {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
created.emplace_back(s);
d.created.emplace_back(s);
}
// Present on both sides with different contents: altered.
for (auto&& key : diff.entries_differing) {
auto s = create_schema(std::move(after.at(key)));
logger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
altered.emplace_back(s);
d.altered.emplace_back(s);
}
return d;
}
// see the comments for merge_keyspaces()
// Atomically publishes schema changes. In particular, this function ensures
// that when a base schema and a subset of its views are modified together (i.e.,
// upon an alter table or alter type statement), then they are published together
// as well, without any deferring in-between.
// Computes the table diff and the view diff up front, then applies both
// inside a single invoke_on_all() callback on the database, and only after
// that starts the migration-manager notifications.
//
// NOTE(review): the body below contains lines that reference `created`,
// `altered`, `dropped`, `dropped_schema`, `notify_create`, `notify_update`
// and `notify_drop`, none of which are declared in this function as shown.
// They look like pre-change lines interleaved with their replacement --
// confirm which lines apply.
static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& tables_before,
std::map<qualified_name, schema_mutations>&& tables_after,
std::map<qualified_name, schema_mutations>&& views_before,
std::map<qualified_name, schema_mutations>&& views_after)
{
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), [] (auto&& sm) {
return create_table_from_mutations(std::move(sm));
});
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [] (auto&& sm) {
return create_view_from_mutations(std::move(sm));
});
proxy.local().get_db().invoke_on_all([&] (database& db) {
return seastar::async([&] {
for (auto&& gs : created) {
db.add_column_family_and_make_directory(gs).get();
db.find_column_family(gs).mark_ready_for_writes();
notify_create(service::get_local_migration_manager(), gs).get();
}
for (auto&& gs : altered) {
bool columns_changed = db.update_column_family(gs);
notify_update(service::get_local_migration_manager(), gs, columns_changed).get();
}
parallel_for_each(dropped, [&] (dropped_schema& dt) {
schema_ptr s = dt.schema.get();
return db.drop_column_family(s->ks_name(), s->cf_name(), [&] { return dt.jp.value(); }).then([s, &notify_drop] {
return notify_drop(service::get_local_migration_manager(), s);
});
// Add all created tables and views, wait for that to finish, then mark
// each of them ready for writes.
parallel_for_each(boost::range::join(tables_diff.created, views_diff.created), [&] (global_schema_ptr& gs) {
return db.add_column_family_and_make_directory(gs);
}).get();
for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) {
db.find_column_family(gs).mark_ready_for_writes();
}
// Remember what update_column_family() returned for each altered schema.
// Entries are stored in tables-then-views order.
std::vector<bool> columns_changed;
columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size());
for (auto&& gs : boost::range::join(tables_diff.altered, views_diff.altered)) {
columns_changed.push_back(db.update_column_family(gs));
}
parallel_for_each(boost::range::join(tables_diff.dropped, views_diff.dropped), [&] (schema_diff::dropped_schema& dt) {
auto& s = *dt.schema.get();
return db.drop_column_family(s.ks_name(), s.cf_name(), [&] { return dt.jp.value(); });
}).get();
// Notifications are started only here, after the adds, updates and drops
// above have been applied.
auto& mm = service::get_local_migration_manager();
// `it` walks columns_changed; the altered-table notifications consume it
// first and the altered-view notifications second, matching the order in
// which the vector was filled.
auto it = columns_changed.begin();
std::vector<future<>> notifications;
notifications.reserve(tables_diff.size() + views_diff.size());
// notify() runs f over every element of r right away and appends the
// returned futures to `notifications`.
auto notify = [&] (auto& r, auto&& f) { boost::range::transform(r, std::back_inserter(notifications), f); };
notify(tables_diff.created, [&] (auto&& gs) { return mm.notify_create_column_family(gs); });
notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify_update_column_family(gs, *it++); });
notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_column_family(dt.schema); });
notify(views_diff.created, [&] (auto&& gs) { return mm.notify_create_view(view_ptr(gs)); });
notify(views_diff.altered, [&] (auto&& gs) { return mm.notify_update_view(view_ptr(gs), *it++); });
notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_view(view_ptr(dt.schema)); });
// Wait for every notification future.
// NOTE(review): reverse iterators are passed to when_all(); the reason is
// not evident from this block -- confirm it is intentional.
when_all(notifications.rbegin(), notifications.rend()).get();
});
}).get();
}
// Merges table schemas by delegating to merge_schemas(), supplying:
//  - a factory that builds a table schema via create_table_from_mutations();
//  - the migration_manager member functions used to announce created,
//    updated and dropped column families.
static void merge_tables(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after)
{
return merge_schemas(proxy,
std::move(before),
std::move(after),
[] (auto&& sm) { return create_table_from_mutations(std::move(sm)); },
std::mem_fn(&service::migration_manager::notify_create_column_family),
std::mem_fn(&service::migration_manager::notify_update_column_family),
std::mem_fn(&service::migration_manager::notify_drop_column_family));
}
static inline void collect_types(std::set<sstring>& keys, schema_result& result, std::vector<user_type>& to)
{
for (auto&& key : keys) {
@@ -941,19 +964,6 @@ static void merge_types(distributed<service::storage_proxy>& proxy, schema_resul
}).get();
}
// Merges view schemas by delegating to merge_schemas(), supplying:
//  - a factory that builds a view schema via create_view_from_mutations();
//  - lambdas that wrap the schema in a view_ptr and forward to the
//    migration manager's notify_create_view / notify_update_view /
//    notify_drop_view.
static void merge_views(distributed<service::storage_proxy>& proxy,
std::map<qualified_name, schema_mutations>&& before,
std::map<qualified_name, schema_mutations>&& after)
{
return merge_schemas(proxy,
std::move(before),
std::move(after),
[] (auto&& sm) { return create_view_from_mutations(std::move(sm)); },
[] (auto&& mm, auto&& s) { return mm.notify_create_view(view_ptr(s)); },
[] (auto&& mm, auto&& s, bool columns_changed) { return mm.notify_update_view(view_ptr(s), columns_changed); },
[] (auto&& mm, auto&& s) { return mm.notify_drop_view(view_ptr(s)); });
}
#if 0
// see the comments for mergeKeyspaces()
private static void mergeFunctions(Map<DecoratedKey, ColumnFamily> before, Map<DecoratedKey, ColumnFamily> after)