messaging_service: change connection dropping notification to pass host id only

Only host id is needed in the callback anyway.
This commit is contained in:
Gleb Natapov
2025-02-12 14:02:22 +02:00
parent 24d30073f9
commit b3720b80b6
4 changed files with 15 additions and 27 deletions

View File

@@ -223,9 +223,9 @@ size_t msg_addr::hash::operator()(const msg_addr& id) const noexcept {
return std::hash<bytes_view>()(id.addr.bytes());
}
messaging_service::shard_info::shard_info(shared_ptr<rpc_protocol_client_wrapper>&& client, bool topo_ignored, inet_address ip)
messaging_service::shard_info::shard_info(shared_ptr<rpc_protocol_client_wrapper>&& client, bool topo_ignored)
: rpc_client(std::move(client))
, topology_ignored(topo_ignored), endpoint(ip)
, topology_ignored(topo_ignored)
{
}
@@ -1150,12 +1150,12 @@ shared_ptr<messaging_service::rpc_protocol_client_wrapper> messaging_service::ge
// the topology (so we always set `topology_ignored` to `false` in that case).
bool topology_ignored = idx != TOPOLOGY_INDEPENDENT_IDX && topology_status.has_value() && *topology_status == false;
if (host_id) {
auto res = _clients_with_host_id[idx].emplace(*host_id, shard_info(std::move(client), topology_ignored, id.addr));
auto res = _clients_with_host_id[idx].emplace(*host_id, shard_info(std::move(client), topology_ignored));
SCYLLA_ASSERT(res.second);
auto it = res.first;
client = it->second.rpc_client;
} else {
auto res = _clients[idx].emplace(id, shard_info(std::move(client), topology_ignored, id.addr));
auto res = _clients[idx].emplace(id, shard_info(std::move(client), topology_ignored));
SCYLLA_ASSERT(res.second);
auto it = res.first;
client = it->second.rpc_client;
@@ -1187,12 +1187,10 @@ void messaging_service::find_and_remove_client(Map& clients, typename Map::key_t
if (it != clients.end() && filter(it->second)) {
auto client = std::move(it->second.rpc_client);
gms::inet_address addr;
std::optional<locator::host_id> hid;
locator::host_id hid;
if constexpr (std::is_same_v<typename Map::key_type, msg_addr>) {
addr = id.addr;
hid = _address_to_host_id_mapper(id.addr);
} else {
addr = it->second.endpoint;
hid = id;
}
@@ -1204,10 +1202,10 @@ void messaging_service::find_and_remove_client(Map& clients, typename Map::key_t
// This will make sure messaging_service::stop() blocks until
// client->stop() is over.
//
(void)client->stop().finally([addr, client, ms = shared_from_this()] {
mlogger.debug("dropped connection to {}", addr);
(void)client->stop().finally([id, client, ms = shared_from_this()] {
mlogger.debug("dropped connection to {}", id);
}).discard_result();
_connection_dropped(addr, hid);
_connection_dropped(hid);
}
}
@@ -1220,7 +1218,6 @@ void messaging_service::remove_error_rpc_client(messaging_verb verb, locator::ho
}
// Removes client to id.addr in both _client and _clients_with_host_id
// FIXME: make removing from _clients_with_host_id more efficient
void messaging_service::remove_rpc_client(msg_addr id, std::optional<locator::host_id> hid) {
for (auto& c : _clients) {
find_and_remove_client(c, id, [] (const auto&) { return true; });

View File

@@ -251,10 +251,9 @@ public:
static constexpr int32_t current_version = 0;
struct shard_info {
shard_info(shared_ptr<rpc_protocol_client_wrapper>&& client, bool topology_ignored, inet_address ip);
shard_info(shared_ptr<rpc_protocol_client_wrapper>&& client, bool topology_ignored);
shared_ptr<rpc_protocol_client_wrapper> rpc_client;
const bool topology_ignored;
const inet_address endpoint;
rpc::stats get_stats() const;
};

View File

@@ -21,8 +21,8 @@ struct msg_addr;
enum class messaging_verb;
class messaging_service;
using connection_drop_signal_t = boost::signals2::signal_type<void (gms::inet_address, std::optional<locator::host_id>), boost::signals2::keywords::mutex_type<boost::signals2::dummy_mutex>>::type;
using connection_drop_slot_t = std::function<void(gms::inet_address, std::optional<locator::host_id>)>;
using connection_drop_signal_t = boost::signals2::signal_type<void (locator::host_id), boost::signals2::keywords::mutex_type<boost::signals2::dummy_mutex>>::type;
using connection_drop_slot_t = std::function<void(locator::host_id)>;
using connection_drop_registration_t = boost::signals2::scoped_connection;
}

View File

@@ -1047,18 +1047,10 @@ private:
co_return netw::messaging_service::no_wait();
}
void connection_dropped(gms::inet_address addr, std::optional<locator::host_id> id) {
slogger.debug("Drop hit rate info for {} because of disconnect", addr);
if (!id) {
try {
id = _gossiper.get_host_id(addr);
} catch (...) {}
}
if (!id) {
return;
}
void connection_dropped(locator::host_id id) {
slogger.debug("Drop hit rate info for {} because of disconnect", id);
for (auto&& cf : _sp._db.local().get_non_system_column_families()) {
cf->drop_hit_rate(*id);
cf->drop_hit_rate(id);
}
}