Merge 'test.py: Topology test pytest integration' from Andrei Chekun

Migrate the cluster tests directory to be handled by pytest. This is the next step in the process of unifying the tests and migrating them to pytest.
With this PR, cluster tests will be executed using the full path to the test file instead of the `suite/test` paradigm.

Backport is not needed because it is a framework enhancement.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-46

Closes scylladb/scylladb#27618

* github.com:scylladb/scylladb:
  test.py: remove setsid from the framework
  test.py: rename suite.yaml to test_config.yaml
  test.py: add cluster tests to be executed by pytest
  test.py: add random seed for topology tests reproducibility
  test.py: add explicit default values to pytest options
  test.py: replace SCYLLA env var with build_mode fixture
This commit is contained in:
Botond Dénes
2026-02-25 10:17:20 +02:00
17 changed files with 71 additions and 68 deletions

31
test.py
View File

@@ -68,6 +68,7 @@ PYTEST_RUNNER_DIRECTORIES = [
TEST_DIR / 'rest_api',
TEST_DIR / 'nodetool',
TEST_DIR / 'scylla_gdb',
TEST_DIR / 'cluster',
]
launch_time = time.monotonic()
@@ -323,16 +324,17 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
report_dir = temp_dir / 'report'
junit_output_file = report_dir / f'pytest_cpp_{HOST_ID}.xml'
files_to_run = []
for name in options.name:
file_name = name
if '::' in name:
file_name, _ = name.split('::', maxsplit=1)
if any((TOP_SRC_DIR / file_name).is_relative_to(x) for x in PYTEST_RUNNER_DIRECTORIES):
files_to_run.append(name)
if not options.name:
files_to_run = [str(directory) for directory in PYTEST_RUNNER_DIRECTORIES]
if options.name:
for name in options.name:
file_name = name
if '::' in name:
file_name, _ = name.split('::', maxsplit=1)
if any((TOP_SRC_DIR / file_name).is_relative_to(x) for x in PYTEST_RUNNER_DIRECTORIES):
files_to_run.append(name)
else:
files_to_run = [ TOP_SRC_DIR / 'test/']
if not files_to_run:
logging.info(f'No boost found. Skipping pytest execution for boost tests.')
logging.info('Skipping pytest execution because no tests were selected for pytest.')
return 0, []
args = [
'--color=yes',
@@ -342,12 +344,16 @@ def run_pytest(options: argparse.Namespace) -> tuple[int, list[SimpleNamespace]]
if options.list_tests:
args.extend(['--collect-only', '--quiet', '--no-header'])
else:
threads = int(options.jobs)
# debug mode is very CPU and memory hungry, so we need to lower the number of threads to be able to finish tests
if 'debug' in options.modes:
threads = int(threads * 0.5)
args.extend([
"--log-level=DEBUG", # Capture logs
f'--junit-xml={junit_output_file}',
"-rf",
'--test-py-init',
f'-n{int(options.jobs)}',
f'-n{threads}',
f'--tmpdir={temp_dir}',
f'--maxfail={options.max_failures}',
f'--alluredir={report_dir / f"allure_{HOST_ID}"}',
@@ -448,7 +454,10 @@ async def run_all_tests(signaled: asyncio.Event, options: argparse.Namespace) ->
failed = 0
deadline = time.perf_counter() + options.session_timeout
try:
result = run_pytest(options)
# Run pytest in an executor to avoid blocking the event loop
# This allows resource monitoring to run concurrently
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, run_pytest, options)
total_tests += result[0]
failed_tests.extend(result[1])
console.print_start_blurb()

View File

@@ -43,14 +43,14 @@ faulthandler.enable(all_threads=True)
# region.
def pytest_addoption(parser):
parser.addoption("--aws", action="store_true",
help="run against AWS instead of a local Scylla installation")
help="run against AWS instead of a local Scylla installation", default=False)
parser.addoption("--https", action="store_true",
help="communicate via HTTPS protocol on port 8043 instead of HTTP when"
" running against a local Scylla installation")
" running against a local Scylla installation", default=False)
parser.addoption("--url", action="store",
help="communicate with given URL instead of defaults")
help="communicate with given URL instead of defaults", default=None)
parser.addoption("--runveryslow", action="store_true",
help="run tests marked veryslow instead of skipping them")
help="run tests marked veryslow instead of skipping them", default=False)
add_host_option(parser)
def pytest_configure(config):

