test: use different IP addresses for listen and RPC addresses

Scylla can be configured to use different IPs for the internode communication
and client connections.  This test allocates and configure unique IP addresses
for the client connections (`rpc_address`) for 2-nodes cluster.

Two scenarios tested:
  1) Change RPC IPs sequentially
  2) Change RPC IPs simultaneously

Closes scylladb/scylladb#15965
This commit is contained in:
Evgeniy Naydanov
2023-11-06 12:57:18 +00:00
committed by Avi Kivity
parent c36945dea2
commit 10eebe3c66
5 changed files with 170 additions and 19 deletions

View File

@@ -18,5 +18,7 @@ class ServerInfo(NamedTuple):
"""Server id (test local) and IP address"""
server_id: ServerNum
ip_addr: IPAddress
rpc_address: IPAddress
def __str__(self):
return f"Server({self.server_id}, {self.ip_addr})"
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address})"

View File

@@ -57,7 +57,7 @@ class ManagerClient():
async def driver_connect(self, server: Optional[ServerInfo] = None) -> None:
"""Connect to cluster"""
targets = [server] if server else await self.running_servers()
servers = [s_info.ip_addr for s_info in targets]
servers = [s_info.rpc_address for s_info in targets]
logger.debug("driver connecting to %s", servers)
self.ccluster = self.con_gen(servers, self.port, self.use_ssl, self.auth_provider)
self.cql = self.ccluster.connect()
@@ -122,14 +122,14 @@ class ManagerClient():
"""Get number of configured replicas for the cluster (replication factor)"""
return await self.client.get_json("/cluster/replicas")
async def running_servers(self) -> List[ServerInfo]:
async def running_servers(self) -> list[ServerInfo]:
"""Get List of server info (id and IP address) of running servers"""
try:
server_info_list = await self.client.get_json("/cluster/running-servers")
except RuntimeError as exc:
raise Exception("Failed to get list of running servers") from exc
assert isinstance(server_info_list, list), "running_servers got unknown data type"
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]))
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]))
for info in server_info_list]
async def mark_dirty(self) -> None:
@@ -218,7 +218,8 @@ class ManagerClient():
raise Exception("Failed to add server") from exc
try:
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]))
IPAddress(server_info["ip_addr"]),
IPAddress(server_info["rpc_address"]))
except Exception as exc:
raise RuntimeError(f"server_add got invalid server data {server_info}") from exc
logger.debug("ManagerClient added %s", s_info)
@@ -251,7 +252,8 @@ class ManagerClient():
for server_info in server_infos:
try:
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
IPAddress(server_info["ip_addr"]))
IPAddress(server_info["ip_addr"]),
IPAddress(server_info["rpc_address"]))
s_infos.append(s_info)
except Exception as exc:
raise RuntimeError(f"servers_add got invalid server data {server_info}") from exc
@@ -304,6 +306,21 @@ class ManagerClient():
response_type="json")
return IPAddress(ret["ip_addr"])
async def server_change_rpc_address(self, server_id: ServerNum) -> IPAddress:
"""Change server RPC IP address.
Applicable only to a stopped server.
"""
ret = await self.client.put_json(
resource_uri=f"/cluster/server/{server_id}/change_rpc_address",
data={},
response_type="json",
)
rpc_address = ret["rpc_address"]
logger.debug("ManagerClient has changed RPC IP for server %s to %s", server_id, rpc_address)
return IPAddress(rpc_address)
async def wait_for_host_known(self, dst_server_ip: IPAddress, expect_host_id: HostID,
deadline: Optional[float] = None) -> None:
"""Waits until dst_server_id knows about expect_host_id, with timeout"""

View File

