From 9829b1594ff71c82ff4d5bc623d531db2cbc5779 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 19 Jan 2025 08:52:50 +0200 Subject: [PATCH] topology_tasks/test_tablet_tasks: use new_test_keyspace Signed-off-by: Benny Halevy --- test/topology_tasks/test_tablet_tasks.py | 214 +++++++++++------------ 1 file changed, 107 insertions(+), 107 deletions(-) diff --git a/test/topology_tasks/test_tablet_tasks.py b/test/topology_tasks/test_tablet_tasks.py index 02455f6e57..2e9eeed157 100644 --- a/test/topology_tasks/test_tablet_tasks.py +++ b/test/topology_tasks/test_tablet_tasks.py @@ -14,6 +14,7 @@ from test.pylib.manager_client import ManagerClient from test.pylib.repair import create_table_insert_data_for_repair, get_tablet_task_id from test.pylib.tablets import get_all_tablet_replicas from test.topology.conftest import skip_mode +from test.topology.util import create_new_test_keyspace, new_test_keyspace from test.topology_custom.test_tablets2 import inject_error_on from test.topology_tasks.task_manager_client import TaskManagerClient from test.topology_tasks.task_manager_types import TaskStatus, TaskStats @@ -30,9 +31,9 @@ async def message_injection(manager: ManagerClient, servers: list[ServerInfo], i for server in servers: await manager.api.message_injection(server.ip_addr, injection) -async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str, table: Optional[str] = None): +async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_name: str, expected_number: int, type: str, keyspace: str, table: Optional[str] = None): async def get_tasks(): - tasks = [task for task in await tm.list_tasks(server.ip_addr, module_name) if task.kind == "cluster" and task.type == type and task.keyspace == "test"] + tasks = [task for task in await tm.list_tasks(server.ip_addr, module_name) if task.kind == "cluster" and task.type == type and task.keyspace == keyspace] return [task for task in tasks if not table or table == task.table] tasks = await get_tasks() @@ -40,7 +41,7 @@ async def wait_tasks_created(tm: TaskManagerClient, server: ServerInfo, module_n tasks = await get_tasks() return tasks -def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool, keyspace: str = "test", table: str = "test", possible_child_num: list[int] = [0]): +def check_task_status(status: TaskStatus, states: list[str], type: str, scope: str, abortable: bool, keyspace: str, table: str = "test", possible_child_num: list[int] = [0]): assert status.scope == scope assert status.kind == "cluster" assert status.type == type @@ -50,9 +51,9 @@ def check_task_status(status: TaskStatus, states: list[str], type: str, scope: s assert len(status.children_ids) in possible_child_num assert status.state in states -async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerClient, servers: list[ServerInfo], module_name: str): +async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str = "test"): # Wait until user repair task is created. - repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair") + repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", keyspace=keyspace) task = repair_tasks[0] assert task.scope == "table" @@ -62,14 +63,14 @@ async def check_and_abort_repair_task(manager: ManagerClient, tm: TaskManagerCli status = await tm.get_task_status(servers[0].ip_addr, task.task_id) - check_task_status(status, ["created", "running"], "user_repair", "table", True) + check_task_status(status, ["created", "running"], "user_repair", "table", True, keyspace) log = await manager.server_open_log(servers[0].server_id) mark = await log.mark() async def wait_for_task(): status_wait = await tm.wait_for_task(servers[0].ip_addr, task.task_id) - check_task_status(status_wait, ["done"], "user_repair", "table", True) + check_task_status(status_wait, ["done"], "user_repair", "table", True, keyspace) async def abort_task(): await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark) @@ -83,6 +84,7 @@ async def test_tablet_repair_task(manager: ManagerClient): module_name = "tablets" tm = TaskManagerClient(manager.api) + # FIXME: use unique_name for keyspace servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager) assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered" @@ -94,16 +96,16 @@ async def test_tablet_repair_task(manager: ManagerClient): await asyncio.gather(repair_task(), check_and_abort_repair_task(manager, tm, servers, module_name)) -async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str): +async def check_repair_task_list(tm: TaskManagerClient, servers: list[ServerInfo], module_name: str, keyspace: str = "test"): def get_task_with_id(repair_tasks, task_id): tasks_with_id1 = [task for task in repair_tasks if task.task_id == task_id] assert len(tasks_with_id1) == 1 return tasks_with_id1[0] # Wait until user repair tasks are created. - repair_tasks0 = await wait_tasks_created(tm, servers[0], module_name, len(servers), "user_repair") - repair_tasks1 = await wait_tasks_created(tm, servers[1], module_name, len(servers), "user_repair") - repair_tasks2 = await wait_tasks_created(tm, servers[2], module_name, len(servers), "user_repair") + repair_tasks0 = await wait_tasks_created(tm, servers[0], module_name, len(servers), "user_repair", keyspace=keyspace) + repair_tasks1 = await wait_tasks_created(tm, servers[1], module_name, len(servers), "user_repair", keyspace=keyspace) + repair_tasks2 = await wait_tasks_created(tm, servers[2], module_name, len(servers), "user_repair", keyspace=keyspace) assert len(repair_tasks0) == len(repair_tasks1), f"Different number of repair virtual tasks on nodes {servers[0].server_id} and {servers[1].server_id}" assert len(repair_tasks0) == len(repair_tasks2), f"Different number of repair virtual tasks on nodes {servers[0].server_id} and {servers[2].server_id}" @@ -176,7 +178,7 @@ async def test_tablet_repair_task_children(manager: ManagerClient): async def check_children(): # Wait until user repair task is created. - repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair") + repair_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, "user_repair", "test") status = await tm.wait_for_task(servers[0].ip_addr, repair_tasks[0].task_id) assert len(status.children_ids) == 1 @@ -198,38 +200,38 @@ async def prepare_migration_test(manager: ManagerClient): await make_server() cql = manager.get_cql() - await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") - await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);") + ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") + await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);") await make_server() - await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({1}, {1});") + await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({1}, {1});") - return (servers, host_ids) + return (ks, servers, host_ids) @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') async def test_tablet_migration_task(manager: ManagerClient): module_name = "tablets" tm = TaskManagerClient(manager.api) - servers, host_ids = await prepare_migration_test(manager) + ks, servers, host_ids = await prepare_migration_test(manager) injection = "handle_tablet_migration_end_migration" async def move_tablet(old_replica, new_replica): await manager.api.enable_injection(servers[0].ip_addr, injection, False) - await manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) + await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) async def check(type): # Wait until migration task is created. - migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type) + migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks) assert len(migration_tasks) == 1 status = await tm.get_task_status(servers[0].ip_addr, migration_tasks[0].task_id) - check_task_status(status, ["created", "running"], type, "tablet", False) + check_task_status(status, ["created", "running"], type, "tablet", False, keyspace=ks) await manager.api.disable_injection(servers[0].ip_addr, injection) - replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test') + replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test') assert len(replicas) == 1 and len(replicas[0].replicas) == 1 intranode_migration_src = replicas[0].replicas[0] @@ -246,16 +248,16 @@ async def test_tablet_migration_task(manager: ManagerClient): async def test_tablet_migration_task_list(manager: ManagerClient): module_name = "tablets" tm = TaskManagerClient(manager.api) - servers, host_ids = await prepare_migration_test(manager) + ks, servers, host_ids = await prepare_migration_test(manager) injection = "handle_tablet_migration_end_migration" async def move_tablet(server, old_replica, new_replica): - await manager.api.move_tablet(server.ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) + await manager.api.move_tablet(server.ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) async def check_migration_task_list(type: str): # Wait until migration tasks are created. - migration_tasks0 = await wait_tasks_created(tm, servers[0], module_name, 1, type) - migration_tasks1 = await wait_tasks_created(tm, servers[1], module_name, 1, type) + migration_tasks0 = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks) + migration_tasks1 = await wait_tasks_created(tm, servers[1], module_name, 1, type, keyspace=ks) assert len(migration_tasks0) == len(migration_tasks1), f"Different number of migration virtual tasks on nodes {servers[0].server_id} and {servers[1].server_id}" assert len(migration_tasks0) == 1, f"Wrong number of migration virtual tasks" @@ -270,11 +272,11 @@ async def test_tablet_migration_task_list(manager: ManagerClient): assert task.kind == "cluster" assert task.scope == "tablet" assert task.table == "test" - assert task.keyspace == "test" + assert task.keyspace == ks await disable_injection(manager, servers, injection) - replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test') + replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test') assert len(replicas) == 1 and len(replicas[0].replicas) == 1 intranode_migration_src = replicas[0].replicas[0] @@ -293,17 +295,17 @@ async def test_tablet_migration_task_list(manager: ManagerClient): async def test_tablet_migration_task_failed(manager: ManagerClient): module_name = "tablets" tm = TaskManagerClient(manager.api) - servers, host_ids = await prepare_migration_test(manager) + ks, servers, host_ids = await prepare_migration_test(manager) wait_injection = "stream_tablet_wait" throw_injection = "stream_tablet_move_to_cleanup" async def move_tablet(old_replica, new_replica): - await manager.api.move_tablet(servers[0].ip_addr, "test", "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) + await manager.api.move_tablet(servers[0].ip_addr, ks, "test", old_replica[0], old_replica[1], new_replica[0], new_replica[1], 0) async def wait_for_task(task_id, type): status = await tm.wait_for_task(servers[0].ip_addr, task_id) - check_task_status(status, ["failed"], type, "tablet", False) + check_task_status(status, ["failed"], type, "tablet", False, keyspace=ks) async def resume_migration(log, mark): await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark) @@ -311,7 +313,7 @@ async def test_tablet_migration_task_failed(manager: ManagerClient): async def check(type, log, mark): # Wait until migration task is created. - migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type) + migration_tasks = await wait_tasks_created(tm, servers[0], module_name, 1, type, keyspace=ks) assert len(migration_tasks) == 1 await asyncio.gather(wait_for_task(migration_tasks[0].task_id, type), resume_migration(log, mark)) @@ -322,7 +324,7 @@ async def test_tablet_migration_task_failed(manager: ManagerClient): log = await manager.server_open_log(servers[0].server_id) mark = await log.mark() - replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test') + replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test') assert len(replicas) == 1 and len(replicas[0].replicas) == 1 src = replicas[0].replicas[0] @@ -336,6 +338,7 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC tm = TaskManagerClient(manager.api) token = -1 + # FIXME: use unique_name for keyspace servers, cql, hosts, table_id = await create_table_insert_data_for_repair(manager) assert module_name in await tm.list_modules(servers[0].ip_addr), "tablets module wasn't registered" @@ -350,7 +353,7 @@ async def test_repair_task_info_is_none_when_no_running_repair(manager: ManagerC await manager.api.tablet_repair(servers[0].ip_addr, "test", "test", token) async def wait_and_check_none(): - task = (await wait_tasks_created(tm, servers[0], module_name, 1,"user_repair"))[0] + task = (await wait_tasks_created(tm, servers[0], module_name, 1,"user_repair", keyspace="test"))[0] await disable_injection(manager, servers, "repair_tablet_fail_on_rpc_call") status = await tm.wait_for_task(servers[0].ip_addr, task.task_id) await check_none() @@ -399,34 +402,33 @@ async def test_tablet_resize_task(manager: ManagerClient): await manager.api.disable_tablet_balancing(servers[0].ip_addr) cql = manager.get_cql() - keyspace = "test" table1 = "test1" table2 = "test2" - await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};") - await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") - await cql.run_async(f"CREATE TABLE {keyspace}.{table2} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace: + await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") + await cql.run_async(f"CREATE TABLE {keyspace}.{table2} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") - total_keys = 60 - keys = range(total_keys) - await prepare_split(manager, servers[0], keyspace, table1, keys) - await enable_tablet_balancing_and_wait(manager, servers[0], "Detected tablet split for table") - await wait_tasks_created(tm, servers[0], module_name, 0, "split", table1) + total_keys = 60 + keys = range(total_keys) + await prepare_split(manager, servers[0], keyspace, table1, keys) + await enable_tablet_balancing_and_wait(manager, servers[0], "Detected tablet split for table") + await wait_tasks_created(tm, servers[0], module_name, 0, "split", keyspace, table1) - await prepare_split(manager, servers[0], keyspace, table2, keys) - await prepare_merge(manager, servers[0], keyspace, table1, keys[:-1]) - await manager.api.keyspace_compaction(servers[0].ip_addr, "test") + await prepare_split(manager, servers[0], keyspace, table2, keys) + await prepare_merge(manager, servers[0], keyspace, table1, keys[:-1]) + await manager.api.keyspace_compaction(servers[0].ip_addr, keyspace) - injection = "tablet_split_finalization_postpone" - await enable_injection(manager, servers, injection) - await manager.api.enable_tablet_balancing(servers[0].ip_addr) + injection = "tablet_split_finalization_postpone" + await enable_injection(manager, servers, injection) + await manager.api.enable_tablet_balancing(servers[0].ip_addr) - async def wait_and_check_status(server, type, keyspace, table): - task = (await wait_tasks_created(tm, server, module_name, 1, type, table))[0] - status = await tm.get_task_status(server.ip_addr, task.task_id) - check_task_status(status, ["running"], type, "table", False, keyspace, table, [0, 1, 2]) + async def wait_and_check_status(server, type, keyspace, table): + task = (await wait_tasks_created(tm, server, module_name, 1, type, keyspace, table))[0] + status = await tm.get_task_status(server.ip_addr, task.task_id) + check_task_status(status, ["running"], type, "table", False, keyspace, table, [0, 1, 2]) - await wait_and_check_status(servers[0], "split", keyspace, table2) - await wait_and_check_status(servers[0], "merge", keyspace, table1) + await wait_and_check_status(servers[0], "split", keyspace, table2) + await wait_and_check_status(servers[0], "merge", keyspace, table1) @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') @@ -443,50 +445,49 @@ async def test_tablet_resize_list(manager: ManagerClient): await manager.api.disable_tablet_balancing(servers[0].ip_addr) cql = manager.get_cql() - keyspace = "test" table1 = "test1" - await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};") - await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace: + await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") - total_keys = 60 - keys = range(total_keys) - await prepare_split(manager, servers[0], keyspace, table1, keys) + total_keys = 60 + keys = range(total_keys) + await prepare_split(manager, servers[0], keyspace, table1, keys) - servers.append(await manager.server_add(cmdline=cmdline, config={ - 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] - })) + servers.append(await manager.server_add(cmdline=cmdline, config={ + 'error_injections_at_startup': ['short_tablet_stats_refresh_interval'] + })) - s1_log = await manager.server_open_log(servers[0].server_id) - s1_mark = await s1_log.mark() + s1_log = await manager.server_open_log(servers[0].server_id) + s1_mark = await s1_log.mark() - injection = "tablet_split_finalization_postpone" - compaction_injection = "split_sstable_rewrite" - await enable_injection(manager, servers, injection) - await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True) + injection = "tablet_split_finalization_postpone" + compaction_injection = "split_sstable_rewrite" + await enable_injection(manager, servers, injection) + await manager.api.enable_injection(servers[0].ip_addr, compaction_injection, one_shot=True) - await manager.api.enable_tablet_balancing(servers[0].ip_addr) - task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", table1))[0] - task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", table1))[0] + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0] + task1 = (await wait_tasks_created(tm, servers[1], module_name, 1, "split", keyspace, table1))[0] - assert task0.task_id == task1.task_id + assert task0.task_id == task1.task_id - for task in [task0, task1]: - assert task.state == "running" - assert task.type == "split" - assert task.kind == "cluster" - assert task.scope == "table" - assert task.table == table1 - assert task.keyspace == keyspace + for task in [task0, task1]: + assert task.state == "running" + assert task.type == "split" + assert task.kind == "cluster" + assert task.scope == "table" + assert task.table == table1 + assert task.keyspace == keyspace - await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark) - await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite") + await s1_log.wait_for("split_sstable_rewrite: waiting", from_mark=s1_mark) + await manager.api.message_injection(servers[0].ip_addr, "split_sstable_rewrite") - status1 = await tm.get_task_status(servers[1].ip_addr, task0.task_id) - status0 = await tm.get_task_status(servers[0].ip_addr, task0.task_id) - assert len(status0.children_ids) == 2 - assert status0.children_ids == status1.children_ids + status1 = await tm.get_task_status(servers[1].ip_addr, task0.task_id) + status0 = await tm.get_task_status(servers[0].ip_addr, task0.task_id) + assert len(status0.children_ids) == 2 + assert status0.children_ids == status1.children_ids - await disable_injection(manager, servers, injection) + await disable_injection(manager, servers, injection) @pytest.mark.asyncio @@ -505,35 +506,34 @@ async def test_tablet_resize_revoked(manager: ManagerClient): await manager.api.disable_tablet_balancing(servers[0].ip_addr) cql = manager.get_cql() - keyspace = "test" table1 = "test1" - await cql.run_async(f"CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets = {{'initial': 1}};") - await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") + async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as keyspace: + await cql.run_async(f"CREATE TABLE {keyspace}.{table1} (pk int PRIMARY KEY, c blob) WITH gc_grace_seconds=0 AND bloom_filter_fp_chance=1;") - total_keys = 60 - keys = range(total_keys) - await prepare_split(manager, servers[0], keyspace, table1, keys) + total_keys = 60 + keys = range(total_keys) + await prepare_split(manager, servers[0], keyspace, table1, keys) - injection = "tablet_split_finalization_postpone" - await enable_injection(manager, servers, injection) + injection = "tablet_split_finalization_postpone" + await enable_injection(manager, servers, injection) - await manager.api.enable_tablet_balancing(servers[0].ip_addr) - task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", table1))[0] + await manager.api.enable_tablet_balancing(servers[0].ip_addr) + task0 = (await wait_tasks_created(tm, servers[0], module_name, 1, "split", keyspace, table1))[0] - log = await manager.server_open_log(servers[0].server_id) - mark = await log.mark() + log = await manager.server_open_log(servers[0].server_id) + mark = await log.mark() - async def revoke_resize(log, mark): - await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark) - await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table1} WHERE pk={k};") for k in keys]) + async def revoke_resize(log, mark): + await log.wait_for('tablet_virtual_task: wait until tablet operation is finished', from_mark=mark) + await asyncio.gather(*[cql.run_async(f"DELETE FROM {keyspace}.{table1} WHERE pk={k};") for k in keys]) - await manager.api.flush_keyspace(servers[0].ip_addr, keyspace) + await manager.api.flush_keyspace(servers[0].ip_addr, keyspace) - async def wait_for_task(task_id): - status = await tm.wait_for_task(servers[0].ip_addr, task_id) - check_task_status(status, ["suspended"], "split", "table", False, keyspace, table1, [0, 1, 2]) + async def wait_for_task(task_id): + status = await tm.wait_for_task(servers[0].ip_addr, task_id) + check_task_status(status, ["suspended"], "split", "table", False, keyspace, table1, [0, 1, 2]) - await asyncio.gather(revoke_resize(log, mark), wait_for_task(task0.task_id)) + await asyncio.gather(revoke_resize(log, mark), wait_for_task(task0.task_id)) @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode')