diff --git a/database.cc b/database.cc index 47e309a38e..f949ce39d9 100644 --- a/database.cc +++ b/database.cc @@ -584,9 +584,15 @@ create_keyspace(distributed& db, const lw_shared_ptrdata_file_directories()[0] + "/" + ksm->name()).then([ksm, &db] { return db.invoke_on_all([ksm] (database& db) { auto cfg = db.make_keyspace_config(*ksm); + keyspace ks(ksm, cfg); - ks.create_replication_strategy(); - db.add_keyspace(ksm->name(), std::move(ks)); + auto fu = ks.create_replication_strategy(); + + return fu.then([&db, ks = std::move(ks), ksm] () mutable { + db.add_keyspace(ksm->name(), std::move(ks)); + + return make_ready_future<>(); + }); }); }); // FIXME: rollback on error, or keyspace directory remains on disk, poisoning @@ -692,11 +698,13 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const } } -void +future<> keyspace::create_replication_strategy() { - static thread_local locator::token_metadata tm; - static locator::simple_snitch snitch; + using namespace locator; + + static thread_local token_metadata tm; static std::unordered_map options = {{"replication_factor", "3"}}; + auto d2t = [](double d) { unsigned long l = net::hton(static_cast(d*(std::numeric_limits::max()))); std::array a; @@ -707,7 +715,16 @@ keyspace::create_replication_strategy() { tm.update_normal_token({dht::token::kind::key, {d2t(1.0/4).data(), 8}}, to_sstring("127.0.0.2")); tm.update_normal_token({dht::token::kind::key, {d2t(2.0/4).data(), 8}}, to_sstring("127.0.0.3")); tm.update_normal_token({dht::token::kind::key, {d2t(3.0/4).data(), 8}}, to_sstring("127.0.0.4")); - _replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), tm, snitch, options); + + return make_snitch().then( + [this] (snitch_ptr&& s) { + _replication_strategy = + abstract_replication_strategy::create_replication_strategy( + _metadata->name(), _metadata->strategy_name(), + tm, std::move(s), options); + + return make_ready_future<>(); + }); } locator::abstract_replication_strategy& @@ -750,15 +767,21 @@ schema_ptr database::find_schema(const utils::UUID& uuid) const throw (no_such_c return find_column_family(uuid).schema(); } -keyspace& -database::find_or_create_keyspace(const lw_shared_ptr& ksm) { +future<> +database::create_keyspace(const lw_shared_ptr& ksm) { auto i = _keyspaces.find(ksm->name()); if (i != _keyspaces.end()) { - return i->second; + return make_ready_future<>(); } + keyspace ks(ksm, std::move(make_keyspace_config(*ksm))); - ks.create_replication_strategy(); - return _keyspaces.emplace(ksm->name(), std::move(ks)).first->second; + auto fu = ks.create_replication_strategy(); + + return fu.then([ks = std::move(ks), ksm, this] () mutable { + _keyspaces.emplace(ksm->name(), std::move(ks)); + + return make_ready_future<>(); + }); } std::set @@ -1018,5 +1041,7 @@ operator<<(std::ostream& os, const atomic_cell& ac) { future<> database::stop() { - return make_ready_future<>(); + return do_for_each(_keyspaces, [this] (auto& val_pair) { + return val_pair.second.stop(); + }); } diff --git a/database.hh b/database.hh index fee97f9619..4d2bfa055a 100644 --- a/database.hh +++ b/database.hh @@ -214,13 +214,20 @@ public: const lw_shared_ptr& metadata() const { return _metadata; } - void create_replication_strategy(); + future<> create_replication_strategy(); locator::abstract_replication_strategy& get_replication_strategy(); column_family::config make_column_family_config(const schema& s) const; future<> make_directory_for_column_family(const sstring& name, utils::UUID uuid); void add_column_family(const schema_ptr& s) { _metadata->add_column_family(s); } + future<> stop() { + if (_replication_strategy) { + return _replication_strategy->stop(); + } + + return make_ready_future<>(); + } private: sstring column_family_directory(const sstring& name, utils::UUID uuid) const; }; @@ -276,7 +283,7 @@ public: const utils::UUID& find_uuid(const schema_ptr&) const throw (std::out_of_range); /* below, find* throws no_such_ on fail */ - keyspace& find_or_create_keyspace(const lw_shared_ptr&); + future<> create_keyspace(const lw_shared_ptr&); keyspace& find_keyspace(const sstring& name) throw (no_such_keyspace); const keyspace& find_keyspace(const sstring& name) const throw (no_such_keyspace); bool has_keyspace(const sstring& name) const; diff --git a/db/legacy_schema_tables.cc b/db/legacy_schema_tables.cc index 7e3fb48698..20dfe692af 100644 --- a/db/legacy_schema_tables.cc +++ b/db/legacy_schema_tables.cc @@ -580,17 +580,32 @@ std::vector ALL { KEYSPACES, COLUMNFAMILIES, COLUMNS, TRIGGERS, USE created.emplace_back(schema_result::value_type{key, std::move(post)}); } } + return do_with(std::move(created), [&proxy, altered = std::move(altered)] (auto& created) { - return proxy.get_db().invoke_on_all([&created, altered = std::move(altered)] (database& db) { - for (auto&& kv : created) { - auto ksm = create_keyspace_from_schema_partition(kv); - keyspace k(ksm, db.make_keyspace_config(*ksm)); - k.create_replication_strategy(); - db.add_keyspace(ksm->name(), std::move(k)); - } - for (auto&& name : altered) { - db.update_keyspace(name); - } + return proxy.get_db().invoke_on_all([&proxy, &created, altered = std::move(altered)] (database& db) { + auto temp_vec_ptr = make_lw_shared, std::unique_ptr>>>(); + return do_for_each(created, + [&db, temp_vec_ptr] (auto&& val) { + + auto ksm = create_keyspace_from_schema_partition(val); + std::unique_ptr + k_ptr(new keyspace(ksm, db.make_keyspace_config(*ksm))); + auto fu = k_ptr->create_replication_strategy(); + temp_vec_ptr->emplace_back(ksm, std::move(k_ptr)); + + return fu; + }).then([&db, temp_vec_ptr] { + return do_for_each(*temp_vec_ptr, [&db] (auto&& p_val) { + db.add_keyspace(p_val.first->name(), std::move(*p_val.second)); + return make_ready_future<>(); + }); + }).then([altered = std::move(altered), &db] () mutable { + for (auto&& name : altered) { + db.update_keyspace(name); + } + + return make_ready_future<>(); + }); }); }).then([dropped = std::move(dropped)] () { return make_ready_future>(dropped); diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 5344a3be17..6a73e87e89 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -7,12 +7,17 @@ namespace locator { -abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map& config_options) : - _ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata), _snitch(snitch) {} +abstract_replication_strategy::abstract_replication_strategy(const sstring& ks_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map& config_options) : + _ks_name(ks_name), _config_options(config_options), _token_metadata(token_metadata), _snitch(std::move(snitch)) {} -std::unique_ptr abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, i_endpoint_snitch& snitch, std::unordered_map& config_options) { +std::unique_ptr abstract_replication_strategy::create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& tk_metadata, snitch_ptr&& snitch, std::unordered_map& config_options) { sstring class_name = strategy_name.find(".") != sstring::npos ? strategy_name : "org.apache.cassandra.locator." + strategy_name; - return create_object(class_name, ks_name, tk_metadata, snitch, config_options); + return create_object&> + (class_name, ks_name, tk_metadata, std::move(snitch), config_options); } std::vector abstract_replication_strategy::get_natural_endpoints(const token& search_token) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 853f2bcebd..a56744bf24 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -26,13 +26,14 @@ protected: keyspace* _keyspace = nullptr; std::unordered_map _config_options; token_metadata& _token_metadata; - i_endpoint_snitch& _snitch; + snitch_ptr _snitch; virtual std::vector calculate_natural_endpoints(const token& search_token) = 0; public: - abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map& config_options); + abstract_replication_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map& config_options); virtual ~abstract_replication_strategy() {} - static std::unique_ptr create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map& config_options); + static std::unique_ptr create_replication_strategy(const sstring& ks_name, const sstring& strategy_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map& config_options); std::vector get_natural_endpoints(const token& search_token); + future<> stop() { return _snitch->stop(); } }; } diff --git a/locator/rack_inferring_snitch.hh b/locator/rack_inferring_snitch.hh index cf21b80cfd..bc6eb2ba3c 100644 --- a/locator/rack_inferring_snitch.hh +++ b/locator/rack_inferring_snitch.hh @@ -34,12 +34,20 @@ using inet_address = gms::inet_address; * A simple endpoint snitch implementation that assumes datacenter and rack information is encoded * in the 2nd and 3rd octets of the ip address, respectively. */ -struct rack_inferring_snitch : public snitch_base { +class rack_inferring_snitch : public snitch_base { +private: + template + friend future make_snitch(A&&... a); + rack_inferring_snitch() { _my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address()); _my_rack = get_rack(utils::fb_utilities::get_broadcast_address()); + + // This snitch is ready on creation + _snitch_is_ready.set_value(); } +public: virtual sstring get_rack(inet_address endpoint) override { return std::to_string((endpoint.raw_addr() >> 8) & 0xFF); } @@ -47,6 +55,12 @@ struct rack_inferring_snitch : public snitch_base { virtual sstring get_datacenter(inet_address endpoint) override { return std::to_string((endpoint.raw_addr() >> 16) & 0xFF); } + + // noop + virtual future<> stop() override { + _state = snitch_state::stopped; + return make_ready_future<>(); + } }; } // namespace locator diff --git a/locator/simple_snitch.hh b/locator/simple_snitch.hh index 6a7d27342d..65770efebb 100644 --- a/locator/simple_snitch.hh +++ b/locator/simple_snitch.hh @@ -22,6 +22,7 @@ #pragma once #include "snitch_base.hh" #include "utils/fb_utilities.hh" +#include namespace locator { @@ -30,12 +31,19 @@ namespace locator { * allowing non-read-repaired reads to prefer a single endpoint, which improves * cache locality. */ -struct simple_snitch : public snitch_base { +class simple_snitch : public snitch_base { + template + friend future make_snitch(A&&... a); + simple_snitch() { _my_dc = get_datacenter(utils::fb_utilities::get_broadcast_address()); _my_rack = get_rack(utils::fb_utilities::get_broadcast_address()); + + // This snitch is ready on creation + _snitch_is_ready.set_value(); } +public: virtual sstring get_rack(inet_address endpoint) override { return "rack1"; } @@ -66,6 +74,12 @@ struct simple_snitch : public snitch_base { // return 0; } + + // noop + virtual future<> stop() override { + _state = snitch_state::stopped; + return make_ready_future<>(); + } }; } diff --git a/locator/simple_strategy.cc b/locator/simple_strategy.cc index 72732c8ce1..95b3fca9ba 100644 --- a/locator/simple_strategy.cc +++ b/locator/simple_strategy.cc @@ -8,8 +8,8 @@ namespace locator { -simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map& config_options) : - abstract_replication_strategy(keyspace_name, token_metadata, snitch, config_options) {} +simple_strategy::simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map& config_options) : + abstract_replication_strategy(keyspace_name, token_metadata, std::move(snitch), config_options) {} std::vector simple_strategy::calculate_natural_endpoints(const token& t) { size_t replicas = get_replication_factor(); @@ -47,7 +47,7 @@ size_t simple_strategy::get_replication_factor() const { return std::stol(it->second); } -using registry = class_registrator&>; +using registry = class_registrator&>; static registry registrator("org.apache.cassandra.locator.SimpleStrategy"); } diff --git a/locator/simple_strategy.hh b/locator/simple_strategy.hh index 7d20402385..98153ccad8 100644 --- a/locator/simple_strategy.hh +++ b/locator/simple_strategy.hh @@ -12,7 +12,7 @@ class simple_strategy : public abstract_replication_strategy { protected: virtual std::vector calculate_natural_endpoints(const token& search_token) override; public: - simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, i_endpoint_snitch& snitch, std::unordered_map& config_options); + simple_strategy(const sstring& keyspace_name, token_metadata& token_metadata, snitch_ptr&& snitch, std::unordered_map& config_options); virtual ~simple_strategy() {}; size_t get_replication_factor() const; }; diff --git a/locator/snitch_base.hh b/locator/snitch_base.hh index e823f4c2e8..a649edeef0 100644 --- a/locator/snitch_base.hh +++ b/locator/snitch_base.hh @@ -25,10 +25,17 @@ #include #include "gms/inet_address.hh" +#include "core/shared_ptr.hh" namespace locator { -using inet_address = gms::inet_address; +struct i_endpoint_snitch; + +typedef gms::inet_address inet_address; +typedef std::unique_ptr snitch_ptr; + +template +static future make_snitch(A&&... a); struct i_endpoint_snitch { @@ -79,9 +86,46 @@ struct i_endpoint_snitch std::vector& l1, std::vector& l2) = 0; - virtual ~i_endpoint_snitch() {}; + virtual ~i_endpoint_snitch() { assert(_state == snitch_state::stopped); }; + + virtual future<> stop() = 0; + + template + friend future make_snitch(A&&... a); + +protected: + promise<> _snitch_is_ready; + enum class snitch_state { + initializing, + running, + stopping, + stopped + } _state = snitch_state::initializing; }; +template +static future make_snitch(A&&... a) { + snitch_ptr s(new SnitchClass(std::forward(a)...)); + + auto fu = s->_snitch_is_ready.get_future(); + return fu.then_wrapped([s = std::move(s)] (auto&& f) mutable { + try { + f.get(); + + return make_ready_future(std::move(s)); + } catch (...) { + auto eptr = std::current_exception(); + auto fu = s->stop(); + + return fu.then([eptr, s = std::move(s)] () mutable { + std::rethrow_exception(eptr); + // just to make a compiler happy + return make_ready_future(std::move(s)); + }); + } + }); +} + class snitch_base : public i_endpoint_snitch { public: // diff --git a/tests/urchin/cql_query_test.cc b/tests/urchin/cql_query_test.cc index 43f7b9a466..4b917fe211 100644 --- a/tests/urchin/cql_query_test.cc +++ b/tests/urchin/cql_query_test.cc @@ -797,11 +797,16 @@ SEASTAR_TEST_CASE(test_user_type) { false ); // We don't have "CREATE TYPE" yet, so we must insert the type manually - e.local_db().find_or_create_keyspace(ksm)._user_types.add_type(make_user_type()); - return e.create_table([make_user_type] (auto ks_name) { + return e.local_db().create_keyspace(ksm).then( + [&e, make_user_type, ksm] { + keyspace& ks = e.local_db().find_keyspace(ksm->name()); + ks._user_types.add_type(make_user_type()); + + return e.create_table([make_user_type] (auto ks_name) { // CQL: "create table cf (id int primary key, t ut1)"; - return schema({}, ks_name, "cf", - {{"id", int32_type}}, {}, {{"t", make_user_type()}}, {}, utf8_type); + return schema({}, ks_name, "cf", + {{"id", int32_type}}, {}, {{"t", make_user_type()}}, {}, utf8_type); + }); }).then([&e] { return e.execute_cql("insert into cf (id, t) values (1, (1001, 2001, 'abc1'));").discard_result(); }).then([&e] {