idl: generate host_id variant of send functions as well
We want to be able to address nodes by host ids. For that lets generate send functions that gets host_id as a dst parameter. Changes to raft_rpc are needed because otherwise the compiler cannot select a correct overload.
This commit is contained in:
@@ -461,6 +461,7 @@ class RpcVerb(ASTBase):
|
||||
static void register_my_verb(netw::messaging_service* ms, std::function<return_value(args...)>&&);
|
||||
static future<> unregister_my_verb(netw::messaging_service* ms);
|
||||
static future<> send_my_verb(netw::messaging_service* ms, netw::msg_addr id, args...);
|
||||
static future<> send_my_verb(netw::messaging_service* ms, locator::host_id id, args...);
|
||||
|
||||
Each method accepts a pointer to an instance of messaging_service
|
||||
object, which contains the underlying seastar RPC protocol
|
||||
@@ -555,8 +556,8 @@ class RpcVerb(ASTBase):
|
||||
res.extend(self.params)
|
||||
return ', '.join([p.to_string() for p in res])
|
||||
|
||||
def send_function_signature_params_list(self, include_placeholder_names):
|
||||
res = 'netw::messaging_service* ms, netw::msg_addr id'
|
||||
def send_function_signature_params_list(self, include_placeholder_names, dst_type):
|
||||
res = f'netw::messaging_service* ms, {dst_type} id'
|
||||
if self.with_timeout:
|
||||
res += ', netw::messaging_service::clock_type::time_point timeout'
|
||||
if self.cancellable:
|
||||
@@ -1613,7 +1614,8 @@ def generate_rpc_verbs_declarations(hout, module_name):
|
||||
fprintln(hout, reindent(4, f'''static void register_{name}(netw::messaging_service* ms,
|
||||
std::function<{verb.handler_function_return_values()} ({verb.handler_function_parameters_str()})>&&);
|
||||
static future<> unregister_{name}(netw::messaging_service* ms);
|
||||
static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False)});
|
||||
static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False, dst_type="netw::msg_addr")});
|
||||
static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False, dst_type="locator::host_id")});
|
||||
'''))
|
||||
|
||||
fprintln(hout, reindent(4, 'static future<> unregister(netw::messaging_service* ms);'))
|
||||
@@ -1633,7 +1635,11 @@ future<> {module_name}_rpc_verbs::unregister_{name}(netw::messaging_service* ms)
|
||||
return ms->unregister_handler({verb.messaging_verb_enum_case()});
|
||||
}}
|
||||
|
||||
{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True)}) {{
|
||||
{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True, dst_type="netw::msg_addr")}) {{
|
||||
{verb.send_function_invocation()}
|
||||
}}
|
||||
|
||||
{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True, dst_type="locator::host_id")}) {{
|
||||
{verb.send_function_invocation()}
|
||||
}}''')
|
||||
|
||||
|
||||
@@ -84,7 +84,8 @@ raft_rpc::two_way_rpc(sloc loc, raft::server_id id,
|
||||
}
|
||||
|
||||
future<raft::snapshot_reply> raft_rpc::send_snapshot(raft::server_id id, const raft::install_snapshot& snap, seastar::abort_source& as) {
|
||||
return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_send_snapshot, snap);
|
||||
auto l = [](auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_send_snapshot(std::forward<decltype(args)>(args)...); };
|
||||
return two_way_rpc(sloc::current(), id, std::move(l), snap);
|
||||
}
|
||||
|
||||
future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_request& append_request) {
|
||||
@@ -102,41 +103,50 @@ future<> raft_rpc::send_append_entries(raft::server_id id, const raft::append_re
|
||||
}
|
||||
|
||||
void raft_rpc::send_append_entries_reply(raft::server_id id, const raft::append_reply& reply) {
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_append_entries_reply, reply);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_append_entries_reply(std::forward<decltype(args)>(args)...); };
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, std::move(l), reply);
|
||||
}
|
||||
|
||||
void raft_rpc::send_vote_request(raft::server_id id, const raft::vote_request& vote_request) {
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_request, vote_request);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_vote_request(std::forward<decltype(args)>(args)...); };
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, std::move(l), vote_request);
|
||||
}
|
||||
|
||||
void raft_rpc::send_vote_reply(raft::server_id id, const raft::vote_reply& vote_reply) {
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_vote_reply, vote_reply);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_vote_reply(std::forward<decltype(args)>(args)...); };
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, std::move(l), vote_reply);
|
||||
}
|
||||
|
||||
void raft_rpc::send_timeout_now(raft::server_id id, const raft::timeout_now& timeout_now) {
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_timeout_now, timeout_now);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_timeout_now(std::forward<decltype(args)>(args)...); };
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, std::move(l), timeout_now);
|
||||
}
|
||||
|
||||
void raft_rpc::send_read_quorum(raft::server_id id, const raft::read_quorum& read_quorum) {
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum, read_quorum);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_read_quorum(std::forward<decltype(args)>(args)...); };
|
||||
one_way_rpc<one_way_kind::request>(sloc::current(), id, std::move(l), read_quorum);
|
||||
}
|
||||
|
||||
void raft_rpc::send_read_quorum_reply(raft::server_id id, const raft::read_quorum_reply& read_quorum_reply) {
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, ser::raft_rpc_verbs::send_raft_read_quorum_reply, read_quorum_reply);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_read_quorum_reply(std::forward<decltype(args)>(args)...); };
|
||||
one_way_rpc<one_way_kind::reply>(sloc::current(), id, std::move(l), read_quorum_reply);
|
||||
}
|
||||
|
||||
future<raft::add_entry_reply> raft_rpc::send_add_entry(raft::server_id id, const raft::command& cmd) {
|
||||
return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_add_entry, cmd);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_add_entry(std::forward<decltype(args)>(args)...); };
|
||||
return two_way_rpc(sloc::current(), id, std::move(l), cmd);
|
||||
}
|
||||
|
||||
future<raft::add_entry_reply> raft_rpc::send_modify_config(raft::server_id id,
|
||||
const std::vector<raft::config_member>& add,
|
||||
const std::vector<raft::server_id>& del) {
|
||||
return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_modify_config, add, del);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_modify_config(std::forward<decltype(args)>(args)...); };
|
||||
return two_way_rpc(sloc::current(), id, std::move(l), add, del);
|
||||
}
|
||||
|
||||
future<raft::read_barrier_reply> raft_rpc::execute_read_barrier_on_leader(raft::server_id id) {
|
||||
return two_way_rpc(sloc::current(), id, ser::raft_rpc_verbs::send_raft_execute_read_barrier_on_leader);
|
||||
auto l = [] (auto&&...args) -> decltype(auto) { return ser::raft_rpc_verbs::send_raft_execute_read_barrier_on_leader(std::forward<decltype(args)>(args)...); };
|
||||
return two_way_rpc(sloc::current(), id, std::move(l));
|
||||
}
|
||||
|
||||
future<> raft_rpc::abort() {
|
||||
|
||||
Reference in New Issue
Block a user