storage_service: do not update raft address map on gossiper events
Raft address map is not use any longer to resolve addresses anyway, so drop dependency on it from raft_ip_address_updater and rename it to reflect that it is no longer raft address map specific.
This commit is contained in:
6
main.cc
6
main.cc
@@ -2021,10 +2021,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
*/
|
||||
db.local().enable_autocompaction_toggle();
|
||||
|
||||
const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get());
|
||||
|
||||
// Load address_map from system.peers and subscribe to gossiper events to keep it updated.
|
||||
ss.local().init_address_map(raft_address_map.local(), generation_number).get();
|
||||
ss.local().init_address_map(gossip_address_map.local()).get();
|
||||
auto cancel_address_map_subscription = defer_verbose_shutdown("storage service uninit address map", [&ss] {
|
||||
ss.local().uninit_address_map().get();
|
||||
});
|
||||
@@ -2046,6 +2044,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
|
||||
return messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata));
|
||||
}).get();
|
||||
|
||||
const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get());
|
||||
|
||||
with_scheduling_group(maintenance_scheduling_group, [&] {
|
||||
return ss.local().join_cluster(sys_dist_ks, proxy, service::start_hint_manager::yes, generation_number);
|
||||
}).get();
|
||||
|
||||
@@ -926,10 +926,10 @@ static auto ensure_alive(Coro&& coro) {
|
||||
};
|
||||
}
|
||||
|
||||
// {{{ raft_ip_address_updater
|
||||
// {{{ ip_address_updater
|
||||
|
||||
class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_change_subscriber {
|
||||
raft_address_map& _address_map;
|
||||
class storage_service::ip_address_updater: public gms::i_endpoint_state_change_subscriber {
|
||||
gms::gossip_address_map& _address_map;
|
||||
storage_service& _ss;
|
||||
|
||||
future<>
|
||||
@@ -939,14 +939,14 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
|
||||
co_return;
|
||||
}
|
||||
locator::host_id id(utils::UUID(app_state_ptr->value()));
|
||||
rslog.debug("raft_ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id);
|
||||
rslog.debug("ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id);
|
||||
|
||||
const auto prev_ip = _address_map.find(id);
|
||||
_address_map.add_or_update_entry(id, endpoint, ep_state->get_heart_beat_state().get_generation());
|
||||
auto prev_ip = _ss.get_token_metadata().get_endpoint_for_host_id_if_known(id);
|
||||
if (prev_ip == endpoint) {
|
||||
co_return;
|
||||
}
|
||||
if (_address_map.find(id) == prev_ip) {
|
||||
|
||||
if (_address_map.find(id) != endpoint) {
|
||||
// Address map refused to update IP for the host_id,
|
||||
// this means prev_ip has higher generation than endpoint.
|
||||
// We can immediately remove endpoint from gossiper
|
||||
@@ -960,9 +960,10 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
|
||||
co_return;
|
||||
}
|
||||
|
||||
|
||||
// If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm.
|
||||
if (_ss.raft_topology_change_enabled()) {
|
||||
rslog.debug("raft_ip_address_updater::on_endpoint_change({}), host_id {}, "
|
||||
rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, "
|
||||
"ip changed from [{}] to [{}], "
|
||||
"waiting for group 0 read/apply mutex before reloading Raft topology state...",
|
||||
ev, id, prev_ip, endpoint);
|
||||
@@ -973,13 +974,8 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
|
||||
// If we call sync_raft_topology_nodes here directly, a gossiper lock and
|
||||
// the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
|
||||
// To avoid this, we don't wait for sync_raft_topology_nodes to finish.
|
||||
(void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> {
|
||||
(void)futurize_invoke(ensure_alive([this, id, h = _ss._async_gate.hold()]() -> future<> {
|
||||
auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
|
||||
if (_address_map.find(id) != endpoint ||
|
||||
_ss.get_token_metadata().get_endpoint_for_host_id_if_known(id) == endpoint)
|
||||
{
|
||||
co_return;
|
||||
}
|
||||
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
|
||||
storage_service::nodes_to_notify_after_sync nodes_to_notify;
|
||||
auto lock = co_await _ss.get_token_metadata_lock();
|
||||
@@ -992,7 +988,7 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
|
||||
}
|
||||
|
||||
public:
|
||||
raft_ip_address_updater(raft_address_map& address_map, storage_service& ss)
|
||||
ip_address_updater(gms::gossip_address_map& address_map, storage_service& ss)
|
||||
: _address_map(address_map)
|
||||
, _ss(ss)
|
||||
{}
|
||||
@@ -1032,7 +1028,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// }}} raft_ip_address_updater
|
||||
// }}} ip_address_updater
|
||||
|
||||
future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded<service::storage_proxy>& proxy) noexcept {
|
||||
while (!_group0_as.abort_requested()) {
|
||||
@@ -2895,21 +2891,14 @@ void storage_service::set_group0(raft_group0& group0) {
|
||||
_group0 = &group0;
|
||||
}
|
||||
|
||||
future<> storage_service::init_address_map(raft_address_map& address_map, gms::generation_type new_generation) {
|
||||
for (auto [ip, host] : co_await _sys_ks.local().load_host_ids()) {
|
||||
address_map.add_or_update_entry(host, ip);
|
||||
}
|
||||
const auto& topology = get_token_metadata().get_topology();
|
||||
auto myid = topology.my_host_id();
|
||||
address_map.add_or_update_entry(myid, topology.my_address(), new_generation);
|
||||
// Make my entry non expiring
|
||||
address_map.set_nonexpiring(myid);
|
||||
_raft_ip_address_updater = make_shared<raft_ip_address_updater>(address_map, *this);
|
||||
_gossiper.register_(_raft_ip_address_updater);
|
||||
future<> storage_service::init_address_map(gms::gossip_address_map& address_map) {
|
||||
_ip_address_updater = make_shared<ip_address_updater>(address_map, *this);
|
||||
_gossiper.register_(_ip_address_updater);
|
||||
co_return;
|
||||
}
|
||||
|
||||
future<> storage_service::uninit_address_map() {
|
||||
return _gossiper.unregister_(_raft_ip_address_updater);
|
||||
return _gossiper.unregister_(_ip_address_updater);
|
||||
}
|
||||
|
||||
bool storage_service::is_topology_coordinator_enabled() const {
|
||||
|
||||
@@ -385,7 +385,7 @@ public:
|
||||
|
||||
void set_group0(service::raft_group0&);
|
||||
|
||||
future<> init_address_map(raft_address_map& address_map, gms::generation_type new_generation);
|
||||
future<> init_address_map(gms::gossip_address_map& address_map);
|
||||
|
||||
future<> uninit_address_map();
|
||||
bool is_topology_coordinator_enabled() const;
|
||||
@@ -861,12 +861,12 @@ private:
|
||||
raft::term_t term{0};
|
||||
uint64_t last_index{0};
|
||||
} _raft_topology_cmd_handler_state;
|
||||
class raft_ip_address_updater;
|
||||
class ip_address_updater;
|
||||
// Represents a subscription to gossiper on_change events,
|
||||
// updating the raft data structures that depend on
|
||||
// IP addresses (raft_address_map, token_metadata.topology, erm-s),
|
||||
// IP addresses (token_metadata.topology, erm-s),
|
||||
// as well as the system.peers table.
|
||||
shared_ptr<raft_ip_address_updater> _raft_ip_address_updater;
|
||||
shared_ptr<ip_address_updater> _ip_address_updater;
|
||||
|
||||
std::unordered_set<raft::server_id> find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const;
|
||||
|
||||
|
||||
@@ -973,10 +973,8 @@ private:
|
||||
_cdc.stop().get();
|
||||
});
|
||||
|
||||
const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get());
|
||||
|
||||
// Load address_map from system.peers and subscribe to gossiper events to keep it updated.
|
||||
_ss.local().init_address_map(_raft_address_map.local(), generation_number).get();
|
||||
_ss.local().init_address_map(_gossip_address_map.local()).get();
|
||||
auto cancel_address_map_subscription = defer([this] {
|
||||
_ss.local().uninit_address_map().get();
|
||||
});
|
||||
@@ -987,6 +985,8 @@ private:
|
||||
|
||||
group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get();
|
||||
|
||||
const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get());
|
||||
|
||||
try {
|
||||
_ss.local().join_cluster(_sys_dist_ks, _proxy, service::start_hint_manager::no, generation_number).get();
|
||||
} catch (std::exception& e) {
|
||||
|
||||
Reference in New Issue
Block a user