@@ -251,6 +251,17 @@ class ScyllaServer:
self.config["alternator_address"] = ip_addr
self._write_config_file()
@property
def rpc_address(self) -> IPAddress:
return self.config["rpc_address"]
def change_rpc_address(self, rpc_address: IPAddress) -> None:
"""Change RPC IP address of the current server. Pre: the server is
stopped"""
if self.is_running:
raise RuntimeError(f"Can't change RPC IP of a running server {self.config['rpc_address']}.")
self.config["rpc_address"] = rpc_address
self._write_config_file()
async def install_and_start(self, api: ScyllaRESTAPIClient, expected_error: Optional[str] = None) -> None:
"""Setup and start this server"""
@@ -353,7 +364,7 @@ class ScyllaServer:
# words, even after CQL port is up, Scylla may still be
# initializing. When the role is ready, queries begin to
# work, so rely on this "side effect".
profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([self.ip_addr]),
profile = ExecutionProfile(load_balancing_policy=WhiteListRoundRobinPolicy([self.rpc_address]),
request_timeout=self.TOPOLOGY_TIMEOUT)
connected = False
try:
@@ -362,7 +373,7 @@ class ScyllaServer:
# point, so make sure we execute the checks strictly via
# this connection
with Cluster(execution_profiles={EXEC_PROFILE_DEFAULT: profile},
contact_points=[self.ip_addr],
contact_points=[self.rpc_address],
# This is the latest version Scylla supports
protocol_version=4,
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
@@ -374,7 +385,7 @@ class ScyllaServer:
session.execute("SELECT key FROM system.local where key = 'local'")
self.control_cluster = Cluster(execution_profiles=
{EXEC_PROFILE_DEFAULT: profile},
contact_points=[self.ip_addr],
contact_points=[self.rpc_address],
control_connection_timeout=self.TOPOLOGY_TIMEOUT,
auth_provider=auth)
self.control_connection = self.control_cluster.connect()
@@ -768,7 +779,7 @@ class ScyllaCluster:
else:
self.stopped[server.server_id] = server
self.logger.info("Cluster %s added %s", self, server)
return ServerInfo(server.server_id, server.ip_addr)
return ServerInfo(server.server_id, server.ip_addr, server.rpc_address)
async def add_servers(self, servers_num: int = 1,
cmdline: Optional[List[str]] = None,
@@ -812,9 +823,9 @@ class ScyllaCluster:
stopped = ", ".join(str(server) for server in self.stopped.values())
return f"ScyllaCluster(name: {self.name}, running: {running}, stopped: {stopped})"
def running_servers(self) -> List[Tuple[ServerNum, IPAddress]]:
def running_servers(self) -> list[tuple[ServerNum, IPAddress, IPAddress]]:
"""Get a list of tuples of server id and IP address of running servers (and not removed)"""
return [(server.server_id, server.ip_addr) for server in self.running.values()
return [(server.server_id, server.ip_addr, server.rpc_address) for server in self.running.values()
if server.server_id not in self.removed]
def _get_keyspace_count(self) -> int:
@@ -962,6 +973,21 @@ class ScyllaCluster:
server.change_ip(ip_addr)
return ip_addr
async def change_rpc_address(self, server_id: ServerNum) -> IPAddress:
"""Lease a new IP address and update conf/scylla.yaml with it. The
original IP is released at the end of the test to avoid an
immediate recycle within the same cluster. The server must be
stopped before its ip is changed."""
assert server_id in self.servers, f"Server {server_id} unknown"
server = self.servers[server_id]
assert not server.is_running, f"Server {server_id} is running: stop it first and then change its ip"
self.is_dirty = True
rpc_address = IPAddress(await self.host_registry.lease_host())
self.leased_ips.add(rpc_address)
logging.info("Cluster %s changed server %s RPC IP from %s to %s", self.name,
server_id, server.config["rpc_address"], rpc_address)
server.change_rpc_address(rpc_address)
return rpc_address
class ScyllaClusterManager:
"""Manages a Scylla cluster for running test cases
@@ -1088,6 +1114,7 @@ class ScyllaClusterManager:
add_get('/cluster/server/{server_id}/get_config', self._server_get_config)
add_put('/cluster/server/{server_id}/update_config', self._server_update_config)
add_put('/cluster/server/{server_id}/change_ip', self._server_change_ip)
add_put('/cluster/server/{server_id}/change_rpc_address', self._server_change_rpc_address)
add_get('/cluster/server/{server_id}/get_log_filename', self._server_get_log_filename)
add_get('/cluster/server/{server_id}/workdir', self._server_get_workdir)
add_get('/cluster/server/{server_id}/exe', self._server_get_exe)
@@ -1109,7 +1136,7 @@ class ScyllaClusterManager:
assert self.cluster
return self.cluster.replicas
async def _cluster_running_servers(self, _request) -> list[tuple[ServerNum, IPAddress]]:
async def _cluster_running_servers(self, _request) -> list[tuple[ServerNum, IPAddress, IPAddress]]:
"""Return a dict of running server ids to IPs"""
return self.cluster.running_servers()
@@ -1196,7 +1223,7 @@ class ScyllaClusterManager:
s_info = await self.cluster.add_server(replace_cfg, data.get('cmdline'), data.get('config'),
data.get('property_file'), data.get('start', True),
data.get('expected_error', None))
return {"server_id" : s_info.server_id, "ip_addr": s_info.ip_addr}
return {"server_id": s_info.server_id, "ip_addr": s_info.ip_addr, "rpc_address": s_info.rpc_address}
async def _cluster_servers_add(self, request) -> list[dict[str, object]]:
"""Add new servers concurrently"""
@@ -1205,7 +1232,10 @@ class ScyllaClusterManager:
s_infos = await self.cluster.add_servers(data.get('servers_num'), data.get('cmdline'), data.get('config'),
data.get('property_file'), data.get('start', True),
data.get('expected_error', None))
return [{"server_id" : s_info.server_id, "ip_addr": s_info.ip_addr} for s_info in s_infos]
return [
{"server_id": s_info.server_id, "ip_addr": s_info.ip_addr, "rpc_address": s_info.rpc_address}
for s_info in s_infos
]
async def _cluster_remove_node(self, request: aiohttp.web.Request) -> None:
"""Run remove node on Scylla REST API for a specified server"""
@@ -1321,6 +1351,13 @@ class ScyllaClusterManager:
ip_addr = await self.cluster.change_ip(server_id)
return {"ip_addr": ip_addr}
async def _server_change_rpc_address(self, request: aiohttp.web.Request) -> dict[str, object]:
"""Pass change_ip command for the given server to the cluster"""
assert self.cluster
server_id = ServerNum(int(request.match_info["server_id"]))
rpc_address = await self.cluster.change_rpc_address(server_id)
return {"rpc_address": rpc_address}
async def _server_get_attribute(self, request: aiohttp.web.Request, attribute: str):
"""Generic request handler which gets a particular attribute of a ScyllaServer instance

View File

@@ -10,7 +10,7 @@ import pathlib
import os
import pytest
from typing import Callable, Awaitable, Optional, TypeVar, Generic
from typing import Callable, Awaitable, Optional, TypeVar, Any
from cassandra.cluster import NoHostAvailable, Session, Cluster # type: ignore # pylint: disable=no-name-in-module
from cassandra.protocol import InvalidRequest # type: ignore # pylint: disable=no-name-in-module
@@ -40,13 +40,17 @@ def unique_name():
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float, period: float = 1) -> T:
deadline: float,
period: float = 1,
before_retry: Optional[Callable[[], Any]] = None) -> T:
while True:
assert(time.time() < deadline), "Deadline exceeded, failing test."
res = await pred()
if res is not None:
return res
await asyncio.sleep(period)
if before_retry:
before_retry()
async def wait_for_cql(cql: Session, host: Host, deadline: float) -> None:
@@ -65,7 +69,7 @@ async def wait_for_cql_and_get_hosts(cql: Session, servers: list[ServerInfo], de
"""Wait until every server in `servers` is available through `cql`
and translate `servers` to a list of Cassandra `Host`s.
"""
ip_set = set(str(srv.ip_addr) for srv in servers)
ip_set = set(str(srv.rpc_address) for srv in servers)
async def get_hosts() -> Optional[list[Host]]:
hosts = cql.cluster.metadata.all_hosts()
remaining = ip_set - {h.address for h in hosts}
@@ -74,7 +78,11 @@ async def wait_for_cql_and_get_hosts(cql: Session, servers: list[ServerInfo], de
logging.info(f"Driver hasn't yet learned about hosts: {remaining}")
return None
hosts = await wait_for(get_hosts, deadline)
hosts = await wait_for(
pred=get_hosts,
deadline=deadline,
before_retry=lambda: cql.cluster.refresh_nodes(force_token_rebuild=True),
)
# Take only hosts from `ip_set` (there may be more)
hosts = [h for h in hosts if h.address in ip_set]

View File

@@ -0,0 +1,87 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
"""
Test clusters can restart fine after an RPC IP address change.
"""
from __future__ import annotations
import time
import asyncio
import logging
from typing import TYPE_CHECKING
import pytest
from test.topology.util import wait_for_token_ring_and_group0_consistency
if TYPE_CHECKING:
from test.pylib.random_tables import RandomTables
from test.pylib.internal_types import ServerNum
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
@pytest.fixture
async def two_nodes_cluster(manager: ManagerClient) -> list[ServerNum]:
logger.info(f"Booting initial 2-nodes cluster")
servers = [(await manager.server_add(start=True)).server_id for _ in range(2)]
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
return servers
@pytest.mark.asyncio
async def test_change_rpc_address(two_nodes_cluster: list[ServerNum],
manager: ManagerClient,
random_tables: RandomTables) -> None:
"""Sequentially stop two nodes, change their RPC IPs and start, check the cluster is functional."""
table = await random_tables.add_table(ncolumns=5)
# Scenario 1
# ----------
logger.info("Change RPC IP addresses sequentially")
for server_id in two_nodes_cluster:
logger.info("Change RPC IP address for server %s", server_id)
# There is an issue with Python driver (probably related to scylladb/python-driver#170 and/or
# scylladb/python-driver#230) which can cause reconnect failure after stopping one node.
# As a workaround, close the connection and reconnect after starting the server.
manager.driver_close()
await manager.server_stop_gracefully(server_id)
await manager.server_change_rpc_address(server_id)
await manager.server_start(server_id)
# Connect the Python driver back with updated IP address.
await manager.driver_connect()
await table.add_column()
await random_tables.verify_schema()
await wait_for_token_ring_and_group0_consistency(manager=manager, deadline=time.time() + 30)
# Scenario 2
# ----------
logger.info("Change RPC IP addresses for both servers simultaneously")
manager.driver_close()
await asyncio.gather(*[manager.server_stop_gracefully(server_id) for server_id in two_nodes_cluster])
await asyncio.gather(*[manager.server_change_rpc_address(server_id) for server_id in two_nodes_cluster])
for server_id in two_nodes_cluster:
await manager.server_start(server_id)
# Connect the Python driver back with updated IP addresses.
await manager.driver_connect()
await table.add_column()
await random_tables.verify_schema()
await wait_for_token_ring_and_group0_consistency(manager=manager, deadline=time.time() + 30)