Merge 'test.py: Add the possibility to run boost and unit tests with pytest ' from Andrei Chekun

Add the possibility to run boost and unit tests with pytest

test.py should follow the next paradigm - the ability to run all test cases sequentially by ONE pytest command.
With this paradigm, to have the better performance, we can split this 1 command into 2,3,4,5,100,200... whatever we want

It's a new functionality that does not touch test.py way of executing the boost and unit tests.
It supports the main features of test.py way of execution: automatic discovery of modes, repeats.
There is an additional requirement to execute tests in parallel: pytest-xdist. To install it, execute `pip install pytest-xdist`

To run test with pytest execute `pytest test/boost`. To execute only one file, provide the path filename `pytest test/boost/aggregate_fcts_test.cc` since it's a normal path, autocompletion will work on the terminal. To provide a specific mode, use the next parameter `--mode dev`, if parameter will not be provided pytest will try to use `ninja mode_list` to find out the compiled modes.
Parallel execution controlled by pyest-xdist and the parameter `-n 12`.
The useful command to discover the tests in the file or directory is `pytest --collect-only -q --mode dev test/boost/aggregate_fcts_test.cc`. That will return all test functions in the file. To execute only one function from the test, you can invoke the output from the previous command, but suffix for mode should be skipped, for example output will be `test/boost/aggregate_fcts_test.cc::test_aggregate_avg.dev`, so to execute this specific test function, please use the next command `pytest --mode dev test/boost/aggregate_fcts_test.cc::test_aggregate_avg`
There is a parameter `--repeat` that used to repeat the test case several times in the same way as test.py did.
It's not possible to run both boost and unit tests directories with one command, so we need to provide explicitly which directory should be executed. Like this `pytest --mode dev test/unit` or `pytest --mode dev test/boost`

Fixes: https://github.com/scylladb/qa-tasks/issues/1775

Closes scylladb/scylladb#21108

* github.com:scylladb/scylladb:
  test.py: Add possibility to run ldap tests from pytest
  test.py: Add the possibility to run unit tests from pytest
  test.py: Add the possibility to run boost test from pytest
  test.py: Add discovery for C++ tests for pytest
  test.py: Modify s3 server mock
  test.py: Add method to get environment variables from MinIO wrapper
  test.py: Move get configured modes to common lib
This commit is contained in:
Nadav Har'El
2025-02-09 11:56:24 +01:00
29 changed files with 1411 additions and 199 deletions

184
test.py
View File

