Merge 'drop ip addresses from token metadata' from Gleb

Now that all topology-related code uses host ids, there is no point in
maintaining ip to id (and back) mappings in the token metadata. After this
patch the mapping will be maintained in the gossiper only. The rest of
the system will use host ids, and in the rare cases where translation is
needed (mostly for UX compatibility reasons) the translation will be
done using the gossiper.

Fixes: scylladb/scylla#21777

* 'gleb/drop-ip-from-tm-v3' of github.com:scylladb/scylla-dev: (57 commits)
  hint manager: do not translate ip to id in case hint manager is stopped already
  locator: token_metadata: drop update_host_id() function that does nothing now
  locator: topology: drop indexing by ips
  repair: drop unneeded code
  storage_service: use host_id to look for a node in on_alive handler
  storage_proxy: translate ips to ids in forward array using gossiper
  locator: topology: remove unused functions
  storage_service: check for outdated ip in on_change notification in the peers table
  storage_proxy: translate id to ip using address map in tablets' describe_ring code instead of taking one from the topology
  topology coordinator: change connection dropping code to work on host ids
  cql3: report host id instead of ip in error during SELECT FROM MUTATION_FRAGMENTS query
  locator: drop unused function from tablet_effective_replication_map
  api: view_build_statuses: do not use IP from the topology, but translate id to ip using address map instead
  locator: token_metadata: remove unused ip based functions
  locator: network_topology_strategy: use host_id based function to check number of endpoints in dcs
  gossiper: drop get_unreachable_token_owners functions
  storage_service: use gossiper to map ip to id in node_ops operations
  storage_service: fix indentation after the last patch
  storage_service: drop loops from node ops replace_prepare handling since there can be only one replacing node
  token_metadata: drop no longer used functions
  ...
This commit is contained in:
Kamil Braun
2025-01-17 11:00:52 +01:00
57 changed files with 534 additions and 1085 deletions

View File

