Merge 'topology changes over raft' from Gleb Natapov

The patch series introduces linearisable topology changes using
raft protocol. The state machine driven by raft is described in
"service: Introduce topology state machine". Some explanations about
the implementation can be found in "storage_service: raft topology:
implement topology management through raft".

The code is not ready for production. There is not much in terms of error
handling and integration with the rest of the system is not even started.
For full integration request fencing will need to be implemented and
token_metadata has to be extended to support not just "pending" nodes
but concepts of "read replica set" and "write replica set".

The code may be far from be usable, but it is hidden behind the
"experimental raft" flag and having it in tree will relieve me from
constant rebase burden.

* 'raft-topology-v6' of github.com:scylladb/scylla-dev:
  storage_service: fix indentation from previous patch
  storage_service: raft topology: implement topology management through raft
  service: raft: make group0_guard move assignable
  service: raft: wire up apply() and snapshot transfer for topology in group0 state machine
  storage_service: raft topology: introduce a function that applies topology cmd to local state machine
  storage_service: raft topology: introduce a raft monitor and topology coordinator fibers
  storage_service: raft topology: introduce snapshot transfer code for the topology table
  raft topology: add RAFT_TOPOLOGY_CMD verb that will be used by topology coordinator to communicated with nodes
  bootstrapper: Add get_random_bootstrap_tokens function
  service: raft: add support for topology_change command into raft_group0_client
  service: raft: introduce topology_change group0 command
  system_keyspace: add a table to persist topology change state machine's state
  service: Introduce topology state machine data structures
  storage_proxy: not consult topology on local table write
This commit is contained in:
Kamil Braun
2023-03-23 15:59:45 +01:00
20 changed files with 1946 additions and 171 deletions

View File

@@ -1036,6 +1036,7 @@ scylla_core = (['message/messaging_service.cc',
'tasks/task_manager.cc',
'rust/wasmtime_bindings/src/lib.rs',
'utils/to_string.cc',
'service/topology_state_machine.cc',
] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \
+ scylla_raft_core
)
@@ -1151,6 +1152,7 @@ idls = ['idl/gossip_digest.idl.hh',
'idl/per_partition_rate_limit_info.idl.hh',
'idl/position_in_partition.idl.hh',
'idl/experimental/broadcast_tables_lang.idl.hh',
'idl/storage_service.idl.hh',
]
headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git'])

View File

