storage_service: topology coordinator: manage cluster cleanup as part of the topology management

Sometimes it is unsafe to start a new topology operation before cleanup
runs on dirty nodes. This patch detects the situation when the topology
operation to be executed cannot be run safely until all dirty nodes do
cleanup and initiates the cleanup automatically. It also waits for
cleanup to complete before proceeding with the topology operation.

There can be a situation that nodes that needs cleanup dies and will
never clear the flag. In this case if a topology operation that wants to
run next does not have this node in its ignore node list it may stuck
forever. To fix this the patch also introduces the "liveness aware"
request queue management: we do not simple choose _a_ request to run next,
but go over the queue and find requests that can proceed considering
the nodes liveness situation. If there are multiple requests eligible to
run the patch introduces the order based on the operation type: replace,
join, remove, leave, rebuild. The order is such so to not trigger cleanup
needlessly.
This commit is contained in:
Gleb Natapov
2023-10-26 09:15:05 +03:00
parent c9b7bd5a33
commit 0adb3904d8
2 changed files with 210 additions and 39 deletions

View File

@@ -33,6 +33,7 @@
#include "seastar/core/scollectd.hh"
#include "service/raft/group0_state_machine.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/topology_state_machine.hh"
#include "utils/UUID.hh"
#include "gms/inet_address.hh"
#include "locator/load_sketch.hh"
@@ -1223,45 +1224,113 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
}
// Returns the guard back if no node to work on is found.
std::variant<group0_guard, node_to_work_on> get_node_to_work_on_opt(group0_guard guard) {
auto& topo = _topo_sm._topology;
const std::pair<const raft::server_id, replica_state>* e = nullptr;
struct cancel_requests {
group0_guard guard;
};
std::optional<topology_request> req;
if (topo.transition_nodes.size() != 0) {
// If there is a node that is the middle of topology operation continue with it
e = &*topo.transition_nodes.begin();
} else if (topo.new_nodes.size() != 0) {
// Otherwise check if there is a new node that wants to be joined
e = &*topo.new_nodes.begin();
req = topo.requests[e->first];
} else if (!topo.requests.empty()) {
// If there is no new node but request queue is not empty there is a request for normal node
req = topo.requests.begin()->second;
e = &*topo.normal_nodes.find(topo.requests.begin()->first);
}
if (!e) {
return guard;
struct start_cleanup {
group0_guard guard;
};
// Return dead nodes and while at it checking if there are live nodes that either need cleanup
// or running one already
std::unordered_set<raft::server_id> get_dead_node(bool& cleanup_running, bool& cleanup_needed) {
std::unordered_set<raft::server_id> dead_set;
cleanup_needed = cleanup_running = false;
for (auto& n : _topo_sm._topology.normal_nodes) {
bool alive = false;
try {
alive = _gossiper.is_alive(id2ip(locator::host_id(n.first.uuid())));
} catch (...) {}
if (!alive) {
dead_set.insert(n.first);
} else {
cleanup_running |= (n.second.cleanup == cleanup_status::running);
cleanup_needed |= (n.second.cleanup == cleanup_status::needed);
}
}
return dead_set;
}
std::optional<request_param> get_request_param(raft::server_id id) {
std::optional<request_param> req_param;
auto rit = topo.req_param.find(e->first);
if (rit != topo.req_param.end()) {
auto rit = _topo_sm._topology.req_param.find(id);
if (rit != _topo_sm._topology.req_param.end()) {
req_param = rit->second;
}
return node_to_work_on{std::move(guard), &topo, e->first, &e->second, std::move(req), std::move(req_param)};
return req_param;
};
// Returns:
// guard - there is nothing to do.
// cancel_requests - no request can be started so cancel the queue
// start_cleanup - cleanup needs to be started
// node_to_work_on - the node the topology coordinator should work on
std::variant<group0_guard, cancel_requests, start_cleanup, node_to_work_on> get_next_task(group0_guard guard) {
auto& topo = _topo_sm._topology;
if (topo.transition_nodes.size() != 0) {
// If there is a node that is the middle of topology operation continue with it
return get_node_to_work_on(std::move(guard));
}
bool cleanup_running;
bool cleanup_needed;
const auto dead_nodes = get_dead_node(cleanup_running, cleanup_needed);
if (cleanup_running || topo.requests.empty()) {
// Ether there is no requests or there is a live node that runs cleanup. Wait for it to complete.
return std::move(guard);
}
std::optional<std::pair<raft::server_id, topology_request>> next_req;
for (auto& req : topo.requests) {
auto enough_live_nodes = [&] {
auto exclude_nodes = get_excluded_nodes(req.first, req.second, get_request_param(req.first));
for (auto id : dead_nodes) {
if (!exclude_nodes.contains(id)) {
return false;
}
}
return true;
};
if (enough_live_nodes()) {
if (!next_req || next_req->second > req.second) {
next_req = req;
}
}
}
if (!next_req) {
// We did not find a request that has enough live node to proceed
// Cancel all requests to let admin know that no operation can succeed
slogger.warn("topology coordinator: cancel request queue because no request can proceed. Dead nodes: {}", dead_nodes);
return cancel_requests{std::move(guard)};
}
auto [id, req] = *next_req;
if (cleanup_needed && (req == topology_request::remove || req == topology_request::leave)) {
// If the highest prio request is removenode or decommission we need to start cleanup if one is needed
return start_cleanup(std::move(guard));
}
return node_to_work_on(std::move(guard), &topo, id, &topo.find(id)->second, req, get_request_param(id));
};
node_to_work_on get_node_to_work_on(group0_guard guard) {
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
return std::move(*node);
auto& topo = _topo_sm._topology;
if (topo.transition_nodes.empty()) {
on_internal_error(slogger, ::format(
"raft topology: could not find node to work on"
" even though the state requires it (state: {})", topo.tstate));
}
on_internal_error(slogger, ::format(
"raft topology: could not find node to work on"
" even though the state requires it (state: {})", _topo_sm._topology.tstate));
auto e = &*topo.transition_nodes.begin();
return node_to_work_on{std::move(guard), &topo, e->first, &e->second, std::nullopt, get_request_param(e->first)};
};
future<group0_guard> start_operation() {
@@ -2090,12 +2159,23 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// on the fact that the block which calls this is atomic.
// FIXME: Don't take the ownership of the guard to make the above guarantee explicit.
std::pair<bool, group0_guard> should_preempt_balancing(group0_guard guard) {
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
auto work = get_next_task(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&work)) {
return std::make_pair(true, std::move(node->guard));
}
guard = std::get<group0_guard>(std::move(node_or_guard));
if (auto* cancel = std::get_if<cancel_requests>(&work)) {
// request queue needs to be canceled, so preempt balancing
return std::make_pair(true, std::move(cancel->guard));
}
if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
// cleanup has to be started
return std::make_pair(true, std::move(cleanup->guard));
}
guard = std::get<group0_guard>(std::move(work));
if (_topo_sm._topology.global_request) {
return std::make_pair(true, std::move(guard));
}
@@ -2107,19 +2187,87 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
return std::make_pair(false, std::move(guard));
}
future<> cancel_all_requests(group0_guard guard) {
std::vector<canonical_mutation> muts;
std::vector<raft::server_id> reject_join;
if (_topo_sm._topology.requests.empty()) {
co_return;
}
auto ts = guard.write_timestamp();
for (auto& [id, req] : _topo_sm._topology.requests) {
switch (req) {
case topology_request::replace:
[[fallthrough]];
case topology_request::join: {
topology_mutation_builder builder(ts);
builder.with_node(id)
.set("node_state", node_state::left)
.del("topology_request");
reject_join.emplace_back(id);
muts.emplace_back(builder.build());
try {
co_await wait_for_ip(id, _address_map, _as);
} catch (...) {
slogger.warn("wait_for_ip failed during cancelation: {}", std::current_exception());
}
}
break;
case topology_request::leave:
[[fallthrough]];
case topology_request::rebuild:
[[fallthrough]];
case topology_request::remove: {
topology_mutation_builder builder(ts);
builder.with_node(id)
.del("topology_request");
muts.emplace_back(builder.build());
}
break;
}
}
co_await update_topology_state(std::move(guard), std::move(muts), "cancel all topology requests");
for (auto id : reject_join) {
try {
co_await respond_to_joining_node(id, join_node_response_params{
.response = join_node_response_params::rejected{
.reason = "request canceled because some required nodes are dead"
},
});
} catch (...) {
slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. "
"The node may hang. It's safe to shut it down manually now.",
id, std::current_exception());
}
}
}
// Returns `true` iff there was work to do.
future<bool> handle_topology_transition(group0_guard guard) {
auto tstate = _topo_sm._topology.tstate;
if (!tstate) {
// When adding a new source of work, make sure to update should_preempt_balancing() as well.
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
auto work = get_next_task(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&work)) {
co_await handle_node_transition(std::move(*node));
co_return true;
}
guard = std::get<group0_guard>(std::move(node_or_guard));
if (auto* cancel = std::get_if<cancel_requests>(&work)) {
co_await cancel_all_requests(std::move(cancel->guard));
co_return true;
}
if (auto* cleanup = std::get_if<start_cleanup>(&work)) {
co_await start_cleanup_on_dirty_nodes(std::move(cleanup->guard));
co_return true;
}
guard = std::get<group0_guard>(std::move(work));
if (_topo_sm._topology.global_request) {
co_await handle_global_request(std::move(guard));
co_return true;
@@ -2849,7 +2997,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
// mark all nodes (except self) as cleanup needed
if (node.id != id) {
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(id).set("cleanup_status", cleanup_status::running);
builder.with_node(id).set("cleanup_status", cleanup_status::needed);
muts.emplace_back(builder.build());
slogger.trace("raft topology: mark node {} as needed cleanup", id);
}
@@ -2857,6 +3005,25 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
return muts;
}
future<> start_cleanup_on_dirty_nodes(group0_guard guard) {
auto& topo = _topo_sm._topology;
std::vector<canonical_mutation> muts;
muts.reserve(topo.normal_nodes.size());
for (auto& [id, rs] : topo.normal_nodes) {
if (rs.cleanup == cleanup_status::needed) {
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(id).set("cleanup_status", cleanup_status::running);
muts.emplace_back(builder.build());
slogger.trace("raft topology: mark node {} as cleanup running", id);
}
}
if (!muts.empty()) {
co_await update_topology_state(std::move(guard), std::move(muts), "Starting cleanup");
}
}
// Returns true if the state machine was transitioned into tablet migration path.
future<bool> maybe_start_tablet_migration(group0_guard);

View File

@@ -39,11 +39,15 @@ enum class node_state: uint16_t {
rollback_to_normal, // the node rolls back failed decommission/remove node operation
};
// The order of the requests is a priority
// order in which requests are executes in case
// there are multiple requests in the queue.
// The order tries to minimize the amount of cleanups.
enum class topology_request: uint16_t {
join,
leave,
remove,
replace,
join,
remove,
leave,
rebuild
};