From ddfd9c3173d81d53d4f6085d4693d3cc9fa54e9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Thu, 4 Jan 2024 16:28:23 +0100 Subject: [PATCH 1/4] raft topology: join: shut down a node on error in response handler If the joining node fails while handling the response from the topology coordinator, it hangs even though it knows the join operation has failed. Therefore, we ensure it shuts down in this patch. We rethrow the caught exception to ensure the topology coordinator knows the RPC has failed. In case of rejection, it does not matter because the coordinator behaves the same way in both cases: RPC success and RPC failure. It transitions the rejected node to the left state. However, in case of acceptance, this only happens if the RPC fails. Otherwise, the coordinator continues handling the request. On abort, one of the two events happens first: - the new catch statement catches `abort_requested_exception` and sets it on `_join_node_response_done`, - `co_await _ss._join_node_response_done.get_shared_future(as);` in `join_node_rpc_handshaker::post_server_start` resolves with `abort_requested_exception` after triggering `as`. In both cases, `join_node_rpc_handshaker::post_server_start` throws `abort_requested_exception`. Therefore, we don't need a separate catch statement for `abort_requested_exception` in `join_node_response_handler`. --- service/storage_service.cc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/service/storage_service.cc b/service/storage_service.cc index 1fee26b84d..c5d766d324 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -7119,6 +7119,7 @@ future storage_service::join_node_response_handler(jo co_return join_node_response_result{}; } + try { co_return co_await std::visit(overloaded_functor { [&] (const join_node_response_params::accepted& acc) -> future { // Allow other nodes to mark the replacing node as alive. 
It has @@ -7210,6 +7211,14 @@ future storage_service::join_node_response_handler(jo co_return join_node_response_result{}; }, }, params.response); + } catch (...) { + auto eptr = std::current_exception(); + slogger.warn("raft topology: error while handling the join response from the topology coordinator. " + "The node will not join the cluster. Error: {}", eptr); + _join_node_response_done.set_exception(std::move(eptr)); + + throw; + } } void storage_service::init_messaging_service(bool raft_topology_change_enabled) { From f3a08757af1684290da1c7f6c6497aec64de5f01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Tue, 9 Jan 2024 11:12:24 +0100 Subject: [PATCH 2/4] storage_service: join_node_response_handler: fix indentation Broken in the previous patch. --- service/storage_service.cc | 162 ++++++++++++++++++------------------- 1 file changed, 81 insertions(+), 81 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index c5d766d324..8bf32f1176 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -7120,97 +7120,97 @@ future storage_service::join_node_response_handler(jo } try { - co_return co_await std::visit(overloaded_functor { - [&] (const join_node_response_params::accepted& acc) -> future { - // Allow other nodes to mark the replacing node as alive. It has - // effect only if the replacing node is reusing the IP of the - // replaced node. In such a case, we do not allow the replacing - // node to advertise itself earlier. Thanks to this, if the - // topology sees the node being replaced as alive, it can safely - // reject the join request because it can be sure that it is not - // the replacing node that is alive. - co_await _gossiper.advertise_to_nodes({}); + co_return co_await std::visit(overloaded_functor { + [&] (const join_node_response_params::accepted& acc) -> future { + // Allow other nodes to mark the replacing node as alive. 
It has + // effect only if the replacing node is reusing the IP of the + // replaced node. In such a case, we do not allow the replacing + // node to advertise itself earlier. Thanks to this, if the + // topology sees the node being replaced as alive, it can safely + // reject the join request because it can be sure that it is not + // the replacing node that is alive. + co_await _gossiper.advertise_to_nodes({}); - // Do a read barrier to read/initialize the topology state - auto& raft_server = _group0->group0_server(); - co_await raft_server.read_barrier(&_group0_as); + // Do a read barrier to read/initialize the topology state + auto& raft_server = _group0->group0_server(); + co_await raft_server.read_barrier(&_group0_as); - // Calculate nodes to ignore - // TODO: ignore_dead_nodes setting for bootstrap - std::unordered_set ignored_ids; - auto my_request_it = - _topology_state_machine._topology.req_param.find(_group0->load_my_id()); - if (my_request_it != _topology_state_machine._topology.req_param.end()) { - if (auto* replace = std::get_if(&my_request_it->second)) { - ignored_ids = replace->ignored_ids; - ignored_ids.insert(replace->replaced_id); - } - } - - // After this RPC finishes, repair or streaming will be run, and - // both of them require this node to see the normal nodes as UP. - // This condition might not be true yet as this information is - // propagated through gossip. In order to reduce the chance of - // repair/streaming failure, wait here until we see normal nodes - // as UP (or the timeout elapses). 
- const auto& amap = _group0->address_map(); - std::vector sync_nodes; - // FIXME: https://github.com/scylladb/scylladb/issues/12279 - // Keep trying to translate host IDs to IPs until all are available in gossip - // Ultimately, we should take this information from token_metadata - const auto sync_nodes_resolve_deadline = lowres_clock::now() + wait_for_live_nodes_timeout; - while (true) { - sync_nodes.clear(); - std::vector untranslated_ids; - for (const auto& [id, _] : _topology_state_machine._topology.normal_nodes) { - if (ignored_ids.contains(id)) { - continue; + // Calculate nodes to ignore + // TODO: ignore_dead_nodes setting for bootstrap + std::unordered_set ignored_ids; + auto my_request_it = + _topology_state_machine._topology.req_param.find(_group0->load_my_id()); + if (my_request_it != _topology_state_machine._topology.req_param.end()) { + if (auto* replace = std::get_if(&my_request_it->second)) { + ignored_ids = replace->ignored_ids; + ignored_ids.insert(replace->replaced_id); } - if (auto ip = amap.find(id)) { - sync_nodes.push_back(*ip); + } + + // After this RPC finishes, repair or streaming will be run, and + // both of them require this node to see the normal nodes as UP. + // This condition might not be true yet as this information is + // propagated through gossip. In order to reduce the chance of + // repair/streaming failure, wait here until we see normal nodes + // as UP (or the timeout elapses). 
+ const auto& amap = _group0->address_map(); + std::vector sync_nodes; + // FIXME: https://github.com/scylladb/scylladb/issues/12279 + // Keep trying to translate host IDs to IPs until all are available in gossip + // Ultimately, we should take this information from token_metadata + const auto sync_nodes_resolve_deadline = lowres_clock::now() + wait_for_live_nodes_timeout; + while (true) { + sync_nodes.clear(); + std::vector untranslated_ids; + for (const auto& [id, _] : _topology_state_machine._topology.normal_nodes) { + if (ignored_ids.contains(id)) { + continue; + } + if (auto ip = amap.find(id)) { + sync_nodes.push_back(*ip); + } else { + untranslated_ids.push_back(id); + } + } + + if (!untranslated_ids.empty()) { + if (lowres_clock::now() > sync_nodes_resolve_deadline) { + throw std::runtime_error(format( + "Failed to obtain IP addresses of nodes that should be seen" + " as alive within {}s", + std::chrono::duration_cast(wait_for_live_nodes_timeout).count())); + } + + static logger::rate_limit rate_limit{std::chrono::seconds(1)}; + slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", + untranslated_ids); + + co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); } else { - untranslated_ids.push_back(id); + break; } } - if (!untranslated_ids.empty()) { - if (lowres_clock::now() > sync_nodes_resolve_deadline) { - throw std::runtime_error(format( - "Failed to obtain IP addresses of nodes that should be seen" - " as alive within {}s", - std::chrono::duration_cast(wait_for_live_nodes_timeout).count())); - } + slogger.info("raft topology: coordinator accepted request to join, " + "waiting for nodes {} to be alive before responding and continuing", + sync_nodes); + co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout); + slogger.info("raft topology: nodes {} are alive", sync_nodes); - static logger::rate_limit rate_limit{std::chrono::seconds(1)}; - slogger.log(log_level::warn, rate_limit, "raft 
topology: cannot map nodes {} to ips, retrying.", - untranslated_ids); + // Unblock waiting join_node_rpc_handshaker::post_server_start, + // which will start the raft server and continue + _join_node_response_done.set_value(); - co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); - } else { - break; - } - } + co_return join_node_response_result{}; + }, + [&] (const join_node_response_params::rejected& rej) -> future { + auto eptr = std::make_exception_ptr(std::runtime_error( + format("the topology coordinator rejected request to join the cluster: {}", rej.reason))); + _join_node_response_done.set_exception(std::move(eptr)); - slogger.info("raft topology: coordinator accepted request to join, " - "waiting for nodes {} to be alive before responding and continuing", - sync_nodes); - co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout); - slogger.info("raft topology: nodes {} are alive", sync_nodes); - - // Unblock waiting join_node_rpc_handshaker::post_server_start, - // which will start the raft server and continue - _join_node_response_done.set_value(); - - co_return join_node_response_result{}; - }, - [&] (const join_node_response_params::rejected& rej) -> future { - auto eptr = std::make_exception_ptr(std::runtime_error( - format("the topology coordinator rejected request to join the cluster: {}", rej.reason))); - _join_node_response_done.set_exception(std::move(eptr)); - - co_return join_node_response_result{}; - }, - }, params.response); + co_return join_node_response_result{}; + }, + }, params.response); } catch (...) { auto eptr = std::current_exception(); slogger.warn("raft topology: error while handling the join response from the topology coordinator. 
" From b4b170047bf21d6b5166c0ebe91f37aa5c77a2ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Tue, 9 Jan 2024 11:33:57 +0100 Subject: [PATCH 3/4] raft topology: join: allow only the first response to be a successful acceptance The joining node might receive more than one join response (see the comment at the beginning of `join_node_response_handler`). If the first response was a rejection or it was an acceptance but the joining node failed while handling it, the following acceptances by the coordinator shouldn't succeed. The joining node considers the join operation as failed. Currently, we always immediately return from non-first response handler calls. However, if the response is an acceptance, and the first response wasn't a successfully handled acceptance, we need to throw an exception to ensure the topology coordinator moves the node to the left state. We do it in this patch. We throw the exception set while handling the first response. It explains why we are failing the current acceptance. We don't want to throw the exception on rejection. The topology coordinator will move the node to the left state anyway. Also, failing the rejection with an error message containing "the topology coordinator rejected request to join the cluster" (from the previous rejection) would be very confusing. --- service/storage_service.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 8bf32f1176..80dcd00f16 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -7114,8 +7114,16 @@ future storage_service::join_node_response_handler(jo co_await _join_node_request_done.get_shared_future(_group0_as); if (_join_node_response_done.available()) { - // We already handled this RPC. No need to retry it. Return immediately for idempotence. + // We already handled this RPC. No need to retry it. 
slogger.info("raft topology: the node got join_node_response RPC for the second time, ignoring"); + + if (std::holds_alternative(params.response) + && _join_node_response_done.failed()) { + // The topology coordinator accepted the node that was rejected before or failed while handling + // the response. Inform the coordinator about it so it moves the node to the left state. + throw _join_node_response_done.get_shared_future().get_exception(); + } + co_return join_node_response_result{}; } From e99d03a21e03188a0517e859a6bed5193fe361bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Thu, 4 Jan 2024 18:10:53 +0100 Subject: [PATCH 4/4] topology_coordinator: clarify warnings It was unclear where the error messages ended if they consisted of multiple sentences. --- service/storage_service.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 80dcd00f16..a1205fb17a 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2249,8 +2249,8 @@ class topology_coordinator { .response = std::move(validation_result), }); } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. " - "The node may hang. It's safe to shut it down manually now.", + slogger.warn("raft topology: attempt to send rejection response to {} failed. " + "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -2506,8 +2506,8 @@ class topology_coordinator { }); responded = true; } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send acceptance response to {} failed: {}. " - "The node may hang. It's safe to shut it down manually now.", + slogger.warn("raft topology: attempt to send acceptance response to {} failed. " + "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); }