gossip: Introduce GOSSIP_GET_ENDPOINT_STATES verb
The new verb is used to replace the current gossip shadow round implementation. The current shadow round implementation reuses the asynchronous gossip syn and ack messages, which has plenty of drawbacks. It is hard to tell whether a specific peer node has responded to the syn message sent to it. Delayed responses from the shadow round can be applied to the normal gossip states even after the shadow round is done. The syn and ack message handlers are full of special cases due to the shadow round. All gossip application states, including the ones that are not relevant, are sent back. The gossip application states are applied and the gossip listeners are called as if the node were in normal gossip operation. It is completely unnecessary to call the gossip listeners in the shadow round. This patch introduces a new synchronous verb to request exactly the gossip application states the shadow round needs, and applies those application states without calling the gossip listeners. This patch makes the shadow round easier to reason about, more robust and more efficient. Refs: #6845 Tests: update_cluster_layout_tests.py
This commit is contained in:
101
gms/gossiper.cc
101
gms/gossiper.cc
@@ -432,6 +432,26 @@ future<> gossiper::handle_shutdown_msg(inet_address from) {
|
||||
});
|
||||
}
|
||||
|
||||
// Handler for the GOSSIP_GET_ENDPOINT_STATES verb.
//
// For every entry of endpoint_state_map, builds a reduced endpoint_state that
// carries the entry's heart beat state plus only those application states
// whose key is listed in request.application_states. The reduced states are
// returned, keyed by endpoint address, in an already-resolved future.
future<gossip_get_endpoint_states_response>
gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request) {
    const auto& wanted_keys = request.application_states;
    std::unordered_map<gms::inet_address, gms::endpoint_state> filtered_states;
    for (auto& entry : endpoint_state_map) {
        const endpoint_state& full_state = entry.second;
        // Start from the heart beat state only; application states are added selectively below.
        endpoint_state reduced_state(full_state.get_heart_beat_state());
        const std::map<application_state, versioned_value>& all_apps = full_state.get_application_state_map();
        for (const auto& app : all_apps) {
            if (wanted_keys.count(app.first) == 0) {
                // The requester did not ask for this application state.
                continue;
            }
            reduced_state.get_application_state_map().emplace(app);
        }
        filtered_states.emplace(entry.first, std::move(reduced_state));
    }
    return make_ready_future<gossip_get_endpoint_states_response>(gossip_get_endpoint_states_response{std::move(filtered_states)});
}
|
||||
|
||||
future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
if (_ms_registered) {
|
||||
return make_ready_future<>();
|
||||
@@ -482,6 +502,11 @@ future<> gossiper::init_messaging_service_handler(bind_messaging_port do_bind) {
|
||||
});
|
||||
return messaging_service::no_wait();
|
||||
});
|
||||
ms().register_gossip_get_endpoint_states([] (const rpc::client_info& cinfo, gossip_get_endpoint_states_request request) {
|
||||
return smp::submit_to(0, [request = std::move(request)] () mutable {
|
||||
return gms::get_local_gossiper().handle_get_endpoint_states_msg(std::move(request));
|
||||
});
|
||||
});
|
||||
|
||||
// Start listening messaging_service after gossip message handlers are registered
|
||||
if (do_bind) {
|
||||
@@ -497,7 +522,8 @@ future<> gossiper::uninit_messaging_service_handler() {
|
||||
ms.unregister_gossip_shutdown(),
|
||||
ms.unregister_gossip_digest_syn(),
|
||||
ms.unregister_gossip_digest_ack(),
|
||||
ms.unregister_gossip_digest_ack2()
|
||||
ms.unregister_gossip_digest_ack2(),
|
||||
ms.unregister_gossip_get_endpoint_states()
|
||||
).then_unpack([this] {
|
||||
_ms_registered = false;
|
||||
});
|
||||
@@ -558,7 +584,7 @@ void gossiper::notify_failure_detector(inet_address endpoint, const endpoint_sta
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state) {
|
||||
void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification) {
|
||||
// If state does not exist just add it. If it does then add it if the remote generation is greater.
|
||||
// If there is a generation tie, attempt to break it by heartbeat version.
|
||||
auto permit = this->lock_endpoint(node).get0();
|
||||
@@ -573,29 +599,66 @@ void gossiper::do_apply_state_locally(gms::inet_address node, const endpoint_sta
|
||||
logger.warn("received an invalid gossip generation for peer {}; local generation = {}, received generation = {}",
|
||||
node, local_generation, remote_generation);
|
||||
} else if (remote_generation > local_generation) {
|
||||
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
|
||||
// major state change will handle the update by inserting the remote state directly
|
||||
this->handle_major_state_change(node, remote_state);
|
||||
} else if (remote_generation == local_generation) {
|
||||
// find maximum state
|
||||
int local_max_version = this->get_max_endpoint_state_version(local_state);
|
||||
int remote_max_version = this->get_max_endpoint_state_version(remote_state);
|
||||
if (remote_max_version > local_max_version) {
|
||||
// apply states, but do not notify since there is no major change
|
||||
this->apply_new_states(node, local_state, remote_state);
|
||||
if (listener_notification) {
|
||||
logger.trace("Updating heartbeat state generation to {} from {} for {}", remote_generation, local_generation, node);
|
||||
// major state change will handle the update by inserting the remote state directly
|
||||
this->handle_major_state_change(node, remote_state);
|
||||
} else {
|
||||
logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
|
||||
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
|
||||
endpoint_state_map[node] = remote_state;
|
||||
}
|
||||
if (!local_state.is_alive() && !this->is_dead_state(local_state)) { // unless of course, it was dead
|
||||
this->mark_alive(node, local_state);
|
||||
} else if (remote_generation == local_generation) {
|
||||
if (listener_notification) {
|
||||
// find maximum state
|
||||
int local_max_version = this->get_max_endpoint_state_version(local_state);
|
||||
int remote_max_version = this->get_max_endpoint_state_version(remote_state);
|
||||
if (remote_max_version > local_max_version) {
|
||||
// apply states, but do not notify since there is no major change
|
||||
this->apply_new_states(node, local_state, remote_state);
|
||||
} else {
|
||||
logger.trace("Ignoring remote version {} <= {} for {}", remote_max_version, local_max_version, node);
|
||||
}
|
||||
if (!local_state.is_alive() && !this->is_dead_state(local_state)) { // unless of course, it was dead
|
||||
this->mark_alive(node, local_state);
|
||||
}
|
||||
} else {
|
||||
for (const auto& item : remote_state.get_application_state_map()) {
|
||||
const auto& remote_key = item.first;
|
||||
const auto& remote_value = item.second;
|
||||
const versioned_value* local_value = local_state.get_application_state_ptr(remote_key);
|
||||
if (!local_value || remote_value.version > local_value->version) {
|
||||
logger.debug("Applying remote_state for node {} (remote generation = local generation), key={}, value={}",
|
||||
node, remote_key, remote_value);
|
||||
local_state.add_application_state(remote_key, remote_value);
|
||||
} else {
|
||||
logger.debug("Ignoring remote_state for node {} (remote generation = local generation), key={}, value={}", node, remote_key, remote_value);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.debug("Ignoring remote generation {} < {}", remote_generation, local_generation);
|
||||
}
|
||||
} else {
|
||||
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
|
||||
fd().report(node);
|
||||
this->handle_major_state_change(node, remote_state);
|
||||
if (listener_notification) {
|
||||
// this is a new node, report it to the FD in case it is the first time we are seeing it AND it's not alive
|
||||
fd().report(node);
|
||||
this->handle_major_state_change(node, remote_state);
|
||||
} else {
|
||||
logger.debug("Applying remote_state for node {} (new node)", node);
|
||||
endpoint_state_map[node] = remote_state;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Runs inside seastar::async context
|
||||
void gossiper::apply_state_locally_without_listener_notification(std::unordered_map<inet_address, endpoint_state> map) {
|
||||
for (auto& x : map) {
|
||||
const inet_address& node = x.first;
|
||||
const endpoint_state& remote_state = x.second;
|
||||
if (node == this->get_broadcast_address()) {
|
||||
continue;
|
||||
}
|
||||
do_apply_state_locally(node, remote_state, false);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -618,7 +681,7 @@ future<> gossiper::apply_state_locally(std::map<inet_address, endpoint_state> ma
|
||||
}
|
||||
return seastar::with_semaphore(_apply_state_locally_semaphore, 1, [this, &ep, &map] () mutable {
|
||||
return seastar::async([this, &ep, &map] () mutable {
|
||||
do_apply_state_locally(ep, map[ep]);
|
||||
do_apply_state_locally(ep, map[ep], true);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -81,6 +81,8 @@ class gossip_digest_ack2;
|
||||
class gossip_digest;
|
||||
class inet_address;
|
||||
class i_endpoint_state_change_subscriber;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
|
||||
class feature_service;
|
||||
|
||||
@@ -129,6 +131,7 @@ private:
|
||||
future<> handle_shutdown_msg(inet_address from);
|
||||
future<> do_send_ack_msg(msg_addr from, gossip_digest_syn syn_msg);
|
||||
future<> do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_digest> ack_msg_digest);
|
||||
future<gossip_get_endpoint_states_response> handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request);
|
||||
static constexpr uint32_t _default_cpuid = 0;
|
||||
msg_addr get_msg_addr(inet_address to);
|
||||
void do_sort(utils::chunked_vector<gossip_digest>& g_digest_list);
|
||||
@@ -441,7 +444,8 @@ public:
|
||||
future<> apply_state_locally(std::map<inet_address, endpoint_state> map);
|
||||
|
||||
private:
|
||||
void do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state);
|
||||
void do_apply_state_locally(gms::inet_address node, const endpoint_state& remote_state, bool listener_notification);
|
||||
void apply_state_locally_without_listener_notification(std::unordered_map<inet_address, endpoint_state> map);
|
||||
|
||||
void apply_new_states(inet_address addr, endpoint_state& local_state, const endpoint_state& remote_state);
|
||||
|
||||
@@ -646,5 +650,13 @@ inline future<std::map<inet_address, arrival_window>> get_arrival_samples() {
|
||||
});
|
||||
}
|
||||
|
||||
struct gossip_get_endpoint_states_request {
|
||||
// Application states the sender requested
|
||||
std::unordered_set<gms::application_state> application_states;
|
||||
};
|
||||
|
||||
struct gossip_get_endpoint_states_response {
|
||||
std::unordered_map<gms::inet_address, gms::endpoint_state> endpoint_state_map;
|
||||
};
|
||||
|
||||
} // namespace gms
|
||||
|
||||
@@ -74,4 +74,12 @@ class gossip_digest_ack2 {
|
||||
std::map<gms::inet_address, gms::endpoint_state> get_endpoint_state_map();
|
||||
};
|
||||
|
||||
struct gossip_get_endpoint_states_request {
|
||||
std::unordered_set<gms::application_state> application_states;
|
||||
};
|
||||
|
||||
struct gossip_get_endpoint_states_response {
|
||||
std::unordered_map<gms::inet_address, gms::endpoint_state> endpoint_state_map;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -475,6 +475,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
|
||||
case messaging_verb::GOSSIP_DIGEST_ACK2:
|
||||
case messaging_verb::GOSSIP_SHUTDOWN:
|
||||
case messaging_verb::GOSSIP_ECHO:
|
||||
case messaging_verb::GOSSIP_GET_ENDPOINT_STATES:
|
||||
case messaging_verb::GET_SCHEMA_VERSION:
|
||||
return 0;
|
||||
case messaging_verb::PREPARE_MESSAGE:
|
||||
@@ -1035,6 +1036,16 @@ future<> messaging_service::send_gossip_digest_ack2(msg_addr id, gossip_digest_a
|
||||
return send_message_oneway(this, messaging_verb::GOSSIP_DIGEST_ACK2, std::move(id), std::move(msg));
|
||||
}
|
||||
|
||||
// Installs `func` as the handler for the GOSSIP_GET_ENDPOINT_STATES verb.
// The handler receives the caller's client_info and the request, and returns
// a future holding the response.
void messaging_service::register_gossip_get_endpoint_states(std::function<future<gms::gossip_get_endpoint_states_response> (const rpc::client_info& cinfo, gms::gossip_get_endpoint_states_request request)>&& func) {
    constexpr auto verb = messaging_verb::GOSSIP_GET_ENDPOINT_STATES;
    register_handler(this, verb, std::move(func));
}
|
||||
// Removes the handler for the GOSSIP_GET_ENDPOINT_STATES verb; the returned
// future is the one produced by unregister_handler().
future<> messaging_service::unregister_gossip_get_endpoint_states() {
    constexpr auto verb = messaging_verb::GOSSIP_GET_ENDPOINT_STATES;
    return unregister_handler(verb);
}
|
||||
// Sends a GOSSIP_GET_ENDPOINT_STATES request to `id` via send_message_timeout(),
// passing `timeout` through, and returns a future for the peer's response.
future<gms::gossip_get_endpoint_states_response> messaging_service::send_gossip_get_endpoint_states(msg_addr id, std::chrono::milliseconds timeout, gms::gossip_get_endpoint_states_request request) {
    using response_future = future<gms::gossip_get_endpoint_states_response>;
    constexpr auto verb = messaging_verb::GOSSIP_GET_ENDPOINT_STATES;
    return send_message_timeout<response_future>(this, verb, std::move(id), std::move(timeout), std::move(request));
}
|
||||
|
||||
void messaging_service::register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
|
||||
rpc::optional<std::vector<canonical_mutation>> cm)>&& func) {
|
||||
register_handler(this, netw::messaging_verb::DEFINITIONS_UPDATE, std::move(func));
|
||||
|
||||
@@ -54,6 +54,8 @@ namespace gms {
|
||||
class gossip_digest_syn;
|
||||
class gossip_digest_ack;
|
||||
class gossip_digest_ack2;
|
||||
class gossip_get_endpoint_states_request;
|
||||
class gossip_get_endpoint_states_response;
|
||||
}
|
||||
|
||||
namespace utils {
|
||||
@@ -140,7 +142,8 @@ enum class messaging_verb : int32_t {
|
||||
PAXOS_LEARN = 41,
|
||||
HINT_MUTATION = 42,
|
||||
PAXOS_PRUNE = 43,
|
||||
LAST = 44,
|
||||
GOSSIP_GET_ENDPOINT_STATES = 44,
|
||||
LAST = 45,
|
||||
};
|
||||
|
||||
} // namespace netw
|
||||
@@ -409,6 +412,11 @@ public:
|
||||
future<> unregister_gossip_digest_ack2();
|
||||
future<> send_gossip_digest_ack2(msg_addr id, gms::gossip_digest_ack2 msg);
|
||||
|
||||
// Wrapper for GOSSIP_GET_ENDPOINT_STATES
|
||||
void register_gossip_get_endpoint_states(std::function<future<gms::gossip_get_endpoint_states_response> (const rpc::client_info& cinfo, gms::gossip_get_endpoint_states_request request)>&& func);
|
||||
future<> unregister_gossip_get_endpoint_states();
|
||||
future<gms::gossip_get_endpoint_states_response> send_gossip_get_endpoint_states(msg_addr id, std::chrono::milliseconds timeout, gms::gossip_get_endpoint_states_request request);
|
||||
|
||||
// Wrapper for DEFINITIONS_UPDATE
|
||||
void register_definitions_update(std::function<rpc::no_wait_type (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm,
|
||||
rpc::optional<std::vector<canonical_mutation>> cm)>&& func);
|
||||
|
||||
Reference in New Issue
Block a user