test: add test to check dcs and hosts repair filter
This commit is contained in:
@@ -28,6 +28,18 @@ async def load_tablet_repair_time(cql, hosts, table_id):
|
||||
|
||||
return repair_time_map
|
||||
|
||||
async def load_tablet_repair_task_infos(cql, host, table_id):
    """Collect the repair task info stored for the tablets of one table.

    Runs a query against ``system.tablets`` on ``host`` for the rows of
    ``table_id`` and returns a dict keyed by ``str(last_token)`` whose values
    are the corresponding ``repair_task_info``. Rows whose
    ``repair_task_info`` is ``None`` are left out of the result.
    """
    query = f"SELECT last_token, repair_task_info from system.tablets where table_id = {table_id}"
    tablet_rows = await cql.run_async(query, host=host)
    return {
        str(tablet.last_token): tablet.repair_task_info
        for tablet in tablet_rows
        if tablet.repair_task_info is not None
    }
|
||||
|
||||
async def create_table_insert_data_for_repair(manager, rf = 3 , tablets = 8, fast_stats_refresh = True, nr_keys = 256, disable_flush_cache_time = False):
|
||||
if fast_stats_refresh:
|
||||
config = {'error_injections_at_startup': ['short_tablet_stats_refresh_interval']}
|
||||
|
||||
@@ -273,13 +273,18 @@ class ScyllaRESTAPIClient():
|
||||
"token": str(token)
|
||||
})
|
||||
|
||||
async def tablet_repair(self, node_ip: str, ks: str, table: str, token : int, hosts_filter: Optional[str] = None, dcs_filter: Optional[str] = None, timeout: Optional[float] = None, await_completion: bool = True) -> None:
    """Request a tablet repair through a node's REST API.

    Sends a POST to ``/storage_service/tablets/repair`` on ``node_ip``.

    Parameters:
        node_ip: address of the node that receives the request.
        ks: keyspace name, sent as ``ks``.
        table: table name, sent as ``table``.
        token: token to repair, sent stringified as ``tokens``.
        hosts_filter: optional value sent as ``hosts_filter``; omitted from
            the request when falsy (``None`` or empty string).
        dcs_filter: optional value sent as ``dcs_filter``; omitted from the
            request when falsy (``None`` or empty string).
        timeout: forwarded unchanged to ``self.client.post_json``.
        await_completion: sent as the lowercase string ``"true"``/``"false"``.

    Returns:
        Whatever ``self.client.post_json`` returns.
    """
    # NOTE(review): the signature is annotated ``-> None`` although the
    # response of post_json is returned; the annotation is left untouched
    # here so the declared interface stays as callers see it.
    params = {
        "ks": ks,
        "table": table,
        "tokens": str(token),
        "await_completion": str(await_completion).lower()
    }
    # Only add the filters that were actually provided.
    if hosts_filter:
        params["hosts_filter"] = hosts_filter
    if dcs_filter:
        params["dcs_filter"] = dcs_filter
    res = await self.client.post_json("/storage_service/tablets/repair", host=node_ip, timeout=timeout, params=params)
    return res
|
||||
|
||||
async def enable_tablet_balancing(self, node_ip: str) -> None:
|
||||
|
||||
@@ -5,13 +5,16 @@
|
||||
#
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.pylib.repair import load_tablet_repair_time, create_table_insert_data_for_repair, get_tablet_task_id
|
||||
from test.pylib.repair import load_tablet_repair_time, create_table_insert_data_for_repair, get_tablet_task_id, load_tablet_repair_task_infos
|
||||
from test.pylib.rest_client import inject_error_one_shot, read_barrier
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
import requests
|
||||
import time
|
||||
import datetime
|
||||
|
||||
@@ -202,3 +205,121 @@ async def test_tablet_repair_error_delete(manager: ManagerClient):
|
||||
|
||||
await asyncio.gather(repair_task(), del_repair_task());
|
||||
await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers)
|
||||
|
||||
def get_repair_row_from_disk(server):
    """Return the total of ``scylla_repair_row_from_disk_nr`` samples on a server.

    Downloads the text served at ``http://<server.ip_addr>:9180/metrics`` and,
    for every line that begins with the metric name, adds the integer found
    in the second whitespace-separated field. Returns 0 when no line matches.
    """
    metric_prefix = re.compile("^scylla_repair_row_from_disk_nr")
    exposition = requests.get(f"http://{server.ip_addr}:9180/metrics").text
    return sum(
        int(line.split()[1])
        for line in exposition.split('\n')
        if metric_prefix.match(line) is not None
    )