@@ -37,12 +37,13 @@ from abc import ABC, abstractmethod
from io import StringIO
from scripts import coverage # type: ignore
from test.pylib.artifact_registry import ArtifactRegistry
from test.pylib.cpp.ldap.prepare_instance import try_something_backoff, can_connect, setup
from test.pylib.host_registry import HostRegistry
from test.pylib.pool import Pool
from test.pylib.s3_proxy import S3ProxyServer
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
from test.pylib.util import LogPrefixAdapter
from test.pylib.util import LogPrefixAdapter, get_configured_modes, ninja
from test.pylib.scylla_cluster import ScyllaServer, ScyllaCluster, get_cluster_manager, merge_cmdline_options
from test.pylib.minio_server import MinioServer
from typing import Dict, List, Callable, Any, Iterable, Optional, Awaitable, Union
@@ -62,72 +63,6 @@ all_modes = {'debug': 'Debug',
'coverage': 'Coverage'}
debug_modes = {'debug', 'sanitize'}
LDAP_SERVER_CONFIGURATION_FILE = os.path.join(os.path.dirname(__file__), 'test', 'resource', 'slapd.conf')
DEFAULT_ENTRIES = [
"""dn: dc=example,dc=com
objectClass: dcObject
objectClass: organization
dc: example
o: Example
description: Example directory.
""",
"""dn: cn=root,dc=example,dc=com
objectClass: organizationalRole
cn: root
description: Directory manager.
""",
"""dn: ou=People,dc=example,dc=com
objectClass: organizationalUnit
ou: People
description: Our people.
""",
"""# Default superuser for Scylla
dn: uid=cassandra,ou=People,dc=example,dc=com
objectClass: organizationalPerson
objectClass: uidObject
cn: cassandra
ou: People
sn: cassandra
userid: cassandra
userPassword: cassandra
""",
"""dn: uid=jsmith,ou=People,dc=example,dc=com
objectClass: organizationalPerson
objectClass: uidObject
cn: Joe Smith
ou: People
sn: Smith
userid: jsmith
userPassword: joeisgreat
""",
"""dn: uid=jdoe,ou=People,dc=example,dc=com
objectClass: organizationalPerson
objectClass: uidObject
cn: John Doe
ou: People
sn: Doe
userid: jdoe
userPassword: pa55w0rd
""",
"""dn: cn=role1,dc=example,dc=com
objectClass: groupOfUniqueNames
cn: role1
uniqueMember: uid=jsmith,ou=People,dc=example,dc=com
uniqueMember: uid=cassandra,ou=People,dc=example,dc=com
""",
"""dn: cn=role2,dc=example,dc=com
objectClass: groupOfUniqueNames
cn: role2
uniqueMember: uid=cassandra,ou=People,dc=example,dc=com
""",
"""dn: cn=role3,dc=example,dc=com
objectClass: groupOfUniqueNames
cn: role3
uniqueMember: uid=jdoe,ou=People,dc=example,dc=com
""",
]
def create_formatter(*decorators) -> Callable[[Any], str]:
"""Return a function which decorates its argument with the given
@@ -167,15 +102,6 @@ def path_to(mode, *components):
return os.path.join(build_dir, mode, *components)
def ninja(target):
"""Build specified target using ninja"""
build_dir = 'build'
args = ['ninja', target]
if os.path.exists(os.path.join(build_dir, 'build.ninja')):
args = ['ninja', '-C', build_dir, target]
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0].decode()
class TestSuite(ABC):
"""A test suite is a folder with tests of the same type.
E.g. it can be unit tests, boost tests, or CQL tests."""
@@ -984,37 +910,6 @@ class BoostTest(Test):
print(read_log(self.log_filename))
def can_connect(address, family=socket.AF_INET):
s = socket.socket(family)
try:
s.connect(address)
return True
except OSError as e:
if 'AF_UNIX path too long' in str(e):
raise OSError(e.errno, "{} ({})".format(str(e), address)) from None
else:
return False
except:
return False
def try_something_backoff(something):
sleep_time = 0.05
while not something():
if sleep_time > 30:
return False
time.sleep(sleep_time)
sleep_time *= 2
return True
def make_saslauthd_conf(port, instance_path):
"""Creates saslauthd.conf with appropriate contents under instance_path. Returns the path to the new file."""
saslauthd_conf_path = os.path.join(instance_path, 'saslauthd.conf')
with open(saslauthd_conf_path, 'w') as f:
f.write('ldap_servers: ldap://localhost:{}\nldap_search_base: dc=example,dc=com'.format(port))
return saslauthd_conf_path
class LdapTest(BoostTest):
"""A unit test which can produce its own XML output, and needs an ldap server"""
@@ -1022,73 +917,10 @@ class LdapTest(BoostTest):
super().__init__(test_no, shortname, args, suite, None, False, None)
async def setup(self, port, options):
instances_root = os.path.join(options.tmpdir, self.mode, 'ldap_instances');
instance_path = os.path.join(os.path.abspath(instances_root), str(port))
slapd_pid_file = os.path.join(instance_path, 'slapd.pid')
saslauthd_socket_path = TemporaryDirectory()
os.makedirs(instance_path, exist_ok=True)
# This will always fail because it lacks the permissions to read the default slapd data
# folder but it does create the instance folder so we don't want to fail here.
try:
subprocess.check_output(['slaptest', '-f', LDAP_SERVER_CONFIGURATION_FILE, '-F', instance_path],
stderr=subprocess.DEVNULL)
except:
pass
# Set up failure injection.
proxy_name = 'p{}'.format(port)
subprocess.check_output([
'toxiproxy-cli', 'c', proxy_name,
'--listen', 'localhost:{}'.format(port + 2), '--upstream', 'localhost:{}'.format(port)])
# Sever the connection after byte_limit bytes have passed through:
instances_root = pathlib.Path(options.tmpdir) / self.mode / 'ldap_instances'
byte_limit = options.byte_limit if options.byte_limit else randint(0, 2000)
subprocess.check_output(['toxiproxy-cli', 't', 'a', proxy_name, '-t', 'limit_data', '-n', 'limiter',
'-a', 'bytes={}'.format(byte_limit)])
# Change the data folder in the default config.
replace_expression = 's/olcDbDirectory:.*/olcDbDirectory: {}/g'.format(
os.path.abspath(instance_path).replace('/', r'\/'))
subprocess.check_output(
['find', instance_path, '-type', 'f', '-exec', 'sed', '-i', replace_expression, '{}', ';'])
# Change the pid file to be kept with the instance.
replace_expression = 's/olcPidFile:.*/olcPidFile: {}/g'.format(
os.path.abspath(slapd_pid_file).replace('/', r'\/'))
subprocess.check_output(
['find', instance_path, '-type', 'f', '-exec', 'sed', '-i', replace_expression, '{}', ';'])
# Put the test data in.
cmd = ['slapadd', '-F', instance_path]
subprocess.check_output(
cmd, input='\n\n'.join(DEFAULT_ENTRIES).encode('ascii'), stderr=subprocess.STDOUT)
# Set up the server.
SLAPD_URLS='ldap://:{}/ ldaps://:{}/'.format(port, port + 1)
def can_connect_to_slapd():
return can_connect(('127.0.0.1', port)) and can_connect(('127.0.0.1', port + 1)) and can_connect(('127.0.0.1', port + 2))
def can_connect_to_saslauthd():
return can_connect(os.path.join(saslauthd_socket_path.name, 'mux'), socket.AF_UNIX)
slapd_proc = subprocess.Popen(['prlimit', '-n1024', 'slapd', '-F', instance_path, '-h', SLAPD_URLS, '-d', '0'])
saslauthd_conf_path = make_saslauthd_conf(port, instance_path)
test_env = {
"SEASTAR_LDAP_PORT" : str(port),
"SASLAUTHD_MUX_PATH" : os.path.join(saslauthd_socket_path.name, "mux")
}
saslauthd_proc = subprocess.Popen(
['saslauthd', '-d', '-n', '1', '-a', 'ldap', '-O', saslauthd_conf_path, '-m', saslauthd_socket_path.name],
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
def finalize():
slapd_proc.terminate()
slapd_proc.wait() # Wait for slapd to remove slapd.pid, so it doesn't race with rmtree below.
saslauthd_proc.kill() # Somehow, invoking terminate() here also terminates toxiproxy-server. o_O
shutil.rmtree(instance_path)
saslauthd_socket_path.cleanup()
subprocess.check_output(['toxiproxy-cli', 'd', proxy_name])
try:
if not try_something_backoff(can_connect_to_slapd):
raise Exception('Unable to connect to slapd')
if not try_something_backoff(can_connect_to_saslauthd):
raise Exception('Unable to connect to saslauthd')
except:
finalize()
raise
return finalize, '--byte-limit={}'.format(byte_limit), test_env
project_root = pathlib.Path(os.path.dirname(__file__))
return setup(project_root=project_root, port=port, instance_root=instances_root, byte_limit=byte_limit)
class CQLApprovalTest(Test):
@@ -1786,11 +1618,7 @@ def parse_cmd_line() -> argparse.Namespace:
if not args.modes:
try:
out = ninja('mode_list')
# [1/1] List configured modes
# debug release dev
args.modes = re.sub(r'.* List configured modes\n(.*)\n', r'\1',
out, count=1, flags=re.DOTALL).split("\n")[-1].split(' ')
args.modes = get_configured_modes()
except Exception:
print(palette.fail("Failed to read output of `ninja mode_list`: please run ./configure.py first"))
raise

View File

@@ -68,6 +68,43 @@ Boost tests can also be run using test.py - which is a script that provides
a uniform way to run all tests in scylladb.git - C++ tests, Python tests,
etc.
## Execution with pytest
To run all tests with pytest execute
```bash
pytest test/boost
```
To execute all tests in one file, provide the path to the source filename as a parameter
```bash
pytest test/boost/aggregate_fcts_test.cc
```
Since it's a normal path, autocompletion works in the terminal out of the box.
To execute only one test function, provide the path to the source file and function name
```bash
pytest --mode dev test/boost/aggregate_fcts_test.cc::test_aggregate_avg
```
To provide a specific mode, use the next parameter `--mode dev`,
if parameter isn't provided pytest tries to use `ninja mode_list` to find out the compiled modes.
Parallel execution is controlled by `pytest-xdist` and the parameter `-n auto`.
This command starts tests with the number of workers equal to CPU cores.
The useful command to discover the tests in the file or directory is
```bash
pytest --collect-only -q --mode dev test/boost/aggregate_fcts_test.cc
```
That will return all test functions in the file.
To execute only one function from the test, you can invoke the output from the previous command.
However, suffix for mode should be skipped.
For example,
output shows in the terminal something like this `test/boost/aggregate_fcts_test.cc::test_aggregate_avg.dev`.
So to execute this specific test function, please use the next command
```bash
pytest --mode dev test/boost/aggregate_fcts_test.cc::test_aggregate_avg
```
# Writing tests
Because of the large build time and build size of each separate test

0
test/boost/__init__.py Normal file
View File

48
test/boost/conftest.py Normal file
View File

@@ -0,0 +1,48 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import os
import sys
from pathlib import Path, PosixPath
import pytest
from pytest import Collector
from test.pylib.cpp.boost.boost_facade import BoostTestFacade, COMBINED_TESTS
from test.pylib.cpp.boost.prepare_instance import get_env_manager
from test.pylib.cpp.common_cpp_conftest import collect_items, get_combined_tests
from test.pylib.util import get_modes_to_run
def pytest_collect_file(file_path: PosixPath, parent: Collector):
"""
Method triggered automatically by pytest to collect files from a directory. Boost and unit have the same logic for
collection, the only difference in execution, and it's covered by facade
"""
# One of the files in the directory has additional extensions .inc. It's not a test and will not have a binary for
# execution, so it should be excluded from collecting
if file_path.suffix == '.cc' and '.inc' not in file_path.suffixes and file_path.stem != COMBINED_TESTS.stem:
return collect_items(file_path, parent, facade=BoostTestFacade(parent.config, get_combined_tests(parent.session)))
@pytest.hookimpl(wrapper=True)
def pytest_runtestloop(session):
"""
https://docs.pytest.org/en/stable/reference/reference.html#pytest.hookspec.pytest_runtestloop
This hook is needed to start the Minio and S3 mock servers before tests. After starting the servers, the default
pytest's runtestloop takes control. Finally part is responsible for stopping servers regardless of failure in the tests.
"""
if session.config.getoption('collectonly'):
yield
return
temp_dir = Path(session.config.rootpath, '..', session.config.getoption('tmpdir'))
modes = get_modes_to_run(session)
is_worker = False
if 'xdist' in sys.modules:
is_worker = sys.modules['xdist'].is_xdist_worker(session)
with get_env_manager(temp_dir, is_worker, modes):
yield

View File

@@ -1,3 +1,9 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import pytest
from test.pylib.report_plugin import ReportPlugin
@@ -9,7 +15,7 @@ ALL_MODES = {'debug': 'Debug',
'coverage': 'Coverage'}
def pytest_addoption(parser):
parser.addoption('--mode', choices=ALL_MODES.keys(), dest="mode",
parser.addoption('--mode', choices=ALL_MODES.keys(), action="append", dest="modes",
help="Run only tests for given build mode(s)")
parser.addoption('--tmpdir', action='store', default='testlog', help='''Path to temporary test data and log files. The data is
further segregated per build mode. Default: ./testlog.''', )
@@ -20,8 +26,43 @@ def pytest_addoption(parser):
def build_mode(request):
"""
This fixture returns current build mode.
This is for running tests through the test.py script, where only one mode is passed to the test
"""
return request.config.getoption('mode')
# to avoid issues when there's no provided mode parameter, do it in two steps: get the parameter and if it's not
# None, get the first value from the list
mode = request.config.getoption("modes")
if mode:
return mode[0]
return mode
def pytest_configure(config):
config.pluginmanager.register(ReportPlugin())
config.pluginmanager.register(ReportPlugin())
def pytest_collection_modifyitems(config, items):
"""
This is a standard pytest method.
This is needed to modify the test names with dev mode and run id to differ them one from another
"""
run_id = config.getoption('run_id', None)
for item in items:
# check if this is custom cpp tests that have additional attributes for name modification
if hasattr(item, 'mode'):
# modify name with mode that is always present in cpp tests
item.nodeid = f'{item.nodeid}.{item.mode}'
item.name = f'{item.name}.{item.mode}'
if item.run_id:
item.nodeid = f'{item.nodeid}.{item.run_id}'
item.name = f'{item.name}.{item.run_id}'
else:
# here go python tests that are executed through test.py
# since test.py is responsible for creating several tests with the required mode,
# a list with modes contains only one value,
# that's why in name modification the first element is used
modes = config.getoption('modes')
if modes:
item._nodeid = f'{item._nodeid}.{modes[0]}'
item.name = f'{item.name}.{modes[0]}'
if run_id:
item._nodeid = f'{item._nodeid}.{run_id}'
item.name = f'{item.name}.{run_id}'

0
test/ldap/__init__.py Normal file
View File

54
test/ldap/conftest.py Normal file
View File

@@ -0,0 +1,54 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import sys
from pathlib import PosixPath
from random import randint
import pytest
from pytest import Collector
from test.pylib.cpp.boost.boost_facade import BoostTestFacade
from test.pylib.cpp.ldap.prepare_instance import get_env_manager
from test.pylib.cpp.common_cpp_conftest import collect_items, get_modes_to_run, get_root_path, get_combined_tests
def pytest_addoption(parser):
parser.addoption('--byte-limit', action="store", default=None, type=int,
help="Specific byte limit for failure injection (random by default)")
def pytest_collect_file(file_path: PosixPath, parent: Collector):
"""
Method triggered automatically by pytest to collect files from a directory.
These tests can use BoostFacade since they're Boost tests located in different directory.
"""
if file_path.suffix == '.cc':
return collect_items(file_path, parent, facade=BoostTestFacade(parent.config))
@pytest.hookimpl(wrapper=True)
def pytest_runtestloop(session):
"""
https://docs.pytest.org/en/stable/reference/reference.html#pytest.hookspec.pytest_runtestloop
This hook is needed to start the Minio and S3 mock servers before tests. After starting the servers, the default
pytest's runtestloop takes control. Finally part is responsible for stopping servers regardless of failure in the tests.
"""
# make a collection only without starting unnecessary services
if session.config.getoption('collectonly'):
yield
return
root_dir = get_root_path(session)
temp_dir = root_dir / session.config.getoption('tmpdir')
try:
byte_limit = session.config.getoption('byte-limit')
except ValueError:
byte_limit = randint(0, 2000)
modes = get_modes_to_run(session)
worker_id = 'master'
if 'xdist' in sys.modules:
worker_id = sys.modules['xdist'].get_xdist_worker_id(session)
with get_env_manager(root_dir, temp_dir, worker_id, modes, byte_limit):
yield

View File

View File

View File

@@ -0,0 +1,186 @@
#
# Copyright (c) 2014 Bruno Oliveira
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import io
import os
import subprocess
from collections.abc import Sequence
from pathlib import Path
from xml.etree import ElementTree
from test.pylib.cpp.facade import CppTestFacade, CppTestFailure, run_process
TIMEOUT_DEBUG = 60 * 5 # seconds
TIMEOUT = 60 * 2 # seconds
COMBINED_TESTS = Path('build', 'dev', 'test', 'boost', 'combined_tests')
class BoostTestFacade(CppTestFacade):
"""
Facade for BoostTests that's responsible for discovering test functions and executing them correctly.
"""
def list_tests(
self,
executable: Path,
no_parallel: bool,
) -> tuple[bool, list[str]]:
"""
Return a boolean value indicating whether the tests combined or not and the list of tests
"""
if no_parallel:
return False, [os.path.basename(os.path.splitext(executable)[0])]
else:
if not os.path.isfile(executable):
return True, self.combined_suites[executable.stem]
args = [executable, '--list_content']
try:
output = subprocess.check_output(
args,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
except subprocess.CalledProcessError as e:
output = e.output
# --list_content produces the list of all test cases in the file. When BOOST_DATA_TEST_CASE is used it
# additionally produce the lines with numbers for each case preserving the function name like this:
# test_singular_tree_ptr_sz*
# _0*
# _1*
# _2*
# however, it's only possible to run test_singular_tree_ptr_sz that executes all test cases
# this line catches only test function name ignoring unrelated lines like '_0'
# Note: this ignores any test case starting with a '_' symbol
return False, [case[:-1] for case in output.splitlines() if
case.endswith('*') and not case.strip().startswith('_')]
def run_test(
self,
executable: Path,
original_name: str,
test_name: str,
mode: str,
file_name: Path,
test_args:Sequence[str] = (),
) -> tuple[list[CppTestFailure], str] | tuple[None, str]:
def read_file(name: Path) -> str:
try:
with io.open(name) as f:
return f.read()
except IOError:
return ''
timeout = TIMEOUT_DEBUG if mode=='debug' else TIMEOUT
root_log_dir = self.temp_dir / mode / 'pytest'
log_xml = root_log_dir / f"{test_name}.log"
stdout_file_path = root_log_dir/ f"{test_name}_stdout.log"
stderr_file_path = root_log_dir / f"{test_name}_stderr.log"
report_xml = root_log_dir / f"{test_name}.xml"
args = [ str(executable),
'--output_format=XML',
f"--report_sink={report_xml}",
f"--log_sink={log_xml}",
'--catch_system_errors=no',
'--color_output=false',
]
if original_name != Path(executable).stem:
if executable.stem == COMBINED_TESTS.stem:
args.append(f"--run_test={file_name.stem}/{original_name}")
else:
args.append(f"--run_test={original_name}")
# Tests are written in the way that everything after '--' passes to the test itself rather than to the test framework
args.append('--')
args.extend(test_args)
os.chdir(self.temp_dir.parent)
p, stderr, stdout = run_process(args, timeout)
with open(stdout_file_path, 'w') as fd:
fd.write(stdout)
with open(stderr_file_path, 'w') as fd:
fd.write(stderr)
log = read_file(log_xml)
report = read_file(report_xml)
results = self._parse_log(log=log)
if p.returncode != 0:
msg = (
'working_dir: {working_dir}\n'
'Internal Error: calling {executable} '
'for test {test_id} failed (return_code={return_code}):\n'
'output file:{stdout}\n'
'std error file:{stderr}\n'
'log:{log}\n'
'report:{report}\n'
'command to repeat:{command}'
)
failure = CppTestFailure(
file_name.name,
line_num=results[0].line_num,
contents=msg.format(
working_dir=os.getcwd(),
executable=executable,
test_id=test_name,
stdout=stdout_file_path.absolute(),
stderr=stderr_file_path.absolute(),
log=log,
report=report,
command=' '.join(p.args),
return_code=p.returncode,
),
)
return [failure], stdout
if results:
return results, stdout
return None, stdout
def _parse_log(self, log: str) -> list[CppTestFailure]:
"""
Parse the 'log' section produced by BoostTest.
This is always an XML file, and from this it's possible to parse most of the
failures possible when running BoostTest.
"""
parsed_elements = []
log_root = ElementTree.fromstring(log)
if log_root is not None:
parsed_elements.extend(log_root.findall('Exception'))
parsed_elements.extend(log_root.findall('Error'))
parsed_elements.extend(log_root.findall('FatalError'))
result = []
for elem in parsed_elements:
last_checkpoint = elem.find('LastCheckpoint')
if last_checkpoint:
elem = last_checkpoint
file_name = elem.attrib['file']
line_num = int(elem.attrib['line'])
result.append(CppTestFailure(file_name, line_num, elem.text or ''))
return result

View File

@@ -0,0 +1,162 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import asyncio
import logging
import os
import shutil
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Generator
from test.pylib.host_registry import HostRegistry
from test.pylib.minio_server import MinioServer
from test.pylib.s3_proxy import S3ProxyServer
from test.pylib.s3_server_mock import MockS3Server
from test.pylib.util import LogPrefixAdapter
class PrepareChildProcessEnv:
"""
Class responsible to get environment variables from the main thread through the shared file and set them for the process
"""
def __init__(self, env_file: Path):
self.env_file = env_file
def prepare(self) -> None:
"""
Read the environment variables for S3 and MinIO from the file and set them for the process.
"""
timeout = 10
start_time = time.time()
sleep_for = 0.01
while True:
if os.path.exists(self.env_file):
with open(self.env_file, 'r') as file:
for line in file.readlines():
key, value = line.strip().split('=', 1)
os.environ[key] = value
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for file {self.env_file}")
# Sleep needed to wait when the controller will create a file with environment variables.
# Without sleep checking of the file existence will be too fast,
# so it will finish before the file is created
time.sleep(sleep_for)
sleep_for *=sleep_for
def cleanup(self) -> None:
"""
Fake method to have the same interfaces with Controller class.
"""
pass
def __enter__(self):
self.prepare()
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
class PrepareMainProcessEnv:
"""
A class responsible for starting additional services needed by tests.
It starts up a Minio server and an S3 mock server.
The environment settings are saved to a file for later consumption by child processes.
Class ensures that the necessary subdirectories exist or clean it if it exists
"""
def __init__(self, temp_dir: Path, modes: list[str], env_file: Path):
self.temp_dir = temp_dir
pytest_dirs = [self.temp_dir / mode / 'pytest' for mode in modes]
for directory in [self.temp_dir, *pytest_dirs]:
if not directory.exists():
os.makedirs(directory, exist_ok=True)
else:
shutil.rmtree(directory)
self.env_file = env_file
hosts = HostRegistry()
self.loop = asyncio.new_event_loop()
address_minio = self.loop.run_until_complete(hosts.lease_host())
address_s3_mock = self.loop.run_until_complete(hosts.lease_host())
self.address_s3_proxy = self.loop.run_until_complete(hosts.lease_host())
self.minio = MinioServer(self.temp_dir, address_minio, LogPrefixAdapter(logging.getLogger('minio'), {'prefix': 'minio'}))
self.mock_s3 = MockS3Server(address_s3_mock, 2012,
LogPrefixAdapter(logging.getLogger('s3_mock'), {'prefix': 's3_mock'}))
# S3 proxy initialized later because it needs to know Minis address and port that will be available only after
# Minio will start
self.proxy_s3 = None
def prepare(self) -> None:
"""
Start the S3 mock server and MinIO for the tests.
Create a file with environment variables for connecting to them.
"""
tasks = [
self.loop.create_task(self.minio.start()),
self.loop.create_task(self.mock_s3.start()),
]
self.loop.run_until_complete(asyncio.gather(*tasks))
envs = self.minio.get_envs_settings()
envs.update(self.mock_s3.get_envs_settings())
minio_uri = "http://" + envs[self.minio.ENV_ADDRESS] + ":" + envs[self.minio.ENV_PORT]
self.proxy_s3 = S3ProxyServer(self.address_s3_proxy, 9002, minio_uri, 3, int(time.time()),
LogPrefixAdapter(logging.getLogger('s3_proxy'), {'prefix': 's3_proxy'}))
self.loop.run_until_complete(self.proxy_s3.start())
envs.update(self.proxy_s3.get_envs_settings())
with open(self.env_file, 'w') as file:
for key, value in envs.items():
file.write(f"{key}={value}\n")
def cleanup(self) -> None:
"""
Stop the S3 mock server and MinIO
Remove the file with environment variables to not mess for consecutive runs.
"""
tasks = [
self.loop.create_task(self.minio.stop()),
self.loop.create_task(self.mock_s3.stop()),
self.loop.create_task(self.proxy_s3.stop()),
]
self.loop.run_until_complete(asyncio.gather(*tasks))
if os.path.exists(self.env_file):
self.env_file.unlink()
def __enter__(self):
try:
self.prepare()
except Exception:
self.cleanup()
raise
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
@contextmanager
def get_env_manager(temp_dir: Path, is_worker: bool, modes: list[str]) -> Generator[None, Any, None]:
"""
xdist helps to execute test in parallel.
For that purpose it creates one main controller and workers.
Pytest itself doesn't know if it's a worker or controller, so it will execute all fixtures and methods.
Tests need S3 mock server and minio to start only once for the whole run, since they can share the one instance and
share the environment variables with workers.
So the part of starting the servers executes on non-workers' processes.
That means when xdist isn't used, servers start as intended in the main process.
Tests on workers should know the endpoints of the servers, so the controller prepares this information.
According classes responsible for configuration controller and workers.
"""
env_file = Path(f"{temp_dir}/services_envs").absolute()
if is_worker:
with PrepareChildProcessEnv(env_file):
yield
else:
with PrepareMainProcessEnv(temp_dir, modes, env_file):
yield

View File

@@ -0,0 +1,131 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import collections
import subprocess
from copy import copy
from functools import cache
from pathlib import Path, PosixPath
import yaml
from pytest import Collector
from test.pylib.cpp.boost.boost_facade import COMBINED_TESTS
from test.pylib.cpp.facade import CppTestFacade
from test.pylib.cpp.item import CppFile
from test.pylib.util import get_modes_to_run
ALL_MODES = {
'debug': 'Debug',
'release': 'RelWithDebInfo',
'dev': 'Dev',
'sanitize': 'Sanitize',
'coverage': 'Coverage',
}
DEBUG_MODES = {
'debug': 'Debug',
'sanitize': 'Sanitize',
}
DEFAULT_ARGS = [
'--overprovisioned',
'--unsafe-bypass-fsync 1',
'--kernel-page-cache 1',
'--blocked-reactor-notify-ms 2000000',
'--collectd 0',
'--max-networking-io-control-blocks=100',
]
def get_disabled_tests(config: dict, modes: list[str]) -> dict[str, set[str]]:
"""
Get the dict with disabled tests.
Pytest spawns one process, so all modes should be handled there instead one by one as test.py does.
"""
disabled_tests = {}
for mode in modes:
# Skip tests disabled in suite.yaml
disabled_tests_for_mode = set(config.get('disable', []))
# Skip tests disabled in the specific mode.
disabled_tests_for_mode.update(config.get('skip_in_' + mode, []))
# If this mode is one of the debug modes, and there are
# tests disabled in a debug mode, add these tests to the skip list.
if mode in DEBUG_MODES:
disabled_tests_for_mode.update(config.get('skip_in_debug_modes', []))
# If a test is listed in run_in_<mode>, it should only be enabled in
# this mode. Tests not listed in any run_in_<mode> directive should
# run in all modes. Inverting this, we should disable all tests
# that are listed explicitly in some run_in_<m> where m != mode
# This, of course, may create ambiguity with skip_* settings,
# since the priority of the two is undefined, but oh well.
run_in_m = set(config.get('run_in_' + mode, []))
for a in ALL_MODES:
if a == mode:
continue
skip_in_m = set(config.get('run_in_' + a, []))
disabled_tests_for_mode.update(skip_in_m - run_in_m)
disabled_tests[mode] = disabled_tests_for_mode
return disabled_tests
def read_suite_config(directory: Path) -> dict[str, str]:
"""
Helper method that will return the configuration from the suite.yaml file
"""
with open(directory / 'suite.yaml', 'r') as cfg_file:
cfg = yaml.safe_load(cfg_file.read())
if not isinstance(cfg, dict):
raise RuntimeError('Failed to load tests: suite.yaml is empty')
return cfg
def get_root_path(session) -> Path:
return Path(session.config.rootpath).parent
def collect_items(file_path: PosixPath, parent: Collector, facade: CppTestFacade) -> object:
"""
Collect c++ test based on the .cc files. C++ test binaries are located in different directory, so the method will take care
to provide the correct path to the binary based on the file name and mode.
"""
run_id = parent.config.getoption('run_id')
modes = get_modes_to_run(parent.session)
project_root = Path(parent.session.config.rootpath).parent
suite_config = read_suite_config(file_path.parent)
no_parallel_cases = suite_config.get('no_parallel_cases', [])
disabled_tests = get_disabled_tests(suite_config, modes)
args = copy(DEFAULT_ARGS)
custom_args_config = suite_config.get('custom_args', {})
test_name = file_path.stem
no_parallel_run = True if test_name in no_parallel_cases else False
custom_args = custom_args_config.get(file_path.stem, ['-c2 -m2G'])
if len(custom_args) > 1:
return CppFile.from_parent(parent=parent, path=file_path, arguments=args, parameters=custom_args,
no_parallel_run=no_parallel_run, modes=modes, disabled_tests=disabled_tests,
run_id=run_id, facade=facade, project_root=project_root)
else:
args.extend(custom_args)
return CppFile.from_parent(parent=parent, path=file_path, arguments=args, no_parallel_run=no_parallel_run,
modes=modes, disabled_tests=disabled_tests, run_id=run_id, facade=facade, project_root=project_root)
@cache
def get_combined_tests(session):
suites = collections.defaultdict()
executable = get_root_path(session) / COMBINED_TESTS
args = [executable, '--list_content']
output = subprocess.check_output(
args,
stderr=subprocess.STDOUT,
universal_newlines=True,
)
current_suite = ''
for line in output.splitlines():
if not line.startswith(' '):
current_suite = line.strip().rstrip('*')
suites[current_suite] = []
else:
case_name = line.strip().rstrip('*')
suites[current_suite].append(case_name)
return suites

80
test/pylib/cpp/facade.py Normal file
View File

@@ -0,0 +1,80 @@
#
# Copyright (c) 2014 Bruno Oliveira
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import shlex
import subprocess
from abc import ABC
from pathlib import Path
from subprocess import TimeoutExpired
from typing import Sequence
from pytest import Config
class CppTestFailure(Exception):
def __init__(self, filename: str, line_num: int, contents: str) -> None:
self.filename = filename
self.line_num = line_num
self.lines = contents.splitlines()
def get_lines(self) -> list[tuple[str, tuple[str, ...]]]:
m = ("red", "bold")
return [(x, m) for x in self.lines]
def get_file_reference(self) -> tuple[str, int]:
return self.filename, self.line_num
class CppTestFailureList(Exception):
def __init__(self, failures: Sequence[CppTestFailure]) -> None:
self.failures = list(failures)
class CppTestFacade(ABC):
def __init__(self, config: Config, combined_tests: dict[str, list[str]] = None):
self.temp_dir: Path = Path(config.getoption('tmpdir'))
self.combined_suites: dict[str, list[str]] = combined_tests
def list_tests(self, executable: Path , no_parallel: bool) -> tuple[bool,list[str]]:
raise NotImplementedError
def run_test(self, executable: Path, original_name: str, test_id: str, mode:str, file_name: Path, test_args: Sequence[str] = ()) -> tuple[Sequence[CppTestFailure] | None, str]:
raise NotImplementedError
def run_process(args: list[str], timeout):
args = shlex.split(' '.join(args))
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
try:
stdout, stderr = p.communicate(timeout=timeout)
except TimeoutExpired:
print('Timeout reached')
p.kill()
stdout = p.stdout.read()
stderr = p.stderr.read()
except KeyboardInterrupt:
p.kill()
raise
return p, stderr, stdout

156
test/pylib/cpp/item.py Normal file
View File

@@ -0,0 +1,156 @@
#
# Copyright (c) 2014 Bruno Oliveira
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
from pathlib import Path
from typing import Sequence, Any, Iterator
import pytest
from _pytest._code.code import TerminalRepr, ReprFileLocation
from _pytest._io import TerminalWriter
from test.pylib.cpp.boost.boost_facade import COMBINED_TESTS
from test.pylib.cpp.facade import CppTestFailure, CppTestFailureList, CppTestFacade
class CppFailureRepr(object):
failure_sep = "---"
def __init__(self, failures: Sequence[CppTestFailure]) -> None:
self.failures = failures
def __str__(self) -> str:
reprs = []
for failure in self.failures:
pure_lines = "\n".join(x[0] for x in failure.get_lines())
repr_loc = self._get_repr_file_location(failure)
reprs.append("%s\n%s" % (pure_lines, repr_loc))
return self.failure_sep.join(reprs)
def _get_repr_file_location(self, failure: CppTestFailure) -> ReprFileLocation:
filename, line_num = failure.get_file_reference()
return ReprFileLocation(filename, line_num, "C++ failure")
def toterminal(self, tw: TerminalWriter) -> None:
for index, failure in enumerate(self.failures):
for line, markup in failure.get_lines():
markup_params = {m: True for m in markup}
tw.line(line, **markup_params)
location = self._get_repr_file_location(failure)
location.toterminal(tw)
if index != len(self.failures) - 1:
tw.line(self.failure_sep, cyan=True)
class CppTestFunction(pytest.Item):
"""
Represents a single test function in the file.
"""
facade = None
def __init__(self, *, executable: Path, facade: CppTestFacade, mode: str, test_unique_name: str, arguments: Sequence[str],
file_name: Path, run_id:int = None, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.facade = facade
self.executable = executable
self.mode = mode
self.file_name = file_name
self.originalname = kwargs['name']
self.test_unique_name = test_unique_name
self._arguments = arguments
self.run_id = run_id
self.fixturenames = []
self.own_markers = []
self.add_marker(pytest.mark.cpp)
@property
def nodeid(self) -> str:
return self._nodeid
@nodeid.setter
def nodeid(self, nodeid: str) -> None:
self._nodeid = nodeid
def runtest(self) -> None:
failures, output = self.facade.run_test(self.executable, self.originalname, self.test_unique_name, self.mode,
self.file_name, self._arguments)
# Report the c++ output in its own sections
self.add_report_section("call", "c++", output)
if failures:
raise CppTestFailureList(failures)
def repr_failure( # type:ignore[override]
self, excinfo: pytest.ExceptionInfo[BaseException], **kwargs: Any) -> str | TerminalRepr | CppFailureRepr:
if isinstance(excinfo.value, CppTestFailureList):
return CppFailureRepr(excinfo.value.failures)
return pytest.Item.repr_failure(self, excinfo)
def reportinfo(self) -> tuple[Any, int, str]:
return self.path, 0, self.originalname
class CppFile(pytest.File):
"""
Represents the C++ test file with all necessary information for test execution
"""
def __init__(self, *, no_parallel_run: bool = False, modes: list[str], disabled_tests: dict[str, set[str]],
run_id=None, facade: CppTestFacade, arguments: Sequence[str], parameters: list[str] = None, project_root: Path,
**kwargs: Any) -> None:
super().__init__(**kwargs)
self.facade = facade
self.modes = modes
self.run_id = run_id
self.disabled_tests = disabled_tests
self.no_parallel_run = no_parallel_run
self.parameters = parameters
self.project_root = project_root
self._arguments = arguments
def collect(self) -> Iterator[CppTestFunction]:
for mode in self.modes:
test_name = self.path.stem
if test_name in self.disabled_tests[mode]:
continue
executable = Path(f'{self.project_root}/build/{mode}/test/{self.path.parent.name}/{test_name}')
combined, tests = self.facade.list_tests(executable, self.no_parallel_run)
if combined:
executable = executable.parent / COMBINED_TESTS.stem
for test_name in tests:
if '/' in test_name:
test_name = test_name.replace('/', '_')
if self.parameters:
for index, parameter in enumerate(self.parameters):
yield CppTestFunction.from_parent(self, name=test_name, executable=executable,
facade=self.facade, mode=mode, test_unique_name=f'{test_name}.{index + 1}',
file_name=self.path, run_id=self.run_id,
arguments=[*self._arguments, parameter])
else:
yield CppTestFunction.from_parent(self, name=test_name, executable=executable, facade=self.facade, mode=mode,
file_name=self.path, test_unique_name=test_name, run_id=self.run_id, arguments=self._arguments)

View File

@@ -0,0 +1,17 @@
# Running tests with pytest
To run test with pytest execute
```bash
pytest test/ldap
```
To execute only one file, provide the path filename
```bash
pytest test/unit/role_manager_test.cc
```
Since it's a normal path, autocompletion works in the terminal out of the box.
To provide a specific mode, use the next parameter `--mode dev`,
if parameter isn't provided pytest tries to use `ninja mode_list` to find out the compiled modes.
Parallel execution is controlled by `pytest-xdist` and the parameter `-n auto`.
This command starts tests with the number of workers equal to CPU cores.

View File

View File

@@ -0,0 +1,318 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import os
import shutil
import socket
import subprocess
import time
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from time import sleep
from typing import Any, Generator
LDAP_SERVER_CONFIGURATION_FILE = Path('test', 'resource', 'slapd.conf')
DEFAULT_ENTRIES = ["""dn: dc=example,dc=com
objectClass: dcObject
objectClass: organization
dc: example
o: Example
description: Example directory.
""", """dn: cn=root,dc=example,dc=com
objectClass: organizationalRole
cn: root
description: Directory manager.
""", """dn: ou=People,dc=example,dc=com
objectClass: organizationalUnit
ou: People
description: Our people.
""", """# Default superuser for Scylla
dn: uid=cassandra,ou=People,dc=example,dc=com
objectClass: organizationalPerson
objectClass: uidObject
cn: cassandra
ou: People
sn: cassandra
userid: cassandra
userPassword: cassandra
""", """dn: uid=jsmith,ou=People,dc=example,dc=com
objectClass: organizationalPerson
objectClass: uidObject
cn: Joe Smith
ou: People
sn: Smith
userid: jsmith
userPassword: joeisgreat
""", """dn: uid=jdoe,ou=People,dc=example,dc=com
objectClass: organizationalPerson
objectClass: uidObject
cn: John Doe
ou: People
sn: Doe
userid: jdoe
userPassword: pa55w0rd
""", """dn: cn=role1,dc=example,dc=com
objectClass: groupOfUniqueNames
cn: role1
uniqueMember: uid=jsmith,ou=People,dc=example,dc=com
uniqueMember: uid=cassandra,ou=People,dc=example,dc=com
""", """dn: cn=role2,dc=example,dc=com
objectClass: groupOfUniqueNames
cn: role2
uniqueMember: uid=cassandra,ou=People,dc=example,dc=com
""", """dn: cn=role3,dc=example,dc=com
objectClass: groupOfUniqueNames
cn: role3
uniqueMember: uid=jdoe,ou=People,dc=example,dc=com
""", ]
class PrepareChildProcessEnv:
"""
Class responsible to get environment variables from the main thread through the shared file and set them for the process
"""
def __init__(self, root_dir, temp_dir: Path, modes: list[str], env_file: Path, byte_limit: int, worker_id):
self.id = int(worker_id[2:]) + 1
self.temp_dir = temp_dir
self.modes = modes
self.root_dir = root_dir
self.byte_limit = byte_limit
self.env_file = env_file
self.finalize = None
def prepare(self) -> None:
"""
Setup LDAP proxy and set environment variables
"""
ldap_port = 5000 + (self.id * 3) % 55000
timeout = 10
sleep_for = 0.01
start_time = time.time()
while True:
if os.path.exists(self.env_file):
(self.finalize, _, test_env) = setup(self.root_dir, ldap_port, self.temp_dir / 'ldap_instances',
self.byte_limit)
for key, value in test_env.items():
os.environ[key] = value
break
if time.time() - start_time > timeout:
raise TimeoutError(f"Timeout waiting for file {self.env_file}")
# Sleep needed to wait when the controller will create a file with environment variables.
# Without sleep checking of the file existence will be too fast,
# so it will finish before the file is created
time.sleep(sleep_for)
sleep_for *= 2
def cleanup(self) -> None:
"""
Stop LDAP
"""
if self.finalize:
self.finalize()
def __enter__(self):
try:
self.prepare()
except Exception:
self.cleanup()
raise
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
class PrepareMainProcessEnv:
"""
A class responsible for starting additional services needed by tests.
It starts up a Minio server and an S3 mock server.
The environment settings are saved to a file for later consumption by child processes.
"""
def __init__(self, root_dir, temp_dir: Path, modes: list[str], env_file: Path, byte_limit: int):
self.temp_dir = temp_dir
self.modes = modes
self.root_dir = root_dir
pytest_dirs = [self.temp_dir / mode / 'pytest' for mode in modes]
for directory in [self.temp_dir, *pytest_dirs]:
if not directory.exists():
os.makedirs(directory, exist_ok=True)
self.env_file = env_file
self.tp_server = subprocess.Popen('toxiproxy-server', stderr=subprocess.DEVNULL)
def can_connect_to_toxiproxy():
return can_connect(('127.0.0.1', 8474))
if not try_something_backoff(can_connect_to_toxiproxy):
raise Exception('Could not connect to toxiproxy')
self.ldap_port = 5000
self.byte_limit = byte_limit
self.finalize = None
def prepare(self) -> None:
"""
Start the LDAP.
Create a file with environment variables for connecting to them.
"""
(self.finalize, _, test_env) = setup(self.root_dir, self.ldap_port, self.temp_dir / 'ldap_instances',
self.byte_limit)
for key, value in test_env.items():
os.environ[key] = value
with open(self.env_file, 'w') as file:
for key, value in test_env.items():
file.write(f"{key}={value}\n")
def cleanup(self) -> None:
"""
Stop LDAP.
Remove the file with environment variables to not mess for consecutive runs.
"""
if os.path.exists(self.env_file):
self.env_file.unlink()
if self.finalize:
self.finalize()
if self.tp_server is not None:
self.tp_server.terminate()
def __enter__(self):
try:
self.prepare()
except Exception:
self.cleanup()
raise
def __exit__(self, exc_type, exc_val, exc_tb):
self.cleanup()
@contextmanager
def get_env_manager(root_dir: Path, temp_dir: Path, worker_id: str, modes: list[str],
byte_limit: int) -> Generator[None, Any, None]:
"""
xdist helps to execute test in parallel.
For that purpose it creates one main controller and workers.
Pytest itself doesn't know if it's a worker or controller, so it will execute all fixtures and methods.
Tests need S3 mock server and minio to start only once for the whole run, since they can share the one instance and
share the environment variables with workers.
So the part of starting the servers executes on non-workers' machines.
That means when xdist isn't used, servers start as intended in the main process.
Tests on workers should know the endpoints of the servers, so the controller prepares this information.
According classes responsible for configuration controller and workers.
"""
env_file = Path(f"{temp_dir}/test_env").absolute()
if worker_id != 'master':
with PrepareChildProcessEnv(root_dir, temp_dir, modes, env_file, byte_limit, worker_id):
yield
else:
with PrepareMainProcessEnv(root_dir, temp_dir, modes, env_file, byte_limit):
yield
def can_connect(address, family=socket.AF_INET):
s = socket.socket(family)
try:
s.connect(address)
return True
except OSError as e:
if 'AF_UNIX path too long' in str(e):
raise OSError(e.errno, "{} ({})".format(str(e), address)) from None
else:
return False
except:
return False
def try_something_backoff(something):
sleep_time = 0.05
while not something():
if sleep_time > 30:
return False
time.sleep(sleep_time)
sleep_time *= 2
return True
def make_saslauthd_conf(port, instance_path):
"""Creates saslauthd.conf with appropriate contents under instance_path. Returns the path to the new file."""
saslauthd_conf_path = os.path.join(instance_path, 'saslauthd.conf')
with open(saslauthd_conf_path, 'w') as f:
f.write('ldap_servers: ldap://localhost:{}\nldap_search_base: dc=example,dc=com'.format(port))
return saslauthd_conf_path
def setup(project_root: Path, port: int, instance_root: Path, byte_limit: int):
instance_path = instance_root / str(port)
slapd_pid_file = instance_path / 'slapd.pid'
saslauthd_socket_path = TemporaryDirectory()
os.makedirs(instance_path, exist_ok=True)
# This will always fail because it lacks the permissions to read the default slapd data
# folder but it does create the instance folder so we don't want to fail here.
try:
subprocess.check_output(['slaptest', '-f', project_root / LDAP_SERVER_CONFIGURATION_FILE, '-F', instance_path],
stderr=subprocess.DEVNULL)
except:
pass
# Set up failure injection.
proxy_name = 'p{}'.format(port)
subprocess.check_output(
['toxiproxy-cli', 'c', proxy_name, '--listen', 'localhost:{}'.format(port + 2), '--upstream',
'localhost:{}'.format(port)])
subprocess.check_output(['toxiproxy-cli', 't', 'a', proxy_name, '-t', 'limit_data', '-n', 'limiter', '-a',
'bytes={}'.format(byte_limit)])
# Change the data folder in the default config.
replace_expression = 's/olcDbDirectory:.*/olcDbDirectory: {}/g'.format(str(instance_path).replace('/', r'\/'))
subprocess.check_output(['find', instance_path, '-type', 'f', '-exec', 'sed', '-i', replace_expression, '{}', ';'])
# Change the pid file to be kept with the instance.
replace_expression = 's/olcPidFile:.*/olcPidFile: {}/g'.format(str(slapd_pid_file).replace('/', r'\/'))
subprocess.check_output(['find', instance_path, '-type', 'f', '-exec', 'sed', '-i', replace_expression, '{}', ';'])
# Put the test data in.
cmd = ['slapadd', '-F', instance_path]
subprocess.check_output(cmd, input='\n\n'.join(DEFAULT_ENTRIES).encode('ascii'), stderr=subprocess.STDOUT)
# Set up the server.
SLAPD_URLS = 'ldap://:{}/ ldaps://:{}/'.format(port, port + 1)
def can_connect_to_slapd():
return can_connect(('127.0.0.1', port)) and can_connect(('127.0.0.1', port + 1)) and can_connect(
('127.0.0.1', port + 2))
def can_connect_to_saslauthd():
return can_connect(os.path.join(saslauthd_socket_path.name, 'mux'), socket.AF_UNIX)
slapd_proc = subprocess.Popen(['prlimit', '-n1024', 'slapd', '-F', instance_path, '-h', SLAPD_URLS, '-d', '0'])
saslauthd_conf_path = make_saslauthd_conf(port, instance_path)
test_env = {"SEASTAR_LDAP_PORT": str(port), "SASLAUTHD_MUX_PATH": os.path.join(saslauthd_socket_path.name, "mux")}
saslauthd_proc = subprocess.Popen(
['saslauthd', '-d', '-n', '1', '-a', 'ldap', '-O', saslauthd_conf_path, '-m', saslauthd_socket_path.name],
stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
def finalize():
slapd_proc.terminate()
slapd_proc.wait() # Wait for slapd to remove slapd.pid, so it doesn't race with rmtree below.
saslauthd_proc.kill() # Somehow, invoking terminate() here also terminates toxiproxy-server. o_O
shutil.rmtree(instance_path)
saslauthd_socket_path.cleanup()
subprocess.check_output(['toxiproxy-cli', 'd', proxy_name])
try:
if not try_something_backoff(can_connect_to_slapd):
raise Exception('Unable to connect to slapd')
if not try_something_backoff(can_connect_to_saslauthd):
raise Exception('Unable to connect to saslauthd')
except:
finalize()
raise
return finalize, '--byte-limit={}'.format(byte_limit), test_env

View File

View File

@@ -0,0 +1,61 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from __future__ import annotations
import os
from pathlib import Path
from typing import Sequence
from test.pylib.cpp.facade import CppTestFacade, CppTestFailure, run_process
TIMEOUT = 30 # seconds
class UnitTestFacade(CppTestFacade):
def list_tests(
self,
executable: Path,
no_parallel_run: bool
) -> tuple[bool, list[str]]:
return False, [os.path.basename(os.path.splitext(executable)[0])]
def run_test(
self,
executable: Path,
original_name: str,
test_name: str,
mode: str,
file_name: Path,
test_args: Sequence[str] = (),
) -> tuple[list[CppTestFailure], str] | tuple[None, str]:
args = [str(executable), *test_args]
os.chdir(self.temp_dir.parent)
p, stderr, stdout = run_process(args, TIMEOUT)
if p.returncode != 0:
msg = (
'working_dir: {working_dir}\n'
'Internal Error: calling {executable} '
'for test {test_id} failed (returncode={returncode}):\n'
'output:{stdout}\n'
'std error:{stderr}\n'
'command to repeat:{command}'
)
failure = CppTestFailure(
file_name.name,
line_num=0,
contents=msg.format(
working_dir=os.getcwd(),
executable=executable,
test_id=test_name,
stdout=stdout,
stderr=stderr,
command=' '.join(p.args),
returncode=p.returncode,
),
)
return [failure], stdout
return None, stdout

View File

@@ -206,6 +206,9 @@ class MinioServer:
self.ENV_ACCESS_KEY,
self.ENV_SECRET_KEY]
def get_envs_settings(self):
return {key: os.environ[key] for key in self._get_environs()}
def _unset_environ(self):
for env in self._get_environs():
if value := self.old_env.get(env):

View File

@@ -17,7 +17,10 @@ class ReportPlugin:
# Pytest hook to modify test name to include mode and run_id
def pytest_configure(self, config):
self.build_mode = config.getoption('mode')
# getting build_mode in two steps is needed for the cases when no mode parameter is provided
self.build_mode = config.getoption("modes")
if self.build_mode:
self.build_mode = self.build_mode[0]
self.config = config
self.run_id = config.getoption("run_id")
@@ -25,8 +28,6 @@ class ReportPlugin:
def pytest_runtest_makereport(self):
outcome = yield
report = outcome.get_result()
if self.build_mode is not None or self.run_id is not None:
report.nodeid = f"{report.nodeid}.{self.build_mode}.{self.run_id}"
status = get_pytest_report_status(report)
# skip attaching logs for passed tests
# attach_capture is a destination for "--allure-no-capture" option from allure-plugin
@@ -44,7 +45,7 @@ class ReportPlugin:
Add mode tag to be able to search by it.
Add parameters to make allure distinguish them and not put them to retries.
"""
request.node.name = f"{request.node.name}.{self.build_mode}.{self.run_id}"
allure.dynamic.tag(self.build_mode)
allure.dynamic.parameter('mode', self.build_mode)
allure.dynamic.parameter('run_id', self.run_id)
if self.build_mode is not None or self.run_id is not None:
allure.dynamic.tag(self.build_mode)
allure.dynamic.parameter('mode', self.build_mode)
allure.dynamic.parameter('run_id', self.run_id)

