From 84197ff735963c84157e2fbf9c9ee23f582fdc48 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Mon, 8 Jan 2024 11:06:09 +0200 Subject: [PATCH] storage_service: topology coordinator: check topology operation completion using status in topology_requests table Instead of trying to guess if a request completed by looking into the topology state (which is sometimes can be error prone) look at the request status in the new topology_requests. If request failed report a reason for the failure from the table. --- db/system_keyspace.cc | 17 +++++ db/system_keyspace.hh | 2 + service/storage_service.cc | 100 ++++++++++++++---------------- service/storage_service.hh | 3 + service/topology_state_machine.hh | 5 ++ 5 files changed, 74 insertions(+), 53 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 8de4b4074a..911131a1f3 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2872,6 +2872,23 @@ future<> system_keyspace::sstables_registry_list(sstring location, sstable_regis }); } +future system_keyspace::get_topology_request_state(utils::UUID id) { + auto rs = co_await execute_cql( + format("SELECT done, error FROM system.{} WHERE id = {}", TOPOLOGY_REQUESTS, id)); + if (!rs || rs->empty()) { + on_internal_error(slogger, format("no entry for request id {}", id)); + } + + auto& row = rs->one(); + sstring error; + + if (row.has("error")) { + error = row.get_as("error"); + } + + co_return service::topology_request_state{row.get_as("done"), std::move(error)}; +} + sstring system_keyspace_name() { return system_keyspace::NAME; } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index eda11816e7..fb3443ff6a 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -43,6 +43,7 @@ namespace paxos { class proposal; } // namespace service::paxos +struct topology_request_state; } namespace netw { @@ -519,6 +520,7 @@ public: future get_must_synchronize_topology(); future<> set_must_synchronize_topology(bool); + future get_topology_request_state(utils::UUID id); private: static service::topology_features decode_topology_features_state(::shared_ptr rs); diff --git a/service/storage_service.cc b/service/storage_service.cc index 4fc2baf6bb..b762fa232f 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -3920,23 +3920,18 @@ future<> storage_service::join_token_ring(shardedid()) || - (_topology_state_machine._topology.transition_nodes.contains(raft_server->id()) && - _topology_state_machine._topology.transition_nodes[raft_server->id()].state == node_state::left_token_ring); - }; + sstring err; - // Wait until we enter one of the final states - co_await _topology_state_machine.event.when([this, raft_server, &leaving] { - return _topology_state_machine._topology.normal_nodes.contains(raft_server->id()) || leaving(); - }); - - if (leaving()) { - if (_sys_ks.local().bootstrap_complete()) { + if (_sys_ks.local().bootstrap_complete()) { + if (_topology_state_machine._topology.left_nodes.contains(raft_server->id())) { throw std::runtime_error("A node that already left the cluster cannot be restarted"); - } else { - throw std::runtime_error(fmt::format("{} failed. See earlier errors", raft_replace_info ? "Replace" : "Bootstrap")); } + } else { + err = co_await wait_for_topology_request_completion(join_params.request_id); + } + + if (!err.empty()) { + throw std::runtime_error(fmt::format("{} failed. See earlier errors ({})", raft_replace_info ? "Replace" : "Bootstrap", err)); } co_await update_topology_with_local_metadata(*raft_server); @@ -5462,6 +5457,7 @@ void on_streaming_finished() { future<> storage_service::raft_decommission() { auto& raft_server = _group0->group0_server(); + utils::UUID request_id; auto disengage_shutdown_promise = defer([this] { _shutdown_request_promise = std::nullopt; @@ -5496,6 +5492,7 @@ future<> storage_service::raft_decommission() { topology_change change{{builder.build(), rtbuilder.build()}}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decommission: request decommission for {}", raft_server.id())); + request_id = guard.new_group0_state_id(); try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { @@ -5514,22 +5511,15 @@ future<> storage_service::raft_decommission() { _topology_state_machine.event.broadcast(); }); - auto f2 = _topology_state_machine.event.wait([this, &raft_server, &abort_wait] { - if (abort_wait) { - return true; // the wait is aborted + auto f2 = wait_for_topology_request_completion(request_id, &abort_wait).then([this] (sstring error) { + if (!error.empty()) { + // Got error here. Abort the wait for the shutdown event + _shutdown_request_promise->set_exception(std::runtime_error("Decommission failure")); } - // Wait for decommission request to be removed, but node stay as normal which means decommission failed - auto it = _topology_state_machine._topology.find(raft_server.id()); - if (it->second.state == node_state::normal) { - auto rit = _topology_state_machine._topology.requests.find(raft_server.id()); - if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::leave) { - _shutdown_request_promise->set_exception(std::runtime_error("Decommission failure")); - return true; // node is normal, but leave request is gone. It means decommission failed - } - } - return false; + return error; }); + slogger.info("raft topology: decommission: wait for completion"); auto res = co_await when_all(std::move(f1), std::move(f2)); @@ -5537,8 +5527,8 @@ future<> storage_service::raft_decommission() { // Need to set it otherwise gossiper will try to send shutdown on exit co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }}); } else { - constexpr auto err = "Decommission failed. See earlier errors"; - slogger.error(err); + auto err = fmt::format("Decommission failed. See earlier errors ({})", std::get<1>(res).get0()); + slogger.error("{}", err); throw std::runtime_error(err); } } @@ -5821,6 +5811,7 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token future<> storage_service::raft_removenode(locator::host_id host_id, std::list ignore_nodes_params) { auto id = raft::server_id{host_id.uuid()}; + utils::UUID request_id; while (true) { auto guard = co_await _group0->client().start_operation(&_group0_as); @@ -5866,6 +5857,7 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listclient().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id)); + request_id = guard.new_group0_state_id(); try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { @@ -5877,33 +5869,19 @@ future<> storage_service::raft_removenode(locator::host_id host_id, std::listsecond.state == node_state::normal) { - auto rit = _topology_state_machine._topology.requests.find(id); - if (rit == _topology_state_machine._topology.requests.end() || rit->second != topology_request::remove) { - return true; // node is normal, but remove request is gone. It means removenode failed - } - } - return false; - }); - if (left) { + // Wait until request completes + auto error = co_await wait_for_topology_request_completion(request_id); + + if (error.empty()) { try { co_await _group0->remove_from_raft_config(id); } catch (raft::not_a_member&) { slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator"); } } else { - constexpr auto err = "Removenode failed. See earlier errors"; - slogger.error(err); + auto err = fmt::format("Removenode failed. See earlier errors ({})", error); + slogger.error("{}", err); throw std::runtime_error(err); } } @@ -6496,8 +6474,21 @@ future<> storage_service::do_cluster_cleanup() { slogger.info("raft topology: cluster cleanup done"); } +future storage_service::wait_for_topology_request_completion(utils::UUID id, bool* stop_waiting) { + while (!stop_waiting || !*stop_waiting) { + auto [done, error] = co_await _sys_ks.local().get_topology_request_state(id); + if (done) { + co_return error; + } + co_await _topology_state_machine.event.when(); + } + + co_return sstring(); +} + future<> storage_service::raft_rebuild(sstring source_dc) { auto& raft_server = _group0->group0_server(); + utils::UUID request_id; while (true) { auto guard = co_await _group0->client().start_operation(&_group0_as); @@ -6529,6 +6520,8 @@ future<> storage_service::raft_rebuild(sstring source_dc) { topology_change change{{builder.build(), rtbuilder.build()}}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc)); + request_id = guard.new_group0_state_id(); + try { co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_group0_as); } catch (group0_concurrent_modification&) { @@ -6538,10 +6531,11 @@ future<> storage_service::raft_rebuild(sstring source_dc) { break; } - // Wait until rebuild completes. We know it completes when the request parameter is empty - co_await _topology_state_machine.event.when([this, &raft_server] { - return !_topology_state_machine._topology.req_param.contains(raft_server.id()); - }); + // Wait until request completes + auto err = co_await wait_for_topology_request_completion(request_id); + if (!err.empty()) { + throw std::runtime_error(::format("rebuild failed: {}", err)); + } } future<> storage_service::raft_check_and_repair_cdc_streams() { diff --git a/service/storage_service.hh b/service/storage_service.hh index f7323e43b6..6fc8080bb0 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -810,6 +810,9 @@ private: future<> _sstable_cleanup_fiber = make_ready_future<>(); future<> sstable_cleanup_fiber(raft::server& raft, sharded& proxy) noexcept; + // Waits for a topology request with a given ID to complete and return non empty error string + // if request completes with an error + future wait_for_topology_request_completion(utils::UUID id, bool* stop_waiting = nullptr); // We need to be able to abort all group0 operation during shutdown, so we need special abort source for that abort_source _group0_as; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 365b57b689..aa6457bd2e 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -238,6 +238,11 @@ struct fencing_token { } }; +struct topology_request_state { + bool done; + sstring error; +}; + std::ostream& operator<<(std::ostream& os, const fencing_token& fencing_token); std::ostream& operator<<(std::ostream& os, topology::transition_state s); topology::transition_state transition_state_from_string(const sstring& s);