raft topology: make left_token_ring a transition state

A node can be in the `left_token_ring` state after:
- a finished decommission,
- a failed bootstrap,
- a failed replace.

When a node is in the `left_token_ring` state, we don't know how
it has ended up in this state. We cannot distinguish a node that
has finished decommissioning from a node that has failed bootstrap.

The main problem it causes is that we incorrectly send the
`barrier_and_drain` command to a node that has failed
bootstrapping or replacing. We must do it for a node that has
finished decommissioning because it could still coordinate
requests. However, since we cannot distinguish nodes in the
`left_token_ring` state, we must send the command to all of them.
This issue appeared in scylladb/scylladb#16797 and this patch is
a follow-up that fixes it.

The solution is changing `left_token_ring` from a node state
to a transition state.

Regarding implementation, most of the changes are simple
refactoring. The less obvious are:
- Before this patch, in `system_keyspace::load_topology_state`, we
had to keep the ignored nodes' IDs for replace to ensure that the
replacing node will have access to them after moving to the
`left_token_ring` state, which happens when replace fails. We
don't need this workaround anymore. When we enter the new
`left_token_ring` transition state, the new node will still be in
the `replacing` state, so it won't lose its request param.
- Before this patch, a decommissioning node lost its tokens
while moving to the `left_token_ring` state. After the patch, it
loses tokens while still being in the `decommissioning` state. We
ensure that all `decommissioning` handlers correctly handle a node
that lost its tokens.

Moving the `left_token_ring` handler from `handle_node_transition`
to `handle_topology_transition` created a large diff. There are
only three changes:
- adding `auto node = get_node_to_work_on(std::move(guard));`,
- adding `builder.del_transition_state()`,
- changing error logged when `global_token_metadata_barrier` fails.
This commit is contained in:
Patryk Jędrzejczak
2024-01-26 10:08:10 +01:00
parent 12eb0738cf
commit b0eef50b2e
6 changed files with 122 additions and 114 deletions

View File

