From b6fbbe28aa74108576e7034c5f619fb1dfe1390b Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Nov 2023 12:49:09 +0400 Subject: [PATCH] storage_service: topology_state_load: fill new token_metadata For each inet_address-based modification of token_metadata we make a corresponding host_id-based change in token_metadata->get_new(). The _gossiper.add_saved_endpoint logic is switched to the new token_metadata. --- service/storage_service.cc | 31 ++++++++++++++++++++++--------- service/storage_service.hh | 3 +++ 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 586685c786..0abcb0821b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -399,11 +399,15 @@ future<> storage_service::topology_state_load() { co_await tmptr->clear_gently(); // drop previous state tmptr->set_version(_topology_state_machine._topology.version); + tmptr->get_new()->set_version(_topology_state_machine._topology.version); auto update_topology = [&] (locator::host_id id, inet_address ip, const replica_state& rs) { tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, to_topology_node_state(rs.state), rs.shard_count); + tmptr->get_new()->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, + to_topology_node_state(rs.state), rs.shard_count); tmptr->update_host_id(id, ip); + tmptr->get_new()->update_host_id(id, ip); }; auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> { @@ -432,13 +436,14 @@ future<> storage_service::topology_state_load() { } update_topology(host_id, ip, rs); co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); + co_await tmptr->get_new()->update_normal_tokens(rs.ring.value().tokens, host_id); }; for (const auto& [id, rs]: _topology_state_machine._topology.normal_nodes) { co_await add_normal_node(id, rs); } - tmptr->set_read_new(std::invoke([](std::optional state) { + const auto read_new = std::invoke([](std::optional state) { using read_new_t = locator::token_metadata::read_new_t; if (!state.has_value()) { return read_new_t::no; @@ -457,7 +462,9 @@ future<> storage_service::topology_state_load() { case topology::transition_state::write_both_read_new: return read_new_t::yes; } - }, _topology_state_machine._topology.tstate)); + }, _topology_state_machine._topology.tstate); + tmptr->set_read_new(read_new); + tmptr->get_new()->set_read_new(read_new); for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) { locator::host_id host_id{id.uuid()}; @@ -484,8 +491,10 @@ future<> storage_service::topology_state_load() { // (such as the CDC generation write). // It doesn't break anything to set the tokens to normal early in this single-node case. co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); + co_await tmptr->get_new()->update_normal_tokens(rs.ring.value().tokens, host_id); } else { tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip); + tmptr->get_new()->add_bootstrap_tokens(rs.ring.value().tokens, host_id); co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip)); } } @@ -494,7 +503,9 @@ future<> storage_service::topology_state_load() { case node_state::removing: update_topology(host_id, ip, rs); co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); + co_await tmptr->get_new()->update_normal_tokens(rs.ring.value().tokens, host_id); tmptr->add_leaving_endpoint(ip); + tmptr->get_new()->add_leaving_endpoint(host_id); co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip)); break; case node_state::replacing: { @@ -507,11 +518,11 @@ future<> storage_service::topology_state_load() { on_fatal_internal_error(slogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id)); } assert(existing_ip); - // FIXME: Topology cannot hold two IPs with different host ids yet so - // when replacing we must advertise the replaced_id for the ip, otherwise - // topology will complain about host id of a local node changing and fail. - update_topology(ip == existing_ip ? locator::host_id(replaced_id.uuid()) : host_id, ip, rs); + const auto replaced_host_id = locator::host_id(replaced_id.uuid()); + tmptr->get_new()->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced); + update_topology(host_id, ip, rs); tmptr->add_replacing_endpoint(*existing_ip, ip); + tmptr->get_new()->add_replacing_endpoint(replaced_host_id, host_id); co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip)); } } @@ -545,9 +556,11 @@ future<> storage_service::topology_state_load() { // of the cluster state. To work correctly, the gossiper needs to know the current // endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds // will be up to date and reachable at the time of restart. - for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) { - if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) { - co_await _gossiper.add_saved_endpoint(e); + const auto* tmptr = get_token_metadata_ptr()->get_new(); + for (const auto& e: tmptr->get_all_endpoints()) { + const auto ep = tmptr->get_endpoint_for_host_id(e); + if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(ep)) { + co_await _gossiper.add_saved_endpoint(ep); } } diff --git a/service/storage_service.hh b/service/storage_service.hh index 99bf6f58c6..c8ad5970c4 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -270,6 +270,9 @@ private: bool is_me(inet_address addr) const noexcept { return get_token_metadata_ptr()->get_topology().is_me(addr); } + bool is_me(locator::host_id id) const noexcept { + return get_token_metadata_ptr()->get_topology().is_me(id); + } /* This abstraction maintains the token/endpoint metadata information */ shared_token_metadata& _shared_token_metadata;