messaging_service: pass gossip_address_map to the mm and introduce send by id functions

The function looks up provided host id in gossip_address_map and throws
unknown_address if the mapping is not available. Otherwise it sends the
message by IP found.
This commit is contained in:
Gleb Natapov
2024-09-11 12:57:51 +03:00
parent 0e264ccba9
commit 76aa41dfcf
7 changed files with 57 additions and 9 deletions

View File

@@ -1480,7 +1480,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
}
// Delay listening messaging_service until gossip message handlers are registered
messaging.start(mscfg, scfg, creds, std::ref(feature_service)).get();
messaging.start(mscfg, scfg, creds, std::ref(feature_service), std::ref(gossip_address_map)).get();
auto stop_ms = defer_verbose_shutdown("messaging service", [&messaging] {
messaging.invoke_on_all(&netw::messaging_service::stop).get();
});

View File

@@ -233,9 +233,9 @@ future<> messaging_service::unregister_handler(messaging_verb verb) {
return _rpc->unregister_handler(verb);
}
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service)
messaging_service::messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service, gms::gossip_address_map& address_map)
: messaging_service(config{std::move(id), ip, ip, port},
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service)
scheduling_config{{{{}, "$default"}}, {}, {}}, nullptr, feature_service, address_map)
{}
static
@@ -423,7 +423,8 @@ void messaging_service::do_start_listen() {
}
}
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service)
messaging_service::messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder> credentials, gms::feature_service& feature_service,
gms::gossip_address_map& address_map)
: _cfg(std::move(cfg))
, _rpc(new rpc_protocol_wrapper(serializer { }))
, _credentials_builder(credentials ? std::make_unique<seastar::tls::credentials_builder>(*credentials) : nullptr)
@@ -431,6 +432,7 @@ messaging_service::messaging_service(config cfg, scheduling_config scfg, std::sh
, _scheduling_config(scfg)
, _scheduling_info_for_connection_index(initial_scheduling_info())
, _feature_service(feature_service)
, _address_map(address_map)
{
_rpc->set_logger(&rpc_logger);
@@ -679,6 +681,14 @@ static constexpr std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)>
static std::array<uint8_t, static_cast<size_t>(messaging_verb::LAST)> s_rpc_client_idx_table = make_rpc_client_idx_table();
msg_addr messaging_service::addr_for_host_id(locator::host_id hid) {
auto opt_ip = _address_map.find(hid);
if (!opt_ip) {
throw unknown_address(hid);
}
return msg_addr{*opt_ip, 0};
}
unsigned
messaging_service::get_rpc_client_idx(messaging_verb verb) const {
auto idx = s_rpc_client_idx_table[static_cast<size_t>(verb)];

View File

@@ -23,6 +23,7 @@
#include "locator/host_id.hh"
#include "service/session.hh"
#include "service/maintenance_mode.hh"
#include "gms/gossip_address_map.hh"
#include "tasks/types.hh"
#include <list>
@@ -230,6 +231,10 @@ struct schema_pull_options {
bool group0_snapshot_transfer = false;
};
struct unknown_address : public std::runtime_error {
unknown_address(locator::host_id id) : std::runtime_error(fmt::format("no ip address mapping for {}", id)) {}
};
class messaging_service : public seastar::async_sharded_service<messaging_service>, public peering_sharded_service<messaging_service> {
public:
struct rpc_protocol_wrapper;
@@ -346,6 +351,7 @@ private:
struct connection_ref;
std::unordered_multimap<locator::host_id, connection_ref> _host_connections;
std::unordered_set<locator::host_id> _banned_hosts;
gms::gossip_address_map& _address_map;
future<> shutdown_tls_server();
future<> shutdown_nontls_server();
@@ -356,8 +362,8 @@ private:
public:
using clock_type = lowres_clock;
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service);
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service);
messaging_service(locator::host_id id, gms::inet_address ip, uint16_t port, gms::feature_service& feature_service, gms::gossip_address_map& address_map);
messaging_service(config cfg, scheduling_config scfg, std::shared_ptr<seastar::tls::credentials_builder>, gms::feature_service& feature_service, gms::gossip_address_map& address_map);
~messaging_service();
future<> start();
@@ -493,6 +499,8 @@ public:
// No further RPC handlers will be called for that node,
// but we don't prevent handlers that were started concurrently from finishing.
future<> ban_host(locator::host_id);
msg_addr addr_for_host_id(locator::host_id hid);
private:
template <typename Fn>
requires std::is_invocable_r_v<bool, Fn, const shard_info&>

View File

@@ -144,6 +144,12 @@ auto send_message(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOu
});
}
// Send a message for verb
template <typename MsgIn, typename... MsgOut>
auto send_message(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) {
return send_message<MsgIn, MsgOut...>(ms, verb, ms->addr_for_host_id(hid), std::forward<MsgOut>(msg)...);
}
// TODO: Remove duplicated code in send_message
template <typename MsgIn, typename Timeout, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr id, Timeout timeout, MsgOut&&... msg) {
@@ -167,6 +173,12 @@ auto send_message_timeout(messaging_service* ms, messaging_verb verb, msg_addr i
});
}
// Send a message for verb
template <typename MsgIn, typename... MsgOut>
auto send_message_timeout(messaging_service* ms, messaging_verb verb, locator::host_id hid, MsgOut&&... msg) {
return send_message_timeout<MsgIn, MsgOut...>(ms, verb, ms->addr_for_host_id(hid), std::forward<MsgOut>(msg)...);
}
// Requesting abort on the provided abort_source drops the message from the outgoing queue (if it's still there)
// and causes the returned future to resolve exceptionally with `abort_requested_exception`.
// TODO: Remove duplicated code in send_message
@@ -205,6 +217,11 @@ auto send_message_cancellable(messaging_service* ms, messaging_verb verb, msg_ad
});
}
template <typename MsgIn, typename... MsgOut>
auto send_message_cancellable(messaging_service* ms, messaging_verb verb, locator::host_id id, abort_source& as, MsgOut&&... msg) {
return send_message_cancellable<MsgIn, MsgOut...>(ms, verb, ms->addr_for_host_id(id), as, std::forward<MsgOut>(msg)...);
}
// Send one way message for verb
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, msg_addr id, MsgOut&&... msg) {
@@ -217,4 +234,15 @@ auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, msg
return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout, std::forward<MsgOut>(msg)...);
}
template <typename... MsgOut>
auto send_message_oneway(messaging_service* ms, messaging_verb verb, locator::host_id id, MsgOut&&... msg) {
return send_message<rpc::no_wait_type>(ms, std::move(verb), std::move(id), std::forward<MsgOut>(msg)...);
}
// Send one way message for verb
template <typename Timeout, typename... MsgOut>
auto send_message_oneway_timeout(messaging_service* ms, messaging_verb verb, locator::host_id id, Timeout timeout, MsgOut&&... msg) {
return send_message_timeout<rpc::no_wait_type>(ms, std::move(verb), std::move(id), timeout, std::forward<MsgOut>(msg)...);
}
} // namespace netw