@@ -3008,7 +3008,6 @@ static void prepare_builder_from_table_row(const schema_ctxt& ctxt, schema_build
}
}
schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional<table_schema_version> version)
{
slogger.trace("create_table_from_mutations: version={}, {}", version, sm);

View File

@@ -78,6 +78,7 @@ namespace {
system_keyspace::GROUP0_HISTORY,
system_keyspace::DISCOVERY,
system_keyspace::BROADCAST_KV_STORE,
system_keyspace::TOPOLOGY,
};
if (ks_name == system_keyspace::NAME && system_ks_null_shard_tables.contains(cf_name)) {
props.use_null_sharder = true;
@@ -91,7 +92,8 @@ namespace {
system_keyspace::RAFT_SNAPSHOTS,
system_keyspace::RAFT_SNAPSHOT_CONFIG,
system_keyspace::DISCOVERY,
system_keyspace::BROADCAST_KV_STORE
system_keyspace::BROADCAST_KV_STORE,
system_keyspace::TOPOLOGY,
};
if (ks_name == system_keyspace::NAME && extra_durable_tables.contains(cf_name)) {
props.wait_for_sync_to_commitlog = true;
@@ -213,6 +215,29 @@ schema_ptr system_keyspace::batchlog() {
return paxos;
}
schema_ptr system_keyspace::topology() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, TOPOLOGY);
return schema_builder(NAME, TOPOLOGY, std::optional(id))
.with_column("key", utf8_type, column_kind::partition_key)
.with_column("host_id", uuid_type, column_kind::clustering_key)
.with_column("datacenter", utf8_type)
.with_column("rack", utf8_type)
.with_column("tokens", set_type_impl::get_instance(utf8_type, true))
.with_column("replication_state", utf8_type)
.with_column("node_state", utf8_type)
.with_column("release_version", utf8_type)
.with_column("topology_request", utf8_type)
.with_column("replaced_id", uuid_type)
.with_column("rebuild_option", utf8_type)
.with_column("num_tokens", int32_type)
.set_comment("Current state of topology change machine")
.with_version(generate_schema_version(id))
.build();
}();
return schema;
}
schema_ptr system_keyspace::raft() {
static thread_local auto schema = [] {
auto id = generate_legacy_id(NAME, RAFT);
@@ -2811,6 +2836,10 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
if (cfg.consistent_cluster_management()) {
r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()});
if (cfg.check_experimental(db::experimental_features_t::feature::RAFT)) {
r.insert(r.end(), {topology()});
}
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
r.insert(r.end(), {broadcast_kv_store()});
}
@@ -3434,6 +3463,180 @@ future<> system_keyspace::save_group0_upgrade_state(service::group0_upgrade_stat
return set_scylla_local_param(GROUP0_UPGRADE_STATE_KEY, value);
}
future<service::topology_state_machine::topology_type> system_keyspace::load_topology_state() {
auto rs = co_await qctx->execute_cql(
format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY));
assert(rs);
service::topology_state_machine::topology_type ret;
if (rs->empty()) {
co_return ret;
}
for (auto& row : *rs) {
raft::server_id host_id{row.get_as<utils::UUID>("host_id")};
auto datacenter = row.get_as<sstring>("datacenter");
auto rack = row.get_as<sstring>("rack");
auto release_version = row.get_as<sstring>("release_version");
uint32_t num_tokens = row.get_as<int32_t>("num_tokens");
service::node_state nstate = service::node_state_from_string(row.get_as<sstring>("node_state"));
std::optional<service::ring_slice::replication_state> tstate;
if (row.has("replication_state")) {
tstate = service::replication_state_from_string(row.get_as<sstring>("replication_state"));
}
std::unordered_set<dht::token> t;
if (row.has("tokens")) {
auto blob = row.get_blob("tokens");
auto cdef = topology()->get_column_definition("tokens");
auto deserialized = cdef->type->deserialize(blob);
auto tokens = value_cast<set_type_impl::native_type>(deserialized);
t = decode_tokens(tokens);
}
std::optional<raft::server_id> replaced_id;
if (row.has("replaced_id")) {
replaced_id = raft::server_id(row.get_as<utils::UUID>("replaced_id"));
}
std::optional<sstring> rebuild_option;
if (row.has("rebuild_option")) {
rebuild_option = row.get_as<sstring>("rebuild_option");
}
if (row.has("topology_request")) {
auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
ret.requests.emplace(host_id, req);
switch(req) {
case service::topology_request::replace:
if (!replaced_id) {
on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id));
}
ret.req_param.emplace(host_id, *replaced_id);
break;
case service::topology_request::rebuild:
if (!rebuild_option) {
on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id));
}
ret.req_param.emplace(host_id, *rebuild_option);
break;
case service::topology_request::join:
ret.req_param.emplace(host_id, num_tokens);
break;
default:
// no parameters for other requests
break;
}
} else {
switch (nstate) {
case service::node_state::replacing:
// If a node is replacing abother node we need to know which node it is replacing
if (!replaced_id) {
on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id));
}
ret.req_param.emplace(host_id, *replaced_id);
break;
case service::node_state::rebuilding:
// If a node is rebuilding it needs to know the parameter for the operation
if (!rebuild_option) {
on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id));
}
ret.req_param.emplace(host_id, *rebuild_option);
break;
default:
// no parameters for other operations
break;
}
}
if (!tstate && t.size() != 0) {
on_fatal_internal_error(slogger, "There cannot be tokens without the replication state");
}
std::unordered_map<raft::server_id, service::replica_state>* map = nullptr;
if (nstate == service::node_state::normal) {
map = &ret.normal_nodes;
} else if (nstate == service::node_state::left) {
ret.left_nodes.emplace(host_id);
} else if (nstate == service::node_state::none) {
map = &ret.new_nodes;
} else {
map = &ret.transition_nodes;
}
if (map) {
map->emplace(host_id, service::replica_state{nstate, std::move(datacenter), std::move(rack), std::move(release_version),
tstate ? std::optional<service::ring_slice>(service::ring_slice{*tstate, std::move(t)}) : std::nullopt});
}
}
co_return ret;
}
system_keyspace::topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts, raft::server_id id) :
_s(topology()),
_m(_s, partition_key::from_singular(*_s, TOPOLOGY)),
_ts(ts),
_r(_m.partition().clustered_row(*_s, clustering_key::from_singular(*_s, id.uuid()))) {
_r.apply(row_marker(_ts));
}
system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const sstring& value) {
auto cdef = _s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value)));
return *this;
}
system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const raft::server_id& value) {
auto cdef = _s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value.uuid())));
return *this;
}
system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const uint32_t& value) {
auto cdef = _s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(int32_t(value))));
return *this;
}
system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::del(const char* cell) {
auto cdef = _s->get_column_definition(cell);
assert(cdef);
if (!cdef->type->is_multi_cell()) {
_r.cells().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now()));
} else {
collection_mutation_description cm;
cm.tomb = tombstone{_ts, gc_clock::now()};
_r.cells().apply(*cdef, cm.serialize(*cdef->type));
}
return *this;
}
system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
auto cdef = _s->get_column_definition(cell);
assert(cdef);
collection_mutation_description cm;
if (tokens.size()) {
auto vtype = static_pointer_cast<const set_type_impl>(cdef->type)->get_elements_type();
cm.cells.reserve(tokens.size());
for (auto&& value : tokens) {
cm.cells.emplace_back(vtype->decompose(value.to_sstring()), atomic_cell::make_live(*bytes_type, _ts, bytes_view()));
}
_r.cells().apply(*cdef, cm.serialize(*cdef->type));
} else {
del(cell);
}
return *this;
}
sstring system_keyspace_name() {
return system_keyspace::NAME;
}

