diff --git a/service/storage_service.cc b/service/storage_service.cc index 0095cf4530..364f9a74e6 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2791,8 +2791,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { .response = std::move(validation_result), }); } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. " - "The node may hang. It's safe to shut it down manually now.", + slogger.warn("raft topology: attempt to send rejection response to {} failed. " + "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -3074,8 +3074,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { }); responded = true; } catch (const std::runtime_error& e) { - slogger.warn("raft topology: attempt to send acceptance response to {} failed: {}. " - "The node may hang. It's safe to shut it down manually now.", + slogger.warn("raft topology: attempt to send acceptance response to {} failed. " + "The node may hang. It's safe to shut it down manually now. Error: {}", node.id, e.what()); } @@ -7827,102 +7827,119 @@ future storage_service::join_node_response_handler(jo co_await _join_node_request_done.get_shared_future(_group0_as); if (_join_node_response_done.available()) { - // We already handled this RPC. No need to retry it. Return immediately for idempotence. + // We already handled this RPC. No need to retry it. slogger.info("raft topology: the node got join_node_response RPC for the second time, ignoring"); + + if (std::holds_alternative(params.response) + && _join_node_response_done.failed()) { + // The topology coordinator accepted the node that was rejected before or failed while handling + // the response. Inform the coordinator about it so it moves the node to the left state. + throw _join_node_response_done.get_shared_future().get_exception(); + } + co_return join_node_response_result{}; } - co_return co_await std::visit(overloaded_functor { - [&] (const join_node_response_params::accepted& acc) -> future { - // Allow other nodes to mark the replacing node as alive. It has - // effect only if the replacing node is reusing the IP of the - // replaced node. In such a case, we do not allow the replacing - // node to advertise itself earlier. Thanks to this, if the - // topology sees the node being replaced as alive, it can safely - // reject the join request because it can be sure that it is not - // the replacing node that is alive. - co_await _gossiper.advertise_to_nodes({}); + try { + co_return co_await std::visit(overloaded_functor { + [&] (const join_node_response_params::accepted& acc) -> future { + // Allow other nodes to mark the replacing node as alive. It has + // effect only if the replacing node is reusing the IP of the + // replaced node. In such a case, we do not allow the replacing + // node to advertise itself earlier. Thanks to this, if the + // topology sees the node being replaced as alive, it can safely + // reject the join request because it can be sure that it is not + // the replacing node that is alive. + co_await _gossiper.advertise_to_nodes({}); - // Do a read barrier to read/initialize the topology state - auto& raft_server = _group0->group0_server(); - co_await raft_server.read_barrier(&_group0_as); + // Do a read barrier to read/initialize the topology state + auto& raft_server = _group0->group0_server(); + co_await raft_server.read_barrier(&_group0_as); - // Calculate nodes to ignore - // TODO: ignore_dead_nodes setting for bootstrap - std::unordered_set ignored_ids; - auto my_request_it = - _topology_state_machine._topology.req_param.find(_group0->load_my_id()); - if (my_request_it != _topology_state_machine._topology.req_param.end()) { - if (auto* replace = std::get_if(&my_request_it->second)) { - ignored_ids = replace->ignored_ids; - ignored_ids.insert(replace->replaced_id); - } - } - - // After this RPC finishes, repair or streaming will be run, and - // both of them require this node to see the normal nodes as UP. - // This condition might not be true yet as this information is - // propagated through gossip. In order to reduce the chance of - // repair/streaming failure, wait here until we see normal nodes - // as UP (or the timeout elapses). - const auto& amap = _group0->address_map(); - std::vector sync_nodes; - // FIXME: https://github.com/scylladb/scylladb/issues/12279 - // Keep trying to translate host IDs to IPs until all are available in gossip - // Ultimately, we should take this information from token_metadata - const auto sync_nodes_resolve_deadline = lowres_clock::now() + wait_for_live_nodes_timeout; - while (true) { - sync_nodes.clear(); - std::vector untranslated_ids; - for (const auto& [id, _] : _topology_state_machine._topology.normal_nodes) { - if (ignored_ids.contains(id)) { - continue; + // Calculate nodes to ignore + // TODO: ignore_dead_nodes setting for bootstrap + std::unordered_set ignored_ids; + auto my_request_it = + _topology_state_machine._topology.req_param.find(_group0->load_my_id()); + if (my_request_it != _topology_state_machine._topology.req_param.end()) { + if (auto* replace = std::get_if(&my_request_it->second)) { + ignored_ids = replace->ignored_ids; + ignored_ids.insert(replace->replaced_id); } - if (auto ip = amap.find(id)) { - sync_nodes.push_back(*ip); + } + + // After this RPC finishes, repair or streaming will be run, and + // both of them require this node to see the normal nodes as UP. + // This condition might not be true yet as this information is + // propagated through gossip. In order to reduce the chance of + // repair/streaming failure, wait here until we see normal nodes + // as UP (or the timeout elapses). + const auto& amap = _group0->address_map(); + std::vector sync_nodes; + // FIXME: https://github.com/scylladb/scylladb/issues/12279 + // Keep trying to translate host IDs to IPs until all are available in gossip + // Ultimately, we should take this information from token_metadata + const auto sync_nodes_resolve_deadline = lowres_clock::now() + wait_for_live_nodes_timeout; + while (true) { + sync_nodes.clear(); + std::vector untranslated_ids; + for (const auto& [id, _] : _topology_state_machine._topology.normal_nodes) { + if (ignored_ids.contains(id)) { + continue; + } + if (auto ip = amap.find(id)) { + sync_nodes.push_back(*ip); + } else { + untranslated_ids.push_back(id); + } + } + + if (!untranslated_ids.empty()) { + if (lowres_clock::now() > sync_nodes_resolve_deadline) { + throw std::runtime_error(format( + "Failed to obtain IP addresses of nodes that should be seen" + " as alive within {}s", + std::chrono::duration_cast(wait_for_live_nodes_timeout).count())); + } + + static logger::rate_limit rate_limit{std::chrono::seconds(1)}; + slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", + untranslated_ids); + + co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); } else { - untranslated_ids.push_back(id); + break; } } - if (!untranslated_ids.empty()) { - if (lowres_clock::now() > sync_nodes_resolve_deadline) { - throw std::runtime_error(format( - "Failed to obtain IP addresses of nodes that should be seen" - " as alive within {}s", - std::chrono::duration_cast(wait_for_live_nodes_timeout).count())); - } + slogger.info("raft topology: coordinator accepted request to join, " + "waiting for nodes {} to be alive before responding and continuing", + sync_nodes); + co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout); + slogger.info("raft topology: nodes {} are alive", sync_nodes); - static logger::rate_limit rate_limit{std::chrono::seconds(1)}; - slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.", - untranslated_ids); + // Unblock waiting join_node_rpc_handshaker::post_server_start, + // which will start the raft server and continue + _join_node_response_done.set_value(); - co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as); - } else { - break; - } - } + co_return join_node_response_result{}; + }, + [&] (const join_node_response_params::rejected& rej) -> future { + auto eptr = std::make_exception_ptr(std::runtime_error( + format("the topology coordinator rejected request to join the cluster: {}", rej.reason))); + _join_node_response_done.set_exception(std::move(eptr)); - slogger.info("raft topology: coordinator accepted request to join, " - "waiting for nodes {} to be alive before responding and continuing", - sync_nodes); - co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout); - slogger.info("raft topology: nodes {} are alive", sync_nodes); + co_return join_node_response_result{}; + }, + }, params.response); + } catch (...) { + auto eptr = std::current_exception(); + slogger.warn("raft topology: error while handling the join response from the topology coordinator. " + "The node will not join the cluster. Error: {}", eptr); + _join_node_response_done.set_exception(std::move(eptr)); - // Unblock waiting join_node_rpc_handshaker::post_server_start, - // which will start the raft server and continue - _join_node_response_done.set_value(); - - co_return join_node_response_result{}; - }, - [&] (const join_node_response_params::rejected& rej) -> future { - auto eptr = std::make_exception_ptr(std::runtime_error( - format("the topology coordinator rejected request to join the cluster: {}", rej.reason))); - _join_node_response_done.set_exception(std::move(eptr)); - - co_return join_node_response_result{}; - }, - }, params.response); + throw; + } } void storage_service::init_messaging_service(bool raft_topology_change_enabled) {