From b0eef50b2e9e44c830f86f5dd715343c8f1db84a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Fri, 26 Jan 2024 10:08:10 +0100 Subject: [PATCH] raft topology: make left_token_ring a transition state A node can be in the `left_token_ring` state after: - a finished decommission, - a failed bootstrap, - a failed replace. When a node is in the `left_token_ring` state, we don't know how it has ended up in this state. We cannot distinguish a node that has finished decommissioning from a node that has failed bootstrap. The main problem it causes is that we incorrectly send the `barrier_and_drain` command to a node that has failed bootstrapping or replacing. We must do it for a node that has finished decommissioning because it could still coordinate requests. However, since we cannot distinguish nodes in the `left_token_ring` state, we must send the command to all of them. This issue appeared in scylladb/scylladb#16797 and this patch is a follow-up that fixes it. The solution is changing `left_token_ring` from a node state to a transition state. Regarding implementation, most of the changes are simple refactoring. The less obvious are: - Before this patch, in `system_keyspace::left_topology_state`, we had to keep the ignored nodes' IDs for replace to ensure that the replacing node will have access to it after moving to the `left_token_ring` state, which happens when replace fails. We don't need this workaround anymore. When we enter the new `left_token_ring` transition state, the new node will still be in the `decommissioning` state, so it won't lose its request param. - Before this patch, a decommissioning node lost its tokens while moving to the `left_token_ring` state. After the patch, it loses tokens while still being in the `decommissioning` state. We ensure that all `decommissioning` handlers correctly handle a node that lost its tokens. Moving the `left_token_ring` handler from `handle_node_transition` to `handle_topology_transition` created a large diff. There are only three changes: - adding `auto node = get_node_to_work_on(std::move(guard));`, - adding `builder.del_transition_state()`, - changing error logged when `global_token_metadata_barrier` fails. --- db/system_keyspace.cc | 21 +- service/storage_service.cc | 11 +- service/topology_coordinator.cc | 191 ++++++++++-------- service/topology_state_machine.cc | 2 +- service/topology_state_machine.hh | 2 +- .../test_topology_failure_recovery.py | 9 +- 6 files changed, 122 insertions(+), 114 deletions(-) diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index aaf1da8fbe..699950683b 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -2758,17 +2758,6 @@ future system_keyspace::load_topology_state() { } ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option}); break; - case service::node_state::left_token_ring: - // If replacenode fails the bootstrapping node is moved to left_token_ring state where it executes the metadata - // barrier. It needs to know which nodes to ignore during the barrier, so put them here into the replace_param. - // Note that if the replacenode does not fail and later the node is decommissioned it will move to the left_token_ring - // state at some point and replace_param will be created here as well (we do not remove replaced_id, and ignored_ids - // when we move to normal state). But this is OK because we allow to ignore nodes during topology operations only if they - // are permanently dead. - if (replaced_id) { - ret.req_param.emplace(host_id, service::replace_param{*replaced_id, std::move(ignored_ids)}); - } - break; case service::node_state::rollback_to_normal: if (replaced_id) { ret.req_param.emplace(host_id, service::removenode_param{std::move(ignored_ids)}); @@ -2800,10 +2789,12 @@ future system_keyspace::load_topology_state() { "load_topology_state: found two nodes in transitioning state: {} in {} state and {} in {} state", other_id, other_rs.state, host_id, nstate)); } + // A decommissioning node doesn't have tokens at the end, they are + // removed during transition to the left_token_ring state. // Bootstrapping and replacing nodes don't have tokens at first, // they are inserted only at some point during bootstrap/replace if (!ring_slice - && nstate != service::node_state::left_token_ring + && nstate != service::node_state::decommissioning && nstate != service::node_state::bootstrapping && nstate != service::node_state::replacing) { on_fatal_internal_error(slogger, format( @@ -2833,11 +2824,9 @@ future system_keyspace::load_topology_state() { if (some_row.has("transition_state")) { ret.tstate = service::transition_state_from_string(some_row.get_as("transition_state")); } else { - // Any remaining transition_nodes must be in left_token_ring state - // or rebuilding or rollback_to_normal + // Any remaining transition_nodes must be in rebuilding or rollback_to_normal state. auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(), - [] (auto& p) { return p.second.state != service::node_state::left_token_ring && - p.second.state != service::node_state::rebuilding && + [] (auto& p) { return p.second.state != service::node_state::rebuilding && p.second.state != service::node_state::rollback_to_normal; }); if (it != ret.transition_nodes.end()) { on_internal_error(slogger, format( diff --git a/service/storage_service.cc b/service/storage_service.cc index 39a34113c3..65d6c3b7bd 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -334,7 +334,6 @@ static locator::node::state to_topology_node_state(node_state ns) { case node_state::removing: return locator::node::state::being_removed; case node_state::normal: return locator::node::state::normal; case node_state::rollback_to_normal: return locator::node::state::normal; - case node_state::left_token_ring: return locator::node::state::left; case node_state::left: return locator::node::state::left; case node_state::replacing: return locator::node::state::replacing; case node_state::rebuilding: return locator::node::state::normal; @@ -457,6 +456,11 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm } break; case node_state::decommissioning: + // A decommissioning node loses its tokens when topology moves to left_token_ring. + if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) { + break; + } + [[fallthrough]]; case node_state::removing: update_topology(host_id, ip, rs); co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id); @@ -485,8 +489,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm // Rebuilding node is normal co_await process_normal_node(id, rs); break; - case node_state::left_token_ring: - break; case node_state::rollback_to_normal: // no need for double writes anymore since op failed co_await process_normal_node(id, rs); @@ -578,6 +580,8 @@ future<> storage_service::topology_state_load() { case topology::transition_state::tablet_draining: [[fallthrough]]; case topology::transition_state::write_both_read_old: + [[fallthrough]]; + case topology::transition_state::left_token_ring: return read_new_t::no; case topology::transition_state::write_both_read_new: return read_new_t::yes; @@ -4802,7 +4806,6 @@ future storage_service::raft_topology_cmd_handler(raft result.status = raft_topology_cmd_result::command_status::success; } break; - case node_state::left_token_ring: case node_state::left: case node_state::none: case node_state::removing: diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 8f749e998e..869c0fdf99 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -368,7 +368,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { | boost::adaptors::filtered([&cmd, &exclude_nodes] (const std::pair& n) { // We must send barrier_and_drain to the decommissioning node as it might be coordinating requests. bool drain_decommissioning_node = cmd.cmd == raft_topology_cmd::command::barrier_and_drain - && (n.second.state == node_state::decommissioning || n.second.state == node_state::left_token_ring); + && n.second.state == node_state::decommissioning; return !exclude_nodes.contains(n.first) && (n.second.state == node_state::normal || drain_decommissioning_node); }) | boost::adaptors::map_keys; @@ -1503,10 +1503,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { [[fallthrough]]; case node_state::decommissioning: { topology_mutation_builder builder(node.guard.write_timestamp()); - auto next_state = node.rs->state == node_state::decommissioning - ? node_state::left_token_ring : node_state::left; - builder.del_transition_state() - .set_version(_topo_sm._topology.version + 1) + auto next_state = node.rs->state == node_state::decommissioning ? node.rs->state : node_state::left; + if (node.rs->state == node_state::decommissioning) { + builder.set_transition_state(topology::transition_state::left_token_ring); + } else { + builder.del_transition_state(); + } + builder.set_version(_topo_sm._topology.version + 1) .with_node(node.id) .del("tokens") .set("node_state", next_state); @@ -1551,6 +1554,85 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { case topology::transition_state::tablet_migration: co_await handle_tablet_migration(std::move(guard), false); break; + case topology::transition_state::left_token_ring: { + auto node = get_node_to_work_on(std::move(guard)); + + if (node.id == _raft.id()) { + // Someone else needs to coordinate the rest of the decommission process, + // because the decommissioning node is going to shut down in the middle of this state. + rtlogger.info("coordinator is decommissioning; giving up leadership"); + co_await step_down_as_nonvoter(); + + // Note: if we restart after this point and become a voter + // and then a coordinator again, it's fine - we'll just repeat this step. + // (If we're in `left` state when we try to restart we won't + // be able to become a voter - we'll be banned from the cluster.) + } + + bool barrier_failed = false; + // Wait until other nodes observe the new token ring and stop sending writes to this node. + try { + node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id); + } catch (term_changed_error&) { + throw; + } catch (group0_concurrent_modification&) { + throw; + } catch (...) { + rtlogger.error("transition_state::left_token_ring, " + "raft_topology_cmd::command::barrier failed, error {}", + std::current_exception()); + barrier_failed = true; + } + + if (barrier_failed) { + // If barrier above failed it means there may be unfinished writes to a decommissioned node. + // Lets wait for the ring delay for those writes to complete and new topology to propagate + // before continuing. + co_await sleep_abortable(_ring_delay, _as); + node = retake_node(co_await start_operation(), node.id); + } + + topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); + + rtbuilder.done(); + + co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring sate"); + + // Tell the node to shut down. + // This is done to improve user experience when there are no failures. + // In the next state (`node_state::left`), the node will be banned by the rest of the cluster, + // so there's no guarantee that it would learn about entering that state even if it was still + // a member of group0, hence we use a separate direct RPC in this state to shut it down. + // + // There is the possibility that the node will never get the message + // and decommission will hang on that node. + // This is fine for the rest of the cluster - we will still remove, ban the node and continue. + auto node_id = node.id; + bool shutdown_failed = false; + try { + node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier); + } catch (...) { + rtlogger.warn("failed to tell node {} to shut down - it may hang." + " It's safe to shut it down manually now. (Exception: {})", + node.id, std::current_exception()); + shutdown_failed = true; + } + if (shutdown_failed) { + node = retake_node(co_await start_operation(), node_id); + } + + // Remove the node from group0 here - in general, it won't be able to leave on its own + // because we'll ban it as soon as we tell it to shut down. + co_await remove_from_group0(node.id); + + topology_mutation_builder builder(node.guard.write_timestamp()); + builder.del_transition_state() + .with_node(node.id) + .set("node_state", node_state::left); + auto str = ::format("finished decommissioning node {}", node.id); + co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); + } + break; } co_return true; }; @@ -1731,82 +1813,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed"); } break; - case node_state::left_token_ring: { - if (node.id == _raft.id()) { - // Someone else needs to coordinate the rest of the decommission process, - // because the decommissioning node is going to shut down in the middle of this state. - rtlogger.info("coordinator is decommissioning; giving up leadership"); - co_await step_down_as_nonvoter(); - - // Note: if we restart after this point and become a voter - // and then a coordinator again, it's fine - we'll just repeat this step. - // (If we're in `left` state when we try to restart we won't - // be able to become a voter - we'll be banned from the cluster.) - } - - bool barrier_failed = false; - // Wait until other nodes observe the new token ring and stop sending writes to this node. - try { - node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id); - } catch (term_changed_error&) { - throw; - } catch (group0_concurrent_modification&) { - throw; - } catch (...) { - rtlogger.error("node_state::left_token_ring (node: {}), " - "global_token_metadata_barrier failed, error {}", - node.id, std::current_exception()); - barrier_failed = true; - } - - if (barrier_failed) { - // If barrier above failed it means there may be unfinished writes to a decommissioned node. - // Lets wait for the ring delay for those writes to complete and new topology to propagate - // before continuing. - co_await sleep_abortable(_ring_delay, _as); - node = retake_node(co_await start_operation(), node.id); - } - - topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); - - rtbuilder.done(); - - co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state"); - - // Tell the node to shut down. - // This is done to improve user experience when there are no failures. - // In the next state (`node_state::left`), the node will be banned by the rest of the cluster, - // so there's no guarantee that it would learn about entering that state even if it was still - // a member of group0, hence we use a separate direct RPC in this state to shut it down. - // - // There is the possibility that the node will never get the message - // and decommission will hang on that node. - // This is fine for the rest of the cluster - we will still remove, ban the node and continue. - auto node_id = node.id; - bool shutdown_failed = false; - try { - node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier); - } catch (...) { - rtlogger.warn("failed to tell node {} to shut down - it may hang." - " It's safe to shut it down manually now. (Exception: {})", - node.id, std::current_exception()); - shutdown_failed = true; - } - if (shutdown_failed) { - node = retake_node(co_await start_operation(), node_id); - } - - // Remove the node from group0 here - in general, it won't be able to leave on its own - // because we'll ban it as soon as we tell it to shut down. - co_await remove_from_group0(node.id); - - topology_mutation_builder builder(node.guard.write_timestamp()); - builder.with_node(node.id) - .set("node_state", node_state::left); - auto str = ::format("finished decommissioning node {}", node.id); - co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str)); - } - break; case node_state::rollback_to_normal: { // The barrier waits for all double writes started during the operation to complete. It allowed to fail // since we will fence the requests later. @@ -2086,15 +2092,16 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard // Look for a node which operation should be aborted // (there should be one since we are in the rollback) node_to_work_on node = get_node_to_work_on(std::move(guard)); - node_state state; + node_state state = node.rs->state; + std::optional transition_state; switch (node.rs->state) { case node_state::bootstrapping: [[fallthrough]]; case node_state::replacing: - // To rollback bootstrap of replace just move a node that we tried to add to the left_token_ring state. - // It will be removed from the group0 by the state handler. It will also be notified to shutdown. - state = node_state::left_token_ring; + // To rollback bootstrap of replace just move the topology to left_token_ring. The node we tried to + // add will be removed from the group0 by the transition handler. It will also be notified to shutdown. + transition_state = topology::transition_state::left_token_ring; break; case node_state::removing: // The node was removed already. We need to add it back. Lets do it as non voter. @@ -2111,8 +2118,16 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard topology_mutation_builder builder(node.guard.write_timestamp()); topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id); - builder.del_transition_state() - .set_version(_topo_sm._topology.version + 1) + std::string str; + if (transition_state) { + builder.set_transition_state(*transition_state); + str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag", + node.id, node.rs->state, *transition_state); + } else { + builder.del_transition_state(); + str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state); + } + builder.set_version(_topo_sm._topology.version + 1) .with_node(node.id) .set("node_state", state); rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback)); @@ -2124,8 +2139,6 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard muts.emplace_back(builder.build()); muts.emplace_back(rtbuilder.build()); - auto str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state); - rtlogger.info("{}", str); co_await update_topology_state(std::move(node.guard), std::move(muts), str); } diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc index 8a398bf112..5e8a09b3dd 100644 --- a/service/topology_state_machine.cc +++ b/service/topology_state_machine.cc @@ -168,6 +168,7 @@ static std::unordered_map transition_state_ {topology::transition_state::write_both_read_new, "write both read new"}, {topology::transition_state::tablet_migration, "tablet migration"}, {topology::transition_state::tablet_draining, "tablet draining"}, + {topology::transition_state::left_token_ring, "left token ring"}, }; std::ostream& operator<<(std::ostream& os, topology::transition_state s) { @@ -192,7 +193,6 @@ static std::unordered_map node_state_to_name_map = { {node_state::decommissioning, "decommissioning"}, {node_state::removing, "removing"}, {node_state::normal, "normal"}, - {node_state::left_token_ring, "left_token_ring"}, {node_state::left, "left"}, {node_state::replacing, "replacing"}, {node_state::rebuilding, "rebuilding"}, diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 7eed585312..e63deb617b 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -33,7 +33,6 @@ enum class node_state: uint16_t { replacing, // the node replaces another dead node in the cluster and it data is being streamed to it rebuilding, // the node is being rebuild and is streaming data from other replicas normal, // the node does not do any streaming and serves the slice of the ring that belongs to it - left_token_ring, // the node left the token ring, but not group0 yet; we wait until other nodes stop writing to it left, // the node left the cluster and group0 rollback_to_normal, // the node rolls back failed decommission/remove node operation }; @@ -116,6 +115,7 @@ struct topology { write_both_read_old, write_both_read_new, tablet_migration, + left_token_ring, }; std::optional tstate; diff --git a/test/topology/test_topology_failure_recovery.py b/test/topology/test_topology_failure_recovery.py index 17c5cb4188..a1b62065f8 100644 --- a/test/topology/test_topology_failure_recovery.py +++ b/test/topology/test_topology_failure_recovery.py @@ -49,7 +49,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring", + from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # bootstrap failure in raft barrier marks = [await log.mark() for log in logs] @@ -59,7 +60,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring", + from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1 # replace failure marks = [await log.mark() for log in logs] @@ -72,5 +74,6 @@ async def test_topology_streaming_failure(request, manager: ManagerClient): await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors") servers = await manager.running_servers() assert s not in servers - matches = [await log.grep("raft_topology - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)] + matches = [await log.grep("raft_topology - rollback.*after replacing failure, moving transition state to left token ring", + from_mark=mark) for log, mark in zip(logs, marks)] assert sum(len(x) for x in matches) == 1