View File

@@ -27,6 +27,8 @@
#include "locator/host_id.hh"
#include "service/raft/group0_fwd.hh"
#include "tasks/task_manager.hh"
#include "service/topology_state_machine.hh"
#include "mutation/canonical_mutation.hh"
namespace service {
@@ -153,6 +155,7 @@ public:
static constexpr auto GROUP0_HISTORY = "group0_history";
static constexpr auto DISCOVERY = "discovery";
static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store";
static constexpr auto TOPOLOGY = "topology";
struct v3 {
static constexpr auto BATCHES = "batches";
@@ -233,6 +236,7 @@ public:
static schema_ptr group0_history();
static schema_ptr discovery();
static schema_ptr broadcast_kv_store();
static schema_ptr topology();
static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0);
@@ -439,6 +443,26 @@ public:
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
static future<bool> group0_history_contains(utils::UUID state_id);
class topology_mutation_builder {
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
deletable_row& _r;
public:
topology_mutation_builder(api::timestamp_type ts, raft::server_id);
template<typename T>
topology_mutation_builder& set(const char* cell, const T& value) {
return set(cell, sstring{fmt::format("{}", value)});
}
topology_mutation_builder& set(const char* cell, const sstring& value);
topology_mutation_builder& set(const char* cell, const raft::server_id& value);
topology_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
topology_mutation_builder& set(const char* cell, const uint32_t& value);
topology_mutation_builder& del(const char* cell);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
static future<service::topology_state_machine::topology_type> load_topology_state();
// The mutation appends the given state ID to the group 0 history table, with the given description if non-empty.
//
// If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose

View File

@@ -63,6 +63,20 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
}
}
std::unordered_set<token> boot_strapper::get_random_bootstrap_tokens(const token_metadata_ptr tmptr, size_t num_tokens, dht::check_token_endpoint check) {
if (num_tokens < 1) {
throw std::runtime_error("num_tokens must be >= 1");
}
if (num_tokens == 1) {
blogger.warn("Picking random token for a single vnode. You should probably add more vnodes; failing that, you should probably specify the token manually");
}
auto tokens = get_random_tokens(std::move(tmptr), num_tokens);
blogger.info("Get random bootstrap_tokens={}", tokens);
return tokens;
}
std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, dht::check_token_endpoint check) {
std::unordered_set<sstring> initial_tokens;
sstring tokens_string = cfg.initial_token();
@@ -87,19 +101,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metada
blogger.info("Get manually specified bootstrap_tokens={}", tokens);
return tokens;
}
size_t num_tokens = cfg.num_tokens();
if (num_tokens < 1) {
throw std::runtime_error("num_tokens must be >= 1");
}
if (num_tokens == 1) {
blogger.warn("Picking random token for a single vnode. You should probably add more vnodes; failing that, you should probably specify the token manually");
}
auto tokens = get_random_tokens(std::move(tmptr), num_tokens);
blogger.info("Get random bootstrap_tokens={}", tokens);
return tokens;
return get_random_bootstrap_tokens(tmptr, cfg.num_tokens(), check);
}
std::unordered_set<token> boot_strapper::get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens) {

View File

@@ -61,6 +61,11 @@ public:
*/
static std::unordered_set<token> get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, check_token_endpoint check);
/**
* Same as above but does not consult initialtoken config
*/
static std::unordered_set<token> get_random_bootstrap_tokens(const token_metadata_ptr tmptr, size_t num_tokens, check_token_endpoint check);
static std::unordered_set<token> get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens);
#if 0
public static class StringSerializer implements IVersionedSerializer<String>

