diff --git a/service/storage_service.cc b/service/storage_service.cc index bbd4af7091..3833380660 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -405,7 +405,22 @@ static locator::node::state to_topology_node_state(node_state ns) { on_internal_error(rtlogger, format("unhandled node state: {}", ns)); } -future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify) { +future storage_service::get_host_id_to_ip_map() { + host_id_to_ip_map_t map; + const auto ep_to_id_map = co_await _sys_ks.local().load_host_ids(); + map.reserve(ep_to_id_map.size()); + for (const auto& [ep, id]: ep_to_id_map) { + const auto [it, inserted] = map.insert({id, ep}); + if (!inserted) { + on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}", + id, it->second, ep)); + } + } + co_return map; +}; + + +future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& host_id_to_ip_map, nodes_to_notify_after_sync* nodes_to_notify) { const auto& t = _topology_state_machine._topology; raft::server_id raft_id{id.uuid()}; @@ -417,23 +432,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet co_return; } - using host_id_to_ip_map_t = std::unordered_map; - auto get_host_id_to_ip_map = [&, map = std::optional{}]() mutable -> future { - if (!map.has_value()) { - const auto ep_to_id_map = co_await _sys_ks.local().load_host_ids(); - map.emplace(); - map->reserve(ep_to_id_map.size()); - for (const auto& [ep, id]: ep_to_id_map) { - const auto [it, inserted] = map->insert({id, ep}); - if (!inserted) { - on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}", - id, it->second, ep)); - } - } - } - co_return &*map; - }; - const auto& rs = node->second; switch (rs.state) { @@ -444,11 +442,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet // In replace-with-same-ip scenario the replaced node IP will be the same // as ours, we shouldn't put it into system.peers. - // We need to fetch host_id_map before the update_peer_info call below, - // get_host_id_to_ip_map checks for duplicates and update_peer_info can - // add one. - const auto& host_id_to_ip_map = *(co_await get_host_id_to_ip_map()); - // Some state that is used to fill in 'peers' table is still propagated over gossiper. // Populate the table with the state from the gossiper here since storage_service::on_change() // (which is called each time gossiper state changes) may have skipped it because the tokens @@ -615,12 +608,13 @@ future storage_service::sync_raft_t sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size()); + auto id_to_ip_map = co_await get_host_id_to_ip_map(); for (const auto& id: t.left_nodes) { locator::host_id host_id{id.uuid()}; auto ip = _address_map.find(host_id); co_await process_left_node(id, host_id, ip); if (ip) { - sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr)); + sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr)); } } for (const auto& [id, rs]: t.normal_nodes) { @@ -628,7 +622,7 @@ future storage_service::sync_raft_t auto ip = _address_map.find(host_id); co_await process_normal_node(id, host_id, ip, rs); if (ip) { - sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify)); + sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, prev_normal.contains(id) ? nullptr : &nodes_to_notify)); } } for (const auto& [id, rs]: t.transition_nodes) { @@ -636,7 +630,7 @@ future storage_service::sync_raft_t auto ip = _address_map.find(host_id); co_await process_transition_node(id, host_id, ip, rs); if (ip) { - sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr)); + sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr)); } } for (auto id : t.get_excluded_nodes()) { @@ -964,7 +958,7 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500)); // Set notify_join to true since here we detected address change and drivers have to be notified nodes_to_notify_after_sync nodes_to_notify; - co_await _ss.raft_topology_update_ip(id, endpoint, &nodes_to_notify); + co_await _ss.raft_topology_update_ip(id, endpoint, co_await _ss.get_host_id_to_ip_map(), &nodes_to_notify); co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify)); })); } diff --git a/service/storage_service.hh b/service/storage_service.hh index fd0e27cc07..ce6289624c 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -963,7 +963,9 @@ private: std::vector joined; }; - future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify); + using host_id_to_ip_map_t = std::unordered_map; + future get_host_id_to_ip_map(); + future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& map, nodes_to_notify_after_sync* nodes_to_notify); // Synchronizes the local node state (token_metadata, system.peers/system.local tables, // gossiper) to align it with the other raft topology nodes. // Optional target_node can be provided to restrict the synchronization to the specified node. diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 5b9f907577..bf53b110dc 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -1001,7 +1001,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { try { guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes); } catch (...) { - rtlogger.error("drain rpc failed, proceed to fence old writes: {}", std::current_exception()); + rtlogger.warn("drain rpc failed, proceed to fence old writes: {}", std::current_exception()); drain_failed = true; } if (drain_failed) {