database: Extract common create cf code

This patch moves some duplicate code into the
add_column_family_and_create_directory() function. It also saves some
superfluous keyspace lookups and readies the code to be used by
materialized views.

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2016-12-14 11:46:11 +01:00
parent 42242273f6
commit 40c684b5f5
4 changed files with 17 additions and 22 deletions

View File

@@ -1971,11 +1971,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
return do_parse_system_tables(proxy, db::schema_tables::COLUMNFAMILIES, [this, &proxy] (schema_result_value_type &v) {
return create_tables_from_tables_partition(proxy, v.second).then([this] (std::map<sstring, schema_ptr> tables) {
return parallel_for_each(tables.begin(), tables.end(), [this] (auto& t) {
auto s = t.second;
auto& ks = this->find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s, this->get_config());
this->add_column_family(s, std::move(cfg));
return ks.make_directory_for_column_family(s->cf_name(), s->id()).then([s] {});
return this->add_column_family_and_make_directory(t.second);
});
});
});
@@ -2068,10 +2064,10 @@ void database::drop_keyspace(const sstring& name) {
_keyspaces.erase(name);
}
void database::add_column_family(schema_ptr schema, column_family::config cfg) {
void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg) {
schema = local_schema_registry().learn(schema);
schema->registry_entry()->mark_synced();
auto uuid = schema->id();
lw_shared_ptr<column_family> cf;
if (cfg.enable_commitlog && _commitlog) {
cf = make_lw_shared<column_family>(schema, std::move(cfg), *_commitlog, _compaction_manager);
@@ -2079,10 +2075,7 @@ void database::add_column_family(schema_ptr schema, column_family::config cfg) {
cf = make_lw_shared<column_family>(schema, std::move(cfg), column_family::no_commitlog(), _compaction_manager);
}
auto ks = _keyspaces.find(schema->ks_name());
if (ks == _keyspaces.end()) {
throw std::invalid_argument("Keyspace " + schema->ks_name() + " not defined");
}
auto uuid = schema->id();
if (_column_families.count(uuid) != 0) {
throw std::invalid_argument("UUID " + uuid.to_sstring() + " already mapped");
}
@@ -2090,12 +2083,18 @@ void database::add_column_family(schema_ptr schema, column_family::config cfg) {
if (_ks_cf_to_uuid.count(kscf) != 0) {
throw std::invalid_argument("Column family " + schema->cf_name() + " exists");
}
ks->second.add_or_update_column_family(schema);
ks.add_or_update_column_family(schema);
cf->start();
_column_families.emplace(uuid, std::move(cf));
_ks_cf_to_uuid.emplace(std::move(kscf), uuid);
}
future<> database::add_column_family_and_make_directory(schema_ptr schema) {
auto& ks = find_keyspace(schema->ks_name());
add_column_family(ks, schema, ks.make_column_family_config(*schema, get_config()));
return ks.make_directory_for_column_family(schema->cf_name(), schema->id());
}
future<> database::drop_column_family(const sstring& ks_name, const sstring& cf_name, timestamp_func tsf) {
auto uuid = find_uuid(ks_name, cf_name);
auto& ks = find_keyspace(ks_name);

View File

@@ -1142,7 +1142,8 @@ public:
future<> init_system_keyspace();
future<> load_sstables(distributed<service::storage_proxy>& p); // after init_system_keyspace()
void add_column_family(schema_ptr schema, column_family::config cfg);
void add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg);
future<> add_column_family_and_make_directory(schema_ptr schema);
/* throws std::out_of_range if missing */
const utils::UUID& find_uuid(const sstring& ks, const sstring& cf) const;

View File

@@ -795,14 +795,9 @@ static void merge_tables(distributed<service::storage_proxy>& proxy,
proxy.local().get_db().invoke_on_all([&created, &dropped, &altered] (database& db) {
return seastar::async([&] {
for (auto&& gs : created) {
schema_ptr s = gs.get();
auto& ks = db.find_keyspace(s->ks_name());
auto cfg = ks.make_column_family_config(*s, db.get_config());
db.add_column_family(s, cfg);
auto& cf = db.find_column_family(s);
cf.mark_ready_for_writes();
ks.make_directory_for_column_family(s->cf_name(), s->id()).get();
service::get_local_migration_manager().notify_create_column_family(s).get();
db.add_column_family_and_make_directory(gs).get();
db.find_column_family(gs).mark_ready_for_writes();
service::get_local_migration_manager().notify_create_column_family(gs).get();
}
for (auto&& gs : altered) {
update_column_family(db, gs.get()).get();

View File

@@ -1060,7 +1060,7 @@ void make(database& db, bool durable, bool volatile_testing_only) {
if (maybe_write_in_user_memory(table, db)) {
cfg.dirty_memory_manager = &db._dirty_memory_manager;
}
db.add_column_family(table, std::move(cfg));
db.add_column_family(ks, table, std::move(cfg));
maybe_add_virtual_reader(table, db);
}
}