raft ips: rename gossiper_state_change_subscriber_proxy -> raft_ip_address_updater

This commit is contained in:
Petr Gusev
2024-01-10 21:09:31 +04:00
parent 6e7bbc94f4
commit e24bee545b
2 changed files with 15 additions and 13 deletions

View File

@@ -650,9 +650,9 @@ future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
co_await _db.local().apply(freeze(muts), db::no_timeout);
}
// {{{ gossiper_state_change_subscriber_proxy
// {{{ raft_ip_address_updater
class storage_service::gossiper_state_change_subscriber_proxy: public gms::i_endpoint_state_change_subscriber {
class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_change_subscriber {
raft_address_map& _address_map;
storage_service& _ss;
@@ -663,14 +663,14 @@ class storage_service::gossiper_state_change_subscriber_proxy: public gms::i_end
co_return;
}
raft::server_id id(utils::UUID(app_state_ptr->value()));
rslog.debug("gossiper_state_change_subscriber_proxy::on_endpoint_change() {} {}", endpoint, id);
rslog.debug("raft_ip_address_updater::on_endpoint_change() {} {}", endpoint, id);
const auto prev_ip = _address_map.find(id);
_address_map.add_or_update_entry(id, endpoint, ep_state->get_heart_beat_state().get_generation());
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
if (_ss._raft_topology_change_enabled && prev_ip != endpoint && _address_map.find(id) == endpoint) {
rslog.debug("gossiper_state_change_subscriber_proxy::on_endpoint_change(), host_id {}, "
rslog.debug("raft_ip_address_updater::on_endpoint_change(), host_id {}, "
"ip changed from [{}] to [{}], "
"waiting for group 0 read/apply mutex before reloading Raft topology state...",
id, prev_ip, endpoint);
@@ -694,7 +694,7 @@ class storage_service::gossiper_state_change_subscriber_proxy: public gms::i_end
}
public:
gossiper_state_change_subscriber_proxy(raft_address_map& address_map, storage_service& ss)
raft_ip_address_updater(raft_address_map& address_map, storage_service& ss)
: _address_map(address_map)
, _ss(ss)
{}
@@ -734,7 +734,7 @@ public:
}
};
// }}} gossiper_state_change_subscriber_proxy
// }}} raft_ip_address_updater
template<typename Builder>
class topology_mutation_builder_base {
@@ -4446,12 +4446,12 @@ future<> storage_service::init_address_map(raft_address_map& address_map) {
for (auto [ip, host] : co_await _sys_ks.local().load_host_ids()) {
address_map.add_or_update_entry(raft::server_id(host.uuid()), ip);
}
_gossiper_proxy = make_shared<gossiper_state_change_subscriber_proxy>(address_map, *this);
_gossiper.register_(_gossiper_proxy);
_raft_ip_address_updater = make_shared<raft_ip_address_updater>(address_map, *this);
_gossiper.register_(_raft_ip_address_updater);
}
future<> storage_service::uninit_address_map() {
return _gossiper.unregister_(_gossiper_proxy);
return _gossiper.unregister_(_raft_ip_address_updater);
}
future<> storage_service::join_cluster(sharded<db::system_distributed_keyspace>& sys_dist_ks, sharded<service::storage_proxy>& proxy) {

View File

@@ -763,10 +763,12 @@ private:
raft::term_t term{0};
uint64_t last_index{0};
} _raft_topology_cmd_handler_state;
class gossiper_state_change_subscriber_proxy;
// A proxy class representing subscription to on_change
// events, and updating the address map on this events.
shared_ptr<gossiper_state_change_subscriber_proxy> _gossiper_proxy;
class raft_ip_address_updater;
// Represents a subscription to gossiper on_change events,
// updating the raft data structures that depend on
// IP addresses (raft_address_map, token_metadata.topology, erm-s),
// as well as the system.peers table.
shared_ptr<raft_ip_address_updater> _raft_ip_address_updater;
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const std::list<locator::host_id_or_endpoint>& hoeps);