snitch, storage_service: Move reconnect to internal_ip kick

The same thing as in previous patch -- when gossiper issues
on_join/_change notification, storage service can kick messaging
service to update its internal_ip cache and reconnect to the peer.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
This commit is contained in:
Pavel Emelyanov
2022-07-01 15:52:40 +03:00
parent 1bf8b0dd92
commit b91f7e9ec4
3 changed files with 20 additions and 16 deletions

View File

@@ -201,22 +201,7 @@ future<> reconnectable_snitch_helper::reconnect(gms::inet_address public_address
}
future<> reconnectable_snitch_helper::reconnect(gms::inet_address public_address, gms::inet_address local_address) {
auto& sn_ptr = locator::i_endpoint_snitch::get_local_snitch_ptr();
netw::messaging_service& ms = sn_ptr.get_local_gossiper().get_local_messaging();
if (sn_ptr->get_datacenter(public_address) == _local_dc &&
ms.get_preferred_ip(public_address) != local_address) {
//
// ...then update messaging_service cache and reset the currently
// open connections to this endpoint on all shards...
//
co_await ms.container().invoke_on_all([public_address, local_address] (auto& local_ms) {
local_ms.cache_preferred_ip(public_address, local_address);
local_ms.remove_rpc_client(netw::msg_addr(public_address));
});
logger().debug("Initiated reconnect to an Internal IP {} for the {}", local_address, public_address);
}
return make_ready_future<>();
}
reconnectable_snitch_helper::reconnectable_snitch_helper(sstring local_dc)

View File

@@ -1186,11 +1186,29 @@ future<> storage_service::on_change(inet_address endpoint, application_state sta
if (state == application_state::RPC_READY) {
slogger.debug("Got application_state::RPC_READY for node {}, is_cql_ready={}", endpoint, ep_state->is_cql_ready());
co_await notify_cql_change(endpoint, ep_state->is_cql_ready());
} else if (state == application_state::INTERNAL_IP) {
co_await maybe_reconnect_to_preferred_ip(endpoint, inet_address(value.value));
}
}
}
}
future<> storage_service::maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip) {
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
if (!snitch->prefer_local()) {
co_return;
}
const auto& topo = get_token_metadata().get_topology();
if (topo.get_datacenter() == topo.get_datacenter(ep) && _messaging.local().get_preferred_ip(ep) != local_ip) {
slogger.debug("Initiated reconnect to an Internal IP {} for the {}", local_ip, ep);
co_await _messaging.invoke_on_all([ep, local_ip] (auto& local_ms) {
local_ms.cache_preferred_ip(ep, local_ip);
local_ms.remove_rpc_client(netw::msg_addr(ep));
});
}
}
future<> storage_service::on_remove(gms::inet_address endpoint) {
slogger.debug("endpoint={} on_remove", endpoint);

View File

@@ -611,6 +611,7 @@ private:
// needs to be modified to accept either a keyspace or ARS.
future<std::unordered_multimap<dht::token_range, inet_address>> get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint);
future<> maybe_reconnect_to_preferred_ip(inet_address ep, inet_address local_ip);
public:
sstring get_release_version();