Merge 'tasks: use host_id in task manager' from Aleksandra Martyniuk

Use host_id in a children list of a task in task manager to indicate
a node on which the child was created.

Move TASKS_CHILDREN_REQUEST to IDL. Send it by host_id.

Fixes: https://github.com/scylladb/scylladb/issues/22284.

Ip to host_id transition; backport isn't needed.

Closes scylladb/scylladb#22487

* github.com:scylladb/scylladb:
  tasks: drop task_manager::config::broadcast_address as it's unused
  tasks: replace ip with host_id in task_identity
  api: task_manager: pass gossiper to api::set_task_manager
  tasks: keep host_id in task_manager
  tasks: move tasks_get_children to IDL
This commit is contained in:
Botond Dénes
2025-02-11 11:32:27 +02:00
19 changed files with 83 additions and 69 deletions

View File

@@ -317,13 +317,13 @@ future<> unset_server_commitlog(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_commitlog(ctx, r); });
}
future<> set_server_task_manager(http_context& ctx, sharded<tasks::task_manager>& tm, lw_shared_ptr<db::config> cfg) {
future<> set_server_task_manager(http_context& ctx, sharded<tasks::task_manager>& tm, lw_shared_ptr<db::config> cfg, sharded<gms::gossiper>& gossiper) {
auto rb = std::make_shared < api_registry_builder > (ctx.api_doc);
return ctx.http_server.set_routes([rb, &ctx, &tm, &cfg = *cfg](routes& r) {
return ctx.http_server.set_routes([rb, &ctx, &tm, &cfg = *cfg, &gossiper](routes& r) {
rb->register_function(r, "task_manager",
"The task manager API");
set_task_manager(ctx, r, tm, cfg);
set_task_manager(ctx, r, tm, cfg, gossiper);
});
}

View File

@@ -131,7 +131,7 @@ future<> unset_server_cache(http_context& ctx);
future<> set_server_compaction_manager(http_context& ctx, sharded<compaction_manager>& cm);
future<> unset_server_compaction_manager(http_context& ctx);
future<> set_server_done(http_context& ctx);
future<> set_server_task_manager(http_context& ctx, sharded<tasks::task_manager>& tm, lw_shared_ptr<db::config> cfg);
future<> set_server_task_manager(http_context& ctx, sharded<tasks::task_manager>& tm, lw_shared_ptr<db::config> cfg, sharded<gms::gossiper>& gossiper);
future<> unset_server_task_manager(http_context& ctx);
future<> set_server_task_manager_test(http_context& ctx, sharded<tasks::task_manager>& tm);
future<> unset_server_task_manager_test(http_context& ctx);

View File

@@ -14,6 +14,7 @@
#include "api/api.hh"
#include "api/api-doc/task_manager.json.hh"
#include "db/system_keyspace.hh"
#include "gms/gossiper.hh"
#include "tasks/task_handler.hh"
#include "utils/overloaded_functor.hh"
@@ -32,12 +33,16 @@ static ::tm get_time(db_clock::time_point tp) {
return t;
}
tm::task_status make_status(tasks::task_status status) {
tm::task_status make_status(tasks::task_status status, sharded<gms::gossiper>& gossiper) {
std::vector<tm::task_identity> tis{status.children.size()};
std::ranges::transform(status.children, tis.begin(), [] (const auto& child) {
std::ranges::transform(status.children, tis.begin(), [&gossiper] (const auto& child) {
tm::task_identity ident;
gms::inet_address addr{};
if (gossiper.local_is_initialized()) {
addr = gossiper.local().get_address_map().find(child.host_id).value_or(gms::inet_address{});
}
ident.task_id = child.task_id.to_sstring();
ident.node = fmt::format("{}", child.node);
ident.node = fmt::format("{}", addr);
return ident;
});
@@ -81,7 +86,7 @@ tm::task_stats make_stats(tasks::task_stats stats) {
return res;
}
void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>& tm, db::config& cfg) {
void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>& tm, db::config& cfg, sharded<gms::gossiper>& gossiper) {
tm::get_modules.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
std::vector<std::string> v = tm.local().get_modules() | std::views::keys | std::ranges::to<std::vector>();
co_return v;
@@ -139,7 +144,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
co_return std::move(f);
});
tm::get_task_status.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
tm::get_task_status.set(r, [&tm, &gossiper] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}};
tasks::task_status status;
try {
@@ -148,7 +153,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
} catch (tasks::task_manager::task_not_found& e) {
throw bad_param_exception(e.what());
}
co_return make_status(status);
co_return make_status(status, gossiper);
});
tm::abort_task.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
@@ -164,7 +169,7 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
co_return json_void();
});
tm::wait_task.set(r, [&tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
tm::wait_task.set(r, [&tm, &gossiper] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}};
tasks::task_status status;
std::optional<std::chrono::seconds> timeout = std::nullopt;
@@ -179,24 +184,24 @@ void set_task_manager(http_context& ctx, routes& r, sharded<tasks::task_manager>
} catch (timed_out_error& e) {
throw httpd::base_exception{e.what(), http::reply::status_type::request_timeout};
}
co_return make_status(status);
co_return make_status(status, gossiper);
});
tm::get_task_status_recursively.set(r, [&_tm = tm] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
tm::get_task_status_recursively.set(r, [&_tm = tm, &gossiper] (std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto& tm = _tm;
auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}};
try {
auto task = tasks::task_handler{tm.local(), id};
auto res = co_await task.get_status_recursively(true);
std::function<future<>(output_stream<char>&&)> f = [r = std::move(res)] (output_stream<char>&& os) -> future<> {
std::function<future<>(output_stream<char>&&)> f = [r = std::move(res), &gossiper] (output_stream<char>&& os) -> future<> {
auto s = std::move(os);
auto res = std::move(r);
co_await s.write("[");
std::string delim = "";
for (auto& status: res) {
co_await s.write(std::exchange(delim, ", "));
co_await formatter::write(s, make_status(status));
co_await formatter::write(s, make_status(status, gossiper));
}
co_await s.write("]");
co_await s.close();

View File

@@ -18,7 +18,7 @@ namespace tasks {
namespace api {
void set_task_manager(http_context& ctx, httpd::routes& r, sharded<tasks::task_manager>& tm, db::config& cfg);
void set_task_manager(http_context& ctx, httpd::routes& r, sharded<tasks::task_manager>& tm, db::config& cfg, sharded<gms::gossiper>& gossiper);
void unset_task_manager(http_context& ctx, httpd::routes& r);
}

View File

@@ -1335,7 +1335,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/gossip.idl.hh',
'idl/migration_manager.idl.hh',
"idl/node_ops.idl.hh",
"idl/tasks.idl.hh"
]
scylla_tests_generic_dependencies = [

View File

@@ -65,6 +65,7 @@ set(idl_headers
gossip.idl.hh
migration_manager.idl.hh
node_ops.idl.hh
tasks.idl.hh
)
foreach(idl_header ${idl_headers})

11
idl/tasks.idl.hh Normal file
View File

@@ -0,0 +1,11 @@
/*
* Copyright 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "idl/uuid.idl.hh"
verb [[with_client_info]] tasks_get_children (tasks::get_children_request req) -> tasks::get_children_response;

View File

@@ -1140,7 +1140,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
return tasks::task_manager::config {
.task_ttl = cfg->task_ttl_seconds,
.user_task_ttl = cfg->user_task_ttl_seconds,
.broadcast_address = broadcast_addr
};
});
task_manager.start(std::move(get_tm_cfg), std::ref(stop_signal.as_sharded_abort_source())).get();
@@ -1148,7 +1147,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
task_manager.stop().get();
});
api::set_server_task_manager(ctx, task_manager, cfg).get();
api::set_server_task_manager(ctx, task_manager, cfg, gossiper).get();
auto stop_tm_api = defer_verbose_shutdown("task manager API", [&ctx] {
api::unset_server_task_manager(ctx).get();
});
@@ -1573,6 +1572,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// Task manager's messaging handlers need to be set like this because of dependency chain:
// messaging -(needs)-> sys_ks -> db -> cm -> task_manager.
task_manager.invoke_on_all([&] (auto& tm) {
tm.set_host_id(host_id);
tm.init_ms_handlers(messaging.local());
}).get();
auto uninit_tm_ms_handlers = defer([&task_manager] () {

View File

@@ -85,6 +85,7 @@
#include "idl/storage_service.dist.hh"
#include "idl/join_node.dist.hh"
#include "idl/migration_manager.dist.hh"
#include "idl/tasks.dist.hh"
#include "message/rpc_protocol_impl.hh"
#include "idl/consistency_level.dist.impl.hh"
#include "idl/tracing.dist.impl.hh"
@@ -128,6 +129,7 @@
#include "idl/mapreduce_request.dist.impl.hh"
#include "idl/storage_service.dist.impl.hh"
#include "idl/join_node.dist.impl.hh"
#include "idl/tasks.dist.impl.hh"
#include "gms/feature_service.hh"
namespace netw {
@@ -1428,15 +1430,4 @@ unsigned messaging_service::add_statement_tenant(sstring tenant_name, scheduling
return idx;
}
// Wrapper for TASKS_CHILDREN_REQUEST
void messaging_service::register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) {
register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func));
}
future<> messaging_service::unregister_tasks_get_children() {
return unregister_handler(messaging_verb::TASKS_GET_CHILDREN);
}
future<tasks::get_children_response> messaging_service::send_tasks_get_children(msg_addr id, tasks::get_children_request req) {
return send_message<future<tasks::get_children_response>>(this, messaging_verb::TASKS_GET_CHILDREN, std::move(id), std::move(req));
}
} // namespace net

View File

@@ -440,11 +440,6 @@ public:
void register_repair_get_full_row_hashes_with_rpc_stream(std::function<future<rpc::sink<repair_hash_with_cmd>> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source<repair_stream_cmd> source, rpc::optional<shard_id> dst_cpu_id_opt)>&& func);
future<> unregister_repair_get_full_row_hashes_with_rpc_stream();
// Wrapper for TASKS_GET_CHILDREN
void register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func);
future<> unregister_tasks_get_children();
future<tasks::get_children_response> send_tasks_get_children(msg_addr id, tasks::get_children_request);
void foreach_server_connection_stats(std::function<void(const rpc::client_info&, const rpc::stats&)>&& f) const;
// Drops all connections from the given host and prevents further communication from it to happen.

View File

@@ -206,7 +206,7 @@ task_manager_module::task_manager_module(tasks::task_manager& tm, service::stora
, _ss(ss)
{}
std::set<gms::inet_address> task_manager_module::get_nodes() const {
std::set<locator::host_id> task_manager_module::get_nodes() const {
return get_task_manager().get_nodes(_ss);
}

View File

@@ -67,7 +67,7 @@ private:
public:
task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
virtual std::set<gms::inet_address> get_nodes() const override;
virtual std::set<locator::host_id> get_nodes() const override;
};
}

View File

@@ -281,7 +281,7 @@ task_manager_module::task_manager_module(tasks::task_manager& tm, service::stora
, _ss(ss)
{}
std::set<gms::inet_address> task_manager_module::get_nodes() const {
std::set<locator::host_id> task_manager_module::get_nodes() const {
return get_task_manager().get_nodes(_ss);
}

View File

@@ -52,6 +52,6 @@ private:
public:
task_manager_module(tasks::task_manager& tm, service::storage_service& ss) noexcept;
std::set<gms::inet_address> get_nodes() const override;
std::set<locator::host_id> get_nodes() const override;
};
}

View File

@@ -20,7 +20,7 @@ namespace tasks {
using task_status_variant = std::variant<tasks::task_manager::foreign_task_ptr, tasks::task_manager::task::task_essentials>;
static future<task_status> get_task_status(task_manager::task_ptr task) {
auto broadcast_address = task->get_module()->get_task_manager().get_broadcast_address();
auto host_id = task->get_module()->get_task_manager().get_host_id();
auto local_task_status = task->get_status();
auto status = task_status{
.task_id = local_task_status.id,
@@ -41,16 +41,16 @@ static future<task_status> get_task_status(task_manager::task_ptr task) {
.progress_units = local_task_status.progress_units,
.progress = co_await task->get_progress(),
.children = co_await task->get_children().map_each_task<task_identity>(
[broadcast_address] (const task_manager::foreign_task_ptr& task) {
[host_id] (const task_manager::foreign_task_ptr& task) {
// There is no race because id does not change for the whole task lifetime.
return task_identity{
.node = broadcast_address,
.host_id = host_id,
.task_id = task->id()
};
},
[broadcast_address] (const task_manager::task::task_essentials& task) {
[host_id] (const task_manager::task::task_essentials& task) {
return task_identity{
.node = broadcast_address,
.host_id = host_id,
.task_id = task.task_status.id
};
})
@@ -138,7 +138,7 @@ future<utils::chunked_vector<task_status>> task_handler::get_status_recursively(
} else { // virtual task
res.push_back(sh.status);
for (auto ident: sh.status.children) {
if (ident.node != _tm.get_broadcast_address()) {
if (ident.host_id != _tm.get_host_id()) {
// FIXME: add non-local version
continue;
}
@@ -186,7 +186,7 @@ future<utils::chunked_vector<task_status>> task_handler::get_status_recursively(
.progress = task.task_progress,
.children = task.failed_children | std::views::transform([&tm = _tm] (auto& child) {
return task_identity{
.node = tm.get_broadcast_address(),
.host_id = tm.get_host_id(),
.task_id = child.task_status.id
};
}) | std::ranges::to<std::vector<task_identity>>()

View File

@@ -15,7 +15,7 @@
namespace tasks {
struct task_identity {
gms::inet_address node;
locator::host_id host_id;
task_id task_id;
};

View File

@@ -25,6 +25,7 @@
#include "task_manager.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
#include "idl/tasks.dist.hh"
using namespace std::chrono_literals;
@@ -409,18 +410,18 @@ future<std::vector<task_identity>> task_manager::virtual_task::impl::get_childre
auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id);
co_return ids | std::views::transform([&tm = module->get_task_manager()] (auto id) {
return task_identity{
.node = tm.get_broadcast_address(),
.host_id = tm.get_host_id(),
.task_id = id
};
}) | std::ranges::to<std::vector<task_identity>>();
}
auto nodes = module->get_nodes();
co_return co_await map_reduce(nodes, [ms, parent_id] (auto addr) -> future<std::vector<task_identity>> {
return ms->send_tasks_get_children(netw::msg_addr{addr}, parent_id).then([addr] (auto resp) {
return resp | std::views::transform([addr] (auto id) {
co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future<std::vector<task_identity>> {
return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
return resp | std::views::transform([host_id] (auto id) {
return task_identity{
.node = addr,
.host_id = host_id,
.task_id = id
};
}) | std::ranges::to<std::vector<task_identity>>();
@@ -532,8 +533,8 @@ const task_manager::tasks_collection& task_manager::module::get_tasks_collection
return _tasks;
}
std::set<gms::inet_address> task_manager::module::get_nodes() const {
return {_tm.get_broadcast_address()};
std::set<locator::host_id> task_manager::module::get_nodes() const {
return {_tm.get_host_id()};
}
future<utils::chunked_vector<task_stats>> task_manager::module::get_stats(is_internal internal, std::function<bool(std::string& keyspace, std::string& table)> filter) const {
@@ -653,8 +654,12 @@ task_manager::task_manager() noexcept
, _user_task_ttl(0)
{}
gms::inet_address task_manager::get_broadcast_address() const noexcept {
return _cfg.broadcast_address;
locator::host_id task_manager::get_host_id() const noexcept {
return _host_id;
}
void task_manager::set_host_id(locator::host_id host_id) noexcept {
_host_id = host_id;
}
task_manager::modules& task_manager::get_modules() noexcept {
@@ -689,15 +694,15 @@ const task_manager::tasks_collection& task_manager::get_tasks_collection() const
return _tasks;
}
std::set<gms::inet_address> task_manager::get_nodes(service::storage_service& ss) const {
std::set<locator::host_id> task_manager::get_nodes(service::storage_service& ss) const {
return std::ranges::join_view(std::to_array({
std::views::all(ss._topology_state_machine._topology.normal_nodes),
std::views::all(ss._topology_state_machine._topology.transition_nodes)})
) | std::views::transform([&ss] (auto& node) {
return ss.host2ip(locator::host_id{node.first.uuid()});
}) | std::views::filter([&ss] (gms::inet_address ip) {
return ss._gossiper.is_alive(ip);
}) | std::ranges::to<std::set<gms::inet_address>>();
) | std::views::transform([] (auto& node) {
return locator::host_id{node.first.uuid()};
}) | std::views::filter([&ss] (locator::host_id host_id) {
return ss._gossiper.is_alive(host_id);
}) | std::ranges::to<std::set<locator::host_id>>();
}
future<std::vector<task_id>> task_manager::get_virtual_task_children(task_id parent_id) {
@@ -802,14 +807,14 @@ void task_manager::unregister_virtual_task(task_group group) noexcept {
void task_manager::init_ms_handlers(netw::messaging_service& ms) {
_messaging = &ms;
ms.register_tasks_get_children([this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future<tasks::get_children_response> {
ser::tasks_rpc_verbs::register_tasks_get_children(_messaging, [this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future<tasks::get_children_response> {
return get_virtual_task_children(task_id{req.id});
});
}
future<> task_manager::uninit_ms_handlers() {
if (auto* ms = std::exchange(_messaging, nullptr)) {
return ms->unregister_tasks_get_children().discard_result();
return ser::tasks_rpc_verbs::unregister(ms).discard_result();
}
return make_ready_future();
}

View File

@@ -18,7 +18,7 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "db_clock.hh"
#include "utils/log.hh"
#include "gms/inet_address.hh"
#include "locator/host_id.hh"
#include "schema/schema_fwd.hh"
#include "tasks/types.hh"
#include "utils/chunked_vector.hh"
@@ -64,7 +64,6 @@ public:
struct config {
utils::updateable_value<uint32_t> task_ttl;
utils::updateable_value<uint32_t> user_task_ttl;
gms::inet_address broadcast_address;
};
using task_ptr = lw_shared_ptr<task_manager::task>;
using virtual_task_ptr = lw_shared_ptr<task_manager::virtual_task>;
@@ -84,6 +83,7 @@ private:
tasks_collection _tasks;
modules _modules;
config _cfg;
locator::host_id _host_id = locator::host_id::create_null_id();
seastar::abort_source _as;
optimized_optional<seastar::abort_source::subscription> _abort_subscription;
utils::updateable_value<uint32_t> _task_ttl;
@@ -338,7 +338,7 @@ public:
tasks_collection& get_tasks_collection() noexcept;
const tasks_collection& get_tasks_collection() const noexcept;
// Returns a set of nodes on which some of virtual tasks on this module can have their children.
virtual std::set<gms::inet_address> get_nodes() const;
virtual std::set<locator::host_id> get_nodes() const;
future<utils::chunked_vector<task_stats>> get_stats(is_internal internal, std::function<bool(std::string&, std::string&)> filter) const;
void register_task(task_ptr task);
@@ -380,7 +380,9 @@ public:
task_manager(config cfg, seastar::abort_source& as) noexcept;
task_manager() noexcept;
gms::inet_address get_broadcast_address() const noexcept;
// Returns empty host_id if local info isn't resolved yet.
locator::host_id get_host_id() const noexcept;
void set_host_id(locator::host_id host_id) noexcept;
modules& get_modules() noexcept;
const modules& get_modules() const noexcept;
task_map& get_local_tasks() noexcept;
@@ -391,7 +393,7 @@ public:
const tasks_collection& get_tasks_collection() const noexcept;
future<std::vector<task_id>> get_virtual_task_children(task_id parent_id);
std::set<gms::inet_address> get_nodes(service::storage_service& ss) const;
std::set<locator::host_id> get_nodes(service::storage_service& ss) const;
module_ptr make_module(std::string name);
void register_module(std::string name, module_ptr module);

View File

@@ -734,6 +734,10 @@ private:
_gossip_address_map.stop().get();
});
_task_manager.invoke_on_all([&] (auto& tm) {
tm.set_host_id(host_id);
}).get();
auto arct_cfg = [&] {
return utils::advanced_rpc_compressor::tracker::config{
.zstd_quota_fraction{1.0},