storage_service: topology coordinator: check topology operation completion using status in topology_requests table

Instead of trying to guess if a request completed by looking into the
topology state (which is sometimes can be error prone) look at the
request status in the new topology_requests. If request failed report
a reason for the failure from the table.
This commit is contained in:
Gleb Natapov
2024-01-08 11:06:09 +02:00
parent 1c18476385
commit 84197ff735
5 changed files with 74 additions and 53 deletions

View File

@@ -2872,6 +2872,23 @@ future<> system_keyspace::sstables_registry_list(sstring location, sstable_regis
});
}
future<service::topology_request_state> system_keyspace::get_topology_request_state(utils::UUID id) {
auto rs = co_await execute_cql(
format("SELECT done, error FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id));
if (!rs || rs->empty()) {
on_internal_error(slogger, format("no entry for request id {}", id));
}
auto& row = rs->one();
sstring error;
if (row.has("error")) {
error = row.get_as<sstring>("error");
}
co_return service::topology_request_state{row.get_as<bool>("done"), std::move(error)};
}
sstring system_keyspace_name() {
return system_keyspace::NAME;
}

View File

@@ -43,6 +43,7 @@ namespace paxos {
class proposal;
} // namespace service::paxos
struct topology_request_state;
}
namespace netw {
@@ -519,6 +520,7 @@ public:
future<bool> get_must_synchronize_topology();
future<> set_must_synchronize_topology(bool);
future<service::topology_request_state> get_topology_request_state(utils::UUID id);
private:
static service::topology_features decode_topology_features_state(::shared_ptr<cql3::untyped_result_set> rs);

View File

@@ -3920,23 +3920,18 @@ future<> storage_service::join_token_ring(sharded<db::system_distributed_keyspac
// we do that here.
co_await raft_initialize_discovery_leader(*raft_server, join_params);
auto leaving = [&] {
return _topology_state_machine._topology.left_nodes.contains(raft_server->id()) ||
(_topology_state_machine._topology.transition_nodes.contains(raft_server->id()) &&
_topology_state_machine._topology.transition_nodes[raft_server->id()].state == node_state::left_token_ring);
};
sstring err;
// Wait until we enter one of the final states
co_await _topology_state_machine.event.when([this, raft_server, &leaving] {
return _topology_state_machine._topology.normal_nodes.contains(raft_server->id()) || leaving();
});
if (leaving()) {
if (_sys_ks.local().bootstrap_complete()) {
if (_sys_ks.local().bootstrap_complete()) {
if (_topology_state_machine._topology.left_nodes.contains(raft_server->id())) {
throw std::runtime_error("A node that already left the cluster cannot be restarted");
} else {
throw std::runtime_error(fmt::format("{} failed. See earlier errors", raft_replace_info ? "Replace" : "Bootstrap"));
}
} else {
err = co_await wait_for_topology_request_completion(join_params.request_id);
}
if (!err.empty()) {
throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err));
}
co_await update_topology_with_local_metadata(*raft_server);
@@ -5462,6 +5457,7 @@ void on_streaming_finished() {
future<> storage_service::raft_decommission() {
auto& raft_server = _group0->group0_server();
utils::UUID request_id;
auto disengage_shutdown_promise = defer([this] {
_shutdown_request_promise = std::nullopt;
@@ -5496,6 +5492,7 @@ future<> storage_service::raft_decommission() {
topology_change change{{builder.build(), rtbuilder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id()));
request_id = guard.new_group0_state_id();
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
@@ -5514,22 +5511,15 @@ future<> storage_service::raft_decommission() {
_topology_state_machine.event.broadcast();
});
auto f2 = _topology_state_machine.event.wait([this, &raft_server, &abort_wait] {
if (abort_wait) {
return true; // the wait is aborted
auto f2 = wait_for_topology_request_completion(request_id, &abort_wait).then([this] (sstring error) {
if (!error.empty()) {
// Got error here. Abort the wait for the shutdown event
_shutdown_request_promise->set_exception(std::runtime_error("Decommission failure"));
}
// Wait for decommission request to be removed, but node stay as normal which means decommission failed
auto it = _topology_state_machine._topology.find(raft_server.id());
if (it->second.state == node_state::normal) {
auto rit = _topology_state_machine._topology.requests.find(raft_server.id());
if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::leave) {
_shutdown_request_promise->set_exception(std::runtime_error("Decommission failure"));
return true; // node is normal, but leave request is gone. It means decommission failed
}
}
return false;
return error;
});
slogger.info("raft topology: decommission: wait for completion");
auto res = co_await when_all(std::move(f1), std::move(f2));
@@ -5537,8 +5527,8 @@ future<> storage_service::raft_decommission() {
// Need to set it otherwise gossiper will try to send shutdown on exit
co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }});
} else {
constexpr auto err = "Decommission failed. See earlier errors";
slogger.error(err);
auto err = fmt::format("Decommission failed. See earlier errors ({})", std::get<1>(res).get0());
slogger.error("{}", err);
throw std::runtime_error(err);
}
}
@@ -5821,6 +5811,7 @@ void storage_service::run_replace_ops(std::unordered_set<token>& bootstrap_token
future<> storage_service::raft_removenode(locator::host_id host_id, std::list<locator::host_id_or_endpoint> ignore_nodes_params) {
auto id = raft::server_id{host_id.uuid()};
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
@@ -5866,6 +5857,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
topology_change change{{builder.build(), rtbuilder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
request_id = guard.new_group0_state_id();
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
@@ -5877,33 +5869,19 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::list<lo
}
slogger.info("raft topology: removenode: wait for completion");
bool left = false;
co_await _topology_state_machine.event.when([this, id, &left] {
// Wait for this node to move to state left which means that removenode completed
// or wait for removenode request to be removed, but node stay as normal which means removenode failed
auto it = _topology_state_machine._topology.find(id);
if (!it) {
left = true;
return true; // node either left or on the way
}
if (it->second.state == node_state::normal) {
auto rit = _topology_state_machine._topology.requests.find(id);
if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::remove) {
return true; // node is normal, but remove request is gone. It means removenode failed
}
}
return false;
});
if (left) {
// Wait until request completes
auto error = co_await wait_for_topology_request_completion(request_id);
if (error.empty()) {
try {
co_await _group0->remove_from_raft_config(id);
} catch (raft::not_a_member&) {
slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator");
}
} else {
constexpr auto err = "Removenode failed. See earlier errors";
slogger.error(err);
auto err = fmt::format("Removenode failed. See earlier errors ({})", error);
slogger.error("{}", err);
throw std::runtime_error(err);
}
}
@@ -6496,8 +6474,21 @@ future<> storage_service::do_cluster_cleanup() {
slogger.info("raft topology: cluster cleanup done");
}
future<sstring> storage_service::wait_for_topology_request_completion(utils::UUID id, bool* stop_waiting) {
while (!stop_waiting || !*stop_waiting) {
auto [done, error] = co_await _sys_ks.local().get_topology_request_state(id);
if (done) {
co_return error;
}
co_await _topology_state_machine.event.when();
}
co_return sstring();
}
future<> storage_service::raft_rebuild(sstring source_dc) {
auto& raft_server = _group0->group0_server();
utils::UUID request_id;
while (true) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
@@ -6529,6 +6520,8 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
topology_change change{{builder.build(), rtbuilder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc));
request_id = guard.new_group0_state_id();
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as);
} catch (group0_concurrent_modification&) {
@@ -6538,10 +6531,11 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
break;
}
// Wait until rebuild completes. We know it completes when the request parameter is empty
co_await _topology_state_machine.event.when([this, &raft_server] {
return !_topology_state_machine._topology.req_param.contains(raft_server.id());
});
// Wait until request completes
auto err = co_await wait_for_topology_request_completion(request_id);
if (!err.empty()) {
throw std::runtime_error(::format("rebuild failed: {}", err));
}
}
future<> storage_service::raft_check_and_repair_cdc_streams() {

View File

@@ -810,6 +810,9 @@ private:
future<> _sstable_cleanup_fiber = make_ready_future<>();
future<> sstable_cleanup_fiber(raft::server& raft, sharded<service::storage_proxy>& proxy) noexcept;
// Waits for a topology request with a given ID to complete and return non empty error string
// if request completes with an error
future<sstring> wait_for_topology_request_completion(utils::UUID id, bool* stop_waiting = nullptr);
// We need to be able to abort all group0 operation during shutdown, so we need special abort source for that
abort_source _group0_as;

View File

@@ -238,6 +238,11 @@ struct fencing_token {
}
};
struct topology_request_state {
bool done;
sstring error;
};
std::ostream& operator<<(std::ostream& os, const fencing_token& fencing_token);
std::ostream& operator<<(std::ostream& os, topology::transition_state s);
topology::transition_state transition_state_from_string(const sstring& s);