gossip: Futurize add_local_application_state()

We are ignoring the future returned by seastar::async. Futurize it so
caller can wait for the application state to be actually applied.

In addition, dropping the unused add_local_application_states function.
This commit is contained in:
Asias He
2015-10-30 22:54:32 +08:00
committed by Avi Kivity
parent 2c3591cbd9
commit 5e8037b50a
4 changed files with 29 additions and 37 deletions

View File

@@ -1353,8 +1353,8 @@ void gossiper::add_saved_endpoint(inet_address ep) {
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
}
void gossiper::add_local_application_state(application_state state, versioned_value value) {
seastar::async([this, g = this->shared_from_this(), state, value = std::move(value)] () mutable {
future<> gossiper::add_local_application_state(application_state state, versioned_value value) {
return seastar::async([this, g = this->shared_from_this(), state, value = std::move(value)] () mutable {
inet_address ep_addr = get_broadcast_address();
assert(endpoint_state_map.count(ep_addr));
endpoint_state& ep_state = endpoint_state_map.at(ep_addr);
@@ -1376,14 +1376,6 @@ void gossiper::add_local_application_state(application_state state, versioned_va
});
}
void gossiper::add_lccal_application_states(std::list<std::pair<application_state, versioned_value> > states) {
// Note: The taskLock in Origin code is removed, we can probaby use a
// simple data structure here
for (std::pair<application_state, versioned_value>& pair : states) {
add_local_application_state(pair.first, pair.second);
}
}
future<> gossiper::stop() {
logger.debug("gossip::stop on cpu {}", engine().cpu_id());

View File

@@ -463,9 +463,7 @@ public:
*/
void add_saved_endpoint(inet_address ep);
void add_local_application_state(application_state state, versioned_value value);
void add_lccal_application_states(std::list<std::pair<application_state, versioned_value>> states);
future<> add_local_application_state(application_state state, versioned_value value);
future<> stop();

View File

@@ -196,19 +196,19 @@ future<> storage_service::prepare_to_join() {
});
}).then([this] {
// gossip snitch infos (local DC and rack)
gossip_snitch_info();
auto& proxy = service::get_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
return gossip_snitch_info().then([this] {
auto& proxy = service::get_storage_proxy();
// gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
return update_schema_version_and_announce(proxy); // Ensure we know our own actual Schema UUID in preparation for updates
#if 0
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
if (!MessagingService.instance().isListening())
MessagingService.instance().listen(FBUtilities.getLocalAddress());
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
#endif
});
});
}
@@ -385,8 +385,8 @@ void storage_service::bootstrap(std::unordered_set<token> tokens) {
auto& gossiper = gms::get_local_gossiper();
if (!is_replacing()) {
// if not an existing token then bootstrap
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens));
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)).get();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(tokens)).get();
set_mode(mode::JOINING, sprint("sleeping %s ms for pending range setup", RING_DELAY), true);
sleep(std::chrono::milliseconds(RING_DELAY)).get();
} else {
@@ -846,8 +846,8 @@ void storage_service::set_tokens(std::unordered_set<token> tokens) {
// Collection<Token> localTokens = getLocalTokens();
auto local_tokens = _bootstrap_tokens;
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens));
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens)).get();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens)).get();
set_mode(mode::NORMAL, false);
replicate_to_all_cores().get();
}
@@ -986,8 +986,8 @@ future<> storage_service::init_server(int delay) {
if (!tokens.empty()) {
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
// order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.hibernate(true));
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(tokens)).get();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.hibernate(true)).get();
}
logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
}
@@ -1016,14 +1016,15 @@ future<> storage_service::replicate_to_all_cores() {
});
}
void storage_service::gossip_snitch_info() {
future<> storage_service::gossip_snitch_info() {
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
auto addr = get_broadcast_address();
auto dc = snitch->get_datacenter(addr);
auto rack = snitch->get_rack(addr);
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(gms::application_state::DC, value_factory.datacenter(dc));
gossiper.add_local_application_state(gms::application_state::RACK, value_factory.rack(rack));
return gossiper.add_local_application_state(gms::application_state::DC, value_factory.datacenter(dc)).then([this, &gossiper, rack] {
return gossiper.add_local_application_state(gms::application_state::RACK, value_factory.rack(rack));
});
}
future<> storage_service::stop() {
@@ -2032,7 +2033,7 @@ void storage_service::leave_ring() {
auto& gossiper = gms::get_local_gossiper();
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)).get();
auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL);
logger.info("Announcing that I have left the ring for {}ms", delay.count());
sleep(delay).get();
@@ -2128,9 +2129,10 @@ future<> storage_service::stream_hints() {
future<> storage_service::start_leaving() {
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens()));
_token_metadata.add_leaving_endpoint(get_broadcast_address());
return get_local_pending_range_calculator_service().update();
return gossiper.add_local_application_state(application_state::STATUS, value_factory.leaving(get_local_tokens())).then([this] {
_token_metadata.add_leaving_endpoint(get_broadcast_address());
return get_local_pending_range_calculator_service().update();
});
}
void storage_service::add_expire_time_if_found(inet_address endpoint, int64_t expire_time) {

View File

@@ -132,7 +132,7 @@ public:
return _token_metadata;
}
void gossip_snitch_info();
future<> gossip_snitch_info();
void set_load_broadcaster(shared_ptr<load_broadcaster> lb);
shared_ptr<load_broadcaster>& get_load_broadcaster();