topology_tasks/test_tablet_tasks: use new_test_keyspace

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-01-19 08:52:50 +02:00
parent 12f85ce57c
commit 9829b1594f

View File

@@ -14,6 +14,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.repair import create_table_insert_data_for_repair, get_tablet_task_id
from test.pylib.tablets import get_all_tablet_replicas
from test.topology.conftest import skip_mode
from test.topology.util import create_new_test_keyspace, new_test_keyspace
from test.topology_custom.test_tablets2 import inject_error_on
from test.topology_tasks.task_manager_client import TaskManagerClient
from test.topology_tasks.task_manager_types import TaskStatus, TaskStats
@@ -30,9 +31,9 @@ async def message_injection(manager: ManagerClient, servers: list[ServerInfo], i
for server in servers:
await manager.api.message_injection(server.ip_addr, injection)
async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str, table: Optional[str] = None):
async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str, keyspace: str, table: Optional[str] = None):
async def get_tasks():
tasks = [task for task in await tm.list_tasks(server.ip_addr, module_name) if task.kind == "cluster" and task.type == type and task.keyspace == "test"]
tasks = [task for task in await tm.list_tasks(server.ip_addr, module_name) if task.kind == "cluster" and task.type == type and task.keyspace == keyspace]
return [task for task in tasks if not table or table == task.table]
tasks = await get_tasks()
@@ -40,7 +41,7 @@ async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_n
tasks = await get_tasks()
return tasks
def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool, keyspace: str = "test", table: str = "test", possible_child_num: list[int] = [0]):
def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool, keyspace: str, table: str = "test", possible_child_num: list[int] = [0]):
assert status.scope == scope
assert status.kind == "cluster"
assert status.type == type
@@ -50,9 +51,9 @@ def check_task_status(status: TaskStatus, states: list[str], type: str, scope: s
assert len(status.children_ids) in possible_child_num
assert status.state in states
async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerClient, servers: list[ServerInfo], module_name: str):
async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str = "test"):
# Wait until user repair task is created.
repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair")
repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=keyspace)
task = repair_tasks[0]
assert task.scope == "table"
@@ -62,14 +63,14 @@ async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerCli
status = await tm.get_task_status(servers[0].ip_addr, task.task_id)
check_task_status(status, ["created", "running"], "user_repair", "table", True)
check_task_status(status, ["created", "running"], "user_repair", "table", True, keyspace)
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
async def wait_for_task():
status_wait = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
check_task_status(status_wait, ["done"], "user_repair", "table", True)
check_task_status(status_wait, ["done"], "user_repair", "table", True, keyspace)
async def abort_task():
await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
@@ -83,6 +84,7 @@ async def test_tablet_repair_task(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
# FIXME: use unique_name for keyspace
servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager)
assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
@@ -94,16 +96,16 @@ async def test_tablet_repair_task(manager: ManagerClient):
await asyncio.gather(repair_task(), check_and_abort_repair_task(manager, tm, servers, module_name))
async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str):
async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str = "test"):
def get_task_with_id(repair_tasks, task_id):
tasks_with_id1 = [task for task in repair_tasks if task.task_id == task_id]
assert len(tasks_with_id1) == 1
return tasks_with_id1[0]
# Wait until user repair tasks are created.
repair_tasks0 = await wait_tasks_created(tm, servers[0], module_name, len(servers), "user_repair")
repair_tasks1 = await wait_tasks_created(tm, servers[1], module_name, len(servers), "user_repair")
repair_tasks2 = await wait_tasks_created(tm, servers[2], module_name, len(servers), "user_repair")
repair_tasks0 = await wait_tasks_created(tm, servers[0], module_name, len(servers), "user_repair", keyspace=keyspace)
repair_tasks1 = await wait_tasks_created(tm, servers[1], module_name, len(servers), "user_repair", keyspace=keyspace)
repair_tasks2 = await wait_tasks_created(tm, servers[2], module_name, len(servers), "user_repair", keyspace=keyspace)
assert len(repair_tasks0) == len(repair_tasks1), f"Different number of repair virtual tasks on nodes {servers[0].server_id} and {servers[1].server_id}"
assert len(repair_tasks0) == len(repair_tasks2), f"Different number of repair virtual tasks on nodes {servers[0].server_id} and {servers[2].server_id}"
@@ -176,7 +178,7 @@ async def test_tablet_repair_task_children(manager: ManagerClient):
async def check_children():
# Wait until user repair task is created.
repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair")
repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", "test")
status = await tm.wait_for_task(servers[0].ip_addr, repair_tasks[0].task_id)
assert len(status.children_ids) == 1
@@ -198,38 +200,38 @@ async def prepare_migration_test(manager: ManagerClient):
await make_server()
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await make_server()
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({1}, {1});")
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({1}, {1});")
return (servers, host_ids)
return (ks, servers, host_ids)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_migration_task(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
servers, host_ids = await prepare_migration_test(manager)
ks, servers, host_ids = await prepare_migration_test(manager)
injection = "handle_tablet_migration_end_migration"
async def move_tablet(old_replica, new_replica):
await manager.api.enable_injection(servers[0].ip_addr, injection, False)
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
async def check(type):
# Wait until migration task is created.
migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type)
migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks)
assert len(migration_tasks) == 1
status = await tm.get_task_status(servers[0].ip_addr, migration_tasks[0].task_id)
check_task_status(status, ["created", "running"], type, "tablet", False)
check_task_status(status, ["created", "running"], type, "tablet", False, keyspace=ks)
await manager.api.disable_injection(servers[0].ip_addr, injection)
replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(replicas) == 1 and len(replicas[0].replicas) == 1
intranode_migration_src = replicas[0].replicas[0]
@@ -246,16 +248,16 @@ async def test_tablet_migration_task(manager: ManagerClient):
async def test_tablet_migration_task_list(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
servers, host_ids = await prepare_migration_test(manager)
ks, servers, host_ids = await prepare_migration_test(manager)
injection = "handle_tablet_migration_end_migration"
async def move_tablet(server, old_replica, new_replica):
await manager.api.move_tablet(server.ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
await manager.api.move_tablet(server.ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
async def check_migration_task_list(type: str):
# Wait until migration tasks are created.
migration_tasks0 = await wait_tasks_created(tm, servers[0], module_name, 1, type)
migration_tasks1 = await wait_tasks_created(tm, servers[1], module_name, 1, type)
migration_tasks0 = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks)
migration_tasks1 = await wait_tasks_created(tm, servers[1], module_name, 1, type, keyspace=ks)
assert len(migration_tasks0) == len(migration_tasks1), f"Different number of migration virtual tasks on nodes {servers[0].server_id} and {servers[1].server_id}"
assert len(migration_tasks0) == 1, f"Wrong number of migration virtual tasks"
@@ -270,11 +272,11 @@ async def test_tablet_migration_task_list(manager: ManagerClient):
assert task.kind == "cluster"
assert task.scope == "tablet"
assert task.table == "test"
assert task.keyspace == "test"
assert task.keyspace == ks
await disable_injection(manager, servers, injection)
replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(replicas) == 1 and len(replicas[0].replicas) == 1
intranode_migration_src = replicas[0].replicas[0]
@@ -293,17 +295,17 @@ async def test_tablet_migration_task_list(manager: ManagerClient):
async def test_tablet_migration_task_failed(manager: ManagerClient):
module_name = "tablets"
tm = TaskManagerClient(manager.api)
servers, host_ids = await prepare_migration_test(manager)
ks, servers, host_ids = await prepare_migration_test(manager)
wait_injection = "stream_tablet_wait"
throw_injection = "stream_tablet_move_to_cleanup"
async def move_tablet(old_replica, new_replica):
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0)
async def wait_for_task(task_id, type):
status = await tm.wait_for_task(servers[0].ip_addr, task_id)
check_task_status(status, ["failed"], type, "tablet", False)
check_task_status(status, ["failed"], type, "tablet", False, keyspace=ks)
async def resume_migration(log, mark):
await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
@@ -311,7 +313,7 @@ async def test_tablet_migration_task_failed(manager: ManagerClient):
async def check(type, log, mark):
# Wait until migration task is created.
migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type)
migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks)
assert len(migration_tasks) == 1
await asyncio.gather(wait_for_task(migration_tasks[0].task_id, type), resume_migration(log, mark))
@@ -322,7 +324,7 @@ async def test_tablet_migration_task_failed(manager: ManagerClient):
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
assert len(replicas) == 1 and len(replicas[0].replicas) == 1
src = replicas[0].replicas[0]
@@ -336,6 +338,7 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC
tm = TaskManagerClient(manager.api)
token = -1
# FIXME: use unique_name for keyspace
servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager)
assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered"
@@ -350,7 +353,7 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC
await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token)
async def wait_and_check_none():
task = (await wait_tasks_created(tm, servers[0], module_name, 1,"user_repair"))[0]
task = (await wait_tasks_created(tm, servers[0], module_name, 1,"user_repair", keyspace="test"))[0]
await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call")
status = await tm.wait_for_task(servers[0].ip_addr, task.task_id)
await check_none()
@@ -399,34 +402,33 @@ async def test_tablet_resize_task(manager: ManagerClient):
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
keyspace = "test"
table1 = "test1"
table2 = "test2"
await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};")
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
await cql.run_async(f"CREATE TABLE {keyspace}.{table2} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace:
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
await cql.run_async(f"CREATE TABLE {keyspace}.{table2} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
await enable_tablet_balancing_and_wait(manager, servers[0], "Detected tablet split for table")
await wait_tasks_created(tm, servers[0], module_name, 0, "split", table1)
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
await enable_tablet_balancing_and_wait(manager, servers[0], "Detected tablet split for table")
await wait_tasks_created(tm, servers[0], module_name, 0, "split", keyspace, table1)
await prepare_split(manager, servers[0], keyspace, table2, keys)
await prepare_merge(manager, servers[0], keyspace, table1, keys[:-1])
await manager.api.keyspace_compaction(servers[0].ip_addr, "test")
await prepare_split(manager, servers[0], keyspace, table2, keys)
await prepare_merge(manager, servers[0], keyspace, table1, keys[:-1])
await manager.api.keyspace_compaction(servers[0].ip_addr, keyspace)
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
async def wait_and_check_status(server, type, keyspace, table):
task = (await wait_tasks_created(tm, server, module_name, 1, type, table))[0]
status = await tm.get_task_status(server.ip_addr, task.task_id)
check_task_status(status, ["running"], type, "table", False, keyspace, table, [0, 1, 2])
async def wait_and_check_status(server, type, keyspace, table):
task = (await wait_tasks_created(tm, server, module_name, 1, type, keyspace, table))[0]
status = await tm.get_task_status(server.ip_addr, task.task_id)
check_task_status(status, ["running"], type, "table", False, keyspace, table, [0, 1, 2])
await wait_and_check_status(servers[0], "split", keyspace, table2)
await wait_and_check_status(servers[0], "merge", keyspace, table1)
await wait_and_check_status(servers[0], "split", keyspace, table2)
await wait_and_check_status(servers[0], "merge", keyspace, table1)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
@@ -443,50 +445,49 @@ async def test_tablet_resize_list(manager: ManagerClient):
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
keyspace = "test"
table1 = "test1"
await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};")
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace:
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
servers.append(await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}))
servers.append(await manager.server_add(cmdline=cmdline, config={
'error_injections_at_startup': ['short_tablet_stats_refresh_interval']
}))
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
s1_log = await manager.server_open_log(servers[0].server_id)
s1_mark = await s1_log.mark()
injection = "tablet_split_finalization_postpone"
compaction_injection = "split_sstable_rewrite"
await enable_injection(manager, servers, injection)
await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True)
injection = "tablet_split_finalization_postpone"
compaction_injection = "split_sstable_rewrite"
await enable_injection(manager, servers, injection)
await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", table1))[0]
task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", table1))[0]
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", keyspace, table1))[0]
assert task0.task_id == task1.task_id
assert task0.task_id == task1.task_id
for task in [task0, task1]:
assert task.state == "running"
assert task.type == "split"
assert task.kind == "cluster"
assert task.scope == "table"
assert task.table == table1
assert task.keyspace == keyspace
for task in [task0, task1]:
assert task.state == "running"
assert task.type == "split"
assert task.kind == "cluster"
assert task.scope == "table"
assert task.table == table1
assert task.keyspace == keyspace
await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark)
await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite")
status1 = await tm.get_task_status(servers[1].ip_addr, task0.task_id)
status0 = await tm.get_task_status(servers[0].ip_addr, task0.task_id)
assert len(status0.children_ids) == 2
assert status0.children_ids == status1.children_ids
status1 = await tm.get_task_status(servers[1].ip_addr, task0.task_id)
status0 = await tm.get_task_status(servers[0].ip_addr, task0.task_id)
assert len(status0.children_ids) == 2
assert status0.children_ids == status1.children_ids
await disable_injection(manager, servers, injection)
await disable_injection(manager, servers, injection)
@pytest.mark.asyncio
@@ -505,35 +506,34 @@ async def test_tablet_resize_revoked(manager: ManagerClient):
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
keyspace = "test"
table1 = "test1"
await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};")
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace:
await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;")
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
total_keys = 60
keys = range(total_keys)
await prepare_split(manager, servers[0], keyspace, table1, keys)
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
injection = "tablet_split_finalization_postpone"
await enable_injection(manager, servers, injection)
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", table1))[0]
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0]
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
async def revoke_resize(log, mark):
await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table1} WHERE pk={k};") for k in keys])
async def revoke_resize(log, mark):
await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark)
await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table1} WHERE pk={k};") for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, keyspace)
await manager.api.flush_keyspace(servers[0].ip_addr, keyspace)
async def wait_for_task(task_id):
status = await tm.wait_for_task(servers[0].ip_addr, task_id)
check_task_status(status, ["suspended"], "split", "table", False, keyspace, table1, [0, 1, 2])
async def wait_for_task(task_id):
status = await tm.wait_for_task(servers[0].ip_addr, task_id)
check_task_status(status, ["suspended"], "split", "table", False, keyspace, table1, [0, 1, 2])
await asyncio.gather(revoke_resize(log, mark), wait_for_task(task0.task_id))
await asyncio.gather(revoke_resize(log, mark), wait_for_task(task0.task_id))
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')