idl, message: make with_timeout and cancellable verb attributes composable

And define `send_message_timeout_cancellable` in rpc_protocol_impl.hh
using the newly introduced rpc_handler entry point
in seastar that accepts both timeout and cancellable params.

Note that the interface to the user still uses abort_source
while internally the funtion allocates a seastar::rpc::cancellable
object.  It is possible to provide an interface that will accept
a rpc::cancellable& from the caller, but the existing messaging api
uses abort_source.  Changing it may be considered in the future.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-04-10 17:37:02 +03:00
parent e06d226d08
commit 0b97806771
2 changed files with 35 additions and 4 deletions

View File

@@ -479,11 +479,9 @@ class RpcVerb(ASTBase):
- [[with_timeout]] - an additional time_point parameter is supplied
to the handler function and send* method uses send_message_*_timeout
variant of internal function to actually send the message.
Incompatible with [[cancellable]].
- [[cancellable]] - an additional abort_source& parameter is supplied
to the handler function and send* method uses send_message_*_cancellable
variant of internal function to actually send the message.
Incompatible with [[with_timeout]].
- [[one_way]] - the handler function is annotated by
future<rpc::no_wait_type> return type to designate that a client
doesn't need to wait for an answer.
@@ -697,8 +695,6 @@ def rpc_verb_parse_action(tokens):
one_way = not raw_attrs.empty() and 'one_way' in raw_attrs.attr_items
if one_way and 'return_values' in tokens:
raise Exception(f"Invalid return type specification for one-way RPC verb '{name}'")
if with_timeout and cancellable:
raise Exception(f"Error in verb {name}: [[with_timeout]] cannot be used together with [[cancellable]] in the same verb")
return RpcVerb(name=name, parameters=params, return_values=tokens.get('return_values'), with_client_info=with_client_info, with_timeout=with_timeout, cancellable=cancellable, one_way=one_way, ip=ip)

View File

@@ -250,6 +250,41 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locato
return send_message_cancellable<MsgIn, MsgOut...>(ms, verb, std::optional{id}, ms->addr_for_host_id(id), as, std::forward<MsgOut>(msg)...);
}
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id host_id, Timeout timeout, abort_source& as, MsgOut&&... msg) {
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
using futurator = futurize<std::invoke_result_t<decltype(rpc_handler), rpc_protocol::client&, MsgOut...>>;
if (ms->is_shutting_down()) {
return futurator::make_exception_future(rpc::closed_error());
}
auto rpc_client_ptr = ms->get_rpc_client(verb, ms->addr_for_host_id(host_id), host_id);
auto& rpc_client = *rpc_client_ptr;
auto c = std::make_unique<seastar::rpc::cancellable>();
auto& c_ref = *c;
auto sub = as.subscribe([c = std::move(c)] () noexcept {
c->cancel();
});
if (!sub) {
return futurator::make_exception_future(abort_requested_exception{});
}
return rpc_handler(rpc_client, timeout, c_ref, std::forward<MsgOut>(msg)...).handle_exception([ms = ms->shared_from_this(), host_id, verb, rpc_client_ptr = std::move(rpc_client_ptr), sub = std::move(sub)] (std::exception_ptr&& eptr) {
ms->increment_dropped_messages(verb);
if (try_catch<rpc::closed_error>(eptr)) {
// This is a transport error
ms->remove_error_rpc_client(verb, host_id);
return futurator::make_exception_future(std::move(eptr));
} else if (try_catch<rpc::canceled_error>(eptr)) {
// Translate low-level canceled_error into high-level abort_requested_exception.
return futurator::make_exception_future(abort_requested_exception{});
} else {
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
return futurator::make_exception_future(std::move(eptr));
}
});
}
// Send one way message for verb
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {