Merge 'raft topology: make left_token_ring a transition state' from Patryk Jędrzejczak

When a node is in the `left_token_ring` state, we don't know how
it has ended up in this state. We cannot distinguish a node that
has finished decommissioning from a node that has failed bootstrap.

The main problem it causes is that we incorrectly send the
`barrier_and_drain` command to a node that has failed
bootstrapping or replacing. We must do it for a node that has
finished decommissioning because it could still coordinate
requests. However, since we cannot distinguish nodes in the
`left_token_ring` state, we must send the command to all of them.
This issue appeared in scylladb/scylladb#16797 and this PR is
a follow-up that fixes it.

The solution is changing `left_token_ring` from a node state
to a transition state.

Fixes scylladb/scylladb#16944

Closes scylladb/scylladb#17009

* github.com:scylladb/scylladb:
  docs: dev: topology-over-raft: document the left_token_ring state
  topology_coordinator: adjust reason string in left_token_ring handler
  raft topology: make left_token_ring a transition state
  topology_coordinator: rollback_current_topology_op: remove unused exclude_nodes
This commit is contained in:
Kamil Braun
2024-01-29 15:29:01 +01:00
7 changed files with 127 additions and 117 deletions

View File

@@ -2758,17 +2758,6 @@ future<service::topology> system_keyspace::load_topology_state() {
}
ret.req_param.emplace(host_id, service::rebuild_param{*rebuild_option});
break;
case service::node_state::left_token_ring:
// If replacenode fails the bootstrapping node is moved to left_token_ring state where it executes the metadata
// barrier. It needs to know which nodes to ignore during the barrier, so put them here into the replace_param.
// Note that if the replacenode does not fail and later the node is decommissioned it will move to the left_token_ring
// state at some point and replace_param will be created here as well (we do not remove replaced_id, and ignored_ids
// when we move to normal state). But this is OK because we allow to ignore nodes during topology operations only if they
// are permanently dead.
if (replaced_id) {
ret.req_param.emplace(host_id, service::replace_param{*replaced_id, std::move(ignored_ids)});
}
break;
case service::node_state::rollback_to_normal:
if (replaced_id) {
ret.req_param.emplace(host_id, service::removenode_param{std::move(ignored_ids)});
@@ -2800,10 +2789,12 @@ future<service::topology> system_keyspace::load_topology_state() {
"load_topology_state: found two nodes in transitioning state: {} in {} state and {} in {} state",
other_id, other_rs.state, host_id, nstate));
}
// A decommissioning node doesn't have tokens at the end, they are
// removed during transition to the left_token_ring state.
// Bootstrapping and replacing nodes don't have tokens at first,
// they are inserted only at some point during bootstrap/replace
if (!ring_slice
&& nstate != service::node_state::left_token_ring
&& nstate != service::node_state::decommissioning
&& nstate != service::node_state::bootstrapping
&& nstate != service::node_state::replacing) {
on_fatal_internal_error(slogger, format(
@@ -2833,11 +2824,9 @@ future<service::topology> system_keyspace::load_topology_state() {
if (some_row.has("transition_state")) {
ret.tstate = service::transition_state_from_string(some_row.get_as<sstring>("transition_state"));
} else {
// Any remaining transition_nodes must be in left_token_ring state
// or rebuilding or rollback_to_normal
// Any remaining transition_nodes must be in rebuilding or rollback_to_normal state.
auto it = std::find_if(ret.transition_nodes.begin(), ret.transition_nodes.end(),
[] (auto& p) { return p.second.state != service::node_state::left_token_ring &&
p.second.state != service::node_state::rebuilding &&
[] (auto& p) { return p.second.state != service::node_state::rebuilding &&
p.second.state != service::node_state::rollback_to_normal; });
if (it != ret.transition_nodes.end()) {
on_internal_error(slogger, format(

View File

@@ -37,6 +37,9 @@ Additionally to specific node states, there entire topology can also be in a tra
Writes are going to both new and old replicas (new replicas means calculated according to modified
token ring), reads are using old replicas.
- `write_both_read_new` - as above, but reads are using new replicas.
- `left_token_ring` - the decommissioning node left the token ring, but we still need to wait until other
nodes observe it and stop sending writes to this node. Then, we tell the node to shut down and remove
it from group 0. We also use this state to rollback a failed bootstrap or decommission.
When a node bootstraps, we create new tokens for it and a new CDC generation
and enter the `commit_cdc_generation` state. Once the generation is committed,

View File

@@ -334,7 +334,6 @@ static locator::node::state to_topology_node_state(node_state ns) {
case node_state::removing: return locator::node::state::being_removed;
case node_state::normal: return locator::node::state::normal;
case node_state::rollback_to_normal: return locator::node::state::normal;
case node_state::left_token_ring: return locator::node::state::left;
case node_state::left: return locator::node::state::left;
case node_state::replacing: return locator::node::state::replacing;
case node_state::rebuilding: return locator::node::state::normal;
@@ -457,6 +456,11 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
}
break;
case node_state::decommissioning:
// A decommissioning node loses its tokens when topology moves to left_token_ring.
if (_topology_state_machine._topology.tstate == topology::transition_state::left_token_ring) {
break;
}
[[fallthrough]];
case node_state::removing:
update_topology(host_id, ip, rs);
co_await tmptr->update_normal_tokens(rs.ring.value().tokens, host_id);
@@ -485,8 +489,6 @@ future<> storage_service::sync_raft_topology_nodes(mutable_token_metadata_ptr tm
// Rebuilding node is normal
co_await process_normal_node(id, rs);
break;
case node_state::left_token_ring:
break;
case node_state::rollback_to_normal:
// no need for double writes anymore since op failed
co_await process_normal_node(id, rs);
@@ -578,6 +580,8 @@ future<> storage_service::topology_state_load() {
case topology::transition_state::tablet_draining:
[[fallthrough]];
case topology::transition_state::write_both_read_old:
[[fallthrough]];
case topology::transition_state::left_token_ring:
return read_new_t::no;
case topology::transition_state::write_both_read_new:
return read_new_t::yes;
@@ -4802,7 +4806,6 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(raft
result.status = raft_topology_cmd_result::command_status::success;
}
break;
case node_state::left_token_ring:
case node_state::left:
case node_state::none:
case node_state::removing:

View File

@@ -368,7 +368,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
| boost::adaptors::filtered([&cmd, &exclude_nodes] (const std::pair<const raft::server_id, replica_state>& n) {
// We must send barrier_and_drain to the decommissioning node as it might be coordinating requests.
bool drain_decommissioning_node = cmd.cmd == raft_topology_cmd::command::barrier_and_drain
&& (n.second.state == node_state::decommissioning || n.second.state == node_state::left_token_ring);
&& n.second.state == node_state::decommissioning;
return !exclude_nodes.contains(n.first) && (n.second.state == node_state::normal || drain_decommissioning_node);
})
| boost::adaptors::map_keys;
@@ -1503,10 +1503,13 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
[[fallthrough]];
case node_state::decommissioning: {
topology_mutation_builder builder(node.guard.write_timestamp());
auto next_state = node.rs->state == node_state::decommissioning
? node_state::left_token_ring : node_state::left;
builder.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
auto next_state = node.rs->state == node_state::decommissioning ? node.rs->state : node_state::left;
if (node.rs->state == node_state::decommissioning) {
builder.set_transition_state(topology::transition_state::left_token_ring);
} else {
builder.del_transition_state();
}
builder.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.del("tokens")
.set("node_state", next_state);
@@ -1551,6 +1554,87 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
case topology::transition_state::tablet_migration:
co_await handle_tablet_migration(std::move(guard), false);
break;
case topology::transition_state::left_token_ring: {
auto node = get_node_to_work_on(std::move(guard));
if (node.id == _raft.id()) {
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
co_await step_down_as_nonvoter();
// Note: if we restart after this point and become a voter
// and then a coordinator again, it's fine - we'll just repeat this step.
// (If we're in `left` state when we try to restart we won't
// be able to become a voter - we'll be banned from the cluster.)
}
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (...) {
rtlogger.error("transition_state::left_token_ring, "
"raft_topology_cmd::command::barrier failed, error {}",
std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring sate");
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
// a member of group0, hence we use a separate direct RPC in this state to shut it down.
//
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);
}
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
co_await remove_from_group0(node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::left);
auto str = node.rs->state == node_state::decommissioning
? ::format("finished decommissioning node {}", node.id)
: ::format("finished rollback of {} after {} failure", node.id, node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
}
co_return true;
};
@@ -1731,82 +1815,6 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
co_await update_topology_state(take_guard(std::move(node)), {builder.build(), rtbuilder.build()}, "rebuilding completed");
}
break;
case node_state::left_token_ring: {
if (node.id == _raft.id()) {
// Someone else needs to coordinate the rest of the decommission process,
// because the decommissioning node is going to shut down in the middle of this state.
rtlogger.info("coordinator is decommissioning; giving up leadership");
co_await step_down_as_nonvoter();
// Note: if we restart after this point and become a voter
// and then a coordinator again, it's fine - we'll just repeat this step.
// (If we're in `left` state when we try to restart we won't
// be able to become a voter - we'll be banned from the cluster.)
}
bool barrier_failed = false;
// Wait until other nodes observe the new token ring and stop sending writes to this node.
try {
node = retake_node(co_await global_token_metadata_barrier(std::move(node.guard), get_excluded_nodes(node)), node.id);
} catch (term_changed_error&) {
throw;
} catch (group0_concurrent_modification&) {
throw;
} catch (...) {
rtlogger.error("node_state::left_token_ring (node: {}), "
"global_token_metadata_barrier failed, error {}",
node.id, std::current_exception());
barrier_failed = true;
}
if (barrier_failed) {
// If barrier above failed it means there may be unfinished writes to a decommissioned node.
// Lets wait for the ring delay for those writes to complete and new topology to propagate
// before continuing.
co_await sleep_abortable(_ring_delay, _as);
node = retake_node(co_await start_operation(), node.id);
}
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
rtbuilder.done();
co_await update_topology_state(take_guard(std::move(node)), {rtbuilder.build()}, "report request completion in left_token_ring state");
// Tell the node to shut down.
// This is done to improve user experience when there are no failures.
// In the next state (`node_state::left`), the node will be banned by the rest of the cluster,
// so there's no guarantee that it would learn about entering that state even if it was still
// a member of group0, hence we use a separate direct RPC in this state to shut it down.
//
// There is the possibility that the node will never get the message
// and decommission will hang on that node.
// This is fine for the rest of the cluster - we will still remove, ban the node and continue.
auto node_id = node.id;
bool shutdown_failed = false;
try {
node = co_await exec_direct_command(std::move(node), raft_topology_cmd::command::barrier);
} catch (...) {
rtlogger.warn("failed to tell node {} to shut down - it may hang."
" It's safe to shut it down manually now. (Exception: {})",
node.id, std::current_exception());
shutdown_failed = true;
}
if (shutdown_failed) {
node = retake_node(co_await start_operation(), node_id);
}
// Remove the node from group0 here - in general, it won't be able to leave on its own
// because we'll ban it as soon as we tell it to shut down.
co_await remove_from_group0(node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(node.id)
.set("node_state", node_state::left);
auto str = ::format("finished decommissioning node {}", node.id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
case node_state::rollback_to_normal: {
// The barrier waits for all double writes started during the operation to complete. It allowed to fail
// since we will fence the requests later.
@@ -2086,20 +2094,18 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
// Look for a node which operation should be aborted
// (there should be one since we are in the rollback)
node_to_work_on node = get_node_to_work_on(std::move(guard));
node_state state;
std::unordered_set<raft::server_id> exclude_nodes = parse_ignore_nodes(node.req_param);
node_state state = node.rs->state;
std::optional<topology::transition_state> transition_state;
switch (node.rs->state) {
case node_state::bootstrapping:
[[fallthrough]];
case node_state::replacing:
// To rollback bootstrap of replace just move a node that we tried to add to the left_token_ring state.
// It will be removed from the group0 by the state handler. It will also be notified to shutdown.
state = node_state::left_token_ring;
// To rollback bootstrap of replace just move the topology to left_token_ring. The node we tried to
// add will be removed from the group0 by the transition handler. It will also be notified to shutdown.
transition_state = topology::transition_state::left_token_ring;
break;
case node_state::removing:
// Exclude dead node from global barrier
exclude_nodes.emplace(node.id);
// The node was removed already. We need to add it back. Lets do it as non voter.
// If it ever boots again it will make itself a voter.
co_await _group0.group0_server().modify_config({raft::config_member{{node.id, {}}, false}}, {}, &_as);
@@ -2114,8 +2120,16 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
topology_mutation_builder builder(node.guard.write_timestamp());
topology_request_tracking_mutation_builder rtbuilder(node.rs->request_id);
builder.del_transition_state()
.set_version(_topo_sm._topology.version + 1)
std::string str;
if (transition_state) {
builder.set_transition_state(*transition_state);
str = fmt::format("rollback {} after {} failure, moving transition state to {} and setting cleanup flag",
node.id, node.rs->state, *transition_state);
} else {
builder.del_transition_state();
str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state);
}
builder.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", state);
rtbuilder.set("error", fmt::format("Rolled back: {}", *_rollback));
@@ -2127,8 +2141,6 @@ future<> topology_coordinator::rollback_current_topology_op(group0_guard&& guard
muts.emplace_back(builder.build());
muts.emplace_back(rtbuilder.build());
auto str = fmt::format("rollback {} after {} failure to state {} and setting cleanup flag", node.id, node.rs->state, state);
rtlogger.info("{}", str);
co_await update_topology_state(std::move(node.guard), std::move(muts), str);
}

View File

@@ -168,6 +168,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::write_both_read_new, "write both read new"},
{topology::transition_state::tablet_migration, "tablet migration"},
{topology::transition_state::tablet_draining, "tablet draining"},
{topology::transition_state::left_token_ring, "left token ring"},
};
std::ostream& operator<<(std::ostream& os, topology::transition_state s) {
@@ -192,7 +193,6 @@ static std::unordered_map<node_state, sstring> node_state_to_name_map = {
{node_state::decommissioning, "decommissioning"},
{node_state::removing, "removing"},
{node_state::normal, "normal"},
{node_state::left_token_ring, "left_token_ring"},
{node_state::left, "left"},
{node_state::replacing, "replacing"},
{node_state::rebuilding, "rebuilding"},

View File

@@ -33,7 +33,6 @@ enum class node_state: uint16_t {
replacing, // the node replaces another dead node in the cluster and it data is being streamed to it
rebuilding, // the node is being rebuild and is streaming data from other replicas
normal, // the node does not do any streaming and serves the slice of the ring that belongs to it
left_token_ring, // the node left the token ring, but not group0 yet; we wait until other nodes stop writing to it
left, // the node left the cluster and group0
rollback_to_normal, // the node rolls back failed decommission/remove node operation
};
@@ -116,6 +115,7 @@ struct topology {
write_both_read_old,
write_both_read_new,
tablet_migration,
left_token_ring,
};
std::optional<transition_state> tstate;

View File

@@ -49,7 +49,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# bootstrap failure in raft barrier
marks = [await log.mark() for log in logs]
@@ -59,7 +60,8 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Bootstrap failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after bootstrapping failure, moving transition state to left token ring",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1
# replace failure
marks = [await log.mark() for log in logs]
@@ -72,5 +74,6 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
await manager.server_start(s.server_id, expected_error="Replace failed. See earlier errors")
servers = await manager.running_servers()
assert s not in servers
matches = [await log.grep("raft_topology - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
matches = [await log.grep("raft_topology - rollback.*after replacing failure, moving transition state to left token ring",
from_mark=mark) for log, mark in zip(logs, marks)]
assert sum(len(x) for x in matches) == 1