Merge 'sync_raft_topology_nodes: force_remove_endpoint for left nodes only if an IP is not used by other nodes' from Petr Gusev

Before the patch we called `gossiper.remove_endpoint` for IP-s of the
left nodes. The problem is that in replace-with-same-ip scenario we
called `gossiper.remove_endpoint` for IP which is used by the new,
replacing node. The `gossiper.remove_endpoint` method puts the IP into
quarantine, which means gossiper will ignore all events about this IP
for `quarantine_delay` (one minute by default). If we immediately
replace just replaced node with the same IP again, the bootstrap will
fail since the gossiper events are blocked for this IP, and we won't be
able to resolve an IP for the new host_id.

Another problem was that we called gossiper.remove_endpoint method,
which doesn't remove an endpoint from `_endpoint_state_map`, only from
live and unreachable lists. This means the IP will keep circulating in
the gossiper message exchange between cluster nodes until full cluster
restart.

This patch fixes both of these problems. First, we rely on the fact that
when topology coordinator moves the `being_replaced` node to the left
state, the IP of the `replacing` node is known to all nodes. This means
before removing an IP from the gossiper we can check if this IP is
currently used by another node in the current raft topology. This is
done by constructing the `used_ips` map based on normal and transition
nodes. This map is cached to avoid quadratic behaviour.

Second, we call `gossiper.force_remove_endpoint`, not
`gossiper.remove_endpoint`. This function removes and IP from
`_endpoint_state_map`, as well as from live and unreachable lists.

Closes scylladb/scylladb#16820

* github.com:scylladb/scylladb:
  get_peer_info_for_update: update only required fields in raft topology mode
  get_peer_info_for_update: introduce set_field lambda
  storage_service::on_change: fix indent
  storage_service::on_change: skip handle_state functions in raft topology mode
  test_replace_different_ip: check old IP is removed from gossiper
  test_replace: check two replace with same IP one after another
  storage_service: sync_raft_topology_nodes: force_remove_endpoint for left nodes only if an IP is not used by other nodes
This commit is contained in:
Kamil Braun
2024-01-22 11:25:55 +01:00
4 changed files with 143 additions and 65 deletions

View File

