diff --git a/api/failure_detector.cc b/api/failure_detector.cc index 24a71ba2be..8071fa31cc 100644 --- a/api/failure_detector.cc +++ b/api/failure_detector.cc @@ -71,7 +71,7 @@ void set_failure_detector(http_context& ctx, routes& r, gms::gossiper& g) { }); fd::get_endpoint_state.set(r, [&g] (std::unique_ptr req) { - auto* state = g.get_endpoint_state_for_endpoint_ptr(gms::inet_address(req->param["addr"])); + auto state = g.get_endpoint_state_ptr(gms::inet_address(req->param["addr"])); if (!state) { return make_ready_future(format("unknown endpoint {}", req->param["addr"])); } diff --git a/gms/endpoint_state.hh b/gms/endpoint_state.hh index 5fa625d1f2..c2d6db0d1a 100644 --- a/gms/endpoint_state.hh +++ b/gms/endpoint_state.hh @@ -151,6 +151,16 @@ public: friend std::ostream& operator<<(std::ostream& os, const endpoint_state& x); }; +using endpoint_state_ptr = lw_shared_ptr; + +inline endpoint_state_ptr make_endpoint_state_ptr(const endpoint_state& eps) { + return make_lw_shared(eps); +} + +inline endpoint_state_ptr make_endpoint_state_ptr(endpoint_state&& eps) { + return make_lw_shared(std::move(eps)); +} + // The endpoint state is protected with an endpoint lock // acquired in the gossiper using gossiper::lock_endpoint. // diff --git a/gms/gossiper.cc b/gms/gossiper.cc index 95df588da2..02eb05fd42 100644 --- a/gms/gossiper.cc +++ b/gms/gossiper.cc @@ -105,7 +105,7 @@ gossiper::gossiper(abort_source& as, const locator::shared_token_metadata& stm, _metrics.add_group("gossip", { sm::make_counter("heart_beat", [ep, this] { - auto es = get_endpoint_state_for_endpoint_ptr(ep); + auto es = get_endpoint_state_ptr(ep); if (es) { return es->get_heart_beat_state().get_heart_beat_version().value(); } else { @@ -144,7 +144,7 @@ void gossiper::do_sort(utils::chunked_vector& g_digest_list) { utils::chunked_vector diff_digests; for (auto g_digest : g_digest_list) { auto ep = g_digest.get_endpoint(); - auto* ep_state = this->get_endpoint_state_for_endpoint_ptr(ep); + auto ep_state = this->get_endpoint_state_ptr(ep); version_type version = ep_state ? this->get_max_endpoint_state_version(*ep_state) : version_type(); int32_t diff_version = ::abs(version - g_digest.get_max_version()); diff_digests.emplace_back(gossip_digest(ep, g_digest.get_generation(), version_type(diff_version))); @@ -360,7 +360,7 @@ future<> gossiper::do_send_ack2_msg(msg_addr from, utils::chunked_vector delta_ep_state_map; for (auto g_digest : ack_msg_digest) { inet_address addr = g_digest.get_endpoint(); - const auto es = get_endpoint_state_for_endpoint_ptr(addr); + const auto es = get_endpoint_state_ptr(addr); if (!es || es->get_heart_beat_state().get_generation() < g_digest.get_generation()) { continue; } @@ -421,7 +421,7 @@ future<> gossiper::handle_echo_msg(gms::inet_address from, std::optionalsecond; auto current_generation_number = generation_number_opt ? @@ -450,7 +450,7 @@ future<> gossiper::handle_shutdown_msg(inet_address from, std::optional auto permit = co_await this->lock_endpoint(from, null_permit_id); if (generation_number_opt) { debug_validate_gossip_generation(*generation_number_opt); - auto es = this->get_endpoint_state_for_endpoint_ptr(from); + auto es = this->get_endpoint_state_ptr(from); if (es) { auto local_generation = es->get_heart_beat_state().get_generation(); logger.info("Got shutdown message from {}, received_generation={}, local_generation={}", @@ -473,12 +473,10 @@ future gossiper::handle_get_endpoint_states_msg(gossip_get_endpoint_states_request request) { std::unordered_map map; const auto& application_states_wanted = request.application_states; - for (auto& item : _endpoint_state_map) { - const inet_address& node = item.first; - const endpoint_state& state = item.second; - const heart_beat_state& hbs = state.get_heart_beat_state(); + for (const auto& [node, state] : _endpoint_state_map) { + const heart_beat_state& hbs = state->get_heart_beat_state(); auto state_wanted = endpoint_state(hbs); - const std::map& apps = state.get_application_state_map(); + const std::map& apps = state->get_application_state_map(); for (const auto& app : apps) { if (application_states_wanted.count(app.first) > 0) { state_wanted.get_application_state_map().emplace(app); @@ -570,7 +568,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint // If state does not exist just add it. If it does then add it if the remote generation is greater. // If there is a generation tie, attempt to break it by heartbeat version. auto permit = co_await this->lock_endpoint(node, null_permit_id); - auto es = this->get_endpoint_state_for_endpoint_ptr(node); + auto* es = this->get_mutable_endpoint_state_ptr(node); if (es) { endpoint_state& local_state = *es; auto local_generation = local_state.get_heart_beat_state().get_generation(); @@ -587,7 +585,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint co_await this->handle_major_state_change(node, remote_state, permit.id()); } else { logger.debug("Applying remote_state for node {} (remote generation > local generation)", node); - _endpoint_state_map[node] = remote_state; + _endpoint_state_map[node] = make_endpoint_state_ptr(remote_state); } } else if (remote_generation == local_generation) { if (listener_notification) { @@ -625,7 +623,7 @@ future<> gossiper::do_apply_state_locally(gms::inet_address node, const endpoint co_await this->handle_major_state_change(node, remote_state, permit.id()); } else { logger.debug("Applying remote_state for node {} (new node)", node); - _endpoint_state_map[node] = remote_state; + _endpoint_state_map[node] = make_endpoint_state_ptr(remote_state); } } } @@ -699,10 +697,7 @@ future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) { logger.info("removed {} from _seeds, updated _seeds list = {}", endpoint, _seeds); } - std::optional state; - if (auto eps = get_endpoint_state_for_endpoint_ptr(endpoint)) { - state.emplace(*eps); - } + auto state = get_endpoint_state_ptr(endpoint); bool was_alive; co_await mutate_live_and_unreachable_endpoints([endpoint, &was_alive] (gossiper& g) { @@ -717,7 +712,7 @@ future<> gossiper::remove_endpoint(inet_address endpoint, permit_id pid) { if (was_alive && state) { try { logger.info("InetAddress {} is now DOWN, status = {}", endpoint, get_gossip_status(*state)); - co_await do_on_dead_notifications(endpoint, std::move(*state), pid); + co_await do_on_dead_notifications(endpoint, std::move(state), pid); } catch (...) { logger.warn("Fail to call on_dead callback: {}", std::current_exception()); } @@ -737,7 +732,7 @@ future<> gossiper::do_status_check() { auto permit = co_await lock_endpoint(endpoint, null_permit_id); const auto& pid = permit.id(); - auto eps = get_endpoint_state_for_endpoint_ptr(endpoint); + auto eps = get_endpoint_state_ptr(endpoint); if (!eps) { continue; } @@ -1100,7 +1095,7 @@ void gossiper::run() { if (logger.is_enabled(logging::log_level::trace)) { for (auto& x : _endpoint_state_map) { - logger.trace("ep={}, eps={}", x.first, x.second); + logger.trace("ep={}, eps={}", x.first, *x.second); } } if (is_enabled()) { @@ -1174,14 +1169,14 @@ int64_t gossiper::get_endpoint_downtime(inet_address ep) const noexcept { // It is called from failure_detector future<> gossiper::convict(inet_address endpoint) { auto permit = co_await lock_endpoint(endpoint, null_permit_id); - auto* state = get_endpoint_state_for_endpoint_ptr(endpoint); + auto state = get_endpoint_state_ptr(endpoint); if (!state || !is_alive(endpoint)) { co_return; } if (is_shutdown(endpoint)) { co_await mark_as_shutdown(endpoint, permit.id()); } else { - co_await mark_dead(endpoint, *state, permit.id()); + co_await mark_dead(endpoint, state, permit.id()); } } @@ -1235,7 +1230,7 @@ void gossiper::make_random_gossip_digest(utils::chunked_vector& g } std::shuffle(endpoints.begin(), endpoints.end(), _random_engine); for (auto& endpoint : endpoints) { - auto es = get_endpoint_state_for_endpoint_ptr(endpoint); + auto es = get_endpoint_state_ptr(endpoint); if (es) { auto& eps = *es; generation = eps.get_heart_beat_state().get_generation(); @@ -1260,7 +1255,7 @@ future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_i verify_permit(ep, pid); return container().invoke_on_all([ep, es, orig = this_shard_id(), self = shared_from_this()] (gossiper& g) { if (this_shard_id() != orig) { - g._endpoint_state_map[ep].add_application_state(es); + g.get_or_create_endpoint_state(ep).add_application_state(es); } }); } @@ -1268,14 +1263,14 @@ future<> gossiper::replicate(inet_address ep, const endpoint_state& es, permit_i future<> gossiper::advertise_token_removed(inet_address endpoint, locator::host_id host_id, permit_id pid) { auto permit = co_await lock_endpoint(endpoint, pid); pid = permit.id(); - auto& eps = get_endpoint_state(endpoint); + auto eps = get_endpoint_state(endpoint); eps.update_timestamp(); // make sure we don't evict it too soon eps.get_heart_beat_state().force_newer_generation_unsafe(); auto expire_time = compute_expire_time(); eps.add_application_state(application_state::STATUS, versioned_value::removed_nonlocal(host_id, expire_time.time_since_epoch().count())); logger.info("Completing removal of {}", endpoint); add_expire_time_for_endpoint(endpoint, expire_time); - _endpoint_state_map[endpoint] = eps; + _endpoint_state_map[endpoint] = make_endpoint_state_ptr(std::move(eps)); co_await replicate(endpoint, eps, pid); // ensure at least one gossip round occurs before returning co_await sleep_abortable(INTERVAL * 2, _abort_source); @@ -1291,7 +1286,7 @@ future<> gossiper::assassinate_endpoint(sstring address) { return seastar::async([&gossiper, g = gossiper.shared_from_this(), address] { inet_address endpoint(address); auto permit = gossiper.lock_endpoint(endpoint, null_permit_id).get0(); - auto es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); + auto es = gossiper.get_mutable_endpoint_state_ptr(endpoint); auto now = gossiper.now(); generation_type gen(std::chrono::duration_cast((now + std::chrono::seconds(60)).time_since_epoch()).count()); version_type ver(9999); @@ -1312,7 +1307,7 @@ future<> gossiper::assassinate_endpoint(sstring address) { // make sure it did not change sleep_abortable(ring_delay, gossiper._abort_source).get(); - es = gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); + es = gossiper.get_mutable_endpoint_state_ptr(endpoint); if (!es) { logger.warn("Endpoint {} disappeared while trying to assassinate, continuing anyway", endpoint); } else { @@ -1379,7 +1374,7 @@ future<> gossiper::do_gossip_to_unreachable_member(gossip_digest_syn message) { } bool gossiper::is_gossip_only_member(inet_address endpoint) { - auto es = get_endpoint_state_for_endpoint_ptr(endpoint); + auto es = get_endpoint_state_ptr(endpoint); if (!es) { return false; } @@ -1397,30 +1392,30 @@ clk::time_point gossiper::get_expire_time_for_endpoint(inet_address endpoint) co } } -const endpoint_state* gossiper::get_endpoint_state_for_endpoint_ptr(inet_address ep) const noexcept { +endpoint_state_ptr gossiper::get_endpoint_state_ptr(inet_address ep) const noexcept { auto it = _endpoint_state_map.find(ep); if (it == _endpoint_state_map.end()) { return nullptr; } else { - return &it->second; + return it->second; } } -endpoint_state* gossiper::get_endpoint_state_for_endpoint_ptr(inet_address ep) noexcept { +endpoint_state* gossiper::get_mutable_endpoint_state_ptr(inet_address ep) noexcept { auto it = _endpoint_state_map.find(ep); if (it == _endpoint_state_map.end()) { return nullptr; } else { - return &it->second; + return const_cast(it->second.get()); } } endpoint_state& gossiper::get_endpoint_state(inet_address ep) { - auto ptr = get_endpoint_state_for_endpoint_ptr(ep); - if (!ptr) { + auto it = _endpoint_state_map.find(ep); + if (it == _endpoint_state_map.end()) { throw std::out_of_range(format("ep={}", ep)); } - return *ptr; + return const_cast(*it->second); } endpoint_state& gossiper::get_or_create_endpoint_state(inet_address ep) { @@ -1428,9 +1423,9 @@ endpoint_state& gossiper::get_or_create_endpoint_state(inet_address ep) { if (it == _endpoint_state_map.end()) { auto eps = endpoint_state(); eps.add_application_state(application_state::RPC_ADDRESS, versioned_value::rpcaddress(ep)); - it = _endpoint_state_map.emplace(ep, std::move(eps)).first; + it = _endpoint_state_map.emplace(ep, make_endpoint_state_ptr(std::move(eps))).first; } - return it->second; + return const_cast(*it->second); } future<> gossiper::reset_endpoint_state_map() { @@ -1445,7 +1440,7 @@ future<> gossiper::reset_endpoint_state_map() { }); } -const std::unordered_map& gms::gossiper::get_endpoint_states() const noexcept { +const std::unordered_map& gms::gossiper::get_endpoint_states() const noexcept { return _endpoint_state_map; } @@ -1455,7 +1450,7 @@ std::vector gossiper::get_endpoints() const { stop_iteration gossiper::for_each_endpoint_state_until(std::function func) const { for (const auto& [node, eps] : _endpoint_state_map) { - if (func(node, eps) == stop_iteration::yes) { + if (func(node, *eps) == stop_iteration::yes) { return stop_iteration::yes; } } @@ -1476,7 +1471,7 @@ bool gossiper::is_cql_ready(const inet_address& endpoint) const { // never has application_state::RPC_READY in the endpoint_state, we can // only think their cql server is up, so we return true here if // application_state::RPC_READY is not present - auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint); + auto eps = get_endpoint_state_ptr(endpoint); if (!eps) { logger.debug("Node {} does not have RPC_READY application_state, return is_cql_ready=true", endpoint); return true; @@ -1499,9 +1494,8 @@ locator::host_id gossiper::get_host_id(inet_address endpoint) const { std::set gossiper::get_nodes_with_host_id(locator::host_id host_id) const { std::set nodes; - for (auto& x : get_endpoint_states()) { - auto node = x.first; - auto app_state = get_application_state_ptr(node, application_state::HOST_ID); + for (const auto& [node, eps] : get_endpoint_states()) { + auto app_state = eps->get_application_state_ptr(application_state::HOST_ID); if (app_state && host_id == locator::host_id(utils::UUID(app_state->value()))) { nodes.insert(node); } @@ -1511,7 +1505,7 @@ std::set gossiper::get_nodes_with_host_id(locator::host_id ho std::optional gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, version_type version) { std::optional reqd_endpoint_state; - auto es = get_endpoint_state_for_endpoint_ptr(for_endpoint); + auto es = get_endpoint_state_ptr(for_endpoint); if (es) { auto& eps = *es; /* @@ -1544,8 +1538,8 @@ std::optional gossiper::get_state_for_version_bigger_than(inet_a } generation_type::value_type gossiper::compare_endpoint_startup(inet_address addr1, inet_address addr2) { - auto* ep1 = get_endpoint_state_for_endpoint_ptr(addr1); - auto* ep2 = get_endpoint_state_for_endpoint_ptr(addr2); + auto ep1 = get_endpoint_state_ptr(addr1); + auto ep2 = get_endpoint_state_ptr(addr2); if (!ep1 || !ep2) { auto err = format("Can not get endpoint_state for {} or {}", addr1, addr2); logger.warn("{}", err); @@ -1568,7 +1562,7 @@ void gossiper::update_timestamp_for_nodes(const std::mapget_heart_beat_state().get_generation(); @@ -1626,7 +1620,7 @@ future<> gossiper::real_mark_alive(inet_address addr) { // After sending echo message, the Node might not be in the // _endpoint_state_map anymore, use the reference of local_state // might cause user-after-free - auto* es = get_endpoint_state_for_endpoint_ptr(addr); + auto* es = get_mutable_endpoint_state_ptr(addr); if (!es) { logger.info("Node {} is not in endpoint_state_map anymore", addr); co_return; @@ -1680,15 +1674,14 @@ future<> gossiper::real_mark_alive(inet_address addr) { }); } -future<> gossiper::mark_dead(inet_address addr, endpoint_state& local_state, permit_id pid) { +future<> gossiper::mark_dead(inet_address addr, endpoint_state_ptr state, permit_id pid) { logger.trace("marking as down {}", addr); verify_permit(addr, pid); - endpoint_state state = local_state; co_await mutate_live_and_unreachable_endpoints([addr] (gossiper& g) { g._live_endpoints.erase(addr); g._unreachable_endpoints[addr] = now(); }); - logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(state)); + logger.info("InetAddress {} is now DOWN, status = {}", addr, get_gossip_status(*state)); co_await do_on_dead_notifications(addr, std::move(state), pid); } @@ -1696,7 +1689,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta verify_permit(ep, pid); std::optional eps_old; - if (auto* p = get_endpoint_state_for_endpoint_ptr(ep); p) { + if (auto p = get_endpoint_state_ptr(ep)) { eps_old = *p; } @@ -1708,7 +1701,7 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta } } logger.trace("Adding endpoint state for {}, status = {}", ep, get_gossip_status(eps)); - _endpoint_state_map[ep] = eps; + _endpoint_state_map[ep] = make_endpoint_state_ptr(eps); co_await replicate(ep, eps, pid); if (is_in_shadow_round()) { @@ -1728,15 +1721,18 @@ future<> gossiper::handle_major_state_change(inet_address ep, const endpoint_sta }); } - auto& ep_state = _endpoint_state_map.at(ep); - if (!is_dead_state(ep_state)) { + auto ep_state = get_endpoint_state_ptr(ep); + if (!ep_state) { + throw std::out_of_range(format("ep={}", ep)); + } + if (!is_dead_state(*ep_state)) { mark_alive(ep); } else { logger.debug("Not marking {} alive due to dead state {}", ep, get_gossip_status(eps)); - co_await mark_dead(ep, ep_state, pid); + co_await mark_dead(ep, std::move(ep_state), pid); } - auto* eps_new = get_endpoint_state_for_endpoint_ptr(ep); + auto eps_new = get_endpoint_state_ptr(ep); if (eps_new) { co_await _subscribers.for_each([ep, eps_new, pid] (shared_ptr subscriber) { return subscriber->on_join(ep, *eps_new, pid); @@ -1867,9 +1863,9 @@ future<> gossiper::do_on_change_notifications(inet_address addr, const applicati }); } -future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state state, permit_id pid) { +future<> gossiper::do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id pid) { co_await _subscribers.for_each([addr, state = std::move(state), pid] (shared_ptr subscriber) { - return subscriber->on_dead(addr, state, pid); + return subscriber->on_dead(addr, *state, pid); }); } @@ -1910,14 +1906,14 @@ void gossiper::examine_gossiper(utils::chunked_vector& g_digest_l auto max_remote_version = g_digest.get_max_version(); /* Get state associated with the end point in digest */ auto&& ep = g_digest.get_endpoint(); - auto es = get_endpoint_state_for_endpoint_ptr(ep); + auto es = get_endpoint_state_ptr(ep); /* Here we need to fire a GossipDigestAckMessage. If we have some * data associated with this endpoint locally then we follow the * "if" path of the logic. If we have absolutely nothing for this * endpoint we need to request all the data for this endpoint. */ if (es) { - endpoint_state& ep_state_ptr = *es; + const endpoint_state& ep_state_ptr = *es; auto local_generation = ep_state_ptr.get_heart_beat_state().get_generation(); /* get the max version of all keys in the state associated with this endpoint */ auto max_local_version = get_max_endpoint_state_version(ep_state_ptr); @@ -2003,7 +1999,7 @@ future gossiper::get_generation_for_nodes(std::unordered_set nodes) { generation_for_nodes ret; for (const auto& node : nodes) { - auto es = get_endpoint_state_for_endpoint_ptr(node); + auto es = get_endpoint_state_ptr(node); if (es) { auto current_generation_number = es->get_heart_beat_state().get_generation(); ret.emplace(node, current_generation_number); @@ -2126,7 +2122,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) { //preserve any previously known, in-memory data about the endpoint (such as DC, RACK, and so on) auto ep_state = endpoint_state(); - auto es = get_endpoint_state_for_endpoint_ptr(ep); + auto es = get_endpoint_state_ptr(ep); if (es) { ep_state = *es; logger.debug("not replacing a previous ep_state for {}, but reusing it: {}", ep, ep_state); @@ -2142,7 +2138,7 @@ future<> gossiper::add_saved_endpoint(inet_address ep) { if (host_id) { ep_state.add_application_state(gms::application_state::HOST_ID, versioned_value::host_id(host_id.value())); } - _endpoint_state_map[ep] = ep_state; + _endpoint_state_map[ep] = make_endpoint_state_ptr(ep_state); co_await replicate(ep, ep_state, permit.id()); _unreachable_endpoints[ep] = now(); logger.trace("Adding saved endpoint {} {}", ep, ep_state.get_heart_beat_state().get_generation()); @@ -2185,24 +2181,22 @@ future<> gossiper::add_local_application_state(std::list gossiper::do_stop_gossiping() { return make_ready_future<>(); } return seastar::async([this, g = this->shared_from_this()] { - auto* my_ep_state = get_endpoint_state_for_endpoint_ptr(get_broadcast_address()); + auto my_ep_state = get_endpoint_state_ptr(get_broadcast_address()); if (my_ep_state) { logger.info("My status = {}", get_gossip_status(*my_ep_state)); } @@ -2358,7 +2352,7 @@ bool gossiper::is_alive(inet_address ep) const { #ifndef SCYLLA_BUILD_MODE_RELEASE // Live endpoints must always have a valid endpoint_state. // Verify that in testing mode to reduce the overhead in production. - if (is_alive && !get_endpoint_state_for_endpoint_ptr(ep)) { + if (is_alive && !get_endpoint_state_ptr(ep)) { on_internal_error(logger, fmt::format("Node {} is alive but has no endpoint state", ep)); } #endif @@ -2412,7 +2406,7 @@ future<> gossiper::wait_for_live_nodes_to_show_up(size_t n) { } const versioned_value* gossiper::get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept { - auto* eps = get_endpoint_state_for_endpoint_ptr(std::move(endpoint)); + auto eps = get_endpoint_state_ptr(std::move(endpoint)); if (!eps) { return nullptr; } @@ -2433,13 +2427,13 @@ sstring gossiper::get_application_state_value(inet_address endpoint, application */ future<> gossiper::mark_as_shutdown(const inet_address& endpoint, permit_id pid) { verify_permit(endpoint, pid); - auto es = get_endpoint_state_for_endpoint_ptr(endpoint); + auto es = get_mutable_endpoint_state_ptr(endpoint); if (es) { auto& ep_state = *es; ep_state.add_application_state(application_state::STATUS, versioned_value::shutdown(true)); ep_state.get_heart_beat_state().force_highest_possible_version_unsafe(); co_await replicate(endpoint, ep_state, pid); - co_await mark_dead(endpoint, ep_state, pid); + co_await mark_dead(endpoint, get_endpoint_state_ptr(endpoint), pid); } } @@ -2532,7 +2526,7 @@ bool gossiper::is_safe_for_bootstrap(inet_address endpoint) { // 1) The node is a completely new node and no state in gossip at all // 2) The node has state in gossip and it is already removed from the // cluster either by nodetool decommission or nodetool removenode - auto* eps = get_endpoint_state_for_endpoint_ptr(endpoint); + auto eps = get_endpoint_state_ptr(endpoint); bool allowed = true; if (!eps) { logger.debug("is_safe_for_bootstrap: node={}, status=no state in gossip, allowed_to_bootstrap={}", endpoint, allowed); @@ -2651,7 +2645,7 @@ void gossiper::check_knows_remote_features(std::set& local_fea void gossiper::check_snitch_name_matches(sstring local_snitch_name) const { for (const auto& [address, state] : _endpoint_state_map) { - const auto remote_snitch_name = state.get_application_state_ptr(application_state::SNITCH_NAME); + const auto remote_snitch_name = state->get_application_state_ptr(application_state::SNITCH_NAME); if (!remote_snitch_name) { continue; } diff --git a/gms/gossiper.hh b/gms/gossiper.hh index f747516fc9..a640bad860 100644 --- a/gms/gossiper.hh +++ b/gms/gossiper.hh @@ -184,7 +184,7 @@ private: } /* map where key is the endpoint and value is the state associated with the endpoint */ - std::unordered_map _endpoint_state_map; + std::unordered_map _endpoint_state_map; // Used for serializing changes to _endpoint_state_map and running of associated change listeners. endpoint_locks_map _endpoint_locks; @@ -409,15 +409,14 @@ private: future<> do_status_check(); - const std::unordered_map& get_endpoint_states() const noexcept; + const std::unordered_map& get_endpoint_states() const noexcept; public: clk::time_point get_expire_time_for_endpoint(inet_address endpoint) const noexcept; - const endpoint_state* get_endpoint_state_for_endpoint_ptr(inet_address ep) const noexcept; - endpoint_state& get_endpoint_state(inet_address ep); - - endpoint_state* get_endpoint_state_for_endpoint_ptr(inet_address ep) noexcept; + // Gets a shared pointer to the endpoint_state, if exists. + // Otherwise, returns a null ptr. + endpoint_state_ptr get_endpoint_state_ptr(inet_address ep) const noexcept; const versioned_value* get_application_state_ptr(inet_address endpoint, application_state appstate) const noexcept; sstring get_application_state_value(inet_address endpoint, application_state appstate) const; @@ -466,12 +465,17 @@ public: */ sstring get_rpc_address(const inet_address& endpoint) const; private: + // FIXME: for now, allow modifying the endpoint_state in place + // until all updates are applied only using replicate // Gets or creates endpoint_state for this node endpoint_state& get_or_create_endpoint_state(inet_address ep); endpoint_state& my_endpoint_state() { return get_or_create_endpoint_state(get_broadcast_address()); } + endpoint_state* get_mutable_endpoint_state_ptr(inet_address ep) noexcept; + endpoint_state& get_endpoint_state(inet_address ep); + void update_timestamp_for_nodes(const std::map& map); void mark_alive(inet_address addr); @@ -479,7 +483,7 @@ private: future<> real_mark_alive(inet_address addr); // Must be called under lock_endpoint. - future<> mark_dead(inet_address addr, endpoint_state& local_state, permit_id); + future<> mark_dead(inet_address addr, endpoint_state_ptr local_state, permit_id); // Must be called under lock_endpoint. future<> mark_as_shutdown(const inet_address& endpoint, permit_id); @@ -528,7 +532,7 @@ private: // notify that a node is DOWN (dead) // Must be called under lock_endpoint. - future<> do_on_dead_notifications(inet_address addr, endpoint_state state, permit_id); + future<> do_on_dead_notifications(inet_address addr, endpoint_state_ptr state, permit_id); /* Request all the state for the endpoint in the g_digest */ diff --git a/replica/table.cc b/replica/table.cc index 284dc7a94d..2994a64c96 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -2330,7 +2330,7 @@ table::cache_hit_rate table::get_hit_rate(const gms::gossiper& gossiper, gms::in auto it = _cluster_cache_hit_rates.find(addr); if (it == _cluster_cache_hit_rates.end()) { // no data yet, get it from the gossiper - auto* eps = gossiper.get_endpoint_state_for_endpoint_ptr(addr); + auto eps = gossiper.get_endpoint_state_ptr(addr); if (eps) { auto* state = eps->get_application_state_ptr(gms::application_state::CACHE_HITRATES); float f = -1.0f; // missing state means old node diff --git a/scylla-gdb.py b/scylla-gdb.py index ac061f8e97..446bd217b3 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -4212,6 +4212,11 @@ class scylla_gms(gdb.Command): gossiper = sharded(gdb.parse_and_eval('gms::_the_gossiper')).local() state_map = gossiper['endpoint_state_map'] for (endpoint, state) in unordered_map(state_map): + try: + state_ptr = seastar_lw_shared_ptr(state) + state = state_ptr.get().dereference() + except Exception: + pass ip = ip_to_str(int(get_ip(endpoint)), byteorder=sys.byteorder) gdb.write('%s: (gms::endpoint_state*) %s (%s)\n' % (ip, state.address, state['_heart_beat_state'])) for app_state, vv in std_map(state['_application_state']): diff --git a/service/migration_manager.cc b/service/migration_manager.cc index f8e448eef0..34b29e0cdd 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -233,7 +233,7 @@ bool migration_manager::have_schema_agreement() { return stop_iteration::no; } mlogger.debug("Checking schema state for {}.", endpoint); - auto* schema = eps.get_application_state_ptr(gms::application_state::SCHEMA); + auto schema = eps.get_application_state_ptr(gms::application_state::SCHEMA); if (!schema) { mlogger.debug("Schema state not yet available for {}.", endpoint); match = false; @@ -289,7 +289,7 @@ future<> migration_manager::maybe_schedule_schema_pull(const table_schema_versio // pushed out simultaneously. See CASSANDRA-5025 return sleep_abortable(migration_delay, _as).then([this, &db, endpoint] { // grab the latest version of the schema since it may have changed again since the initial scheduling - auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); + auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint); if (!ep_state) { mlogger.debug("epState vanished for {}, not submitting migration task", endpoint); return make_ready_future<>(); @@ -1206,7 +1206,7 @@ future<> migration_manager::on_join(gms::inet_address endpoint, gms::endpoint_st future<> migration_manager::on_change(gms::inet_address endpoint, gms::application_state state, const gms::versioned_value& value, gms::permit_id) { if (state == gms::application_state::SCHEMA) { - auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); + auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint); if (!ep_state || _gossiper.is_dead_state(*ep_state)) { mlogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); return make_ready_future(); diff --git a/service/storage_service.cc b/service/storage_service.cc index 78ef1a1630..b63e747c6c 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -486,7 +486,7 @@ future<> storage_service::topology_state_load() { // endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds // will be up to date and reachable at the time of restart. for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) { - if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_for_endpoint_ptr(e)) { + if (!utils::fb_utilities::is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) { co_await _gossiper.add_saved_endpoint(e); } } @@ -2447,7 +2447,7 @@ future<> storage_service::join_token_ring(sharded storage_service::join_token_ring(shardedget_endpoint(token); if (existing) { - auto* eps = _gossiper.get_endpoint_state_for_endpoint_ptr(*existing); + auto eps = _gossiper.get_endpoint_state_ptr(*existing); if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - delay) { throw std::runtime_error("Cannot replace a live node..."); } @@ -3204,7 +3204,7 @@ future<> storage_service::handle_state_left(inet_address endpoint, std::vector storage_service::on_change(inet_address endpoint, application_state sta co_return; // did nothing. } } else { - auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); + auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint); if (!ep_state || _gossiper.is_dead_state(*ep_state)) { slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); co_return; @@ -3404,7 +3404,7 @@ future<> storage_service::do_update_system_peers_table(gms::inet_address endpoin future<> storage_service::update_peer_info(gms::inet_address endpoint) { slogger.debug("Update peer info: endpoint={}", endpoint); using namespace gms; - auto* ep_state = _gossiper.get_endpoint_state_for_endpoint_ptr(endpoint); + auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint); if (!ep_state) { co_return; } @@ -3787,7 +3787,7 @@ storage_service::prepare_replacement_info(std::unordered_set replace_address = *nodes.begin(); } - auto* state = _gossiper.get_endpoint_state_for_endpoint_ptr(replace_address); + auto state = _gossiper.get_endpoint_state_ptr(replace_address); if (!state) { throw std::runtime_error(::format("Cannot replace_address {} because it doesn't exist in gossip", replace_address)); }