View File

@@ -220,12 +220,23 @@ class S3ProxyServer:
self.server.socket.settimeout(10000)
self.server.socket.listen(1000)
self.is_running = False
os.environ['PROXY_S3_SERVER_PORT'] = f'{port}'
os.environ['PROXY_S3_SERVER_HOST'] = host
self.envs = {'PROXY_S3_SERVER_PORT': f'{port}', 'PROXY_S3_SERVER_HOST': f'{host}'}
def _set_environ(self):
for key, value in self.envs.items():
os.environ[key] = value
def _unset_environ(self):
for key in self.envs.keys():
del os.environ[key]
def get_envs_settings(self):
return self.envs
async def start(self):
if not self.is_running:
self.logger.info('Starting S3 proxy server on %s', self.server.server_address)
self._set_environ()
loop = asyncio.get_running_loop()
self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
self.is_running = True
@@ -233,6 +244,7 @@ class S3ProxyServer:
async def stop(self):
if self.is_running:
self.logger.info('Stopping S3 proxy server')
self._unset_environ()
self.server.shutdown()
await self.server_thread
self.is_running = False

View File

@@ -268,12 +268,23 @@ class MockS3Server:
self.server.socket.listen(1000)
self.logger = logger
self.is_running = False
os.environ['MOCK_S3_SERVER_PORT'] = f'{port}'
os.environ['MOCK_S3_SERVER_HOST'] = host
self.envs = {'MOCK_S3_SERVER_PORT': f'{port}', 'MOCK_S3_SERVER_HOST': f'{host}'}
def _set_environ(self):
for key, value in self.envs.items():
os.environ[key] = value
def _unset_environ(self):
for key in self.envs.keys():
del os.environ[key]
def get_envs_settings(self):
return self.envs
async def start(self):
if not self.is_running:
self.logger.info('Starting S3 mock server on %s', self.server.server_address)
self._set_environ()
loop = asyncio.get_running_loop()
self.server_thread = loop.run_in_executor(None, self.server.serve_forever)
self.is_running = True
@@ -281,6 +292,7 @@ class MockS3Server:
async def stop(self):
if self.is_running:
self.logger.info('Stopping S3 mock server')
self._unset_environ()
self.server.shutdown()
await self.server_thread
self.is_running = False

