storage_service: Pass gossiper object to storage_service

Pass the gossiper object to storage_service class in order to avoid the
usage of the static object returned from get_local_gossiper().
This commit is contained in:
Asias He
2019-02-14 16:21:19 +08:00
parent b2c110699e
commit b91452ed4c
8 changed files with 31 additions and 21 deletions

View File

@@ -37,9 +37,9 @@ logging::logger startlog("init");
// duplicated in cql_test_env.cc
// until proper shutdown is done.
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
void init_storage_service(distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service) {
service::init_storage_service(db, auth_service, sys_dist_ks, view_update_generator, feature_service).get();
service::init_storage_service(db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service).get();
// #293 - do not stop anything
//engine().at_exit([] { return service::deinit_storage_service(); });
}
@@ -155,10 +155,7 @@ void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
to_string(seeds), listen_address_in, broadcast_address);
throw std::runtime_error("Use broadcast_address for seeds list");
}
gossiper.start(std::ref(features), std::ref(cfg)).get();
gossiper.local().set_seeds(seeds);
// #293 - do not stop anything
//engine().at_exit([]{ return gms::get_gossiper().stop(); });
gossiper.invoke_on_all([cluster_name](gms::gossiper& g) {
g.set_cluster_name(cluster_name);
});

View File

@@ -46,7 +46,7 @@ extern logging::logger startlog;
class bad_configuration_error : public std::exception {};
void init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
void init_storage_service(distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service);
struct init_scheduling_config {

View File

@@ -522,8 +522,12 @@ int main(int ac, char** av) {
static sharded<auth::service> auth_service;
static sharded<db::system_distributed_keyspace> sys_dist_ks;
static sharded<db::view::view_update_generator> view_update_generator;
auto& gossiper = gms::get_gossiper();
gossiper.start(std::ref(feature_service), std::ref(*cfg)).get();
// #293 - do not stop anything
//engine().at_exit([]{ return gms::get_gossiper().stop(); });
supervisor::notify("initializing storage service");
init_storage_service(db, auth_service, sys_dist_ks, view_update_generator, feature_service);
init_storage_service(db, gossiper, auth_service, sys_dist_ks, view_update_generator, feature_service);
supervisor::notify("starting per-shard database core");
// Note: changed from using a move here, because we want the config object intact.
@@ -622,7 +626,6 @@ int main(int ac, char** av) {
scfg.statement = dbcfg.statement_scheduling_group;
scfg.streaming = dbcfg.streaming_scheduling_group;
scfg.gossip = scheduling_group();
auto& gossiper = gms::get_gossiper();
init_ms_fd_gossiper(gossiper
, feature_service
, *cfg

View File

@@ -131,10 +131,11 @@ int get_generation_number() {
return generation_number;
}
storage_service::storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
storage_service::storage_service(distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, gms::feature_service& feature_service, std::set<sstring> disabled_features)
: _feature_service(feature_service)
, _db(db)
, _gossiper(gossiper)
, _auth_service(auth_service)
, _disabled_features(std::move(disabled_features))
, _range_tombstones_feature(_feature_service, RANGE_TOMBSTONES_FEATURE)
@@ -3330,9 +3331,9 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const
});
}
future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
future<> init_storage_service(distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service) {
return service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service));
return service::get_storage_service().start(std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service));
}
future<> deinit_storage_service() {

View File

@@ -73,6 +73,7 @@ class boot_strapper;
namespace gms {
class feature_service;
class gossiper;
};
namespace service {
@@ -126,6 +127,7 @@ private:
#endif
gms::feature_service& _feature_service;
distributed<database>& _db;
gms::gossiper& _gossiper;
sharded<auth::service>& _auth_service;
int _update_jobs{0};
// Note that this is obviously only valid for the current shard. Users of
@@ -145,7 +147,7 @@ private:
seastar::metrics::metric_groups _metrics;
std::set<sstring> _disabled_features;
public:
storage_service(distributed<database>& db, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, /* only for tests */ std::set<sstring> disabled_features = {});
storage_service(distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>&, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, /* only for tests */ std::set<sstring> disabled_features = {});
void isolate_on_error();
void isolate_on_commit_error();
@@ -2311,7 +2313,7 @@ private:
void notify_cql_change(inet_address endpoint, bool ready);
};
future<> init_storage_service(distributed<database>& db, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
future<> init_storage_service(distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service);
future<> deinit_storage_service();

View File

@@ -362,6 +362,9 @@ public:
feature_service->start().get();
auto stop_feature_service = defer([&] { feature_service->stop().get(); });
// FIXME: split
tst_init_ms_fd_gossiper(*feature_service, *cfg, db::config::seed_provider_type()).get();
distributed<service::storage_proxy>& proxy = service::get_storage_proxy();
distributed<service::migration_manager>& mm = service::get_migration_manager();
distributed<db::batchlog_manager>& bm = db::get_batchlog_manager();
@@ -369,7 +372,7 @@ public:
auto view_update_generator = ::make_shared<seastar::sharded<db::view::view_update_generator>>();
auto& ss = service::get_storage_service();
ss.start(std::ref(*db), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), cfg_in.disabled_features).get();
ss.start(std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), cfg_in.disabled_features).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
database_config dbcfg;
@@ -383,8 +386,6 @@ public:
db.get_compaction_manager().start();
}).get();
// FIXME: split
tst_init_ms_fd_gossiper(*feature_service, *cfg, db::config::seed_provider_type()).get();
auto stop_ms_fd_gossiper = defer([] {
gms::get_gossiper().stop().get();
});
@@ -499,6 +500,7 @@ future<> do_with_cql_env_thread(std::function<void(cql_test_env&)> func, cql_tes
class storage_service_for_tests::impl {
sharded<gms::feature_service> _feature_service;
sharded<gms::gossiper> _gossiper;
distributed<database> _db;
db::config _cfg;
sharded<auth::service> _auth_service;
@@ -508,9 +510,12 @@ public:
impl() {
auto thread = seastar::thread_impl::get();
assert(thread);
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
_feature_service.start().get();
_gossiper.start(std::ref(_feature_service), std::ref(_cfg)).get();
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
service::get_storage_service().start(std::ref(_db), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service)).get();
service::get_storage_service().start(std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service)).get();
service::get_storage_service().invoke_on_all([] (auto& ss) {
ss.enable_all_features();
}).get();
@@ -519,6 +524,7 @@ public:
service::get_storage_service().stop().get();
netw::get_messaging_service().stop().get();
_db.stop().get();
_gossiper.stop().get();
_feature_service.stop().get();
}
};

