diff --git a/service/storage_service.cc b/service/storage_service.cc index 1050834f9a..6ccfd2eb91 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -368,6 +368,7 @@ static locator::node::state to_topology_node_state(node_state ns) { // gossiper) to align it with the other raft topology nodes. future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional target_node) { const auto& am = _group0->address_map(); + const auto& t = _topology_state_machine._topology; auto update_topology = [&] (locator::host_id id, std::optional ip, const replica_state& rs) { tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack}, @@ -377,11 +378,27 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm } }; - auto process_left_node = [&] (raft::server_id id) -> future<> { - auto ip = am.find(id); + auto get_used_ips = [&, used_ips = std::optional>{}]() mutable + -> const std::unordered_set& + { + if (!used_ips) { + used_ips.emplace(); + for (const auto& [sid, rs]: boost::range::join(t.normal_nodes, t.transition_nodes)) { + if (const auto used_ip = am.find(sid)) { + used_ips->insert(*used_ip); + } + } + } + return *used_ips; + }; - if (ip && (_gossiper.get_live_members().contains(*ip) || _gossiper.get_unreachable_members().contains(*ip))) { - co_await remove_endpoint(*ip, gms::null_permit_id); + auto process_left_node = [&] (raft::server_id id) -> future<> { + if (const auto ip = am.find(id)) { + co_await _sys_ks.local().remove_endpoint(*ip); + + if (_gossiper.get_endpoint_state_ptr(*ip) && !get_used_ips().contains(*ip)) { + co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id); + } } // FIXME: when removing a node from the cluster through `removenode`, we should ban it early, @@ -496,8 +513,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm } }; - const auto& t = _topology_state_machine._topology; - if (target_node) { raft::server_id raft_id{target_node->uuid()}; if (t.left_nodes.contains(raft_id)) { @@ -4319,11 +4334,6 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint, gms::per future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit_id pid) { slogger.debug("endpoint={} handle_state_normal: permit_id={}", endpoint, pid); - if (_raft_topology_change_enabled) { - slogger.debug("ignore handle_state_normal since topology changes are using raft"); - co_return; - } - auto tokens = get_tokens_for(endpoint); slogger.info("Node {} is in normal state, tokens: {}", endpoint, tokens); @@ -4563,11 +4573,6 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit future<> storage_service::handle_state_left(inet_address endpoint, std::vector pieces, gms::permit_id pid) { slogger.debug("endpoint={} handle_state_left: permit_id={}", endpoint, pid); - if (_raft_topology_change_enabled) { - slogger.debug("ignore handle_state_left since topology changes are using raft"); - co_return; - } - if (pieces.size() < 2) { slogger.warn("Fail to handle_state_left endpoint={} pieces={}", endpoint, pieces); co_return; @@ -4644,27 +4649,31 @@ future<> storage_service::on_change(gms::inet_address endpoint, const gms::appli // copy the states map locally since the coroutine may yield auto states = states_; slogger.debug("endpoint={} on_change: states={}, permit_id={}", endpoint, states, pid); - co_await on_application_state_change(endpoint, states, application_state::STATUS, pid, [this] (inet_address endpoint, const gms::versioned_value& value, gms::permit_id pid) -> future<> { - std::vector pieces; - boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); - if (pieces.empty()) { - slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, application_state::STATUS, value); - co_return; - } - const sstring& move_name = pieces[0]; - if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) { - co_await handle_state_bootstrap(endpoint, pid); - } else if (move_name == sstring(versioned_value::STATUS_NORMAL) || - move_name == sstring(versioned_value::SHUTDOWN)) { - co_await handle_state_normal(endpoint, pid); - } else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) { - co_await handle_state_removed(endpoint, std::move(pieces), pid); - } else if (move_name == sstring(versioned_value::STATUS_LEFT)) { - co_await handle_state_left(endpoint, std::move(pieces), pid); - } else { - co_return; // did nothing. - } - }); + if (_raft_topology_change_enabled) { + slogger.debug("ignore status changes since topology changes are using raft"); + } else { + co_await on_application_state_change(endpoint, states, application_state::STATUS, pid, [this] (inet_address endpoint, const gms::versioned_value& value, gms::permit_id pid) -> future<> { + std::vector pieces; + boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); + if (pieces.empty()) { + slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, application_state::STATUS, value); + co_return; + } + const sstring& move_name = pieces[0]; + if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) { + co_await handle_state_bootstrap(endpoint, pid); + } else if (move_name == sstring(versioned_value::STATUS_NORMAL) || + move_name == sstring(versioned_value::SHUTDOWN)) { + co_await handle_state_normal(endpoint, pid); + } else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) { + co_await handle_state_removed(endpoint, std::move(pieces), pid); + } else if (move_name == sstring(versioned_value::STATUS_LEFT)) { + co_await handle_state_left(endpoint, std::move(pieces), pid); + } else { + co_return; // did nothing. + } + }); + } auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint); if (!ep_state || _gossiper.is_dead_state(*ep_state)) { slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint); @@ -4755,56 +4764,50 @@ db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_ad db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) { db::system_keyspace::peer_info ret; - auto insert_string = [&] (std::optional& opt, const gms::versioned_value& value, std::string_view) { - opt.emplace(value.value()); - }; - auto insert_address = [&] (std::optional& opt, const gms::versioned_value& value, std::string_view name) { - net::inet_address addr; - try { - addr = net::inet_address(value.value()); - } catch (...) { - on_internal_error(slogger, format("failed to parse {} {} for {}: {}", name, value.value(), endpoint, std::current_exception())); + auto set_field = [&] (std::optional& field, + const gms::versioned_value& value, + std::string_view name, + bool managed_by_raft_in_raft_topology) + { + if (_raft_topology_change_enabled && managed_by_raft_in_raft_topology) { + return; } - opt.emplace(addr); - }; - auto insert_uuid = [&] (std::optional& opt, const gms::versioned_value& value, std::string_view name) { - utils::UUID id; try { - id = utils::UUID(value.value()); + field = T(value.value()); } catch (...) { - on_internal_error(slogger, format("failed to parse {} {} for {}: {}", name, value.value(), endpoint, std::current_exception())); + on_internal_error(slogger, format("failed to parse {} {} for {}: {}", name, value.value(), + endpoint, std::current_exception())); } - opt.emplace(id); }; for (const auto& [state, value] : app_state_map) { switch (state) { case application_state::DC: - insert_string(ret.data_center, value, "data_center"); + set_field(ret.data_center, value, "data_center", true); break; case application_state::HOST_ID: - insert_uuid(ret.host_id, value, "host_id"); + set_field(ret.host_id, value, "host_id", true); break; case application_state::INTERNAL_IP: - insert_address(ret.preferred_ip, value, "preferred_ip"); + set_field(ret.preferred_ip, value, "preferred_ip", false); break; case application_state::RACK: - insert_string(ret.rack, value, "rack"); + set_field(ret.rack, value, "rack", true); break; case application_state::RELEASE_VERSION: - insert_string(ret.release_version, value, "release_version"); + set_field(ret.release_version, value, "release_version", true); break; case application_state::RPC_ADDRESS: - insert_address(ret.rpc_address, value, "rpc_address"); + set_field(ret.rpc_address, value, "rpc_address", false); break; case application_state::SCHEMA: - insert_uuid(ret.schema_version, value, "schema_version"); + set_field(ret.schema_version, value, "schema_version", false); break; case application_state::TOKENS: // tokens are updated separately break; case application_state::SUPPORTED_FEATURES: - insert_string(ret.supported_features, value, "supported_features"); + set_field(ret.supported_features, value, "supported_features", true); break; default: break; diff --git a/service/storage_service.hh b/service/storage_service.hh index 50b406613d..e43ad4ad1a 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -475,7 +475,7 @@ public: virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {} private: db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint); - static db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map); + db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map); std::unordered_set get_tokens_for(inet_address endpoint); std::optional get_dc_rack_for(const gms::endpoint_state& ep_state); diff --git a/test/topology_custom/test_replace.py b/test/topology_custom/test_replace.py index f081e5ff93..12ec73563a 100644 --- a/test/topology_custom/test_replace.py +++ b/test/topology_custom/test_replace.py @@ -9,19 +9,60 @@ Test replacing node in different scenarios import time from test.pylib.scylla_cluster import ReplaceConfig from test.pylib.manager_client import ManagerClient -from test.topology.util import wait_for_token_ring_and_group0_consistency +from test.topology.util import wait_for_token_ring_and_group0_consistency, wait_for_cql_and_get_hosts, wait_for import pytest +import logging + + +logger = logging.getLogger(__name__) @pytest.mark.asyncio async def test_replace_different_ip(manager: ManagerClient) -> None: """Replace an existing node with new node using a different IP address""" servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}) + logger.info(f"cluster started, servers {servers}") + + logger.info(f"replacing server {servers[0]}") await manager.server_stop(servers[0].server_id) - replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False) - await manager.server_add(replace_cfg) + replaced_server = servers[0] + replace_cfg = ReplaceConfig(replaced_id = replaced_server.server_id, reuse_ip_addr = False, use_host_id = False) + new_server = await manager.server_add(replace_cfg) + cql = manager.get_cql() + servers = await manager.running_servers() + all_ips = set([s.rpc_address for s in servers]) + logger.info(f"new server {new_server} started, all ips {all_ips}, " + "waiting for token ring and group0 consistency") await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + for s in servers: + peers_to_see = all_ips - {s.rpc_address} + + logger.info(f'waiting for cql and get hosts for {s}') + h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0] + + logger.info(f"waiting for {s} to see its peers {peers_to_see}") + async def check_peers_and_gossiper(): + peers = set([r.peer for r in await cql.run_async("select peer from system.peers", host=h)]) + remaining = peers_to_see - peers + if remaining: + logger.info(f"server {h} doesn't see its peers, all_ips {all_ips}, peers_to_see {peers_to_see}, remaining {remaining}, continue waiting") + return None + + alive_eps = await manager.api.get_alive_endpoints(s.ip_addr) + if replaced_server.ip_addr in alive_eps: + logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in alive eps {alive_eps}, continue waiting") + return None + + down_eps = await manager.api.get_down_endpoints(s.ip_addr) + if replaced_server.ip_addr in down_eps: + logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in down eps {down_eps}, continue waiting") + return None + + return True + await wait_for(check_peers_and_gossiper, time.time() + 60) + logger.info(f"server {s} system.peers and gossiper state is valid") + @pytest.mark.asyncio async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None: """Replace an existing node with new node reusing the replaced node host id""" diff --git a/test/topology_custom/test_replace_with_same_ip_twice.py b/test/topology_custom/test_replace_with_same_ip_twice.py new file mode 100644 index 0000000000..08ff740a9d --- /dev/null +++ b/test/topology_custom/test_replace_with_same_ip_twice.py @@ -0,0 +1,34 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +import time +from test.pylib.scylla_cluster import ReplaceConfig +from test.pylib.manager_client import ManagerClient +from test.topology.util import wait_for_token_ring_and_group0_consistency +from test.pylib.internal_types import ServerInfo +import pytest +import logging + + +logger = logging.getLogger(__name__) + + +@pytest.mark.asyncio +async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None: + logger.info("starting a cluster with two nodes") + servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000}) + logger.info(f"cluster started {servers}") + + async def replace_with_same_ip(s: ServerInfo) -> ServerInfo: + logger.info(f"stopping server {s.server_id}") + await manager.server_stop_gracefully(s.server_id) + + logger.info(f"replacing server {s.server_id} with same ip") + replace_cfg = ReplaceConfig(replaced_id = s.server_id, reuse_ip_addr = True, use_host_id = False) + return await manager.server_add(replace_cfg) + + test_server = servers[1] + test_server = await replace_with_same_ip(test_server) + await replace_with_same_ip(test_server)