From 1b6e1456e59ba75d499f45dddf31dc0fc1f5ee60 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 18 Dec 2024 18:07:49 +0200 Subject: [PATCH] messaging_service: drop the usage of ip based token_metadata APIs We want to drop ips from token_metadata so move to use host id based counterparts. Messaging service gets a function that maps from ips to id when is starts listening. --- main.cc | 13 ++++++++++++- message/messaging_service.cc | 21 +++++++++++++-------- message/messaging_service.hh | 6 ++++-- test/lib/cql_test_env.cc | 2 +- test/manual/message.cc | 2 +- 5 files changed, 31 insertions(+), 13 deletions(-) diff --git a/main.cc b/main.cc index 8ccb6d846c..700cb6d83f 100644 --- a/main.cc +++ b/main.cc @@ -2118,7 +2118,18 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl group0_service.setup_group0_if_exist(sys_ks.local(), ss.local(), qp.local(), mm.local()).get(); with_scheduling_group(maintenance_scheduling_group, [&] { - return messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)); + return messaging.invoke_on_all([&] (auto& ms) { + return ms.start_listen(token_metadata.local(), [&gossiper] (gms::inet_address ip) { + if (ip == gossiper.local().get_broadcast_address()) { + return gossiper.local().my_host_id(); + } + try { + return gossiper.local().get_host_id(ip); + } catch (...) { + return locator::host_id{}; + } + }); + }); }).get(); const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get()); diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 22b6e35eab..8dc7ce6e9e 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -299,8 +299,9 @@ future<> messaging_service::start() { return make_ready_future<>(); } -future<> messaging_service::start_listen(locator::shared_token_metadata& stm) { +future<> messaging_service::start_listen(locator::shared_token_metadata& stm, std::function address_to_host_id_mapper) { _token_metadata = &stm; + _address_to_host_id_mapper = std::move(address_to_host_id_mapper); do_start_listen(); return make_ready_future<>(); } @@ -308,20 +309,21 @@ future<> messaging_service::start_listen(locator::shared_token_metadata& stm) { bool messaging_service::topology_known_for(inet_address addr) const { // The token metadata pointer is nullptr before // the service is start_listen()-ed and after it's being shutdown()-ed. + const locator::node* node; return _token_metadata - && _token_metadata->get()->get_topology().has_endpoint(addr); + && (node = _token_metadata->get()->get_topology().find_node(_address_to_host_id_mapper(addr))) && !node->is_none(); } // Precondition: `topology_known_for(addr)`. bool messaging_service::is_same_dc(inet_address addr) const { const auto& topo = _token_metadata->get()->get_topology(); - return topo.get_datacenter(addr) == topo.get_datacenter(); + return topo.get_datacenter(_address_to_host_id_mapper(addr)) == topo.get_datacenter(); } // Precondition: `topology_known_for(addr)`. bool messaging_service::is_same_rack(inet_address addr) const { const auto& topo = _token_metadata->get()->get_topology(); - return topo.get_rack(addr) == topo.get_rack(); + return topo.get_rack(_address_to_host_id_mapper(addr)) == topo.get_rack(); } // The socket metrics domain defines the way RPC metrics are grouped @@ -334,12 +336,15 @@ bool messaging_service::is_same_rack(inet_address addr) const { // that the isolation cookie suits very well, because these cookies // are different for different indices and are more informative than // plain numbers -sstring messaging_service::client_metrics_domain(unsigned idx, inet_address addr) const { +sstring messaging_service::client_metrics_domain(unsigned idx, inet_address addr, std::optional id) const { sstring ret = _scheduling_info_for_connection_index[idx].isolation_cookie; + if (!id) { + id = _address_to_host_id_mapper(addr); + } if (_token_metadata) { const auto& topo = _token_metadata->get()->get_topology(); - if (topo.has_endpoint(addr)) { - ret += ":" + topo.get_datacenter(addr); + if (topo.has_node(*id)) { + ret += ":" + topo.get_datacenter(*id); } } return ret; @@ -1106,7 +1111,7 @@ shared_ptr messaging_service::ge opts.tcp_nodelay = must_tcp_nodelay; opts.reuseaddr = true; opts.isolation_cookie = _scheduling_info_for_connection_index[idx].isolation_cookie; - opts.metrics_domain = client_metrics_domain(idx, id.addr); // not just `addr` as the latter may be internal IP + opts.metrics_domain = client_metrics_domain(idx, id.addr, host_id); // not just `addr` as the latter may be internal IP SCYLLA_ASSERT(!must_encrypt || _credentials); diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 873aa83c90..083caee6ea 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -337,6 +337,8 @@ private: private: config _cfg; locator::shared_token_metadata* _token_metadata = nullptr; + // a function that maps from ip to host id if known (returns default constructable host_id if there is no mapping) + std::function _address_to_host_id_mapper; // map: Node broadcast address -> Node internal IP, and the reversed mapping, for communication within the same data center std::unordered_map _preferred_ip_cache, _preferred_to_endpoint; std::unique_ptr _rpc; @@ -378,7 +380,7 @@ public: ~messaging_service(); future<> start(); - future<> start_listen(locator::shared_token_metadata& stm); + future<> start_listen(locator::shared_token_metadata& stm, std::function address_to_host_id_mapper); uint16_t port() const noexcept { return _cfg.port; } @@ -455,7 +457,7 @@ private: bool is_host_banned(locator::host_id); - sstring client_metrics_domain(unsigned idx, inet_address addr) const; + sstring client_metrics_domain(unsigned idx, inet_address addr, std::optional id) const; public: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index eae4ffc455..82f5971340 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -755,7 +755,7 @@ private: // Once the seastar issue is fixed, we can just keep the tmp socket aliva across // the listen invoke below. tmp = {}; - _ms.invoke_on_all(&netw::messaging_service::start_listen, std::ref(_token_metadata)).get(); + _ms.invoke_on_all(&netw::messaging_service::start_listen, std::ref(_token_metadata), [host_id] (gms::inet_address ip) {return host_id; }).get(); } } catch (std::system_error& e) { // if we still hit a used port (quick other process), just shut down ms and try again. diff --git a/test/manual/message.cc b/test/manual/message.cc index 7e153edd3e..d32a9891a8 100644 --- a/test/manual/message.cc +++ b/test/manual/message.cc @@ -224,7 +224,7 @@ int main(int ac, char ** av) { auto deinit_testers = deferred_action([&testers] { testers.invoke_on_all(&tester::deinit_handler).get(); }); - messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)).get(); + messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata), [] (gms::inet_address ip){ return locator::host_id{}; }).get(); if (config.contains("server")) { auto ip = config["server"].as(); auto cpuid = config["cpuid"].as();