From b91f7e9ec4ff9a5dcc6ca5d27c3f1015910c1084 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 1 Jul 2022 15:52:40 +0300 Subject: [PATCH] snitch, storage_service: Move reconnect to internal_ip kick The same thing as in previous patch -- when gossiper issues on_join/_change notification, storage service can kick messaging service to update its internal_ip cache and reconnect to the peer. Signed-off-by: Pavel Emelyanov --- locator/production_snitch_base.cc | 17 +---------------- service/storage_service.cc | 18 ++++++++++++++++++ service/storage_service.hh | 1 + 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/locator/production_snitch_base.cc b/locator/production_snitch_base.cc index 9ac4dabe0c..351a810e4b 100644 --- a/locator/production_snitch_base.cc +++ b/locator/production_snitch_base.cc @@ -201,22 +201,7 @@ future<> reconnectable_snitch_helper::reconnect(gms::inet_address public_address } future<> reconnectable_snitch_helper::reconnect(gms::inet_address public_address, gms::inet_address local_address) { - auto& sn_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr(); - netw::messaging_service& ms = sn_ptr.get_local_gossiper().get_local_messaging(); - - if (sn_ptr->get_datacenter(public_address) == _local_dc && - ms.get_preferred_ip(public_address) != local_address) { - // - // ...then update messaging_service cache and reset the currently - // open connections to this endpoint on all shards... - // - co_await ms.container().invoke_on_all([public_address, local_address] (auto& local_ms) { - local_ms.cache_preferred_ip(public_address, local_address); - local_ms.remove_rpc_client(netw::msg_addr(public_address)); - }); - - logger().debug("Initiated reconnect to an Internal IP {} for the {}", local_address, public_address); - } + return make_ready_future<>(); } reconnectable_snitch_helper::reconnectable_snitch_helper(sstring local_dc) diff --git a/service/storage_service.cc b/service/storage_service.cc index acebaf4ac4..7c74fc75c6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1186,11 +1186,29 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta if (state == application_state::RPC_READY) { slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready()); co_await notify_cql_change(endpoint, ep_state->is_cql_ready()); + } else if (state == application_state::INTERNAL_IP) { + co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(value.value)); } } } } +future<> storage_service::maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip) { + auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); + if (!snitch->prefer_local()) { + co_return; + } + + const auto& topo = get_token_metadata().get_topology(); + if (topo.get_datacenter() == topo.get_datacenter(ep) && _messaging.local().get_preferred_ip(ep) != local_ip) { + slogger.debug("Initiated reconnect to an Internal IP {} for the {}", local_ip, ep); + co_await _messaging.invoke_on_all([ep, local_ip] (auto& local_ms) { + local_ms.cache_preferred_ip(ep, local_ip); + local_ms.remove_rpc_client(netw::msg_addr(ep)); + }); + } +} + future<> storage_service::on_remove(gms::inet_address endpoint) { slogger.debug("endpoint={} on_remove", endpoint); diff --git a/service/storage_service.hh b/service/storage_service.hh index d4ae660af6..f43ba28982 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -611,6 +611,7 @@ private: // needs to be modified to accept either a keyspace or ARS. future> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint); + future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip); public: sstring get_release_version();