From c65f64cc5f4f9aeddfb08e24fd59376c981cce7b Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 26 Nov 2024 10:31:20 +0200 Subject: [PATCH] storage_service: do not update raft address map on gossiper events Raft address map is not use any longer to resolve addresses anyway, so drop dependency on it from raft_ip_address_updater and rename it to reflect that it is no longer raft address map specific. --- main.cc | 6 ++--- service/storage_service.cc | 45 ++++++++++++++------------------------ service/storage_service.hh | 8 +++---- test/lib/cql_test_env.cc | 6 ++--- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/main.cc b/main.cc index 6db87f9bcb..15ab45286f 100644 --- a/main.cc +++ b/main.cc @@ -2021,10 +2021,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl */ db.local().enable_autocompaction_toggle(); - const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get()); - // Load address_map from system.peers and subscribe to gossiper events to keep it updated. - ss.local().init_address_map(raft_address_map.local(), generation_number).get(); + ss.local().init_address_map(gossip_address_map.local()).get(); auto cancel_address_map_subscription = defer_verbose_shutdown("storage service uninit address map", [&ss] { ss.local().uninit_address_map().get(); }); @@ -2046,6 +2044,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl return messaging.invoke_on_all(&netw::messaging_service::start_listen, std::ref(token_metadata)); }).get(); + const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get()); + with_scheduling_group(maintenance_scheduling_group, [&] { return ss.local().join_cluster(sys_dist_ks, proxy, service::start_hint_manager::yes, generation_number); }).get(); diff --git a/service/storage_service.cc b/service/storage_service.cc index 75ded45a4f..1d92458d3b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -926,10 +926,10 @@ static auto ensure_alive(Coro&& coro) { }; } -// {{{ raft_ip_address_updater +// {{{ ip_address_updater -class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_change_subscriber { - raft_address_map& _address_map; +class storage_service::ip_address_updater: public gms::i_endpoint_state_change_subscriber { + gms::gossip_address_map& _address_map; storage_service& _ss; future<> @@ -939,14 +939,14 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha co_return; } locator::host_id id(utils::UUID(app_state_ptr->value())); - rslog.debug("raft_ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id); + rslog.debug("ip_address_updater::on_endpoint_change({}) {} {}", ev, endpoint, id); - const auto prev_ip = _address_map.find(id); - _address_map.add_or_update_entry(id, endpoint, ep_state->get_heart_beat_state().get_generation()); + auto prev_ip = _ss.get_token_metadata().get_endpoint_for_host_id_if_known(id); if (prev_ip == endpoint) { co_return; } - if (_address_map.find(id) == prev_ip) { + + if (_address_map.find(id) != endpoint) { // Address map refused to update IP for the host_id, // this means prev_ip has higher generation than endpoint. // We can immediately remove endpoint from gossiper @@ -960,9 +960,10 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha co_return; } + // If the host_id <-> IP mapping has changed, we need to update system tables, token_metadat and erm. if (_ss.raft_topology_change_enabled()) { - rslog.debug("raft_ip_address_updater::on_endpoint_change({}), host_id {}, " + rslog.debug("ip_address_updater::on_endpoint_change({}), host_id {}, " "ip changed from [{}] to [{}], " "waiting for group 0 read/apply mutex before reloading Raft topology state...", ev, id, prev_ip, endpoint); @@ -973,13 +974,8 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha // If we call sync_raft_topology_nodes here directly, a gossiper lock and // the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock. // To avoid this, we don't wait for sync_raft_topology_nodes to finish. - (void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> { + (void)futurize_invoke(ensure_alive([this, id, h = _ss._async_gate.hold()]() -> future<> { auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source); - if (_address_map.find(id) != endpoint || - _ss.get_token_metadata().get_endpoint_for_host_id_if_known(id) == endpoint) - { - co_return; - } co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500)); storage_service::nodes_to_notify_after_sync nodes_to_notify; auto lock = co_await _ss.get_token_metadata_lock(); @@ -992,7 +988,7 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha } public: - raft_ip_address_updater(raft_address_map& address_map, storage_service& ss) + ip_address_updater(gms::gossip_address_map& address_map, storage_service& ss) : _address_map(address_map) , _ss(ss) {} @@ -1032,7 +1028,7 @@ public: } }; -// }}} raft_ip_address_updater +// }}} ip_address_updater future<> storage_service::sstable_cleanup_fiber(raft::server& server, gate::holder group0_holder, sharded& proxy) noexcept { while (!_group0_as.abort_requested()) { @@ -2895,21 +2891,14 @@ void storage_service::set_group0(raft_group0& group0) { _group0 = &group0; } -future<> storage_service::init_address_map(raft_address_map& address_map, gms::generation_type new_generation) { - for (auto [ip, host] : co_await _sys_ks.local().load_host_ids()) { - address_map.add_or_update_entry(host, ip); - } - const auto& topology = get_token_metadata().get_topology(); - auto myid = topology.my_host_id(); - address_map.add_or_update_entry(myid, topology.my_address(), new_generation); - // Make my entry non expiring - address_map.set_nonexpiring(myid); - _raft_ip_address_updater = make_shared(address_map, *this); - _gossiper.register_(_raft_ip_address_updater); +future<> storage_service::init_address_map(gms::gossip_address_map& address_map) { + _ip_address_updater = make_shared(address_map, *this); + _gossiper.register_(_ip_address_updater); + co_return; } future<> storage_service::uninit_address_map() { - return _gossiper.unregister_(_raft_ip_address_updater); + return _gossiper.unregister_(_ip_address_updater); } bool storage_service::is_topology_coordinator_enabled() const { diff --git a/service/storage_service.hh b/service/storage_service.hh index fb0b0d32e2..ed555f927c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -385,7 +385,7 @@ public: void set_group0(service::raft_group0&); - future<> init_address_map(raft_address_map& address_map, gms::generation_type new_generation); + future<> init_address_map(gms::gossip_address_map& address_map); future<> uninit_address_map(); bool is_topology_coordinator_enabled() const; @@ -861,12 +861,12 @@ private: raft::term_t term{0}; uint64_t last_index{0}; } _raft_topology_cmd_handler_state; - class raft_ip_address_updater; + class ip_address_updater; // Represents a subscription to gossiper on_change events, // updating the raft data structures that depend on - // IP addresses (raft_address_map, token_metadata.topology, erm-s), + // IP addresses (token_metadata.topology, erm-s), // as well as the system.peers table. - shared_ptr _raft_ip_address_updater; + shared_ptr _ip_address_updater; std::unordered_set find_raft_nodes_from_hoeps(const locator::host_id_or_endpoint_list& hoeps) const; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 0bcb4a94a0..a494703937 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -973,10 +973,8 @@ private: _cdc.stop().get(); }); - const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get()); - // Load address_map from system.peers and subscribe to gossiper events to keep it updated. - _ss.local().init_address_map(_raft_address_map.local(), generation_number).get(); + _ss.local().init_address_map(_gossip_address_map.local()).get(); auto cancel_address_map_subscription = defer([this] { _ss.local().uninit_address_map().get(); }); @@ -987,6 +985,8 @@ private: group0_service.setup_group0_if_exist(_sys_ks.local(), _ss.local(), _qp.local(), _mm.local()).get(); + const auto generation_number = gms::generation_type(_sys_ks.local().increment_and_get_generation().get()); + try { _ss.local().join_cluster(_sys_dist_ks, _proxy, service::start_hint_manager::no, generation_number).get(); } catch (std::exception& e) {