From b47faed54f8e032acd97644ffeacf48c4fe1af67 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 28 Nov 2024 15:57:04 +0200 Subject: [PATCH 1/6] idl: move node_ops related stuff from the repair related IDL Create separate IDL file for node_ops stuff. --- configure.py | 1 + idl/CMakeLists.txt | 1 + idl/node_ops.idl.hh | 52 +++++++++++++++++++++++++++++++++++ idl/partition_checksum.idl.hh | 52 ----------------------------------- message/messaging_service.cc | 2 ++ 5 files changed, 56 insertions(+), 52 deletions(-) create mode 100644 idl/node_ops.idl.hh diff --git a/configure.py b/configure.py index 63fe026e78..b20550786a 100755 --- a/configure.py +++ b/configure.py @@ -1316,6 +1316,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/utils.idl.hh', 'idl/gossip.idl.hh', 'idl/migration_manager.idl.hh', + "idl/node_ops.idl.hh", ] diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index d213f7ddc2..f98cbb6cf2 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -63,6 +63,7 @@ set(idl_headers utils.idl.hh gossip.idl.hh migration_manager.idl.hh + node_ops.idl.hh ) foreach(idl_header ${idl_headers}) diff --git a/idl/node_ops.idl.hh b/idl/node_ops.idl.hh new file mode 100644 index 0000000000..2768541437 --- /dev/null +++ b/idl/node_ops.idl.hh @@ -0,0 +1,52 @@ +/* + * Copyright 2024-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include "node_ops/id.hh" + +enum class node_ops_cmd : uint32_t { + removenode_prepare, + removenode_heartbeat, + removenode_sync_data, + removenode_abort, + removenode_done, + replace_prepare, + replace_prepare_mark_alive, + replace_prepare_pending_ranges, + replace_heartbeat, + replace_abort, + replace_done, + decommission_prepare, + decommission_heartbeat, + decommission_abort, + decommission_done, + bootstrap_prepare, + bootstrap_heartbeat, + bootstrap_abort, + bootstrap_done, + query_pending_ops, + repair_updater, +}; + +class node_ops_id final { + utils::UUID uuid(); +}; + +struct node_ops_cmd_request { + node_ops_cmd cmd; + node_ops_id ops_uuid; + std::list ignore_nodes; + std::list leaving_nodes; + std::unordered_map replace_nodes; + std::unordered_map> bootstrap_nodes; + std::list repair_tables; +}; + +struct node_ops_cmd_response { + bool ok; + std::list pending_ops; +}; diff --git a/idl/partition_checksum.idl.hh b/idl/partition_checksum.idl.hh index 2d6f23be53..c3436ad299 100644 --- a/idl/partition_checksum.idl.hh +++ b/idl/partition_checksum.idl.hh @@ -73,58 +73,6 @@ struct repair_row_level_start_response { repair_row_level_start_status status; }; -enum class node_ops_cmd : uint32_t { - removenode_prepare, - removenode_heartbeat, - removenode_sync_data, - removenode_abort, - removenode_done, - replace_prepare, - replace_prepare_mark_alive, - replace_prepare_pending_ranges, - replace_heartbeat, - replace_abort, - replace_done, - decommission_prepare, - decommission_heartbeat, - decommission_abort, - decommission_done, - bootstrap_prepare, - bootstrap_heartbeat, - bootstrap_abort, - bootstrap_done, - query_pending_ops, - repair_updater, -}; - -class node_ops_id final { - utils::UUID uuid(); -}; - -struct node_ops_cmd_request { - // Mandatory field, set by all cmds - node_ops_cmd cmd; - // Mandatory field, set by all cmds - node_ops_id ops_uuid; - // Optional field, list nodes to ignore, set by all cmds - std::list ignore_nodes; - // Optional field, list leaving nodes, set by decommission and removenode cmd - std::list leaving_nodes; - // Optional field, map existing nodes to replacing nodes, set by replace cmd - std::unordered_map replace_nodes; - // Optional field, map bootstrapping nodes to bootstrap tokens, set by bootstrap cmd - std::unordered_map> bootstrap_nodes; - // Optional field, list uuids of tables being repaired, set by repair cmd - std::list repair_tables; -}; - -struct node_ops_cmd_response { - // Mandatory field, set by all cmds - bool ok; - // Optional field, set by query_pending_ops cmd - std::list pending_ops; -}; - struct repair_update_system_table_request { tasks::task_id repair_uuid; table_id table_uuid; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 9c505ea3cf..7aa614b088 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -62,6 +62,7 @@ #include "idl/range.dist.hh" #include "idl/position_in_partition.dist.hh" #include "idl/partition_checksum.dist.hh" +#include "idl/node_ops.dist.hh" #include "idl/query.dist.hh" #include "idl/cache_temperature.dist.hh" #include "idl/view.dist.hh" @@ -116,6 +117,7 @@ #include "streaming/stream_manager.hh" #include "streaming/stream_mutation_fragments_cmd.hh" #include "idl/partition_checksum.dist.impl.hh" +#include "idl/node_ops.dist.impl.hh" #include "idl/mapreduce_request.dist.hh" #include "idl/mapreduce_request.dist.impl.hh" #include "idl/storage_service.dist.impl.hh" From 39c75d3add0234cb7f04aeb2b411a3ad0b092001 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 28 Nov 2024 16:18:30 +0200 Subject: [PATCH 2/6] idl: rename partition_checksum.dist.hh to repair.dist.hh The file has many more things than partition_checksum. All of them are repair related now. --- configure.py | 2 +- idl/CMakeLists.txt | 2 +- idl/{partition_checksum.idl.hh => repair.idl.hh} | 0 message/messaging_service.cc | 4 ++-- repair/repair.cc | 4 ++-- repair/row_level.cc | 11 +++++------ 6 files changed, 11 insertions(+), 12 deletions(-) rename idl/{partition_checksum.idl.hh => repair.idl.hh} (100%) diff --git a/configure.py b/configure.py index b20550786a..766efb693a 100755 --- a/configure.py +++ b/configure.py @@ -1288,7 +1288,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/streaming.idl.hh', 'idl/paging_state.idl.hh', 'idl/frozen_schema.idl.hh', - 'idl/partition_checksum.idl.hh', + 'idl/repair.idl.hh', 'idl/replay_position.idl.hh', 'idl/mutation.idl.hh', 'idl/query.idl.hh', diff --git a/idl/CMakeLists.txt b/idl/CMakeLists.txt index f98cbb6cf2..91672c5abf 100644 --- a/idl/CMakeLists.txt +++ b/idl/CMakeLists.txt @@ -35,7 +35,7 @@ set(idl_headers streaming.idl.hh paging_state.idl.hh frozen_schema.idl.hh - partition_checksum.idl.hh + repair.idl.hh replay_position.idl.hh mutation.idl.hh query.idl.hh diff --git a/idl/partition_checksum.idl.hh b/idl/repair.idl.hh similarity index 100% rename from idl/partition_checksum.idl.hh rename to idl/repair.idl.hh diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 7aa614b088..21075ee1e2 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -61,7 +61,7 @@ #include "idl/read_command.dist.hh" #include "idl/range.dist.hh" #include "idl/position_in_partition.dist.hh" -#include "idl/partition_checksum.dist.hh" +#include "idl/repair.dist.hh" #include "idl/node_ops.dist.hh" #include "idl/query.dist.hh" #include "idl/cache_temperature.dist.hh" @@ -116,7 +116,7 @@ #include "mutation/frozen_mutation.hh" #include "streaming/stream_manager.hh" #include "streaming/stream_mutation_fragments_cmd.hh" -#include "idl/partition_checksum.dist.impl.hh" +#include "idl/repair.dist.impl.hh" #include "idl/node_ops.dist.impl.hh" #include "idl/mapreduce_request.dist.hh" #include "idl/mapreduce_request.dist.impl.hh" diff --git a/repair/repair.cc b/repair/repair.cc index a0314fee26..65fd16fbd3 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -49,7 +49,7 @@ #include #include -#include "idl/partition_checksum.dist.hh" +#include "idl/repair.dist.hh" #include "utils/user_provided_param.hh" using namespace std::chrono_literals; @@ -408,7 +408,7 @@ future> repair_service::flush_hints(repai uuid, node, participants); try { auto& ms = get_messaging(); - auto resp = co_await ser::partition_checksum_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); + auto resp = co_await ser::repair_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); if (resp.flush_time == gc_clock::time_point()) { // This means the node does not support sending flush_time back. Use the time when the flush is requested for flush_time. rlogger.debug("repair[{}]: Got empty flush_time from node={}. Please upgrade the node={}.", uuid, node, node); diff --git a/repair/row_level.cc b/repair/row_level.cc index f141b9956a..d262604eff 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -49,7 +49,7 @@ #include "db/system_keyspace.hh" #include "service/storage_proxy.hh" #include "db/batchlog_manager.hh" -#include "idl/partition_checksum.dist.hh" +#include "idl/repair.dist.hh" #include "readers/empty_v2.hh" #include "readers/evictable.hh" #include "readers/queue.hh" @@ -2565,11 +2565,11 @@ future<> repair_service::init_ms_handlers() { ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) { return make_ready_future>(suportted_diff_detect_algorithms()); }); - ser::partition_checksum_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) { + ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) { auto from = cinfo.retrieve_auxiliary("baddr"); return repair_update_system_table_handler(from, std::move(req)); }); - ser::partition_checksum_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) { + ser::repair_rpc_verbs::register_repair_flush_hints_batchlog(&ms, [this] (const rpc::client_info& cinfo, repair_flush_hints_batchlog_request req) { auto from = cinfo.retrieve_auxiliary("baddr"); return repair_flush_hints_batchlog_handler(from, std::move(req)); }); @@ -2594,8 +2594,7 @@ future<> repair_service::uninit_ms_handlers() { ms.unregister_repair_get_estimated_partitions(), ms.unregister_repair_set_estimated_partitions(), ms.unregister_repair_get_diff_algorithms(), - ser::partition_checksum_rpc_verbs::unregister_repair_update_system_table(&ms), - ser::partition_checksum_rpc_verbs::unregister_repair_flush_hints_batchlog(&ms) + ser::repair_rpc_verbs::unregister(&ms) ).discard_result(); } @@ -3019,7 +3018,7 @@ private: co_await coroutine::parallel_for_each(all_nodes, [this, req] (gms::inet_address node) -> future<> { try { auto& ms = _shard_task.messaging.local(); - repair_update_system_table_response resp = co_await ser::partition_checksum_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req); + repair_update_system_table_response resp = co_await ser::repair_rpc_verbs::send_repair_update_system_table(&ms, netw::messaging_service::msg_addr(node), req); (void)resp; // nothing to do with the response yet rlogger.debug("repair[{}]: Finished to update system.repair_history table of node {}", _shard_task.global_repair_id.uuid(), node); } catch (...) { From 5f6007f6ec6d98c21cfc1a468f154b08878e99d4 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 28 Nov 2024 16:47:47 +0200 Subject: [PATCH 3/6] node_ops: move node_ops_cmd to IDL --- idl/node_ops.idl.hh | 2 ++ message/messaging_service.cc | 11 ----------- message/messaging_service.hh | 5 ----- node_ops/node_ops_ctl.cc | 7 ++++--- repair/repair.cc | 5 +++-- service/storage_service.cc | 7 ++++--- 6 files changed, 13 insertions(+), 24 deletions(-) diff --git a/idl/node_ops.idl.hh b/idl/node_ops.idl.hh index 2768541437..0279456c36 100644 --- a/idl/node_ops.idl.hh +++ b/idl/node_ops.idl.hh @@ -50,3 +50,5 @@ struct node_ops_cmd_response { bool ok; std::list pending_ops; }; + +verb [[with_client_info]] node_ops_cmd(node_ops_cmd_request) -> node_ops_cmd_response diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 21075ee1e2..760d1263a7 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1368,17 +1368,6 @@ future> messaging_service::send_rep return send_message>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id)); } -// Wrapper for NODE_OPS_CMD -void messaging_service::register_node_ops_cmd(std::function (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func) { - register_handler(this, messaging_verb::NODE_OPS_CMD, std::move(func)); -} -future<> messaging_service::unregister_node_ops_cmd() { - return unregister_handler(messaging_verb::NODE_OPS_CMD); -} -future messaging_service::send_node_ops_cmd(msg_addr id, node_ops_cmd_request req) { - return send_message>(this, messaging_verb::NODE_OPS_CMD, std::move(id), std::move(req)); -} - // Wrapper for TASKS_CHILDREN_REQUEST void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index aa1f67bd68..32a5fbd3e2 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -488,11 +488,6 @@ public: future<> unregister_repair_get_diff_algorithms(); future> send_repair_get_diff_algorithms(msg_addr id); - // Wrapper for NODE_OPS_CMD - void register_node_ops_cmd(std::function (const rpc::client_info& cinfo, node_ops_cmd_request)>&& func); - future<> unregister_node_ops_cmd(); - future send_node_ops_cmd(msg_addr id, node_ops_cmd_request); - // Wrapper for TASKS_GET_CHILDREN void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); future<> unregister_tasks_get_children(); diff --git a/node_ops/node_ops_ctl.cc b/node_ops/node_ops_ctl.cc index c8709cf7e7..6726e3452f 100644 --- a/node_ops/node_ops_ctl.cc +++ b/node_ops/node_ops_ctl.cc @@ -16,6 +16,7 @@ #include #include #include +#include "idl/node_ops.dist.hh" static logging::logger nlogger("node_ops"); @@ -104,7 +105,7 @@ void node_ops_ctl::start_heartbeat_updater(node_ops_cmd cmd) { future<> node_ops_ctl::query_pending_op() { req.cmd = node_ops_cmd::query_pending_ops; co_await coroutine::parallel_for_each(sync_nodes, [this] (const gms::inet_address& node) -> future<> { - auto resp = co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + auto resp = co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req); nlogger.debug("{}[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", desc, uuid(), node, resp.pending_ops); if (boost::find(resp.pending_ops, uuid()) == resp.pending_ops.end()) { throw std::runtime_error(::format("{}[{}]: Node {} no longer tracks the operation", desc, uuid(), node)); @@ -152,7 +153,7 @@ future<> node_ops_ctl::send_to_all(node_ops_cmd cmd) { co_return; } try { - co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req); nlogger.debug("{}[{}]: Got {} response from node={}", desc, uuid(), op_desc, node); } catch (const seastar::rpc::unknown_verb_error&) { if (cmd_category == node_ops_cmd_category::prepare) { @@ -195,7 +196,7 @@ future<> node_ops_ctl::heartbeat_updater(node_ops_cmd cmd) { auto req = node_ops_cmd_request{cmd, uuid(), {}, {}, {}}; co_await coroutine::parallel_for_each(sync_nodes, [&] (const gms::inet_address& node) -> future<> { try { - co_await ss._messaging.local().send_node_ops_cmd(netw::msg_addr(node), req); + co_await ser::node_ops_rpc_verbs::send_node_ops_cmd(&ss._messaging.local(), netw::msg_addr(node), req); nlogger.debug("{}[{}]: Got heartbeat response from node={}", desc, uuid(), node); } catch (...) { nlogger.warn("{}[{}]: Failed to get heartbeat response from node={}", desc, uuid(), node); diff --git a/repair/repair.cc b/repair/repair.cc index 65fd16fbd3..29a53e5e58 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -50,6 +50,7 @@ #include #include "idl/repair.dist.hh" +#include "idl/node_ops.dist.hh" #include "utils/user_provided_param.hh" using namespace std::chrono_literals; @@ -1372,7 +1373,7 @@ future<> repair::user_requested_repair_task_impl::run() { while (!as.abort_requested()) { sleep_abortable(update_interval, as).get(); parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) { - return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node); }).handle_exception([uuid, node] (std::exception_ptr ep) { rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node); @@ -2527,7 +2528,7 @@ future<> repair::tablet_repair_task_impl::run() { while (!as.abort_requested()) { sleep_abortable(update_interval, as).get(); parallel_for_each(participants, [&rs, uuid, &req] (gms::inet_address node) { - return rs._messaging.send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { + return ser::node_ops_rpc_verbs::send_node_ops_cmd(&rs._messaging, netw::msg_addr(node), req).then([uuid, node] (node_ops_cmd_response resp) { rlogger.debug("repair[{}]: Got node_ops_cmd::repair_updater response from node={}", uuid, node); }).handle_exception([uuid, node] (std::exception_ptr ep) { rlogger.warn("repair[{}]: Failed to send node_ops_cmd::repair_updater to node={}", uuid, node); diff --git a/service/storage_service.cc b/service/storage_service.cc index dfd98002ec..b59977e7e8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -97,6 +97,7 @@ #include "service/raft/join_node.hh" #include "idl/join_node.dist.hh" #include "idl/migration_manager.dist.hh" +#include "idl/node_ops.dist.hh" #include "protocol_server.hh" #include "node_ops/node_ops_ctl.hh" #include "node_ops/task_manager_module.hh" @@ -3860,7 +3861,7 @@ void storage_service::run_bootstrap_ops(std::unordered_set& bootstrap_tok std::unordered_map> pending_ops; auto req = node_ops_cmd_request(node_ops_cmd::query_pending_ops, uuid); parallel_for_each(ctl.sync_nodes, [this, req, uuid, &pending_ops] (const gms::inet_address& node) { - return _messaging.local().send_node_ops_cmd(netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) { + return ser::node_ops_rpc_verbs::send_node_ops_cmd(&_messaging.local(), netw::msg_addr(node), req).then([uuid, node, &pending_ops] (node_ops_cmd_response resp) { slogger.debug("bootstrap[{}]: Got query_pending_ops response from node={}, resp.pending_ops={}", uuid, node, resp.pending_ops); if (!resp.pending_ops.empty()) { pending_ops.emplace(node, resp.pending_ops); @@ -6942,7 +6943,7 @@ node_state storage_service::get_node_state(locator::host_id id) { } void storage_service::init_messaging_service() { - _messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { + ser::node_ops_rpc_verbs::register_node_ops_cmd(&_messaging.local(), [this] (const rpc::client_info& cinfo, node_ops_cmd_request req) { auto coordinator = cinfo.retrieve_auxiliary("baddr"); std::optional coordinator_host_id; if (const auto* id = cinfo.retrieve_auxiliary_opt("host_id")) { @@ -7092,7 +7093,7 @@ void storage_service::init_messaging_service() { future<> storage_service::uninit_messaging_service() { return when_all_succeed( - _messaging.local().unregister_node_ops_cmd(), + ser::node_ops_rpc_verbs::unregister(&_messaging.local()), ser::storage_service_rpc_verbs::unregister(&_messaging.local()), ser::join_node_rpc_verbs::unregister(&_messaging.local()) ).discard_result(); From bfee93c74746c8555db23b7002cc01084d981b01 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 1 Dec 2024 17:43:45 +0200 Subject: [PATCH 4/6] messaging_service: move repair verbs to IDL --- idl/repair.idl.hh | 10 ++++ message/messaging_service.cc | 109 ----------------------------------- message/messaging_service.hh | 50 ---------------- repair/row_level.cc | 52 +++++++---------- 4 files changed, 31 insertions(+), 190 deletions(-) diff --git a/idl/repair.idl.hh b/idl/repair.idl.hh index c3436ad299..468177e891 100644 --- a/idl/repair.idl.hh +++ b/idl/repair.idl.hh @@ -98,3 +98,13 @@ struct repair_flush_hints_batchlog_response { verb [[with_client_info]] repair_update_system_table (repair_update_system_table_request req [[ref]]) -> repair_update_system_table_response; verb [[with_client_info]] repair_flush_hints_batchlog (repair_flush_hints_batchlog_request req [[ref]]) -> repair_flush_hints_batchlog_response; +verb [[with_client_info]] repair_get_full_row_hashes (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> repair_hash_set; +verb [[with_client_info]] repair_get_combined_row_hash (uint32_t repair_meta_id, std::optional common_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_combined_row_hash_response; +verb [[with_client_info]] repair_get_sync_boundary (uint32_t repair_meta_id, std::optional skipped_sync_boundary, shard_id dst_shard_id [[version 5.2]]) -> get_sync_boundary_response; +verb [[with_client_info]] repair_get_row_diff (uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id [[version 5.2]]) -> repair_rows_on_wire; +verb [[with_client_info]] repair_put_row_diff (uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id [[version 5.2]]); +verb [[with_client_info]] repair_row_level_start (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason [[version 4.1.0]], gc_clock::time_point compaction_time [[version 5.2]], shard_id dst_shard_id [[version 5.2]]) -> repair_row_level_start_response [[version 4.2.0]]; +verb [[with_client_info]] repair_row_level_stop (uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id [[version 5.2]]); +verb [[with_client_info]] repair_get_estimated_partitions (uint32_t repair_meta_id, shard_id dst_shard_id [[version 5.2]]) -> uint64_t; +verb [[with_client_info]] repair_set_estimated_partitions (uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id [[version 5.2]]); +verb [[with_client_info]] repair_get_diff_algorithms () -> std::vector; diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 760d1263a7..3fc78eaf2a 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1259,115 +1259,6 @@ future<> messaging_service::unregister_complete_message() { return unregister_handler(messaging_verb::COMPLETE_MESSAGE); } -// Wrapper for REPAIR_GET_FULL_ROW_HASHES -void messaging_service::register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(func)); -} -future<> messaging_service::unregister_repair_get_full_row_hashes() { - return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES); -} -future messaging_service::send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_FULL_ROW_HASHES, std::move(id), repair_meta_id, dst_shard_id); -} - -// Wrapper for REPAIR_GET_COMBINED_ROW_HASH -void messaging_service::register_repair_get_combined_row_hash(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional common_sync_boundary, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(func)); -} -future<> messaging_service::unregister_repair_get_combined_row_hash() { - return unregister_handler(messaging_verb::REPAIR_GET_COMBINED_ROW_HASH); -} -future messaging_service::send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional common_sync_boundary, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_COMBINED_ROW_HASH, std::move(id), repair_meta_id, std::move(common_sync_boundary), dst_shard_id); -} - -void messaging_service::register_repair_get_sync_boundary(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional skipped_sync_boundary, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(func)); -} -future<> messaging_service::unregister_repair_get_sync_boundary() { - return unregister_handler(messaging_verb::REPAIR_GET_SYNC_BOUNDARY); -} -future messaging_service::send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional skipped_sync_boundary, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_SYNC_BOUNDARY, std::move(id), repair_meta_id, std::move(skipped_sync_boundary), dst_shard_id); -} - -// Wrapper for REPAIR_GET_ROW_DIFF -void messaging_service::register_repair_get_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(func)); -} -future<> messaging_service::unregister_repair_get_row_diff() { - return unregister_handler(messaging_verb::REPAIR_GET_ROW_DIFF); -} -future messaging_service::send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_ROW_DIFF, std::move(id), repair_meta_id, std::move(set_diff), needs_all_rows, dst_shard_id); -} - -// Wrapper for REPAIR_PUT_ROW_DIFF -void messaging_service::register_repair_put_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(func)); -} -future<> messaging_service::unregister_repair_put_row_diff() { - return unregister_handler(messaging_verb::REPAIR_PUT_ROW_DIFF); -} -future<> messaging_service::send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_shard_id) { - return send_message(this, messaging_verb::REPAIR_PUT_ROW_DIFF, std::move(id), repair_meta_id, std::move(row_diff), dst_shard_id); -} - -// Wrapper for REPAIR_ROW_LEVEL_START -void messaging_service::register_repair_row_level_start(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional reason, rpc::optional compaction_time, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(func)); -} -future<> messaging_service::unregister_repair_row_level_start() { - return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_START); -} -future> messaging_service::send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_ROW_LEVEL_START, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), algo, max_row_buf_size, seed, remote_shard, remote_shard_count, remote_ignore_msb, std::move(remote_partitioner_name), std::move(schema_version), reason, compaction_time, dst_shard_id); -} - -// Wrapper for REPAIR_ROW_LEVEL_STOP -void messaging_service::register_repair_row_level_stop(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(func)); -} -future<> messaging_service::unregister_repair_row_level_stop() { - return unregister_handler(messaging_verb::REPAIR_ROW_LEVEL_STOP); -} -future<> messaging_service::send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_shard_id) { - return send_message(this, messaging_verb::REPAIR_ROW_LEVEL_STOP, std::move(id), repair_meta_id, std::move(keyspace_name), std::move(cf_name), std::move(range), dst_shard_id); -} - -// Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS -void messaging_service::register_repair_get_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(func)); -} -future<> messaging_service::unregister_repair_get_estimated_partitions() { - return unregister_handler(messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS); -} -future messaging_service::send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_shard_id) { - return send_message>(this, messaging_verb::REPAIR_GET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, dst_shard_id); -} - -// Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS -void messaging_service::register_repair_set_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional dst_shard_id)>&& func) { - register_handler(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(func)); -} -future<> messaging_service::unregister_repair_set_estimated_partitions() { - return unregister_handler(messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS); -} -future<> messaging_service::send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_shard_id) { - return send_message(this, messaging_verb::REPAIR_SET_ESTIMATED_PARTITIONS, std::move(id), repair_meta_id, estimated_partitions, dst_shard_id); -} - -// Wrapper for REPAIR_GET_DIFF_ALGORITHMS -void messaging_service::register_repair_get_diff_algorithms(std::function> (const rpc::client_info& cinfo)>&& func) { - register_handler(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(func)); -} -future<> messaging_service::unregister_repair_get_diff_algorithms() { - return unregister_handler(messaging_verb::REPAIR_GET_DIFF_ALGORITHMS); -} -future> messaging_service::send_repair_get_diff_algorithms(msg_addr id) { - return send_message>>(this, messaging_verb::REPAIR_GET_DIFF_ALGORITHMS, std::move(id)); -} - // Wrapper for TASKS_CHILDREN_REQUEST void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 32a5fbd3e2..a59983e482 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -438,56 +438,6 @@ public: future<> send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed = false); future<> unregister_complete_message(); - // Wrapper for REPAIR_GET_FULL_ROW_HASHES - void register_repair_get_full_row_hashes(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_full_row_hashes(); - future send_repair_get_full_row_hashes(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_COMBINED_ROW_HASH - void register_repair_get_combined_row_hash(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional common_sync_boundary, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_combined_row_hash(); - future send_repair_get_combined_row_hash(msg_addr id, uint32_t repair_meta_id, std::optional common_sync_boundary, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_SYNC_BOUNDARY - void register_repair_get_sync_boundary(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional skipped_sync_boundary, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_sync_boundary(); - future send_repair_get_sync_boundary(msg_addr id, uint32_t repair_meta_id, std::optional skipped_sync_boundary, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_ROW_DIFF - void register_repair_get_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_row_diff(); - future send_repair_get_row_diff(msg_addr id, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, shard_id dst_cpu_id); - - // Wrapper for REPAIR_PUT_ROW_DIFF - void register_repair_put_row_diff(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_put_row_diff(); - future<> send_repair_put_row_diff(msg_addr id, uint32_t repair_meta_id, repair_rows_on_wire row_diff, shard_id dst_cpu_id); - - // Wrapper for REPAIR_ROW_LEVEL_START - void register_repair_row_level_start(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional reason, rpc::optional compaction_time, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_row_level_start(); - future> send_repair_row_level_start(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, streaming::stream_reason reason, gc_clock::time_point compaction_time, shard_id dst_cpu_id); - - // Wrapper for REPAIR_ROW_LEVEL_STOP - void register_repair_row_level_stop(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_row_level_stop(); - future<> send_repair_row_level_stop(msg_addr id, uint32_t repair_meta_id, sstring keyspace_name, sstring cf_name, dht::token_range range, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_ESTIMATED_PARTITIONS - void register_repair_get_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_get_estimated_partitions(); - future send_repair_get_estimated_partitions(msg_addr id, uint32_t repair_meta_id, shard_id dst_cpu_id); - - // Wrapper for REPAIR_SET_ESTIMATED_PARTITIONS - void register_repair_set_estimated_partitions(std::function (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional dst_cpu_id)>&& func); - future<> unregister_repair_set_estimated_partitions(); - future<> send_repair_set_estimated_partitions(msg_addr id, uint32_t repair_meta_id, uint64_t estimated_partitions, shard_id dst_cpu_id); - - // Wrapper for REPAIR_GET_DIFF_ALGORITHMS - void register_repair_get_diff_algorithms(std::function> (const rpc::client_info& cinfo)>&& func); - future<> unregister_repair_get_diff_algorithms(); - future> send_repair_get_diff_algorithms(msg_addr id); - // Wrapper for TASKS_GET_CHILDREN void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); future<> unregister_tasks_get_children(); diff --git a/repair/row_level.cc b/repair/row_level.cc index d262604eff..750c60ff4f 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -224,7 +224,7 @@ static const std::vector& suportted_diff_detect static row_level_diff_detect_algorithm get_common_diff_detect_algorithm(netw::messaging_service& ms, const inet_address_vector_replica_set& nodes) { std::vector> nodes_algorithms(nodes.size()); parallel_for_each(std::views::iota(size_t(0), nodes.size()), coroutine::lambda([&] (size_t idx) -> future<> { - std::vector algorithms = co_await ms.send_repair_get_diff_algorithms(netw::messaging_service::msg_addr(nodes[idx])); + std::vector algorithms = co_await ser::repair_rpc_verbs::send_repair_get_diff_algorithms(&ms, netw::messaging_service::msg_addr(nodes[idx])); std::sort(algorithms.begin(), algorithms.end()); nodes_algorithms[idx] = std::move(algorithms); rlogger.trace("Got node_algorithms={}, from node={}", nodes_algorithms[idx], nodes[idx]); @@ -1431,7 +1431,7 @@ public: if (remote_node == myip()) { co_return co_await get_full_row_hashes_handler(); } - repair_hash_set hashes = co_await _messaging.send_repair_get_full_row_hashes(msg_addr(remote_node), + repair_hash_set hashes = co_await ser::repair_rpc_verbs::send_repair_get_full_row_hashes(&_messaging, msg_addr(remote_node), _repair_meta_id, dst_cpu_id); rlogger.debug("Got full hashes from peer={}, nr_hashes={}", remote_node, hashes.size()); _metrics.rx_hashes_nr += hashes.size(); @@ -1508,7 +1508,7 @@ public: if (remote_node == myip()) { co_return co_await get_combined_row_hash_handler(common_sync_boundary); } - get_combined_row_hash_response resp = co_await _messaging.send_repair_get_combined_row_hash(msg_addr(remote_node), + get_combined_row_hash_response resp = co_await ser::repair_rpc_verbs::send_repair_get_combined_row_hash(&_messaging, msg_addr(remote_node), _repair_meta_id, common_sync_boundary, dst_cpu_id); stats().rpc_call_nr++; stats().rx_hashes_nr++; @@ -1542,7 +1542,7 @@ public: // the time this change is introduced. sstring remote_partitioner_name = "org.apache.cassandra.dht.Murmur3Partitioner"; rpc::optional resp = - co_await _messaging.send_repair_row_level_start(msg_addr(remote_node), + co_await ser::repair_rpc_verbs::send_repair_row_level_start(&_messaging, msg_addr(remote_node), _repair_meta_id, ks_name, cf_name, std::move(range), _algo, _max_row_buf_size, _seed, _master_node_shard_config.shard, _master_node_shard_config.shard_count, _master_node_shard_config.ignore_msb, remote_partitioner_name, std::move(schema_version), reason, compaction_time, dst_cpu_id); @@ -1575,7 +1575,7 @@ public: co_return co_await stop(); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_row_level_stop(msg_addr(remote_node), + co_return co_await ser::repair_rpc_verbs::send_repair_row_level_stop(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(ks_name), std::move(cf_name), std::move(range), dst_cpu_id); } @@ -1596,7 +1596,7 @@ public: co_return co_await get_estimated_partitions(); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_get_estimated_partitions(msg_addr(remote_node), _repair_meta_id, dst_cpu_id); + co_return co_await ser::repair_rpc_verbs::send_repair_get_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, dst_cpu_id); } @@ -1615,7 +1615,7 @@ public: co_return co_await set_estimated_partitions(estimated_partitions); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_set_estimated_partitions(msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id); + co_return co_await ser::repair_rpc_verbs::send_repair_set_estimated_partitions(&_messaging, msg_addr(remote_node), _repair_meta_id, estimated_partitions, dst_cpu_id); } @@ -1635,7 +1635,7 @@ public: co_return co_await get_sync_boundary_handler(skipped_sync_boundary); } stats().rpc_call_nr++; - co_return co_await _messaging.send_repair_get_sync_boundary(msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id); + co_return co_await ser::repair_rpc_verbs::send_repair_get_sync_boundary(&_messaging, msg_addr(remote_node), _repair_meta_id, skipped_sync_boundary, dst_cpu_id); } // RPC handler @@ -1662,7 +1662,7 @@ public: _metrics.tx_hashes_nr += set_diff.size(); } stats().rpc_call_nr++; - repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node), + repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(set_diff), bool(needs_all_rows), dst_cpu_id).get(); if (!rows.empty()) { apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::no, node_idx); @@ -1676,7 +1676,7 @@ public: return; } stats().rpc_call_nr++; - repair_rows_on_wire rows = _messaging.send_repair_get_row_diff(msg_addr(remote_node), + repair_rows_on_wire rows = ser::repair_rpc_verbs::send_repair_get_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, {}, bool(needs_all_rows_t::yes), dst_cpu_id).get(); if (!rows.empty()) { apply_rows_on_master_in_thread(std::move(rows), remote_node, update_working_row_buf::yes, update_peer_row_hash_sets::yes, node_idx); @@ -1803,7 +1803,7 @@ public: stats().tx_row_bytes += row_bytes; stats().rpc_call_nr++; repair_rows_on_wire rows = co_await to_repair_rows_on_wire(std::move(row_diff)); - co_await _messaging.send_repair_put_row_diff(msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id); + co_await ser::repair_rpc_verbs::send_repair_put_row_diff(&_messaging, msg_addr(remote_node), _repair_meta_id, std::move(rows), dst_cpu_id); } } @@ -2425,7 +2425,7 @@ future<> repair_service::init_ms_handlers() { }); return make_ready_future>(sink); }); - ms.register_repair_get_full_row_hashes([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { + ser::repair_rpc_verbs::register_repair_get_full_row_hashes(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2439,7 +2439,7 @@ future<> repair_service::init_ms_handlers() { }); }) ; }); - ms.register_repair_get_combined_row_hash([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_get_combined_row_hash(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional common_sync_boundary, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2455,7 +2455,7 @@ future<> repair_service::init_ms_handlers() { }); }); }); - ms.register_repair_get_sync_boundary([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_get_sync_boundary(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, std::optional skipped_sync_boundary, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto from = cinfo.retrieve_auxiliary("baddr"); @@ -2470,7 +2470,7 @@ future<> repair_service::init_ms_handlers() { }); }); }); - ms.register_repair_get_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_get_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_hash_set set_diff, bool needs_all_rows, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2493,7 +2493,7 @@ future<> repair_service::init_ms_handlers() { } }); }); - ms.register_repair_put_row_diff([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_put_row_diff(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, repair_rows_on_wire row_diff, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2513,7 +2513,7 @@ future<> repair_service::init_ms_handlers() { } }); }); - ms.register_repair_row_level_start([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name, + ser::repair_rpc_verbs::register_repair_row_level_start(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, row_level_diff_detect_algorithm algo, uint64_t max_row_buf_size, uint64_t seed, unsigned remote_shard, unsigned remote_shard_count, unsigned remote_ignore_msb, sstring remote_partitioner_name, table_schema_version schema_version, rpc::optional reason, rpc::optional compaction_time, rpc::optional dst_cpu_id_opt) { @@ -2535,7 +2535,7 @@ future<> repair_service::init_ms_handlers() { schema_version, r, ct, _repair_module->abort_source()); }); }); - ms.register_repair_row_level_stop([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_row_level_stop(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, sstring ks_name, sstring cf_name, dht::token_range range, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2545,7 +2545,7 @@ future<> repair_service::init_ms_handlers() { std::move(ks_name), std::move(cf_name), std::move(range)); }); }); - ms.register_repair_get_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { + ser::repair_rpc_verbs::register_repair_get_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); auto from = cinfo.retrieve_auxiliary("baddr"); @@ -2553,7 +2553,7 @@ future<> repair_service::init_ms_handlers() { return repair_meta::repair_get_estimated_partitions_handler(local_repair, from, repair_meta_id); }); }); - ms.register_repair_set_estimated_partitions([this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, + ser::repair_rpc_verbs::register_repair_set_estimated_partitions(&ms, [this] (const rpc::client_info& cinfo, uint32_t repair_meta_id, uint64_t estimated_partitions, rpc::optional dst_cpu_id_opt) { auto src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); auto shard = get_dst_shard_id(src_cpu_id, dst_cpu_id_opt); @@ -2562,7 +2562,7 @@ future<> repair_service::init_ms_handlers() { return repair_meta::repair_set_estimated_partitions_handler(local_repair, from, repair_meta_id, estimated_partitions); }); }); - ms.register_repair_get_diff_algorithms([] (const rpc::client_info& cinfo) { + ser::repair_rpc_verbs::register_repair_get_diff_algorithms(&ms, [] (const rpc::client_info& cinfo) { return make_ready_future>(suportted_diff_detect_algorithms()); }); ser::repair_rpc_verbs::register_repair_update_system_table(&ms, [this] (const rpc::client_info& cinfo, repair_update_system_table_request req) { @@ -2584,16 +2584,6 @@ future<> repair_service::uninit_ms_handlers() { ms.unregister_repair_get_row_diff_with_rpc_stream(), ms.unregister_repair_put_row_diff_with_rpc_stream(), ms.unregister_repair_get_full_row_hashes_with_rpc_stream(), - ms.unregister_repair_get_full_row_hashes(), - ms.unregister_repair_get_combined_row_hash(), - ms.unregister_repair_get_sync_boundary(), - ms.unregister_repair_get_row_diff(), - ms.unregister_repair_put_row_diff(), - ms.unregister_repair_row_level_start(), - ms.unregister_repair_row_level_stop(), - ms.unregister_repair_get_estimated_partitions(), - ms.unregister_repair_set_estimated_partitions(), - ms.unregister_repair_get_diff_algorithms(), ser::repair_rpc_verbs::unregister(&ms) ).discard_result(); } From 92c2558a833bcc820a5775a87e40d7a3b159b7bb Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 3 Dec 2024 17:25:01 +0200 Subject: [PATCH 5/6] streaming: move streaming verbs to IDL --- idl/streaming.idl.hh | 17 ++++++++++ idl/uuid.idl.hh | 1 - message/messaging_service.cc | 56 ------------------------------- message/messaging_service.hh | 20 ----------- streaming/stream_fwd.hh | 6 ++++ streaming/stream_session.cc | 22 ++++++------ streaming/stream_transfer_task.cc | 5 +-- 7 files changed, 36 insertions(+), 91 deletions(-) diff --git a/idl/streaming.idl.hh b/idl/streaming.idl.hh index dcc37772d3..82c101f938 100644 --- a/idl/streaming.idl.hh +++ b/idl/streaming.idl.hh @@ -12,6 +12,19 @@ #include "streaming/stream_fwd.hh" +namespace service { + +// Before the mode of prepare_message verb to the IDL +// there was no serizlizer for session_id and one from +// raft_storage.idl.hh for tagged_id was erroneously +// used. It does not marked as `final`, so here we have +// to omit it as well for compatibility. +class session_id { + utils::UUID uuid(); +} + +} + namespace streaming { class plan_id final { @@ -55,4 +68,8 @@ enum class stream_mutation_fragments_cmd : uint8_t { end_of_stream, }; +verb [[with_client_info]] prepare_message (streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, streaming::stream_reason reason [[version 3.1.0]], service::session_id session [[version 6.0.0]]) -> streaming::prepare_message; +verb [[with_client_info]] prepare_done_message (streaming::plan_id plan_id, unsigned dst_cpu_id); +verb [[with_client_info]] stream_mutation_done (streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id); +verb [[with_client_info]] complete_message (streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed [[version 2.1.0]]); } diff --git a/idl/uuid.idl.hh b/idl/uuid.idl.hh index 65db413038..2b99b2560a 100644 --- a/idl/uuid.idl.hh +++ b/idl/uuid.idl.hh @@ -42,4 +42,3 @@ class host_id final { }; } // namespace locator - diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 3fc78eaf2a..802a1726bd 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -1203,62 +1203,6 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea // Wrappers for verbs -// PREPARE_MESSAGE -void messaging_service::register_prepare_message(std::function (const rpc::client_info& cinfo, - streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason, rpc::optional)>&& func) { - register_handler(this, messaging_verb::PREPARE_MESSAGE, std::move(func)); -} -future messaging_service::send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id, - sstring description, streaming::stream_reason reason, service::session_id session) { - return send_message(this, messaging_verb::PREPARE_MESSAGE, id, - std::move(msg), plan_id, std::move(description), reason, session); -} -future<> messaging_service::unregister_prepare_message() { - return unregister_handler(messaging_verb::PREPARE_MESSAGE); -} - -// PREPARE_DONE_MESSAGE -void messaging_service::register_prepare_done_message(std::function (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id)>&& func) { - register_handler(this, messaging_verb::PREPARE_DONE_MESSAGE, std::move(func)); -} -future<> messaging_service::send_prepare_done_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id) { - return send_message(this, messaging_verb::PREPARE_DONE_MESSAGE, id, - plan_id, dst_cpu_id); -} -future<> messaging_service::unregister_prepare_done_message() { - return unregister_handler(messaging_verb::PREPARE_DONE_MESSAGE); -} - -// STREAM_MUTATION_DONE -void messaging_service::register_stream_mutation_done(std::function (const rpc::client_info& cinfo, - streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id)>&& func) { - register_handler(this, messaging_verb::STREAM_MUTATION_DONE, - [func = std::move(func)] (const rpc::client_info& cinfo, - streaming::plan_id plan_id, std::vector> ranges, - table_id cf_id, unsigned dst_cpu_id) mutable { - return func(cinfo, plan_id, ::compat::unwrap(std::move(ranges)), cf_id, dst_cpu_id); - }); -} -future<> messaging_service::send_stream_mutation_done(msg_addr id, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id) { - return send_message(this, messaging_verb::STREAM_MUTATION_DONE, id, - plan_id, std::move(ranges), cf_id, dst_cpu_id); -} -future<> messaging_service::unregister_stream_mutation_done() { - return unregister_handler(messaging_verb::STREAM_MUTATION_DONE); -} - -// COMPLETE_MESSAGE -void messaging_service::register_complete_message(std::function (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional failed)>&& func) { - register_handler(this, messaging_verb::COMPLETE_MESSAGE, std::move(func)); -} -future<> messaging_service::send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed) { - return send_message(this, messaging_verb::COMPLETE_MESSAGE, id, - plan_id, dst_cpu_id, failed); -} -future<> messaging_service::unregister_complete_message() { - return unregister_handler(messaging_verb::COMPLETE_MESSAGE); -} - // Wrapper for TASKS_CHILDREN_REQUEST void messaging_service::register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func) { register_handler(this, messaging_verb::TASKS_GET_CHILDREN, std::move(func)); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index a59983e482..a385d52b06 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -393,18 +393,6 @@ public: future<> unregister_handler(messaging_verb verb); - // Wrapper for PREPARE_MESSAGE verb - void register_prepare_message(std::function (const rpc::client_info& cinfo, - streaming::prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason, rpc::optional)>&& func); - future send_prepare_message(msg_addr id, streaming::prepare_message msg, streaming::plan_id plan_id, - sstring description, streaming::stream_reason, service::session_id); - future<> unregister_prepare_message(); - - // Wrapper for PREPARE_DONE_MESSAGE verb - void register_prepare_done_message(std::function (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id)>&& func); - future<> send_prepare_done_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id); - future<> unregister_prepare_done_message(); - // Wrapper for STREAM_MUTATION_FRAGMENTS // The receiver of STREAM_MUTATION_FRAGMENTS sends status code to the sender to notify any error on the receiver side. The status code is of type int32_t. 0 means successful, -1 means error, -2 means error and table is dropped, other status code value are reserved for future use. void register_stream_mutation_fragments(std::function> (const rpc::client_info& cinfo, streaming::plan_id plan_id, table_schema_version schema_id, table_id cf_id, uint64_t estimated_partitions, rpc::optional reason_opt, rpc::source> source, rpc::optional)>&& func); @@ -430,14 +418,6 @@ public: void register_repair_get_full_row_hashes_with_rpc_stream(std::function> (const rpc::client_info& cinfo, uint32_t repair_meta_id, rpc::source source, rpc::optional dst_cpu_id_opt)>&& func); future<> unregister_repair_get_full_row_hashes_with_rpc_stream(); - void register_stream_mutation_done(std::function (const rpc::client_info& cinfo, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id)>&& func); - future<> send_stream_mutation_done(msg_addr id, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id); - future<> unregister_stream_mutation_done(); - - void register_complete_message(std::function (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional failed)>&& func); - future<> send_complete_message(msg_addr id, streaming::plan_id plan_id, unsigned dst_cpu_id, bool failed = false); - future<> unregister_complete_message(); - // Wrapper for TASKS_GET_CHILDREN void register_tasks_get_children(std::function (const rpc::client_info& cinfo, tasks::get_children_request)>&& func); future<> unregister_tasks_get_children(); diff --git a/streaming/stream_fwd.hh b/streaming/stream_fwd.hh index b22970cff5..a29e570a31 100644 --- a/streaming/stream_fwd.hh +++ b/streaming/stream_fwd.hh @@ -21,3 +21,9 @@ class stream_state; using plan_id = utils::tagged_uuid; } // namespace streaming + +namespace service { + +using session_id = utils::tagged_uuid; + +} \ No newline at end of file diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc index 57ab8a775a..a9d761f95e 100644 --- a/streaming/stream_session.cc +++ b/streaming/stream_session.cc @@ -31,6 +31,7 @@ #include "service/topology_guard.hh" #include "utils/assert.hh" #include "utils/error_injection.hh" +#include "idl/streaming.dist.hh" namespace streaming { @@ -89,7 +90,7 @@ stream_manager::make_streaming_consumer(uint64_t estimated_partitions, stream_re void stream_manager::init_messaging_service_handler(abort_source& as) { auto& ms = _ms.local(); - ms.register_prepare_message([this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason_opt, rpc::optional session) { + ser::streaming_rpc_verbs::register_prepare_message(&ms, [this] (const rpc::client_info& cinfo, prepare_message msg, streaming::plan_id plan_id, sstring description, rpc::optional reason_opt, rpc::optional session) { const auto& src_cpu_id = cinfo.retrieve_auxiliary("src_cpu_id"); const auto& from = cinfo.retrieve_auxiliary("baddr"); auto dst_cpu_id = this_shard_id(); @@ -105,7 +106,7 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return session->prepare(std::move(msg.requests), std::move(msg.summaries)); }); }); - ms.register_prepare_done_message([this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id) { + ser::streaming_rpc_verbs::register_prepare_done_message(&ms, [this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return container().invoke_on(dst_cpu_id, [plan_id, from] (auto& sm) mutable { auto session = sm.get_session(plan_id, from, "PREPARE_DONE_MESSAGE"); @@ -252,14 +253,14 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { return make_ready_future>(sink); }); }); - ms.register_stream_mutation_done([this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id) { + ser::streaming_rpc_verbs::register_stream_mutation_done(&ms, [this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, dht::token_range_vector ranges, table_id cf_id, unsigned dst_cpu_id) { const auto& from = cinfo.retrieve_auxiliary("baddr"); return container().invoke_on(dst_cpu_id, [ranges = std::move(ranges), plan_id, cf_id, from] (auto& sm) mutable { auto session = sm.get_session(plan_id, from, "STREAM_MUTATION_DONE", cf_id); session->receive_task_completed(cf_id); }); }); - ms.register_complete_message([this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional failed) { + ser::streaming_rpc_verbs::register_complete_message(&ms, [this] (const rpc::client_info& cinfo, streaming::plan_id plan_id, unsigned dst_cpu_id, rpc::optional failed) { const auto& from = cinfo.retrieve_auxiliary("baddr"); if (failed && *failed) { return container().invoke_on(dst_cpu_id, [plan_id, from, dst_cpu_id] (auto& sm) { @@ -279,11 +280,8 @@ void stream_manager::init_messaging_service_handler(abort_source& as) { future<> stream_manager::uninit_messaging_service_handler() { auto& ms = _ms.local(); return when_all_succeed( - ms.unregister_prepare_message(), - ms.unregister_prepare_done_message(), - ms.unregister_stream_mutation_fragments(), - ms.unregister_stream_mutation_done(), - ms.unregister_complete_message()).discard_result(); + ser::streaming_rpc_verbs::unregister(&ms), + ms.unregister_stream_mutation_fragments()).discard_result(); } stream_session::stream_session(stream_manager& mgr, inet_address peer_) @@ -305,7 +303,7 @@ future<> stream_session::on_initialization_complete() { } auto id = msg_addr{this->peer, 0}; sslog.debug("[Stream #{}] SEND PREPARE_MESSAGE to {}", plan_id(), id); - return manager().ms().send_prepare_message(id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) { + return ser::streaming_rpc_verbs::send_prepare_message(&manager().ms(), id, std::move(prepare), plan_id(), description(), get_reason(), topo_guard()).then_wrapped([this, id] (auto&& f) { try { auto msg = f.get(); sslog.debug("[Stream #{}] GOT PREPARE_MESSAGE Reply from {}", this->plan_id(), this->peer); @@ -324,7 +322,7 @@ future<> stream_session::on_initialization_complete() { }).then([this, id] { auto plan_id = this->plan_id(); sslog.debug("[Stream #{}] SEND PREPARE_DONE_MESSAGE to {}", plan_id, id); - return manager().ms().send_prepare_done_message(id, plan_id, this->dst_cpu_id).then([this] { + return ser::streaming_rpc_verbs::send_prepare_done_message(&manager().ms(), id, plan_id, this->dst_cpu_id).then([this] { sslog.debug("[Stream #{}] GOT PREPARE_DONE_MESSAGE Reply from {}", this->plan_id(), this->peer); }).handle_exception([id, plan_id] (auto ep) { sslog.warn("[Stream #{}] Fail to send PREPARE_DONE_MESSAGE to {}, {}", plan_id, id, ep); @@ -467,7 +465,7 @@ void stream_session::send_failed_complete_message() { auto session = shared_from_this(); bool failed = true; //FIXME: discarded future. - (void)manager().ms().send_complete_message(id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] { + (void)ser::streaming_rpc_verbs::send_complete_message(&manager().ms(), id, plan_id, this->dst_cpu_id, failed).then([session, id, plan_id] { sslog.debug("[Stream #{}] GOT COMPLETE_MESSAGE Reply from {}", plan_id, id.addr); }).handle_exception([session, id, plan_id] (auto ep) { sslog.debug("[Stream #{}] COMPLETE_MESSAGE for {} has failed: {}", plan_id, id.addr, ep); diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc index 5f29bfee8a..fb0c6555cf 100644 --- a/streaming/stream_transfer_task.cc +++ b/streaming/stream_transfer_task.cc @@ -34,6 +34,7 @@ #include "repair/table_check.hh" #include "gms/feature_service.hh" #include "utils/error_injection.hh" +#include "idl/streaming.dist.hh" namespace streaming { @@ -233,7 +234,7 @@ future<> stream_transfer_task::execute() { }); }).then([this, plan_id, cf_id, id, &sm] { sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id); - return sm.ms().send_stream_mutation_done(id, plan_id, _ranges, + return ser::streaming_rpc_verbs::send_stream_mutation_done(&sm.ms(), id, plan_id, _ranges, cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) { sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep); std::rethrow_exception(ep); @@ -256,7 +257,7 @@ future<> stream_transfer_task::execute() { if (table_dropped) { sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming", plan_id, cf_id); if (!_mutation_done_sent) { - co_await session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id); + co_await ser::streaming_rpc_verbs::send_stream_mutation_done(&session->manager().ms(), id, plan_id, _ranges, cf_id, session->dst_cpu_id); } } } From c095f63ea569f8a90a617b22c3eaa0395db528dd Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 3 Dec 2024 11:09:56 +0200 Subject: [PATCH 6/6] repair: repair_flush_hints_batchlog_request::target_nodes is not used any more, so mark it as such After b3b3e880d3391aee1cda9611bf830870f83d1592 target_nodes is not used by the receiver, so we can skip setting it on sender as well. --- idl/repair.idl.hh | 2 +- repair/repair.cc | 26 +++++++++++--------------- repair/repair.hh | 2 +- repair/row_level.cc | 23 +++++++++++------------ repair/row_level.hh | 2 +- 5 files changed, 25 insertions(+), 30 deletions(-) diff --git a/idl/repair.idl.hh b/idl/repair.idl.hh index 468177e891..981fcaf458 100644 --- a/idl/repair.idl.hh +++ b/idl/repair.idl.hh @@ -87,7 +87,7 @@ struct repair_update_system_table_response { struct repair_flush_hints_batchlog_request { tasks::task_id repair_uuid; - std::list target_nodes; + std::list unused; std::chrono::seconds hints_timeout; std::chrono::seconds batchlog_timeout; }; diff --git a/repair/repair.cc b/repair/repair.cc index 29a53e5e58..2c04dc0ee1 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -375,7 +375,7 @@ static future> get_hosts_participating_in_repair( future> repair_service::flush_hints(repair_uniq_id id, sstring keyspace, std::vector cfs, - std::unordered_set ignore_nodes, std::list participants) { + std::unordered_set ignore_nodes) { auto& db = get_db().local(); auto uuid = id.uuid(); bool needs_flush_before_repair = false; @@ -400,13 +400,13 @@ future> repair_service::flush_hints(repai }); auto hints_timeout = std::chrono::seconds(300); auto batchlog_timeout = std::chrono::seconds(300); - repair_flush_hints_batchlog_request req{id.uuid(), participants, hints_timeout, batchlog_timeout}; + repair_flush_hints_batchlog_request req{id.uuid(), {}, hints_timeout, batchlog_timeout}; auto start_time = gc_clock::now(); std::vector times; try { - co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, ×, &req, &participants] (gms::inet_address node) -> future<> { - rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, started", - uuid, node, participants); + co_await parallel_for_each(waiting_nodes, [this, uuid, start_time, ×, &req] (gms::inet_address node) -> future<> { + rlogger.info("repair[{}]: Sending repair_flush_hints_batchlog to node={}, started", + uuid, node); try { auto& ms = get_messaging(); auto resp = co_await ser::repair_rpc_verbs::send_repair_flush_hints_batchlog(&ms, netw::msg_addr(node), req); @@ -418,8 +418,8 @@ future> repair_service::flush_hints(repai times.push_back(resp.flush_time); } } catch (...) { - rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, participants={}, failed: {}", - uuid, node, participants, std::current_exception()); + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to node={}, failed: {}", + uuid, node, std::current_exception()); throw; } }); @@ -431,11 +431,10 @@ future> repair_service::flush_hints(repai auto duration = std::chrono::duration(gc_clock::now() - start_time); rlogger.info("repair[{}]: Finished repair_flush_hints_batchlog flush_times={} flush_time={} flush_duration={}", uuid, times, flush_time, duration); } catch (...) { - rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog to participants={} failed, continue to run repair", - uuid, participants); + rlogger.warn("repair[{}]: Sending repair_flush_hints_batchlog failed, continue to run repair", uuid); } } else { - rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog to nodes={}", uuid, participants); + rlogger.info("repair[{}]: Skipped sending repair_flush_hints_batchlog", uuid); } co_return std::make_tuple(hints_batchlog_flushed, flush_time); } @@ -1360,7 +1359,7 @@ future<> repair::user_requested_repair_task_impl::run() { } else { participants = get_hosts_participating_in_repair(germs->get(), keyspace, ranges, data_centers, hosts, ignore_nodes).get(); } - auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes, participants).get(); + auto [hints_batchlog_flushed, flush_time] = rs.flush_hints(id, keyspace, cfs, ignore_nodes).get(); std::vector> repair_results; repair_results.reserve(smp::count); @@ -2588,10 +2587,7 @@ future<> repair::tablet_repair_task_impl::run() { auto data_centers = std::vector(); auto hosts = std::vector(); auto ignore_nodes = std::unordered_set(); - auto my_address = erm->get_topology().my_address(); - auto participants = std::list(m.neighbors.all.begin(), m.neighbors.all.end()); - participants.push_front(my_address); - auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes, participants); + auto [hints_batchlog_flushed, flush_time] = co_await rs.flush_hints(id, m.keyspace_name, tables, ignore_nodes); bool small_table_optimization = false; auto task_impl_ptr = seastar::make_shared(rs._repair_module, tasks::task_id::create_random_id(), diff --git a/repair/repair.hh b/repair/repair.hh index fac828f1cd..df1df03238 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -260,7 +260,7 @@ struct repair_update_system_table_response { struct repair_flush_hints_batchlog_request { tasks::task_id repair_uuid; - std::list target_nodes; + std::list unused; std::chrono::seconds hints_timeout; std::chrono::seconds batchlog_timeout; }; diff --git a/repair/row_level.cc b/repair/row_level.cc index 750c60ff4f..7374a6e7b7 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -2309,8 +2309,8 @@ future repair_service::repair_flush_hints_ return rs.repair_flush_hints_batchlog_handler(from, std::move(req)); }); } - rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} target_nodes={} hints_timeout={}s batchlog_timeout={}s", - req.repair_uuid, from, req.target_nodes, req.hints_timeout.count(), req.batchlog_timeout.count()); + rlogger.info("repair[{}]: Started to process repair_flush_hints_batchlog_request from node={} hints_timeout={}s batchlog_timeout={}s", + req.repair_uuid, from, req.hints_timeout.count(), req.batchlog_timeout.count()); auto permit = co_await seastar::get_units(_flush_hints_batchlog_sem, 1); bool updated = false; auto now = gc_clock::now(); @@ -2319,8 +2319,7 @@ future repair_service::repair_flush_hints_ auto flush_time = now; if (cache_disabled || (now - _flush_hints_batchlog_time > cache_time)) { // Empty targets meants all nodes - std::vector target_nodes; - db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::move(target_nodes)); + db::hints::sync_point sync_point = co_await _sp.local().create_hint_sync_point(std::vector{}); lowres_clock::time_point deadline = lowres_clock::now() + req.hints_timeout; try { bool bm_throw = utils::get_local_injector().enter("repair_flush_hints_batchlog_handler_bm_uninitialized"); @@ -2329,13 +2328,13 @@ future repair_service::repair_flush_hints_ } co_await coroutine::all( [this, &from, &req, &sync_point, &deadline] () -> future<> { - rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + rlogger.info("repair[{}]: Started to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from); co_await _sp.local().wait_for_hint_sync_point(std::move(sync_point), deadline); - rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}, target_hosts={}", req.repair_uuid, from, req.target_nodes); + rlogger.info("repair[{}]: Finished to flush hints for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from); co_return; }, [this, now, cache_disabled, &flush_time, &cache_time, &from, &req] () -> future<> { - rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}", req.repair_uuid, from, req.target_nodes); + rlogger.info("repair[{}]: Started to flush batchlog for repair_flush_hints_batchlog_request from node={}", req.repair_uuid, from); auto last_replay = _bm.local().get_last_replay(); bool issue_flush = false; if (cache_disabled) { @@ -2364,12 +2363,12 @@ future repair_service::repair_flush_hints_ co_await _bm.local().do_batch_log_replay(db::batchlog_manager::post_replay_cleanup::no); utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "issue_flush", fmt::to_string(flush_time)); } - rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, target_nodes={}, flushed={}", req.repair_uuid, from, req.target_nodes, issue_flush); + rlogger.info("repair[{}]: Finished to flush batchlog for repair_flush_hints_batchlog_request from node={}, flushed={}", req.repair_uuid, from, issue_flush); } ); } catch (...) { - rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={} target_hosts={}: {}", - req.repair_uuid, from, req.target_nodes, std::current_exception()); + rlogger.warn("repair[{}]: Failed to process repair_flush_hints_batchlog_request from node={}: {}", + req.repair_uuid, from, std::current_exception()); throw; } co_await container().invoke_on_all([flush_time] (repair_service& rs) { @@ -2380,8 +2379,8 @@ future repair_service::repair_flush_hints_ utils::get_local_injector().set_parameter("repair_flush_hints_batchlog_handler", "skip_flush", fmt::to_string(flush_time)); } auto duration = std::chrono::duration(gc_clock::now() - now); - rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} target_nodes={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}", - req.repair_uuid, from, req.target_nodes, updated, _flush_hints_batchlog_time, cache_time, duration); + rlogger.info("repair[{}]: Finished to process repair_flush_hints_batchlog_request from node={} updated={} flush_hints_batchlog_time={} flush_cache_time={} flush_duration={}", + req.repair_uuid, from, updated, _flush_hints_batchlog_time, cache_time, duration); repair_flush_hints_batchlog_response resp{ .flush_time = _flush_hints_batchlog_time }; co_return resp; } diff --git a/repair/row_level.hh b/repair/row_level.hh index a6a2f0b949..c73f002e4d 100644 --- a/repair/row_level.hh +++ b/repair/row_level.hh @@ -124,7 +124,7 @@ class repair_service : public seastar::peering_sharded_service { gc_clock::time_point _flush_hints_batchlog_time; future> flush_hints(repair_uniq_id id, sstring keyspace, std::vector cfs, - std::unordered_set ignore_nodes, std::list participants); + std::unordered_set ignore_nodes); public: repair_service(sharded& tsm,