@@ -2758,17 +2758,6 @@ future<service::topology> system_keyspace::load_topology_state() {
}
ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
break;
case service::node_state::left_token_ring:
// If replacenode fails the bootstrapping node is moved to left_token_ring state where it executes the metadata
// barrier. It needs to know which nodes to ignore during the barrier, so put them here into the replace_param.
// Note that if the replacenode does not fail and later the node is decommissioned it will move to the left_token_ring
// state at some point and replace_param will be created here as well (we do not remove replaced_id, and ignored_ids
// when we move to normal state). But this is OK because we allow to ignore nodes during topology operations only if they
// are permanently dead.
if (replaced_id) {
ret.req_param.emplace(host_id, service::replace_param{*replaced_id, std::move(ignored_ids)});
}
break;
case service::node_state::rollback_to_normal:
if (replaced_id) {
ret.req_param.emplace(host_id, service::removenode_param{std::move(ignored_ids)});
@@ -2800,10 +2789,12 @@ future<service::topology> system_keyspace::load_topology_state() {
"load_topology_state: found two nodes in transitioning state: {} in {} state and {} in {} state",
other_id, other_rs.state, host_id, nstate));
}
// A decommissioning node doesn't have tokens at the end, they are
// removed during transition to the left_token_ring state.
// Bootstrapping and replacing nodes don't have tokens at first,
// they are inserted only at some point during bootstrap/replace
if (!ring_slice
&& nstate != service::node_state::left_token_ring
&& nstate != service::node_state::decommissioning
&& nstate != service::node_state::bootstrapping
&& nstate != service::node_state::replacing) {
on_fatal_internal_error(slogger, format(
@@ -2833,11 +2824,9 @@ future<service::topology> system_keyspace::load_topology_state() {
if (some_row.has("transition_state")) {
ret.tstate = service::transition_state_from_string(some_row.get_as<sstring>("transition_state"));
} else {
// Any remaining transition_nodes must be in left_token_ring state
// or rebuilding or rollback_to_normal
// Any remaining transition_nodes must be in rebuilding or rollback_to_normal state.
auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(),
[] (auto& p) { return p.second.state != service::node_state::left_token_ring &&
p.second.state != service::node_state::rebuilding &&
[] (auto& p) { return p.second.state != service::node_state::rebuilding &&
p.second.state != service::node_state::rollback_to_normal; });
if (it != ret.transition_nodes.end()) {
on_internal_error(slogger, format(

View File

@@ -334,7 +334,6 @@ static locator::node::state to_topology_node_state(node_state ns) {
case node_state::removing: return locator::node::state::being_removed;
case node_state::normal: return locator::node::state::normal;
case node_state::rollback_to_normal: return locator::node::state::normal;
case node_state::left_token_ring: return locator::node::state::left;
case node_state::left: return locator::node::state::left;
case node_state::replacing: return locator::node::state::replacing;
case node_state::rebuilding: return locator::node::state::normal;
@@ -457,6 +456,11 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
}
break;
case node_state::decommissioning:
// A decommissioning node loses its tokens when topology moves to left_token_ring.
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
break;
}
[[fallthrough]];
case node_state::removing:
update_topology(host_id, ip, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
@@ -485,8 +489,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
// Rebuilding node is normal
co_await process_normal_node(id, rs);
break;
case node_state::left_token_ring:
break;
case node_state::rollback_to_normal:
// no need for double writes anymore since op failed
co_await process_normal_node(id, rs);
@@ -578,6 +580,8 @@ future<> storage_service::topology_state_load() {
case topology::transition_state::tablet_draining:
[[fallthrough]];
case topology::transition_state::write_both_read_old:
[[fallthrough]];
case topology::transition_state::left_token_ring:
return read_new_t::no;
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
@@ -4802,7 +4806,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case node_state::left_token_ring:
case node_state::left:
case node_state::none:
case node_state::removing:

View File

@@ -368,7 +368,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
| boost::adaptors::filtered([&cmd, &exclude_nodes] (const std::pair<const raft::server_id, replica_state>& n) {
// We must send barrier_and_drain to the decommissioning node as it might be coordinating requests.
bool drain_decommissioning_node = cmd.cmd == raft_topology_cmd::command::barrier_and_drain
&& (n.second.state == node_state::decommissioning || n.second.state == node_state::left_token_ring);
&& n.second.state == node_state::decommissioning;
return !exclude_nodes.contains(n.first) && (n.second.state == node_state::normal || drain_decommissioning_node);
})
| boost::adaptors::map_keys;
@@ -1503,10 +1503,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
[[fallthrough]];
case node_state::decommissioning: {
topology_mutation_builder builder(node.guard.write_timestamp());
auto next_state = node.rs->state == node_state::decommissioning
? node_state::left_token_ring : node_state::left;
builder.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
auto next_state = node.rs->state == node_state::decommissioning ? node.rs->state : node_state::left;
if (node.rs->state == node_state::decommissioning) {
builder.set_transition_state(topology::transition_state::left_token_ring);
} else {
builder.del_transition_state();
}
builder.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.del("tokens")
.set("node_state", next_state);
@@ -1551,6 +1554,85 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology::transition_state::tablet_migration:
co_await handle_tablet_migration(std::move(guard), false);
break;
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
if (node.id == _raft.id()) {
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
co_await step_down_as_nonvoter();
// Note: if we restart after this point and become a voter
// and then a coordinator again, it's fine - we'll just repeat this step.
// (If we're in `left` state when we try to restart we won't
// be able to become a voter - we'll be banned from the cluster.)
}
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (...) {
rtlogger.error("transition_state::left_token_ring, "
"raft_topology_cmd::command::barrier failed, error {}",
std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
// a member of group0, hence we use a separate direct RPC in this state to shut it down.
//
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);
}
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
co_await remove_from_group0(node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
auto str = ::format("finished decommissioning node {}", node.id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
}
co_return true;
};
@@ -1731,82 +1813,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed");
}
break;
case node_state::left_token_ring: {
if (node.id == _raft.id()) {
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
co_await step_down_as_nonvoter();
// Note: if we restart after this point and become a voter
// and then a coordinator again, it's fine - we'll just repeat this step.
// (If we're in `left` state when we try to restart we won't
// be able to become a voter - we'll be banned from the cluster.)
}
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (...) {
rtlogger.error("node_state::left_token_ring (node: {}), "
"global_token_metadata_barrier failed, error {}",
node.id, std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
// a member of group0, hence we use a separate direct RPC in this state to shut it down.
//
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);
}
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
co_await remove_from_group0(node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(node.id)
.set("node_state", node_state::left);
auto str = ::format("finished decommissioning node {}", node.id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
case node_state::rollback_to_normal: {
// The barrier waits for all double writes started during the operation to complete. It allowed to fail
// since we will fence the requests later.
@@ -2086,15 +2092,16 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
// Look for a node which operation should be aborted
// (there should be one since we are in the rollback)
node_to_work_on node = get_node_to_work_on(std::move(guard));
node_state state;
node_state state = node.rs->state;
std::optional<topology::transition_state> transition_state;
switch (node.rs->state) {
case node_state::bootstrapping:
[[fallthrough]];
case node_state::replacing:
// To rollback bootstrap of replace just move a node that we tried to add to the left_token_ring state.
// It will be removed from the group0 by the state handler. It will also be notified to shutdown.
state = node_state::left_token_ring;
// To rollback bootstrap or replace just move the topology to left_token_ring. The node we tried to
// add will be removed from the group0 by the transition handler. It will also be notified to shutdown.
transition_state = topology::transition_state::left_token_ring;
break;
case node_state::removing:
// The node was removed already. We need to add it back. Lets do it as non voter.
@@ -2111,8 +2118,16 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
std::string str;
if (transition_state) {
builder.set_transition_state(*transition_state);
str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag",
node.id, node.rs->state, *transition_state);
} else {
builder.del_transition_state();
str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state);
}
builder.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", state);
rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback));
@@ -2124,8 +2139,6 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
auto str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), std::move(muts), str);
}

View File

@@ -168,6 +168,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::write_both_read_new, "write both read new"},
{topology::transition_state::tablet_migration, "tablet migration"},
{topology::transition_state::tablet_draining, "tablet draining"},
{topology::transition_state::left_token_ring, "left token ring"},
};
std::ostream& operator<<(std::ostream& os, topology::transition_state s) {
@@ -192,7 +193,6 @@ static std::unordered_map<node_state, sstring> node_state_to_name_map = {
{node_state::decommissioning, "decommissioning"},
{node_state::removing, "removing"},
{node_state::normal, "normal"},
{node_state::left_token_ring, "left_token_ring"},
{node_state::left, "left"},
{node_state::replacing, "replacing"},
{node_state::rebuilding, "rebuilding"},

View File

@@ -33,7 +33,6 @@ enum class node_state: uint16_t {
replacing, // the node replaces another dead node in the cluster and its data is being streamed to it
rebuilding, // the node is being rebuilt and is streaming data from other replicas
normal, // the node does not do any streaming and serves the slice of the ring that belongs to it
left_token_ring, // the node left the token ring, but not group0 yet; we wait until other nodes stop writing to it
left, // the node left the cluster and group0
rollback_to_normal, // the node rolls back failed decommission/remove node operation
};
@@ -116,6 +115,7 @@ struct topology {
write_both_read_old,
write_both_read_new,
tablet_migration,
left_token_ring,
};
std::optional<transition_state> tstate;

View File

@@ -49,7 +49,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# bootstrap failure in raft barrier
marks = [await log.mark() for log in logs]
@@ -59,7 +60,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# replace failure
marks = [await log.mark() for log in logs]
@@ -72,5 +74,6 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("raft_topology - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after replacing failure, moving transition state to left token ring",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1