raft topology: make replication_state a topology-global state
Previously `replication_state` was part of `ring_slice` and therefore belonged to a specific node. This commit moves it into `topology`, making it a cluster-global property; accordingly, the `replication_state` column in `system.topology` is now `static`. This will allow us to easily introduce topology transition states that do not refer to any specific node. `commit_cdc_generation` will be such a state, allowing us to commit a new CDC generation even when all nodes are normal (none are transitioning). One could argue that the other states are conceptually already cluster-global: for example, `write_both_read_new` doesn't affect only the tokens of a bootstrapping (or decommissioning, etc.) node; it also affects the replica sets of other tokens (when the replication factor is greater than 1).
This commit is contained in:
@@ -244,7 +244,6 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("datacenter", utf8_type)
|
||||
.with_column("rack", utf8_type)
|
||||
.with_column("tokens", set_type_impl::get_instance(utf8_type, true))
|
||||
.with_column("replication_state", utf8_type)
|
||||
.with_column("node_state", utf8_type)
|
||||
.with_column("release_version", utf8_type)
|
||||
.with_column("topology_request", utf8_type)
|
||||
@@ -254,6 +253,7 @@ schema_ptr system_keyspace::topology() {
|
||||
.with_column("shard_count", int32_type)
|
||||
.with_column("ignore_msb", int32_type)
|
||||
.with_column("new_cdc_generation_data_uuid", uuid_type)
|
||||
.with_column("replication_state", utf8_type, column_kind::static_column)
|
||||
.with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column)
|
||||
.with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column)
|
||||
.set_comment("Current state of topology change machine")
|
||||
@@ -3529,22 +3529,17 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
service::node_state nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));
|
||||
|
||||
std::optional<service::ring_slice> ring_slice;
|
||||
if (row.has("replication_state")) {
|
||||
auto repl_state = service::replication_state_from_string(row.get_as<sstring>("replication_state"));
|
||||
|
||||
std::unordered_set<dht::token> tokens;
|
||||
if (row.has("tokens")) {
|
||||
auto blob = row.get_blob("tokens");
|
||||
auto cdef = topology()->get_column_definition("tokens");
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
auto ts = value_cast<set_type_impl::native_type>(deserialized);
|
||||
tokens = decode_tokens(ts);
|
||||
}
|
||||
if (row.has("tokens")) {
|
||||
auto blob = row.get_blob("tokens");
|
||||
auto cdef = topology()->get_column_definition("tokens");
|
||||
auto deserialized = cdef->type->deserialize(blob);
|
||||
auto ts = value_cast<set_type_impl::native_type>(deserialized);
|
||||
auto tokens = decode_tokens(ts);
|
||||
|
||||
if (tokens.empty()) {
|
||||
on_fatal_internal_error(slogger, format(
|
||||
"load_topology_state: node {} has replication state ({}) but missing tokens",
|
||||
host_id, repl_state));
|
||||
"load_topology_state: node {} has tokens column present but tokens are empty",
|
||||
host_id));
|
||||
}
|
||||
|
||||
utils::UUID new_cdc_gen_uuid;
|
||||
@@ -3553,7 +3548,6 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
}
|
||||
|
||||
ring_slice = service::ring_slice {
|
||||
.state = repl_state,
|
||||
.tokens = std::move(tokens),
|
||||
.new_cdc_generation_data_uuid = new_cdc_gen_uuid,
|
||||
};
|
||||
@@ -3642,6 +3636,18 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
{
|
||||
// Here we access static columns, any row will do.
|
||||
auto& some_row = *rs->begin();
|
||||
|
||||
if (some_row.has("replication_state")) {
|
||||
ret.rstate = service::replication_state_from_string(some_row.get_as<sstring>("replication_state"));
|
||||
}
|
||||
|
||||
if (ret.rstate == service::topology::replication_state::normal
|
||||
&& !ret.transition_nodes.empty()) {
|
||||
on_internal_error(slogger, format(
|
||||
"load_topology_state: replication state is normal but transition nodes are present"));
|
||||
}
|
||||
|
||||
|
||||
if (some_row.has("current_cdc_generation_uuid")) {
|
||||
auto gen_uuid = some_row.get_as<utils::UUID>("current_cdc_generation_uuid");
|
||||
if (!some_row.has("current_cdc_generation_timestamp")) {
|
||||
|
||||
@@ -327,13 +327,11 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
co_await tmptr->clear_gently(); // drop previous state
|
||||
|
||||
auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> {
|
||||
assert (rs.ring.value().state == ring_slice::ring_slice::replication_state::owner);
|
||||
|
||||
locator::host_id host_id{id.uuid()};
|
||||
auto ip = co_await id2ip(id);
|
||||
|
||||
slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, rs.ring.value().state, rs.ring.value().tokens);
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.rstate, rs.ring.value().tokens);
|
||||
// Save tokens, not needed for raft topology management, but needed by legacy
|
||||
// Also ip -> id mapping is needed for address map recreation on reboot
|
||||
if (!utils::fb_utilities::is_me(ip)) {
|
||||
@@ -360,7 +358,7 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
auto ip = co_await id2ip(id);
|
||||
|
||||
slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}",
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, rs.ring->state, rs.ring->tokens);
|
||||
id, ip, rs.state, rs.datacenter, rs.rack, _topology_state_machine._topology.rstate, rs.ring->tokens);
|
||||
|
||||
switch (rs.state) {
|
||||
case node_state::bootstrapping:
|
||||
@@ -473,6 +471,7 @@ public:
|
||||
topology_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
|
||||
topology_mutation_builder& set(const char* cell, const uint32_t& value);
|
||||
topology_mutation_builder& set(const char* cell, const utils::UUID& value);
|
||||
topology_mutation_builder& set_replication_state(topology::replication_state);
|
||||
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
|
||||
topology_mutation_builder& del(const char* cell);
|
||||
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
|
||||
@@ -548,6 +547,11 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons
|
||||
return *this;
|
||||
}
|
||||
|
||||
// Stores `value`, rendered to text via fmt::format("{}", ...), in the static
// "replication_state" cell of the mutation `_m`, passing `_ts` along to
// set_static_cell (presumably the builder's write timestamp -- confirm
// against the constructor). Returns *this so that further builder calls can
// be chained.
topology_mutation_builder& topology_mutation_builder::set_replication_state(topology::replication_state value) {
|
||||
_m.set_static_cell("replication_state", fmt::format("{}", value), _ts);
|
||||
return *this;
|
||||
}
|
||||
|
||||
topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation_id(
|
||||
const cdc::generation_id_v2& value) {
|
||||
_m.set_static_cell("current_cdc_generation_timestamp", value.ts, _ts);
|
||||
@@ -676,8 +680,8 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
}
|
||||
|
||||
bool res;
|
||||
switch (node.rs->ring.value().state) {
|
||||
case ring_slice::replication_state::commit_cdc_generation: {
|
||||
switch (node.topology->rstate) {
|
||||
case topology::replication_state::commit_cdc_generation: {
|
||||
// make sure all nodes know about new topology and have the new CDC generation data
|
||||
// (we require all nodes to be alive for topo change for now)
|
||||
std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node);
|
||||
@@ -737,13 +741,13 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
// committed) - they won't coordinate CDC-enabled writes until they reconnect to the
|
||||
// majority and commit.
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.set("replication_state", ring_slice::replication_state::write_both_read_old)
|
||||
builder.set_replication_state(topology::replication_state::write_both_read_old)
|
||||
.set_current_cdc_generation_id(cdc_gen_id);
|
||||
auto str = fmt::format("{}: committed new CDC generation, ID: {}", node.rs->state, cdc_gen_id);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||
}
|
||||
break;
|
||||
case ring_slice::ring_slice::replication_state::write_both_read_old: {
|
||||
case topology::replication_state::write_both_read_old: {
|
||||
// make sure all nodes know about new topology (we require all nodes to be alive for topo change for now)
|
||||
std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node);
|
||||
if (!res) {
|
||||
@@ -779,14 +783,14 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Streaming completed. We can now move tokens state to ring_slice::ring_slice::replication_state::write_both_read_new
|
||||
// Streaming completed. We can now move tokens state to topology::replication_state::write_both_read_new
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.set("replication_state", ring_slice::replication_state::write_both_read_new);
|
||||
builder.set_replication_state(topology::replication_state::write_both_read_new);
|
||||
auto str = fmt::format("{}: streaming completed for node {}", node.rs->state, node.id);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||
}
|
||||
break;
|
||||
case ring_slice::replication_state::write_both_read_new:
|
||||
case topology::replication_state::write_both_read_new:
|
||||
// In this state writes goes to old and new replicas but reads start to be done from new replicas
|
||||
// Before we stop writing to old replicas we need to wait for all previous reads to complete
|
||||
std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::fence_old_reads}, true, replaced_node);
|
||||
@@ -796,7 +800,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
switch(node.rs->state) {
|
||||
case node_state::bootstrapping: {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.set("replication_state", ring_slice::replication_state::owner)
|
||||
builder.set_replication_state(topology::replication_state::normal)
|
||||
.set("node_state", node_state::normal);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: read fence completed");
|
||||
}
|
||||
@@ -804,9 +808,9 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
case node_state::decommissioning:
|
||||
case node_state::removing: {
|
||||
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
|
||||
builder.del("replication_state")
|
||||
.del("tokens")
|
||||
.set("node_state", node_state::left);
|
||||
builder.del("tokens")
|
||||
.set("node_state", node_state::left)
|
||||
.set_replication_state(topology::replication_state::normal);
|
||||
auto str = fmt::format("{}: read fence completed", node.rs->state);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, std::move(str));
|
||||
}
|
||||
@@ -814,13 +818,12 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
case node_state::replacing: {
|
||||
topology_mutation_builder builder1(node.guard.write_timestamp(), node.id);
|
||||
// Move new node to 'normal'
|
||||
builder1.set("replication_state", ring_slice::replication_state::owner)
|
||||
builder1.set_replication_state(topology::replication_state::normal)
|
||||
.set("node_state", node_state::normal);
|
||||
|
||||
// Move old node to 'left'
|
||||
topology_mutation_builder builder2(node.guard.write_timestamp(), replaced_node);
|
||||
builder2.del("replication_state")
|
||||
.del("tokens")
|
||||
builder2.del("tokens")
|
||||
.set("node_state", node_state::left);
|
||||
co_await update_replica_state(std::move(node), {builder1.build(), builder2.build()}, "replace: read fence completed");
|
||||
}
|
||||
@@ -828,11 +831,11 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
default:
|
||||
on_fatal_internal_error(slogger, format("Ring state on node {} is write_both_read_new while the node is in state {}", node.id, node.rs->state));
|
||||
}
|
||||
// Reads are fenced. We can now move tokens state to ring_slice::replication_state::owner and node to normal
|
||||
// Reads are fenced. We can now move tokens state to topology::replication_state::normal and node to normal
|
||||
break;
|
||||
case ring_slice::replication_state::owner:
|
||||
case topology::replication_state::normal:
|
||||
// should not get here
|
||||
on_fatal_internal_error(slogger, format("Tried to handle ring state transition on node {} while in 'owner' state", node.id));
|
||||
on_fatal_internal_error(slogger, format("Tried to handle ring state transition on node {} while in 'normal' state", node.id));
|
||||
break;
|
||||
}
|
||||
};
|
||||
@@ -913,7 +916,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
builder.set("node_state", node_state::bootstrapping)
|
||||
.del("topology_request")
|
||||
.set("tokens", bootstrap_tokens)
|
||||
.set("replication_state", ring_slice::replication_state::commit_cdc_generation)
|
||||
.set_replication_state(topology::replication_state::commit_cdc_generation)
|
||||
.set("new_cdc_generation_data_uuid", gen_uuid);
|
||||
updates.push_back(builder.build());
|
||||
auto reason = format(
|
||||
@@ -927,7 +930,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
// meaning that reads will go to the replica being decommissioned but writes will go to new owner as well
|
||||
builder.set("node_state", node_state::decommissioning)
|
||||
.del("topology_request")
|
||||
.set("replication_state", ring_slice::replication_state::write_both_read_old);
|
||||
.set_replication_state(topology::replication_state::write_both_read_old);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start decommission");
|
||||
break;
|
||||
case topology_request::remove:
|
||||
@@ -936,7 +939,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
// meaning that reads will go to the replica being removed (it is dead though) but writes will go to new owner as well
|
||||
builder.set("node_state", node_state::removing)
|
||||
.del("topology_request")
|
||||
.set("replication_state", ring_slice::replication_state::write_both_read_old);
|
||||
.set_replication_state(topology::replication_state::write_both_read_old);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start removenode");
|
||||
break;
|
||||
case topology_request::replace: {
|
||||
@@ -950,7 +953,7 @@ future<> storage_service::topology_change_coordinator_fiber(raft::server& raft,
|
||||
builder.set("node_state", node_state::replacing)
|
||||
.del("topology_request")
|
||||
.set("tokens", it->second.ring->tokens)
|
||||
.set("replication_state", ring_slice::replication_state::write_both_read_old);
|
||||
.set_replication_state(topology::replication_state::write_both_read_old);
|
||||
co_await update_replica_state(std::move(node), {builder.build()}, "start replace");
|
||||
break;
|
||||
}
|
||||
@@ -4653,10 +4656,10 @@ future<raft_topology_cmd_result> storage_service::raft_topology_cmd_handler(shar
|
||||
break;
|
||||
case raft_topology_cmd::command::stream_ranges: {
|
||||
const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second;
|
||||
auto replication_state = _topology_state_machine._topology.rstate;
|
||||
if (!rs.ring ||
|
||||
(rs.ring->state != ring_slice::replication_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) {
|
||||
slogger.warn("raft topology: got stream_ranges request while my tokens state is {} and node state is {}",
|
||||
rs.ring ? fmt::format("{}", rs.ring->state) : "'missing ring'", rs.state);
|
||||
(replication_state != topology::replication_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) {
|
||||
slogger.warn("raft topology: got stream_ranges request while my tokens state is {} and node state is {}", replication_state, rs.state);
|
||||
break;
|
||||
}
|
||||
switch(rs.state) {
|
||||
|
||||
@@ -36,14 +36,14 @@ bool topology::contains(raft::server_id id) {
|
||||
left_nodes.contains(id);
|
||||
}
|
||||
|
||||
static std::unordered_map<ring_slice::replication_state, sstring> replication_state_to_name_map = {
|
||||
{ring_slice::replication_state::commit_cdc_generation, "commit cdc generation"},
|
||||
{ring_slice::replication_state::write_both_read_old, "write both read old"},
|
||||
{ring_slice::replication_state::write_both_read_new, "write both read new"},
|
||||
{ring_slice::replication_state::owner, "owner"},
|
||||
static std::unordered_map<topology::replication_state, sstring> replication_state_to_name_map = {
|
||||
{topology::replication_state::commit_cdc_generation, "commit cdc generation"},
|
||||
{topology::replication_state::write_both_read_old, "write both read old"},
|
||||
{topology::replication_state::write_both_read_new, "write both read new"},
|
||||
{topology::replication_state::normal, "normal"},
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) {
|
||||
std::ostream& operator<<(std::ostream& os, topology::replication_state s) {
|
||||
auto it = replication_state_to_name_map.find(s);
|
||||
if (it == replication_state_to_name_map.end()) {
|
||||
on_internal_error(tsmlogger, "cannot print replication_state");
|
||||
@@ -51,7 +51,7 @@ std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) {
|
||||
return os << it->second;
|
||||
}
|
||||
|
||||
ring_slice::replication_state replication_state_from_string(const sstring& s) {
|
||||
topology::replication_state replication_state_from_string(const sstring& s) {
|
||||
for (auto&& e : replication_state_to_name_map) {
|
||||
if (e.second == s) {
|
||||
return e.first;
|
||||
|
||||
@@ -48,14 +48,6 @@ enum class topology_request: uint8_t {
|
||||
using request_param = std::variant<raft::server_id, sstring, uint32_t>;
|
||||
|
||||
struct ring_slice {
|
||||
enum class replication_state: uint8_t {
|
||||
commit_cdc_generation,
|
||||
write_both_read_old,
|
||||
write_both_read_new,
|
||||
owner
|
||||
};
|
||||
|
||||
replication_state state;
|
||||
std::unordered_set<dht::token> tokens;
|
||||
|
||||
// When a new node joins the cluster, always a new CDC generation is created.
|
||||
@@ -75,6 +67,15 @@ struct replica_state {
|
||||
};
|
||||
|
||||
struct topology {
|
||||
enum class replication_state: uint8_t {
|
||||
normal,
|
||||
commit_cdc_generation,
|
||||
write_both_read_old,
|
||||
write_both_read_new,
|
||||
};
|
||||
|
||||
replication_state rstate{replication_state::normal};
|
||||
|
||||
// Nodes that are normal members of the ring
|
||||
std::unordered_map<raft::server_id, replica_state> normal_nodes;
|
||||
// Nodes that are left
|
||||
@@ -138,8 +139,8 @@ struct raft_topology_cmd_result {
|
||||
command_status status = command_status::fail;
|
||||
};
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s);
|
||||
ring_slice::replication_state replication_state_from_string(const sstring& s);
|
||||
std::ostream& operator<<(std::ostream& os, topology::replication_state s);
|
||||
topology::replication_state replication_state_from_string(const sstring& s);
|
||||
std::ostream& operator<<(std::ostream& os, node_state s);
|
||||
node_state node_state_from_string(const sstring& s);
|
||||
std::ostream& operator<<(std::ostream& os, const topology_request& req);
|
||||
|
||||
Reference in New Issue
Block a user