diff --git a/service/storage_service.cc b/service/storage_service.cc index 29b48ee7e6..ea454fb68a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -33,6 +33,7 @@ #include "seastar/core/scollectd.hh" #include "service/raft/group0_state_machine.hh" #include "service/raft/raft_group0_client.hh" +#include "service/topology_state_machine.hh" #include "utils/UUID.hh" #include "gms/inet_address.hh" #include "locator/load_sketch.hh" @@ -1223,45 +1224,113 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } } - // Returns the guard back if no node to work on is found. - std::variant get_node_to_work_on_opt(group0_guard guard) { - auto& topo = _topo_sm._topology; - const std::pair* e = nullptr; + struct cancel_requests { + group0_guard guard; + }; - std::optional req; - if (topo.transition_nodes.size() != 0) { - // If there is a node that is the middle of topology operation continue with it - e = &*topo.transition_nodes.begin(); - } else if (topo.new_nodes.size() != 0) { - // Otherwise check if there is a new node that wants to be joined - e = &*topo.new_nodes.begin(); - req = topo.requests[e->first]; - } else if (!topo.requests.empty()) { - // If there is no new node but request queue is not empty there is a request for normal node - req = topo.requests.begin()->second; - e = &*topo.normal_nodes.find(topo.requests.begin()->first); - } - - if (!e) { - return guard; + struct start_cleanup { + group0_guard guard; + }; + + // Return dead nodes and while at it checking if there are live nodes that either need cleanup + // or running one already + std::unordered_set get_dead_node(bool& cleanup_running, bool& cleanup_needed) { + std::unordered_set dead_set; + cleanup_needed = cleanup_running = false; + for (auto& n : _topo_sm._topology.normal_nodes) { + bool alive = false; + try { + alive = _gossiper.is_alive(id2ip(locator::host_id(n.first.uuid()))); + } catch (...) {} + + if (!alive) { + dead_set.insert(n.first); + } else { + cleanup_running |= (n.second.cleanup == cleanup_status::running); + cleanup_needed |= (n.second.cleanup == cleanup_status::needed); + } } + return dead_set; + } + std::optional get_request_param(raft::server_id id) { std::optional req_param; - auto rit = topo.req_param.find(e->first); - if (rit != topo.req_param.end()) { + auto rit = _topo_sm._topology.req_param.find(id); + if (rit != _topo_sm._topology.req_param.end()) { req_param = rit->second; } - return node_to_work_on{std::move(guard), &topo, e->first, &e->second, std::move(req), std::move(req_param)}; + return req_param; + }; + + // Returns: + // guard - there is nothing to do. + // cancel_requests - no request can be started so cancel the queue + // start_cleanup - cleanup needs to be started + // node_to_work_on - the node the topology coordinator should work on + std::variant get_next_task(group0_guard guard) { + auto& topo = _topo_sm._topology; + + if (topo.transition_nodes.size() != 0) { + // If there is a node that is the middle of topology operation continue with it + return get_node_to_work_on(std::move(guard)); + } + + bool cleanup_running; + bool cleanup_needed; + const auto dead_nodes = get_dead_node(cleanup_running, cleanup_needed); + + if (cleanup_running || topo.requests.empty()) { + // Ether there is no requests or there is a live node that runs cleanup. Wait for it to complete. + return std::move(guard); + } + + std::optional> next_req; + + for (auto& req : topo.requests) { + auto enough_live_nodes = [&] { + auto exclude_nodes = get_excluded_nodes(req.first, req.second, get_request_param(req.first)); + for (auto id : dead_nodes) { + if (!exclude_nodes.contains(id)) { + return false; + } + } + return true; + }; + if (enough_live_nodes()) { + if (!next_req || next_req->second > req.second) { + next_req = req; + } + } + } + + if (!next_req) { + // We did not find a request that has enough live node to proceed + // Cancel all requests to let admin know that no operation can succeed + slogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes); + return cancel_requests{std::move(guard)}; + } + + auto [id, req] = *next_req; + + if (cleanup_needed && (req == topology_request::remove || req == topology_request::leave)) { + // If the highest prio request is removenode or decommission we need to start cleanup if one is needed + return start_cleanup(std::move(guard)); + } + + return node_to_work_on(std::move(guard), &topo, id, &topo.find(id)->second, req, get_request_param(id)); }; node_to_work_on get_node_to_work_on(group0_guard guard) { - auto node_or_guard = get_node_to_work_on_opt(std::move(guard)); - if (auto* node = std::get_if(&node_or_guard)) { - return std::move(*node); + auto& topo = _topo_sm._topology; + + if (topo.transition_nodes.empty()) { + on_internal_error(slogger, ::format( + "raft topology: could not find node to work on" + " even though the state requires it (state: {})", topo.tstate)); } - on_internal_error(slogger, ::format( - "raft topology: could not find node to work on" - " even though the state requires it (state: {})", _topo_sm._topology.tstate)); + + auto e = &*topo.transition_nodes.begin(); + return node_to_work_on{std::move(guard), &topo, e->first, &e->second, std::nullopt, get_request_param(e->first)}; }; future start_operation() { @@ -2090,12 +2159,23 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // on the fact that the block which calls this is atomic. // FIXME: Don't take the ownership of the guard to make the above guarantee explicit. std::pair should_preempt_balancing(group0_guard guard) { - auto node_or_guard = get_node_to_work_on_opt(std::move(guard)); - if (auto* node = std::get_if(&node_or_guard)) { + auto work = get_next_task(std::move(guard)); + if (auto* node = std::get_if(&work)) { return std::make_pair(true, std::move(node->guard)); } - guard = std::get(std::move(node_or_guard)); + if (auto* cancel = std::get_if(&work)) { + // request queue needs to be canceled, so preempt balancing + return std::make_pair(true, std::move(cancel->guard)); + } + + if (auto* cleanup = std::get_if(&work)) { + // cleanup has to be started + return std::make_pair(true, std::move(cleanup->guard)); + } + + guard = std::get(std::move(work)); + if (_topo_sm._topology.global_request) { return std::make_pair(true, std::move(guard)); } @@ -2107,19 +2187,87 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { return std::make_pair(false, std::move(guard)); } + future<> cancel_all_requests(group0_guard guard) { + std::vector muts; + std::vector reject_join; + if (_topo_sm._topology.requests.empty()) { + co_return; + } + auto ts = guard.write_timestamp(); + for (auto& [id, req] : _topo_sm._topology.requests) { + switch (req) { + case topology_request::replace: + [[fallthrough]]; + case topology_request::join: { + topology_mutation_builder builder(ts); + builder.with_node(id) + .set("node_state", node_state::left) + .del("topology_request"); + reject_join.emplace_back(id); + muts.emplace_back(builder.build()); + try { + co_await wait_for_ip(id, _address_map, _as); + } catch (...) { + slogger.warn("wait_for_ip failed during cancelation: {}", std::current_exception()); + } + } + break; + case topology_request::leave: + [[fallthrough]]; + case topology_request::rebuild: + [[fallthrough]]; + case topology_request::remove: { + topology_mutation_builder builder(ts); + builder.with_node(id) + .del("topology_request"); + muts.emplace_back(builder.build()); + } + break; + } + } + + co_await update_topology_state(std::move(guard), std::move(muts), "cancel all topology requests"); + + for (auto id : reject_join) { + try { + co_await respond_to_joining_node(id, join_node_response_params{ + .response = join_node_response_params::rejected{ + .reason = "request canceled because some required nodes are dead" + }, + }); + } catch (...) { + slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. " + "The node may hang. It's safe to shut it down manually now.", + id, std::current_exception()); + } + } + + } + // Returns `true` iff there was work to do. future handle_topology_transition(group0_guard guard) { auto tstate = _topo_sm._topology.tstate; if (!tstate) { // When adding a new source of work, make sure to update should_preempt_balancing() as well. - auto node_or_guard = get_node_to_work_on_opt(std::move(guard)); - if (auto* node = std::get_if(&node_or_guard)) { + auto work = get_next_task(std::move(guard)); + if (auto* node = std::get_if(&work)) { co_await handle_node_transition(std::move(*node)); co_return true; } - guard = std::get(std::move(node_or_guard)); + if (auto* cancel = std::get_if(&work)) { + co_await cancel_all_requests(std::move(cancel->guard)); + co_return true; + } + + if (auto* cleanup = std::get_if(&work)) { + co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard)); + co_return true; + } + + guard = std::get(std::move(work)); + if (_topo_sm._topology.global_request) { co_await handle_global_request(std::move(guard)); co_return true; @@ -2849,7 +2997,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // mark all nodes (except self) as cleanup needed if (node.id != id) { topology_mutation_builder builder(node.guard.write_timestamp()); - builder.with_node(id).set("cleanup_status", cleanup_status::running); + builder.with_node(id).set("cleanup_status", cleanup_status::needed); muts.emplace_back(builder.build()); slogger.trace("raft topology: mark node {} as needed cleanup", id); } @@ -2857,6 +3005,25 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { return muts; } + future<> start_cleanup_on_dirty_nodes(group0_guard guard) { + auto& topo = _topo_sm._topology; + std::vector muts; + muts.reserve(topo.normal_nodes.size()); + + for (auto& [id, rs] : topo.normal_nodes) { + if (rs.cleanup == cleanup_status::needed) { + topology_mutation_builder builder(guard.write_timestamp()); + builder.with_node(id).set("cleanup_status", cleanup_status::running); + muts.emplace_back(builder.build()); + slogger.trace("raft topology: mark node {} as cleanup running", id); + } + } + if (!muts.empty()) { + co_await update_topology_state(std::move(guard), std::move(muts), "Starting cleanup"); + } + } + + // Returns true if the state machine was transitioned into tablet migration path. future maybe_start_tablet_migration(group0_guard); diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 9dded34da3..381a45d72d 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -39,11 +39,15 @@ enum class node_state: uint16_t { rollback_to_normal, // the node rolls back failed decommission/remove node operation }; +// The order of the requests is a priority +// order in which requests are executes in case +// there are multiple requests in the queue. +// The order tries to minimize the amount of cleanups. enum class topology_request: uint16_t { - join, - leave, - remove, replace, + join, + remove, + leave, rebuild };