@@ -368,6 +368,7 @@ static locator::node::state to_topology_node_state(node_state ns) {
// gossiper) to align it with the other raft topology nodes.
future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node) {
const auto& am = _group0->address_map();
const auto& t = _topology_state_machine._topology;
auto update_topology = [&] (locator::host_id id, std::optional<inet_address> ip, const replica_state& rs) {
tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
@@ -377,11 +378,27 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
}
};
auto process_left_node = [&] (raft::server_id id) -> future<> {
auto ip = am.find(id);
auto get_used_ips = [&, used_ips = std::optional<std::unordered_set<inet_address>>{}]() mutable
-> const std::unordered_set<inet_address>&
{
if (!used_ips) {
used_ips.emplace();
for (const auto& [sid, rs]: boost::range::join(t.normal_nodes, t.transition_nodes)) {
if (const auto used_ip = am.find(sid)) {
used_ips->insert(*used_ip);
}
}
}
return *used_ips;
};
if (ip && (_gossiper.get_live_members().contains(*ip) || _gossiper.get_unreachable_members().contains(*ip))) {
co_await remove_endpoint(*ip, gms::null_permit_id);
auto process_left_node = [&] (raft::server_id id) -> future<> {
if (const auto ip = am.find(id)) {
co_await _sys_ks.local().remove_endpoint(*ip);
if (_gossiper.get_endpoint_state_ptr(*ip) && !get_used_ips().contains(*ip)) {
co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id);
}
}
// FIXME: when removing a node from the cluster through `removenode`, we should ban it early,
@@ -496,8 +513,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
}
};
const auto& t = _topology_state_machine._topology;
if (target_node) {
raft::server_id raft_id{target_node->uuid()};
if (t.left_nodes.contains(raft_id)) {
@@ -4319,11 +4334,6 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint, gms::per
future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit_id pid) {
slogger.debug("endpoint={} handle_state_normal: permit_id={}", endpoint, pid);
if (_raft_topology_change_enabled) {
slogger.debug("ignore handle_state_normal since topology changes are using raft");
co_return;
}
auto tokens = get_tokens_for(endpoint);
slogger.info("Node {} is in normal state, tokens: {}", endpoint, tokens);
@@ -4563,11 +4573,6 @@ future<> storage_service::handle_state_normal(inet_address endpoint, gms::permit
future<> storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces, gms::permit_id pid) {
slogger.debug("endpoint={} handle_state_left: permit_id={}", endpoint, pid);
if (_raft_topology_change_enabled) {
slogger.debug("ignore handle_state_left since topology changes are using raft");
co_return;
}
if (pieces.size() < 2) {
slogger.warn("Fail to handle_state_left endpoint={} pieces={}", endpoint, pieces);
co_return;
@@ -4644,27 +4649,31 @@ future<> storage_service::on_change(gms::inet_address endpoint, const gms::appli
// copy the states map locally since the coroutine may yield
auto states = states_;
slogger.debug("endpoint={} on_change: states={}, permit_id={}", endpoint, states, pid);
co_await on_application_state_change(endpoint, states, application_state::STATUS, pid, [this] (inet_address endpoint, const gms::versioned_value& value, gms::permit_id pid) -> future<> {
std::vector<sstring> pieces;
boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
if (pieces.empty()) {
slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, application_state::STATUS, value);
co_return;
}
const sstring& move_name = pieces[0];
if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
co_await handle_state_bootstrap(endpoint, pid);
} else if (move_name == sstring(versioned_value::STATUS_NORMAL) ||
move_name == sstring(versioned_value::SHUTDOWN)) {
co_await handle_state_normal(endpoint, pid);
} else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) {
co_await handle_state_removed(endpoint, std::move(pieces), pid);
} else if (move_name == sstring(versioned_value::STATUS_LEFT)) {
co_await handle_state_left(endpoint, std::move(pieces), pid);
} else {
co_return; // did nothing.
}
});
if (_raft_topology_change_enabled) {
slogger.debug("ignore status changes since topology changes are using raft");
} else {
co_await on_application_state_change(endpoint, states, application_state::STATUS, pid, [this] (inet_address endpoint, const gms::versioned_value& value, gms::permit_id pid) -> future<> {
std::vector<sstring> pieces;
boost::split(pieces, value.value(), boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
if (pieces.empty()) {
slogger.warn("Fail to split status in on_change: endpoint={}, app_state={}, value={}", endpoint, application_state::STATUS, value);
co_return;
}
const sstring& move_name = pieces[0];
if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
co_await handle_state_bootstrap(endpoint, pid);
} else if (move_name == sstring(versioned_value::STATUS_NORMAL) ||
move_name == sstring(versioned_value::SHUTDOWN)) {
co_await handle_state_normal(endpoint, pid);
} else if (move_name == sstring(versioned_value::REMOVED_TOKEN)) {
co_await handle_state_removed(endpoint, std::move(pieces), pid);
} else if (move_name == sstring(versioned_value::STATUS_LEFT)) {
co_await handle_state_left(endpoint, std::move(pieces), pid);
} else {
co_return; // did nothing.
}
});
}
auto ep_state = _gossiper.get_endpoint_state_ptr(endpoint);
if (!ep_state || _gossiper.is_dead_state(*ep_state)) {
slogger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
@@ -4755,56 +4764,50 @@ db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_ad
db::system_keyspace::peer_info storage_service::get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map) {
db::system_keyspace::peer_info ret;
auto insert_string = [&] (std::optional<sstring>& opt, const gms::versioned_value& value, std::string_view) {
opt.emplace(value.value());
};
auto insert_address = [&] (std::optional<net::inet_address>& opt, const gms::versioned_value& value, std::string_view name) {
net::inet_address addr;
try {
addr = net::inet_address(value.value());
} catch (...) {
on_internal_error(slogger, format("failed to parse {} {} for {}: {}", name, value.value(), endpoint, std::current_exception()));
auto set_field = [&]<typename T> (std::optional<T>& field,
const gms::versioned_value& value,
std::string_view name,
bool managed_by_raft_in_raft_topology)
{
if (_raft_topology_change_enabled && managed_by_raft_in_raft_topology) {
return;
}
opt.emplace(addr);
};
auto insert_uuid = [&] (std::optional<utils::UUID>& opt, const gms::versioned_value& value, std::string_view name) {
utils::UUID id;
try {
id = utils::UUID(value.value());
field = T(value.value());
} catch (...) {
on_internal_error(slogger, format("failed to parse {} {} for {}: {}", name, value.value(), endpoint, std::current_exception()));
on_internal_error(slogger, format("failed to parse {} {} for {}: {}", name, value.value(),
endpoint, std::current_exception()));
}
opt.emplace(id);
};
for (const auto& [state, value] : app_state_map) {
switch (state) {
case application_state::DC:
insert_string(ret.data_center, value, "data_center");
set_field(ret.data_center, value, "data_center", true);
break;
case application_state::HOST_ID:
insert_uuid(ret.host_id, value, "host_id");
set_field(ret.host_id, value, "host_id", true);
break;
case application_state::INTERNAL_IP:
insert_address(ret.preferred_ip, value, "preferred_ip");
set_field(ret.preferred_ip, value, "preferred_ip", false);
break;
case application_state::RACK:
insert_string(ret.rack, value, "rack");
set_field(ret.rack, value, "rack", true);
break;
case application_state::RELEASE_VERSION:
insert_string(ret.release_version, value, "release_version");
set_field(ret.release_version, value, "release_version", true);
break;
case application_state::RPC_ADDRESS:
insert_address(ret.rpc_address, value, "rpc_address");
set_field(ret.rpc_address, value, "rpc_address", false);
break;
case application_state::SCHEMA:
insert_uuid(ret.schema_version, value, "schema_version");
set_field(ret.schema_version, value, "schema_version", false);
break;
case application_state::TOKENS:
// tokens are updated separately
break;
case application_state::SUPPORTED_FEATURES:
insert_string(ret.supported_features, value, "supported_features");
set_field(ret.supported_features, value, "supported_features", true);
break;
default:
break;

View File

@@ -475,7 +475,7 @@ public:
virtual void on_drop_view(const sstring& ks_name, const sstring& view_name) override {}
private:
db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint);
static db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);
db::system_keyspace::peer_info get_peer_info_for_update(inet_address endpoint, const gms::application_state_map& app_state_map);
std::unordered_set<token> get_tokens_for(inet_address endpoint);
std::optional<locator::endpoint_dc_rack> get_dc_rack_for(const gms::endpoint_state& ep_state);

View File

@@ -9,19 +9,60 @@ Test replacing node in different scenarios
import time
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.topology.util import wait_for_token_ring_and_group0_consistency
from test.topology.util import wait_for_token_ring_and_group0_consistency, wait_for_cql_and_get_hosts, wait_for
import pytest
import logging
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_different_ip(manager: ManagerClient) -> None:
"""Replace an existing node with new node using a different IP address"""
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started, servers {servers}")
logger.info(f"replacing server {servers[0]}")
await manager.server_stop(servers[0].server_id)
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = False)
await manager.server_add(replace_cfg)
replaced_server = servers[0]
replace_cfg = ReplaceConfig(replaced_id = replaced_server.server_id, reuse_ip_addr = False, use_host_id = False)
new_server = await manager.server_add(replace_cfg)
cql = manager.get_cql()
servers = await manager.running_servers()
all_ips = set([s.rpc_address for s in servers])
logger.info(f"new server {new_server} started, all ips {all_ips}, "
"waiting for token ring and group0 consistency")
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
for s in servers:
peers_to_see = all_ips - {s.rpc_address}
logger.info(f'waiting for cql and get hosts for {s}')
h = (await wait_for_cql_and_get_hosts(cql, [s], time.time() + 60))[0]
logger.info(f"waiting for {s} to see its peers {peers_to_see}")
async def check_peers_and_gossiper():
peers = set([r.peer for r in await cql.run_async("select peer from system.peers", host=h)])
remaining = peers_to_see - peers
if remaining:
logger.info(f"server {h} doesn't see its peers, all_ips {all_ips}, peers_to_see {peers_to_see}, remaining {remaining}, continue waiting")
return None
alive_eps = await manager.api.get_alive_endpoints(s.ip_addr)
if replaced_server.ip_addr in alive_eps:
logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in alive eps {alive_eps}, continue waiting")
return None
down_eps = await manager.api.get_down_endpoints(s.ip_addr)
if replaced_server.ip_addr in down_eps:
logger.info(f"server {h}, replaced ip {replaced_server.ip_addr} is contained in down eps {down_eps}, continue waiting")
return None
return True
await wait_for(check_peers_and_gossiper, time.time() + 60)
logger.info(f"server {s} system.peers and gossiper state is valid")
@pytest.mark.asyncio
async def test_replace_different_ip_using_host_id(manager: ManagerClient) -> None:
"""Replace an existing node with new node reusing the replaced node host id"""

View File

@@ -0,0 +1,34 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import time
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.topology.util import wait_for_token_ring_and_group0_consistency
from test.pylib.internal_types import ServerInfo
import pytest
import logging
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_with_same_ip_twice(manager: ManagerClient) -> None:
logger.info("starting a cluster with two nodes")
servers = await manager.servers_add(3, config={'failure_detector_timeout_in_ms': 2000})
logger.info(f"cluster started {servers}")
async def replace_with_same_ip(s: ServerInfo) -> ServerInfo:
logger.info(f"stopping server {s.server_id}")
await manager.server_stop_gracefully(s.server_id)
logger.info(f"replacing server {s.server_id} with same ip")
replace_cfg = ReplaceConfig(replaced_id = s.server_id, reuse_ip_addr = True, use_host_id = False)
return await manager.server_add(replace_cfg)
test_server = servers[1]
test_server = await replace_with_same_ip(test_server)
await replace_with_same_ip(test_server)