|
||||
|
||||
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_hosts_filter(manager: ManagerClient):
    """Check that a tablet repair limited by ``hosts_filter`` only touches those hosts.

    Starts a repair of token -1 restricted to hosts[0] and hosts[1], verifies
    that the filter recorded in the tablet's repair task info matches the one
    requested, and then compares the per-server ``repair_row_from_disk``
    counters taken before and after: servers 0 and 1 must have grown, server 2
    must be unchanged.
    """
    servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager)
    hosts_filter = f"{hosts[0].host_id},{hosts[1].host_id}"

    row_num_before = [get_repair_row_from_disk(server) for server in servers]

    token = -1

    async def repair_task():
        # The injection is switched on before the repair request and switched
        # off by check_filter() once the task info has been inspected.
        # NOTE(review): presumably it keeps the repair from finishing so the
        # task info stays visible in system.tablets - confirm.
        await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
        await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, hosts_filter=hosts_filter)

    async def check_filter():
        # Poll until the repair task for this token shows up.
        tablet_task_id = None
        while tablet_task_id is None:
            tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token)

        res = await load_tablet_repair_task_infos(cql, hosts[0], table_id)
        assert len(res) == 1
        # sorted() returns the ordered list; list.sort() returns None, which
        # would make this comparison trivially true.
        assert sorted(res[str(token)].repair_hosts_filter.split(",")) == sorted(hosts_filter.split(","))

        await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers)

    await asyncio.gather(repair_task(), check_filter())

    row_num_after = [get_repair_row_from_disk(server) for server in servers]
    assert row_num_before[0] < row_num_after[0]
    assert row_num_before[1] < row_num_after[1]
    assert row_num_before[2] == row_num_after[2]
|
||||
|
||||
async def prepare_multi_dc_repair(manager):
    """Set up a three-node, two-DC cluster with a populated ``test.test`` table.

    Adds two servers in DC1/R1 and one in DC2/R2 (one after another, in that
    order), creates keyspace ``test`` with NetworkTopologyStrategy
    (DC1: 2, DC2: 1) and 8 initial tablets, creates table ``test.test`` with
    ``tombstone_gc`` mode ``repair`` and inserts keys 0..255.

    Returns the tuple ``(servers, cql, hosts, table_id)``.
    """
    placements = [
        {'dc': 'DC1', 'rack' : 'R1'},
        {'dc': 'DC1', 'rack' : 'R1'},
        {'dc': 'DC2', 'rack' : 'R2'},
    ]
    # Awaited one by one, so servers are added sequentially in list order.
    servers = [await manager.server_add(property_file=placement) for placement in placements]

    cql = manager.get_cql()
    create_keyspace = ("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
                       "'DC1': 2, 'DC2': 1} AND tablets = {'initial': 8};")
    await cql.run_async(create_keyspace)
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {'mode':'repair'};")

    # Issue all inserts concurrently and wait for every one of them.
    inserts = [cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(256)]
    await asyncio.gather(*inserts)

    table_id = await manager.get_table_id("test", "test")
    deadline = time.time() + 60
    hosts = await wait_for_cql_and_get_hosts(cql, servers, deadline)
    return (servers, cql, hosts, table_id)