View File

@@ -3,6 +3,8 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import re
import subprocess
from collections.abc import Coroutine
import threading
import time
@@ -10,7 +12,8 @@ import asyncio
import logging
import pathlib
import os
import pytest
from functools import cache
import random
import string
@@ -255,3 +258,32 @@ async def wait_for_first_completed(coros: list[Coroutine]):
t.cancel()
for t in done:
await t
def ninja(target):
"""Build specified target using ninja"""
build_dir = 'build'
args = ['ninja', target]
if os.path.exists(os.path.join(build_dir, 'build.ninja')):
args = ['ninja', '-C', build_dir, target]
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0].decode()
@cache
def get_configured_modes(root_dir=None):
if root_dir:
os.chdir(root_dir)
out = ninja('mode_list')
# [1/1] List configured modes
# debug release dev
return re.sub(r'.* List configured modes\n(.*)\n', r'\1',
out, count=1, flags=re.DOTALL).split('\n')[-1].split(' ')
def get_modes_to_run(session) -> list[str]:
modes = session.config.getoption('modes')
if not modes:
modes = get_configured_modes(root_dir=pathlib.Path(session.config.rootpath).parent)
if not modes:
raise RuntimeError('No modes configured. Please run ./configure.py first')
return modes

View File

@@ -10,6 +10,7 @@ markers =
without_scylla: run without attaching to a scylla process
enable_tablets: create keyspace with tablets enabled or disabled
repair: tests for repair
cpp: marker for c++ tests
norecursedirs = manual perf lib
# Ignore warnings about HTTPS requests without certificate verification
# (see issue #15287). Pytest breaks urllib3.disable_warnings() in conftest.py,
@@ -26,6 +27,7 @@ norecursedirs = manual perf lib
filterwarnings =
ignore::urllib3.exceptions.InsecureRequestWarning
ignore:record_property is incompatible with junit_family:pytest.PytestWarning
ignore::DeprecationWarning:importlib._bootstrap
tmp_path_retention_count = 1
tmp_path_retention_policy = failed

