diff --git a/init.cc b/init.cc index 86a22e4142..5ba5dd6f3c 100644 --- a/init.cc +++ b/init.cc @@ -37,9 +37,9 @@ logging::logger startlog("init"); // duplicated in cql_test_env.cc // until proper shutdown is done. -void init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks, +void init_storage_service(distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service) { - service::init_storage_service(db, auth_service, sys_dist_ks, view_update_generator, feature_service).get(); + service::init_storage_service(db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service).get(); // #293 - do not stop anything //engine().at_exit([] { return service::deinit_storage_service(); }); } @@ -155,10 +155,7 @@ void init_ms_fd_gossiper(sharded& gossiper to_string(seeds), listen_address_in, broadcast_address); throw std::runtime_error("Use broadcast_address for seeds list"); } - gossiper.start(std::ref(features), std::ref(cfg)).get(); gossiper.local().set_seeds(seeds); - // #293 - do not stop anything - //engine().at_exit([]{ return gms::get_gossiper().stop(); }); gossiper.invoke_on_all([cluster_name](gms::gossiper& g) { g.set_cluster_name(cluster_name); }); diff --git a/init.hh b/init.hh index eb26ec4ed7..833d6c6378 100644 --- a/init.hh +++ b/init.hh @@ -46,7 +46,7 @@ extern logging::logger startlog; class bad_configuration_error : public std::exception {}; -void init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks, +void init_storage_service(distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service); struct init_scheduling_config { diff --git a/main.cc b/main.cc index ebef0da7da..7ce431517c 100644 --- a/main.cc +++ b/main.cc @@ -522,8 +522,12 @@ int main(int ac, char** av) { static sharded auth_service; static sharded sys_dist_ks; static sharded view_update_generator; + auto& gossiper = gms::get_gossiper(); + gossiper.start(std::ref(feature_service), std::ref(*cfg)).get(); + // #293 - do not stop anything + //engine().at_exit([]{ return gms::get_gossiper().stop(); }); supervisor::notify("initializing storage service"); - init_storage_service(db, auth_service, sys_dist_ks, view_update_generator, feature_service); + init_storage_service(db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service); supervisor::notify("starting per-shard database core"); // Note: changed from using a move here, because we want the config object intact. @@ -622,7 +626,6 @@ int main(int ac, char** av) { scfg.statement = dbcfg.statement_scheduling_group; scfg.streaming = dbcfg.streaming_scheduling_group; scfg.gossip = scheduling_group(); - auto& gossiper = gms::get_gossiper(); init_ms_fd_gossiper(gossiper , feature_service , *cfg diff --git a/service/storage_service.cc b/service/storage_service.cc index 800489dd9c..477a9c2a6a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -131,10 +131,11 @@ int get_generation_number() { return generation_number; } -storage_service::storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks, +storage_service::storage_service(distributed& db, gms::gossiper& gossiper, sharded& auth_service, sharded& sys_dist_ks, sharded& view_update_generator, gms::feature_service& feature_service, std::set disabled_features) : _feature_service(feature_service) , _db(db) + , _gossiper(gossiper) , _auth_service(auth_service) , _disabled_features(std::move(disabled_features)) , _range_tombstones_feature(_feature_service, RANGE_TOMBSTONES_FEATURE) @@ -3330,9 +3331,9 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const }); } -future<> init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks, +future<> init_storage_service(distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service) { - return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service)); + return service::get_storage_service().start(std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service)); } future<> deinit_storage_service() { diff --git a/service/storage_service.hh b/service/storage_service.hh index 9bc9bbc882..2ae0092506 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -73,6 +73,7 @@ class boot_strapper; namespace gms { class feature_service; +class gossiper; }; namespace service { @@ -126,6 +127,7 @@ private: #endif gms::feature_service& _feature_service; distributed& _db; + gms::gossiper& _gossiper; sharded& _auth_service; int _update_jobs{0}; // Note that this is obviously only valid for the current shard. Users of @@ -145,7 +147,7 @@ private: seastar::metrics::metric_groups _metrics; std::set _disabled_features; public: - storage_service(distributed& db, sharded&, sharded&, sharded&, gms::feature_service& feature_service, /* only for tests */ std::set disabled_features = {}); + storage_service(distributed& db, gms::gossiper& gossiper, sharded&, sharded&, sharded&, gms::feature_service& feature_service, /* only for tests */ std::set disabled_features = {}); void isolate_on_error(); void isolate_on_commit_error(); @@ -2311,7 +2313,7 @@ private: void notify_cql_change(inet_address endpoint, bool ready); }; -future<> init_storage_service(distributed& db, sharded& auth_service, sharded& sys_dist_ks, +future<> init_storage_service(distributed& db, sharded& gossiper, sharded& auth_service, sharded& sys_dist_ks, sharded& view_update_generator, sharded& feature_service); future<> deinit_storage_service(); diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc index 7900b852ae..f4bc387004 100644 --- a/tests/cql_test_env.cc +++ b/tests/cql_test_env.cc @@ -362,6 +362,9 @@ public: feature_service->start().get(); auto stop_feature_service = defer([&] { feature_service->stop().get(); }); + // FIXME: split + tst_init_ms_fd_gossiper(*feature_service, *cfg, db::config::seed_provider_type()).get(); + distributed& proxy = service::get_storage_proxy(); distributed& mm = service::get_migration_manager(); distributed& bm = db::get_batchlog_manager(); @@ -369,7 +372,7 @@ public: auto view_update_generator = ::make_shared>(); auto& ss = service::get_storage_service(); - ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), cfg_in.disabled_features).get(); + ss.start(std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), cfg_in.disabled_features).get(); auto stop_storage_service = defer([&ss] { ss.stop().get(); }); database_config dbcfg; @@ -383,8 +386,6 @@ public: db.get_compaction_manager().start(); }).get(); - // FIXME: split - tst_init_ms_fd_gossiper(*feature_service, *cfg, db::config::seed_provider_type()).get(); auto stop_ms_fd_gossiper = defer([] { gms::get_gossiper().stop().get(); }); @@ -499,6 +500,7 @@ future<> do_with_cql_env_thread(std::function func, cql_tes class storage_service_for_tests::impl { sharded _feature_service; + sharded _gossiper; distributed _db; db::config _cfg; sharded _auth_service; @@ -508,9 +510,12 @@ public: impl() { auto thread = seastar::thread_impl::get(); assert(thread); + utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost")); + utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost")); _feature_service.start().get(); + _gossiper.start(std::ref(_feature_service), std::ref(_cfg)).get(); netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get(); - service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service)).get(); + service::get_storage_service().start(std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service)).get(); service::get_storage_service().invoke_on_all([] (auto& ss) { ss.enable_all_features(); }).get(); @@ -519,6 +524,7 @@ public: service::get_storage_service().stop().get(); netw::get_messaging_service().stop().get(); _db.stop().get(); + _gossiper.stop().get(); _feature_service.stop().get(); } }; diff --git a/tests/gossip.cc b/tests/gossip.cc index 560e3133ae..b512d43b5c 100644 --- a/tests/gossip.cc +++ b/tests/gossip.cc @@ -78,7 +78,7 @@ int main(int ac, char ** av) { feature_service.start().get(); sharded sys_dist_ks; sharded view_update_generator; - service::init_storage_service(db, auth_service, sys_dist_ks, view_update_generator, feature_service).get(); + service::init_storage_service(db, gms::get_gossiper(), auth_service, sys_dist_ks, view_update_generator, feature_service).get(); netw::get_messaging_service().start(listen).get(); auto& server = netw::get_local_messaging_service(); auto port = server.port(); diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc index ed6bab0659..eac18e345e 100644 --- a/tests/gossip_test.cc +++ b/tests/gossip_test.cc @@ -59,7 +59,10 @@ SEASTAR_TEST_CASE(test_boot_shutdown){ netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false /* don't bind */).get(); auto stop_messaging_service = defer([&] { netw::get_messaging_service().stop().get(); }); - service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service)).get(); + gms::get_gossiper().start(std::ref(feature_service), std::ref(cfg)).get(); + auto stop_gossiper = defer([&] { gms::get_gossiper().stop().get(); }); + + service::get_storage_service().start(std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service)).get(); auto stop_ss = defer([&] { service::get_storage_service().stop().get(); }); db.start().get(); @@ -68,7 +71,5 @@ SEASTAR_TEST_CASE(test_boot_shutdown){ stop_database(db).get(); }); - gms::get_gossiper().start(std::ref(feature_service), std::ref(cfg)).get(); - auto stop_gossiper = defer([&] { gms::get_gossiper().stop().get(); }); }); }