View File

@@ -78,7 +78,7 @@ int main(int ac, char ** av) {
feature_service.start().get();
sharded<db::system_distributed_keyspace> sys_dist_ks;
sharded<db::view::view_update_generator> view_update_generator;
service::init_storage_service(db, auth_service, sys_dist_ks, view_update_generator, feature_service).get();
service::init_storage_service(db, gms::get_gossiper(), auth_service, sys_dist_ks, view_update_generator, feature_service).get();
netw::get_messaging_service().start(listen).get();
auto& server = netw::get_local_messaging_service();
auto port = server.port();

View File

@@ -59,7 +59,10 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false /* don't bind */).get();
auto stop_messaging_service = defer([&] { netw::get_messaging_service().stop().get(); });
service::get_storage_service().start(std::ref(db), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service)).get();
gms::get_gossiper().start(std::ref(feature_service), std::ref(cfg)).get();
auto stop_gossiper = defer([&] { gms::get_gossiper().stop().get(); });
service::get_storage_service().start(std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service)).get();
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
db.start().get();
@@ -68,7 +71,5 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
stop_database(db).get();
});
gms::get_gossiper().start(std::ref(feature_service), std::ref(cfg)).get();
auto stop_gossiper = defer([&] { gms::get_gossiper().stop().get(); });
});
}