|
||||
|
||||
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_dcs_filter(manager: ManagerClient):
    """Check that a tablet repair limited by ``dcs_filter`` only touches that DC.

    Uses the cluster from prepare_multi_dc_repair(), starts a repair of
    token -1 restricted to ``DC1``, verifies that the filter recorded in the
    tablet's repair task info matches the one requested, and then compares the
    per-server ``repair_row_from_disk`` counters taken before and after:
    servers 0 and 1 must have grown, server 2 must be unchanged.
    """
    servers, cql, hosts, table_id = await prepare_multi_dc_repair(manager)
    dcs_filter = "DC1"

    row_num_before = [get_repair_row_from_disk(server) for server in servers]

    token = -1

    async def repair_task():
        # The injection is switched on before the repair request and switched
        # off by check_filter() once the task info has been inspected.
        # NOTE(review): presumably it keeps the repair from finishing so the
        # task info stays visible in system.tablets - confirm.
        await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
        await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, dcs_filter=dcs_filter)

    async def check_filter():
        # Poll until the repair task for this token shows up.
        tablet_task_id = None
        while tablet_task_id is None:
            tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token)

        res = await load_tablet_repair_task_infos(cql, hosts[0], table_id)
        assert len(res) == 1
        # sorted() returns the ordered list; list.sort() returns None, which
        # would make this comparison trivially true.
        assert sorted(res[str(token)].repair_dcs_filter.split(",")) == sorted(dcs_filter.split(","))

        await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers)

    await asyncio.gather(repair_task(), check_filter())

    row_num_after = [get_repair_row_from_disk(server) for server in servers]
    assert row_num_before[0] < row_num_after[0]
    assert row_num_before[1] < row_num_after[1]
    assert row_num_before[2] == row_num_after[2]
|
||||
|
||||
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_hosts_and_dcs_filter(manager: ManagerClient):
    """Check a tablet repair that combines ``hosts_filter`` and ``dcs_filter``.

    Uses the cluster from prepare_multi_dc_repair(), starts a repair of
    token -1 with ``dcs_filter`` "DC1,DC2" and a ``hosts_filter`` naming
    hosts[0] and hosts[2], verifies that both filters recorded in the tablet's
    repair task info match the ones requested, and then compares the
    per-server ``repair_row_from_disk`` counters taken before and after:
    servers 0 and 2 must have grown, server 1 must be unchanged.
    """
    servers, cql, hosts, table_id = await prepare_multi_dc_repair(manager)
    dcs_filter = "DC1,DC2"
    hosts_filter = f"{hosts[0].host_id},{hosts[2].host_id}"

    row_num_before = [get_repair_row_from_disk(server) for server in servers]

    token = -1

    async def repair_task():
        # The injection is switched on before the repair request and switched
        # off by check_filter() once the task info has been inspected.
        # NOTE(review): presumably it keeps the repair from finishing so the
        # task info stays visible in system.tablets - confirm.
        await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
        await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, hosts_filter=hosts_filter, dcs_filter=dcs_filter)

    async def check_filter():
        # Poll until the repair task for this token shows up.
        tablet_task_id = None
        while tablet_task_id is None:
            tablet_task_id = await get_tablet_task_id(cql, hosts[0], table_id, token)

        res = await load_tablet_repair_task_infos(cql, hosts[0], table_id)
        assert len(res) == 1
        # sorted() returns the ordered list; list.sort() returns None, which
        # would make these comparisons trivially true.
        assert sorted(res[str(token)].repair_dcs_filter.split(",")) == sorted(dcs_filter.split(","))
        # Both filters were sent with the request, so both are verified.
        assert sorted(res[str(token)].repair_hosts_filter.split(",")) == sorted(hosts_filter.split(","))

        await inject_error_off(manager, "repair_tablet_fail_on_rpc_call", servers)

    await asyncio.gather(repair_task(), check_filter())

    row_num_after = [get_repair_row_from_disk(server) for server in servers]
    assert row_num_before[0] < row_num_after[0]
    assert row_num_before[1] == row_num_after[1]
    assert row_num_before[2] < row_num_after[2]
|
||||
|
||||
Reference in New Issue
Block a user