Merge 'Do not update topology on address change' from Gleb Natapov

Since now topology does not contain ip addresses there is no need to
create topology on an ip address change. Only peers table has to be
updated. The series factors out peers table update code from
sync_raft_topology_nodes() and calls it on topology and ip address
updates. As a side effect it fixes #22293 since now topology loading
does not require IP do be present, so the assert that is triggered in
this bug is removed.

Fixes: scylladb/scylladb#22293

Closes scylladb/scylladb#22519

* github.com:scylladb/scylladb:
  topology coordinator: do not update topology on address change
  topology coordinator: split out the peer table update functionality from raft state application
This commit is contained in:
Kamil Braun
2025-01-28 12:52:29 +01:00
2 changed files with 141 additions and 122 deletions

View File

@@ -413,20 +413,17 @@ static locator::node::state to_topology_node_state(node_state ns) {
on_internal_error(rtlogger, format("unhandled node state: {}", ns));
}
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal) {
nodes_to_notify_after_sync nodes_to_notify;
rtlogger.trace("Start sync_raft_topology_nodes target_node={}", target_node);
const auto& am =_address_map;
future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify) {
const auto& t = _topology_state_machine._topology;
raft::server_id raft_id{id.uuid()};
auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
to_topology_node_state(rs.state), rs.shard_count);
};
std::vector<future<>> sys_ks_futures;
auto node = t.find(raft_id);
if (!node) {
co_return;
}
using host_id_to_ip_map_t = std::unordered_map<locator::host_id, gms::inet_address>;
auto get_host_id_to_ip_map = [&, map = std::optional<host_id_to_ip_map_t>{}]() mutable -> future<const host_id_to_ip_map_t*> {
@@ -445,50 +442,13 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
co_return &*map;
};
std::vector<future<>> sys_ks_futures;
const auto& rs = node->second;
auto remove_ip = [&](inet_address ip, locator::host_id host_id, bool notify) -> future<> {
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(ip));
if (const auto ep = _gossiper.get_endpoint_state_ptr(ip); ep && ep->get_host_id() == host_id) {
co_await _gossiper.force_remove_endpoint(ip, gms::null_permit_id);
if (notify) {
nodes_to_notify.left.push_back({ip, host_id});
switch (rs.state) {
case node_state::normal: {
if (is_me(ip)) {
co_return;
}
}
};
auto process_left_node = [&] (raft::server_id id) -> future<> {
locator::host_id host_id{id.uuid()};
if (const auto ip = am.find(host_id)) {
co_await remove_ip(*ip, host_id, true);
}
if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
update_topology(host_id, t.left_nodes_rs.at(id));
}
// However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
co_await _messaging.local().ban_host(host_id);
};
auto process_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
locator::host_id host_id{id.uuid()};
auto ip = am.find(host_id);
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
// Save tokens, not needed for raft topology management, but needed by legacy
// Also ip -> id mapping is needed for address map recreation on reboot
if (is_me(host_id)) {
sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
co_await _gossiper.add_local_application_state(
std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
);
} else if (ip && !is_me(*ip)) {
// In replace-with-same-ip scenario the replaced node IP will be the same
// as ours, we shouldn't put it into system.peers.
@@ -501,7 +461,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
// Populate the table with the state from the gossiper here since storage_service::on_change()
// (which is called each time gossiper state changes) may have skipped it because the tokens
// for the node were not in the 'normal' state yet
auto info = get_peer_info_for_update(*ip);
auto info = get_peer_info_for_update(ip);
if (info) {
// And then amend with the info from raft
info->tokens = rs.ring.value().tokens;
@@ -509,29 +469,97 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
info->rack = rs.rack;
info->release_version = rs.release_version;
info->supported_features = fmt::to_string(fmt::join(rs.supported_features, ","));
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, *info));
}
if (!prev_normal.contains(id)) {
nodes_to_notify.joined.push_back(*ip);
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, *info));
}
if (const auto it = host_id_to_ip_map.find(host_id); it != host_id_to_ip_map.end() && it->second != *ip) {
if (nodes_to_notify) {
nodes_to_notify->joined.emplace_back(ip);
}
if (const auto it = host_id_to_ip_map.find(id); it != host_id_to_ip_map.end() && it->second != ip) {
utils::get_local_injector().inject("crash-before-prev-ip-removed", [] {
slogger.info("crash-before-prev-ip-removed hit, killing the node");
_exit(1);
});
// IP change is not expected to emit REMOVED_NODE notifications
co_await remove_ip(it->second, host_id, false);
auto old_ip = it->second;
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(old_ip));
if (const auto ep = _gossiper.get_endpoint_state_ptr(old_ip); ep && ep->get_host_id() == id) {
co_await _gossiper.force_remove_endpoint(old_ip, gms::null_permit_id);
}
}
}
break;
case node_state::bootstrapping:
if (!is_me(ip)) {
utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
_exit(1);
});
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(ip, id, {}));
}
break;
default:
break;
}
co_await when_all_succeed(sys_ks_futures.begin(), sys_ks_futures.end()).discard_result();
}
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal) {
nodes_to_notify_after_sync nodes_to_notify;
rtlogger.trace("Start sync_raft_topology_nodes");
const auto& t = _topology_state_machine._topology;
auto update_topology = [&] (locator::host_id id, const replica_state& rs) {
tmptr->update_topology(id, locator::endpoint_dc_rack{rs.datacenter, rs.rack},
to_topology_node_state(rs.state), rs.shard_count);
};
std::vector<future<>> sys_ks_futures;
auto process_left_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip) -> future<> {
if (ip) {
sys_ks_futures.push_back(_sys_ks.local().remove_endpoint(*ip));
if (const auto ep = _gossiper.get_endpoint_state_ptr(*ip); ep && ep->get_host_id() == host_id) {
co_await _gossiper.force_remove_endpoint(*ip, gms::null_permit_id);
nodes_to_notify.left.push_back({*ip, host_id});
}
}
if (t.left_nodes_rs.find(id) != t.left_nodes_rs.end()) {
update_topology(host_id, t.left_nodes_rs.at(id));
}
// However if we do that, we need to also implement unbanning a node and do it if `removenode` is aborted.
co_await _messaging.local().ban_host(host_id);
};
auto process_normal_node = [&] (raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={} shards={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate, rs.ring.value().tokens, rs.shard_count, rs.cleanup);
// Save tokens, not needed for raft topology management, but needed by legacy
// Also ip -> id mapping is needed for address map recreation on reboot
if (is_me(host_id)) {
sys_ks_futures.push_back(_sys_ks.local().update_tokens(rs.ring.value().tokens));
co_await _gossiper.add_local_application_state(
std::pair(gms::application_state::TOKENS, gms::versioned_value::tokens(rs.ring.value().tokens)),
std::pair(gms::application_state::CDC_GENERATION_ID, gms::versioned_value::cdc_generation_id(_topology_state_machine._topology.committed_cdc_generations.back())),
std::pair(gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens))
);
}
update_topology(host_id, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
};
auto process_transition_node = [&](raft::server_id id, const replica_state& rs) -> future<> {
locator::host_id host_id{id.uuid()};
auto ip = am.find(host_id);
auto process_transition_node = [&](raft::server_id id, locator::host_id host_id, std::optional<gms::inet_address> ip, const replica_state& rs) -> future<> {
rtlogger.trace("loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.tstate,
seastar::value_of([&] () -> sstring {
@@ -541,29 +569,16 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
switch (rs.state) {
case node_state::bootstrapping:
if (rs.ring.has_value()) {
if (ip) {
if (!is_me(*ip)) {
utils::get_local_injector().inject("crash-before-bootstrapping-node-added", [] {
rtlogger.error("crash-before-bootstrapping-node-added hit, killing the node");
_exit(1);
});
// Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned
sys_ks_futures.push_back(_sys_ks.local().update_peer_info(*ip, host_id, {}));
}
update_topology(host_id, rs);
if (_topology_state_machine._topology.normal_nodes.empty()) {
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
// (such as the CDC generation write).
// It doesn't break anything to set the tokens to normal early in this single-node case.
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
} else {
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
}
} else if (_topology_state_machine._topology.tstate == topology::transition_state::write_both_read_new) {
on_internal_error(rtlogger, format("Bootstrapping node {} does not have IP mapping but the topology is in the write_both_read_new state", id));
update_topology(host_id, rs);
if (_topology_state_machine._topology.normal_nodes.empty()) {
// This is the first node in the cluster. Insert the tokens as normal to the token ring early
// so we can perform writes to regular 'distributed' tables during the bootstrap procedure
// (such as the CDC generation write).
// It doesn't break anything to set the tokens to normal early in this single-node case.
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
} else {
tmptr->add_bootstrap_tokens(rs.ring.value().tokens, host_id);
co_await update_topology_change_info(tmptr, ::format("bootstrapping node {}/{}", id, ip));
}
}
break;
@@ -576,7 +591,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::removing:
if (_topology_state_machine._topology.tstate == topology::transition_state::rollback_to_normal) {
// no need for double writes anymore since op failed
co_await process_normal_node(id, rs);
co_await process_normal_node(id, host_id, ip, rs);
break;
}
update_topology(host_id, rs);
@@ -587,7 +602,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
case node_state::replacing: {
SCYLLA_ASSERT(_topology_state_machine._topology.req_param.contains(id));
auto replaced_id = std::get<replace_param>(_topology_state_machine._topology.req_param[id]).replaced_id;
auto existing_ip = am.find(locator::host_id{replaced_id.uuid()});
auto existing_ip = _address_map.find(locator::host_id{replaced_id.uuid()});
const auto replaced_host_id = locator::host_id(replaced_id.uuid());
tmptr->update_topology(replaced_host_id, std::nullopt, locator::node::state::being_replaced);
tmptr->add_replacing_endpoint(replaced_host_id, host_id);
@@ -599,38 +614,43 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
break;
case node_state::rebuilding:
// Rebuilding node is normal
co_await process_normal_node(id, rs);
co_await process_normal_node(id, host_id, ip, rs);
break;
default:
on_fatal_internal_error(rtlogger, ::format("Unexpected state {} for node {}", rs.state, id));
}
};
if (target_node) {
raft::server_id raft_id{target_node->uuid()};
if (t.left_nodes.contains(raft_id)) {
co_await process_left_node(raft_id);
} else if (auto it = t.normal_nodes.find(raft_id); it != t.normal_nodes.end()) {
co_await process_normal_node(raft_id, it->second);
} else if ((it = t.transition_nodes.find(raft_id)) != t.transition_nodes.end()) {
co_await process_transition_node(raft_id, it->second);
sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
for (const auto& id: t.left_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = _address_map.find(host_id);
co_await process_left_node(id, host_id, ip);
if (ip) {
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
}
} else {
sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
for (const auto& id: t.left_nodes) {
co_await process_left_node(id);
}
for (const auto& [id, rs]: t.normal_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = _address_map.find(host_id);
co_await process_normal_node(id, host_id, ip, rs);
if (ip) {
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
}
for (const auto& [id, rs]: t.normal_nodes) {
co_await process_normal_node(id, rs);
}
for (const auto& [id, rs]: t.transition_nodes) {
locator::host_id host_id{id.uuid()};
auto ip = _address_map.find(host_id);
co_await process_transition_node(id, host_id, ip, rs);
if (ip) {
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
}
for (const auto& [id, rs]: t.transition_nodes) {
co_await process_transition_node(id, rs);
}
for (auto id : t.get_excluded_nodes()) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
if (n) {
n->set_excluded(true);
}
}
for (auto id : t.get_excluded_nodes()) {
locator::node* n = tmptr->get_topology().find_node(locator::host_id(id.uuid()));
if (n) {
n->set_excluded(true);
}
}
@@ -756,7 +776,7 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
}, _topology_state_machine._topology.tstate);
tmptr->set_read_new(read_new);
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::nullopt, std::move(prev_normal));
auto nodes_to_notify = co_await sync_raft_topology_nodes(tmptr, std::move(prev_normal));
std::optional<locator::tablet_metadata> tablets;
if (hint.tablets_hint) {
@@ -945,14 +965,12 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
// If we call sync_raft_topology_nodes here directly, a gossiper lock and
// the _group0.read_apply_mutex could be taken in cross-order leading to a deadlock.
// To avoid this, we don't wait for sync_raft_topology_nodes to finish.
(void)futurize_invoke(ensure_alive([this, id, h = _ss._async_gate.hold()]() -> future<> {
(void)futurize_invoke(ensure_alive([this, id, endpoint, h = _ss._async_gate.hold()]() -> future<> {
auto guard = co_await _ss._group0->client().hold_read_apply_mutex(_ss._abort_source);
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
storage_service::nodes_to_notify_after_sync nodes_to_notify;
auto lock = co_await _ss.get_token_metadata_lock();
co_await _ss.mutate_token_metadata([this, id, &nodes_to_notify](mutable_token_metadata_ptr t) -> future<> {
nodes_to_notify = co_await _ss.sync_raft_topology_nodes(std::move(t), id, {});
}, storage_service::acquire_merge_lock::no);
// Set notify_join to true since here we detected address change and drivers have to be notified
nodes_to_notify_after_sync nodes_to_notify;
co_await _ss.raft_topology_update_ip(id, endpoint, &nodes_to_notify);
co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
}));
}

View File

@@ -963,11 +963,12 @@ private:
std::vector<gms::inet_address> joined;
};
future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify);
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
// Returns a structure that describes which notifications to trigger after token metadata is updated.
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::optional<locator::host_id> target_node, std::unordered_set<raft::server_id> prev_normal);
future<nodes_to_notify_after_sync> sync_raft_topology_nodes(mutable_token_metadata_ptr tmptr, std::unordered_set<raft::server_id> prev_normal);
// Triggers notifications (on_joined, on_left) based on the recent changes to token metadata, as described by the passed in structure.
// This function should be called on the result of `sync_raft_topology_nodes`, after the global token metadata is updated.
future<> notify_nodes_after_sync(nodes_to_notify_after_sync&& nodes_to_notify);