From cc1b5aaf5116cd1d1654824e5a51b23a339d6aca Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Thu, 10 Oct 2024 12:03:30 +0300 Subject: [PATCH] idl: generate host_id variant of send functions as well We want to be able to address nodes by host ids. For that lets generate send functions that gets host_id as a dst parameter. Changes to raft_rpc are needed because otherwise the compiler cannot select a correct overload. --- idl-compiler.py | 14 ++++++++++---- service/raft/raft_rpc.cc | 30 ++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/idl-compiler.py b/idl-compiler.py index 20dd824ce5..45650ffbae 100755 --- a/idl-compiler.py +++ b/idl-compiler.py @@ -461,6 +461,7 @@ class RpcVerb(ASTBase): static void register_my_verb(netw::messaging_service* ms, std::function&&); static future<> unregister_my_verb(netw::messaging_service* ms); static future<> send_my_verb(netw::messaging_service* ms, netw::msg_addr id, args...); + static future<> send_my_verb(netw::messaging_service* ms, locator::host_id id, args...); Each method accepts a pointer to an instance of messaging_service object, which contains the underlying seastar RPC protocol @@ -555,8 +556,8 @@ class RpcVerb(ASTBase): res.extend(self.params) return ', '.join([p.to_string() for p in res]) - def send_function_signature_params_list(self, include_placeholder_names): - res = 'netw::messaging_service* ms, netw::msg_addr id' + def send_function_signature_params_list(self, include_placeholder_names, dst_type): + res = f'netw::messaging_service* ms, {dst_type} id' if self.with_timeout: res += ', netw::messaging_service::clock_type::time_point timeout' if self.cancellable: @@ -1613,7 +1614,8 @@ def generate_rpc_verbs_declarations(hout, module_name): fprintln(hout, reindent(4, f'''static void register_{name}(netw::messaging_service* ms, std::function<{verb.handler_function_return_values()} ({verb.handler_function_parameters_str()})>&&); static future<> unregister_{name}(netw::messaging_service* ms); -static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False)}); +static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False, dst_type="netw::msg_addr")}); +static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False, dst_type="locator::host_id")}); ''')) fprintln(hout, reindent(4, 'static future<> unregister(netw::messaging_service* ms);')) @@ -1633,7 +1635,11 @@ future<> {module_name}_rpc_verbs::unregister_{name}(netw::messaging_service* ms) return ms->unregister_handler({verb.messaging_verb_enum_case()}); }} -{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True)}) {{ +{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True, dst_type="netw::msg_addr")}) {{ + {verb.send_function_invocation()} +}} + +{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True, dst_type="locator::host_id")}) {{ {verb.send_function_invocation()} }}''') diff --git a/service/raft/raft_rpc.cc b/service/raft/raft_rpc.cc index f090506ad7..c934cc0d6a 100644 --- a/service/raft/raft_rpc.cc +++ b/service/raft/raft_rpc.cc @@ -84,7 +84,8 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id, } future raft_rpc::send_snapshot(raft::server_id id, const raft::install_snapshot& snap, seastar::abort_source& as) { - return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_send_snapshot, snap); + auto l = [](auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_send_snapshot(std::forward(args)...); }; + return two_way_rpc(sloc::current(), id, std::move(l), snap); } future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_request& append_request) { @@ -102,41 +103,50 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re } void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_append_entries_reply(std::forward(args)...); }; + one_way_rpc(sloc::current(), id, std::move(l), reply); } void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& vote_request) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_vote_request(std::forward(args)...); }; + one_way_rpc(sloc::current(), id, std::move(l), vote_request); } void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_vote_reply(std::forward(args)...); }; + one_way_rpc(sloc::current(), id, std::move(l), vote_reply); } void raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_timeout_now(std::forward(args)...); }; + one_way_rpc(sloc::current(), id, std::move(l), timeout_now); } void raft_rpc::send_read_quorum(raft::server_id id, const raft::read_quorum& read_quorum) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_read_quorum(std::forward(args)...); }; + one_way_rpc(sloc::current(), id, std::move(l), read_quorum); } void raft_rpc::send_read_quorum_reply(raft::server_id id, const raft::read_quorum_reply& read_quorum_reply) { - one_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_read_quorum_reply(std::forward(args)...); }; + one_way_rpc(sloc::current(), id, std::move(l), read_quorum_reply); } future raft_rpc::send_add_entry(raft::server_id id, const raft::command& cmd) { - return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_add_entry, cmd); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_add_entry(std::forward(args)...); }; + return two_way_rpc(sloc::current(), id, std::move(l), cmd); } future raft_rpc::send_modify_config(raft::server_id id, const std::vector& add, const std::vector& del) { - return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_modify_config, add, del); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_modify_config(std::forward(args)...); }; + return two_way_rpc(sloc::current(), id, std::move(l), add, del); } future raft_rpc::execute_read_barrier_on_leader(raft::server_id id) { - return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_execute_read_barrier_on_leader); + auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_execute_read_barrier_on_leader(std::forward(args)...); }; + return two_way_rpc(sloc::current(), id, std::move(l)); } future<> raft_rpc::abort() {