diff --git a/gms/gossiper.cc b/gms/gossiper.cc index c178d95eec..8a59e03138 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -1353,8 +1353,8 @@ void gossiper::add_saved_endpoint(inet_address ep) { logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation()); } -void gossiper::add_local_application_state(application_state state, versioned_value value) { - seastar::async([this, g = this->shared_from_this(), state, value = std::move(value)] () mutable { +future<> gossiper::add_local_application_state(application_state state, versioned_value value) { + return seastar::async([this, g = this->shared_from_this(), state, value = std::move(value)] () mutable { inet_address ep_addr = get_broadcast_address(); assert(endpoint_state_map.count(ep_addr)); endpoint_state& ep_state = endpoint_state_map.at(ep_addr); @@ -1376,14 +1376,6 @@ void gossiper::add_local_application_state(application_state state, versioned_va }); } -void gossiper::add_lccal_application_states(std::list > states) { - // Note: The taskLock in Origin code is removed, we can probaby use a - // simple data structure here - for (std::pair& pair : states) { - add_local_application_state(pair.first, pair.second); - } -} - future<> gossiper::stop() { logger.debug("gossip::stop on cpu {}", engine().cpu_id()); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index bae92d8c9d..b9bd55b3e7 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -463,9 +463,7 @@ public: */ void add_saved_endpoint(inet_address ep); - void add_local_application_state(application_state state, versioned_value value); - - void add_lccal_application_states(std::list> states); + future<> add_local_application_state(application_state state, versioned_value value); future<> stop(); diff --git a/service/storage_service.cc b/service/storage_service.cc index 2fc147243a..946bdf8266 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -196,19 +196,19 @@ future<> storage_service::prepare_to_join() { }); }).then([this] { // gossip snitch infos (local DC and rack) - gossip_snitch_info(); - auto& proxy = service::get_storage_proxy(); - // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) - return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates - + return gossip_snitch_info().then([this] { + auto& proxy = service::get_storage_proxy(); + // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull) + return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates #if 0 - if (!MessagingService.instance().isListening()) - MessagingService.instance().listen(FBUtilities.getLocalAddress()); - LoadBroadcaster.instance.startBroadcasting(); + if (!MessagingService.instance().isListening()) + MessagingService.instance().listen(FBUtilities.getLocalAddress()); + LoadBroadcaster.instance.startBroadcasting(); - HintedHandOffManager.instance.start(); - BatchlogManager.instance.start(); + HintedHandOffManager.instance.start(); + BatchlogManager.instance.start(); #endif + }); }); } @@ -385,8 +385,8 @@ void storage_service::bootstrap(std::unordered_set tokens) { auto& gossiper = gms::get_local_gossiper(); if (!is_replacing()) { // if not an existing token then bootstrap - gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)); - gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens)); + gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)).get(); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens)).get(); set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", RING_DELAY), true); sleep(std::chrono::milliseconds(RING_DELAY)).get(); } else { @@ -846,8 +846,8 @@ void storage_service::set_tokens(std::unordered_set tokens) { // Collection localTokens = getLocalTokens(); auto local_tokens = _bootstrap_tokens; auto& gossiper = gms::get_local_gossiper(); - gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)); - gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)); + gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)).get(); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)).get(); set_mode(mode::NORMAL, false); replicate_to_all_cores().get(); } @@ -986,8 +986,8 @@ future<> storage_service::init_server(int delay) { if (!tokens.empty()) { _token_metadata.update_normal_tokens(tokens, get_broadcast_address()); // order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa. - gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)); - gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.hibernate(true)); + gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)).get(); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.hibernate(true)).get(); } logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining"); } @@ -1016,14 +1016,15 @@ future<> storage_service::replicate_to_all_cores() { }); } -void storage_service::gossip_snitch_info() { +future<> storage_service::gossip_snitch_info() { auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); auto addr = get_broadcast_address(); auto dc = snitch->get_datacenter(addr); auto rack = snitch->get_rack(addr); auto& gossiper = gms::get_local_gossiper(); - gossiper.add_local_application_state(gms::application_state::DC, value_factory.datacenter(dc)); - gossiper.add_local_application_state(gms::application_state::RACK, value_factory.rack(rack)); + return gossiper.add_local_application_state(gms::application_state::DC, value_factory.datacenter(dc)).then([this, &gossiper, rack] { + return gossiper.add_local_application_state(gms::application_state::RACK, value_factory.rack(rack)); + }); } future<> storage_service::stop() { @@ -2032,7 +2033,7 @@ void storage_service::leave_ring() { auto& gossiper = gms::get_local_gossiper(); auto expire_time = gossiper.compute_expire_time().time_since_epoch().count(); - gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)).get(); auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL); logger.info("Announcing that I have left the ring for {}ms", delay.count()); sleep(delay).get(); @@ -2128,9 +2129,10 @@ future<> storage_service::stream_hints() { future<> storage_service::start_leaving() { auto& gossiper = gms::get_local_gossiper(); - gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())); - _token_metadata.add_leaving_endpoint(get_broadcast_address()); - return get_local_pending_range_calculator_service().update(); + return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())).then([this] { + _token_metadata.add_leaving_endpoint(get_broadcast_address()); + return get_local_pending_range_calculator_service().update(); + }); } void storage_service::add_expire_time_if_found(inet_address endpoint, int64_t expire_time) { diff --git a/service/storage_service.hh b/service/storage_service.hh index 3f8c42629a..040dd217ec 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -132,7 +132,7 @@ public: return _token_metadata; } - void gossip_snitch_info(); + future<> gossip_snitch_info(); void set_load_broadcaster(shared_ptr lb); shared_ptr& get_load_broadcaster();