@@ -113,11 +113,10 @@ bool is_datacenter_local(consistency_level l) {
return l == consistency_level::LOCAL_ONE || l == consistency_level::LOCAL_QUORUM;
}
template <typename Range, typename PendingRange = std::array<gms::inet_address, 0>>
std::unordered_map<sstring, dc_node_count> count_per_dc_endpoints(
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints = std::array<gms::inet_address, 0>()) {
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints = host_id_vector_topology_change()) {
using namespace locator;
auto& rs = erm.get_replication_strategy();
@@ -148,12 +147,11 @@ std::unordered_map<sstring, dc_node_count> count_per_dc_endpoints(
return dc_endpoints;
}
template<typename Range, typename PendingRange>
bool assure_sufficient_live_nodes_each_quorum(
consistency_level cl,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints) {
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints) {
using namespace locator;
auto& rs = erm.get_replication_strategy();
@@ -175,12 +173,11 @@ bool assure_sufficient_live_nodes_each_quorum(
return false;
}
template<typename Range, typename PendingRange>
void assure_sufficient_live_nodes(
consistency_level cl,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints) {
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints) {
size_t need = block_for(erm, cl);
auto adjust_live_for_error = [] (size_t live, size_t pending) {
@@ -228,10 +225,6 @@ void assure_sufficient_live_nodes(
}
}
template void assure_sufficient_live_nodes(consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const host_id_vector_replica_set&, const host_id_vector_topology_change&);
host_id_vector_replica_set
filter_for_query(consistency_level cl,
const locator::effective_replication_map& erm,

View File

@@ -61,15 +61,9 @@ is_sufficient_live_nodes(consistency_level cl,
const locator::effective_replication_map& erm,
const host_id_vector_replica_set& live_endpoints);
template<typename Range, typename PendingRange = std::array<gms::inet_address, 0>>
void assure_sufficient_live_nodes(
consistency_level cl,
const locator::effective_replication_map& erm,
const Range& live_endpoints,
const PendingRange& pending_endpoints = std::array<gms::inet_address, 0>());
extern template void assure_sufficient_live_nodes(consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const std::array<gms::inet_address, 0>&);
extern template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const inet_address_vector_replica_set&, const utils::small_vector<gms::inet_address, 1ul>&);
extern template void assure_sufficient_live_nodes(db::consistency_level, const locator::effective_replication_map&, const host_id_vector_replica_set&, const host_id_vector_topology_change&);
const host_id_vector_replica_set& live_endpoints,
const host_id_vector_topology_change& pending_endpoints = host_id_vector_topology_change());
}

View File

@@ -82,33 +82,16 @@ bool hint_sender::can_send() noexcept {
}
const auto tmptr = _shard_manager._proxy.get_token_metadata_ptr();
const auto maybe_ep = std::invoke([&] () noexcept -> std::optional<gms::inet_address> {
try {
return tmptr->get_endpoint_for_host_id_if_known(_ep_key);
} catch (...) {
return std::nullopt;
}
});
try {
// `hint_sender` can never target this node, so if the returned optional is empty,
// that must mean the current locator::token_metadata doesn't store the information
// about the target node.
if (maybe_ep && _gossiper.is_alive(*maybe_ep)) {
_state.remove(state::ep_state_left_the_ring);
return true;
} else {
if (!_state.contains(state::ep_state_left_the_ring)) {
_state.set_if<state::ep_state_left_the_ring>(!tmptr->is_normal_token_owner(_ep_key));
}
// If the node is not part of the ring, we will send hints to all new replicas.
// Note that if the optional -- `maybe_ep` -- is empty, that could mean that `_ep_key`
// is the locator::host_id of THIS node. However, that's impossible because instances
// of `hint_sender` are only created for OTHER nodes, so this logic is correct.
return _state.contains(state::ep_state_left_the_ring);
if (_gossiper.is_alive(_ep_key)) {
_state.remove(state::ep_state_left_the_ring);
return true;
} else {
if (!_state.contains(state::ep_state_left_the_ring)) {
_state.set_if<state::ep_state_left_the_ring>(!tmptr->is_normal_token_owner(_ep_key));
}
} catch (...) {
return false;
// If the node is not part of the ring, we will send hints to all new replicas.
return _state.contains(state::ep_state_left_the_ring);
}
}

View File

@@ -266,21 +266,14 @@ void manager::forbid_hints_for_eps_with_pending_hints() {
}
}
sync_point::shard_rps manager::calculate_current_sync_point(std::span<const gms::inet_address> target_eps) const {
sync_point::shard_rps manager::calculate_current_sync_point(std::span<const locator::host_id> target_eps) const {
sync_point::shard_rps rps;
const auto tmptr = _proxy.get_token_metadata_ptr();
for (auto addr : target_eps) {
const auto hid = tmptr->get_host_id_if_known(addr);
// Ignore the IPs that we cannot map.
if (!hid) {
continue;
}
auto it = _ep_managers.find(*hid);
auto it = _ep_managers.find(addr);
if (it != _ep_managers.end()) {
const hint_endpoint_manager& ep_man = it->second;
rps[*hid] = ep_man.last_written_replay_position();
rps[addr] = ep_man.last_written_replay_position();
}
}
@@ -342,10 +335,11 @@ future<> manager::wait_for_sync_point(abort_source& as, const sync_point::shard_
for (const auto& [addr, rp] : rps) {
if (std::holds_alternative<gms::inet_address>(addr)) {
const auto maybe_hid = tmptr->get_host_id_if_known(std::get<gms::inet_address>(addr));
// Ignore the IPs we cannot map.
if (maybe_hid) [[likely]] {
hid_rps.emplace(*maybe_hid, rp);
try {
const auto hid = _gossiper_anchor->get_host_id(std::get<gms::inet_address>(addr));
hid_rps.emplace(hid, rp);
} catch (...) {
// Ignore the IPs we cannot map.
}
} else {
hid_rps.emplace(std::get<locator::host_id>(addr), rp);
@@ -436,11 +430,11 @@ bool manager::have_ep_manager(const std::variant<locator::host_id, gms::inet_add
return _hint_directory_manager.has_mapping(std::get<gms::inet_address>(ep));
}
bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
bool manager::store_hint(endpoint_id host_id, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
tracing::trace_state_ptr tr_state) noexcept
{
if (utils::get_local_injector().enter("reject_incoming_hints")) {
manager_logger.debug("Rejecting a hint to {} / {} due to an error injection", host_id, ip);
manager_logger.debug("Rejecting a hint to {} due to an error injection", host_id);
++_stats.dropped;
return false;
}
@@ -451,6 +445,8 @@ bool manager::store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s
return false;
}
auto ip = _gossiper_anchor->get_address_map().get(host_id);
try {
manager_logger.trace("Going to store a hint to {}", host_id);
tracing::trace(tr_state, "Going to store a hint to {}", host_id);
@@ -599,9 +595,9 @@ future<> manager::change_host_filter(host_filter filter) {
// been created by mistake and they're invalid. The same for pre-host-ID hinted handoff
// -- hint directories representing host IDs are NOT valid.
if (hid_or_ep.has_host_id() && _uses_host_id) {
return std::make_optional(pair_type{hid_or_ep.id(), hid_or_ep.resolve_endpoint(*tmptr)});
return std::make_optional(pair_type{hid_or_ep.id(), hid_or_ep.resolve_endpoint(*_gossiper_anchor)});
} else if (hid_or_ep.has_endpoint() && !_uses_host_id) {
return std::make_optional(pair_type{hid_or_ep.resolve_id(*tmptr), hid_or_ep.endpoint()});
return std::make_optional(pair_type{hid_or_ep.resolve_id(*_gossiper_anchor), hid_or_ep.endpoint()});
} else {
return std::nullopt;
}
@@ -822,7 +818,7 @@ future<> manager::initialize_endpoint_managers() {
const auto maybe_host_id = std::invoke([&] () -> std::optional<locator::host_id> {
try {
return maybe_host_id_or_ep->resolve_id(*tmptr);
return maybe_host_id_or_ep->resolve_id(*_gossiper_anchor);
} catch (...) {
return std::nullopt;
}
@@ -874,7 +870,7 @@ future<> manager::migrate_ip_directories() {
continue;
}
const locator::host_id host_id = hid_or_ep.resolve_id(*tmptr);
const locator::host_id host_id = hid_or_ep.resolve_id(*_gossiper_anchor);
dirs_to_rename.push_back({.current_name = std::move(directory), .new_name = host_id.to_sstring()});
} catch (...) {
// We cannot map the IP to the corresponding host ID either because

View File

@@ -171,7 +171,7 @@ public:
void register_metrics(const sstring& group_name);
future<> start(shared_ptr<const gms::gossiper> gossiper_ptr);
future<> stop();
bool store_hint(endpoint_id host_id, gms::inet_address ip, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
bool store_hint(endpoint_id host_id, schema_ptr s, lw_shared_ptr<const frozen_mutation> fm,
tracing::trace_state_ptr tr_state) noexcept;
/// \brief Changes the host_filter currently used, stopping and starting endpoint_managers relevant to the new host_filter.
@@ -278,7 +278,7 @@ public:
///
/// \param target_eps The list of endpoints the sync point should correspond to. When empty, the function assumes all endpoints.
/// \return Sync point corresponding to the specified endpoints.
sync_point::shard_rps calculate_current_sync_point(std::span<const gms::inet_address> target_eps) const;
sync_point::shard_rps calculate_current_sync_point(std::span<const locator::host_id> target_eps) const;
/// \brief Waits until hint replay reach replay positions described in `rps`.
future<> wait_for_sync_point(abort_source& as, const sync_point::shard_rps& rps);

View File

@@ -2080,7 +2080,7 @@ future<> system_keyspace::update_peer_info(gms::inet_address ep, locator::host_i
if (!hid) {
on_internal_error(slogger, format("update_peer_info called with empty host_id, ep {}", ep));
}
if (_db.get_token_metadata().get_topology().is_me(ep)) {
if (_db.get_token_metadata().get_topology().is_me(hid)) {
on_internal_error(slogger, format("update_peer_info called for this node: {}", ep));
}

View File

@@ -2433,14 +2433,14 @@ future<std::unordered_map<locator::host_id, sstring>> view_builder::view_status(
}
future<std::unordered_map<sstring, sstring>>
view_builder::view_build_statuses(sstring keyspace, sstring view_name) const {
view_builder::view_build_statuses(sstring keyspace, sstring view_name, const gms::gossiper& gossiper) const {
std::unordered_map<locator::host_id, sstring> status = co_await view_status(std::move(keyspace), std::move(view_name));
std::unordered_map<sstring, sstring> status_map;
const auto& topo = _db.get_token_metadata().get_topology();
topo.for_each_node([&] (const locator::node& node) {
auto it = status.find(node.host_id());
auto s = it != status.end() ? std::move(it->second) : "UNKNOWN";
status_map.emplace(fmt::to_string(node.endpoint()), std::move(s));
status_map.emplace(fmt::to_string(gossiper.get_address_map().get(node.host_id())), std::move(s));
});
co_return status_map;
}
@@ -2670,7 +2670,7 @@ future<> view_builder::migrate_to_v2(locator::token_metadata_ptr tmptr, db::syst
// In the v1 table we may have left over rows that belong to nodes that were removed
// and we didn't clean them, so do that now.
auto host_id = row.get_as<utils::UUID>("host_id");
if (!tmptr->get_endpoint_for_host_id_if_known(locator::host_id(host_id))) {
if (!tmptr->get_topology().find_node(locator::host_id(host_id))) {
vlogger.warn("Dropping a row from view_build_status: host {} does not exist", host_id);
continue;
}
@@ -3150,7 +3150,7 @@ future<bool> view_builder::check_view_build_ongoing(const locator::token_metadat
return view_status(ks_name, cf_name).then([&tm] (view_statuses_type&& view_statuses) {
return std::ranges::any_of(view_statuses, [&tm] (const view_statuses_type::value_type& view_status) {
// Only consider status of known hosts.
return view_status.second == "STARTED" && tm.get_endpoint_for_host_id_if_known(view_status.first);
return view_status.second == "STARTED" && tm.get_topology().find_node(view_status.first);
});
});
}

View File

@@ -235,7 +235,7 @@ public:
// For tests
future<> wait_until_built(const sstring& ks_name, const sstring& view_name);
future<std::unordered_map<sstring, sstring>> view_build_statuses(sstring keyspace, sstring view_name) const;
future<std::unordered_map<sstring, sstring>> view_build_statuses(sstring keyspace, sstring view_name, const gms::gossiper& g) const;
// Can only be called on shard-0
future<> mark_existing_views_as_built();

View File

@@ -75,7 +75,7 @@ public:
std::vector<frozen_mutation> muts;
muts.reserve(gossiper.num_endpoints());
gossiper.for_each_endpoint_state([&] (const gms::inet_address& endpoint, const gms::endpoint_state&) {
gossiper.for_each_endpoint_state([&] (const gms::inet_address& endpoint, const gms::endpoint_state& eps) {
static thread_local auto s = build_schema();
mutation m(s, partition_key::from_single_value(*s, data_value(endpoint).serialize_nonnull()));
row& cr = m.partition().clustered_row(*schema(), clustering_key::make_empty()).cells();
@@ -86,16 +86,15 @@ public:
}
set_cell(cr, "load", gossiper.get_application_state_value(endpoint, gms::application_state::LOAD));
auto hostid = tm.get_host_id_if_known(endpoint);
if (hostid) {
if (ss.raft_topology_change_enabled() && !gossiper.is_shutdown(endpoint)) {
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(*hostid))));
}
set_cell(cr, "host_id", hostid->uuid());
auto hostid = eps.get_host_id();
if (ss.raft_topology_change_enabled() && !gossiper.is_shutdown(endpoint)) {
set_cell(cr, "status", boost::to_upper_copy<std::string>(fmt::format("{}", ss.get_node_state(hostid))));
}
set_cell(cr, "host_id", hostid.uuid());
if (hostid) {
sstring dc = tm.get_topology().get_location(endpoint).dc;
if (tm.get_topology().has_node(hostid)) {
// Not all entries in gossiper are present in the topology
sstring dc = tm.get_topology().get_location(hostid).dc;
set_cell(cr, "dc", dc);
}
@@ -103,7 +102,7 @@ public:
set_cell(cr, "owns", ownership[endpoint]);
}
set_cell(cr, "tokens", int32_t(hostid ? tm.get_tokens(*hostid).size() : 0));
set_cell(cr, "tokens", int32_t(tm.get_tokens(hostid).size()));
muts.push_back(freeze(std::move(m)));
});