View File

@@ -78,7 +78,7 @@ def pytest_addoption(parser):
parser.addoption('--manager-api', action='store',
help='Manager unix socket path')
add_cql_connection_options(parser)
parser.addoption('--skip-internet-dependent-tests', action='store_true',
parser.addoption('--skip-internet-dependent-tests', action='store_true', default=False,
help='Skip tests which depend on artifacts from the internet')
parser.addoption('--artifacts_dir_url', action='store', type=str, default=None, dest='artifacts_dir_url',
help='Provide the URL to artifacts directory to generate the link to failed tests directory '
@@ -385,7 +385,7 @@ async def scylla_2025_1(request, build_mode, internet_dependency_enabled) -> Asy
yield await get_scylla_2025_1_description(build_mode)
@pytest.fixture(scope="function", params=list(KeyProvider))
async def key_provider(request, tmpdir):
async def key_provider(request, tmpdir, scylla_binary):
"""Encryption providers fixture"""
async with make_key_provider_factory(request.param, tmpdir) as res:
async with make_key_provider_factory(request.param, tmpdir, scylla_binary) as res:
yield res

View File

@@ -30,17 +30,12 @@ logger = logging.getLogger(__name__)
def pytest_addoption(parser: Parser) -> None:
parser.addoption("--use-vnodes", action="store_true", default=True, help="Determines wither or not to setup clusters using vnodes for tests")
parser.addoption("--num-tokens", action="store", default=256, help="Number of tokens to set num_tokens yaml setting to when creating instances with vnodes enabled")
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable")
parser.addoption("--experimental-features", type=lambda s: s.split(","), action="store", help="Pass experimental features <feature>,<feature> to enable", default=None)
parser.addoption("--tablets", action=argparse.BooleanOptionalAction, default=False, help="Whether to enable tablets support (default: %(default)s)")
parser.addoption("--force-gossip-topology-changes", action="store_true", default=False, help="force gossip topology changes in a fresh cluster")
def pytest_configure(config: Config) -> None:
logging.getLogger("cassandra").setLevel(logging.INFO)
logging.getLogger("boto3").setLevel(logging.INFO)
logging.getLogger("botocore").setLevel(logging.INFO)
logging.getLogger("s3transfer").setLevel(logging.INFO)
features = {"cdc", "raft", "consistent-cluster-management", "consistent-topology-changes"}
if experimental_features := config.getoption("--experimental-features"):
features.update(experimental_features)

View File

@@ -256,10 +256,10 @@ async def test_wrong_cipher_algorithm(manager, key_provider):
assert len(expected_errors) == len(broken_ciphers), expected_errors
@pytest.mark.parametrize(argnames="compression", argvalues=("LZ4", "Snappy", "Deflate"))
async def test_encryption_table_compression(manager, tmpdir, compression):
async def test_encryption_table_compression(manager, tmpdir, compression, scylla_binary):
"""Test compression + ear"""
logger.debug("---- Test with compression: %s -----", compression)
async with make_key_provider_factory(KeyProvider.local, tmpdir) as key_provider:
async with make_key_provider_factory(KeyProvider.local, tmpdir, scylla_binary) as key_provider:
await _smoke_test(manager, key_provider,
ciphers={"AES/CBC/PKCS5Padding": [128]},
compression=compression)

View File

@@ -3,6 +3,8 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from pathlib import Path
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables, Column, IntType, CounterType
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts, wait_for
@@ -357,14 +359,12 @@ async def test_fence_lwt_during_bootstap(manager: ManagerClient):
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='dev mode is enough for this test')
@pytest.mark.skip_mode(mode='debug', reason='dev mode is enough for this test')
async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription, scylla_binary: Path):
"""
The test runs some LWT workload on a vnodes-based table, rolling-restarts nodes
with a new Scylla version and checks that LWTs complete as expected. Downgrading
a single node back to original version is also covered.
"""
new_exe = os.getenv("SCYLLA")
assert new_exe
logger.info("Bootstrapping cluster")
servers = await manager.servers_add(3,
@@ -417,7 +417,7 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
wait_for_some_lwts()
logger.info(f"Upgrading {servers[0].server_id}")
await manager.server_change_version(servers[0].server_id, new_exe)
await manager.server_change_version(servers[0].server_id, scylla_binary)
wait_for_some_lwts()
logger.info(f"Downgrading {servers[0].server_id}")
@@ -437,7 +437,7 @@ async def test_lwt_fencing_upgrade(manager: ManagerClient, scylla_2025_1: Scylla
return True
await wait_for(all_hosts_are_alive, deadline=time.time() + 60, period=0.1)
logger.info(f"Upgrading {s.server_id}")
await manager.server_change_version(s.server_id, new_exe)
await manager.server_change_version(s.server_id, scylla_binary)
logger.info("Done upgrading servers")

View File

@@ -6,6 +6,8 @@
import os
import time
from pathlib import Path
import pytest
import asyncio
import logging
@@ -94,7 +96,7 @@ async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source
await manager.server_add(cmdline=yaml_to_cmdline(config), expected_error=expected_error)
@pytest.mark.asyncio
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription, scylla_binary: Path):
"""
Check that the default SSTable compression algorithm is:
* LZ4Compressor if SSTABLE_COMPRESSION_DICTS is disabled.
@@ -121,9 +123,6 @@ async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_202
assert actual_compression == expected_compression, \
f"Expected {expected_compression} for {table_name} ({context}), got: {actual_compression}"
new_exe = os.getenv("SCYLLA")
assert new_exe
logger.info("Starting servers with version 2025.1")
servers = await manager.servers_add(2, version=scylla_2025_1)
@@ -134,13 +133,13 @@ async def test_default_compression_on_upgrade(manager: ManagerClient, scylla_202
await create_table_and_check_compression(cql, "test_ks", "table_before_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "before upgrade")
logger.info("Upgrading server 0")
await manager.server_change_version(servers[0].server_id, new_exe)
await manager.server_change_version(servers[0].server_id, scylla_binary)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await create_table_and_check_compression(cql, "test_ks", "table_during_upgrade", "org.apache.cassandra.io.compress.LZ4Compressor", "during upgrade")
logger.info("Upgrading server 1")
await manager.server_change_version(servers[1].server_id, new_exe)
await manager.server_change_version(servers[1].server_id, scylla_binary)
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature to be enabled on all nodes")

View File

@@ -11,6 +11,7 @@ import os
import pathlib
import contextlib
import time
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.rest_client import read_barrier, HTTPError
from test.pylib.scylla_cluster import ScyllaVersionDescription
@@ -23,9 +24,7 @@ from cassandra.query import SimpleStatement
logger = logging.getLogger(__name__)
async def test_upgrade_and_rollback(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription):
new_exe = os.getenv("SCYLLA")
assert new_exe
async def test_upgrade_and_rollback(manager: ManagerClient, scylla_2025_1: ScyllaVersionDescription, scylla_binary: pathlib.Path):
logger.info("Bootstrapping cluster")
servers = (await manager.servers_add(2, cmdline=[
@@ -104,7 +103,7 @@ async def test_upgrade_and_rollback(manager: ManagerClient, scylla_2025_1: Scyll
raise Exception(f'Expected HTTPError, got no exception')
logger.info("Upgrading server 0")
await manager.server_change_version(servers[0].server_id, new_exe)
await manager.server_change_version(servers[0].server_id, scylla_binary)
logger.info("Checking that new version returns 500 on retrain_dict before full upgrade")
try:
@@ -157,8 +156,8 @@ async def test_upgrade_and_rollback(manager: ManagerClient, scylla_2025_1: Scyll
logger.info("Upgrading both servers")
await asyncio.gather(
manager.server_change_version(servers[0].server_id, new_exe),
manager.server_change_version(servers[1].server_id, new_exe)
manager.server_change_version(servers[0].server_id, scylla_binary),
manager.server_change_version(servers[1].server_id, scylla_binary)
)
logger.info("Waiting for SSTABLE_COMPRESSION_DICTS cluster feature")

View File

@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
# Checks basic functionality on the cluster with different values of the --smp parameter on the nodes.
@pytest.mark.asyncio
async def test_nodes_with_different_smp(request: FixtureRequest, manager: ManagerClient) -> None:
async def test_nodes_with_different_smp(request: FixtureRequest, manager: ManagerClient, build_mode) -> None:
# In this test it's more convenient to start with a fresh cluster.
# When the node starts it tries to communicate with others
@@ -42,11 +42,9 @@ async def test_nodes_with_different_smp(request: FixtureRequest, manager: Manage
# The test is flaky on CI in debug builds on aarch64 (#14752),
# here we sprinkle more logs for debug/aarch64
# hoping it'll help to debug it.
# The hack with xmlpath: it's set by test.py to some file with mode in the path,
# couldn't think of a better way to determine the current mode.
# This should produce ~16M of logs per Scylla node.
log_args = []
if ('/debug/' in request.config.getoption('xmlpath', '')) and ('aarch64' in platform.processor()):
if build_mode == 'debug' and ('aarch64' in platform.processor()):
log_args = [
'--default-log-level', 'debug',
'--logger-log-level', 'raft_group0=trace:group0_client=trace:storage_service=trace'

View File

@@ -272,5 +272,5 @@ def compact_storage(cql):
# Otherwise, use the provided minio server to run all S3 related tests
@pytest.fixture
def skip_s3_tests(request):
if request.config.getoption("--no-minio"):
if request.config.getoption("--no-minio", default=None):
pytest.skip("Skipping S3 related tests being run from test/cqlpy/run")

View File

@@ -44,7 +44,7 @@ class ServerAddress(NamedTuple):
async def server_address(request, testpy_test: None|Test):
# unshare(1) -rn drops us in a new network namespace in which the "lo" is
# not up yet, so let's set it up first.
if request.config.getoption('--run-within-unshare'):
if request.config.getoption('--run-within-unshare', default=False):
try:
args = "ip link set lo up".split()
subprocess.run(args, check=True)

View File

@@ -71,17 +71,15 @@ class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
class ReplicatedKeyProviderFactory(KeyProviderFactory):
"""ReplicatedKeyProviderFactory proxy"""
def __init__(self, tmpdir):
def __init__(self, tmpdir, scylla_exe):
super(ReplicatedKeyProviderFactory, self).__init__(KeyProvider.replicated, tmpdir)
self.system_key_file_name = "system_key"
self.scylla_exe = scylla_exe
async def __aenter__(self):
await super().__aenter__()
scylla = os.path.abspath(os.getenv('SCYLLA'))
if not scylla:
raise RuntimeError('No scylla in environment')
args = ["local-file-key-generator", "generate", os.path.join(self.system_key_location, self.system_key_file_name)]
proc = await asyncio.create_subprocess_exec(scylla, *args, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
proc = await asyncio.create_subprocess_exec(self.scylla_exe, *args, stderr=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE)
_, stderr = await proc.communicate()
if proc.returncode != 0:
raise RuntimeError(f'Could not generate system key: {stderr.decode()}')
@@ -292,13 +290,13 @@ class AzureKeyProviderFactory(KeyProviderFactory):
def require_restart(self):
return True
def make_key_provider_factory(provider: KeyProvider, tmpdir):
def make_key_provider_factory(provider: KeyProvider, tmpdir, scylla_exe):
"""Create key provider factory for enum"""
res = None
if provider == KeyProvider.local:
res = LocalFileSystemKeyProviderFactory(tmpdir)
elif provider == KeyProvider.replicated:
res = ReplicatedKeyProviderFactory(tmpdir)
res = ReplicatedKeyProviderFactory(tmpdir, scylla_exe)
elif provider == KeyProvider.kmip:
res = KmipKeyProviderFactory(tmpdir)
elif provider == KeyProvider.kms:

View File

@@ -120,7 +120,7 @@ class ResourceGather(ABC):
pass
def put_process_to_cgroup(self) -> None:
os.setsid()
pass
def get_test_metrics(self) -> Metric:
pass
@@ -198,7 +198,6 @@ class ResourceGatherOn(ResourceGather):
self.sqlite_writer.write_row(metrics, METRICS_TABLE)
def put_process_to_cgroup(self) -> None:
super().put_process_to_cgroup()
try:
pid = os.getpid()
with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup:

View File

@@ -11,11 +11,13 @@ import logging
import os
import pathlib
import platform
import random
import sys
from argparse import BooleanOptionalAction
from collections import defaultdict
from itertools import chain, count, product
from functools import cache, cached_property
from pathlib import Path
from random import randint
from typing import TYPE_CHECKING
@@ -150,6 +152,10 @@ async def testpy_test(request: pytest.FixtureRequest, build_mode: str) -> Test |
return await get_testpy_test(path=request.path, options=request.config.option, mode=build_mode)
return None
@pytest.fixture(scope="function")
def scylla_binary(testpy_test) -> Path:
return testpy_test.suite.scylla_exe
def pytest_collection_modifyitems(items: list[pytest.Item]) -> None:
for item in items:
@@ -263,6 +269,7 @@ def pytest_configure(config: pytest.Config) -> None:
global _pytest_config
_pytest_config = config
os.environ["TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED"] = os.environ.get("TOPOLOGY_RANDOM_FAILURES_TEST_SHUFFLE_SEED", str(random.randint(0, sys.maxsize)))
config.build_modes = get_modes_to_run(config)
repeat = int(config.getoption("--repeat"))

View File

@@ -823,7 +823,6 @@ class ScyllaServer:
stderr=self.log_file,
stdout=self.log_file,
env=env,
preexec_fn=os.setsid,
)
if expected_server_up_state == ServerUpState.PROCESS_STARTED:

View File

@@ -304,11 +304,11 @@ def add_cql_connection_options(parser: Parser) -> None:
cql_options.addoption("--port", default="9042",
help="CQL port to connect to")
cql_options.addoption("--ssl", action="store_true",
help="Connect to CQL via an encrypted TLSv1.2 connection")
help="Connect to CQL via an encrypted TLSv1.2 connection", default=False)
cql_options.addoption("--auth_username",
help="username for authentication")
help="username for authentication", default=None)
cql_options.addoption("--auth_password",
help="password for authentication")
help="password for authentication", default=None)
# Use cache to execute this function once per pytest session.
@@ -317,9 +317,9 @@ def add_s3_options(parser: Parser) -> None:
"""Options for tests which use S3 server (i.e., cluster/object_store and cqlpy/test_tools.py)"""
s3_options = parser.getgroup("S3 server settings")
s3_options.addoption('--s3-server-address')
s3_options.addoption('--s3-server-port', type=int)
s3_options.addoption('--aws-access-key')
s3_options.addoption('--aws-secret-key')
s3_options.addoption('--aws-region')
s3_options.addoption('--s3-server-bucket')
s3_options.addoption('--s3-server-address', default=None)
s3_options.addoption('--s3-server-port', type=int, default=None)
s3_options.addoption('--aws-access-key', default=None)
s3_options.addoption('--aws-secret-key', default=None)
s3_options.addoption('--aws-region', default=None)
s3_options.addoption('--s3-server-bucket', default=None)