Merge 'Call left/joined notifiers when topology coordinator is enabled' from Gleb

The gossiper topology change code calls left/joined notifiers when a
node leave or joins the cluster. This code it not executed in topology coordinator
mode, so the coordinator needs to call those notifiers by itself. The
series add the calls.

Fixes scylladb/scylladb#15841

* 'gleb/raft-topo-notifications-v1' of github.com:scylladb/scylla-dev:
  storage service: topology coordinator: call notify_joined() when a node joins a cluster
  storage service: topology coordinator: call notify_left() when a node leaves a cluster
  storage_service: drop redundant check from notify_joined()
This commit is contained in:
Kamil Braun
2024-01-25 12:12:53 +01:00
2 changed files with 10 additions and 8 deletions

View File

@@ -343,7 +343,7 @@ static locator::node::state to_topology_node_state(node_state ns) {
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node) {
future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
const auto& am = _group0->address_map();
const auto& t = _topology_state_machine._topology;
@@ -375,6 +375,7 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
if (_gossiper.get_endpoint_state_ptr(*ip) && !get_used_ips().contains(*ip)) {
co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id);
co_await notify_left(*ip);
}
}
@@ -413,6 +414,9 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
info.host_id = id.uuid();
info.release_version = rs.release_version;
co_await _sys_ks.local().update_peer_info(*ip, info);
if (!prev_normal.contains(id)) {
co_await notify_joined(*ip);
}
}
update_topology(host_id, ip, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
@@ -528,6 +532,8 @@ future<> storage_service::topology_state_load() {
}
rtlogger.debug("reload raft topology state");
std::unordered_set<raft::server_id> prev_normal = boost::copy_range<std::unordered_set<raft::server_id>>(_topology_state_machine._topology.normal_nodes | boost::adaptors::map_keys);
// read topology state from disk and recreate token_metadata from it
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state();
@@ -571,7 +577,7 @@ future<> storage_service::topology_state_load() {
}, _topology_state_machine._topology.tstate);
tmptr->set_read_new(read_new);
co_await sync_raft_topology_nodes(tmptr, std::nullopt);
co_await sync_raft_topology_nodes(tmptr, std::nullopt, std::move(prev_normal));
if (_db.local().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
tmptr->set_tablets(co_await replica::read_tablet_metadata(_qp));
@@ -679,7 +685,7 @@ class storage_service::raft_ip_address_updater: public gms::i_endpoint_state_cha
const auto hid = locator::host_id{id.uuid()};
if (_address_map.find(id) == endpoint && _ss.get_token_metadata().get_endpoint_for_host_id_if_known(hid) != endpoint) {
return _ss.mutate_token_metadata([this, hid](mutable_token_metadata_ptr t) {
return _ss.sync_raft_topology_nodes(std::move(t), hid);
return _ss.sync_raft_topology_nodes(std::move(t), hid, {});
}).finally([g = std::move(g)]{});
}
return make_ready_future<>();
@@ -5756,10 +5762,6 @@ future<> endpoint_lifecycle_notifier::notify_joined(gms::inet_address endpoint)
}
future<> storage_service::notify_joined(inet_address endpoint) {
if (!_gossiper.is_normal(endpoint)) {
co_return;
}
co_await utils::get_local_injector().inject(
"storage_service_notify_joined_sleep", std::chrono::milliseconds{500});

View File

@@ -789,7 +789,7 @@ private:
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
future<> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node);
future<> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal);
// load topology state machine snapshot into memory
// raft_group0_client::_read_apply_mutex must be held
future<> topology_state_load();