From 1da7d6bf022ed75e7285c5400c4ba0d6fc307de3 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 3 Feb 2025 12:22:59 +0200 Subject: [PATCH 1/2] topology_coordinator: read peers table only once during topology state application During topology state application peers table may be updated with the new ip->id mapping. The update is not atomic: it adds new mapping and then removes the old one. If we call get_host_id_to_ip_map while this is happening it may trigger an internal error there. This is a regression since ef929c5def64c447b26d790b6c944c9f84eaea95. Before that commit the code read the peers table only once before starting the update loop. This patch restores the behaviour. Fixes: scylladb/scylladb#22578 --- service/storage_service.cc | 48 +++++++++++++++++--------------------- service/storage_service.hh | 4 +++- 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 97f9fbfa61..295903bb36 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -405,7 +405,22 @@ static locator::node::state to_topology_node_state(node_state ns) { on_internal_error(rtlogger, format("unhandled node state: {}", ns)); } -future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify) { +future storage_service::get_host_id_to_ip_map() { + host_id_to_ip_map_t map; + const auto ep_to_id_map = co_await _sys_ks.local().load_host_ids(); + map.reserve(ep_to_id_map.size()); + for (const auto& [ep, id]: ep_to_id_map) { + const auto [it, inserted] = map.insert({id, ep}); + if (!inserted) { + on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}", + id, it->second, ep)); + } + } + co_return map; +}; + + +future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& host_id_to_ip_map, nodes_to_notify_after_sync* nodes_to_notify) { const auto& t = _topology_state_machine._topology; raft::server_id raft_id{id.uuid()}; @@ -417,23 +432,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet co_return; } - using host_id_to_ip_map_t = std::unordered_map; - auto get_host_id_to_ip_map = [&, map = std::optional{}]() mutable -> future { - if (!map.has_value()) { - const auto ep_to_id_map = co_await _sys_ks.local().load_host_ids(); - map.emplace(); - map->reserve(ep_to_id_map.size()); - for (const auto& [ep, id]: ep_to_id_map) { - const auto [it, inserted] = map->insert({id, ep}); - if (!inserted) { - on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}", - id, it->second, ep)); - } - } - } - co_return &*map; - }; - const auto& rs = node->second; switch (rs.state) { @@ -444,11 +442,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet // In replace-with-same-ip scenario the replaced node IP will be the same // as ours, we shouldn't put it into system.peers. - // We need to fetch host_id_map before the update_peer_info call below, - // get_host_id_to_ip_map checks for duplicates and update_peer_info can - // add one. - const auto& host_id_to_ip_map = *(co_await get_host_id_to_ip_map()); - // Some state that is used to fill in 'peers' table is still propagated over gossiper. // Populate the table with the state from the gossiper here since storage_service::on_change() // (which is called each time gossiper state changes) may have skipped it because the tokens @@ -615,12 +608,13 @@ future storage_service::sync_raft_t sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size()); + auto id_to_ip_map = co_await get_host_id_to_ip_map(); for (const auto& id: t.left_nodes) { locator::host_id host_id{id.uuid()}; auto ip = _address_map.find(host_id); co_await process_left_node(id, host_id, ip); if (ip) { - sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr)); + sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr)); } } for (const auto& [id, rs]: t.normal_nodes) { @@ -628,7 +622,7 @@ future storage_service::sync_raft_t auto ip = _address_map.find(host_id); co_await process_normal_node(id, host_id, ip, rs); if (ip) { - sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify)); + sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, prev_normal.contains(id) ? nullptr : &nodes_to_notify)); } } for (const auto& [id, rs]: t.transition_nodes) { @@ -636,7 +630,7 @@ future storage_service::sync_raft_t auto ip = _address_map.find(host_id); co_await process_transition_node(id, host_id, ip, rs); if (ip) { - sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr)); + sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr)); } } for (auto id : t.get_excluded_nodes()) { @@ -962,7 +956,7 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500)); // Set notify_join to true since here we detected address change and drivers have to be notified nodes_to_notify_after_sync nodes_to_notify; - co_await _ss.raft_topology_update_ip(id, endpoint, &nodes_to_notify); + co_await _ss.raft_topology_update_ip(id, endpoint, co_await _ss.get_host_id_to_ip_map(), &nodes_to_notify); co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify)); })); } diff --git a/service/storage_service.hh b/service/storage_service.hh index fd0e27cc07..ce6289624c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -963,7 +963,9 @@ private: std::vector joined; }; - future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify); + using host_id_to_ip_map_t = std::unordered_map; + future get_host_id_to_ip_map(); + future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& map, nodes_to_notify_after_sync* nodes_to_notify); // Synchronizes the local node state (token_metadata, system.peers/system.local tables, // gossiper) to align it with the other raft topology nodes. // Optional target_node can be provided to restrict the synchronization to the specified node. From fe45ea505bd20f0f594dfeca2a97088c6f7c372c Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 3 Feb 2025 13:05:01 +0200 Subject: [PATCH 2/2] topology_coordinator: demote barrier_and_drain rpc failure to warning The failure may happen during normal operation as well (for instance if leader changes). Fixes: scylladb/scylladb#22364 --- service/topology_coordinator.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 5f478fc3ba..a16b9857a1 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1000,7 +1000,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes); } catch (...) { - rtlogger.error("drain rpc failed, proceed to fence old writes: {}", std::current_exception()); + rtlogger.warn("drain rpc failed, proceed to fence old writes: {}", std::current_exception()); drain_failed = true; } if (drain_failed) {