messaging_service: drop the usage of ip based token_metadata APIs
We want to drop ips from token_metadata so move to use host id based counterparts. Messaging service gets a function that maps from ips to id when is starts listening.
This commit is contained in:
13
main.cc
13
main.cc
@@ -2118,7 +2118,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get();
|
||||
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata));
|
||||
return messaging.invoke_on_all([&] (auto& ms) {
|
||||
return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) {
|
||||
if (ip == gossiper.local().get_broadcast_address()) {
|
||||
return gossiper.local().my_host_id();
|
||||
}
|
||||
try {
|
||||
return gossiper.local().get_host_id(ip);
|
||||
} catch (...) {
|
||||
return locator::host_id{};
|
||||
}
|
||||
});
|
||||
});
|
||||
}).get();
|
||||
|
||||
const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get());
|
||||
|
||||
@@ -299,8 +299,9 @@ future<> messaging_service::start() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
future<> messaging_service::start_listen(locator::shared_token_metadata& stm) {
|
||||
future<> messaging_service::start_listen(locator::shared_token_metadata& stm, std::function<locator::host_id(gms::inet_address)> address_to_host_id_mapper) {
|
||||
_token_metadata = &stm;
|
||||
_address_to_host_id_mapper = std::move(address_to_host_id_mapper);
|
||||
do_start_listen();
|
||||
return make_ready_future<>();
|
||||
}
|
||||
@@ -308,20 +309,21 @@ future<> messaging_service::start_listen(locator::shared_token_metadata& stm) {
|
||||
bool messaging_service::topology_known_for(inet_address addr) const {
|
||||
// The token metadata pointer is nullptr before
|
||||
// the service is start_listen()-ed and after it's being shutdown()-ed.
|
||||
const locator::node* node;
|
||||
return _token_metadata
|
||||
&& _token_metadata->get()->get_topology().has_endpoint(addr);
|
||||
&& (node = _token_metadata->get()->get_topology().find_node(_address_to_host_id_mapper(addr))) && !node->is_none();
|
||||
}
|
||||
|
||||
// Precondition: `topology_known_for(addr)`.
|
||||
bool messaging_service::is_same_dc(inet_address addr) const {
|
||||
const auto& topo = _token_metadata->get()->get_topology();
|
||||
return topo.get_datacenter(addr) == topo.get_datacenter();
|
||||
return topo.get_datacenter(_address_to_host_id_mapper(addr)) == topo.get_datacenter();
|
||||
}
|
||||
|
||||
// Precondition: `topology_known_for(addr)`.
|
||||
bool messaging_service::is_same_rack(inet_address addr) const {
|
||||
const auto& topo = _token_metadata->get()->get_topology();
|
||||
return topo.get_rack(addr) == topo.get_rack();
|
||||
return topo.get_rack(_address_to_host_id_mapper(addr)) == topo.get_rack();
|
||||
}
|
||||
|
||||
// The socket metrics domain defines the way RPC metrics are grouped
|
||||
@@ -334,12 +336,15 @@ bool messaging_service::is_same_rack(inet_address addr) const {
|
||||
// that the isolation cookie suits very well, because these cookies
|
||||
// are different for different indices and are more informative than
|
||||
// plain numbers
|
||||
sstring messaging_service::client_metrics_domain(unsigned idx, inet_address addr) const {
|
||||
sstring messaging_service::client_metrics_domain(unsigned idx, inet_address addr, std::optional<locator::host_id> id) const {
|
||||
sstring ret = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
if (!id) {
|
||||
id = _address_to_host_id_mapper(addr);
|
||||
}
|
||||
if (_token_metadata) {
|
||||
const auto& topo = _token_metadata->get()->get_topology();
|
||||
if (topo.has_endpoint(addr)) {
|
||||
ret += ":" + topo.get_datacenter(addr);
|
||||
if (topo.has_node(*id)) {
|
||||
ret += ":" + topo.get_datacenter(*id);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@@ -1106,7 +1111,7 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
|
||||
opts.tcp_nodelay = must_tcp_nodelay;
|
||||
opts.reuseaddr = true;
|
||||
opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie;
|
||||
opts.metrics_domain = client_metrics_domain(idx, id.addr); // not just `addr` as the latter may be internal IP
|
||||
opts.metrics_domain = client_metrics_domain(idx, id.addr, host_id); // not just `addr` as the latter may be internal IP
|
||||
|
||||
SCYLLA_ASSERT(!must_encrypt || _credentials);
|
||||
|
||||
|
||||
@@ -337,6 +337,8 @@ private:
|
||||
private:
|
||||
config _cfg;
|
||||
locator::shared_token_metadata* _token_metadata = nullptr;
|
||||
// a function that maps from ip to host id if known (returns default constructable host_id if there is no mapping)
|
||||
std::function<locator::host_id(gms::inet_address)> _address_to_host_id_mapper;
|
||||
// map: Node broadcast address -> Node internal IP, and the reversed mapping, for communication within the same data center
|
||||
std::unordered_map<gms::inet_address, gms::inet_address> _preferred_ip_cache, _preferred_to_endpoint;
|
||||
std::unique_ptr<rpc_protocol_wrapper> _rpc;
|
||||
@@ -378,7 +380,7 @@ public:
|
||||
~messaging_service();
|
||||
|
||||
future<> start();
|
||||
future<> start_listen(locator::shared_token_metadata& stm);
|
||||
future<> start_listen(locator::shared_token_metadata& stm, std::function<locator::host_id(gms::inet_address)> address_to_host_id_mapper);
|
||||
uint16_t port() const noexcept {
|
||||
return _cfg.port;
|
||||
}
|
||||
@@ -455,7 +457,7 @@ private:
|
||||
|
||||
bool is_host_banned(locator::host_id);
|
||||
|
||||
sstring client_metrics_domain(unsigned idx, inet_address addr) const;
|
||||
sstring client_metrics_domain(unsigned idx, inet_address addr, std::optional<locator::host_id> id) const;
|
||||
|
||||
public:
|
||||
// Return rpc::protocol::client for a shard which is a ip + cpuid pair.
|
||||
|
||||
@@ -755,7 +755,7 @@ private:
|
||||
// Once the seastar issue is fixed, we can just keep the tmp socket aliva across
|
||||
// the listen invoke below.
|
||||
tmp = {};
|
||||
_ms.invoke_on_all(&netw::messaging_service::start_listen, std::ref(_token_metadata)).get();
|
||||
_ms.invoke_on_all(&netw::messaging_service::start_listen, std::ref(_token_metadata), [host_id] (gms::inet_address ip) {return host_id; }).get();
|
||||
}
|
||||
} catch (std::system_error& e) {
|
||||
// if we still hit a used port (quick other process), just shut down ms and try again.
|
||||
|
||||
@@ -224,7 +224,7 @@ int main(int ac, char ** av) {
|
||||
auto deinit_testers = deferred_action([&testers] {
|
||||
testers.invoke_on_all(&tester::deinit_handler).get();
|
||||
});
|
||||
messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)).get();
|
||||
messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata), [] (gms::inet_address ip){ return locator::host_id{}; }).get();
|
||||
if (config.contains("server")) {
|
||||
auto ip = config["server"].as<std::string>();
|
||||
auto cpuid = config["cpuid"].as<uint32_t>();
|
||||
|
||||
Reference in New Issue
Block a user