View File

@@ -265,9 +265,3 @@ def skip_mode_fixture(request, build_mode):
for reason, platform_key in skipped_funcs.get((request.function, build_mode), []):
if platform_key is None or platform_key in platform.platform():
pytest.skip(f'{request.node.name} skipped, reason: {reason}')
def pytest_collection_modifyitems(items, config):
run_id = config.getoption('run_id')
for item in items:
item.name = f"{item.name}.{run_id}"

17
test/unit/README.md Normal file
View File

@@ -0,0 +1,17 @@
# Running tests with pytest
To run test with pytest execute
```bash
pytest test/unit
```
To execute only one file, provide the path filename
```bash
pytest test/unit/lsa_async_eviction_test.cc
```
Since it's a normal path, autocompletion works in the terminal out of the box.
To provide a specific mode, use the next parameter `--mode dev`,
if parameter isn't provided pytest tries to use `ninja mode_list` to find out the compiled modes.
Parallel execution is controlled by `pytest-xdist` and the parameter `-n auto`.
This command starts tests with the number of workers equal to CPU cores.

0
test/unit/__init__.py Normal file
View File

20
test/unit/conftest.py Normal file
View File

@@ -0,0 +1,20 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from pathlib import PosixPath
from pytest import Collector
from test.pylib.cpp.common_cpp_conftest import collect_items
from test.pylib.cpp.unit.unit_facade import UnitTestFacade
def pytest_collect_file(file_path: PosixPath, parent: Collector):
"""
Method triggered automatically by pytest to collect files from a directory. Boost and unit have the same logic for
collection, the only difference in execution, and it's covered by facade
"""
if file_path.suffix == '.cc':
return collect_items(file_path, parent, facade=UnitTestFacade(parent.config))