test_tablet_repair_scheduler: prepare_multi_dc_repair: use create_new_test_keyspace

and return the keyspace unique name to the caller.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-02-05 09:07:10 +02:00
parent cbe79b20f7
commit cc281ff88d

View File

@@ -4,11 +4,15 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.pylib.util import wait_for_cql_and_get_hosts, Host
from test.topology.conftest import skip_mode
from test.pylib.repair import load_tablet_repair_time, create_table_insert_data_for_repair, get_tablet_task_id, load_tablet_repair_task_infos
from test.pylib.rest_client import inject_error_one_shot, read_barrier
from test.topology.util import create_new_test_keyspace
from cassandra.cluster import Session as CassandraSession
import pytest
import asyncio
@@ -247,24 +251,24 @@ async def test_tablet_repair_hosts_filter(manager: ManagerClient):
assert row_num_before[1] < row_num_after[1]
assert row_num_before[2] == row_num_after[2]
async def prepare_multi_dc_repair(manager):
async def prepare_multi_dc_repair(manager) -> tuple[list[ServerInfo], CassandraSession, list[Host], str, str]:
servers = [await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}),
await manager.server_add(property_file = {'dc': 'DC1', 'rack' : 'R1'}),
await manager.server_add(property_file = {'dc': 'DC2', 'rack' : 'R2'})]
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', "
"'DC1': 2, 'DC2': 1} AND tablets = {'initial': 8};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {'mode':'repair'};")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tombstone_gc = {{'mode':'repair'}};")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
table_id = await manager.get_table_id("test", "test")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
table_id = await manager.get_table_id(ks, "test")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
return (servers, cql, hosts, table_id)
return (servers, cql, hosts, ks, table_id)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_dcs_filter(manager: ManagerClient):
servers, cql, hosts, table_id = await prepare_multi_dc_repair(manager)
servers, cql, hosts, ks, table_id = await prepare_multi_dc_repair(manager)
dcs_filter = "DC1"
row_num_before = [get_repair_row_from_disk(server) for server in servers]
@@ -272,7 +276,7 @@ async def test_tablet_repair_dcs_filter(manager: ManagerClient):
token = -1
async def repair_task():
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, dcs_filter=dcs_filter)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, dcs_filter=dcs_filter)
async def check_filter():
tablet_task_id = None
@@ -295,7 +299,7 @@ async def test_tablet_repair_dcs_filter(manager: ManagerClient):
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_repair_hosts_and_dcs_filter(manager: ManagerClient):
servers, cql, hosts, table_id = await prepare_multi_dc_repair(manager)
servers, cql, hosts, ks, table_id = await prepare_multi_dc_repair(manager)
dcs_filter = "DC1,DC2"
hosts_filter = f"{hosts[0].host_id},{hosts[2].host_id}"
@@ -304,7 +308,7 @@ async def test_tablet_repair_hosts_and_dcs_filter(manager: ManagerClient):
token = -1
async def repair_task():
await inject_error_on(manager, "repair_tablet_fail_on_rpc_call", servers)
await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token, hosts_filter=hosts_filter, dcs_filter=dcs_filter)
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, hosts_filter=hosts_filter, dcs_filter=dcs_filter)
async def check_filter():
tablet_task_id = None