Merge 'IDL: support generating boilerplate code for RPC verbs' from Pavel Solodovnikov
Introduce new syntax in IDL compiler to allow generating
registration/sending code for RPC verbs:
```
verb [[attr1, attr2...] my_verb (args...) -> return_type;
```
`my_verb` RPC verb declaration corresponds to the
`netw::messaging_verb::MY_VERB` enumeration value to identify the
new RPC verb.
For a given `idl_module.idl.hh` file, a registrator class named
`idl_module_rpc_verbs` will be created if there are any RPC verbs
registered within the IDL module file.
These are the methods being created for each RPC verb:
```
static void register_my_verb(netw::messaging_service* ms, std::function<return_type(args...)>&&);
static future<> unregister_my_verb(netw::messaging_service* ms);
static future<> send_my_verb(netw::messaging_service* ms, netw::msg_addr id, args...);
```
Each method accepts a pointer to an instance of `messaging_service`
object, which contains the underlying seastar RPC protocol
implementation, that is used to register verbs and pass messages.
There is also a method to unregister all verbs at once:
```
static future<> unregister(netw::messaging_service* ms);
```
The following attributes are supported when declaring an RPC verb
in the IDL:
* `[[with_client_info]]` - the handler will contain a const reference to
an `rpc::client_info` as the first argument.
* `[[with_timeout]]` - an additional `time_point` parameter is supplied
to the handler function and `send*` method uses `send_message_*_timeout`
variant of internal function to actually send the message.
* `[[one_way]]` - the handler function is annotated by
`future<rpc::no_wait_type>` return type to designate that a client
doesn't need to wait for an answer.
The `-> return_type` clause is optional for two-way messages. If omitted,
the return type is set to be `future<>`.
For one-way verbs, the use of return clause is prohibited and the
signature of `send*` function always returns `future<>`.
No existing code is affected.
Ref: #1456
Closes #9359
* github.com:scylladb/scylla:
idl: support generating boilerplate code for RPC verbs
idl: allow specifying multiple attributes in the grammar
message: messaging_service: extract RPC protocol details and helpers into a separate header
This commit is contained in:
@@ -1,54 +1,193 @@
|
||||
# IDL definition
|
||||
The schema we use similar to c++ schema.
|
||||
Use class or struct similar to the object you need the serializer for.
|
||||
Use namespace when applicable.
|
||||
# IDL compiler
|
||||
|
||||
## keywords
|
||||
* class/struct - a class or a struct like C++
|
||||
class/struct can have final or stub marker
|
||||
* namespace - has the same C++ meaning
|
||||
* enum class - has the same C++ meaning
|
||||
* final modifier for class - when a class mark as final it will not contain a size parameter. Note that final class cannot be extended by future version, so use with care
|
||||
* stub class - when a class is mark as stub, it means that no code will be generated for this class and it is only there as a documentation.
|
||||
* version attributes - mark with [[version id ]] mark that a field is available from a specific version
|
||||
* template - A template class definition like C++
|
||||
## Syntax
|
||||
IDL compiler is a tool written in Python, that generates utility serialization/de-serialization code (specializations
|
||||
for `ser::serializer<T>` class) for C++ classes and enums. It takes an IDL definition file as input and generates two
|
||||
files from it: `<mod_name>.dist.hh`, which is declarations part, and `<mod_name>.dist.impl.hh`, which contains generated
|
||||
code definitions (private part).
|
||||
|
||||
The syntax of IDL is similar to C++ to some degree (e.g. contains `struct/class` and `enum class` constructs), but is
|
||||
also extended to support more complex things in the context of RPC serialization: RPC messages (also called
|
||||
"RPC verbs").
|
||||
|
||||
By default, all generated code for Scylla is created under `ser` namespace.
|
||||
|
||||
## File syntax description
|
||||
|
||||
As noted above, the syntax of IDL definitions file resembles C++ by providing similar-looking code constructs for class
|
||||
and enum serializers.
|
||||
|
||||
The IDL file contains a sequence of entity declarations, most of which correspond directly to those from the actual
|
||||
C++ code:
|
||||
|
||||
- Namespaces (`namespace`)
|
||||
- Classes / structures (`class` / `struct`)
|
||||
- Enums (`enum class`)
|
||||
- RPC verbs (`verb`)
|
||||
|
||||
Classes can be templated, supporting the ordinary `template <typename T>`-style notation at the beginning of the class
|
||||
declaration, similar to C++.
|
||||
|
||||
Some places (e.g. classes, class members and RPC verbs) also support modifying generation behavior by providing
|
||||
attribute sequences (via C++ syntax of `[[attr]]` form).
|
||||
|
||||
### Namespaces
|
||||
|
||||
Namespaces in IDL act the same way as in C++ or any other programming language, and can have arbitrary nesting depth.
|
||||
Syntax:
|
||||
|
||||
### Namespace
|
||||
```
|
||||
namespace ns_name { namespace-body }
|
||||
namespace <ns-name> { <namespace-body> }
|
||||
```
|
||||
* ns_name: either a previously unused identifier, in which case this is original-namespace-definition or the name of a namespace, in which case this is extension-namespace-definition
|
||||
* namespace-body: possibly empty sequence of declarations of any kind (including class and struct definitions as well as nested namespaces)
|
||||
|
||||
### class/struct
|
||||
`
|
||||
class-key class-name final(optional) stub(optional) { member-specification } ;(optional)
|
||||
`
|
||||
* class-key: one of class or struct.
|
||||
* class-name: the name of the class that's being defined. optionally followed by keyword final, optionally followed by keyword stub
|
||||
* final: when a class mark as final, it means it can not be extended and there is no need to serialize its size, use with care.
|
||||
* stub: when a class is mark as stub, it means no code will generate for it and it is added for documentation only.
|
||||
* member-specification: list of access specifiers, and public member accessor see class member below.
|
||||
* to be compatible with C++ a class definition can be followed by a semicolon.
|
||||
### enum
|
||||
`enum-key identifier enum-base { enumerator-list(optional) }`
|
||||
* enum-key: only enum class is supported
|
||||
* identifier: the name of the enumeration that's being declared.
|
||||
* enum-base: colon (:), followed by a type-specifier-seq that names an integral type (see the C++ standard for the full list of all possible integral types).
|
||||
* enumerator-list: comma-separated list of enumerator definitions, each of which is either simply an identifier, which becomes the name of the enumerator, or an identifier with an initializer: identifier = integral value.
|
||||
Note that though C++ allows constexpr as an initialize value, it makes the documentation less readable, hence is not permitted.
|
||||
Where:
|
||||
|
||||
### class member
|
||||
`type member-access attributes(optional) default-value(optional);`
|
||||
* type: Any valid C++ type, following the C++ notation. note that there should be a serializer for the type, but deceleration order is not mandatory
|
||||
* member-access: is the way the member can be access. If the member is public it can be the name itself. if not it could be a getter function that should be followed by braces. Note that getter can (and probably should) be const methods.
|
||||
* attributes: Attributes define by square brackets. Currently are use to mark a version in which a specific member was added [ [ version version-number] ] would mark that the specific member was added in the given version number.
|
||||
- `ns-name` — namespace identifier. Directly corresponds to the namespace from C++, defining a new namespace or
|
||||
extending an existing one.
|
||||
- `namespace-body` — a sequence of 0+ nested entities of any kind: classes, enums, RPC verbs or other nested
|
||||
namespaces.
|
||||
|
||||
### template
|
||||
`template < parameter-list > class-declaration`
|
||||
* parameter-list - a non-empty comma-separated list of the template parameters.
|
||||
* class-decleration - (See class section) The class name declared become a template name.
|
||||
Example:
|
||||
|
||||
```
|
||||
namespace ns {
|
||||
|
||||
class my_class {
|
||||
int a;
|
||||
bool b;
|
||||
std::vector<int> c;
|
||||
};
|
||||
|
||||
enum class e {
|
||||
E1,
|
||||
E2
|
||||
};
|
||||
|
||||
// ...
|
||||
|
||||
} // namespace ns
|
||||
```
|
||||
|
||||
### Classes
|
||||
|
||||
Class declaration creates a `ser::serializer<T>` specialization для for a class with a given name, along with an
|
||||
implementation of `write`, `read` and `skip` methods. Syntax:
|
||||
|
||||
```
|
||||
template <template-parameter-list> (optional)
|
||||
class-key <class-name> final(optional) attributes_seq(optional) stub(optional) { <nested-class> | <member-specification> } ;(optional)
|
||||
```
|
||||
|
||||
Where:
|
||||
|
||||
- `template-parameter-list` - the list of template arguments in case a class is a template.
|
||||
- `class-key` - either `class`, or `struct`.
|
||||
- `class-name` - class identifier, for which `ser::serializer<T>` specialization is to be created.
|
||||
- `final` specifier — an optimization option, which denotes that a class is final, i.e. cannot be extended in the
|
||||
future, in which case the size is not serialized. Should be used with care.
|
||||
- `stub` specifier — skip generating serialization code for a class. Initially designed for documentation within IDL
|
||||
(there is one exception to that with `[[writable]]` attribute, though. More on that below).
|
||||
- `attributes_seq` - optional sequence of C++-style attributes. Only `[[writable]]` attribute is supported at the
|
||||
moment, which means that: if specified, the writers and serialization views will also be generated for a class
|
||||
(inside the private `<mod_name>.dist.impl.hh` part). Other attributes are ignored.
|
||||
- `nested-class` - nested class definition following the same syntax.
|
||||
- `member-specification` — data members and getter functions declarations (more details on the syntax below). Getter
|
||||
functions should be used in cases where access to private class fields is needed. Both kinds of field accessors can
|
||||
also be marked with `[[version id]]` version attribute, which denotes, that a field is accessible starting from
|
||||
version `id`.
|
||||
|
||||
A class declaration can optionally include a semicolon at the end to more closely resemble C++ syntax.
|
||||
|
||||
If a class contains both `stub` specifier and `[[writable]]` attribute at the same time, the `ser::serializer` code is
|
||||
not generated but the serialization views (classes with `_view` suffix in the name, that support reading and writing
|
||||
data in the stream according to some fixed data layout) are created, nonetheless.
|
||||
|
||||
Class members are declared the following way:
|
||||
|
||||
```
|
||||
<type> name <getter-marker>(optional) [[version version-id]](optional) <default-value>(optional);
|
||||
```
|
||||
|
||||
Where:
|
||||
|
||||
- `type` - any valid C++ type, following the regular C++ syntax. Naturally, a serializer specialization for this
|
||||
type should exist in order to serialize/deserialize it.
|
||||
- `name` - accessor name. For ordinary data fields it is just a C++ name of a class field. In case it's a getter
|
||||
function, it also should contain a "getter marker", which is denoted as empty `()` braces sequence right after the
|
||||
name. As noted above, getter functions should be used if a field is not accessible (i.e. is private), otherwise a
|
||||
regular data field can be used. Note, that getter functions can (and probably should) be const methods.
|
||||
- version attribute — specify that a field is accessible starting from version `version-id` and above.
|
||||
- `default-value` — an optional clause to specify default value for a field accessor. The syntax is: `= <value>`.
|
||||
|
||||
### Enums
|
||||
|
||||
Analogous to classes, `ser::serializer<T>` specializations can also be generated for enums. Declaration syntax:
|
||||
|
||||
```
|
||||
enum class identifier enum-base { enumerator-list } ;(optional)
|
||||
```
|
||||
|
||||
Where:
|
||||
|
||||
- `identifier` - the name of C++ enum class.
|
||||
- `enum-base` - mandatory specification of C++ underlying type for the enum, following the regular C++ syntax: `: integer-type`
|
||||
- `enumerator-list` - a list of enum cases or initializers of the following form: `name = integer-value`, where
|
||||
`integer-value` is a plain integer literal value.
|
||||
|
||||
Note that although C++ allows `constexpr` as an initializer value, it makes the documentation less readable, hence is
|
||||
not permitted.
|
||||
|
||||
### RPC Verbs
|
||||
|
||||
IDL can also contain declarations of RPC messages with a given signature (also called: "RPC verbs"). It allows to
|
||||
automatically generate boilerplate code for message handlers registration and message passing code via an instance of
|
||||
`netw::messaging_service` class. Declaration syntax:
|
||||
|
||||
```
|
||||
verb id [[attributes...]](optional) (parameters...) (-> return-type)(optional) ;(optional)
|
||||
```
|
||||
|
||||
There should be a corresponding upper-cased enumerator `ID` inside the `netw::messaging_verb` enum for a verb with name
|
||||
`id`. For example, for a `my_verb` declaration there should be a corresponding `netw::messaging_verb::MY_VERB` constant
|
||||
to specify an id for the RPC client.
|
||||
|
||||
The parameters of the verb declarations will also act as parameters for the handler functions and corresponding `send`
|
||||
functions. In case `[[with_timeout]]` attribute is set, the argument list is extended with a `time_point` argument at the
|
||||
beginning of the parameter list to specify an RPC timeout when sending or handling the message.
|
||||
|
||||
The return value type is calculated as `future<return_type>` and is used as return type for message handler and `send`
|
||||
function. If the `-> return type` clause is omitted, the return type is assumed to be `future<>`. If `[[one_way]]`
|
||||
attribute is specified, handler function return type is fixed to `future<rpc::no_wait_type>`, and the return type
|
||||
clause should not be used, otherwise an exception will be thrown during IDL generation process.
|
||||
|
||||
If `[[with_client_info]]` attribute is used then the handler will contain a const reference to an `rpc::client_info` as
|
||||
its first argument.
|
||||
|
||||
`[[with_client_info]]`, `[[with_timeout]]` and `[[one_way]]` attributes can be combined.
|
||||
|
||||
For an RPC verb with the definition of `verb x (arg1_t, arg2_t) -> ret_t;` , which is defined in some `my_mod.idl.hh`
|
||||
module, the following `my_mod_rpc_verbs` class will be generated (approximately):
|
||||
|
||||
```cpp
|
||||
// my_mod.dist.hh
|
||||
|
||||
namespace ser {
|
||||
|
||||
struct my_mod_rpc_verbs {
|
||||
static void register_x(netw::messaging_service* ms, std::function<future<ret_t> (const rpc::client_info&, arg1_t, arg2_t>&&);
|
||||
static future<> unregister_x(netw::messaging_service* ms);
|
||||
static future<ret_t> send_x(netw::messaging_service* ms, netw::msg_addr id, arg1_t, arg2_t);
|
||||
|
||||
// calls unregister_x, and the same for other verbs, if there are any,
|
||||
// and waits for all of them to resolve.
|
||||
static future<> unregister(netw::messaging_service* ms);
|
||||
};
|
||||
|
||||
} // namespace ser
|
||||
```
|
||||
|
||||
Each parameter can optionally have a name, otherwise a placeholder name of form `_N` will be used, where `N` is the
|
||||
index within the RPC verb parameters list. Also, each argument can be annotated with `[[version id]]` attribute, which
|
||||
will cause it to be accepted as a `rpc::optional<>` in the handler function signature.
|
||||
|
||||
## IDL example
|
||||
Forward slashes comments are ignored until the end of the line.
|
||||
|
||||
296
idl-compiler.py
296
idl-compiler.py
@@ -28,6 +28,7 @@ from numbers import Number
|
||||
from pprint import pformat
|
||||
from copy import copy
|
||||
from typing import List
|
||||
import os.path
|
||||
|
||||
EXTENSION = '.idl.hh'
|
||||
READ_BUFF = 'input_buffer'
|
||||
@@ -123,6 +124,11 @@ class BasicType(ASTBase):
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def to_string(self):
|
||||
if self.is_const:
|
||||
return 'const ' + self.name
|
||||
return self.name
|
||||
|
||||
|
||||
class TemplateType(ASTBase):
|
||||
'''AST node representing template types, for example: `std::vector<T>`.
|
||||
@@ -144,6 +150,12 @@ class TemplateType(ASTBase):
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def to_string(self):
|
||||
res = self.name + '<'
|
||||
res += ', '.join([p.to_string() for p in self.template_parameters])
|
||||
res += '>'
|
||||
return res
|
||||
|
||||
|
||||
class EnumValue(ASTBase):
|
||||
'''AST node representing a single `name=value` enumerator in the enum.
|
||||
@@ -198,7 +210,7 @@ template<typename Input>
|
||||
}}""")
|
||||
|
||||
|
||||
class Attribute(ASTBase):
|
||||
class Attributes(ASTBase):
|
||||
''' AST node for representing class and field attributes.
|
||||
|
||||
The following attributes are supported:
|
||||
@@ -206,15 +218,19 @@ class Attribute(ASTBase):
|
||||
for a class.
|
||||
- `[[version id]] field attribute, marks that a field is available starting
|
||||
from a specific version.'''
|
||||
def __init__(self, name):
|
||||
super().__init__(name)
|
||||
def __init__(self, attr_items=[]):
|
||||
super().__init__('attributes')
|
||||
self.attr_items = attr_items
|
||||
|
||||
def __str__(self):
|
||||
return f"[[{self.name}]]"
|
||||
return f"[[{', '.join([a for a in self.attr_items])}]]"
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def empty(self):
|
||||
return not self.attr_items
|
||||
|
||||
|
||||
class DataClassMember(ASTBase):
|
||||
'''AST node representing a data field in a class.
|
||||
@@ -374,6 +390,164 @@ void serializer<{name}{self.template_param_names_str}>::skip(Input& buf) {{
|
||||
fprintln(cout, """ });\n}""")
|
||||
|
||||
|
||||
class RpcVerbParam(ASTBase):
|
||||
"""AST element representing a single argument in an RPC verb declaration.
|
||||
Consists of:
|
||||
* Argument type
|
||||
* Argument name (optional)
|
||||
* Additional attributes (only [[version]] attribute is supported).
|
||||
|
||||
If the name is omitted, then this argument will have a placeholder name of form `_N`, where N is the index
|
||||
of the argument in the argument list for an RPC verb.
|
||||
|
||||
If the [[version]] attribute is specified, then handler function signature for an RPC verb will contain this
|
||||
argument as an `rpc::optional<>`."""
|
||||
def __init__(self, type, name, attributes=Attributes()):
|
||||
self.type = type
|
||||
self.name = name
|
||||
self.attributes = attributes
|
||||
|
||||
def __str__(self):
|
||||
return f"<RpcVerbParam(type={self.type}, name={self.name}, attributes={self.attributes})>"
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def is_optional(self):
|
||||
return bool([a.startswith('version') for a in self.attributes.attr_items])
|
||||
|
||||
def to_string(self):
|
||||
res = self.type.to_string()
|
||||
if self.is_optional():
|
||||
res = 'rpc::optional<' + res + '>'
|
||||
if self.name:
|
||||
res += ' '
|
||||
res += self.name
|
||||
return res
|
||||
|
||||
def to_string_send_fn_signature(self):
|
||||
return self.to_string() if not self.is_optional() else self.type.to_string() + ' ' + self.name
|
||||
|
||||
|
||||
class RpcVerb(ASTBase):
|
||||
"""AST element representing an RPC verb declaration.
|
||||
|
||||
`my_verb` RPC verb declaration corresponds to the
|
||||
`netw::messaging_verb::MY_VERB` enumeration value to identify the
|
||||
new RPC verb.
|
||||
|
||||
For a given `idl_module.idl.hh` file, a registrator class named
|
||||
`idl_module_rpc_verbs` will be created if there are any RPC verbs
|
||||
registered within the IDL module file.
|
||||
|
||||
These are the methods being created for each RPC verb:
|
||||
|
||||
static void register_my_verb(netw::messaging_service* ms, std::function<return_type(args...)>&&);
|
||||
static future<> unregister_my_verb(netw::messaging_service* ms);
|
||||
static future<> send_my_verb(netw::messaging_service* ms, netw::msg_addr id, args...);
|
||||
|
||||
Each method accepts a pointer to an instance of messaging_service
|
||||
object, which contains the underlying seastar RPC protocol
|
||||
implementation, that is used to register verbs and pass messages.
|
||||
|
||||
There is also a method to unregister all verbs at once:
|
||||
|
||||
static future<> unregister(netw::messaging_service* ms);
|
||||
|
||||
The following attributes are supported when declaring an RPC verb
|
||||
in the IDL:
|
||||
|
||||
- [[with_client_info]] - the handler will contain a const reference to
|
||||
an `rpc::client_info` as the first argument.
|
||||
- [[with_timeout]] - an additional time_point parameter is supplied
|
||||
to the handler function and send* method uses send_message_*_timeout
|
||||
variant of internal function to actually send the message.
|
||||
- [[one_way]] - the handler function is annotated by
|
||||
future<rpc::no_wait_type> return type to designate that a client
|
||||
doesn't need to wait for an answer.
|
||||
|
||||
The `-> return_type` clause is optional for two-way messages. If omitted,
|
||||
the return type is set to be `future<>`.
|
||||
For one-way verbs, the use of return clause is prohibited and the
|
||||
signature of `send*` function always returns `future<>`."""
|
||||
def __init__(self, name, parameters, return_type, with_client_info, with_timeout, one_way):
|
||||
super().__init__(name)
|
||||
self.params = parameters
|
||||
self.return_type = return_type
|
||||
self.with_client_info = with_client_info
|
||||
self.with_timeout = with_timeout
|
||||
self.one_way = one_way
|
||||
|
||||
def __str__(self):
|
||||
return f"<RpcVerb(name={self.name}, params={self.params}, return_type={self.return_type}, with_client_info={self.with_client_info}, with_timeout={self.with_timeout}, one_way={self.one_way})>"
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
||||
def send_function_name(self):
|
||||
send_fn = 'send_message'
|
||||
if self.one_way:
|
||||
send_fn += '_oneway'
|
||||
if self.with_timeout:
|
||||
send_fn += '_timeout'
|
||||
return send_fn
|
||||
|
||||
def handler_function_return_type(self):
|
||||
if self.one_way:
|
||||
return 'future<rpc::no_wait_type>'
|
||||
return f"future<{self.return_type.to_string() if self.return_type else ''}>"
|
||||
|
||||
def send_function_return_type(self):
|
||||
if self.one_way:
|
||||
return 'future<>'
|
||||
return self.handler_function_return_type()
|
||||
|
||||
def messaging_verb_enum_case(self):
|
||||
return f'netw::messaging_verb::{self.name.upper()}'
|
||||
|
||||
def handler_function_parameters_str(self):
|
||||
res = []
|
||||
if self.with_client_info:
|
||||
res.append(RpcVerbParam(type=BasicType(name='rpc::client_info&', is_const=True), name='info'))
|
||||
if self.with_timeout:
|
||||
res.append(RpcVerbParam(type=BasicType(name='rpc::opt_time_point'), name='timeout'))
|
||||
if self.params:
|
||||
res.extend(self.params)
|
||||
return ', '.join([p.to_string() for p in res])
|
||||
|
||||
def send_function_signature_params_list(self, include_placeholder_names):
|
||||
res = 'netw::messaging_service* ms, netw::msg_addr id'
|
||||
if self.with_timeout:
|
||||
res += ', netw::messaging_service::clock_type::time_point timeout'
|
||||
if self.params:
|
||||
for idx, p in enumerate(self.params):
|
||||
res += ', ' + p.to_string_send_fn_signature()
|
||||
if include_placeholder_names and not p.name:
|
||||
res += f' _{idx + 1}'
|
||||
return res
|
||||
|
||||
def send_message_argument_list(self):
|
||||
res = f'ms, '
|
||||
if self.with_timeout and self.one_way:
|
||||
# For some reason the timeout argument position in
|
||||
# `send_message_oneway_timeout` is different from `send_message_timeout`.
|
||||
res += f'timeout, {self.messaging_verb_enum_case()}, id'
|
||||
else:
|
||||
res += f'{self.messaging_verb_enum_case()}, id'
|
||||
if self.with_timeout:
|
||||
res += ', timeout'
|
||||
if self.params:
|
||||
for idx, p in enumerate(self.params):
|
||||
res += ', ' + f'std::move({p.name if p.name else f"_{idx + 1}"})'
|
||||
return res
|
||||
|
||||
def send_function_invocation(self):
|
||||
res = 'return ' + self.send_function_name()
|
||||
if not (self.one_way and self.with_timeout):
|
||||
res += '<' + self.send_function_return_type() + '>'
|
||||
res += '(' + self.send_message_argument_list() + ');'
|
||||
return res
|
||||
|
||||
class NamespaceDef(ASTBase):
|
||||
'''AST node representing a namespace scope.
|
||||
|
||||
@@ -433,13 +607,18 @@ def enum_def_parse_action(tokens):
|
||||
return EnumDef(name=tokens['name'], underlying_type=tokens['underlying_type'], members=tokens['enum_values'].asList())
|
||||
|
||||
|
||||
def attribute_parse_action(tokens):
|
||||
return Attribute(name=tokens[0])
|
||||
def attributes_parse_action(tokens):
|
||||
items = []
|
||||
for attr_clause in tokens:
|
||||
# Split individual attributes inside each attribute clause by commas and strip extra whitespace characters
|
||||
items += [arg.strip() for arg in attr_clause.split(',')]
|
||||
return Attributes(attr_items=items)
|
||||
|
||||
|
||||
def class_member_parse_action(tokens):
|
||||
member_name = tokens['name']
|
||||
attribute = tokens['attribute'][0] if 'attribute' in tokens else None
|
||||
raw_attrs = tokens['attributes']
|
||||
attribute = raw_attrs.attr_items[0] if not raw_attrs.empty() else None
|
||||
default = tokens['default'][0] if 'default' in tokens else None
|
||||
if not isinstance(member_name, str): # accessor function declaration
|
||||
return FunctionClassMember(type=tokens["type"], name=member_name[0], attribute=attribute, default_value=default)
|
||||
@@ -451,13 +630,33 @@ def class_def_parse_action(tokens):
|
||||
is_final = 'final' in tokens
|
||||
is_stub = 'stub' in tokens
|
||||
class_members = tokens['members'].asList() if 'members' in tokens else []
|
||||
attribute = tokens['attribute'][0] if 'attribute' in tokens else None
|
||||
raw_attrs = tokens['attributes']
|
||||
attribute = raw_attrs.attr_items[0] if not raw_attrs.empty() else None
|
||||
template_params = None
|
||||
if 'template' in tokens:
|
||||
template_params = [ClassTemplateParam(typename=tp[0], name=tp[1]) for tp in tokens['template']]
|
||||
return ClassDef(name=tokens['name'], members=class_members, final=is_final, stub=is_stub, attribute=attribute, template_params=template_params)
|
||||
|
||||
|
||||
def rpc_verb_param_parse_action(tokens):
|
||||
type = tokens['type']
|
||||
name = tokens['ident'] if 'ident' in tokens else None
|
||||
attrs = tokens['attrs']
|
||||
return RpcVerbParam(type=type, name=name, attributes=attrs)
|
||||
|
||||
|
||||
def rpc_verb_parse_action(tokens):
|
||||
name = tokens['name']
|
||||
raw_attrs = tokens['attributes']
|
||||
params = tokens['params'] if 'params' in tokens else []
|
||||
with_timeout = not raw_attrs.empty() and 'with_timeout' in raw_attrs.attr_items
|
||||
with_client_info = not raw_attrs.empty() and 'with_client_info' in raw_attrs.attr_items
|
||||
one_way = not raw_attrs.empty() and 'one_way' in raw_attrs.attr_items
|
||||
if one_way and 'return_type' in tokens:
|
||||
raise Exception(f"Invalid return type specification for one-way RPC verb '{name}'")
|
||||
return RpcVerb(name=name, parameters=params, return_type=tokens.get('return_type'), with_client_info=with_client_info, with_timeout=with_timeout, one_way=one_way)
|
||||
|
||||
|
||||
def namespace_parse_action(tokens):
|
||||
return NamespaceDef(name=tokens['name'], members=tokens['ns_members'].asList())
|
||||
|
||||
@@ -489,6 +688,7 @@ def parse_file(file_name):
|
||||
ns_qualified_ident = pp.delimitedList(identifier, "::", combine=True)
|
||||
enum_lit = pp.Keyword('enum').suppress()
|
||||
ns = pp.Keyword("namespace").suppress()
|
||||
verb = pp.Keyword("verb").suppress()
|
||||
|
||||
btype = ns_qualified_ident.copy()
|
||||
btype.setParseAction(basic_type_parse_action)
|
||||
@@ -514,27 +714,36 @@ def parse_file(file_name):
|
||||
content = pp.Forward()
|
||||
|
||||
attrib = lbrack - lbrack - pp.SkipTo(']') - rbrack - rbrack
|
||||
attrib.setParseAction(attribute_parse_action)
|
||||
opt_attribute = pp.Optional(attrib)("attribute")
|
||||
opt_attributes = pp.ZeroOrMore(attrib)("attributes")
|
||||
opt_attributes.setParseAction(attributes_parse_action)
|
||||
|
||||
default_value = equals - pp.SkipTo(';')
|
||||
member_name = pp.Combine(identifier - pp.Optional(lparen - rparen)("function_marker"))
|
||||
class_member = type("type") - member_name("name") - opt_attribute - pp.Optional(default_value)("default") - semi
|
||||
class_member = type("type") - member_name("name") - opt_attributes - pp.Optional(default_value)("default") - semi
|
||||
class_member.setParseAction(class_member_parse_action)
|
||||
|
||||
template_param = pp.Group(identifier("type") - identifier("name"))
|
||||
template_def = template - langle - pp.delimitedList(template_param)("params") - rangle
|
||||
class_content = pp.Forward()
|
||||
class_def = pp.Optional(template_def)("template") + (cls | struct) - ns_qualified_ident("name") - \
|
||||
pp.Optional(final)("final") - pp.Optional(stub)("stub") - opt_attribute - \
|
||||
pp.Optional(final)("final") - pp.Optional(stub)("stub") - opt_attributes - \
|
||||
lbrace - pp.ZeroOrMore(class_content)("members") - rbrace - pp.Optional(semi)
|
||||
class_content <<= enum | class_def | class_member
|
||||
class_def.setParseAction(class_def_parse_action)
|
||||
|
||||
rpc_verb_param = type("type") - pp.Optional(identifier)("ident") - opt_attributes("attrs")
|
||||
rpc_verb_param.setParseAction(rpc_verb_param_parse_action)
|
||||
rpc_verb_params = pp.delimitedList(rpc_verb_param)
|
||||
|
||||
rpc_verb = verb - opt_attributes - identifier("name") - \
|
||||
lparen.suppress() - pp.Optional(rpc_verb_params("params")) - rparen.suppress() - \
|
||||
pp.Optional(pp.Literal("->").suppress() - type("return_type")) - pp.Optional(semi)
|
||||
rpc_verb.setParseAction(rpc_verb_parse_action)
|
||||
|
||||
namespace = ns - identifier("name") - lbrace - pp.OneOrMore(content)("ns_members") - rbrace
|
||||
namespace.setParseAction(namespace_parse_action)
|
||||
|
||||
content <<= enum | class_def | namespace
|
||||
content <<= enum | class_def | rpc_verb | namespace
|
||||
|
||||
for varname in ("enum", "class_def", "class_member", "content", "namespace", "template_def"):
|
||||
locals()[varname].setName(varname)
|
||||
@@ -603,6 +812,7 @@ def flat_type(t):
|
||||
|
||||
local_types = {}
|
||||
local_writable_types = {}
|
||||
rpc_verbs = {}
|
||||
|
||||
|
||||
def resolve_basic_type_ref(type: BasicType):
|
||||
@@ -1076,13 +1286,18 @@ def register_local_type(cls):
|
||||
def register_writable_local_type(cls):
|
||||
global local_writable_types
|
||||
global stubs
|
||||
if not cls.attribute or cls.attribute.name != 'writable':
|
||||
if not cls.attribute or cls.attribute != 'writable':
|
||||
return
|
||||
local_writable_types[cls.name] = cls
|
||||
if cls.stub:
|
||||
stubs.add(cls.name)
|
||||
|
||||
|
||||
def register_rpc_verb(verb):
|
||||
global rpc_verbs
|
||||
rpc_verbs[verb.name] = verb
|
||||
|
||||
|
||||
def sort_dependencies():
|
||||
dep_tree = {}
|
||||
res = []
|
||||
@@ -1289,10 +1504,57 @@ def handle_objects(tree, hout, cout):
|
||||
handle_enum(obj, hout, cout)
|
||||
elif isinstance(obj, NamespaceDef):
|
||||
handle_objects(obj.members, hout, cout)
|
||||
elif isinstance(obj, RpcVerb):
|
||||
pass
|
||||
else:
|
||||
print(f"Unknown type: {obj}")
|
||||
|
||||
|
||||
def generate_rpc_verbs_declarations(hout, module_name):
|
||||
fprintln(hout, f"\n// RPC verbs defined in the '{module_name}' module\n")
|
||||
fprintln(hout, f'struct {module_name}_rpc_verbs {{')
|
||||
for name, verb in rpc_verbs.items():
|
||||
fprintln(hout, reindent(4, f'''static void register_{name}(netw::messaging_service* ms,
|
||||
std::function<{verb.handler_function_return_type()} ({verb.handler_function_parameters_str()})>&&);
|
||||
static future<> unregister_{name}(netw::messaging_service* ms);
|
||||
static {verb.send_function_return_type()} send_{name}({verb.send_function_signature_params_list(include_placeholder_names=False)});
|
||||
'''))
|
||||
|
||||
fprintln(hout, reindent(4, 'static future<> unregister(netw::messaging_service* ms);'))
|
||||
fprintln(hout, '};\n')
|
||||
|
||||
|
||||
def generate_rpc_verbs_definitions(cout, module_name):
|
||||
fprintln(cout, f"\n// RPC verbs defined in the '{module_name}' module")
|
||||
for name, verb in rpc_verbs.items():
|
||||
fprintln(cout, f'''
|
||||
void {module_name}_rpc_verbs::register_{name}(netw::messaging_service* ms,
|
||||
std::function<{verb.handler_function_return_type()} ({verb.handler_function_parameters_str()})>&& f) {{
|
||||
register_handler(ms, {verb.messaging_verb_enum_case()}, std::move(f));
|
||||
}}
|
||||
|
||||
future<> {module_name}_rpc_verbs::unregister_{name}(netw::messaging_service* ms) {{
|
||||
return ms->unregister_handler({verb.messaging_verb_enum_case()});
|
||||
}}
|
||||
|
||||
{verb.send_function_return_type()} {module_name}_rpc_verbs::send_{name}({verb.send_function_signature_params_list(include_placeholder_names=True)}) {{
|
||||
{verb.send_function_invocation()}
|
||||
}}''')
|
||||
|
||||
fprintln(cout, f'''
|
||||
future<> {module_name}_rpc_verbs::unregister(netw::messaging_service* ms) {{
|
||||
return when_all_succeed({', '.join([f'unregister_{v}(ms)' for v in rpc_verbs.keys()])}).discard_result();
|
||||
}}
|
||||
''')
|
||||
|
||||
|
||||
def generate_rpc_verbs(hout, cout, module_name):
|
||||
if not rpc_verbs:
|
||||
return
|
||||
generate_rpc_verbs_declarations(hout, module_name)
|
||||
generate_rpc_verbs_definitions(cout, module_name)
|
||||
|
||||
|
||||
def handle_types(tree):
|
||||
'''Traverse AST and record all locally defined types, i.e. defined in
|
||||
the currently processed IDL file.
|
||||
@@ -1301,6 +1563,8 @@ def handle_types(tree):
|
||||
if isinstance(obj, ClassDef):
|
||||
register_local_type(obj)
|
||||
register_writable_local_type(obj)
|
||||
elif isinstance(obj, RpcVerb):
|
||||
register_rpc_verb(obj)
|
||||
elif isinstance(obj, EnumDef):
|
||||
pass
|
||||
elif isinstance(obj, NamespaceDef):
|
||||
@@ -1369,6 +1633,10 @@ def load_file(name):
|
||||
setup_additional_metadata(data)
|
||||
handle_types(data)
|
||||
handle_objects(data, hout, cout)
|
||||
|
||||
module_name = os.path.basename(name)
|
||||
module_name = module_name[:module_name.find('.')]
|
||||
generate_rpc_verbs(hout, cout, module_name)
|
||||
add_visitors(cout)
|
||||
if config.ns != '':
|
||||
fprintln(hout, f"}} // {config.ns}")
|
||||
|
||||
@@ -101,40 +101,14 @@
|
||||
#include "streaming/stream_mutation_fragments_cmd.hh"
|
||||
#include "locator/snitch_base.hh"
|
||||
|
||||
#include "message/rpc_protocol_impl.hh"
|
||||
|
||||
namespace netw {
|
||||
|
||||
static_assert(!std::is_default_constructible_v<msg_addr>);
|
||||
static_assert(std::is_nothrow_copy_constructible_v<msg_addr>);
|
||||
static_assert(std::is_nothrow_move_constructible_v<msg_addr>);
|
||||
|
||||
// thunk from rpc serializers to generate serializers
|
||||
template <typename T, typename Output>
|
||||
void write(serializer, Output& out, const T& data) {
|
||||
ser::serialize(out, data);
|
||||
}
|
||||
template <typename T, typename Input>
|
||||
T read(serializer, Input& in, boost::type<T> type) {
|
||||
return ser::deserialize(in, type);
|
||||
}
|
||||
|
||||
template <typename Output, typename T>
|
||||
void write(serializer s, Output& out, const foreign_ptr<T>& v) {
|
||||
return write(s, out, *v);
|
||||
}
|
||||
template <typename Input, typename T>
|
||||
foreign_ptr<T> read(serializer s, Input& in, boost::type<foreign_ptr<T>>) {
|
||||
return make_foreign(read(s, in, boost::type<T>()));
|
||||
}
|
||||
|
||||
template <typename Output, typename T>
|
||||
void write(serializer s, Output& out, const lw_shared_ptr<T>& v) {
|
||||
return write(s, out, *v);
|
||||
}
|
||||
template <typename Input, typename T>
|
||||
lw_shared_ptr<T> read(serializer s, Input& in, boost::type<lw_shared_ptr<T>>) {
|
||||
return make_lw_shared<T>(read(s, in, boost::type<T>()));
|
||||
}
|
||||
|
||||
static logging::logger mlogger("messaging_service");
|
||||
static logging::logger rpc_logger("rpc");
|
||||
|
||||
@@ -142,7 +116,6 @@ using inet_address = gms::inet_address;
|
||||
using gossip_digest_syn = gms::gossip_digest_syn;
|
||||
using gossip_digest_ack = gms::gossip_digest_ack;
|
||||
using gossip_digest_ack2 = gms::gossip_digest_ack2;
|
||||
using rpc_protocol = rpc::protocol<serializer, messaging_verb>;
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
static rpc::lz4_fragmented_compressor::factory lz4_fragmented_compressor_factory;
|
||||
@@ -152,66 +125,6 @@ static rpc::multi_algo_compressor_factory compressor_factory {
|
||||
&lz4_compressor_factory,
|
||||
};
|
||||
|
||||
class messaging_service::rpc_protocol_wrapper {
|
||||
rpc_protocol _impl;
|
||||
public:
|
||||
explicit rpc_protocol_wrapper(serializer&& s) : _impl(std::move(s)) {}
|
||||
|
||||
rpc_protocol& protocol() { return _impl; }
|
||||
|
||||
template<typename Func>
|
||||
auto make_client(messaging_verb t) { return _impl.make_client<Func>(t); }
|
||||
|
||||
template<typename Func>
|
||||
auto register_handler(messaging_verb t, Func&& func) { return _impl.register_handler(t, std::forward<Func>(func)); }
|
||||
|
||||
template<typename Func>
|
||||
auto register_handler(messaging_verb t, scheduling_group sg, Func&& func) { return _impl.register_handler(t, sg, std::forward<Func>(func)); }
|
||||
|
||||
future<> unregister_handler(messaging_verb t) { return _impl.unregister_handler(t); }
|
||||
|
||||
void set_logger(::seastar::logger* logger) { _impl.set_logger(logger); }
|
||||
|
||||
bool has_handler(messaging_verb msg_id) { return _impl.has_handler(msg_id); }
|
||||
|
||||
bool has_handlers() const noexcept { return _impl.has_handlers(); }
|
||||
};
|
||||
|
||||
// This wrapper pretends to be rpc_protocol::client, but also handles
|
||||
// stopping it before destruction, in case it wasn't stopped already.
|
||||
// This should be integrated into messaging_service proper.
|
||||
class messaging_service::rpc_protocol_client_wrapper {
|
||||
std::unique_ptr<rpc_protocol::client> _p;
|
||||
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
||||
public:
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, socket_address addr, socket_address local = {})
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
|
||||
}
|
||||
rpc_protocol_client_wrapper(rpc_protocol& proto, rpc::client_options opts, socket_address addr, socket_address local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), seastar::tls::socket(c), addr, local))
|
||||
, _credentials(c)
|
||||
{}
|
||||
auto get_stats() const { return _p->get_stats(); }
|
||||
future<> stop() { return _p->stop(); }
|
||||
bool error() {
|
||||
return _p->error();
|
||||
}
|
||||
operator rpc_protocol::client&() { return *_p; }
|
||||
|
||||
/**
|
||||
* #3787 Must ensure we use the right type of socker. I.e. tls or not.
|
||||
* See above, we retain credentials object so we here can know if we
|
||||
* are tls or not.
|
||||
*/
|
||||
template<typename Serializer, typename... Out>
|
||||
future<rpc::sink<Out...>> make_stream_sink() {
|
||||
if (_credentials) {
|
||||
return _p->make_stream_sink<Serializer, Out...>(seastar::tls::socket(_credentials));
|
||||
}
|
||||
return _p->make_stream_sink<Serializer, Out...>();
|
||||
}
|
||||
};
|
||||
|
||||
struct messaging_service::rpc_protocol_server_wrapper : public rpc_protocol::server { using rpc_protocol::server::server; };
|
||||
|
||||
constexpr int32_t messaging_service::current_version;
|
||||
@@ -287,12 +200,6 @@ bool messaging_service::knows_version(const gms::inet_address& endpoint) const {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Register a handler (a callback lambda) for verb
|
||||
template <typename Func>
|
||||
void register_handler(messaging_service* ms, messaging_verb verb, Func&& func) {
|
||||
ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::move(func));
|
||||
}
|
||||
|
||||
future<> messaging_service::unregister_handler(messaging_verb verb) {
|
||||
return _rpc->unregister_handler(verb);
|
||||
}
|
||||
@@ -961,76 +868,6 @@ future<> messaging_service::unregister_repair_get_full_row_hashes_with_rpc_strea
|
||||
return unregister_handler(messaging_verb::REPAIR_GET_FULL_ROW_HASHES_WITH_RPC_STREAM);
|
||||
}
|
||||
|
||||
// Send a message for verb
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
if (ms->is_shutting_down()) {
|
||||
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (auto&& f) {
|
||||
try {
|
||||
if (f.failed()) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
f.get();
|
||||
assert(false); // never reached
|
||||
}
|
||||
return std::move(f);
|
||||
} catch (rpc::closed_error&) {
|
||||
// This is a transport error
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: Remove duplicated code in send_message
|
||||
template <typename MsgIn, typename Timeout, typename... MsgOut>
|
||||
auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
if (ms->is_shutting_down()) {
|
||||
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
return rpc_handler(rpc_client, timeout, std::forward<MsgOut>(msg)...).then_wrapped([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (auto&& f) {
|
||||
try {
|
||||
if (f.failed()) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
f.get();
|
||||
assert(false); // never reached
|
||||
}
|
||||
return std::move(f);
|
||||
} catch (rpc::closed_error&) {
|
||||
// This is a transport error
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Send one way message for verb
|
||||
template <typename... MsgOut>
|
||||
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
||||
return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
|
||||
}
|
||||
|
||||
// Send one way message for verb
|
||||
template <typename Timeout, typename... MsgOut>
|
||||
auto send_message_oneway_timeout(messaging_service* ms, Timeout timeout, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
||||
return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout, std::forward<MsgOut>(msg)...);
|
||||
}
|
||||
|
||||
// Wrappers for verbs
|
||||
|
||||
// PREPARE_MESSAGE
|
||||
|
||||
184
message/rpc_protocol_impl.hh
Normal file
184
message/rpc_protocol_impl.hh
Normal file
@@ -0,0 +1,184 @@
|
||||
#include "message/messaging_service.hh"
|
||||
|
||||
#include <seastar/rpc/rpc.hh>
|
||||
|
||||
namespace netw {
|
||||
|
||||
// thunk from rpc serializers to generate serializers
|
||||
template <typename T, typename Output>
|
||||
void write(serializer, Output& out, const T& data) {
|
||||
ser::serialize(out, data);
|
||||
}
|
||||
template <typename T, typename Input>
|
||||
T read(serializer, Input& in, boost::type<T> type) {
|
||||
return ser::deserialize(in, type);
|
||||
}
|
||||
|
||||
template <typename Output, typename T>
|
||||
void write(serializer s, Output& out, const foreign_ptr<T>& v) {
|
||||
return write(s, out, *v);
|
||||
}
|
||||
template <typename Input, typename T>
|
||||
foreign_ptr<T> read(serializer s, Input& in, boost::type<foreign_ptr<T>>) {
|
||||
return make_foreign(read(s, in, boost::type<T>()));
|
||||
}
|
||||
|
||||
template <typename Output, typename T>
|
||||
void write(serializer s, Output& out, const lw_shared_ptr<T>& v) {
|
||||
return write(s, out, *v);
|
||||
}
|
||||
template <typename Input, typename T>
|
||||
lw_shared_ptr<T> read(serializer s, Input& in, boost::type<lw_shared_ptr<T>>) {
|
||||
return make_lw_shared<T>(read(s, in, boost::type<T>()));
|
||||
}
|
||||
|
||||
using rpc_protocol = rpc::protocol<serializer, messaging_verb>;
|
||||
|
||||
class messaging_service::rpc_protocol_wrapper {
|
||||
rpc_protocol _impl;
|
||||
public:
|
||||
explicit rpc_protocol_wrapper(serializer &&s) : _impl(std::move(s)) {}
|
||||
|
||||
rpc_protocol &protocol() { return _impl; }
|
||||
|
||||
template<typename Func>
|
||||
auto make_client(messaging_verb t) { return _impl.make_client<Func>(t); }
|
||||
|
||||
template<typename Func>
|
||||
auto register_handler(messaging_verb t, Func &&func) {
|
||||
return _impl.register_handler(t, std::forward<Func>(func));
|
||||
}
|
||||
|
||||
template<typename Func>
|
||||
auto register_handler(messaging_verb t, scheduling_group sg, Func &&func) {
|
||||
return _impl.register_handler(t, sg, std::forward<Func>(func));
|
||||
}
|
||||
|
||||
future<> unregister_handler(messaging_verb t) { return _impl.unregister_handler(t); }
|
||||
|
||||
void set_logger(::seastar::logger *logger) { _impl.set_logger(logger); }
|
||||
|
||||
bool has_handler(messaging_verb msg_id) { return _impl.has_handler(msg_id); }
|
||||
|
||||
bool has_handlers() const noexcept { return _impl.has_handlers(); }
|
||||
};
|
||||
|
||||
// This wrapper pretends to be rpc_protocol::client, but also handles
|
||||
// stopping it before destruction, in case it wasn't stopped already.
|
||||
// This should be integrated into messaging_service proper.
|
||||
class messaging_service::rpc_protocol_client_wrapper {
|
||||
std::unique_ptr<rpc_protocol::client> _p;
|
||||
::shared_ptr<seastar::tls::server_credentials> _credentials;
|
||||
public:
|
||||
rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr,
|
||||
socket_address local = {})
|
||||
: _p(std::make_unique<rpc_protocol::client>(proto, std::move(opts), addr, local)) {
|
||||
}
|
||||
|
||||
rpc_protocol_client_wrapper(rpc_protocol &proto, rpc::client_options opts, socket_address addr,
|
||||
socket_address local, ::shared_ptr<seastar::tls::server_credentials> c)
|
||||
: _p(
|
||||
std::make_unique<rpc_protocol::client>(proto, std::move(opts), seastar::tls::socket(c), addr, local)),
|
||||
_credentials(c) {}
|
||||
|
||||
auto get_stats() const { return _p->get_stats(); }
|
||||
|
||||
future<> stop() { return _p->stop(); }
|
||||
|
||||
bool error() {
|
||||
return _p->error();
|
||||
}
|
||||
|
||||
operator rpc_protocol::client &() { return *_p; }
|
||||
|
||||
/**
|
||||
* #3787 Must ensure we use the right type of socket. I.e. tls or not.
|
||||
* See above, we retain credentials object so we here can know if we
|
||||
* are tls or not.
|
||||
*/
|
||||
template<typename Serializer, typename... Out>
|
||||
future<rpc::sink<Out...>> make_stream_sink() {
|
||||
if (_credentials) {
|
||||
return _p->make_stream_sink<Serializer, Out...>(seastar::tls::socket(_credentials));
|
||||
}
|
||||
return _p->make_stream_sink<Serializer, Out...>();
|
||||
}
|
||||
};
|
||||
|
||||
// Register a handler (a callback lambda) for verb
|
||||
template<typename Func>
|
||||
void register_handler(messaging_service *ms, messaging_verb verb, Func &&func) {
|
||||
ms->rpc()->register_handler(verb, ms->scheduling_group_for_verb(verb), std::move(func));
|
||||
}
|
||||
|
||||
// Send a message for verb
|
||||
template <typename MsgIn, typename... MsgOut>
|
||||
auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
if (ms->is_shutting_down()) {
|
||||
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
return rpc_handler(rpc_client, std::forward<MsgOut>(msg)...).then_wrapped([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (auto&& f) {
|
||||
try {
|
||||
if (f.failed()) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
f.get();
|
||||
assert(false); // never reached
|
||||
}
|
||||
return std::move(f);
|
||||
} catch (rpc::closed_error&) {
|
||||
// This is a transport error
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: Remove duplicated code in send_message
|
||||
template <typename MsgIn, typename Timeout, typename... MsgOut>
|
||||
auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
|
||||
auto rpc_handler = ms->rpc()->make_client<MsgIn(MsgOut...)>(verb);
|
||||
if (ms->is_shutting_down()) {
|
||||
using futurator = futurize<std::result_of_t<decltype(rpc_handler)(rpc_protocol::client&, MsgOut...)>>;
|
||||
return futurator::make_exception_future(rpc::closed_error());
|
||||
}
|
||||
auto rpc_client_ptr = ms->get_rpc_client(verb, id);
|
||||
auto& rpc_client = *rpc_client_ptr;
|
||||
return rpc_handler(rpc_client, timeout, std::forward<MsgOut>(msg)...).then_wrapped([ms = ms->shared_from_this(), id, verb, rpc_client_ptr = std::move(rpc_client_ptr)] (auto&& f) {
|
||||
try {
|
||||
if (f.failed()) {
|
||||
ms->increment_dropped_messages(verb);
|
||||
f.get();
|
||||
assert(false); // never reached
|
||||
}
|
||||
return std::move(f);
|
||||
} catch (rpc::closed_error&) {
|
||||
// This is a transport error
|
||||
ms->remove_error_rpc_client(verb, id);
|
||||
throw;
|
||||
} catch (...) {
|
||||
// This is expected to be a rpc server error, e.g., the rpc handler throws a std::runtime_error.
|
||||
throw;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Send one way message for verb
|
||||
template <typename... MsgOut>
|
||||
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
||||
return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
|
||||
}
|
||||
|
||||
// Send one way message for verb
|
||||
template <typename Timeout, typename... MsgOut>
|
||||
auto send_message_oneway_timeout(messaging_service* ms, Timeout timeout, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
|
||||
return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout, std::forward<MsgOut>(msg)...);
|
||||
}
|
||||
|
||||
} // namespace netw
|
||||
Reference in New Issue
Block a user