api: task_manager: add /task_manager/drain

In the following patches, get_status won't be unregistering finished
tasks. However, tests need a functionality to drop a task, so that
they could manipulate only with the tasks for operations that were
invoked by these tests.

Add /task_manager/drain/{module} to unregister all finished tasks
from the module. Add respective nodetool command.
This commit is contained in:
Aleksandra Martyniuk
2025-01-13 17:11:40 +01:00
parent 1151062b2a
commit e37d1bcb98
10 changed files with 121 additions and 4 deletions

View File

@@ -253,6 +253,30 @@
]
}
]
},
{
"path":"/task_manager/drain/{module}",
"operations":[
{
"method":"POST",
"summary":"Drain finished local tasks",
"type":"void",
"nickname":"drain_tasks",
"produces":[
"application/json"
],
"parameters":[
{
"name":"module",
"description":"The module to drain",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
}
]
}
]
}
],
"models":{

View File

@@ -232,6 +232,32 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
uint32_t user_ttl = cfg.user_task_ttl_seconds();
co_return json::json_return_type(user_ttl);
});
tm::drain_tasks.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
co_await tm.invoke_on_all([&req] (tasks::task_manager& tm) -> future<> {
tasks::task_manager::module_ptr module;
try {
module = tm.find_module(req->get_path_param("module"));
} catch (...) {
throw bad_param_exception(fmt::format("{}", std::current_exception()));
}
const auto& local_tasks = module->get_local_tasks();
std::vector<tasks::task_id> ids;
ids.reserve(local_tasks.size());
std::transform(begin(local_tasks), end(local_tasks), std::back_inserter(ids), [] (const auto& task) {
return task.second->is_complete() ? task.first : tasks::task_id::create_null_id();
});
for (auto&& id : ids) {
if (id) {
module->unregister_task(id);
}
co_await maybe_yield();
}
});
co_return json_void();
});
}
void unset_task_manager(http_context& ctx, routes& r) {
@@ -243,6 +269,7 @@ void unset_task_manager(http_context& ctx, routes& r) {
tm::get_task_status_recursively.unset(r);
tm::get_and_update_ttl.unset(r);
tm::get_ttl.unset(r);
tm::drain_tasks.unset(r);
}
}

View File

@@ -76,6 +76,8 @@ Briefly:
gets or sets new ttl.
- `/task_manager/user_ttl` -
gets or sets new user ttl.
- `/task_manager/drain/{module}` -
unregisters all finished local tasks in the module.
# Virtual tasks

View File

@@ -89,6 +89,8 @@ API calls
- *user_ttl* - new user ttl value.
* ``/task_manager/drain/{module}`` - unregisters all finished local tasks in the module.
Cluster tasks are not unregistered from task manager with API calls.
Tasks API

View File

@@ -0,0 +1,21 @@
Nodetool tasks drain
====================
**tasks drain** - Unregisters all finished local tasks from the module.
If a module is not specified, finished tasks in all modules are unregistered.
Syntax
-------
.. code-block:: console
nodetool tasks drain [--module <module>]
Options
-------
* ``--module`` - if set, only the specified module is drained.
For example:
.. code-block:: shell
> nodetool tasks drain --module repair

View File

@@ -5,6 +5,7 @@ Nodetool tasks
:hidden:
abort <abort>
drain <drain>
user-ttl <user-ttl>
list <list>
modules <modules>
@@ -32,6 +33,7 @@ Supported tasks suboperations
-----------------------------
* :doc:`abort </operating-scylla/nodetool-commands/tasks/abort>` - Aborts the task.
* :doc:`drain </operating-scylla/nodetool-commands/tasks/drain>` - Unregisters all finished local tasks.
* :doc:`user-ttl </operating-scylla/nodetool-commands/tasks/user-ttl>` - Gets or sets user_task_ttl value.
* :doc:`list </operating-scylla/nodetool-commands/tasks/list>` - Lists tasks in the module.
* :doc:`modules </operating-scylla/nodetool-commands/tasks/modules>` - Lists supported modules.

View File

@@ -29,6 +29,15 @@ def test_abort_failure(nodetool, scylla_only):
{"expected_requests": []},
["required parameter is missing"])
def test_drain(nodetool, scylla_only):
nodetool("tasks", "drain", expected_requests=[
expected_request("GET", "/task_manager/list_modules", response=["repair", "compaction"]),
expected_request("POST", "/task_manager/drain/repair"),
expected_request("POST", "/task_manager/drain/compaction")])
nodetool("tasks", "drain", "--module", "repair", expected_requests=[
expected_request("POST", "/task_manager/drain/repair")])
def test_user_ttl(nodetool, scylla_only):
nodetool("tasks", "user-ttl", expected_requests=[
expected_request("GET", "/task_manager/user_ttl")])

View File

@@ -70,9 +70,11 @@ def check_child_parent_relationship(rest_api, status_tree, parent, allow_no_chil
def drain_module_tasks(rest_api, module_name):
tasks = [task for task in list_tasks(rest_api, module_name, True)]
# Wait for all tasks.
for task in tasks:
# Wait for task and unregister it.
resp = rest_api.send("GET", f"task_manager/wait_task/{task['task_id']}")
resp = rest_api.send("GET", f"task_manager/task_status/{task['task_id']}")
# The task may be already unregistered.
assert resp.status_code == requests.codes.ok or resp.status_code == requests.codes.bad_request, "Invalid status code"
resp = rest_api.send("POST", f"task_manager/drain/{module_name}")
resp.raise_for_status()

View File

@@ -75,5 +75,4 @@ class TaskManagerClient():
tasks = await self.list_tasks(node_ip, module_name, internal=internal)
await asyncio.gather(*(self.api.client.get(f"/task_manager/wait_task/{stats.task_id}", host=node_ip,
allow_failed=True) for stats in tasks))
await asyncio.gather(*(self.api.client.get(f"/task_manager/task_status/{stats.task_id}", host=node_ip,
allow_failed=True) for stats in tasks))
await self.api.client.get(f"/task_manager/drain/{module_name}", host=node_ip)

View File

@@ -2912,6 +2912,18 @@ void tasks_abort_operation(scylla_rest_client& client, const bpo::variables_map&
}
}
void tasks_drain_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
if (vm.contains("module")) {
auto module = vm["module"].as<sstring>();
auto res = client.post(format("/task_manager/drain/{}", module));
return;
}
auto module_res = client.get("/task_manager/list_modules");
for (const auto& module : module_res.GetArray()) {
auto drain_res = client.post(format("/task_manager/drain/{}", module.GetString()));
}
}
void tasks_user_ttl_operation(scylla_rest_client& client, const bpo::variables_map& vm) {
if (!vm.contains("set")) {
auto res = client.get("/task_manager/user_ttl");
@@ -4268,6 +4280,20 @@ For more information, see: {}"
typed_option<sstring>("id", "The uuid of a task", 1),
},
},
{
"drain",
"Drains tasks",
fmt::format(R"(
Unregisters all finished local tasks from the specified module. If a module is not specified,
all modules are drained.
For more information, see: {}"
)", doc_link("operating-scylla/nodetool-commands/tasks/drain.html")),
{
typed_option<sstring>("module", "The module name; if specified, only the tasks from this module are unregistered"),
},
{ },
},
{
"user-ttl",
"Gets or sets user task ttl",
@@ -4382,6 +4408,9 @@ For more information, see: {}"
{
"abort", { tasks_abort_operation }
},
{
"drain", { tasks_drain_operation }
},
{
"user-ttl", { tasks_user_ttl_operation }
},