diff --git a/service/storage_service.cc b/service/storage_service.cc index 0abcb0821b..4bff5a4b00 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3694,29 +3694,86 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit std::unordered_set endpoints_to_remove; auto do_remove_node = [&] (gms::inet_address node) { + // this lambda is called in three cases: + // 1. old endpoint for the given host_id is ours, we remove the new endpoint; + // 2. new endpoint for the given host_id has bigger generation, we remove the old endpoint; + // 3. old endpoint for the given host_id has bigger generation, we remove the new endpoint. + // In all of these cases host_id is retained, only the IP addresses are changed. + // That's why we don't need to call remove_endpoint on tmptr->get_new(). + // However, it will be called eventually through the chain storage_service::remove_endpoint -> + // _gossiper.remove_endpoint -> storage_service::on_remove, and we should handle + // the case when we wouldn't be able to find endpoint -> ip mapping in tm->get_new(). + // This could happen e.g. when the new endpoint has bigger generation - the code + // below will remap host_id to new IP and we won't find old IP in storage_service::on_remove. + // We should just skip the remove in that case. + tmptr->remove_endpoint(node); endpoints_to_remove.insert(node); }; // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300). auto host_id = _gossiper.get_host_id(endpoint); auto existing = tmptr->get_endpoint_for_host_id_if_known(host_id); + + // Old node in replace-with-same-IP scenario. + std::optional replaced_id; + if (existing && *existing != endpoint) { + // This branch in taken when a node changes its IP address. + if (*existing == get_broadcast_address()) { slogger.warn("Not updating host ID {} for {} because it's mine", host_id, endpoint); do_remove_node(endpoint); } else if (_gossiper.compare_endpoint_startup(endpoint, *existing) > 0) { + // The new IP has greater generation than the existing one. + // Here we remap the host_id to the new IP. The 'owned_tokens' calculation logic below + // won't detect any changes - the branch 'endpoint == current_owner' will be taken. + // We still need to call 'remove_endpoint' for existing IP to remove it from system.peers. + slogger.warn("Host ID collision for {} between {} and {}; {} is the new owner", host_id, *existing, endpoint, endpoint); do_remove_node(*existing); slogger.info("Set host_id={} to be owned by node={}, existing={}", host_id, endpoint, *existing); tmptr->update_host_id(host_id, endpoint); + tmptr->get_new()->update_host_id(host_id, endpoint); } else { + // The new IP has smaller generation than the existing one, + // we are going to remove it, so we add it to the endpoints_to_remove. + // How does this relate to the tokens this endpoint may have? + // There is a condition below which checks that if endpoints_to_remove + // contains 'endpoint', then the owned_tokens must be empty, otherwise internal_error + // is triggered. This means the following is expected to be true: + // 1. each token from the tokens variable (which is read from gossiper) must have an owner node + // 2. this owner must be different from 'endpoint' + // 3. its generation must be greater than endpoint's + slogger.warn("Host ID collision for {} between {} and {}; ignored {}", host_id, *existing, endpoint, endpoint); do_remove_node(endpoint); } } else if (existing && *existing == endpoint) { + // This branch is taken for all gossiper-managed topology operations. + // For example, if this node is a member of the cluster and a new node is added, + // handle_state_normal is called on this node as the final step + // in the endpoint bootstrap process. + // This method is also called for both replace scenarios - with either the same or with a different IP. + // If the new node has a different IP, the old IP is removed by the block of + // logic below - we detach the old IP from token ring, + // it gets added to candidates_for_removal, then storage_service::remove_endpoint -> + // _gossiper.remove_endpoint -> storage_service::on_remove -> remove from token_metadata. + // If the new node has the same IP, we need to explicitly remove old host_id from + // token_metadata, since no IPs will be removed in this case. + // We do this after update_normal_tokens, allowing for tokens to be properly + // migrated to the new host_id. tmptr->del_replacing_endpoint(endpoint); + if (const auto old_host_id = tmptr->get_new()->get_host_id_if_known(endpoint); old_host_id && *old_host_id != host_id) { + replaced_id = *old_host_id; + } } else { + // This branch is taken if this node wasn't involved in node_ops + // workflow (storage_service::node_ops_cmd_handler wasn't called on it) and it just + // receives the current state of the cluster from the gossiper. + // For example, a new node receives this notification for every + // existing node in the cluster. tmptr->del_replacing_endpoint(endpoint); + auto nodes = _gossiper.get_nodes_with_host_id(host_id); bool left = std::any_of(nodes.begin(), nodes.end(), [this] (const gms::inet_address& node) { return _gossiper.is_left(node); }); if (left) { @@ -3726,6 +3783,7 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit } slogger.info("Set host_id={} to be owned by node={}", host_id, endpoint); tmptr->update_host_id(host_id, endpoint); + tmptr->get_new()->update_host_id(host_id, endpoint); } // Tokens owned by the handled endpoint. @@ -3739,6 +3797,16 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit std::unordered_map token_to_endpoint_map = get_token_metadata().get_token_to_endpoint(); std::unordered_set candidates_for_removal; + // Here we convert tokens from gossiper to owned_tokens, which will be assigned as a new + // normal tokens to token_metadata and its new host_id-based version. + // This transformation accounts for situations where some tokens + // belong to outdated nodes - the ones with smaller generation. + // We use endpoints instead of host_ids here since gossiper operates + // with endpoints and generations are tied to endpoints, not host_ids. + // In replace-with-same-ip scenario we won't be able to distinguish + // between the old and new IP owners, so we assume the old replica + // is down and won't be resurrected. + for (auto t : tokens) { // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. auto current = token_to_endpoint_map.find(t); @@ -3806,8 +3874,21 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit do_notify_joined = true; } - tmptr->update_topology(endpoint, get_dc_rack_for(endpoint), locator::node::state::normal); + const auto dc_rack = get_dc_rack_for(endpoint); + tmptr->update_topology(endpoint, dc_rack, locator::node::state::normal); + tmptr->get_new()->update_topology(host_id, dc_rack, locator::node::state::normal); co_await tmptr->update_normal_tokens(owned_tokens, endpoint); + co_await tmptr->get_new()->update_normal_tokens(owned_tokens, host_id); + if (replaced_id) { + if (tmptr->get_new()->is_normal_token_owner(*replaced_id)) { + on_internal_error(slogger, ::format("replaced endpoint={}/{} still owns tokens {}", + endpoint, *replaced_id, tmptr->get_new()->get_tokens(*replaced_id))); + } else { + tmptr->get_new()->remove_endpoint(*replaced_id); + slogger.info("node {}/{} is removed from token_metadata since it's replaced by {}/{} ", + endpoint, *replaced_id, endpoint, host_id); + } + } } co_await update_topology_change_info(tmptr, ::format("handle_state_normal {}", endpoint));