storage_service: handle_state_normal: fill new token_metadata

This commit is contained in:
Petr Gusev
2023-10-19 18:51:45 +04:00
parent b6fbbe28aa
commit 6412cd64f1

View File

@@ -3694,29 +3694,86 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
std::unordered_set<inet_address> endpoints_to_remove;
auto do_remove_node = [&] (gms::inet_address node) {
// this lambda is called in three cases:
// 1. old endpoint for the given host_id is ours, we remove the new endpoint;
// 2. new endpoint for the given host_id has bigger generation, we remove the old endpoint;
// 3. old endpoint for the given host_id has bigger generation, we remove the new endpoint.
// In all of these cases host_id is retained, only the IP addresses are changed.
// That's why we don't need to call remove_endpoint on tmptr->get_new().
// However, it will be called eventually through the chain storage_service::remove_endpoint ->
// _gossiper.remove_endpoint -> storage_service::on_remove, and we should handle
// the case when we wouldn't be able to find endpoint -> ip mapping in tm->get_new().
// This could happen e.g. when the new endpoint has bigger generation - the code
// below will remap host_id to new IP and we won't find old IP in storage_service::on_remove.
// We should just skip the remove in that case.
tmptr->remove_endpoint(node);
endpoints_to_remove.insert(node);
};
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
auto host_id = _gossiper.get_host_id(endpoint);
auto existing = tmptr->get_endpoint_for_host_id_if_known(host_id);
// Old node in replace-with-same-IP scenario.
std::optional<locator::host_id> replaced_id;
if (existing && *existing != endpoint) {
// This branch in taken when a node changes its IP address.
if (*existing == get_broadcast_address()) {
slogger.warn("Not updating host ID {} for {} because it's mine", host_id, endpoint);
do_remove_node(endpoint);
} else if (_gossiper.compare_endpoint_startup(endpoint, *existing) > 0) {
// The new IP has greater generation than the existing one.
// Here we remap the host_id to the new IP. The 'owned_tokens' calculation logic below
// won't detect any changes - the branch 'endpoint == current_owner' will be taken.
// We still need to call 'remove_endpoint' for existing IP to remove it from system.peers.
slogger.warn("Host ID collision for {} between {} and {}; {} is the new owner", host_id, *existing, endpoint, endpoint);
do_remove_node(*existing);
slogger.info("Set host_id={} to be owned by node={}, existing={}", host_id, endpoint, *existing);
tmptr->update_host_id(host_id, endpoint);
tmptr->get_new()->update_host_id(host_id, endpoint);
} else {
// The new IP has smaller generation than the existing one,
// we are going to remove it, so we add it to the endpoints_to_remove.
// How does this relate to the tokens this endpoint may have?
// There is a condition below which checks that if endpoints_to_remove
// contains 'endpoint', then the owned_tokens must be empty, otherwise internal_error
// is triggered. This means the following is expected to be true:
// 1. each token from the tokens variable (which is read from gossiper) must have an owner node
// 2. this owner must be different from 'endpoint'
// 3. its generation must be greater than endpoint's
slogger.warn("Host ID collision for {} between {} and {}; ignored {}", host_id, *existing, endpoint, endpoint);
do_remove_node(endpoint);
}
} else if (existing && *existing == endpoint) {
// This branch is taken for all gossiper-managed topology operations.
// For example, if this node is a member of the cluster and a new node is added,
// handle_state_normal is called on this node as the final step
// in the endpoint bootstrap process.
// This method is also called for both replace scenarios - with either the same or with a different IP.
// If the new node has a different IP, the old IP is removed by the block of
// logic below - we detach the old IP from token ring,
// it gets added to candidates_for_removal, then storage_service::remove_endpoint ->
// _gossiper.remove_endpoint -> storage_service::on_remove -> remove from token_metadata.
// If the new node has the same IP, we need to explicitly remove old host_id from
// token_metadata, since no IPs will be removed in this case.
// We do this after update_normal_tokens, allowing for tokens to be properly
// migrated to the new host_id.
tmptr->del_replacing_endpoint(endpoint);
if (const auto old_host_id = tmptr->get_new()->get_host_id_if_known(endpoint); old_host_id && *old_host_id != host_id) {
replaced_id = *old_host_id;
}
} else {
// This branch is taken if this node wasn't involved in node_ops
// workflow (storage_service::node_ops_cmd_handler wasn't called on it) and it just
// receives the current state of the cluster from the gossiper.
// For example, a new node receives this notification for every
// existing node in the cluster.
tmptr->del_replacing_endpoint(endpoint);
auto nodes = _gossiper.get_nodes_with_host_id(host_id);
bool left = std::any_of(nodes.begin(), nodes.end(), [this] (const gms::inet_address& node) { return _gossiper.is_left(node); });
if (left) {
@@ -3726,6 +3783,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
}
slogger.info("Set host_id={} to be owned by node={}", host_id, endpoint);
tmptr->update_host_id(host_id, endpoint);
tmptr->get_new()->update_host_id(host_id, endpoint);
}
// Tokens owned by the handled endpoint.
@@ -3739,6 +3797,16 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
std::unordered_map<token, inet_address> token_to_endpoint_map = get_token_metadata().get_token_to_endpoint();
std::unordered_set<inet_address> candidates_for_removal;
// Here we convert tokens from gossiper to owned_tokens, which will be assigned as a new
// normal tokens to token_metadata and its new host_id-based version.
// This transformation accounts for situations where some tokens
// belong to outdated nodes - the ones with smaller generation.
// We use endpoints instead of host_ids here since gossiper operates
// with endpoints and generations are tied to endpoints, not host_ids.
// In replace-with-same-ip scenario we won't be able to distinguish
// between the old and new IP owners, so we assume the old replica
// is down and won't be resurrected.
for (auto t : tokens) {
// we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
auto current = token_to_endpoint_map.find(t);
@@ -3806,8 +3874,21 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
do_notify_joined = true;
}
tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::normal);
const auto dc_rack = get_dc_rack_for(endpoint);
tmptr->update_topology(endpoint, dc_rack, locator::node::state::normal);
tmptr->get_new()->update_topology(host_id, dc_rack, locator::node::state::normal);
co_await tmptr->update_normal_tokens(owned_tokens, endpoint);
co_await tmptr->get_new()->update_normal_tokens(owned_tokens, host_id);
if (replaced_id) {
if (tmptr->get_new()->is_normal_token_owner(*replaced_id)) {
on_internal_error(slogger, ::format("replaced endpoint={}/{} still owns tokens {}",
endpoint, *replaced_id, tmptr->get_new()->get_tokens(*replaced_id)));
} else {
tmptr->get_new()->remove_endpoint(*replaced_id);
slogger.info("node {}/{} is removed from token_metadata since it's replaced by {}/{} ",
endpoint, *replaced_id, endpoint, host_id);
}
}
}
co_await update_topology_change_info(tmptr, ::format("handle_state_normal {}", endpoint));