View File

@@ -0,0 +1,79 @@
# Topology state machine
The topology state machine tracks all the nodes in a cluster,
their state, properties (topology, tokens, etc) and requested actions.
Node state can be one of those:
none - the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
bootstrapping - the node is currently in the process of streaming its part of the ring
decommissioning - the node is being decomissioned and stream its data to nodes that took over
removing - the node is being removed and its data is streamed to nodes that took over from still alive owners
replacing - the node replaces another dead node in the cluster and it data is being streamed to it
rebuilding - the node is being rebuild and is streaming data from other replicas
normal - the node does not do any streaming and serves the slice of the ring that belongs to it
left - the node left the cluster and group0
Nodes in state left are never removed from the state.
State transition diagram:
{none} ------> {bootstrapping|replacing} ------> {normal} <---> {rebuilding}
| | |
| | |
| V V
----------------> {left} <-------- {decommissioning|removing}
A state may have additional parameters associated with it. For instance
'replacing' state has host id of a node been replaced as a parameter.
Tokens also can be in one of the states:
write_both_read_old - writes are going to new and old replica, but reads are from
old replicas still
write_both_read_new - writes still going to old and new replicas but reads are
from new replica
owner - tokens are owned by the node and reads and write go to new
replica set only
Tokens that needs to be move start in 'write_both_read_old' state. After entire
cluster learns about it streaming start. After the streaming tokens move
to 'write_both_read_new' state and again the whole cluster needs to learn about it
and make sure no reads started before that point exist in the system.
After that tokens may move to the 'owner' state.
The state machine also maintains a map of topology requests per node.
When a request is issued to a node the entry is added to the map. A
request is one of the topology operation currently supported: join,
leave, replace, remove and rebuild. A request may also have parameters
associated with it which are also stored in a separate map.
# Topology state persistence table
The in memory state's machine state is persisted in a local table system.topology.
The schema of the tables is:
CREATE TABLE system.topology (
host_id uuid PRIMARY KEY,
datacenter text,
node_state text,
rack text,
release_version text,
replaced_id uuid,
tokens set<text>,
replication_state text,
topology_request text
rebuild_option text
)
Each node has a row in the table where its host_id is the primary key. The row contains:
host_id - id of the node
datacenter - a name of the datacenter the node belongs to
rack - a name of the rack the node belongs to
release_version - release version of the Scylla on the node
node_state - current state of the node
topology_request - if set contains one of the supported topology requests
tokens - if set contains a list of tokens that belongs to the node
replication_state - if set contains a state the state the token replication is now in
replaced_id - if the node replacing or replaced another node here will be the id of that node
rebuild_option - if the node is being rebuild contains datacenter name that is used as a rebuild source

View File

