gossiper: keep and serve shared endpoint_state_ptr in map

This commit changes the interface to
using endpoint_state_ptr = lw_shared_ptr<const endpoint_state>
so that users can get a snapshot of the endpoint_state
that they must not modify in-place anyhow.
While internally, gossiper still has the legacy helpers
to manage the endpoint_state.

Fixes scylladb/scylladb#14799

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-08-17 16:59:06 +03:00
parent f33a6d37f2
commit d00e49a1bb
8 changed files with 111 additions and 98 deletions

View File

@@ -71,7 +71,7 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) {
});
fd::get_endpoint_state.set(r, [&g] (std::unique_ptr<request> req) {
auto* state = g.get_endpoint_state_for_endpoint_ptr(gms::inet_address(req->param["addr"]));
auto state = g.get_endpoint_state_ptr(gms::inet_address(req->param["addr"]));
if (!state) {
return make_ready_future<json::json_return_type>(format("unknown endpoint {}", req->param["addr"]));
}

View File

@@ -151,6 +151,16 @@ public:
friend std::ostream& operator<<(std::ostream& os, const endpoint_state& x);
};
using endpoint_state_ptr = lw_shared_ptr<const endpoint_state>;
inline endpoint_state_ptr make_endpoint_state_ptr(const endpoint_state& eps) {
return make_lw_shared<endpoint_state>(eps);
}
inline endpoint_state_ptr make_endpoint_state_ptr(endpoint_state&& eps) {
return make_lw_shared<endpoint_state>(std::move(eps));
}
// The endpoint state is protected with an endpoint lock
// acquired in the gossiper using gossiper::lock_endpoint.
//

View File

@@ -105,7 +105,7 @@ gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm,
_metrics.add_group("gossip", {
sm::make_counter("heart_beat",
[ep, this] {
auto es = get_endpoint_state_for_endpoint_ptr(ep);
auto es = get_endpoint_state_ptr(ep);
if (es) {
return es->get_heart_beat_state().get_heart_beat_version().value();
} else {
@@ -144,7 +144,7 @@ void gossiper::do_sort(utils::chunked_vector<gossip_digest>& g_digest_list) {
utils::chunked_vector<gossip_digest> diff_digests;
for (auto g_digest : g_digest_list) {
auto ep = g_digest.get_endpoint();
auto* ep_state = this->get_endpoint_state_for_endpoint_ptr(ep);
auto ep_state = this->get_endpoint_state_ptr(ep);
version_type version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : version_type();
int32_t diff_version = ::abs(version - g_digest.get_max_version());
diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version)));
@@ -360,7 +360,7 @@ future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector<gossip_
std::map<inet_address, endpoint_state> delta_ep_state_map;
for (auto g_digest : ack_msg_digest) {
inet_address addr = g_digest.get_endpoint();
const auto es = get_endpoint_state_for_endpoint_ptr(addr);
const auto es = get_endpoint_state_ptr(addr);
if (!es || es->get_heart_beat_state().get_generation() < g_digest.get_generation()) {
continue;
}
@@ -421,7 +421,7 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optional<int64_t
if (it == _advertise_to_nodes.end()) {
respond = false;
} else {
auto es = get_endpoint_state_for_endpoint_ptr(from);
auto es = get_endpoint_state_ptr(from);
if (es) {
auto saved_generation_number = it->second;
auto current_generation_number = generation_number_opt ?
@@ -450,7 +450,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional<int64_t>
auto permit = co_await this->lock_endpoint(from, null_permit_id);
if (generation_number_opt) {
debug_validate_gossip_generation(*generation_number_opt);
auto es = this->get_endpoint_state_for_endpoint_ptr(from);
auto es = this->get_endpoint_state_ptr(from);
if (es) {
auto local_generation = es->get_heart_beat_state().get_generation();
logger.info("Got shutdown message from {}, received_generation={}, local_generation={}",
@@ -473,12 +473,10 @@ future<gossip_get_endpoint_states_response>
gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request) {
std::unordered_map<gms::inet_address, gms::endpoint_state> map;
const auto& application_states_wanted = request.application_states;
for (auto& item : _endpoint_state_map) {
const inet_address& node = item.first;
const endpoint_state& state = item.second;
const heart_beat_state& hbs = state.get_heart_beat_state();
for (const auto& [node, state] : _endpoint_state_map) {
const heart_beat_state& hbs = state->get_heart_beat_state();
auto state_wanted = endpoint_state(hbs);
const std::map<application_state, versioned_value>& apps = state.get_application_state_map();
const std::map<application_state, versioned_value>& apps = state->get_application_state_map();
for (const auto& app : apps) {
if (application_states_wanted.count(app.first) > 0) {
state_wanted.get_application_state_map().emplace(app);
@@ -570,7 +568,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
// If state does not exist just add it. If it does then add it if the remote generation is greater.
// If there is a generation tie, attempt to break it by heartbeat version.
auto permit = co_await this->lock_endpoint(node, null_permit_id);
auto es = this->get_endpoint_state_for_endpoint_ptr(node);
auto* es = this->get_mutable_endpoint_state_ptr(node);
if (es) {
endpoint_state& local_state = *es;
auto local_generation = local_state.get_heart_beat_state().get_generation();
@@ -587,7 +585,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
co_await this->handle_major_state_change(node, remote_state, permit.id());
} else {
logger.debug("Applying remote_state for node {} (remote generation > local generation)", node);
_endpoint_state_map[node] = remote_state;
_endpoint_state_map[node] = make_endpoint_state_ptr(remote_state);
}
} else if (remote_generation == local_generation) {
if (listener_notification) {
@@ -625,7 +623,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint
co_await this->handle_major_state_change(node, remote_state, permit.id());
} else {
logger.debug("Applying remote_state for node {} (new node)", node);
_endpoint_state_map[node] = remote_state;
_endpoint_state_map[node] = make_endpoint_state_ptr(remote_state);
}
}
}
@@ -699,10 +697,7 @@ future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) {
logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds);
}
std::optional<endpoint_state> state;
if (auto eps = get_endpoint_state_for_endpoint_ptr(endpoint)) {
state.emplace(*eps);
}
auto state = get_endpoint_state_ptr(endpoint);
bool was_alive;
co_await mutate_live_and_unreachable_endpoints([endpoint, &was_alive] (gossiper& g) {
@@ -717,7 +712,7 @@ future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) {
if (was_alive && state) {
try {
logger.info("InetAddress {} is now DOWN, status = {}", endpoint, get_gossip_status(*state));
co_await do_on_dead_notifications(endpoint, std::move(*state), pid);
co_await do_on_dead_notifications(endpoint, std::move(state), pid);
} catch (...) {
logger.warn("Fail to call on_dead callback: {}", std::current_exception());
}
@@ -737,7 +732,7 @@ future<> gossiper::do_status_check() {
auto permit = co_await lock_endpoint(endpoint, null_permit_id);
const auto& pid = permit.id();
auto eps = get_endpoint_state_for_endpoint_ptr(endpoint);
auto eps = get_endpoint_state_ptr(endpoint);
if (!eps) {
continue;
}
@@ -1100,7 +1095,7 @@ void gossiper::run() {
if (logger.is_enabled(logging::log_level::trace)) {
for (auto& x : _endpoint_state_map) {
logger.trace("ep={}, eps={}", x.first, x.second);
logger.trace("ep={}, eps={}", x.first, *x.second);
}
}
if (is_enabled()) {
@@ -1174,14 +1169,14 @@ int64_t gossiper::get_endpoint_downtime(inet_address ep) const noexcept {
// It is called from failure_detector
future<> gossiper::convict(inet_address endpoint) {
auto permit = co_await lock_endpoint(endpoint, null_permit_id);
auto* state = get_endpoint_state_for_endpoint_ptr(endpoint);
auto state = get_endpoint_state_ptr(endpoint);
if (!state || !is_alive(endpoint)) {
co_return;
}
if (is_shutdown(endpoint)) {
co_await mark_as_shutdown(endpoint, permit.id());
} else {
co_await mark_dead(endpoint, *state, permit.id());
co_await mark_dead(endpoint, state, permit.id());
}
}
@@ -1235,7 +1230,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector<gossip_digest>& g
}
std::shuffle(endpoints.begin(), endpoints.end(), _random_engine);
for (auto& endpoint : endpoints) {
auto es = get_endpoint_state_for_endpoint_ptr(endpoint);
auto es = get_endpoint_state_ptr(endpoint);
if (es) {
auto& eps = *es;
generation = eps.get_heart_beat_state().get_generation();
@@ -1260,7 +1255,7 @@ future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_i
verify_permit(ep, pid);
return container().invoke_on_all([ep, es, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) {
if (this_shard_id() != orig) {
g._endpoint_state_map[ep].add_application_state(es);
g.get_or_create_endpoint_state(ep).add_application_state(es);
}
});
}
@@ -1268,14 +1263,14 @@ future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_i
future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id pid) {
auto permit = co_await lock_endpoint(endpoint, pid);
pid = permit.id();
auto& eps = get_endpoint_state(endpoint);
auto eps = get_endpoint_state(endpoint);
eps.update_timestamp(); // make sure we don't evict it too soon
eps.get_heart_beat_state().force_newer_generation_unsafe();
auto expire_time = compute_expire_time();
eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count()));
logger.info("Completing removal of {}", endpoint);
add_expire_time_for_endpoint(endpoint, expire_time);
_endpoint_state_map[endpoint] = eps;
_endpoint_state_map[endpoint] = make_endpoint_state_ptr(std::move(eps));
co_await replicate(endpoint, eps, pid);
// ensure at least one gossip round occurs before returning
co_await sleep_abortable(INTERVAL * 2, _abort_source);
@@ -1291,7 +1286,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
return seastar::async([&gossiper, g = gossiper.shared_from_this(), address] {
inet_address endpoint(address);
auto permit = gossiper.lock_endpoint(endpoint, null_permit_id).get0();
auto es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto es = gossiper.get_mutable_endpoint_state_ptr(endpoint);
auto now = gossiper.now();
generation_type gen(std::chrono::duration_cast<std::chrono::seconds>((now + std::chrono::seconds(60)).time_since_epoch()).count());
version_type ver(9999);
@@ -1312,7 +1307,7 @@ future<> gossiper::assassinate_endpoint(sstring address) {
// make sure it did not change
sleep_abortable(ring_delay, gossiper._abort_source).get();
es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
es = gossiper.get_mutable_endpoint_state_ptr(endpoint);
if (!es) {
logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint);
} else {
@@ -1379,7 +1374,7 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) {
}
bool gossiper::is_gossip_only_member(inet_address endpoint) {
auto es = get_endpoint_state_for_endpoint_ptr(endpoint);
auto es = get_endpoint_state_ptr(endpoint);
if (!es) {
return false;
}
@@ -1397,30 +1392,30 @@ clk::time_point gossiper::get_expire_time_for_endpoint(inet_address endpoint) co
}
}
const endpoint_state* gossiper::get_endpoint_state_for_endpoint_ptr(inet_address ep) const noexcept {
endpoint_state_ptr gossiper::get_endpoint_state_ptr(inet_address ep) const noexcept {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
return nullptr;
} else {
return &it->second;
return it->second;
}
}
endpoint_state* gossiper::get_endpoint_state_for_endpoint_ptr(inet_address ep) noexcept {
endpoint_state* gossiper::get_mutable_endpoint_state_ptr(inet_address ep) noexcept {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
return nullptr;
} else {
return &it->second;
return const_cast<endpoint_state*>(it->second.get());
}
}
endpoint_state& gossiper::get_endpoint_state(inet_address ep) {
auto ptr = get_endpoint_state_for_endpoint_ptr(ep);
if (!ptr) {
auto it = _endpoint_state_map.find(ep);
if (it == _endpoint_state_map.end()) {
throw std::out_of_range(format("ep={}", ep));
}
return *ptr;
return const_cast<endpoint_state&>(*it->second);
}
endpoint_state& gossiper::get_or_create_endpoint_state(inet_address ep) {
@@ -1428,9 +1423,9 @@ endpoint_state& gossiper::get_or_create_endpoint_state(inet_address ep) {
if (it == _endpoint_state_map.end()) {
auto eps = endpoint_state();
eps.add_application_state(application_state::RPC_ADDRESS, versioned_value::rpcaddress(ep));
it = _endpoint_state_map.emplace(ep, std::move(eps)).first;
it = _endpoint_state_map.emplace(ep, make_endpoint_state_ptr(std::move(eps))).first;
}
return it->second;
return const_cast<endpoint_state&>(*it->second);
}
future<> gossiper::reset_endpoint_state_map() {
@@ -1445,7 +1440,7 @@ future<> gossiper::reset_endpoint_state_map() {
});
}
const std::unordered_map<inet_address, endpoint_state>& gms::gossiper::get_endpoint_states() const noexcept {
const std::unordered_map<inet_address, endpoint_state_ptr>& gms::gossiper::get_endpoint_states() const noexcept {
return _endpoint_state_map;
}
@@ -1455,7 +1450,7 @@ std::vector<inet_address> gossiper::get_endpoints() const {
stop_iteration gossiper::for_each_endpoint_state_until(std::function<stop_iteration(const inet_address&, const endpoint_state&)> func) const {
for (const auto& [node, eps] : _endpoint_state_map) {
if (func(node, eps) == stop_iteration::yes) {
if (func(node, *eps) == stop_iteration::yes) {
return stop_iteration::yes;
}
}
@@ -1476,7 +1471,7 @@ bool gossiper::is_cql_ready(const inet_address& endpoint) const {
// never has application_state::RPC_READY in the endpoint_state, we can
// only think their cql server is up, so we return true here if
// application_state::RPC_READY is not present
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
auto eps = get_endpoint_state_ptr(endpoint);
if (!eps) {
logger.debug("Node {} does not have RPC_READY application_state, return is_cql_ready=true", endpoint);
return true;
@@ -1499,9 +1494,8 @@ locator::host_id gossiper::get_host_id(inet_address endpoint) const {
std::set<gms::inet_address> gossiper::get_nodes_with_host_id(locator::host_id host_id) const {
std::set<gms::inet_address> nodes;
for (auto& x : get_endpoint_states()) {
auto node = x.first;
auto app_state = get_application_state_ptr(node, application_state::HOST_ID);
for (const auto& [node, eps] : get_endpoint_states()) {
auto app_state = eps->get_application_state_ptr(application_state::HOST_ID);
if (app_state && host_id == locator::host_id(utils::UUID(app_state->value()))) {
nodes.insert(node);
}
@@ -1511,7 +1505,7 @@ std::set<gms::inet_address> gossiper::get_nodes_with_host_id(locator::host_id ho
std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) {
std::optional<endpoint_state> reqd_endpoint_state;
auto es = get_endpoint_state_for_endpoint_ptr(for_endpoint);
auto es = get_endpoint_state_ptr(for_endpoint);
if (es) {
auto& eps = *es;
/*
@@ -1544,8 +1538,8 @@ std::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_a
}
generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) {
auto* ep1 = get_endpoint_state_for_endpoint_ptr(addr1);
auto* ep2 = get_endpoint_state_for_endpoint_ptr(addr2);
auto ep1 = get_endpoint_state_ptr(addr1);
auto ep2 = get_endpoint_state_ptr(addr2);
if (!ep1 || !ep2) {
auto err = format("Can not get endpoint_state for {} or {}", addr1, addr2);
logger.warn("{}", err);
@@ -1568,7 +1562,7 @@ void gossiper::update_timestamp_for_nodes(const std::map<inet_address, endpoint_
for (const auto& x : map) {
const gms::inet_address& endpoint = x.first;
const endpoint_state& remote_endpoint_state = x.second;
auto* local_endpoint_state = get_endpoint_state_for_endpoint_ptr(endpoint);
auto* local_endpoint_state = get_mutable_endpoint_state_ptr(endpoint);
if (local_endpoint_state) {
bool update = false;
auto local_generation = local_endpoint_state->get_heart_beat_state().get_generation();
@@ -1626,7 +1620,7 @@ future<> gossiper::real_mark_alive(inet_address addr) {
// After sending echo message, the Node might not be in the
// _endpoint_state_map anymore, use the reference of local_state
// might cause user-after-free
auto* es = get_endpoint_state_for_endpoint_ptr(addr);
auto* es = get_mutable_endpoint_state_ptr(addr);
if (!es) {
logger.info("Node {} is not in endpoint_state_map anymore", addr);
co_return;
@@ -1680,15 +1674,14 @@ future<> gossiper::real_mark_alive(inet_address addr) {
});
}
future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state, permit_id pid) {
future<> gossiper::mark_dead(inet_address addr, endpoint_state_ptr state, permit_id pid) {
logger.trace("marking as down {}", addr);
verify_permit(addr, pid);
endpoint_state state = local_state;
co_await mutate_live_and_unreachable_endpoints([addr] (gossiper& g) {
g._live_endpoints.erase(addr);
g._unreachable_endpoints[addr] = now();
});
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state));
logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(*state));
co_await do_on_dead_notifications(addr, std::move(state), pid);
}
@@ -1696,7 +1689,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
verify_permit(ep, pid);
std::optional<endpoint_state> eps_old;
if (auto* p = get_endpoint_state_for_endpoint_ptr(ep); p) {
if (auto p = get_endpoint_state_ptr(ep)) {
eps_old = *p;
}
@@ -1708,7 +1701,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
}
}
logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps));
_endpoint_state_map[ep] = eps;
_endpoint_state_map[ep] = make_endpoint_state_ptr(eps);
co_await replicate(ep, eps, pid);
if (is_in_shadow_round()) {
@@ -1728,15 +1721,18 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta
});
}
auto& ep_state = _endpoint_state_map.at(ep);
if (!is_dead_state(ep_state)) {
auto ep_state = get_endpoint_state_ptr(ep);
if (!ep_state) {
throw std::out_of_range(format("ep={}", ep));
}
if (!is_dead_state(*ep_state)) {
mark_alive(ep);
} else {
logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps));
co_await mark_dead(ep, ep_state, pid);
co_await mark_dead(ep, std::move(ep_state), pid);
}
auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep);
auto eps_new = get_endpoint_state_ptr(ep);
if (eps_new) {
co_await _subscribers.for_each([ep, eps_new, pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_join(ep, *eps_new, pid);
@@ -1867,9 +1863,9 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati
});
}
future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state state, permit_id pid) {
future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) {
co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr<i_endpoint_state_change_subscriber> subscriber) {
return subscriber->on_dead(addr, state, pid);
return subscriber->on_dead(addr, *state, pid);
});
}
@@ -1910,14 +1906,14 @@ void gossiper::examine_gossiper(utils::chunked_vector<gossip_digest>& g_digest_l
auto max_remote_version = g_digest.get_max_version();
/* Get state associated with the end point in digest */
auto&& ep = g_digest.get_endpoint();
auto es = get_endpoint_state_for_endpoint_ptr(ep);
auto es = get_endpoint_state_ptr(ep);
/* Here we need to fire a GossipDigestAckMessage. If we have some
* data associated with this endpoint locally then we follow the
* "if" path of the logic. If we have absolutely nothing for this
* endpoint we need to request all the data for this endpoint.
*/
if (es) {
endpoint_state& ep_state_ptr = *es;
const endpoint_state& ep_state_ptr = *es;
auto local_generation = ep_state_ptr.get_heart_beat_state().get_generation();
/* get the max version of all keys in the state associated with this endpoint */
auto max_local_version = get_max_endpoint_state_version(ep_state_ptr);
@@ -2003,7 +1999,7 @@ future<gossiper::generation_for_nodes>
gossiper::get_generation_for_nodes(std::unordered_set<gms::inet_address> nodes) {
generation_for_nodes ret;
for (const auto& node : nodes) {
auto es = get_endpoint_state_for_endpoint_ptr(node);
auto es = get_endpoint_state_ptr(node);
if (es) {
auto current_generation_number = es->get_heart_beat_state().get_generation();
ret.emplace(node, current_generation_number);
@@ -2126,7 +2122,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
//preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on)
auto ep_state = endpoint_state();
auto es = get_endpoint_state_for_endpoint_ptr(ep);
auto es = get_endpoint_state_ptr(ep);
if (es) {
ep_state = *es;
logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state);
@@ -2142,7 +2138,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) {
if (host_id) {
ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value()));
}
_endpoint_state_map[ep] = ep_state;
_endpoint_state_map[ep] = make_endpoint_state_ptr(ep_state);
co_await replicate(ep, ep_state, permit.id());
_unreachable_endpoints[ep] = now();
logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation());
@@ -2185,24 +2181,22 @@ future<> gossiper::add_local_application_state(std::list<std::pair<application_s
inet_address ep_addr = gossiper.get_broadcast_address();
// for symmetry with other apply, use endpoint lock for our own address.
auto permit = gossiper.lock_endpoint(ep_addr, null_permit_id).get0();
auto es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
if (!es) {
auto ep_state_before = gossiper.get_endpoint_state_ptr(ep_addr);
if (!ep_state_before) {
auto err = format("endpoint_state_map does not contain endpoint = {}, application_states = {}",
ep_addr, states);
throw std::runtime_error(err);
}
endpoint_state ep_state_before = *es;
for (auto& p : states) {
auto& state = p.first;
auto& value = p.second;
// Fire "before change" notifications:
// Not explicit, but apparently we allow this to defer (inside out implicit seastar::async)
gossiper.do_before_change_notifications(ep_addr, ep_state_before, state, value).get();
gossiper.do_before_change_notifications(ep_addr, *ep_state_before, state, value).get();
}
es = gossiper.get_endpoint_state_for_endpoint_ptr(ep_addr);
auto es = gossiper.get_mutable_endpoint_state_ptr(ep_addr);
if (!es) {
return;
}
@@ -2248,7 +2242,7 @@ future<> gossiper::do_stop_gossiping() {
return make_ready_future<>();
}
return seastar::async([this, g = this->shared_from_this()] {
auto* my_ep_state = get_endpoint_state_for_endpoint_ptr(get_broadcast_address());
auto my_ep_state = get_endpoint_state_ptr(get_broadcast_address());
if (my_ep_state) {
logger.info("My status = {}", get_gossip_status(*my_ep_state));
}
@@ -2358,7 +2352,7 @@ bool gossiper::is_alive(inet_address ep) const {
#ifndef SCYLLA_BUILD_MODE_RELEASE
// Live endpoints must always have a valid endpoint_state.
// Verify that in testing mode to reduce the overhead in production.
if (is_alive && !get_endpoint_state_for_endpoint_ptr(ep)) {
if (is_alive && !get_endpoint_state_ptr(ep)) {
on_internal_error(logger, fmt::format("Node {} is alive but has no endpoint state", ep));
}
#endif
@@ -2412,7 +2406,7 @@ future<> gossiper::wait_for_live_nodes_to_show_up(size_t n) {
}
const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept {
auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint));
auto eps = get_endpoint_state_ptr(std::move(endpoint));
if (!eps) {
return nullptr;
}
@@ -2433,13 +2427,13 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application
*/
future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid) {
verify_permit(endpoint, pid);
auto es = get_endpoint_state_for_endpoint_ptr(endpoint);
auto es = get_mutable_endpoint_state_ptr(endpoint);
if (es) {
auto& ep_state = *es;
ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true));
ep_state.get_heart_beat_state().force_highest_possible_version_unsafe();
co_await replicate(endpoint, ep_state, pid);
co_await mark_dead(endpoint, ep_state, pid);
co_await mark_dead(endpoint, get_endpoint_state_ptr(endpoint), pid);
}
}
@@ -2532,7 +2526,7 @@ bool gossiper::is_safe_for_bootstrap(inet_address endpoint) {
// 1) The node is a completely new node and no state in gossip at all
// 2) The node has state in gossip and it is already removed from the
// cluster either by nodetool decommission or nodetool removenode
auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint);
auto eps = get_endpoint_state_ptr(endpoint);
bool allowed = true;
if (!eps) {
logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed);
@@ -2651,7 +2645,7 @@ void gossiper::check_knows_remote_features(std::set<std::string_view>& local_fea
void gossiper::check_snitch_name_matches(sstring local_snitch_name) const {
for (const auto& [address, state] : _endpoint_state_map) {
const auto remote_snitch_name = state.get_application_state_ptr(application_state::SNITCH_NAME);
const auto remote_snitch_name = state->get_application_state_ptr(application_state::SNITCH_NAME);
if (!remote_snitch_name) {
continue;
}

View File

@@ -184,7 +184,7 @@ private:
}
/* map where key is the endpoint and value is the state associated with the endpoint */
std::unordered_map<inet_address, endpoint_state> _endpoint_state_map;
std::unordered_map<inet_address, endpoint_state_ptr> _endpoint_state_map;
// Used for serializing changes to _endpoint_state_map and running of associated change listeners.
endpoint_locks_map _endpoint_locks;
@@ -409,15 +409,14 @@ private:
future<> do_status_check();
const std::unordered_map<inet_address, endpoint_state>& get_endpoint_states() const noexcept;
const std::unordered_map<inet_address, endpoint_state_ptr>& get_endpoint_states() const noexcept;
public:
clk::time_point get_expire_time_for_endpoint(inet_address endpoint) const noexcept;
const endpoint_state* get_endpoint_state_for_endpoint_ptr(inet_address ep) const noexcept;
endpoint_state& get_endpoint_state(inet_address ep);
endpoint_state* get_endpoint_state_for_endpoint_ptr(inet_address ep) noexcept;
// Gets a shared pointer to the endpoint_state, if exists.
// Otherwise, returns a null ptr.
endpoint_state_ptr get_endpoint_state_ptr(inet_address ep) const noexcept;
const versioned_value* get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept;
sstring get_application_state_value(inet_address endpoint, application_state appstate) const;
@@ -466,12 +465,17 @@ public:
*/
sstring get_rpc_address(const inet_address& endpoint) const;
private:
// FIXME: for now, allow modifying the endpoint_state in place
// until all updates are applied only using replicate
// Gets or creates endpoint_state for this node
endpoint_state& get_or_create_endpoint_state(inet_address ep);
endpoint_state& my_endpoint_state() {
return get_or_create_endpoint_state(get_broadcast_address());
}
endpoint_state* get_mutable_endpoint_state_ptr(inet_address ep) noexcept;
endpoint_state& get_endpoint_state(inet_address ep);
void update_timestamp_for_nodes(const std::map<inet_address, endpoint_state>& map);
void mark_alive(inet_address addr);
@@ -479,7 +483,7 @@ private:
future<> real_mark_alive(inet_address addr);
// Must be called under lock_endpoint.
future<> mark_dead(inet_address addr, endpoint_state& local_state, permit_id);
future<> mark_dead(inet_address addr, endpoint_state_ptr local_state, permit_id);
// Must be called under lock_endpoint.
future<> mark_as_shutdown(const inet_address& endpoint, permit_id);
@@ -528,7 +532,7 @@ private:
// notify that a node is DOWN (dead)
// Must be called under lock_endpoint.
future<> do_on_dead_notifications(inet_address addr, endpoint_state state, permit_id);
future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id);
/* Request all the state for the endpoint in the g_digest */

View File

@@ -2330,7 +2330,7 @@ table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, gms::in
auto it = _cluster_cache_hit_rates.find(addr);
if (it == _cluster_cache_hit_rates.end()) {
// no data yet, get it from the gossiper
auto* eps = gossiper.get_endpoint_state_for_endpoint_ptr(addr);
auto eps = gossiper.get_endpoint_state_ptr(addr);
if (eps) {
auto* state = eps->get_application_state_ptr(gms::application_state::CACHE_HITRATES);
float f = -1.0f; // missing state means old node

View File

@@ -4212,6 +4212,11 @@ class scylla_gms(gdb.Command):
gossiper = sharded(gdb.parse_and_eval('gms::_the_gossiper')).local()
state_map = gossiper['endpoint_state_map']
for (endpoint, state) in unordered_map(state_map):
try:
state_ptr = seastar_lw_shared_ptr(state)
state = state_ptr.get().dereference()
except Exception:
pass
ip = ip_to_str(int(get_ip(endpoint)), byteorder=sys.byteorder)
gdb.write('%s: (gms::endpoint_state*) %s (%s)\n' % (ip, state.address, state['_heart_beat_state']))
for app_state, vv in std_map(state['_application_state']):

View File

@@ -233,7 +233,7 @@ bool migration_manager::have_schema_agreement() {
return stop_iteration::no;
}
mlogger.debug("Checking schema state for {}.", endpoint);
auto* schema = eps.get_application_state_ptr(gms::application_state::SCHEMA);
auto schema = eps.get_application_state_ptr(gms::application_state::SCHEMA);
if (!schema) {
mlogger.debug("Schema state not yet available for {}.", endpoint);
match = false;
@@ -289,7 +289,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const table_schema_versio
// pushed out simultaneously. See CASSANDRA-5025
return sleep_abortable(migration_delay, _as).then([this, &db, endpoint] {
// grab the latest version of the schema since it may have changed again since the initial scheduling
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state) {
mlogger.debug("epState vanished for {}, not submitting migration task", endpoint);
return make_ready_future<>();
@@ -1206,7 +1206,7 @@ future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_st
future<> migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) {
if (state == gms::application_state::SCHEMA) {
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
return make_ready_future();

View File

@@ -486,7 +486,7 @@ future<> storage_service::topology_state_load() {
// endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds
// will be up to date and reachable at the time of restart.
for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) {
if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_for_endpoint_ptr(e)) {
if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) {
co_await _gossiper.add_saved_endpoint(e);
}
}
@@ -2447,7 +2447,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
auto local_host_id = _db.local().get_config().host_id;
if (!replacing_a_node_with_diff_ip) {
auto endpoint = get_broadcast_address();
auto eps = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto eps = _gossiper.get_endpoint_state_ptr(endpoint);
if (eps) {
auto replace_host_id = _gossiper.get_host_id(get_broadcast_address());
slogger.info("Host {}/{} is replacing {}/{} using the same address", local_host_id, endpoint, replace_host_id, endpoint);
@@ -2675,7 +2675,7 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
for (auto token : bootstrap_tokens) {
auto existing = tmptr->get_endpoint(token);
if (existing) {
auto* eps = _gossiper.get_endpoint_state_for_endpoint_ptr(*existing);
auto eps = _gossiper.get_endpoint_state_ptr(*existing);
if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - delay) {
throw std::runtime_error("Cannot replace a live node...");
}
@@ -3204,7 +3204,7 @@ future<> storage_service::handle_state_left(inet_address endpoint, std::vector<s
auto tokens = get_tokens_for(endpoint);
slogger.debug("Node {} state left, tokens {}", endpoint, tokens);
if (tokens.empty()) {
auto eps = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto eps = _gossiper.get_endpoint_state_ptr(endpoint);
if (eps) {
slogger.warn("handle_state_left: Tokens for node={} are empty, endpoint_state={}", endpoint, *eps);
} else {
@@ -3299,7 +3299,7 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
co_return; // did nothing.
}
} else {
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
co_return;
@@ -3404,7 +3404,7 @@ future<> storage_service::do_update_system_peers_table(gms::inet_address endpoin
future<> storage_service::update_peer_info(gms::inet_address endpoint) {
slogger.debug("Update peer info: endpoint={}", endpoint);
using namespace gms;
auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint);
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state) {
co_return;
}
@@ -3787,7 +3787,7 @@ storage_service::prepare_replacement_info(std::unordered_set<gms::inet_address>
replace_address = *nodes.begin();
}
auto* state = _gossiper.get_endpoint_state_for_endpoint_ptr(replace_address);
auto state = _gossiper.get_endpoint_state_ptr(replace_address);
if (!state) {
throw std::runtime_error(::format("Cannot replace_address {} because it doesn't exist in gossip", replace_address));
}