tasks: move tasks_get_children to IDL

This commit is contained in:
Aleksandra Martyniuk
2025-01-22 17:41:05 +01:00
parent bea434f417
commit 7969e98b4e
6 changed files with 19 additions and 20 deletions

View File

@@ -1328,7 +1328,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/gossip.idl.hh',
'idl/migration_manager.idl.hh',
"idl/node_ops.idl.hh",
"idl/tasks.idl.hh"
]
scylla_tests_generic_dependencies = [

View File

@@ -65,6 +65,7 @@ set(idl_headers
gossip.idl.hh
migration_manager.idl.hh
node_ops.idl.hh
tasks.idl.hh
)
foreach(idl_header ${idl_headers})

11
idl/tasks.idl.hh Normal file
View File

@@ -0,0 +1,11 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "idl/uuid.idl.hh"
verb [[with_client_info]] tasks_get_children (tasks::get_children_request req) -> tasks::get_children_response;

View File

@@ -85,6 +85,7 @@
#include "idl/storage_service.dist.hh"
#include "idl/join_node.dist.hh"
#include "idl/migration_manager.dist.hh"
#include "idl/tasks.dist.hh"
#include "message/rpc_protocol_impl.hh"
#include "idl/consistency_level.dist.impl.hh"
#include "idl/tracing.dist.impl.hh"
@@ -128,6 +129,7 @@
#include "idl/mapreduce_request.dist.impl.hh"
#include "idl/storage_service.dist.impl.hh"
#include "idl/join_node.dist.impl.hh"
#include "idl/tasks.dist.impl.hh"
#include "gms/feature_service.hh"
namespace netw {
@@ -1428,15 +1430,4 @@ unsigned messaging_service::add_statement_tenant(sstring tenant_name, scheduling
return idx;
}
// Wrapper for TASKS_CHILDREN_REQUEST
void messaging_service::register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) {
register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func));
}
future<> messaging_service::unregister_tasks_get_children() {
return unregister_handler(messaging_verb::TASKS_GET_CHILDREN);
}
future<tasks::get_children_response> messaging_service::send_tasks_get_children(msg_addr id, tasks::get_children_request req) {
return send_message<future<tasks::get_children_response>>(this, messaging_verb::TASKS_GET_CHILDREN, std::move(id), std::move(req));
}
} // namespace net

View File

@@ -440,11 +440,6 @@ public:
void register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt)>&& func);
future<> unregister_repair_get_full_row_hashes_with_rpc_stream();
// Wrapper for TASKS_GET_CHILDREN
void register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func);
future<> unregister_tasks_get_children();
future<tasks::get_children_response> send_tasks_get_children(msg_addr id, tasks::get_children_request);
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
// Drops all connections from the given host and prevents further communication from it to happen.

View File

@@ -25,6 +25,7 @@
#include "task_manager.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
#include "idl/tasks.dist.hh"
using namespace std::chrono_literals;
@@ -417,7 +418,7 @@ future<std::vector<task_identity>> task_manager::virtual_task::impl::get_childre
auto nodes = module->get_nodes();
co_return co_await map_reduce(nodes, [ms, parent_id] (auto addr) -> future<std::vector<task_identity>> {
return ms->send_tasks_get_children(netw::msg_addr{addr}, parent_id).then([addr] (auto resp) {
return ser::tasks_rpc_verbs::send_tasks_get_children(ms, netw::msg_addr{addr}, parent_id).then([addr] (auto resp) {
return resp | std::views::transform([addr] (auto id) {
return task_identity{
.node = addr,
@@ -802,14 +803,14 @@ void task_manager::unregister_virtual_task(task_group group) noexcept {
void task_manager::init_ms_handlers(netw::messaging_service& ms) {
_messaging = &ms;
ms.register_tasks_get_children([this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future<tasks::get_children_response> {
ser::tasks_rpc_verbs::register_tasks_get_children(_messaging, [this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future<tasks::get_children_response> {
return get_virtual_task_children(task_id{req.id});
});
}
future<> task_manager::uninit_ms_handlers() {
if (auto* ms = std::exchange(_messaging, nullptr)) {
return ms->unregister_tasks_get_children().discard_result();
return ser::tasks_rpc_verbs::unregister(ms).discard_result();
}
return make_ready_future();
}