@@ -23,8 +23,12 @@ struct broadcast_table_query {
service::broadcast_tables::query query;
};
struct topology_change {
std::vector<canonical_mutation> mutations;
};
struct group0_command {
std::variant<service::schema_change, service::broadcast_table_query> change;
std::variant<service::schema_change, service::broadcast_table_query, service::topology_change> change;
canonical_mutation history_append;
std::optional<utils::UUID> prev_state_id;

View File

@@ -0,0 +1,35 @@
/*
* Copyright 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace service {
struct raft_topology_cmd {
enum class command: uint8_t {
barrier,
stream_ranges,
fence_old_reads
};
service::raft_topology_cmd::command cmd;
};
struct raft_topology_cmd_result {
enum class command_status: uint8_t {
fail,
success
};
service::raft_topology_cmd_result::command_status status;
};
struct raft_topology_snapshot {
std::vector<canonical_mutation> mutations;
};
struct raft_topology_pull_params {};
verb raft_topology_cmd (raft::term_t term, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot;
}

View File

@@ -45,6 +45,7 @@
#include "serializer.hh"
#include "full_position.hh"
#include "db/per_partition_rate_limit_info.hh"
#include "service/topology_state_machine.hh"
#include "idl/consistency_level.dist.hh"
#include "idl/tracing.dist.hh"
#include "idl/result.dist.hh"
@@ -73,6 +74,7 @@
#include "idl/replica_exception.dist.hh"
#include "idl/per_partition_rate_limit_info.dist.hh"
#include "idl/storage_proxy.dist.hh"
#include "idl/storage_service.dist.hh"
#include "message/rpc_protocol_impl.hh"
#include "idl/consistency_level.dist.impl.hh"
#include "idl/tracing.dist.impl.hh"
@@ -113,6 +115,7 @@
#include "idl/partition_checksum.dist.impl.hh"
#include "idl/forward_request.dist.hh"
#include "idl/forward_request.dist.impl.hh"
#include "idl/storage_service.dist.impl.hh"
namespace netw {
@@ -508,6 +511,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::GROUP0_PEER_EXCHANGE:
case messaging_verb::GROUP0_MODIFY_CONFIG:
case messaging_verb::GET_GROUP0_UPGRADE_STATE:
case messaging_verb::RAFT_TOPOLOGY_CMD:
// See comment above `TOPOLOGY_INDEPENDENT_IDX`.
// DO NOT put any 'hot' (e.g. data path) verbs in this group,
// only verbs which are 'rare' and 'cheap'.
@@ -568,6 +572,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_ADD_ENTRY:
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::DIRECT_FD_PING:
case messaging_verb::RAFT_PULL_TOPOLOGY_SNAPSHOT:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:

View File

@@ -180,7 +180,9 @@ enum class messaging_verb : int32_t {
FORWARD_REQUEST = 61,
GET_GROUP0_UPGRADE_STATE = 62,
DIRECT_FD_PING = 63,
LAST = 64,
RAFT_TOPOLOGY_CMD = 64,
RAFT_PULL_TOPOLOGY_SNAPSHOT = 65,
LAST = 66,
};
} // namespace netw

View File

@@ -24,6 +24,8 @@
#include "idl/frozen_schema.dist.impl.hh"
#include "idl/experimental/broadcast_tables_lang.dist.hh"
#include "idl/experimental/broadcast_tables_lang.dist.impl.hh"
#include "service/storage_service.hh"
#include "idl/storage_service.dist.hh"
#include "idl/group0_state_machine.dist.hh"
#include "idl/group0_state_machine.dist.impl.hh"
#include "service/migration_manager.hh"
@@ -103,6 +105,9 @@ future<> group0_state_machine::apply(std::vector<raft::command_cref> command) {
[&] (broadcast_table_query& query) -> future<> {
auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id);
_client.set_query_result(cmd.new_state_id, std::move(result));
},
[&] (topology_change& chng) -> future<> {
return _ss.topology_transition(_sp, cmd.creator_addr, std::move(chng.mutations));
}
), cmd.change);
@@ -119,7 +124,10 @@ void group0_state_machine::drop_snapshot(raft::snapshot_id id) {
}
future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
return make_ready_future<>();
// topology_state_load applies persisted state machine state into
// memory and thus needs to be protected with apply mutex
auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1);
co_await _ss.topology_state_load();
}
future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
@@ -136,6 +144,9 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s
// (which were introduced a long time ago).
on_internal_error(slogger, "Expected MIGRATION_REQUEST to return canonical mutations");
}
auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, service::raft_topology_pull_params{});
auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary());
// TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`)
@@ -144,6 +155,10 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s
co_await _mm.merge_schema_from(addr, std::move(*cm));
if (!topology_snp.mutations.empty()) {
co_await _ss.merge_topology_snapshot(std::move(topology_snp));
}
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
}

View File

@@ -29,8 +29,12 @@ struct broadcast_table_query {
service::broadcast_tables::query query;
};
struct topology_change {
std::vector<canonical_mutation> mutations;
};
struct group0_command {
std::variant<schema_change, broadcast_table_query> change;
std::variant<schema_change, broadcast_table_query, topology_change> change;
// Mutation of group0 history table, appending a new state ID and optionally a description.
canonical_mutation history_append;

View File

@@ -123,6 +123,8 @@ group0_guard::~group0_guard() = default;
group0_guard::group0_guard(group0_guard&&) noexcept = default;
group0_guard& group0_guard::operator=(group0_guard&&) noexcept = default;
utils::UUID group0_guard::observed_group0_state_id() const {
return _impl->_observed_group0_state_id;
}
@@ -283,7 +285,9 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
}
}
group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description) {
template<typename Command>
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change>
group0_command raft_group0_client::prepare_command(Command change, group0_guard& guard, std::string_view description) {
group0_command group0_cmd {
.change{std::move(change)},
.history_append{db::system_keyspace::make_group0_history_state_id_mutation(
@@ -411,4 +415,7 @@ void raft_group0_client::set_query_result(utils::UUID query_id, service::broadca
}
}
template group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description);
template group0_command raft_group0_client::prepare_command(topology_change change, group0_guard& guard, std::string_view description);
}

View File

@@ -41,6 +41,7 @@ class group0_guard {
public:
~group0_guard();
group0_guard(group0_guard&&) noexcept;
group0_guard& operator=(group0_guard&&) noexcept;
utils::UUID observed_group0_state_id() const;
utils::UUID new_group0_state_id() const;
@@ -127,8 +128,10 @@ public:
// and add_entry would again forward to shard 0.
future<group0_guard> start_operation(seastar::abort_source* as = nullptr);
group0_command prepare_command(schema_change change, group0_guard& guard, std::string_view description);
group0_command prepare_command(broadcast_table_query query);
template<typename Command>
requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change>
group0_command prepare_command(Command change, group0_guard& guard, std::string_view description);
// Returns the current group 0 upgrade state.
//

View File

@@ -3742,17 +3742,22 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo
auto& stats = handler_ptr->stats();
auto& handler = *handler_ptr;
auto& global_stats = handler._proxy->_global_stats;
auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology();
auto local_dc = topology.get_datacenter();
if (handler.get_targets().size() != 1 || !fbu::is_me(handler.get_targets()[0])) {
auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology();
auto local_dc = topology.get_datacenter();
for(auto dest: handler.get_targets()) {
sstring dc = topology.get_datacenter(dest);
// read repair writes do not go through coordinator since mutations are per destination
if (handler.read_repair_write() || dc == local_dc) {
local.emplace_back("", inet_address_vector_replica_set({dest}));
} else {
dc_groups[dc].push_back(dest);
for(auto dest: handler.get_targets()) {
sstring dc = topology.get_datacenter(dest);
// read repair writes do not go through coordinator since mutations are per destination
if (handler.read_repair_write() || dc == local_dc) {
local.emplace_back("", inet_address_vector_replica_set({dest}));
} else {
dc_groups[dc].push_back(dest);
}
}
} else {
// There is only one target replica and it is me
local.emplace_back("", handler.get_targets());
}
auto all = boost::range::join(local, dc_groups);

File diff suppressed because it is too large Load Diff

View File

@@ -43,6 +43,8 @@
#include "cdc/generation_id.hh"
#include "raft/raft.hh"
#include "repair/id.hh"
#include "raft/server.hh"
#include "service/topology_state_machine.hh"
class node_ops_cmd_request;
class node_ops_cmd_response;
@@ -757,6 +759,40 @@ private:
std::unordered_set<gms::inet_address> _normal_state_handled_on_boot;
bool is_normal_state_handled_on_boot(gms::inet_address);
future<> wait_for_normal_state_handled_on_boot(const std::unordered_set<gms::inet_address>& nodes, sstring ops, node_ops_id uuid);
friend class group0_state_machine;
bool _raft_topology_change_enabled = false;
future<> _raft_state_monitor = make_ready_future<>();
// This fibers monitors raft state and start/stops the topology change
// coordinator fiber
future<> raft_state_monitor_fiber(raft::server&, sharded<db::system_distributed_keyspace>& sys_dist_ks);
// State machine that is responsible for topology change
topology_state_machine _topology_state_machine;
future<> _topology_change_coordinator = make_ready_future<>();
future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, sharded<db::system_distributed_keyspace>&, abort_source&);
// Those futures hold results of streaming for various operations
std::optional<shared_future<>> _bootstrap_result;
std::optional<shared_future<>> _decomission_result;
std::optional<shared_future<>> _rebuild_result;
std::unordered_map<raft::server_id, std::optional<shared_future<>>> _remove_result;
future<raft_topology_cmd_result> raft_topology_cmd_handler(sharded<db::system_distributed_keyspace>& sys_dist_ks, raft::term_t term, const raft_topology_cmd& cmd);
future<> raft_bootstrap(raft::server&);
future<> raft_decomission();
future<> raft_removenode(locator::host_id host_id);
future<> raft_replace(raft::server&, raft::server_id, gms::inet_address);
future<> raft_rebuild(sstring source_dc);
// This is called on all nodes for each new command received through raft
future<> topology_transition(storage_proxy& proxy, gms::inet_address, std::vector<canonical_mutation>);
// load topology state machine snapshot into memory
future<> topology_state_load();
// Applies received raft snapshot to local state machine persistent storage
future<> merge_topology_snapshot(raft_topology_snapshot snp);
};
}

View File

@@ -0,0 +1,118 @@
/*
* Copyright (C) 2022-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#include "topology_state_machine.hh"
namespace service {
const std::pair<const raft::server_id, replica_state>* topology::find(raft::server_id id) {
auto it = normal_nodes.find(id);
if (it != normal_nodes.end()) {
return &*it;
}
it = transition_nodes.find(id);
if (it != transition_nodes.end()) {
return &*it;
}
it = new_nodes.find(id);
if (it != new_nodes.end()) {
return &*it;
}
return nullptr;
}
bool topology::contains(raft::server_id id) {
return normal_nodes.contains(id) ||
transition_nodes.contains(id) ||
new_nodes.contains(id) ||
left_nodes.contains(id);
}
static std::unordered_map<ring_slice::replication_state, sstring> replication_state_to_name_map = {
{ring_slice::replication_state::write_both_read_old, "write both read old"},
{ring_slice::replication_state::write_both_read_new, "write both read new"},
{ring_slice::replication_state::owner, "owner"},
};
std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) {
os << replication_state_to_name_map[s];
return os;
}
ring_slice::replication_state replication_state_from_string(const sstring& s) {
for (auto&& e : replication_state_to_name_map) {
if (e.second == s) {
return e.first;
}
}
throw std::runtime_error(fmt::format("cannot map name {} to token_state", s));
}
static std::unordered_map<node_state, sstring> node_state_to_name_map = {
{node_state::bootstrapping, "bootstrapping"},
{node_state::decommissioning, "decommissioning"},
{node_state::removing, "removing"},
{node_state::normal, "normal"},
{node_state::left, "left"},
{node_state::replacing, "replacing"},
{node_state::rebuilding, "rebuilding"},
{node_state::none, "none"}
};
std::ostream& operator<<(std::ostream& os, node_state s) {
os << node_state_to_name_map[s];
return os;
}
node_state node_state_from_string(const sstring& s) {
for (auto&& e : node_state_to_name_map) {
if (e.second == s) {
return e.first;
}
}
throw std::runtime_error(fmt::format("cannot map name {} to node_state", s));
}
static std::unordered_map<topology_request, sstring> topology_request_to_name_map = {
{topology_request::join, "join"},
{topology_request::leave, "leave"},
{topology_request::remove, "remove"},
{topology_request::replace, "replace"},
{topology_request::rebuild, "rebuild"}
};
std::ostream& operator<<(std::ostream& os, const topology_request& req) {
os << topology_request_to_name_map[req];
return os;
}
topology_request topology_request_from_string(const sstring& s) {
for (auto&& e : topology_request_to_name_map) {
if (e.second == s) {
return e.first;
}
}
throw std::runtime_error(fmt::format("cannot map name {} to topology_request", s));
}
std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd) {
switch (cmd) {
case raft_topology_cmd::command::barrier:
os << "barrier";
break;
case raft_topology_cmd::command::stream_ranges:
os << "stream_ranges";
break;
case raft_topology_cmd::command::fence_old_reads:
os << "fence_old_reads";
break;
}
return os;
}
}

View File

@@ -0,0 +1,133 @@
/*
* Copyright (C) 2022-present ScyllaDB
*
*/
/*
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/
#pragma once
#include <boost/range/algorithm/find_if.hpp>
#include "boost/range/join.hpp"
#include <iostream>
#include <unordered_set>
#include <unordered_map>
#include <seastar/core/condition-variable.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/shared_ptr.hh>
#include "dht/token.hh"
#include "raft/raft.hh"
#include "utils/UUID.hh"
#include "dht/i_partitioner.hh"
#include "mutation/canonical_mutation.hh"
namespace service {
enum class node_state: uint8_t {
none, // the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
bootstrapping, // the node is currently in the process of streaming its part of the ring
decommissioning, // the node is being decomissioned and stream its data to nodes that took over
removing, // the node is being removed and its data is streamed to nodes that took over from still alive owners
replacing, // the node replaces another dead node in the cluster and it data is being streamed to it
rebuilding, // the node is being rebuild and is streaming data from other replicas
normal, // the node does not do any streaming and serves the slice of the ring that belongs to it
left // the node left the cluster and group0
};
enum class topology_request: uint8_t {
join,
leave,
remove,
replace,
rebuild
};
using request_param = std::variant<raft::server_id, sstring, uint32_t>;
struct ring_slice {
enum class replication_state: uint8_t {
write_both_read_old,
write_both_read_new,
owner
};
replication_state state;
std::unordered_set<dht::token> tokens;
};
struct replica_state {
node_state state;
seastar::sstring datacenter;
seastar::sstring rack;
seastar::sstring release_version;
std::optional<ring_slice> ring; // if engaged contain the set of tokens the node owns together with their state
};
struct topology {
// Nodes that are normal members of the ring
std::unordered_map<raft::server_id, replica_state> normal_nodes;
// Nodes that are left
std::unordered_set<raft::server_id> left_nodes;
// Nodes that are waiting to be joined by the topology coordinator
std::unordered_map<raft::server_id, replica_state> new_nodes;
// Nodes that are in the process to be added to the ring
// Currently only at most one node at a time will be here
std::unordered_map<raft::server_id, replica_state> transition_nodes;
// Pending topology requests
std::unordered_map<raft::server_id, topology_request> requests;
// Holds parameters for a request per node and valid during entire
// operation untill the node becomes normal
std::unordered_map<raft::server_id, request_param> req_param;
// Find only nodes in non 'left' state
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id);
// Return true if node exists in any state including 'left' one
bool contains(raft::server_id id);
};
struct raft_topology_snapshot {
std::vector<canonical_mutation> mutations;
};
struct raft_topology_pull_params {
};
// State machine that is responsible for topology change
struct topology_state_machine {
using topology_type = topology;
topology_type _topology;
condition_variable event;
};
// Raft leader uses this command to drive bootstrap process on other nodes
struct raft_topology_cmd {
enum class command: uint8_t {
barrier, // request to wait for the latest topology
stream_ranges, // reqeust to stream data, return when streaming is
// done
fence_old_reads // wait for all reads started before to complete
};
command cmd;
};
// returned as a result of raft_bootstrap_cmd
struct raft_topology_cmd_result {
enum class command_status: uint8_t {
fail,
success
};
command_status status = command_status::fail;
};
std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s);
ring_slice::replication_state replication_state_from_string(const sstring& s);
std::ostream& operator<<(std::ostream& os, node_state s);
node_state node_state_from_string(const sstring& s);
std::ostream& operator<<(std::ostream& os, const topology_request& req);
topology_request topology_request_from_string(const sstring& s);
std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd);
}