gossiper: Avoid endpoint_state copies

gossiper::get_endpoint_state_for_endpoint() returns a copy of
endpoint_state, which we've seen can be very expensive.

This patch adds a similar function which returns a pointer instead,
and changes the call sites where using the pointer-returning variant
is deemed safe (the pointer neither escapes the function, nor crosses
any defer point).

Fixes #764

Signed-off-by: Duarte Nunes <duarte@scylladb.com>
This commit is contained in:
Duarte Nunes
2017-10-10 00:23:33 +01:00
parent bc976b4773
commit ceebbe14cc
9 changed files with 46 additions and 31 deletions

View File

@@ -4252,7 +4252,7 @@ column_family::cache_hit_rate column_family::get_hit_rate(gms::inet_address addr
if (it == _cluster_cache_hit_rates.end()) {
// no data yet, get it from the gossiper
auto& gossiper = gms::get_local_gossiper();
auto eps = gossiper.get_endpoint_state_for_endpoint(addr);
auto* eps = gossiper.get_endpoint_state_for_endpoint_ptr(addr);
if (eps) {
auto state = eps->get_application_state(gms::application_state::CACHE_HITRATES);
float f = -1.0f; // missing state means old node

View File

@@ -193,7 +193,7 @@ range_streamer::get_all_ranges_with_strict_sources_for(const sstring& keyspace_n
inet_address source_ip = range_sources.find(desired_range)->second;
auto& gossiper = gms::get_local_gossiper();
auto source_state = gossiper.get_endpoint_state_for_endpoint(source_ip);
auto* source_state = gossiper.get_endpoint_state_for_endpoint_ptr(source_ip);
if (gossiper.is_enabled() && source_state && !source_state->is_alive()) {
throw std::runtime_error(sprint("A node required to move the data consistently is down (%s). If you wish to move the data from a potentially inconsistent replica, restart the node with consistent_rangemovement=false", source_ip));
}

View File

@@ -154,7 +154,7 @@ int failure_detector::get_up_endpoint_count() {
sstring failure_detector::get_endpoint_state(sstring address) {
std::stringstream ss;
auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(inet_address(address));
auto* eps = get_local_gossiper().get_endpoint_state_for_endpoint_ptr(inet_address(address));
if (eps) {
append_endpoint_state(ss, *eps);
return sstring(ss.str());
@@ -163,7 +163,7 @@ sstring failure_detector::get_endpoint_state(sstring address) {
}
}
void failure_detector::append_endpoint_state(std::stringstream& ss, endpoint_state& state) {
void failure_detector::append_endpoint_state(std::stringstream& ss, const endpoint_state& state) {
ss << " generation:" << state.get_heart_beat_state().get_generation() << "\n";
ss << " heartbeat:" << state.get_heart_beat_state().get_heart_beat_version() << "\n";
for (const auto& entry : state.get_application_state_map()) {

View File

@@ -156,7 +156,7 @@ public:
}
private:
void append_endpoint_state(std::stringstream& ss, endpoint_state& state);
void append_endpoint_state(std::stringstream& ss, const endpoint_state& state);
public:
/**

View File

@@ -162,7 +162,7 @@ void gossiper::do_sort(std::vector<gossip_digest>& g_digest_list) {
std::vector<gossip_digest> diff_digests;
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
auto ep_state = this->get_endpoint_state_for_endpoint(ep);
auto* ep_state = this->get_endpoint_state_for_endpoint_ptr(ep);
int version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : 0;
int diff_version = ::abs(version - g_digest.get_max_version());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), diff_version));
@@ -1060,7 +1060,16 @@ clk::time_point gossiper::get_expire_time_for_endpoint(inet_address endpoint) {
}
}
std::experimental::optional<endpoint_state> gossiper::get_endpoint_state_for_endpoint(inet_address ep) const {
const endpoint_state* gossiper::get_endpoint_state_for_endpoint_ptr(inet_address ep) const {
auto it = endpoint_state_map.find(ep);
if (it == endpoint_state_map.end()) {
return nullptr;
} else {
return &it->second;
}
}
stdx::optional<endpoint_state> gossiper::get_endpoint_state_for_endpoint(inet_address ep) const {
auto it = endpoint_state_map.find(ep);
if (it == endpoint_state_map.end()) {
return {};
@@ -1083,21 +1092,23 @@ std::unordered_map<inet_address, endpoint_state>& gms::gossiper::get_endpoint_st
bool gossiper::uses_host_id(inet_address endpoint) {
if (netw::get_local_messaging_service().knows_version(endpoint)) {
return true;
} else if (get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::NET_VERSION)) {
}
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
if (eps && eps->get_application_state(application_state::NET_VERSION)) {
return true;
}
return false;
}
bool gossiper::uses_vnodes(inet_address endpoint) {
return uses_host_id(endpoint) && get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::TOKENS);
return uses_host_id(endpoint) && get_endpoint_state_for_endpoint_ptr(endpoint)->get_application_state(application_state::TOKENS);
}
utils::UUID gossiper::get_host_id(inet_address endpoint) {
if (!uses_host_id(endpoint)) {
throw std::runtime_error(sprint("Host %s does not use new-style tokens!", endpoint));
}
sstring uuid = get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::HOST_ID)->value;
sstring uuid = get_endpoint_state_for_endpoint_ptr(endpoint)->get_application_state(application_state::HOST_ID)->value;
return utils::UUID(uuid);
}
@@ -1137,8 +1148,8 @@ std::experimental::optional<endpoint_state> gossiper::get_state_for_version_bigg
}
int gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) {
auto ep1 = get_endpoint_state_for_endpoint(addr1);
auto ep2 = get_endpoint_state_for_endpoint(addr2);
auto* ep1 = get_endpoint_state_for_endpoint_ptr(addr1);
auto* ep2 = get_endpoint_state_for_endpoint_ptr(addr2);
if (!ep1 || !ep2) {
auto err = sprint("Can not get endpoint_state for %s or %s", addr1, addr2);
logger.warn("{}", err);
@@ -1273,7 +1284,7 @@ void gossiper::handle_major_state_change(inet_address ep, const endpoint_state&
mark_dead(ep, ep_state);
}
auto eps_new = get_endpoint_state_for_endpoint(ep);
auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep);
if (eps_new) {
_subscribers.for_each([ep, eps_new] (auto& subscriber) {
subscriber->on_join(ep, *eps_new);
@@ -1584,7 +1595,7 @@ future<> gossiper::do_stop_gossiping() {
}
return seastar::async([this, g = this->shared_from_this()] {
_enabled = false;
auto my_ep_state = get_endpoint_state_for_endpoint(get_broadcast_address());
auto* my_ep_state = get_endpoint_state_for_endpoint_ptr(get_broadcast_address());
if (my_ep_state) {
logger.info("My status = {}", get_gossip_status(*my_ep_state));
}
@@ -1745,7 +1756,7 @@ sstring gossiper::get_gossip_status(const endpoint_state& ep_state) const {
}
sstring gossiper::get_gossip_status(const inet_address& endpoint) const {
auto ep_state = get_endpoint_state_for_endpoint(endpoint);
auto* ep_state = get_endpoint_state_for_endpoint_ptr(endpoint);
if (!ep_state) {
return "";
}
@@ -1793,7 +1804,7 @@ future<> gossiper::wait_for_gossip_to_settle() {
}
bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
auto eps = get_endpoint_state_for_endpoint(endpoint);
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
// if there's no previous state, or the node was previously removed from the cluster, we're good
if (!eps || is_dead_state(*eps)) {
@@ -1823,7 +1834,7 @@ std::set<sstring> to_feature_set(sstring features_string) {
std::set<sstring> gossiper::get_supported_features(inet_address endpoint) const {
std::set<sstring> features;
auto ep_state = get_endpoint_state_for_endpoint(endpoint);
auto* ep_state = get_endpoint_state_for_endpoint_ptr(endpoint);
if (!ep_state) {
return features;
}

View File

@@ -385,7 +385,10 @@ private:
public:
clk::time_point get_expire_time_for_endpoint(inet_address endpoint);
std::experimental::optional<endpoint_state> get_endpoint_state_for_endpoint(inet_address ep) const;
const endpoint_state* get_endpoint_state_for_endpoint_ptr(inet_address ep) const;
// Use with caution, copies might be expensive (see #764)
stdx::optional<endpoint_state> get_endpoint_state_for_endpoint(inet_address ep) const;
// removes ALL endpoint states; should only be called after shadow gossip
void reset_endpoint_state_map();

View File

@@ -126,7 +126,7 @@ private:
sstring get_endpoint_info(inet_address endpoint, gms::application_state key,
const sstring& default_val) {
gms::gossiper& local_gossiper = gms::get_local_gossiper();
auto state = local_gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* state = local_gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
// First, look in the gossiper::endpoint_state_map...
if (state) {

View File

@@ -191,7 +191,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const utils::UUID& their_
return sleep(migration_delay).then([this, &proxy, endpoint] {
// grab the latest version of the schema since it may have changed again since the initial scheduling
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* ep_state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
if (!ep_state) {
mlogger.debug("epState vanished for {}, not submitting migration task", endpoint);
return make_ready_future<>();
@@ -261,7 +261,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
bool migration_manager::has_compatible_schema_tables_version(const gms::inet_address& endpoint) {
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* ep_state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
if (!ep_state) {
return false;
}

View File

@@ -448,7 +448,7 @@ void storage_service::join_token_ring(int delay) {
auto existing = _token_metadata.get_endpoint(token);
if (existing) {
auto& gossiper = gms::get_local_gossiper();
auto eps = gossiper.get_endpoint_state_for_endpoint(*existing);
auto* eps = gossiper.get_endpoint_state_for_endpoint_ptr(*existing);
if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - std::chrono::milliseconds(delay)) {
throw std::runtime_error("Cannot replace a live node...");
}
@@ -590,7 +590,7 @@ void storage_service::bootstrap(std::unordered_set<token> tokens) {
sstring
storage_service::get_rpc_address(const inet_address& endpoint) const {
if (endpoint != get_broadcast_address()) {
auto v = gms::get_local_gossiper().get_endpoint_state_for_endpoint(endpoint)->get_application_state(gms::application_state::RPC_ADDRESS);
auto v = gms::get_local_gossiper().get_endpoint_state_for_endpoint_ptr(endpoint)->get_application_state(gms::application_state::RPC_ADDRESS);
if (v) {
return v.value().value;
}
@@ -705,7 +705,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
auto existing = _token_metadata.get_endpoint_for_host_id(host_id);
if (db().local().is_replacing() &&
db().local().get_replace_address() &&
gossiper.get_endpoint_state_for_endpoint(db().local().get_replace_address().value()) &&
gossiper.get_endpoint_state_for_endpoint_ptr(db().local().get_replace_address().value()) &&
(host_id == gossiper.get_host_id(db().local().get_replace_address().value()))) {
slogger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
} else {
@@ -909,7 +909,7 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
_token_metadata.add_leaving_endpoint(endpoint);
update_pending_ranges().get();
// find the endpoint coordinating this removal that we need to notify when we're done
auto state = gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
if (!state) {
auto err = sprint("Can not find endpoint_state for endpoint=%s", endpoint);
slogger.warn("{}", err);
@@ -1007,7 +1007,7 @@ void storage_service::on_change(inet_address endpoint, application_state state,
}
} else {
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* ep_state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
if (!ep_state || gossiper.is_dead_state(*ep_state)) {
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
return;
@@ -1098,7 +1098,7 @@ void storage_service::do_update_system_peers_table(gms::inet_address endpoint, c
void storage_service::update_peer_info(gms::inet_address endpoint) {
using namespace gms;
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* ep_state = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
if (!ep_state) {
return;
}
@@ -1111,7 +1111,7 @@ void storage_service::update_peer_info(gms::inet_address endpoint) {
sstring storage_service::get_application_state_value(inet_address endpoint, application_state appstate) {
auto& gossiper = gms::get_local_gossiper();
auto eps = gossiper.get_endpoint_state_for_endpoint(endpoint);
auto* eps = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
if (!eps) {
return {};
}
@@ -1552,7 +1552,7 @@ future<std::unordered_set<token>> storage_service::prepare_replacement_info() {
auto& gossiper = gms::get_local_gossiper();
gossiper.check_knows_remote_features(get_config_supported_features());
// now that we've gossiped at least once, we should be able to find the node we're replacing
auto state = gossiper.get_endpoint_state_for_endpoint(replace_address);
auto* state = gossiper.get_endpoint_state_for_endpoint_ptr(replace_address);
if (!state) {
throw std::runtime_error(sprint("Cannot replace_address %s because it doesn't exist in gossip", replace_address));
}
@@ -3002,9 +3002,10 @@ void storage_service::range_relocator::calculate_to_from_streams(std::unordered_
auto source_ip = address_list.front();
auto& gossiper = gms::get_local_gossiper();
auto state = gossiper.get_endpoint_state_for_endpoint(source_ip);
if (gossiper.is_enabled() && state && !state->is_alive())
auto* state = gossiper.get_endpoint_state_for_endpoint_ptr(source_ip);
if (gossiper.is_enabled() && state && !state->is_alive()) {
throw std::runtime_error(sprint("A node required to move the data consistently is down (%s). If you wish to move the data from a potentially inconsistent replica, restart the node with consistent_rangemovement=false", source_ip));
}
}
}
// calculating endpoints to stream current ranges to if needed