node_ops: move node_ops_cmd to IDL
This commit is contained in:
@@ -50,3 +50,5 @@ struct node_ops_cmd_response {
|
||||
bool ok;
|
||||
std::list<node_ops_id> pending_ops;
|
||||
};
|
||||
|
||||
verb [[with_client_info]] node_ops_cmd(node_ops_cmd_request) -> node_ops_cmd_response
|
||||
|
||||
@@ -1368,17 +1368,6 @@ future<std::vector<row_level_diff_detect_algorithm>> messaging_service::send_rep
|
||||
return send_message<future<std::vector<row_level_diff_detect_algorithm>>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id));
|
||||
}
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void messaging_service::register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) {
|
||||
register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func));
|
||||
}
|
||||
future<> messaging_service::unregister_node_ops_cmd() {
|
||||
return unregister_handler(messaging_verb::NODE_OPS_CMD);
|
||||
}
|
||||
future<node_ops_cmd_response> messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) {
|
||||
return send_message<future<node_ops_cmd_response>>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req));
|
||||
}
|
||||
|
||||
// Wrapper for TASKS_CHILDREN_REQUEST
|
||||
void messaging_service::register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) {
|
||||
register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func));
|
||||
|
||||
@@ -488,11 +488,6 @@ public:
|
||||
future<> unregister_repair_get_diff_algorithms();
|
||||
future<std::vector<row_level_diff_detect_algorithm>> send_repair_get_diff_algorithms(msg_addr id);
|
||||
|
||||
// Wrapper for NODE_OPS_CMD
|
||||
void register_node_ops_cmd(std::function<future<node_ops_cmd_response> (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func);
|
||||
future<> unregister_node_ops_cmd();
|
||||
future<node_ops_cmd_response> send_node_ops_cmd(msg_addr id, node_ops_cmd_request);
|
||||
|
||||
// Wrapper for TASKS_GET_CHILDREN
|
||||
void register_tasks_get_children(std::function<future<tasks::get_children_response> (const rpc::client_info& cinfo, tasks::get_children_request)>&& func);
|
||||
future<> unregister_tasks_get_children();
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <fmt/ranges.h>
|
||||
#include <seastar/core/sleep.hh>
|
||||
#include <seastar/coroutine/parallel_for_each.hh>
|
||||
#include "idl/node_ops.dist.hh"
|
||||
|
||||
static logging::logger nlogger("node_ops");
|
||||
|
||||
@@ -104,7 +105,7 @@ void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) {
|
||||
future<> node_ops_ctl::query_pending_op() {
|
||||
req.cmd = node_ops_cmd::query_pending_ops;
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> {
|
||||
auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
auto resp = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops);
|
||||
if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) {
|
||||
throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node));
|
||||
@@ -152,7 +153,7 @@ future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) {
|
||||
co_return;
|
||||
}
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node);
|
||||
} catch (const seastar::rpc::unknown_verb_error&) {
|
||||
if (cmd_category == node_ops_cmd_category::prepare) {
|
||||
@@ -195,7 +196,7 @@ future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) {
|
||||
auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}};
|
||||
co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> {
|
||||
try {
|
||||
co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req);
|
||||
co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req);
|
||||
nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node);
|
||||
} catch (...) {
|
||||
nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node);
|
||||
|
||||
@@ -50,6 +50,7 @@
|
||||
#include <utility>
|
||||
|
||||
#include "idl/repair.dist.hh"
|
||||
#include "idl/node_ops.dist.hh"
|
||||
#include "utils/user_provided_param.hh"
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@@ -1372,7 +1373,7 @@ future<> repair::user_requested_repair_task_impl::run() {
|
||||
while (!as.abort_requested()) {
|
||||
sleep_abortable(update_interval, as).get();
|
||||
parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) {
|
||||
return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node);
|
||||
}).handle_exception([uuid, node] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node);
|
||||
@@ -2527,7 +2528,7 @@ future<> repair::tablet_repair_task_impl::run() {
|
||||
while (!as.abort_requested()) {
|
||||
sleep_abortable(update_interval, as).get();
|
||||
parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) {
|
||||
return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) {
|
||||
rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node);
|
||||
}).handle_exception([uuid, node] (std::exception_ptr ep) {
|
||||
rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node);
|
||||
|
||||
@@ -97,6 +97,7 @@
|
||||
#include "service/raft/join_node.hh"
|
||||
#include "idl/join_node.dist.hh"
|
||||
#include "idl/migration_manager.dist.hh"
|
||||
#include "idl/node_ops.dist.hh"
|
||||
#include "protocol_server.hh"
|
||||
#include "node_ops/node_ops_ctl.hh"
|
||||
#include "node_ops/task_manager_module.hh"
|
||||
@@ -3860,7 +3861,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set<token>& bootstrap_tok
|
||||
std::unordered_map<gms::inet_address, std::list<node_ops_id>> pending_ops;
|
||||
auto req = node_ops_cmd_request(node_ops_cmd::query_pending_ops, uuid);
|
||||
parallel_for_each(ctl.sync_nodes, [this, req, uuid, &pending_ops] (const gms::inet_address& node) {
|
||||
return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) {
|
||||
return ser::node_ops_rpc_verbs::send_node_ops_cmd(&_messaging.local(), netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) {
|
||||
slogger.debug("bootstrap[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops);
|
||||
if (!resp.pending_ops.empty()) {
|
||||
pending_ops.emplace(node, resp.pending_ops);
|
||||
@@ -6942,7 +6943,7 @@ node_state storage_service::get_node_state(locator::host_id id) {
|
||||
}
|
||||
|
||||
void storage_service::init_messaging_service() {
|
||||
_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
|
||||
auto coordinator = cinfo.retrieve_auxiliary<gms::inet_address>("baddr");
|
||||
std::optional<locator::host_id> coordinator_host_id;
|
||||
if (const auto* id = cinfo.retrieve_auxiliary_opt<locator::host_id>("host_id")) {
|
||||
@@ -7092,7 +7093,7 @@ void storage_service::init_messaging_service() {
|
||||
|
||||
future<> storage_service::uninit_messaging_service() {
|
||||
return when_all_succeed(
|
||||
_messaging.local().unregister_node_ops_cmd(),
|
||||
ser::node_ops_rpc_verbs::unregister(&_messaging.local()),
|
||||
ser::storage_service_rpc_verbs::unregister(&_messaging.local()),
|
||||
ser::join_node_rpc_verbs::unregister(&_messaging.local())
|
||||
).discard_result();
|
||||
|
||||
Reference in New Issue
Block a user