storage_service: topology_state_load: fill new token_metadata

For each inet_address-based modification of token_metadata we
make a corresponding host_id-based change in token_metadata->get_new().

The _gossiper.add_saved_endpoint logic is switched to the new token_metadata.
This commit is contained in:
Petr Gusev
2023-11-23 12:49:09 +04:00
parent e7e1c4e63c
commit b6fbbe28aa
2 changed files with 25 additions and 9 deletions

View File

@@ -399,11 +399,15 @@ future<> storage_service::topology_state_load() {
co_await tmptr->clear_gently(); // drop previous state
tmptr->set_version(_topology_state_machine._topology.version);
tmptr->get_new()->set_version(_topology_state_machine._topology.version);
auto update_topology = [&] (locator::host_id id, inet_address ip, const replica_state& rs) {
tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
to_topology_node_state(rs.state), rs.shard_count);
tmptr->get_new()->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
to_topology_node_state(rs.state), rs.shard_count);
tmptr->update_host_id(id, ip);
tmptr->get_new()->update_host_id(id, ip);
};
auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
@@ -432,13 +436,14 @@ future<> storage_service::topology_state_load() {
}
update_topology(host_id, ip, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
co_await tmptr->get_new()->update_normal_tokens(rs.ring.value().tokens, host_id);
};
for (const auto& [id, rs]: _topology_state_machine._topology.normal_nodes) {
co_await add_normal_node(id, rs);
}
tmptr->set_read_new(std::invoke([](std::optional<topology::transition_state> state) {
const auto read_new = std::invoke([](std::optional<topology::transition_state> state) {
using read_new_t = locator::token_metadata::read_new_t;
if (!state.has_value()) {
return read_new_t::no;
@@ -457,7 +462,9 @@ future<> storage_service::topology_state_load() {
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
}
}, _topology_state_machine._topology.tstate));
}, _topology_state_machine._topology.tstate);
tmptr->set_read_new(read_new);
tmptr->get_new()->set_read_new(read_new);
for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) {
locator::host_id host_id{id.uuid()};
@@ -484,8 +491,10 @@ future<> storage_service::topology_state_load() {
// (such as the CDC generation write).
// It doesn't break anything to set the tokens to normal early in this single-node case.
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
co_await tmptr->get_new()->update_normal_tokens(rs.ring.value().tokens, host_id);
} else {
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip);
tmptr->get_new()->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
}
}
@@ -494,7 +503,9 @@ future<> storage_service::topology_state_load() {
case node_state::removing:
update_topology(host_id, ip, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip);
co_await tmptr->get_new()->update_normal_tokens(rs.ring.value().tokens, host_id);
tmptr->add_leaving_endpoint(ip);
tmptr->get_new()->add_leaving_endpoint(host_id);
co_await update_topology_change_info(tmptr, ::format("{} {}/{}", rs.state, id, ip));
break;
case node_state::replacing: {
@@ -507,11 +518,11 @@ future<> storage_service::topology_state_load() {
on_fatal_internal_error(slogger, ::format("Cannot map id of a node being replaced {} to its ip", replaced_id));
}
assert(existing_ip);
// FIXME: Topology cannot hold two IPs with different host ids yet so
// when replacing we must advertise the replaced_id for the ip, otherwise
// topology will complain about host id of a local node changing and fail.
update_topology(ip == existing_ip ? locator::host_id(replaced_id.uuid()) : host_id, ip, rs);
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
tmptr->get_new()->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
update_topology(host_id, ip, rs);
tmptr->add_replacing_endpoint(*existing_ip, ip);
tmptr->get_new()->add_replacing_endpoint(replaced_host_id, host_id);
co_await update_topology_change_info(tmptr, ::format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip));
}
}
@@ -545,9 +556,11 @@ future<> storage_service::topology_state_load() {
// of the cluster state. To work correctly, the gossiper needs to know the current
// endpoints. We cannot rely on seeds alone, since it is not guaranteed that seeds
// will be up to date and reachable at the time of restart.
for (const auto& e: get_token_metadata_ptr()->get_all_endpoints()) {
if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(e)) {
co_await _gossiper.add_saved_endpoint(e);
const auto* tmptr = get_token_metadata_ptr()->get_new();
for (const auto& e: tmptr->get_all_endpoints()) {
const auto ep = tmptr->get_endpoint_for_host_id(e);
if (!is_me(e) && !_gossiper.get_endpoint_state_ptr(ep)) {
co_await _gossiper.add_saved_endpoint(ep);
}
}

View File

@@ -270,6 +270,9 @@ private:
bool is_me(inet_address addr) const noexcept {
return get_token_metadata_ptr()->get_topology().is_me(addr);
}
bool is_me(locator::host_id id) const noexcept {
return get_token_metadata_ptr()->get_topology().is_me(id);
}
/* This abstraction maintains the token/endpoint metadata information */
shared_token_metadata& _shared_token_metadata;