diff --git a/api/api.cc b/api/api.cc index e2880dad6e..aa0951eaba 100644 --- a/api/api.cc +++ b/api/api.cc @@ -317,13 +317,13 @@ future<> unset_server_commitlog(http_context& ctx) { return ctx.http_server.set_routes([&ctx] (routes& r) { unset_commitlog(ctx, r); }); } -future<> set_server_task_manager(http_context& ctx, sharded& tm, lw_shared_ptr cfg) { +future<> set_server_task_manager(http_context& ctx, sharded& tm, lw_shared_ptr cfg, sharded& gossiper) { auto rb = std::make_shared < api_registry_builder > (ctx.api_doc); - return ctx.http_server.set_routes([rb, &ctx, &tm, &cfg = *cfg](routes& r) { + return ctx.http_server.set_routes([rb, &ctx, &tm, &cfg = *cfg, &gossiper](routes& r) { rb->register_function(r, "task_manager", "The task manager API"); - set_task_manager(ctx, r, tm, cfg); + set_task_manager(ctx, r, tm, cfg, gossiper); }); } diff --git a/api/api_init.hh b/api/api_init.hh index b09948a8da..5bc364c8b1 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -131,7 +131,7 @@ future<> unset_server_cache(http_context& ctx); future<> set_server_compaction_manager(http_context& ctx, sharded& cm); future<> unset_server_compaction_manager(http_context& ctx); future<> set_server_done(http_context& ctx); -future<> set_server_task_manager(http_context& ctx, sharded& tm, lw_shared_ptr cfg); +future<> set_server_task_manager(http_context& ctx, sharded& tm, lw_shared_ptr cfg, sharded& gossiper); future<> unset_server_task_manager(http_context& ctx); future<> set_server_task_manager_test(http_context& ctx, sharded& tm); future<> unset_server_task_manager_test(http_context& ctx); diff --git a/api/task_manager.cc b/api/task_manager.cc index 5f42014396..99f719de6a 100644 --- a/api/task_manager.cc +++ b/api/task_manager.cc @@ -14,6 +14,7 @@ #include "api/api.hh" #include "api/api-doc/task_manager.json.hh" #include "db/system_keyspace.hh" +#include "gms/gossiper.hh" #include "tasks/task_handler.hh" #include "utils/overloaded_functor.hh" @@ -32,12 +33,16 @@ static ::tm get_time(db_clock::time_point tp) { return t; } -tm::task_status make_status(tasks::task_status status) { +tm::task_status make_status(tasks::task_status status, sharded& gossiper) { std::vector tis{status.children.size()}; - std::ranges::transform(status.children, tis.begin(), [] (const auto& child) { + std::ranges::transform(status.children, tis.begin(), [&gossiper] (const auto& child) { tm::task_identity ident; + gms::inet_address addr{}; + if (gossiper.local_is_initialized()) { + addr = gossiper.local().get_address_map().find(child.host_id).value_or(gms::inet_address{}); + } ident.task_id = child.task_id.to_sstring(); - ident.node = fmt::format("{}", child.node); + ident.node = fmt::format("{}", addr); return ident; }); @@ -81,7 +86,7 @@ tm::task_stats make_stats(tasks::task_stats stats) { return res; } -void set_task_manager(http_context& ctx, routes& r, sharded& tm, db::config& cfg) { +void set_task_manager(http_context& ctx, routes& r, sharded& tm, db::config& cfg, sharded& gossiper) { tm::get_modules.set(r, [&tm] (std::unique_ptr req) -> future { std::vector v = tm.local().get_modules() | std::views::keys | std::ranges::to(); co_return v; @@ -139,7 +144,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded co_return std::move(f); }); - tm::get_task_status.set(r, [&tm] (std::unique_ptr req) -> future { + tm::get_task_status.set(r, [&tm, &gossiper] (std::unique_ptr req) -> future { auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}}; tasks::task_status status; try { @@ -148,7 +153,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded } catch (tasks::task_manager::task_not_found& e) { throw bad_param_exception(e.what()); } - co_return make_status(status); + co_return make_status(status, gossiper); }); tm::abort_task.set(r, [&tm] (std::unique_ptr req) -> future { @@ -164,7 +169,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded co_return json_void(); }); - tm::wait_task.set(r, [&tm] (std::unique_ptr req) -> future { + tm::wait_task.set(r, [&tm, &gossiper] (std::unique_ptr req) -> future { auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}}; tasks::task_status status; std::optional timeout = std::nullopt; @@ -179,24 +184,24 @@ void set_task_manager(http_context& ctx, routes& r, sharded } catch (timed_out_error& e) { throw httpd::base_exception{e.what(), http::reply::status_type::request_timeout}; } - co_return make_status(status); + co_return make_status(status, gossiper); }); - tm::get_task_status_recursively.set(r, [&_tm = tm] (std::unique_ptr req) -> future { + tm::get_task_status_recursively.set(r, [&_tm = tm, &gossiper] (std::unique_ptr req) -> future { auto& tm = _tm; auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}}; try { auto task = tasks::task_handler{tm.local(), id}; auto res = co_await task.get_status_recursively(true); - std::function(output_stream&&)> f = [r = std::move(res)] (output_stream&& os) -> future<> { + std::function(output_stream&&)> f = [r = std::move(res), &gossiper] (output_stream&& os) -> future<> { auto s = std::move(os); auto res = std::move(r); co_await s.write("["); std::string delim = ""; for (auto& status: res) { co_await s.write(std::exchange(delim, ", ")); - co_await formatter::write(s, make_status(status)); + co_await formatter::write(s, make_status(status, gossiper)); } co_await s.write("]"); co_await s.close(); diff --git a/api/task_manager.hh b/api/task_manager.hh index 7081d59ec9..febaacbb22 100644 --- a/api/task_manager.hh +++ b/api/task_manager.hh @@ -18,7 +18,7 @@ namespace tasks { namespace api { -void set_task_manager(http_context& ctx, httpd::routes& r, sharded& tm, db::config& cfg); +void set_task_manager(http_context& ctx, httpd::routes& r, sharded& tm, db::config& cfg, sharded& gossiper); void unset_task_manager(http_context& ctx, httpd::routes& r); } diff --git a/configure.py b/configure.py index cf6616fde9..7362df94f5 100755 --- a/configure.py +++ b/configure.py @@ -1335,7 +1335,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/gossip.idl.hh', 'idl/migration_manager.idl.hh', "idl/node_ops.idl.hh", - + "idl/tasks.idl.hh" ] scylla_tests_generic_dependencies = [ diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index 7f5c9082ac..c0d6d0987f 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -65,6 +65,7 @@ set(idl_headers gossip.idl.hh migration_manager.idl.hh node_ops.idl.hh + tasks.idl.hh ) foreach(idl_header ${idl_headers}) diff --git a/idl/tasks.idl.hh b/idl/tasks.idl.hh new file mode 100644 index 0000000000..c5cf9e0343 --- /dev/null +++ b/idl/tasks.idl.hh @@ -0,0 +1,11 @@ +/* + * Copyright 2025-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 + */ + +#include "idl/uuid.idl.hh" + +verb [[with_client_info]] tasks_get_children (tasks::get_children_request req) -> tasks::get_children_response; diff --git a/main.cc b/main.cc index 2fa60f2969..436752c963 100644 --- a/main.cc +++ b/main.cc @@ -1140,7 +1140,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl return tasks::task_manager::config { .task_ttl = cfg->task_ttl_seconds, .user_task_ttl = cfg->user_task_ttl_seconds, - .broadcast_address = broadcast_addr }; }); task_manager.start(std::move(get_tm_cfg), std::ref(stop_signal.as_sharded_abort_source())).get(); @@ -1148,7 +1147,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl task_manager.stop().get(); }); - api::set_server_task_manager(ctx, task_manager, cfg).get(); + api::set_server_task_manager(ctx, task_manager, cfg, gossiper).get(); auto stop_tm_api = defer_verbose_shutdown("task manager API", [&ctx] { api::unset_server_task_manager(ctx).get(); }); @@ -1573,6 +1572,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // Task manager's messaging handlers need to be set like this because of dependency chain: // messaging -(needs)-> sys_ks -> db -> cm -> task_manager. task_manager.invoke_on_all([&] (auto& tm) { + tm.set_host_id(host_id); tm.init_ms_handlers(messaging.local()); }).get(); auto uninit_tm_ms_handlers = defer([&task_manager] () { diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 3067a2a29e..795408f746 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -85,6 +85,7 @@ #include "idl/storage_service.dist.hh" #include "idl/join_node.dist.hh" #include "idl/migration_manager.dist.hh" +#include "idl/tasks.dist.hh" #include "message/rpc_protocol_impl.hh" #include "idl/consistency_level.dist.impl.hh" #include "idl/tracing.dist.impl.hh" @@ -128,6 +129,7 @@ #include "idl/mapreduce_request.dist.impl.hh" #include "idl/storage_service.dist.impl.hh" #include "idl/join_node.dist.impl.hh" +#include "idl/tasks.dist.impl.hh" #include "gms/feature_service.hh" namespace netw { @@ -1428,15 +1430,4 @@ unsigned messaging_service::add_statement_tenant(sstring tenant_name, scheduling return idx; } -// Wrapper for TASKS_CHILDREN_REQUEST -void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { - register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); -} -future<> messaging_service::unregister_tasks_get_children() { - return unregister_handler(messaging_verb::TASKS_GET_CHILDREN); -} -future messaging_service::send_tasks_get_children(msg_addr id, tasks::get_children_request req) { - return send_message>(this, messaging_verb::TASKS_GET_CHILDREN, std::move(id), std::move(req)); -} - } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index ab4182d912..ee79b29bc7 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -440,11 +440,6 @@ public: void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source, rpc::optional dst_cpu_id_opt)>&& func); future<> unregister_repair_get_full_row_hashes_with_rpc_stream(); - // Wrapper for TASKS_GET_CHILDREN - void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); - future<> unregister_tasks_get_children(); - future send_tasks_get_children(msg_addr id, tasks::get_children_request); - void foreach_server_connection_stats(std::function&& f) const; // Drops all connections from the given host and prevents further communication from it to happen. diff --git a/node_ops/task_manager_module.cc b/node_ops/task_manager_module.cc index 834d6b02bd..25086ca179 100644 --- a/node_ops/task_manager_module.cc +++ b/node_ops/task_manager_module.cc @@ -206,7 +206,7 @@ task_manager_module::task_manager_module(tasks::task_manager& tm, service::stora , _ss(ss) {} -std::set task_manager_module::get_nodes() const { +std::set task_manager_module::get_nodes() const { return get_task_manager().get_nodes(_ss); } diff --git a/node_ops/task_manager_module.hh b/node_ops/task_manager_module.hh index bae4ff6a0e..cff6f2867b 100644 --- a/node_ops/task_manager_module.hh +++ b/node_ops/task_manager_module.hh @@ -67,7 +67,7 @@ private: public: task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept; - virtual std::set get_nodes() const override; + virtual std::set get_nodes() const override; }; } diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc index f429b6f626..e113c9ef54 100644 --- a/service/task_manager_module.cc +++ b/service/task_manager_module.cc @@ -281,7 +281,7 @@ task_manager_module::task_manager_module(tasks::task_manager& tm, service::stora , _ss(ss) {} -std::set task_manager_module::get_nodes() const { +std::set task_manager_module::get_nodes() const { return get_task_manager().get_nodes(_ss); } diff --git a/service/task_manager_module.hh b/service/task_manager_module.hh index e32148abb4..bd24bede52 100644 --- a/service/task_manager_module.hh +++ b/service/task_manager_module.hh @@ -52,6 +52,6 @@ private: public: task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept; - std::set get_nodes() const override; + std::set get_nodes() const override; }; } diff --git a/tasks/task_handler.cc b/tasks/task_handler.cc index 21590f848c..1d0f71420a 100644 --- a/tasks/task_handler.cc +++ b/tasks/task_handler.cc @@ -20,7 +20,7 @@ namespace tasks { using task_status_variant = std::variant; static future get_task_status(task_manager::task_ptr task) { - auto broadcast_address = task->get_module()->get_task_manager().get_broadcast_address(); + auto host_id = task->get_module()->get_task_manager().get_host_id(); auto local_task_status = task->get_status(); auto status = task_status{ .task_id = local_task_status.id, @@ -41,16 +41,16 @@ static future get_task_status(task_manager::task_ptr task) { .progress_units = local_task_status.progress_units, .progress = co_await task->get_progress(), .children = co_await task->get_children().map_each_task( - [broadcast_address] (const task_manager::foreign_task_ptr& task) { + [host_id] (const task_manager::foreign_task_ptr& task) { // There is no race because id does not change for the whole task lifetime. return task_identity{ - .node = broadcast_address, + .host_id = host_id, .task_id = task->id() }; }, - [broadcast_address] (const task_manager::task::task_essentials& task) { + [host_id] (const task_manager::task::task_essentials& task) { return task_identity{ - .node = broadcast_address, + .host_id = host_id, .task_id = task.task_status.id }; }) @@ -138,7 +138,7 @@ future> task_handler::get_status_recursively( } else { // virtual task res.push_back(sh.status); for (auto ident: sh.status.children) { - if (ident.node != _tm.get_broadcast_address()) { + if (ident.host_id != _tm.get_host_id()) { // FIXME: add non-local version continue; } @@ -186,7 +186,7 @@ future> task_handler::get_status_recursively( .progress = task.task_progress, .children = task.failed_children | std::views::transform([&tm = _tm] (auto& child) { return task_identity{ - .node = tm.get_broadcast_address(), + .host_id = tm.get_host_id(), .task_id = child.task_status.id }; }) | std::ranges::to>() diff --git a/tasks/task_handler.hh b/tasks/task_handler.hh index 18f5eca868..e459e2df85 100644 --- a/tasks/task_handler.hh +++ b/tasks/task_handler.hh @@ -15,7 +15,7 @@ namespace tasks { struct task_identity { - gms::inet_address node; + locator::host_id host_id; task_id task_id; }; diff --git a/tasks/task_manager.cc b/tasks/task_manager.cc index c9a2436cf6..f9c7442688 100644 --- a/tasks/task_manager.cc +++ b/tasks/task_manager.cc @@ -25,6 +25,7 @@ #include "task_manager.hh" #include "tasks/virtual_task_hint.hh" #include "utils/error_injection.hh" +#include "idl/tasks.dist.hh" using namespace std::chrono_literals; @@ -409,18 +410,18 @@ future> task_manager::virtual_task::impl::get_childre auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id); co_return ids | std::views::transform([&tm = module->get_task_manager()] (auto id) { return task_identity{ - .node = tm.get_broadcast_address(), + .host_id = tm.get_host_id(), .task_id = id }; }) | std::ranges::to>(); } auto nodes = module->get_nodes(); - co_return co_await map_reduce(nodes, [ms, parent_id] (auto addr) -> future> { - return ms->send_tasks_get_children(netw::msg_addr{addr}, parent_id).then([addr] (auto resp) { - return resp | std::views::transform([addr] (auto id) { + co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future> { + return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) { + return resp | std::views::transform([host_id] (auto id) { return task_identity{ - .node = addr, + .host_id = host_id, .task_id = id }; }) | std::ranges::to>(); @@ -532,8 +533,8 @@ const task_manager::tasks_collection& task_manager::module::get_tasks_collection return _tasks; } -std::set task_manager::module::get_nodes() const { - return {_tm.get_broadcast_address()}; +std::set task_manager::module::get_nodes() const { + return {_tm.get_host_id()}; } future> task_manager::module::get_stats(is_internal internal, std::function filter) const { @@ -653,8 +654,12 @@ task_manager::task_manager() noexcept , _user_task_ttl(0) {} -gms::inet_address task_manager::get_broadcast_address() const noexcept { - return _cfg.broadcast_address; +locator::host_id task_manager::get_host_id() const noexcept { + return _host_id; +} + +void task_manager::set_host_id(locator::host_id host_id) noexcept { + _host_id = host_id; } task_manager::modules& task_manager::get_modules() noexcept { @@ -689,15 +694,15 @@ const task_manager::tasks_collection& task_manager::get_tasks_collection() const return _tasks; } -std::set task_manager::get_nodes(service::storage_service& ss) const { +std::set task_manager::get_nodes(service::storage_service& ss) const { return std::ranges::join_view(std::to_array({ std::views::all(ss._topology_state_machine._topology.normal_nodes), std::views::all(ss._topology_state_machine._topology.transition_nodes)}) - ) | std::views::transform([&ss] (auto& node) { - return ss.host2ip(locator::host_id{node.first.uuid()}); - }) | std::views::filter([&ss] (gms::inet_address ip) { - return ss._gossiper.is_alive(ip); - }) | std::ranges::to>(); + ) | std::views::transform([] (auto& node) { + return locator::host_id{node.first.uuid()}; + }) | std::views::filter([&ss] (locator::host_id host_id) { + return ss._gossiper.is_alive(host_id); + }) | std::ranges::to>(); } future> task_manager::get_virtual_task_children(task_id parent_id) { @@ -802,14 +807,14 @@ void task_manager::unregister_virtual_task(task_group group) noexcept { void task_manager::init_ms_handlers(netw::messaging_service& ms) { _messaging = &ms; - ms.register_tasks_get_children([this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future { + ser::tasks_rpc_verbs::register_tasks_get_children(_messaging, [this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future { return get_virtual_task_children(task_id{req.id}); }); } future<> task_manager::uninit_ms_handlers() { if (auto* ms = std::exchange(_messaging, nullptr)) { - return ms->unregister_tasks_get_children().discard_result(); + return ser::tasks_rpc_verbs::unregister(ms).discard_result(); } return make_ready_future(); } diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index b2015f3195..5766b3ad42 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -18,7 +18,7 @@ #include #include "db_clock.hh" #include "utils/log.hh" -#include "gms/inet_address.hh" +#include "locator/host_id.hh" #include "schema/schema_fwd.hh" #include "tasks/types.hh" #include "utils/chunked_vector.hh" @@ -64,7 +64,6 @@ public: struct config { utils::updateable_value task_ttl; utils::updateable_value user_task_ttl; - gms::inet_address broadcast_address; }; using task_ptr = lw_shared_ptr; using virtual_task_ptr = lw_shared_ptr; @@ -84,6 +83,7 @@ private: tasks_collection _tasks; modules _modules; config _cfg; + locator::host_id _host_id = locator::host_id::create_null_id(); seastar::abort_source _as; optimized_optional _abort_subscription; utils::updateable_value _task_ttl; @@ -338,7 +338,7 @@ public: tasks_collection& get_tasks_collection() noexcept; const tasks_collection& get_tasks_collection() const noexcept; // Returns a set of nodes on which some of virtual tasks on this module can have their children. - virtual std::set get_nodes() const; + virtual std::set get_nodes() const; future> get_stats(is_internal internal, std::function filter) const; void register_task(task_ptr task); @@ -380,7 +380,9 @@ public: task_manager(config cfg, seastar::abort_source& as) noexcept; task_manager() noexcept; - gms::inet_address get_broadcast_address() const noexcept; + // Returns empty host_id if local info isn't resolved yet. + locator::host_id get_host_id() const noexcept; + void set_host_id(locator::host_id host_id) noexcept; modules& get_modules() noexcept; const modules& get_modules() const noexcept; task_map& get_local_tasks() noexcept; @@ -391,7 +393,7 @@ public: const tasks_collection& get_tasks_collection() const noexcept; future> get_virtual_task_children(task_id parent_id); - std::set get_nodes(service::storage_service& ss) const; + std::set get_nodes(service::storage_service& ss) const; module_ptr make_module(std::string name); void register_module(std::string name, module_ptr module); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 9b77149850..509a668f95 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -734,6 +734,10 @@ private: _gossip_address_map.stop().get(); }); + _task_manager.invoke_on_all([&] (auto& tm) { + tm.set_host_id(host_id); + }).get(); + auto arct_cfg = [&] { return utils::advanced_rpc_compressor::tracker::config{ .zstd_quota_fraction{1.0},