test: rewrite wait_task test

Rewrite test that checks whether task_manager/wait_task works properly.
The old version didn't work. Delete functions used in old version.
This commit is contained in:
Aleksandra Martyniuk
2023-08-04 13:34:52 +02:00
parent 9d2e55fd37
commit 629f893355
2 changed files with 17 additions and 14 deletions

View File

@@ -30,9 +30,6 @@ def wait_for_task(rest_api, task_id):
resp.raise_for_status()
return resp.json()
async def wait_for_task_async(rest_api, task_id):
rest_api.send("GET", f"task_manager/wait_task/{task_id}")
def get_task_status_recursively(rest_api, task_id):
resp = rest_api.send("GET", f"task_manager/task_status_recursive/{task_id}")
resp.raise_for_status()

View File

@@ -1,9 +1,8 @@
import requests
import time
import asyncio
from rest_util import new_test_module, new_test_task, set_tmp_task_ttl
from task_manager_utils import check_field_correctness, check_status_correctness, assert_task_does_not_exist, list_modules, get_task_status, list_tasks, get_task_status_recursively
from rest_util import new_test_module, new_test_task, set_tmp_task_ttl, ThreadWrapper
from task_manager_utils import check_field_correctness, check_status_correctness, assert_task_does_not_exist, list_modules, get_task_status, list_tasks, get_task_status_recursively, wait_for_task
long_time = 1000000000
@@ -80,23 +79,30 @@ def test_task_manager_not_abortable(rest_api):
resp = rest_api.send("POST", f"task_manager/abort_task/{task0}")
assert resp.status_code == requests.codes.internal_server_error, "Aborted unabortable task"
async def test_task_manager_wait(rest_api):
def wait_and_check_status(rest_api, id, sequence_number, keyspace, table):
status = wait_for_task(rest_api, id)
check_status_correctness(status, { "id": id, "state": "done", "sequence_number": sequence_number, "keyspace": keyspace, "table": table })
def test_task_manager_wait(rest_api):
with new_test_module(rest_api):
args0 = { "keyspace": "keyspace0", "table": "table0"}
keyspace = "keyspace0"
table = "table0"
args0 = { "keyspace": keyspace, "table": table }
with new_test_task(rest_api, args0) as task0:
print(f"created test task {task0}")
waiting_task = asyncio.create_task(wait_for_task_async(rest_api, task0))
done, pending = await asyncio.wait({waiting_task})
x = ThreadWrapper(target=wait_and_check_status, args=(rest_api, task0, 1, keyspace, table,))
x.start()
assert waiting_task in pending, "wait_task finished while the task was still running"
time.sleep(2) # Thread x should wait until finish_test_task.
assert x.is_alive, "task_manager/wait_task does not wait for task to be complete"
resp = rest_api.send("POST", f"task_manager_test/finish_test_task/{task0}")
resp.raise_for_status()
status = resp.json()
check_status_correctness(status, { "id": task0, "state": "done", "sequence_number": 1, "keyspace": "keyspace0", "table": "table0" })
assert waiting_task in done, "wait_task did not returned even though the task finished"
x.join()
assert_task_does_not_exist(rest_api, task0)
def test_task_manager_ttl(rest_api):