Merge 'Fix a regression that sometimes causes an internal error and demote barrier_and_drain rpc error log to a warning ' from Gleb Natapov
The series fixes a regression and demotes a barrier_and_drain logging error to a warning since this particular condition may happen during normal operation. We want to backport both since one is a bug fix and another is trivial and reduces CI flakiness. Closes scylladb/scylladb#22650 * https://github.com/scylladb/scylladb: topology_coordinator: demote barrier_and_drain rpc failure to warning topology_coordinator: read peers table only once during topology state application
This commit is contained in:
@@ -405,7 +405,22 @@ static locator::node::state to_topology_node_state(node_state ns) {
|
||||
on_internal_error(rtlogger, format("unhandled node state: {}", ns));
|
||||
}
|
||||
|
||||
future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify) {
|
||||
future<storage_service::host_id_to_ip_map_t> storage_service::get_host_id_to_ip_map() {
|
||||
host_id_to_ip_map_t map;
|
||||
const auto ep_to_id_map = co_await _sys_ks.local().load_host_ids();
|
||||
map.reserve(ep_to_id_map.size());
|
||||
for (const auto& [ep, id]: ep_to_id_map) {
|
||||
const auto [it, inserted] = map.insert({id, ep});
|
||||
if (!inserted) {
|
||||
on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}",
|
||||
id, it->second, ep));
|
||||
}
|
||||
}
|
||||
co_return map;
|
||||
};
|
||||
|
||||
|
||||
future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& host_id_to_ip_map, nodes_to_notify_after_sync* nodes_to_notify) {
|
||||
const auto& t = _topology_state_machine._topology;
|
||||
raft::server_id raft_id{id.uuid()};
|
||||
|
||||
@@ -417,23 +432,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
co_return;
|
||||
}
|
||||
|
||||
using host_id_to_ip_map_t = std::unordered_map<locator::host_id, gms::inet_address>;
|
||||
auto get_host_id_to_ip_map = [&, map = std::optional<host_id_to_ip_map_t>{}]() mutable -> future<const host_id_to_ip_map_t*> {
|
||||
if (!map.has_value()) {
|
||||
const auto ep_to_id_map = co_await _sys_ks.local().load_host_ids();
|
||||
map.emplace();
|
||||
map->reserve(ep_to_id_map.size());
|
||||
for (const auto& [ep, id]: ep_to_id_map) {
|
||||
const auto [it, inserted] = map->insert({id, ep});
|
||||
if (!inserted) {
|
||||
on_internal_error(slogger, ::format("duplicate IP for host_id {}, first IP {}, second IP {}",
|
||||
id, it->second, ep));
|
||||
}
|
||||
}
|
||||
}
|
||||
co_return &*map;
|
||||
};
|
||||
|
||||
const auto& rs = node->second;
|
||||
|
||||
switch (rs.state) {
|
||||
@@ -444,11 +442,6 @@ future<> storage_service::raft_topology_update_ip(locator::host_id id, gms::inet
|
||||
// In replace-with-same-ip scenario the replaced node IP will be the same
|
||||
// as ours, we shouldn't put it into system.peers.
|
||||
|
||||
// We need to fetch host_id_map before the update_peer_info call below,
|
||||
// get_host_id_to_ip_map checks for duplicates and update_peer_info can
|
||||
// add one.
|
||||
const auto& host_id_to_ip_map = *(co_await get_host_id_to_ip_map());
|
||||
|
||||
// Some state that is used to fill in 'peers' table is still propagated over gossiper.
|
||||
// Populate the table with the state from the gossiper here since storage_service::on_change()
|
||||
// (which is called each time gossiper state changes) may have skipped it because the tokens
|
||||
@@ -615,12 +608,13 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
|
||||
sys_ks_futures.reserve(t.left_nodes.size() + t.normal_nodes.size() + t.transition_nodes.size());
|
||||
|
||||
auto id_to_ip_map = co_await get_host_id_to_ip_map();
|
||||
for (const auto& id: t.left_nodes) {
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = _address_map.find(host_id);
|
||||
co_await process_left_node(id, host_id, ip);
|
||||
if (ip) {
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr));
|
||||
}
|
||||
}
|
||||
for (const auto& [id, rs]: t.normal_nodes) {
|
||||
@@ -628,7 +622,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
auto ip = _address_map.find(host_id);
|
||||
co_await process_normal_node(id, host_id, ip, rs);
|
||||
if (ip) {
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, prev_normal.contains(id) ? nullptr : &nodes_to_notify));
|
||||
}
|
||||
}
|
||||
for (const auto& [id, rs]: t.transition_nodes) {
|
||||
@@ -636,7 +630,7 @@ future<storage_service::nodes_to_notify_after_sync> storage_service::sync_raft_t
|
||||
auto ip = _address_map.find(host_id);
|
||||
co_await process_transition_node(id, host_id, ip, rs);
|
||||
if (ip) {
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, nullptr));
|
||||
sys_ks_futures.push_back(raft_topology_update_ip(host_id, *ip, id_to_ip_map, nullptr));
|
||||
}
|
||||
}
|
||||
for (auto id : t.get_excluded_nodes()) {
|
||||
@@ -964,7 +958,7 @@ class storage_service::ip_address_updater: public gms::i_endpoint_state_change_s
|
||||
co_await utils::get_local_injector().inject("ip-change-raft-sync-delay", std::chrono::milliseconds(500));
|
||||
// Set notify_join to true since here we detected address change and drivers have to be notified
|
||||
nodes_to_notify_after_sync nodes_to_notify;
|
||||
co_await _ss.raft_topology_update_ip(id, endpoint, &nodes_to_notify);
|
||||
co_await _ss.raft_topology_update_ip(id, endpoint, co_await _ss.get_host_id_to_ip_map(), &nodes_to_notify);
|
||||
co_await _ss.notify_nodes_after_sync(std::move(nodes_to_notify));
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -963,7 +963,9 @@ private:
|
||||
std::vector<gms::inet_address> joined;
|
||||
};
|
||||
|
||||
future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, nodes_to_notify_after_sync* nodes_to_notify);
|
||||
using host_id_to_ip_map_t = std::unordered_map<locator::host_id, gms::inet_address>;
|
||||
future<host_id_to_ip_map_t> get_host_id_to_ip_map();
|
||||
future<> raft_topology_update_ip(locator::host_id id, gms::inet_address ip, const host_id_to_ip_map_t& map, nodes_to_notify_after_sync* nodes_to_notify);
|
||||
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
|
||||
// gossiper) to align it with the other raft topology nodes.
|
||||
// Optional target_node can be provided to restrict the synchronization to the specified node.
|
||||
|
||||
@@ -1001,7 +1001,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
try {
|
||||
guard = co_await exec_global_command(std::move(guard), raft_topology_cmd::command::barrier_and_drain, exclude_nodes, drop_guard_and_retake::yes);
|
||||
} catch (...) {
|
||||
rtlogger.error("drain rpc failed, proceed to fence old writes: {}", std::current_exception());
|
||||
rtlogger.warn("drain rpc failed, proceed to fence old writes: {}", std::current_exception());
|
||||
drain_failed = true;
|
||||
}
|
||||
if (drain_failed) {
|
||||
|
||||
Reference in New Issue
Block a user