test.py: Add resource consumption metrics

This PR adds the possibility to gather resource consumption metrics. The collected metrics can be used to compare performance before and after specific changes aimed at increasing performance. Currently, this functionality works only in manual mode, and this is just raw data. Later on, these metrics can be used in Jupyter notebook to analyze and visualize how the resources are used and can provide the insight on how to improve it. This PR is a first insight after gathering these metrics.

Add the possibility to gather resource consumption for the test.py execution. SQLite DB will be created with different performance metrics that will allow comparing the resource consumption between changes.
The DB will be in the tmp directory that by default set to testlog. Across the runs, the DB will not be deleted, so each new run will just add information to the existing DB.
Parameter --get-metrics was added to switch on or off the metrics gathering. By default, it's switched on.

Closes: scylladb/qa-tasks#1666

Closes: scylladb/qa-tasks#1707

Closes scylladb/scylladb#19881
This commit is contained in:
Andrei Chekun
2024-06-26 18:54:10 +02:00
committed by Avi Kivity
parent 39ce358d82
commit bbb6c3c2ff
6 changed files with 463 additions and 2 deletions

31
test.py
View File

@@ -36,6 +36,7 @@ from scripts import coverage # type: ignore
from test.pylib.artifact_registry import ArtifactRegistry
from test.pylib.host_registry import HostRegistry
from test.pylib.pool import Pool
from test.pylib.resource_gather import setup_cgroup, run_resource_watcher, get_resource_gather
from test.pylib.util import LogPrefixAdapter
from test.pylib.scylla_cluster import ScyllaServer, ScyllaCluster, get_cluster_manager, merge_cmdline_options
from test.pylib.minio_server import MinioServer
@@ -1216,6 +1217,8 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
os.getenv("ASAN_OPTIONS"),
]
try:
resource_gather = get_resource_gather(options.gather_metrics, test, options.tmpdir)
resource_gather.make_cgroup()
log.write("=== TEST.PY STARTING TEST {} ===\n".format(test.uname).encode(encoding="UTF-8"))
log.write("export UBSAN_OPTIONS='{}'\n".format(
":".join(filter(None, UBSAN_OPTIONS))).encode(encoding="UTF-8"))
@@ -1232,6 +1235,10 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
if options.cpus:
path = 'taskset'
args = ['-c', options.cpus, test.path, *test.args]
test_running_event = asyncio.Event()
test_resource_watcher = resource_gather.cgroup_monitor(test_event=test_running_event)
process = await asyncio.create_subprocess_exec(
path, *args,
stderr=log,
@@ -1245,19 +1252,33 @@ async def run_test(test: Test, options: argparse.Namespace, gentle_kill=False, e
SCYLLA_TEST_ENV='yes',
**env,
),
preexec_fn=os.setsid,
preexec_fn=resource_gather.put_process_to_cgroup,
)
stdout, _ = await asyncio.wait_for(process.communicate(), options.timeout)
test_running_event.set()
test.time_end = time.time()
metrics = resource_gather.get_test_metrics()
try:
async with asyncio.timeout(2):
await test_resource_watcher
except TimeoutError:
log.write(f'Metrics for {test.name} can be inaccurate, job reached timeout'.encode(encoding='UTF-8'))
finally:
resource_gather.remove_cgroup()
if process.returncode not in test.valid_exit_codes:
report_error('Test exited with code {code}\n'.format(code=process.returncode))
resource_gather.write_metrics_to_db(metrics)
return False
try:
test.check_log(not options.save_log_on_success)
except Exception as e:
print("")
print(test.name + ": " + palette.crit("failed to parse XML output: {}".format(e)))
resource_gather.write_metrics_to_db(metrics)
return False
resource_gather.write_metrics_to_db(metrics, True)
return True
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
test.is_cancelled = True
@@ -1315,6 +1336,7 @@ def parse_cmd_line() -> argparse.Namespace:
help="""Path to temporary test data and log files. The data is
further segregated per build mode. Default: ./testlog.""",
)
parser.add_argument("--gather-metrics", action=argparse.BooleanOptionalAction, default=True)
parser.add_argument('--mode', choices=all_modes.keys(), action="append", dest="modes",
help="Run only tests for given build mode(s)")
parser.add_argument('--repeat', action="store", default="1", type=int,
@@ -1782,7 +1804,7 @@ async def main() -> int:
options = parse_cmd_line()
open_log(options.tmpdir, f"test.py.{'-'.join(options.modes)}.log", options.log_level)
setup_cgroup(options.gather_metrics)
await find_tests(options)
if options.list_tests:
print('\n'.join([f"{t.suite.mode:<8} {type(t.suite).__name__[:-9]:<11} {t.name}"
@@ -1790,11 +1812,16 @@ async def main() -> int:
return 0
signaled = asyncio.Event()
stop_event = asyncio.Event()
resource_watcher = run_resource_watcher(options.gather_metrics, signaled, stop_event, options.tmpdir)
setup_signal_handlers(asyncio.get_running_loop(), signaled)
try:
await run_all_tests(signaled, options)
stop_event.set()
async with asyncio.timeout(5):
await resource_watcher
except Exception as e:
print(palette.fail(e))
raise

View File

42
test/pylib/db/model.py Normal file
View File

@@ -0,0 +1,42 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
from datetime import datetime
from attr import define
@define
class CgroupMetric:
memory: int
test_id: int
timestamp: datetime
@define
class Metric:
test_id: int
memory_peak: int = None
success: bool = None
system_sec: float = None
time_end: float = None
time_start: float = None
time_taken: float = None
usage_sec: float = None
user_sec: float = None
@define
class SystemResourceMetric:
cpu: float
memory: float
timestamp: datetime
@define
class Test:
architecture: str
directory: str
mode: str
run_id: int
test_name: str

155
test/pylib/db/writer.py Normal file
View File

@@ -0,0 +1,155 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import sqlite3
from typing import List
from attr import AttrsInstance, asdict
TESTS_TABLE = 'tests'
METRICS_TABLE = 'test_metrics'
SYSTEM_RESOURCE_METRICS_TABLE = 'system_resource_metrics'
CGROUP_MEMORY_METRICS_TABLE = 'cgroup_memory_metrics'
DEFAULT_DB_NAME = 'sqlite.db'
DATE_TIME_TEMPLATE = '%Y-%m-%d %H:%M:%S.%f'
create_table = [
f'''
CREATE TABLE IF NOT EXISTS {TESTS_TABLE} (
id INTEGER PRIMARY KEY,
architecture VARCHAR(15) NOT NULL,
directory VARCHAR(255),
mode VARCHAR(15) NOT NULL,
run_id INTEGER,
test_name VARCHAR(255) NOT NULL
);
''',
f'''
CREATE TABLE IF NOT EXISTS {METRICS_TABLE} (
id INTEGER PRIMARY KEY,
test_id INT NOT NULL,
user_sec REAL,
system_sec REAL,
usage_sec REAL,
memory_peak INTEGER,
time_taken REAL,
time_start DATETIME,
time_end DATETIME,
success BOOLEAN,
FOREIGN KEY(test_id) REFERENCES {TESTS_TABLE}(id)
);
''',
f'''
CREATE TABLE IF NOT EXISTS {SYSTEM_RESOURCE_METRICS_TABLE} (
id INTEGER PRIMARY KEY,
memory REAL,
cpu REAL,
timestamp DATETIME
);
''',
f'''
CREATE TABLE IF NOT EXISTS {CGROUP_MEMORY_METRICS_TABLE} (
id INTEGER PRIMARY KEY,
test_id INT NOT NULL,
memory REAL,
timestamp DATETIME,
FOREIGN KEY(test_id) REFERENCES {TESTS_TABLE}(id)
);
'''
]
class SingletonMeta(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]
class SQLiteWriter(metaclass=SingletonMeta):
__instance = None
def __init__(self, database_path):
"""
Initializes the SQLWriter object.
Args:
database_path: Path to the SQLite database file.
"""
self.lock = asyncio.Lock()
self.conn = sqlite3.connect(database_path)
self.cursor = self.conn.cursor()
self.cursor.execute('PRAGMA foreign_keys=ON')
self.cursor.execute('PRAGMA sychronous=off')
for table in create_table:
self.cursor.execute(table).connection.commit()
SQLiteWriter.__instance = self
def write_row(self, model, table_name: str) -> int:
"""
Inserts a single row of data into the specified table.
Args:
model: A AttrsInstance object with a data to insert.
table_name: Name of the table where data is being written.
Return:
str: Returns the ID of the inserted record
"""
data = asdict(model)
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
values = tuple(data.values())
sql_query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
self.cursor.execute(sql_query, values)
last_row_id = self.cursor.lastrowid
self.conn.commit()
return last_row_id
def write_multiple_rows(self, data_list: List[AttrsInstance], table_name: str) -> None:
"""
Inserts multiple rows of data into the specified table.
Args:
data_list: A list of AttrsInstance objects, each representing a row of data.
table_name: Name of the table where data is being written.
"""
for model in data_list:
self.write_row(model, table_name)
def __del__(self):
"""
Closes the database connection when the object is deleted.
"""
self.conn.close()
def write_row_if_not_exist(self, model, table_name: str):
data = asdict(model)
values = tuple(data.values())
# Construct the SQL query to retrieve the ID if the record exists
select_query = f"""
SELECT id FROM {table_name} WHERE {
' AND '.join([f"{col} = ?" for col in data.keys()])
}
"""
# Execute the select query first
cursor = self.conn.execute(select_query, values)
existing_row = cursor.fetchone()
if existing_row:
# Record exists, return its ID
return existing_row[0]
else:
return self.write_row(model, table_name)

View File

@@ -0,0 +1,236 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
import asyncio
import getpass
import logging
import os
import platform
import subprocess
from abc import ABC
from asyncio import Task, Event
from datetime import datetime
from functools import lru_cache
from pathlib import Path
from typing import TextIO
import psutil
from test.pylib.db.model import Metric, SystemResourceMetric, CgroupMetric, Test
from test.pylib.db.writer import DATE_TIME_TEMPLATE, SQLiteWriter, SYSTEM_RESOURCE_METRICS_TABLE, METRICS_TABLE, \
DEFAULT_DB_NAME, CGROUP_MEMORY_METRICS_TABLE, TESTS_TABLE
@lru_cache(maxsize=None)
def get_cgroup() -> Path:
cgroup_path = f"/proc/{os.getpid()}/cgroup"
with open(cgroup_path, 'r') as f:
cgroup_info = f.readlines()
# Extract the relative cgroup for the process and make it absolute and add where the test.py process should be
# placed in.
# This can be used to manipulate the cgroup's controllers
return Path(f'/sys/fs/cgroup/{cgroup_info[0].strip().split(':')[-1]}/initial')
CGROUP_INITIAL: Path = get_cgroup()
CGROUP_TESTS: Path = CGROUP_INITIAL.parent / 'tests'
cancel_event_global = None
stop_event_global = None
class ResourceGather(ABC):
def __init__(self, test, tmp_dir: str):
self.test = test
self.db_path = Path(tmp_dir) / DEFAULT_DB_NAME
standardized_name = self.test.shortname.replace("/", "_")
self.cgroup_path = Path(
f"{CGROUP_TESTS}/{self.test.suite.name}.{standardized_name}.{self.test.suite.mode}.{self.test.id}"
)
self.logger = logging.getLogger(__name__)
def make_cgroup(self):
pass
def put_process_to_cgroup(self):
os.setsid()
def get_test_metrics(self) -> Metric:
pass
def write_metrics_to_db(self, metrics: Metric, success: bool = False) -> None:
pass
def cgroup_monitor(self, test_event: Event):
pass
def remove_cgroup(self):
pass
class ResourceGatherOff(ResourceGather):
def cgroup_monitor(self, test_event) -> Task:
return asyncio.create_task(no_monitor())
class ResourceGatherOn(ResourceGather):
def __init__(self, test, tmp_dir: str):
super().__init__(test, tmp_dir)
self.sqlite_writer = SQLiteWriter(self.db_path)
self.test_id: int = self.sqlite_writer.write_row_if_not_exist(
Test(
architecture=platform.machine(),
directory=test.suite.name,
mode=test.mode,
run_id=test.id,
test_name=test.shortname
),
TESTS_TABLE)
def make_cgroup(self):
os.makedirs(self.cgroup_path, exist_ok=True)
def get_test_metrics(self) -> Metric:
test_metrics: Metric = Metric(test_id=self.test_id)
test_metrics.time_taken = self.test.time_end - self.test.time_start
test_metrics.time_start = datetime.fromtimestamp(self.test.time_start).strftime(DATE_TIME_TEMPLATE)
test_metrics.time_end = datetime.fromtimestamp(self.test.time_end).strftime(DATE_TIME_TEMPLATE)
test_metrics.success = self.test.success
with open(self.cgroup_path / 'memory.peak', 'r') as file:
test_metrics.memory_peak = file.read()
if (self.cgroup_path / 'cpu.stat').exists():
with open(self.cgroup_path / 'cpu.stat', 'r', ) as file:
self._parse_cpu_stat(file, test_metrics)
return test_metrics
def write_metrics_to_db(self, metrics: Metric, success: bool = False) -> None:
metrics.success = success
self.sqlite_writer.write_row(metrics, METRICS_TABLE)
def put_process_to_cgroup(self):
super().put_process_to_cgroup()
pid = os.getpid()
with open(self.cgroup_path / 'cgroup.procs', "a") as cgroup:
cgroup.write(str(pid))
def remove_cgroup(self):
os.rmdir(self.cgroup_path)
def cgroup_monitor(self, test_event: Event) -> Task:
return asyncio.create_task(self._monitor_cgroup(test_event))
async def _monitor_cgroup(self, test_event: Event) -> None:
"""Continuously monitors CPU and memory utilization."""
try:
while not test_event.is_set():
with open(self.cgroup_path / 'memory.current', 'r') as memory_current:
timeline_record = CgroupMetric(
test_id=self.test_id,
memory=int(memory_current.read()),
timestamp=datetime.now()
)
self.sqlite_writer.write_row(timeline_record, CGROUP_MEMORY_METRICS_TABLE)
# Control the frequency of updates, for example, every 2 seconds
await asyncio.sleep(1)
except asyncio.CancelledError:
self.logger.info(f'cgroup monitoring job was cancelled')
@staticmethod
def _parse_cpu_stat(file: TextIO, metrics: Metric) -> None:
# Map the values from cpu.state to the model. Keys are values from cpu.stat
stats = {'user_usec': 'user_sec', 'system_usec': 'system_sec', 'usage_usec': 'usage_sec'}
for line in file.readlines():
stat, value = line.split(' ')
if stat in stats.keys():
setattr(metrics, stats[stat], float(value) / 1_000_000)
def get_resource_gather(is_switched_on: bool, test, tmpdir: str) -> ResourceGather:
if is_switched_on:
return ResourceGatherOn(test, tmpdir)
else:
return ResourceGatherOff(test, tmpdir)
def _is_cgroup_rw(path: Path) -> bool:
with open('/proc/mounts', 'r') as f:
for line in f.readlines():
if line.startswith('cgroup2'):
options = line.split(' ')[3].split(',')
if 'rw' in options:
return True
else:
return False
def setup_cgroup(is_required: bool) -> None:
if is_required:
# check where the process is executed in podman or in docker
is_podman = os.access("/run/.containerenv", os.F_OK)
is_docker = os.access("/.dockerenv", os.F_OK)
is_cgroup_ro = _is_cgroup_rw('/sys/fs/cgroup')
if is_podman or is_docker:
subprocess.run(['sudo', 'chown', '-R', f"{getpass.getuser()}:{getpass.getuser()}", '/sys/fs/cgroup'],
check=True)
if is_cgroup_ro and is_docker:
subprocess.run(
[
"sudo",
"mount",
"-o",
"remount,rw",
"/sys/fs/cgroup",
],
check=True,
)
for directory in [CGROUP_INITIAL, CGROUP_TESTS]:
if directory.exists():
os.rmdir(directory)
directory.mkdir()
with open(CGROUP_INITIAL.parent / 'cgroup.procs') as f:
processes = [x.strip() for x in f.readlines()]
for process in processes:
with open(CGROUP_INITIAL / 'cgroup.procs', "w") as f:
f.write(str(process))
with open(CGROUP_INITIAL.parent / 'cgroup.controllers', "r") as f:
controllers = f.readline()
controllers = " ".join(map(lambda x: f"+{x}", controllers.split(" ")))
with open(CGROUP_INITIAL.parent / 'cgroup.subtree_control', "w") as f:
f.write(controllers)
with open(CGROUP_TESTS / 'cgroup.subtree_control', "w") as f:
f.write(controllers)
async def monitor_resources(cancel_event: asyncio.Event, stop_event: asyncio.Event, tmpdir: Path) -> None:
"""Continuously monitors CPU and memory utilization."""
sqlite_writer = SQLiteWriter(tmpdir / DEFAULT_DB_NAME)
while not cancel_event.is_set() and not stop_event.is_set():
timeline_record = SystemResourceMetric(
cpu=psutil.cpu_percent(interval=0.1),
memory=psutil.virtual_memory().percent,
timestamp=datetime.now()
)
sqlite_writer.write_row(timeline_record, SYSTEM_RESOURCE_METRICS_TABLE)
# Control the frequency of updates, for example, every 2 seconds
await asyncio.sleep(2)
async def no_monitor():
pass
def run_resource_watcher(is_required, cancel_event, stop_event, tmpdir: str) -> Task:
if is_required:
return asyncio.create_task(monitor_resources(cancel_event, stop_event, Path(tmpdir)))
return asyncio.create_task(no_monitor())

View File

@@ -201,6 +201,7 @@ docker_common_args+=(
--security-opt label=disable \
--network host \
--cap-add SYS_PTRACE \
--privileged \
--ulimit nofile=$(ulimit -Sn):$hard_limit \
-v "$PWD:$PWD" \
-v "$tmpdir:/tmp" \