Merge 'raft topology: join: shut down a node on error in response handler' from Patryk Jędrzejczak

If the joining node fails while handling the response from the
topology coordinator, it hangs even though it knows the join
operation has failed. Therefore, we ensure it shuts down in
this patch.

Additionally, we ensure that if the first join request response
was a rejection or the node failed while handling it, the
following acceptances by the (possibly different) coordinator
don't succeed. The node considers the join operation as failed.
We shouldn't add it to the cluster.

Fixes scylladb/scylladb#16333

Closes scylladb/scylladb#16650

* github.com:scylladb/scylladb:
  topology_coordinator: clarify warnings
  raft topology: join: allow only the first response to be a succesful acceptance
  storage_service: join_node_response_handler: fix indentation
  raft topology: join: shut down a node on error in response handler
This commit is contained in:
Kamil Braun
2024-01-17 14:55:26 +01:00

View File

@@ -2791,8 +2791,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
.response = std::move(validation_result),
});
} catch (const std::runtime_error& e) {
slogger.warn("raft topology: attempt to send rejection response to {} failed: {}. "
"The node may hang. It's safe to shut it down manually now.",
slogger.warn("raft topology: attempt to send rejection response to {} failed. "
"The node may hang. It's safe to shut it down manually now. Error: {}",
node.id, e.what());
}
@@ -3074,8 +3074,8 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
});
responded = true;
} catch (const std::runtime_error& e) {
slogger.warn("raft topology: attempt to send acceptance response to {} failed: {}. "
"The node may hang. It's safe to shut it down manually now.",
slogger.warn("raft topology: attempt to send acceptance response to {} failed. "
"The node may hang. It's safe to shut it down manually now. Error: {}",
node.id, e.what());
}
@@ -7827,102 +7827,119 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
co_await _join_node_request_done.get_shared_future(_group0_as);
if (_join_node_response_done.available()) {
// We already handled this RPC. No need to retry it. Return immediately for idempotence.
// We already handled this RPC. No need to retry it.
slogger.info("raft topology: the node got join_node_response RPC for the second time, ignoring");
if (std::holds_alternative<join_node_response_params::accepted>(params.response)
&& _join_node_response_done.failed()) {
// The topology coordinator accepted the node that was rejected before or failed while handling
// the response. Inform the coordinator about it so it moves the node to the left state.
throw _join_node_response_done.get_shared_future().get_exception();
}
co_return join_node_response_result{};
}
co_return co_await std::visit(overloaded_functor {
[&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
// Allow other nodes to mark the replacing node as alive. It has
// effect only if the replacing node is reusing the IP of the
// replaced node. In such a case, we do not allow the replacing
// node to advertise itself earlier. Thanks to this, if the
// topology sees the node being replaced as alive, it can safely
// reject the join request because it can be sure that it is not
// the replacing node that is alive.
co_await _gossiper.advertise_to_nodes({});
try {
co_return co_await std::visit(overloaded_functor {
[&] (const join_node_response_params::accepted& acc) -> future<join_node_response_result> {
// Allow other nodes to mark the replacing node as alive. It has
// effect only if the replacing node is reusing the IP of the
// replaced node. In such a case, we do not allow the replacing
// node to advertise itself earlier. Thanks to this, if the
// topology sees the node being replaced as alive, it can safely
// reject the join request because it can be sure that it is not
// the replacing node that is alive.
co_await _gossiper.advertise_to_nodes({});
// Do a read barrier to read/initialize the topology state
auto& raft_server = _group0->group0_server();
co_await raft_server.read_barrier(&_group0_as);
// Do a read barrier to read/initialize the topology state
auto& raft_server = _group0->group0_server();
co_await raft_server.read_barrier(&_group0_as);
// Calculate nodes to ignore
// TODO: ignore_dead_nodes setting for bootstrap
std::unordered_set<raft::server_id> ignored_ids;
auto my_request_it =
_topology_state_machine._topology.req_param.find(_group0->load_my_id());
if (my_request_it != _topology_state_machine._topology.req_param.end()) {
if (auto* replace = std::get_if<service::replace_param>(&my_request_it->second)) {
ignored_ids = replace->ignored_ids;
ignored_ids.insert(replace->replaced_id);
}
}
// After this RPC finishes, repair or streaming will be run, and
// both of them require this node to see the normal nodes as UP.
// This condition might not be true yet as this information is
// propagated through gossip. In order to reduce the chance of
// repair/streaming failure, wait here until we see normal nodes
// as UP (or the timeout elapses).
const auto& amap = _group0->address_map();
std::vector<gms::inet_address> sync_nodes;
// FIXME: https://github.com/scylladb/scylladb/issues/12279
// Keep trying to translate host IDs to IPs until all are available in gossip
// Ultimately, we should take this information from token_metadata
const auto sync_nodes_resolve_deadline = lowres_clock::now() + wait_for_live_nodes_timeout;
while (true) {
sync_nodes.clear();
std::vector<raft::server_id> untranslated_ids;
for (const auto& [id, _] : _topology_state_machine._topology.normal_nodes) {
if (ignored_ids.contains(id)) {
continue;
// Calculate nodes to ignore
// TODO: ignore_dead_nodes setting for bootstrap
std::unordered_set<raft::server_id> ignored_ids;
auto my_request_it =
_topology_state_machine._topology.req_param.find(_group0->load_my_id());
if (my_request_it != _topology_state_machine._topology.req_param.end()) {
if (auto* replace = std::get_if<service::replace_param>(&my_request_it->second)) {
ignored_ids = replace->ignored_ids;
ignored_ids.insert(replace->replaced_id);
}
if (auto ip = amap.find(id)) {
sync_nodes.push_back(*ip);
}
// After this RPC finishes, repair or streaming will be run, and
// both of them require this node to see the normal nodes as UP.
// This condition might not be true yet as this information is
// propagated through gossip. In order to reduce the chance of
// repair/streaming failure, wait here until we see normal nodes
// as UP (or the timeout elapses).
const auto& amap = _group0->address_map();
std::vector<gms::inet_address> sync_nodes;
// FIXME: https://github.com/scylladb/scylladb/issues/12279
// Keep trying to translate host IDs to IPs until all are available in gossip
// Ultimately, we should take this information from token_metadata
const auto sync_nodes_resolve_deadline = lowres_clock::now() + wait_for_live_nodes_timeout;
while (true) {
sync_nodes.clear();
std::vector<raft::server_id> untranslated_ids;
for (const auto& [id, _] : _topology_state_machine._topology.normal_nodes) {
if (ignored_ids.contains(id)) {
continue;
}
if (auto ip = amap.find(id)) {
sync_nodes.push_back(*ip);
} else {
untranslated_ids.push_back(id);
}
}
if (!untranslated_ids.empty()) {
if (lowres_clock::now() > sync_nodes_resolve_deadline) {
throw std::runtime_error(format(
"Failed to obtain IP addresses of nodes that should be seen"
" as alive within {}s",
std::chrono::duration_cast<std::chrono::seconds>(wait_for_live_nodes_timeout).count()));
}
static logger::rate_limit rate_limit{std::chrono::seconds(1)};
slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.",
untranslated_ids);
co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as);
} else {
untranslated_ids.push_back(id);
break;
}
}
if (!untranslated_ids.empty()) {
if (lowres_clock::now() > sync_nodes_resolve_deadline) {
throw std::runtime_error(format(
"Failed to obtain IP addresses of nodes that should be seen"
" as alive within {}s",
std::chrono::duration_cast<std::chrono::seconds>(wait_for_live_nodes_timeout).count()));
}
slogger.info("raft topology: coordinator accepted request to join, "
"waiting for nodes {} to be alive before responding and continuing",
sync_nodes);
co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout);
slogger.info("raft topology: nodes {} are alive", sync_nodes);
static logger::rate_limit rate_limit{std::chrono::seconds(1)};
slogger.log(log_level::warn, rate_limit, "raft topology: cannot map nodes {} to ips, retrying.",
untranslated_ids);
// Unblock waiting join_node_rpc_handshaker::post_server_start,
// which will start the raft server and continue
_join_node_response_done.set_value();
co_await sleep_abortable(std::chrono::milliseconds(5), _group0_as);
} else {
break;
}
}
co_return join_node_response_result{};
},
[&] (const join_node_response_params::rejected& rej) -> future<join_node_response_result> {
auto eptr = std::make_exception_ptr(std::runtime_error(
format("the topology coordinator rejected request to join the cluster: {}", rej.reason)));
_join_node_response_done.set_exception(std::move(eptr));
slogger.info("raft topology: coordinator accepted request to join, "
"waiting for nodes {} to be alive before responding and continuing",
sync_nodes);
co_await _gossiper.wait_alive(sync_nodes, wait_for_live_nodes_timeout);
slogger.info("raft topology: nodes {} are alive", sync_nodes);
co_return join_node_response_result{};
},
}, params.response);
} catch (...) {
auto eptr = std::current_exception();
slogger.warn("raft topology: error while handling the join response from the topology coordinator. "
"The node will not join the cluster. Error: {}", eptr);
_join_node_response_done.set_exception(std::move(eptr));
// Unblock waiting join_node_rpc_handshaker::post_server_start,
// which will start the raft server and continue
_join_node_response_done.set_value();
co_return join_node_response_result{};
},
[&] (const join_node_response_params::rejected& rej) -> future<join_node_response_result> {
auto eptr = std::make_exception_ptr(std::runtime_error(
format("the topology coordinator rejected request to join the cluster: {}", rej.reason)));
_join_node_response_done.set_exception(std::move(eptr));
co_return join_node_response_result{};
},
}, params.response);
throw;
}
}
void storage_service::init_messaging_service(bool raft_topology_change_enabled) {