diff --git a/test/pylib/repair.py b/test/pylib/repair.py index 73f8d15b1a..1c5c2cc109 100644 --- a/test/pylib/repair.py +++ b/test/pylib/repair.py @@ -28,6 +28,18 @@ async def load_tablet_repair_time(cql, hosts, table_id): return repair_time_map +async def load_tablet_repair_task_infos(cql, host, table_id): + repair_task_infos = {} + + rows = await cql.run_async(f"SELECT last_token, repair_task_info from system.tablets where table_id = {table_id}", host=host) + + for row in rows: + if row.repair_task_info is not None: + key = str(row.last_token) + repair_task_infos[key] = row.repair_task_info + + return repair_task_infos + async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256, disable_flush_cache_time = False): if fast_stats_refresh: config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']} diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py index 1bb8162696..838d663b89 100644 --- a/test/pylib/rest_client.py +++ b/test/pylib/rest_client.py @@ -273,13 +273,18 @@ class ScyllaRESTAPIClient(): "token": str(token) }) - async def tablet_repair(self, node_ip: str, ks: str, table: str, token : int, timeout: Optional[float] = None, await_completion: bool = True) -> None: - res = await self.client.post_json(f"/storage_service/tablets/repair", host=node_ip, timeout=timeout, params={ + async def tablet_repair(self, node_ip: str, ks: str, table: str, token : int, hosts_filter: Optional[str] = None, dcs_filter: Optional[str] = None, timeout: Optional[float] = None, await_completion: bool = True) -> None: + params={ "ks": ks, "table": table, "tokens": str(token), "await_completion": str(await_completion).lower() - }) + } + if hosts_filter: + params["hosts_filter"] = hosts_filter + if dcs_filter: + params["dcs_filter"] = dcs_filter + res = await self.client.post_json(f"/storage_service/tablets/repair", host=node_ip, timeout=timeout, params=params) return res async def enable_tablet_balancing(self, node_ip: str) -> None: diff --git a/test/topology_custom/test_tablet_repair_scheduler.py b/test/topology_custom/test_tablet_repair_scheduler.py index 1dad3cd1c1..56ced30e8d 100644 --- a/test/topology_custom/test_tablet_repair_scheduler.py +++ b/test/topology_custom/test_tablet_repair_scheduler.py @@ -5,13 +5,16 @@ # from test.pylib.manager_client import ManagerClient +from test.pylib.util import wait_for_cql_and_get_hosts from test.topology.conftest import skip_mode -from test.pylib.repair import load_tablet_repair_time, create_table_insert_data_for_repair, get_tablet_task_id +from test.pylib.repair import load_tablet_repair_time, create_table_insert_data_for_repair, get_tablet_task_id, load_tablet_repair_task_infos from test.pylib.rest_client import inject_error_one_shot, read_barrier import pytest import asyncio import logging +import re +import requests import time import datetime @@ -202,3 +205,121 @@ async def test_tablet_repair_error_delete(manager: ManagerClient): await asyncio.gather(repair_task(), del_repair_task()); await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers) + +def get_repair_row_from_disk(server): + row_num = 0 + metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text + pattern = re.compile("^scylla_repair_row_from_disk_nr") + for metric in metrics.split('\n'): + if pattern.match(metric) is not None: + row_num += int(metric.split()[1]) + + return row_num + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_repair_hosts_filter(manager: ManagerClient): + servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager) + hosts_filter = f"{hosts[0].host_id},{hosts[1].host_id}" + + row_num_before = [get_repair_row_from_disk(server) for server in servers] + + token = -1 + async def repair_task(): + await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers) + await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, hosts_filter=hosts_filter) + + async def check_filter(): + tablet_task_id = None + while tablet_task_id == None: + tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token) + + res = await load_tablet_repair_task_infos(cql, hosts[0], table_id) + assert len(res) == 1 + assert res[str(token)].repair_hosts_filter.split(",").sort() == hosts_filter.split(",").sort() + + await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers) + + await asyncio.gather(repair_task(), check_filter()) + + row_num_after = [get_repair_row_from_disk(server) for server in servers] + assert row_num_before[0] < row_num_after[0] + assert row_num_before[1] < row_num_after[1] + assert row_num_before[2] == row_num_after[2] + +async def prepare_multi_dc_repair(manager): + servers = [await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}), + await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}), + await manager.server_add(property_file = {'dc': 'DC2', 'rack' : 'R2'})] + cql = manager.get_cql() + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', " + "'DC1': 2, 'DC2': 1} AND tablets = {'initial': 8};") + await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {'mode':'repair'};") + keys = range(256) + await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys]) + table_id = await manager.get_table_id("test", "test") + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + return (servers, cql, hosts, table_id) + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_repair_dcs_filter(manager: ManagerClient): + servers, cql, hosts, table_id = await prepare_multi_dc_repair(manager) + dcs_filter = "DC1" + + row_num_before = [get_repair_row_from_disk(server) for server in servers] + + token = -1 + async def repair_task(): + await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers) + await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, dcs_filter=dcs_filter) + + async def check_filter(): + tablet_task_id = None + while tablet_task_id == None: + tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token) + + res = await load_tablet_repair_task_infos(cql, hosts[0], table_id) + assert len(res) == 1 + assert res[str(token)].repair_dcs_filter.split(",").sort() == dcs_filter.split(",").sort() + + await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers) + + await asyncio.gather(repair_task(), check_filter()) + + row_num_after = [get_repair_row_from_disk(server) for server in servers] + assert row_num_before[0] < row_num_after[0] + assert row_num_before[1] < row_num_after[1] + assert row_num_before[2] == row_num_after[2] + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_tablet_repair_hosts_and_dcs_filter(manager: ManagerClient): + servers, cql, hosts, table_id = await prepare_multi_dc_repair(manager) + dcs_filter = "DC1,DC2" + hosts_filter = f"{hosts[0].host_id},{hosts[2].host_id}" + + row_num_before = [get_repair_row_from_disk(server) for server in servers] + + token = -1 + async def repair_task(): + await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers) + await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, hosts_filter=hosts_filter, dcs_filter=dcs_filter) + + async def check_filter(): + tablet_task_id = None + while tablet_task_id == None: + tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token) + + res = await load_tablet_repair_task_infos(cql, hosts[0], table_id) + assert len(res) == 1 + assert res[str(token)].repair_dcs_filter.split(",").sort() == dcs_filter.split(",").sort() + + await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers) + + await asyncio.gather(repair_task(), check_filter()) + + row_num_after = [get_repair_row_from_disk(server) for server in servers] + assert row_num_before[0] < row_num_after[0] + assert row_num_before[1] == row_num_after[1] + assert row_num_before[2] < row_num_after[2]