diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 040cdc97a6..3f47a0de54 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -432,6 +432,26 @@ future<> gossiper::handle_shutdown_msg(inet_address from) { }); } +future +gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request) { + std::unordered_map map; + const auto& application_states_wanted = request.application_states; + for (auto& item : endpoint_state_map) { + const inet_address& node = item.first; + const endpoint_state& state = item.second; + const heart_beat_state& hbs = state.get_heart_beat_state(); + auto state_wanted = endpoint_state(hbs); + const std::map& apps = state.get_application_state_map(); + for (const auto& app : apps) { + if (application_states_wanted.count(app.first) > 0) { + state_wanted.get_application_state_map().emplace(app); + } + } + map.emplace(node, std::move(state_wanted)); + } + return make_ready_future(gossip_get_endpoint_states_response{std::move(map)}); +} + future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { if (_ms_registered) { return make_ready_future<>(); @@ -482,6 +502,11 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) { }); return messaging_service::no_wait(); }); + ms().register_gossip_get_endpoint_states([] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) { + return smp::submit_to(0, [request = std::move(request)] () mutable { + return gms::get_local_gossiper().handle_get_endpoint_states_msg(std::move(request)); + }); + }); // Start listening messaging_service after gossip message handlers are registered if (do_bind) { @@ -497,7 +522,8 @@ future<> gossiper::uninit_messaging_service_handler() { ms.unregister_gossip_shutdown(), ms.unregister_gossip_digest_syn(), ms.unregister_gossip_digest_ack(), - ms.unregister_gossip_digest_ack2() + ms.unregister_gossip_digest_ack2(), + ms.unregister_gossip_get_endpoint_states() ).then_unpack([this] { _ms_registered = false; }); @@ -558,7 +584,7 @@ void gossiper::notify_failure_detector(inet_address endpoint, const endpoint_sta } // Runs inside seastar::async context -void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state) { +void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification) { // If state does not exist just add it. If it does then add it if the remote generation is greater. // If there is a generation tie, attempt to break it by heartbeat version. auto permit = this->lock_endpoint(node).get0(); @@ -573,29 +599,66 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}", node, local_generation, remote_generation); } else if (remote_generation > local_generation) { - logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node); - // major state change will handle the update by inserting the remote state directly - this->handle_major_state_change(node, remote_state); - } else if (remote_generation == local_generation) { - // find maximum state - int local_max_version = this->get_max_endpoint_state_version(local_state); - int remote_max_version = this->get_max_endpoint_state_version(remote_state); - if (remote_max_version > local_max_version) { - // apply states, but do not notify since there is no major change - this->apply_new_states(node, local_state, remote_state); + if (listener_notification) { + logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node); + // major state change will handle the update by inserting the remote state directly + this->handle_major_state_change(node, remote_state); } else { - logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node); + logger.debug("Applying remote_state for node {} (remote generation > local generation)", node); + endpoint_state_map[node] = remote_state; } - if (!local_state.is_alive() && !this->is_dead_state(local_state)) { // unless of course, it was dead - this->mark_alive(node, local_state); + } else if (remote_generation == local_generation) { + if (listener_notification) { + // find maximum state + int local_max_version = this->get_max_endpoint_state_version(local_state); + int remote_max_version = this->get_max_endpoint_state_version(remote_state); + if (remote_max_version > local_max_version) { + // apply states, but do not notify since there is no major change + this->apply_new_states(node, local_state, remote_state); + } else { + logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node); + } + if (!local_state.is_alive() && !this->is_dead_state(local_state)) { // unless of course, it was dead + this->mark_alive(node, local_state); + } + } else { + for (const auto& item : remote_state.get_application_state_map()) { + const auto& remote_key = item.first; + const auto& remote_value = item.second; + const versioned_value* local_value = local_state.get_application_state_ptr(remote_key); + if (!local_value || remote_value.version > local_value->version) { + logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}", + node, remote_key, remote_value); + local_state.add_application_state(remote_key, remote_value); + } else { + logger.debug("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value); + } + } } } else { logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation); } } else { - // this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive - fd().report(node); - this->handle_major_state_change(node, remote_state); + if (listener_notification) { + // this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive + fd().report(node); + this->handle_major_state_change(node, remote_state); + } else { + logger.debug("Applying remote_state for node {} (new node)", node); + endpoint_state_map[node] = remote_state; + } + } +} + +// Runs inside seastar::async context +void gossiper::apply_state_locally_without_listener_notification(std::unordered_map map) { + for (auto& x : map) { + const inet_address& node = x.first; + const endpoint_state& remote_state = x.second; + if (node == this->get_broadcast_address()) { + continue; + } + do_apply_state_locally(node, remote_state, false); } } @@ -618,7 +681,7 @@ future<> gossiper::apply_state_locally(std::map ma } return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, &ep, &map] () mutable { return seastar::async([this, &ep, &map] () mutable { - do_apply_state_locally(ep, map[ep]); + do_apply_state_locally(ep, map[ep], true); }); }); }); diff --git a/gms/gossiper.hh b/gms/gossiper.hh index 04d7fb3085..2e52cdce99 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -81,6 +81,8 @@ class gossip_digest_ack2; class gossip_digest; class inet_address; class i_endpoint_state_change_subscriber; +class gossip_get_endpoint_states_request; +class gossip_get_endpoint_states_response; class feature_service; @@ -129,6 +131,7 @@ private: future<> handle_shutdown_msg(inet_address from); future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg); future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector ack_msg_digest); + future handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request); static constexpr uint32_t _default_cpuid = 0; msg_addr get_msg_addr(inet_address to); void do_sort(utils::chunked_vector& g_digest_list); @@ -441,7 +444,8 @@ public: future<> apply_state_locally(std::map map); private: - void do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state); + void do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification); + void apply_state_locally_without_listener_notification(std::unordered_map map); void apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state); @@ -646,5 +650,13 @@ inline future> get_arrival_samples() { }); } +struct gossip_get_endpoint_states_request { + // Application states the sender requested + std::unordered_set application_states; +}; + +struct gossip_get_endpoint_states_response { + std::unordered_map endpoint_state_map; +}; } // namespace gms diff --git a/idl/gossip_digest.idl.hh b/idl/gossip_digest.idl.hh index c82928c804..596e96e7d3 100644 --- a/idl/gossip_digest.idl.hh +++ b/idl/gossip_digest.idl.hh @@ -74,4 +74,12 @@ class gossip_digest_ack2 { std::map get_endpoint_state_map(); }; +struct gossip_get_endpoint_states_request { + std::unordered_set application_states; +}; + +struct gossip_get_endpoint_states_response { + std::unordered_map endpoint_state_map; +}; + } diff --git a/message/messaging_service.cc b/message/messaging_service.cc index b486c1c28b..0566c39e11 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -475,6 +475,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::GOSSIP_DIGEST_ACK2: case messaging_verb::GOSSIP_SHUTDOWN: case messaging_verb::GOSSIP_ECHO: + case messaging_verb::GOSSIP_GET_ENDPOINT_STATES: case messaging_verb::GET_SCHEMA_VERSION: return 0; case messaging_verb::PREPARE_MESSAGE: @@ -1035,6 +1036,16 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg)); } +void messaging_service::register_gossip_get_endpoint_states(std::function (const rpc::client_info& cinfo, gms::gossip_get_endpoint_states_request request)>&& func) { + register_handler(this, messaging_verb::GOSSIP_GET_ENDPOINT_STATES, std::move(func)); +} +future<> messaging_service::unregister_gossip_get_endpoint_states() { + return unregister_handler(messaging_verb::GOSSIP_GET_ENDPOINT_STATES); +} +future messaging_service::send_gossip_get_endpoint_states(msg_addr id, std::chrono::milliseconds timeout, gms::gossip_get_endpoint_states_request request) { + return send_message_timeout>(this, messaging_verb::GOSSIP_GET_ENDPOINT_STATES, std::move(id), std::move(timeout), std::move(request)); +} + void messaging_service::register_definitions_update(std::function fm, rpc::optional> cm)>&& func) { register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 379a0708c9..cc9348d13d 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -54,6 +54,8 @@ namespace gms { class gossip_digest_syn; class gossip_digest_ack; class gossip_digest_ack2; + class gossip_get_endpoint_states_request; + class gossip_get_endpoint_states_response; } namespace utils { @@ -140,7 +142,8 @@ enum class messaging_verb : int32_t { PAXOS_LEARN = 41, HINT_MUTATION = 42, PAXOS_PRUNE = 43, - LAST = 44, + GOSSIP_GET_ENDPOINT_STATES = 44, + LAST = 45, }; } // namespace netw @@ -409,6 +412,11 @@ public: future<> unregister_gossip_digest_ack2(); future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg); + // Wrapper for GOSSIP_GET_ENDPOINT_STATES + void register_gossip_get_endpoint_states(std::function (const rpc::client_info& cinfo, gms::gossip_get_endpoint_states_request request)>&& func); + future<> unregister_gossip_get_endpoint_states(); + future send_gossip_get_endpoint_states(msg_addr id, std::chrono::milliseconds timeout, gms::gossip_get_endpoint_states_request request); + // Wrapper for DEFINITIONS_UPDATE void register_definitions_update(std::function fm, rpc::optional> cm)>&& func);