View File

@@ -721,7 +721,7 @@ private:
port = tmp.local_address().port();
}
// Don't start listening so tests can be run in parallel if cfg_in.ms_listen is not set to true explicitly.
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service)).get();
_ms.start(host_id, listen, std::move(port), std::ref(_feature_service), std::ref(_gossip_address_map)).get();
stop_ms = defer(stop_type(stop_ms_func));
if (cfg_in.ms_listen) {

View File

@@ -77,7 +77,7 @@ int main(int ac, char ** av) {
gossip_address_map.start().get();
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service), std::ref(gossip_address_map)).get();
auto stop_messaging = deferred_stop(messaging);
gms::gossip_config gcfg;

View File

@@ -196,8 +196,10 @@ int main(int ac, char ** av) {
seastar::sharded<gms::feature_service> feature_service;
auto cfg = gms::feature_config_from_db_config(db::config(), {});
feature_service.start(cfg).get();
seastar::sharded<gms::gossip_address_map> gossip_address_map;
gossip_address_map.start().get();
seastar::sharded<netw::messaging_service> messaging;
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service)).get();
messaging.start(locator::host_id{}, listen, 7000, std::ref(feature_service), std::ref(gossip_address_map)).get();
auto stop_messaging = deferred_stop(messaging);
seastar::sharded<tester> testers;
testers.start(std::ref(messaging)).get();