diff --git a/idl/node_ops.idl.hh b/idl/node_ops.idl.hh index 2768541437..0279456c36 100644 --- a/idl/node_ops.idl.hh +++ b/idl/node_ops.idl.hh @@ -50,3 +50,5 @@ struct node_ops_cmd_response { bool ok; std::list pending_ops; }; + +verb [[with_client_info]] node_ops_cmd(node_ops_cmd_request) -> node_ops_cmd_response diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 21075ee1e2..760d1263a7 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1368,17 +1368,6 @@ future> messaging_service::send_rep return send_message>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id)); } -// Wrapper for NODE_OPS_CMD -void messaging_service::register_node_ops_cmd(std::function (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) { - register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func)); -} -future<> messaging_service::unregister_node_ops_cmd() { - return unregister_handler(messaging_verb::NODE_OPS_CMD); -} -future messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) { - return send_message>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req)); -} - // Wrapper for TASKS_CHILDREN_REQUEST void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index aa1f67bd68..32a5fbd3e2 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -488,11 +488,6 @@ public: future<> unregister_repair_get_diff_algorithms(); future> send_repair_get_diff_algorithms(msg_addr id); - // Wrapper for NODE_OPS_CMD - void register_node_ops_cmd(std::function (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func); - future<> unregister_node_ops_cmd(); - future send_node_ops_cmd(msg_addr id, node_ops_cmd_request); - // Wrapper for TASKS_GET_CHILDREN void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); future<> unregister_tasks_get_children(); diff --git a/node_ops/node_ops_ctl.cc b/node_ops/node_ops_ctl.cc index c8709cf7e7..6726e3452f 100644 --- a/node_ops/node_ops_ctl.cc +++ b/node_ops/node_ops_ctl.cc @@ -16,6 +16,7 @@ #include #include #include +#include "idl/node_ops.dist.hh" static logging::logger nlogger("node_ops"); @@ -104,7 +105,7 @@ void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) { future<> node_ops_ctl::query_pending_op() { req.cmd = node_ops_cmd::query_pending_ops; co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> { - auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + auto resp = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req); nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops); if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) { throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node)); @@ -152,7 +153,7 @@ future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) { co_return; } try { - co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req); nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node); } catch (const seastar::rpc::unknown_verb_error&) { if (cmd_category == node_ops_cmd_category::prepare) { @@ -195,7 +196,7 @@ future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) { auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}}; co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { try { - co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req); nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node); } catch (...) { nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node); diff --git a/repair/repair.cc b/repair/repair.cc index 65fd16fbd3..29a53e5e58 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -50,6 +50,7 @@ #include #include "idl/repair.dist.hh" +#include "idl/node_ops.dist.hh" #include "utils/user_provided_param.hh" using namespace std::chrono_literals; @@ -1372,7 +1373,7 @@ future<> repair::user_requested_repair_task_impl::run() { while (!as.abort_requested()) { sleep_abortable(update_interval, as).get(); parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) { - return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node); }).handle_exception([uuid, node] (std::exception_ptr ep) { rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node); @@ -2527,7 +2528,7 @@ future<> repair::tablet_repair_task_impl::run() { while (!as.abort_requested()) { sleep_abortable(update_interval, as).get(); parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) { - return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node); }).handle_exception([uuid, node] (std::exception_ptr ep) { rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node); diff --git a/service/storage_service.cc b/service/storage_service.cc index dfd98002ec..b59977e7e8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -97,6 +97,7 @@ #include "service/raft/join_node.hh" #include "idl/join_node.dist.hh" #include "idl/migration_manager.dist.hh" +#include "idl/node_ops.dist.hh" #include "protocol_server.hh" #include "node_ops/node_ops_ctl.hh" #include "node_ops/task_manager_module.hh" @@ -3860,7 +3861,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok std::unordered_map> pending_ops; auto req = node_ops_cmd_request(node_ops_cmd::query_pending_ops, uuid); parallel_for_each(ctl.sync_nodes, [this, req, uuid, &pending_ops] (const gms::inet_address& node) { - return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) { + return ser::node_ops_rpc_verbs::send_node_ops_cmd(&_messaging.local(), netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) { slogger.debug("bootstrap[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops); if (!resp.pending_ops.empty()) { pending_ops.emplace(node, resp.pending_ops); @@ -6942,7 +6943,7 @@ node_state storage_service::get_node_state(locator::host_id id) { } void storage_service::init_messaging_service() { - _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { + ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); std::optional coordinator_host_id; if (const auto* id = cinfo.retrieve_auxiliary_opt("host_id")) { @@ -7092,7 +7093,7 @@ void storage_service::init_messaging_service() { future<> storage_service::uninit_messaging_service() { return when_all_succeed( - _messaging.local().unregister_node_ops_cmd(), + ser::node_ops_rpc_verbs::unregister(&_messaging.local()), ser::storage_service_rpc_verbs::unregister(&_messaging.local()), ser::join_node_rpc_verbs::unregister(&_messaging.local()) ).discard_result();