diff --git a/configure.py b/configure.py index 45978d8955..36e6279393 100755 --- a/configure.py +++ b/configure.py @@ -1328,7 +1328,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/gossip.idl.hh', 'idl/migration_manager.idl.hh', "idl/node_ops.idl.hh", - + "idl/tasks.idl.hh" ] scylla_tests_generic_dependencies = [ diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index 7f5c9082ac..c0d6d0987f 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -65,6 +65,7 @@ set(idl_headers gossip.idl.hh migration_manager.idl.hh node_ops.idl.hh + tasks.idl.hh ) foreach(idl_header ${idl_headers}) diff --git a/idl/tasks.idl.hh b/idl/tasks.idl.hh new file mode 100644 index 0000000000..c5cf9e0343 --- /dev/null +++ b/idl/tasks.idl.hh @@ -0,0 +1,11 @@ +/* + * Copyright 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "idl/uuid.idl.hh" + +verb [[with_client_info]] tasks_get_children (tasks::get_children_request req) -> tasks::get_children_response; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 3067a2a29e..795408f746 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -85,6 +85,7 @@ #include "idl/storage_service.dist.hh" #include "idl/join_node.dist.hh" #include "idl/migration_manager.dist.hh" +#include "idl/tasks.dist.hh" #include "message/rpc_protocol_impl.hh" #include "idl/consistency_level.dist.impl.hh" #include "idl/tracing.dist.impl.hh" @@ -128,6 +129,7 @@ #include "idl/mapreduce_request.dist.impl.hh" #include "idl/storage_service.dist.impl.hh" #include "idl/join_node.dist.impl.hh" +#include "idl/tasks.dist.impl.hh" #include "gms/feature_service.hh" namespace netw { @@ -1428,15 +1430,4 @@ unsigned messaging_service::add_statement_tenant(sstring tenant_name, scheduling return idx; } -// Wrapper for TASKS_CHILDREN_REQUEST -void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { - register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); -} -future<> messaging_service::unregister_tasks_get_children() { - return unregister_handler(messaging_verb::TASKS_GET_CHILDREN); -} -future messaging_service::send_tasks_get_children(msg_addr id, tasks::get_children_request req) { - return send_message>(this, messaging_verb::TASKS_GET_CHILDREN, std::move(id), std::move(req)); -} - } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index ab4182d912..ee79b29bc7 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -440,11 +440,6 @@ public: void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source, rpc::optional dst_cpu_id_opt)>&& func); future<> unregister_repair_get_full_row_hashes_with_rpc_stream(); - // Wrapper for TASKS_GET_CHILDREN - void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); - future<> unregister_tasks_get_children(); - future send_tasks_get_children(msg_addr id, tasks::get_children_request); - void foreach_server_connection_stats(std::function&& f) const; // Drops all connections from the given host and prevents further communication from it to happen. diff --git a/tasks/task_manager.cc b/tasks/task_manager.cc index c9a2436cf6..4264ba1078 100644 --- a/tasks/task_manager.cc +++ b/tasks/task_manager.cc @@ -25,6 +25,7 @@ #include "task_manager.hh" #include "tasks/virtual_task_hint.hh" #include "utils/error_injection.hh" +#include "idl/tasks.dist.hh" using namespace std::chrono_literals; @@ -417,7 +418,7 @@ future> task_manager::virtual_task::impl::get_childre auto nodes = module->get_nodes(); co_return co_await map_reduce(nodes, [ms, parent_id] (auto addr) -> future> { - return ms->send_tasks_get_children(netw::msg_addr{addr}, parent_id).then([addr] (auto resp) { + return ser::tasks_rpc_verbs::send_tasks_get_children(ms, netw::msg_addr{addr}, parent_id).then([addr] (auto resp) { return resp | std::views::transform([addr] (auto id) { return task_identity{ .node = addr, @@ -802,14 +803,14 @@ void task_manager::unregister_virtual_task(task_group group) noexcept { void task_manager::init_ms_handlers(netw::messaging_service& ms) { _messaging = &ms; - ms.register_tasks_get_children([this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future { + ser::tasks_rpc_verbs::register_tasks_get_children(_messaging, [this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future { return get_virtual_task_children(task_id{req.id}); }); } future<> task_manager::uninit_ms_handlers() { if (auto* ms = std::exchange(_messaging, nullptr)) { - return ms->unregister_tasks_get_children().discard_result(); + return ser::tasks_rpc_verbs::unregister(ms).discard_result(); } return make_ready_future(); }