diff --git a/configure.py b/configure.py index b3f1e9b369..e3ab077c4c 100755 --- a/configure.py +++ b/configure.py @@ -1036,6 +1036,7 @@ scylla_core = (['message/messaging_service.cc', 'tasks/task_manager.cc', 'rust/wasmtime_bindings/src/lib.rs', 'utils/to_string.cc', + 'service/topology_state_machine.cc', ] + [Antlr3Grammar('cql3/Cql.g')] + [Thrift('interface/cassandra.thrift', 'Cassandra')] \ + scylla_raft_core ) @@ -1151,6 +1152,7 @@ idls = ['idl/gossip_digest.idl.hh', 'idl/per_partition_rate_limit_info.idl.hh', 'idl/position_in_partition.idl.hh', 'idl/experimental/broadcast_tables_lang.idl.hh', + 'idl/storage_service.idl.hh', ] headers = find_headers('.', excluded_dirs=['idl', 'build', 'seastar', '.git']) diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 093c61ffc4..1ba73884a5 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -3008,7 +3008,6 @@ static void prepare_builder_from_table_row(const schema_ctxt& ctxt, schema_build } } - schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations sm, std::optional version) { slogger.trace("create_table_from_mutations: version={}, {}", version, sm); diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index b5aab51622..ac9c34831b 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -78,6 +78,7 @@ namespace { system_keyspace::GROUP0_HISTORY, system_keyspace::DISCOVERY, system_keyspace::BROADCAST_KV_STORE, + system_keyspace::TOPOLOGY, }; if (ks_name == system_keyspace::NAME && system_ks_null_shard_tables.contains(cf_name)) { props.use_null_sharder = true; @@ -91,7 +92,8 @@ namespace { system_keyspace::RAFT_SNAPSHOTS, system_keyspace::RAFT_SNAPSHOT_CONFIG, system_keyspace::DISCOVERY, - system_keyspace::BROADCAST_KV_STORE + system_keyspace::BROADCAST_KV_STORE, + system_keyspace::TOPOLOGY, }; if (ks_name == system_keyspace::NAME && extra_durable_tables.contains(cf_name)) { props.wait_for_sync_to_commitlog = true; @@ -213,6 +215,29 @@ schema_ptr system_keyspace::batchlog() { return paxos; } +schema_ptr system_keyspace::topology() { + static thread_local auto schema = [] { + auto id = generate_legacy_id(NAME, TOPOLOGY); + return schema_builder(NAME, TOPOLOGY, std::optional(id)) + .with_column("key", utf8_type, column_kind::partition_key) + .with_column("host_id", uuid_type, column_kind::clustering_key) + .with_column("datacenter", utf8_type) + .with_column("rack", utf8_type) + .with_column("tokens", set_type_impl::get_instance(utf8_type, true)) + .with_column("replication_state", utf8_type) + .with_column("node_state", utf8_type) + .with_column("release_version", utf8_type) + .with_column("topology_request", utf8_type) + .with_column("replaced_id", uuid_type) + .with_column("rebuild_option", utf8_type) + .with_column("num_tokens", int32_type) + .set_comment("Current state of topology change machine") + .with_version(generate_schema_version(id)) + .build(); + }(); + return schema; +} + schema_ptr system_keyspace::raft() { static thread_local auto schema = [] { auto id = generate_legacy_id(NAME, RAFT); @@ -2811,6 +2836,10 @@ std::vector system_keyspace::all_tables(const db::config& cfg) { if (cfg.consistent_cluster_management()) { r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()}); + if (cfg.check_experimental(db::experimental_features_t::feature::RAFT)) { + r.insert(r.end(), {topology()}); + } + if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) { r.insert(r.end(), {broadcast_kv_store()}); } @@ -3434,6 +3463,180 @@ future<> system_keyspace::save_group0_upgrade_state(service::group0_upgrade_stat return set_scylla_local_param(GROUP0_UPGRADE_STATE_KEY, value); } +future system_keyspace::load_topology_state() { + auto rs = co_await qctx->execute_cql( + format("SELECT * FROM system.{} WHERE key = '{}'", TOPOLOGY, TOPOLOGY)); + assert(rs); + + service::topology_state_machine::topology_type ret; + + if (rs->empty()) { + co_return ret; + } + + for (auto& row : *rs) { + raft::server_id host_id{row.get_as("host_id")}; + auto datacenter = row.get_as("datacenter"); + auto rack = row.get_as("rack"); + auto release_version = row.get_as("release_version"); + uint32_t num_tokens = row.get_as("num_tokens"); + + service::node_state nstate = service::node_state_from_string(row.get_as("node_state")); + + std::optional tstate; + if (row.has("replication_state")) { + tstate = service::replication_state_from_string(row.get_as("replication_state")); + } + + std::unordered_set t; + + if (row.has("tokens")) { + auto blob = row.get_blob("tokens"); + auto cdef = topology()->get_column_definition("tokens"); + auto deserialized = cdef->type->deserialize(blob); + auto tokens = value_cast(deserialized); + t = decode_tokens(tokens); + } + + std::optional replaced_id; + if (row.has("replaced_id")) { + replaced_id = raft::server_id(row.get_as("replaced_id")); + } + + std::optional rebuild_option; + if (row.has("rebuild_option")) { + rebuild_option = row.get_as("rebuild_option"); + } + + if (row.has("topology_request")) { + auto req = service::topology_request_from_string(row.get_as("topology_request")); + ret.requests.emplace(host_id, req); + switch(req) { + case service::topology_request::replace: + if (!replaced_id) { + on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id)); + } + ret.req_param.emplace(host_id, *replaced_id); + break; + case service::topology_request::rebuild: + if (!rebuild_option) { + on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id)); + } + ret.req_param.emplace(host_id, *rebuild_option); + break; + case service::topology_request::join: + ret.req_param.emplace(host_id, num_tokens); + break; + default: + // no parameters for other requests + break; + } + } else { + switch (nstate) { + case service::node_state::replacing: + // If a node is replacing abother node we need to know which node it is replacing + if (!replaced_id) { + on_internal_error(slogger, fmt::format("replaced_id is missing for a node {}", host_id)); + } + ret.req_param.emplace(host_id, *replaced_id); + break; + case service::node_state::rebuilding: + // If a node is rebuilding it needs to know the parameter for the operation + if (!rebuild_option) { + on_internal_error(slogger, fmt::format("rebuild_option is missing for a node {}", host_id)); + } + ret.req_param.emplace(host_id, *rebuild_option); + break; + default: + // no parameters for other operations + break; + } + } + + if (!tstate && t.size() != 0) { + on_fatal_internal_error(slogger, "There cannot be tokens without the replication state"); + } + std::unordered_map* map = nullptr; + if (nstate == service::node_state::normal) { + map = &ret.normal_nodes; + } else if (nstate == service::node_state::left) { + ret.left_nodes.emplace(host_id); + } else if (nstate == service::node_state::none) { + map = &ret.new_nodes; + } else { + map = &ret.transition_nodes; + } + if (map) { + map->emplace(host_id, service::replica_state{nstate, std::move(datacenter), std::move(rack), std::move(release_version), + tstate ? std::optional(service::ring_slice{*tstate, std::move(t)}) : std::nullopt}); + } + } + + co_return ret; +} + +system_keyspace::topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts, raft::server_id id) : + _s(topology()), + _m(_s, partition_key::from_singular(*_s, TOPOLOGY)), + _ts(ts), + _r(_m.partition().clustered_row(*_s, clustering_key::from_singular(*_s, id.uuid()))) { + _r.apply(row_marker(_ts)); +} + +system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const sstring& value) { + auto cdef = _s->get_column_definition(cell); + assert(cdef); + _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value))); + return *this; +} + +system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const raft::server_id& value) { + auto cdef = _s->get_column_definition(cell); + assert(cdef); + _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value.uuid()))); + return *this; +} + +system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const uint32_t& value) { + auto cdef = _s->get_column_definition(cell); + assert(cdef); + _r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(int32_t(value)))); + return *this; +} + +system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::del(const char* cell) { + auto cdef = _s->get_column_definition(cell); + assert(cdef); + if (!cdef->type->is_multi_cell()) { + _r.cells().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now())); + } else { + collection_mutation_description cm; + cm.tomb = tombstone{_ts, gc_clock::now()}; + _r.cells().apply(*cdef, cm.serialize(*cdef->type)); + } + return *this; +} + +system_keyspace::topology_mutation_builder& system_keyspace::topology_mutation_builder::set(const char* cell, const std::unordered_set& tokens) { + auto cdef = _s->get_column_definition(cell); + assert(cdef); + collection_mutation_description cm; + if (tokens.size()) { + auto vtype = static_pointer_cast(cdef->type)->get_elements_type(); + + cm.cells.reserve(tokens.size()); + + for (auto&& value : tokens) { + cm.cells.emplace_back(vtype->decompose(value.to_sstring()), atomic_cell::make_live(*bytes_type, _ts, bytes_view())); + } + + _r.cells().apply(*cdef, cm.serialize(*cdef->type)); + } else { + del(cell); + } + return *this; +} + sstring system_keyspace_name() { return system_keyspace::NAME; } diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh index c32b213fca..6221f02f80 100644 --- a/db/system_keyspace.hh +++ b/db/system_keyspace.hh @@ -27,6 +27,8 @@ #include "locator/host_id.hh" #include "service/raft/group0_fwd.hh" #include "tasks/task_manager.hh" +#include "service/topology_state_machine.hh" +#include "mutation/canonical_mutation.hh" namespace service { @@ -153,6 +155,7 @@ public: static constexpr auto GROUP0_HISTORY = "group0_history"; static constexpr auto DISCOVERY = "discovery"; static constexpr auto BROADCAST_KV_STORE = "broadcast_kv_store"; + static constexpr auto TOPOLOGY = "topology"; struct v3 { static constexpr auto BATCHES = "batches"; @@ -233,6 +236,7 @@ public: static schema_ptr group0_history(); static schema_ptr discovery(); static schema_ptr broadcast_kv_store(); + static schema_ptr topology(); static table_schema_version generate_schema_version(table_id table_id, uint16_t offset = 0); @@ -439,6 +443,26 @@ public: // Assumes that the history table exists, i.e. Raft experimental feature is enabled. static future group0_history_contains(utils::UUID state_id); + class topology_mutation_builder { + schema_ptr _s; + mutation _m; + api::timestamp_type _ts; + deletable_row& _r; + public: + topology_mutation_builder(api::timestamp_type ts, raft::server_id); + template + topology_mutation_builder& set(const char* cell, const T& value) { + return set(cell, sstring{fmt::format("{}", value)}); + } + topology_mutation_builder& set(const char* cell, const sstring& value); + topology_mutation_builder& set(const char* cell, const raft::server_id& value); + topology_mutation_builder& set(const char* cell, const std::unordered_set& value); + topology_mutation_builder& set(const char* cell, const uint32_t& value); + topology_mutation_builder& del(const char* cell); + canonical_mutation build() { return canonical_mutation{std::move(_m)}; } + }; + static future load_topology_state(); + // The mutation appends the given state ID to the group 0 history table, with the given description if non-empty. // // If `gc_older_than` is provided, the mutation will also contain a tombstone that clears all entries whose diff --git a/dht/boot_strapper.cc b/dht/boot_strapper.cc index eccd66475d..9490b551af 100644 --- a/dht/boot_strapper.cc +++ b/dht/boot_strapper.cc @@ -63,6 +63,20 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper } } +std::unordered_set boot_strapper::get_random_bootstrap_tokens(const token_metadata_ptr tmptr, size_t num_tokens, dht::check_token_endpoint check) { + if (num_tokens < 1) { + throw std::runtime_error("num_tokens must be >= 1"); + } + + if (num_tokens == 1) { + blogger.warn("Picking random token for a single vnode. You should probably add more vnodes; failing that, you should probably specify the token manually"); + } + + auto tokens = get_random_tokens(std::move(tmptr), num_tokens); + blogger.info("Get random bootstrap_tokens={}", tokens); + return tokens; +} + std::unordered_set boot_strapper::get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, dht::check_token_endpoint check) { std::unordered_set initial_tokens; sstring tokens_string = cfg.initial_token(); @@ -87,19 +101,7 @@ std::unordered_set boot_strapper::get_bootstrap_tokens(const token_metada blogger.info("Get manually specified bootstrap_tokens={}", tokens); return tokens; } - - size_t num_tokens = cfg.num_tokens(); - if (num_tokens < 1) { - throw std::runtime_error("num_tokens must be >= 1"); - } - - if (num_tokens == 1) { - blogger.warn("Picking random token for a single vnode. You should probably add more vnodes; failing that, you should probably specify the token manually"); - } - - auto tokens = get_random_tokens(std::move(tmptr), num_tokens); - blogger.info("Get random bootstrap_tokens={}", tokens); - return tokens; + return get_random_bootstrap_tokens(tmptr, cfg.num_tokens(), check); } std::unordered_set boot_strapper::get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens) { diff --git a/dht/boot_strapper.hh b/dht/boot_strapper.hh index 28da8eec84..87acc15665 100644 --- a/dht/boot_strapper.hh +++ b/dht/boot_strapper.hh @@ -61,6 +61,11 @@ public: */ static std::unordered_set get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, check_token_endpoint check); + /** + * Same as above but does not consult initialtoken config + */ + static std::unordered_set get_random_bootstrap_tokens(const token_metadata_ptr tmptr, size_t num_tokens, check_token_endpoint check); + static std::unordered_set get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens); #if 0 public static class StringSerializer implements IVersionedSerializer diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md new file mode 100644 index 0000000000..6ab22749f6 --- /dev/null +++ b/docs/dev/topology-over-raft.md @@ -0,0 +1,79 @@ +# Topology state machine + +The topology state machine tracks all the nodes in a cluster, +their state, properties (topology, tokens, etc) and requested actions. + +Node state can be one of those: + none - the new node joined group0 but did not bootstraped yet (has no tokens and data to serve) + bootstrapping - the node is currently in the process of streaming its part of the ring + decommissioning - the node is being decomissioned and stream its data to nodes that took over + removing - the node is being removed and its data is streamed to nodes that took over from still alive owners + replacing - the node replaces another dead node in the cluster and it data is being streamed to it + rebuilding - the node is being rebuild and is streaming data from other replicas + normal - the node does not do any streaming and serves the slice of the ring that belongs to it + left - the node left the cluster and group0 + +Nodes in state left are never removed from the state. + +State transition diagram: + +{none} ------> {bootstrapping|replacing} ------> {normal} <---> {rebuilding} + | | | + | | | + | V V + ----------------> {left} <-------- {decommissioning|removing} + + +A state may have additional parameters associated with it. For instance +'replacing' state has host id of a node been replaced as a parameter. + +Tokens also can be in one of the states: + +write_both_read_old - writes are going to new and old replica, but reads are from + old replicas still +write_both_read_new - writes still going to old and new replicas but reads are + from new replica +owner - tokens are owned by the node and reads and write go to new + replica set only + +Tokens that needs to be move start in 'write_both_read_old' state. After entire +cluster learns about it streaming start. After the streaming tokens move +to 'write_both_read_new' state and again the whole cluster needs to learn about it +and make sure no reads started before that point exist in the system. +After that tokens may move to the 'owner' state. + +The state machine also maintains a map of topology requests per node. +When a request is issued to a node the entry is added to the map. A +request is one of the topology operation currently supported: join, +leave, replace, remove and rebuild. A request may also have parameters +associated with it which are also stored in a separate map. + +# Topology state persistence table + +The in memory state's machine state is persisted in a local table system.topology. +The schema of the tables is: + +CREATE TABLE system.topology ( + host_id uuid PRIMARY KEY, + datacenter text, + node_state text, + rack text, + release_version text, + replaced_id uuid, + tokens set, + replication_state text, + topology_request text + rebuild_option text +) + +Each node has a row in the table where its host_id is the primary key. The row contains: + host_id - id of the node + datacenter - a name of the datacenter the node belongs to + rack - a name of the rack the node belongs to + release_version - release version of the Scylla on the node + node_state - current state of the node + topology_request - if set contains one of the supported topology requests + tokens - if set contains a list of tokens that belongs to the node + replication_state - if set contains a state the state the token replication is now in + replaced_id - if the node replacing or replaced another node here will be the id of that node + rebuild_option - if the node is being rebuild contains datacenter name that is used as a rebuild source diff --git a/idl/group0_state_machine.idl.hh b/idl/group0_state_machine.idl.hh index 26a7bee4bc..b9e5667e01 100644 --- a/idl/group0_state_machine.idl.hh +++ b/idl/group0_state_machine.idl.hh @@ -23,8 +23,12 @@ struct broadcast_table_query { service::broadcast_tables::query query; }; +struct topology_change { + std::vector mutations; +}; + struct group0_command { - std::variant change; + std::variant change; canonical_mutation history_append; std::optional prev_state_id; diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh new file mode 100644 index 0000000000..c2cb480350 --- /dev/null +++ b/idl/storage_service.idl.hh @@ -0,0 +1,35 @@ +/* + * Copyright 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +namespace service { + struct raft_topology_cmd { + enum class command: uint8_t { + barrier, + stream_ranges, + fence_old_reads + }; + service::raft_topology_cmd::command cmd; + }; + + struct raft_topology_cmd_result { + enum class command_status: uint8_t { + fail, + success + }; + service::raft_topology_cmd_result::command_status status; + }; + + struct raft_topology_snapshot { + std::vector mutations; + }; + + struct raft_topology_pull_params {}; + + verb raft_topology_cmd (raft::term_t term, service::raft_topology_cmd) -> service::raft_topology_cmd_result; + verb raft_pull_topology_snapshot (service::raft_topology_pull_params) -> service::raft_topology_snapshot; +} diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 45569c7abe..6f2701f088 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -45,6 +45,7 @@ #include "serializer.hh" #include "full_position.hh" #include "db/per_partition_rate_limit_info.hh" +#include "service/topology_state_machine.hh" #include "idl/consistency_level.dist.hh" #include "idl/tracing.dist.hh" #include "idl/result.dist.hh" @@ -73,6 +74,7 @@ #include "idl/replica_exception.dist.hh" #include "idl/per_partition_rate_limit_info.dist.hh" #include "idl/storage_proxy.dist.hh" +#include "idl/storage_service.dist.hh" #include "message/rpc_protocol_impl.hh" #include "idl/consistency_level.dist.impl.hh" #include "idl/tracing.dist.impl.hh" @@ -113,6 +115,7 @@ #include "idl/partition_checksum.dist.impl.hh" #include "idl/forward_request.dist.hh" #include "idl/forward_request.dist.impl.hh" +#include "idl/storage_service.dist.impl.hh" namespace netw { @@ -508,6 +511,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::GROUP0_PEER_EXCHANGE: case messaging_verb::GROUP0_MODIFY_CONFIG: case messaging_verb::GET_GROUP0_UPGRADE_STATE: + case messaging_verb::RAFT_TOPOLOGY_CMD: // See comment above `TOPOLOGY_INDEPENDENT_IDX`. // DO NOT put any 'hot' (e.g. data path) verbs in this group, // only verbs which are 'rare' and 'cheap'. @@ -568,6 +572,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) { case messaging_verb::RAFT_ADD_ENTRY: case messaging_verb::RAFT_MODIFY_CONFIG: case messaging_verb::DIRECT_FD_PING: + case messaging_verb::RAFT_PULL_TOPOLOGY_SNAPSHOT: return 2; case messaging_verb::MUTATION_DONE: case messaging_verb::MUTATION_FAILED: diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 58fa70b6f6..008e1bc647 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -180,7 +180,9 @@ enum class messaging_verb : int32_t { FORWARD_REQUEST = 61, GET_GROUP0_UPGRADE_STATE = 62, DIRECT_FD_PING = 63, - LAST = 64, + RAFT_TOPOLOGY_CMD = 64, + RAFT_PULL_TOPOLOGY_SNAPSHOT = 65, + LAST = 66, }; } // namespace netw diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc index 7e7c6bb05d..c48b678a9c 100644 --- a/service/raft/group0_state_machine.cc +++ b/service/raft/group0_state_machine.cc @@ -24,6 +24,8 @@ #include "idl/frozen_schema.dist.impl.hh" #include "idl/experimental/broadcast_tables_lang.dist.hh" #include "idl/experimental/broadcast_tables_lang.dist.impl.hh" +#include "service/storage_service.hh" +#include "idl/storage_service.dist.hh" #include "idl/group0_state_machine.dist.hh" #include "idl/group0_state_machine.dist.impl.hh" #include "service/migration_manager.hh" @@ -103,6 +105,9 @@ future<> group0_state_machine::apply(std::vector command) { [&] (broadcast_table_query& query) -> future<> { auto result = co_await service::broadcast_tables::execute_broadcast_table_query(_sp, query.query, cmd.new_state_id); _client.set_query_result(cmd.new_state_id, std::move(result)); + }, + [&] (topology_change& chng) -> future<> { + return _ss.topology_transition(_sp, cmd.creator_addr, std::move(chng.mutations)); } ), cmd.change); @@ -119,7 +124,10 @@ void group0_state_machine::drop_snapshot(raft::snapshot_id id) { } future<> group0_state_machine::load_snapshot(raft::snapshot_id id) { - return make_ready_future<>(); + // topology_state_load applies persisted state machine state into + // memory and thus needs to be protected with apply mutex + auto read_apply_mutex_holder = co_await get_units(_client._read_apply_mutex, 1); + co_await _ss.topology_state_load(); } future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) { @@ -136,6 +144,9 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s // (which were introduced a long time ago). on_internal_error(slogger, "Expected MIGRATION_REQUEST to return canonical mutations"); } + + auto topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, service::raft_topology_pull_params{}); + auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary()); // TODO ensure atomicity of snapshot application in presence of crashes (see TODO in `apply`) @@ -144,6 +155,10 @@ future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::s co_await _mm.merge_schema_from(addr, std::move(*cm)); + if (!topology_snp.mutations.empty()) { + co_await _ss.merge_topology_snapshot(std::move(topology_snp)); + } + co_await _sp.mutate_locally({std::move(history_mut)}, nullptr); } diff --git a/service/raft/group0_state_machine.hh b/service/raft/group0_state_machine.hh index af3b4c2df4..d1535fb769 100644 --- a/service/raft/group0_state_machine.hh +++ b/service/raft/group0_state_machine.hh @@ -29,8 +29,12 @@ struct broadcast_table_query { service::broadcast_tables::query query; }; +struct topology_change { + std::vector mutations; +}; + struct group0_command { - std::variant change; + std::variant change; // Mutation of group0 history table, appending a new state ID and optionally a description. canonical_mutation history_append; diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc index 7c4bc25e8e..68e03244d9 100644 --- a/service/raft/raft_group0_client.cc +++ b/service/raft/raft_group0_client.cc @@ -123,6 +123,8 @@ group0_guard::~group0_guard() = default; group0_guard::group0_guard(group0_guard&&) noexcept = default; +group0_guard& group0_guard::operator=(group0_guard&&) noexcept = default; + utils::UUID group0_guard::observed_group0_state_id() const { return _impl->_observed_group0_state_id; } @@ -283,7 +285,9 @@ future raft_group0_client::start_operation(seastar::abort_source* } } -group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description) { +template +requires std::same_as || std::same_as +group0_command raft_group0_client::prepare_command(Command change, group0_guard& guard, std::string_view description) { group0_command group0_cmd { .change{std::move(change)}, .history_append{db::system_keyspace::make_group0_history_state_id_mutation( @@ -411,4 +415,7 @@ void raft_group0_client::set_query_result(utils::UUID query_id, service::broadca } } +template group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description); +template group0_command raft_group0_client::prepare_command(topology_change change, group0_guard& guard, std::string_view description); + } diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh index 08cfac71fb..e700fc4676 100644 --- a/service/raft/raft_group0_client.hh +++ b/service/raft/raft_group0_client.hh @@ -41,6 +41,7 @@ class group0_guard { public: ~group0_guard(); group0_guard(group0_guard&&) noexcept; + group0_guard& operator=(group0_guard&&) noexcept; utils::UUID observed_group0_state_id() const; utils::UUID new_group0_state_id() const; @@ -127,8 +128,10 @@ public: // and add_entry would again forward to shard 0. future start_operation(seastar::abort_source* as = nullptr); - group0_command prepare_command(schema_change change, group0_guard& guard, std::string_view description); group0_command prepare_command(broadcast_table_query query); + template + requires std::same_as || std::same_as + group0_command prepare_command(Command change, group0_guard& guard, std::string_view description); // Returns the current group 0 upgrade state. // diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 664e129588..b39c0b55a1 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -3742,17 +3742,22 @@ void storage_proxy::send_to_live_endpoints(storage_proxy::response_id_type respo auto& stats = handler_ptr->stats(); auto& handler = *handler_ptr; auto& global_stats = handler._proxy->_global_stats; - auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology(); - auto local_dc = topology.get_datacenter(); + if (handler.get_targets().size() != 1 || !fbu::is_me(handler.get_targets()[0])) { + auto& topology = handler_ptr->_effective_replication_map_ptr->get_topology(); + auto local_dc = topology.get_datacenter(); - for(auto dest: handler.get_targets()) { - sstring dc = topology.get_datacenter(dest); - // read repair writes do not go through coordinator since mutations are per destination - if (handler.read_repair_write() || dc == local_dc) { - local.emplace_back("", inet_address_vector_replica_set({dest})); - } else { - dc_groups[dc].push_back(dest); + for(auto dest: handler.get_targets()) { + sstring dc = topology.get_datacenter(dest); + // read repair writes do not go through coordinator since mutations are per destination + if (handler.read_repair_write() || dc == local_dc) { + local.emplace_back("", inet_address_vector_replica_set({dest})); + } else { + dc_groups[dc].push_back(dest); + } } + } else { + // There is only one target replica and it is me + local.emplace_back("", handler.get_targets()); } auto all = boost::range::join(local, dc_groups); diff --git a/service/storage_service.cc b/service/storage_service.cc index c0650f6c15..4ae26569ad 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -13,6 +13,7 @@ #include "dht/boot_strapper.hh" #include #include +#include #include "locator/snitch_base.hh" #include "locator/production_snitch_base.hh" #include "db/system_keyspace.hh" @@ -69,6 +70,9 @@ #include "utils/error_injection.hh" #include "utils/fb_utilities.hh" #include "locator/util.hh" +#include "idl/storage_service.dist.hh" +#include "service/storage_proxy.hh" +#include "service/raft/raft_address_map.hh" #include #include @@ -278,6 +282,645 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela slogger.info("Checking bootstrapping/leaving nodes: ok"); } +future<> storage_service::topology_state_load() { +#ifdef SEASTAR_DEBUG + static bool running = false; + assert(!running); // The function is not re-entrant + auto d = defer([] { + running = false; + }); + running = true; +#endif + + if (!_raft_topology_change_enabled) { + co_return; + } + + slogger.debug("raft topology: reload raft topology state"); + // read topology state from disk and recreate token_metadata from it + _topology_state_machine._topology = co_await db::system_keyspace::load_topology_state(); + + const auto& am = _group0->address_map(); + auto id2ip = [this, &am] (raft::server_id id) -> future { + auto ip = am.find(id); + while (!ip) { + static logger::rate_limit rate_limit{std::chrono::seconds(1)}; + slogger.log(log_level::warn, rate_limit, "raft topology: cannot map {} to ip, retrying.", id); + // FIXME: https://github.com/scylladb/scylladb/issues/12279 + // Loop until gossiper figures the address + // but the solution is to change token_metadata to work with server_ids instead of ips + co_await sleep_abortable(std::chrono::milliseconds(5), _abort_source); + ip = am.find(id); + } + co_return *ip; + }; + + for (const auto& id: _topology_state_machine._topology.left_nodes) { + auto ip = co_await id2ip(id); + if (_gossiper.get_live_members().contains(ip) || _gossiper.get_unreachable_members().contains(ip)) { + co_await remove_endpoint(ip); + } + } + + co_await mutate_token_metadata(seastar::coroutine::lambda([this, &id2ip, &am] (mutable_token_metadata_ptr tmptr) -> future<> { + co_await tmptr->clear_gently(); // drop previous state + + auto add_normal_node = [&] (raft::server_id id, const replica_state& rs) -> future<> { + assert (rs.ring.value().state == ring_slice::ring_slice::replication_state::owner); + + locator::host_id host_id{id.uuid()}; + auto ip = co_await id2ip(id); + + slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", + id, ip, rs.state, rs.datacenter, rs.rack, rs.ring.value().state, rs.ring.value().tokens); + // Save tokens, not needed for raft topology management, but needed by legacy + // Also ip -> id mapping is needed for address map recreation on reboot + if (!utils::fb_utilities::is_me(ip)) { + co_await _sys_ks.local().update_tokens(ip, rs.ring.value().tokens); + co_await _sys_ks.local().update_peer_info(ip, "data_center", rs.datacenter); + co_await _sys_ks.local().update_peer_info(ip, "rack", rs.rack); + co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid()); + co_await _sys_ks.local().update_peer_info(ip, "release_version", rs.release_version); + } else { + co_await _sys_ks.local().update_tokens(rs.ring.value().tokens); + co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::normal(rs.ring.value().tokens) }}); + } + tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); + co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); + tmptr->update_host_id(host_id, ip); + }; + + for (const auto& [id, rs]: _topology_state_machine._topology.normal_nodes) { + co_await add_normal_node(id, rs); + } + + for (const auto& [id, rs]: _topology_state_machine._topology.transition_nodes) { + locator::host_id host_id{id.uuid()}; + auto ip = co_await id2ip(id); + + slogger.trace("raft topology: loading topology: raft id={} ip={} node state={} dc={} rack={} tokens state={} tokens={}", + id, ip, rs.state, rs.datacenter, rs.rack, rs.ring->state, rs.ring->tokens); + + switch (rs.state) { + case node_state::bootstrapping: + if (!utils::fb_utilities::is_me(ip)) { + // Save ip -> id mapping in peers table because we need it on restart, but do not save tokens until owned + co_await _sys_ks.local().update_tokens(ip, {}); + co_await _sys_ks.local().update_peer_info(ip, "host_id", id.uuid()); + } + tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); + tmptr->add_bootstrap_tokens(rs.ring.value().tokens, ip); + co_await update_pending_ranges(tmptr, format("bootstrapping node {}/{}", id, ip)); + break; + case node_state::decommissioning: + case node_state::removing: + tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); + co_await tmptr->update_normal_tokens(rs.ring.value().tokens, ip); + tmptr->update_host_id(host_id, ip); + tmptr->add_leaving_endpoint(ip); + co_await update_pending_ranges(tmptr, format("{} {}/{}", rs.state, id, ip)); + break; + case node_state::replacing: { + assert(_topology_state_machine._topology.req_param.contains(id)); + auto replaced_id = std::get(_topology_state_machine._topology.req_param[id]); + auto existing_ip = am.find(replaced_id); + if (!existing_ip) { + // FIXME: What if not known? + on_fatal_internal_error(slogger, format("Cannot map id of a node being replaced {} to its ip", replaced_id)); + } + assert(existing_ip); + tmptr->update_topology(ip, locator::endpoint_dc_rack{rs.datacenter, rs.rack}); + tmptr->add_replacing_endpoint(*existing_ip, ip); + co_await update_pending_ranges(tmptr, format("replacing {}/{} by {}/{}", replaced_id, *existing_ip, id, ip)); + } + break; + case node_state::rebuilding: + // Rebuilding node is normal + co_await add_normal_node(id, rs); + break; + default: + on_fatal_internal_error(slogger, format("Unexpected state {} for node {}", rs.state, id)); + } + } + })); +} + +future<> storage_service::topology_transition(storage_proxy& proxy, gms::inet_address from, std::vector cms) { + assert(this_shard_id() == 0); + // write new state into persistent storage + std::vector mutations; + mutations.reserve(cms.size()); + try { + for (const auto& cm : cms) { + auto& tbl = _db.local().find_column_family(cm.column_family_id()); + mutations.emplace_back(cm.to_mutation(tbl.schema())); + } + } catch (replica::no_such_column_family& e) { + slogger.error("Error while applying topology mutations from {}: {}", from, e); + throw std::runtime_error(fmt::format("Error while applying topology mutations: {}", e)); + } + + co_await proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr()); + + co_await topology_state_load(); // reload new state + + _topology_state_machine.event.signal(); +} + +future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) { + auto s = _db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + std::vector muts; + muts.reserve(snp.mutations.size()); + boost::transform(snp.mutations, std::back_inserter(muts), [s] (const canonical_mutation& m) { + return m.to_mutation(s); + }); + co_await _db.local().apply(freeze(muts), db::no_timeout); +} + +future<> storage_service::topology_change_coordinator_fiber(raft::server& raft, raft::term_t term, sharded& sys_dist_ks, abort_source& as) { + slogger.info("raft topology: start topology coordinator fiber"); + + auto abort = as.subscribe([this] () noexcept { + _topology_state_machine.event.signal(); + }); + + const auto& am = _group0->address_map(); + + // This is a topology snapshot for a given node. It contains pointers into the topology state machine + // that may be outdated after guard is released so the structure is meant to be destroyed together + // with the guard + struct node_to_work_on { + group0_guard guard; + const topology_state_machine::topology_type* topology; + raft::server_id id; + const replica_state* rs; + std::optional request; + std::optional req_param; + }; + + // The topology coordinator takes guard before operation start, but it releases it during various + // RPC commands that it sends to make it possible to submit new requests to the state machine while + // the coordinator drives current topology change. It is safe to do so since only the coordinator is + // ever allowed to change node's state, others may only create requests. To make sure the coordinator did + // not change while the lock was released, and hence the old coordinator does not work on old state, we check + // that the raft term is still the same after the lock is re-acquired. Throw term_changed_error if it did. + + struct term_changed_error{}; + + auto release_node = [] (std::optional node) { node.reset(); }; + auto retake_node = [this, term, &raft] (raft::server_id id, abort_source& as) -> future { + auto guard = co_await _group0->client().start_operation(&as); + + if (term != raft.get_current_term()) { + throw term_changed_error{}; + } + + auto& topo = _topology_state_machine._topology; + + auto it = topo.find(id); + assert(it); + + std::optional req; + auto rit = topo.requests.find(id); + if (rit != topo.requests.end()) { + req = rit->second; + } + std::optional req_param; + auto pit = topo.req_param.find(id); + if (pit != topo.req_param.end()) { + req_param = pit->second; + } + co_return node_to_work_on{std::move(guard), &topo, id, &it->second, std::move(req), std::move(req_param)}; + }; + + auto update_replica_state = [this] (node_to_work_on&& node, std::vector&& updates, const sstring& reason) -> future<> { + try { + slogger.trace("raft topology: do update {} reason {}", updates, reason); + topology_change change{std::move(updates)}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), node.guard, reason); + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(node.guard)); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: race while changing state: {}. Retrying", reason); + throw; + } + }; + + auto exec_direct_command_helper = [this, &am, &sys_dist_ks, term] (raft::server_id id, const raft_topology_cmd& cmd) -> future<> { + auto ip = am.find(id); + if (!ip) { + slogger.info("raft topology: cannot send command {} to {} because mapping to ip is not available", cmd.cmd, id); + co_await coroutine::exception(std::make_exception_ptr(std::runtime_error(fmt::format("no ip address mapping for {}", id)))); + } + slogger.trace("raft topology: send {} command to {}/{}", cmd.cmd, id, *ip); + auto result = utils::fb_utilities::is_me(*ip) ? + co_await raft_topology_cmd_handler(sys_dist_ks, term, cmd) : + co_await ser::storage_service_rpc_verbs::send_raft_topology_cmd(&_messaging.local(), netw::msg_addr{*ip}, term, cmd); + if (result.status == raft_topology_cmd_result::command_status::fail) { + co_await coroutine::exception(std::make_exception_ptr(std::runtime_error(fmt::format("failed status returned from {}/{}", id, *ip)))); + } + }; + + auto exec_direct_command = [&exec_direct_command_helper, &release_node, &retake_node, &as] (node_to_work_on&& node, const raft_topology_cmd& cmd) -> future { + auto id = node.id; + release_node(std::move(node)); + co_await exec_direct_command_helper(id, cmd); + co_return co_await retake_node(id, as); + }; + + auto exec_global_command_helper = [&exec_direct_command_helper] (auto nodes, const raft_topology_cmd& cmd) -> future { + auto f = co_await coroutine::as_future(seastar::parallel_for_each(std::move(nodes), [&cmd, &exec_direct_command_helper] (raft::server_id id) { + return exec_direct_command_helper(id, cmd); + })); + if (f.failed()) { + slogger.error("raft topology: send_raft_topology_cmd({}) failed with {}", cmd.cmd, f.get_exception()); + co_return false; + } else { + co_return true; + } + }; + + auto exec_global_command = [my_id = raft.id(), &exec_global_command_helper, &release_node, &retake_node, &as] (node_to_work_on&& node, const raft_topology_cmd& cmd, bool include_local, raft::server_id exclude_node) -> future> { + auto id = node.id; + auto nodes = node.topology->normal_nodes + | boost::adaptors::filtered([my_id, include_local, exclude_node] (const std::pair& n) { + return (include_local || n.first != my_id) && n.first != exclude_node; + }) | boost::adaptors::map_keys; + release_node(std::move(node)); + bool res = co_await exec_global_command_helper(nodes, cmd); + co_return std::make_pair(co_await retake_node(id, as), res); + }; + + auto handle_ring_transition = [&] (node_to_work_on&& node) -> future<> { + raft::server_id replaced_node; + + if (node.rs->state == node_state::replacing) { + replaced_node = std::get(node.req_param.value()); + } + + bool res; + switch (node.rs->ring.value().state) { + case ring_slice::ring_slice::replication_state::write_both_read_old: { + // make sure all nodes know about new topology (we require all nodes to be alive for topo change for now) + std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false, replaced_node); + if (!res) { + break; + } + raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges}; + if (node.rs->state == node_state::removing) { + // tell all nodes to stream data of the removed node to new range owners + std::tie(node, res) = co_await exec_global_command(std::move(node), cmd, true, replaced_node); + if (!res) { + slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed during removenode"); + break; + } + } else { + // Tell joining/leaving/replacing node to stream its ranges + try { + node = co_await exec_direct_command(std::move(node), cmd); + } catch (term_changed_error&) { + throw; + } catch (...) { + slogger.error("raft topology: send_raft_topology_cmd(stream_ranges) failed with exception (node state is {}): {}", node.rs->state, std::current_exception()); + break; + } + } + // Streaming completed. We can now move tokens state to ring_slice::ring_slice::replication_state::write_both_read_new + db::system_keyspace::topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + builder.set("replication_state", ring_slice::replication_state::write_both_read_new); + auto str = fmt::format("{}: streaming completed for node {}", node.rs->state, node.id); + co_await update_replica_state(std::move(node), {builder.build()}, std::move(str)); + } + break; + case ring_slice::replication_state::write_both_read_new: + // In this state writes goes to old and new replicas but reads start to be done from new replicas + // Before we stop writing to old replicas we need to wait for all previous reads to complete + std::tie(node, res) = co_await exec_global_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::fence_old_reads}, true, replaced_node); + if (!res) { + break; + } + switch(node.rs->state) { + case node_state::bootstrapping: { + db::system_keyspace::topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + builder.set("replication_state", ring_slice::replication_state::owner) + .set("node_state", node_state::normal); + co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: read fence completed"); + } + break; + case node_state::decommissioning: + case node_state::removing: { + db::system_keyspace::topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + builder.del("replication_state") + .del("tokens") + .set("node_state", node_state::left); + auto str = fmt::format("{}: read fence completed", node.rs->state); + co_await update_replica_state(std::move(node), {builder.build()}, std::move(str)); + } + break; + case node_state::replacing: { + db::system_keyspace::topology_mutation_builder builder1(node.guard.write_timestamp(), node.id); + // Move new node to 'normal' + builder1.set("replication_state", ring_slice::replication_state::owner) + .set("node_state", node_state::normal); + + // Move old node to 'left' + db::system_keyspace::topology_mutation_builder builder2(node.guard.write_timestamp(), replaced_node); + builder2.del("replication_state") + .del("tokens") + .set("node_state", node_state::left); + co_await update_replica_state(std::move(node), {builder1.build(), builder2.build()}, "replace: read fence completed"); + } + break; + default: + on_fatal_internal_error(slogger, format("Ring state on node {} is write_both_read_new while the node is in state {}", node.id, node.rs->state)); + } + // Reads are fenced. We can now move tokens state to ring_slice::replication_state::owner and node to normal + break; + case ring_slice::replication_state::owner: + // should not get here + on_fatal_internal_error(slogger, format("Tried to handle ring state transition on node {} while in 'owner' state", node.id)); + break; + } + }; + + auto handle_node_transition = [&] (node_to_work_on&& node) -> future<> { + slogger.info("raft topology: coordinator fiber found a node to work on id={} state={}", node.id, node.rs->state); + + switch (node.rs->state) { + case node_state::normal: + case node_state::none: { + // if the state is none there have to be either 'join' or 'replace' request + // if the state is normal there have to be either 'leave', 'remove' or 'rebuild' request + db::system_keyspace::topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + switch (node.request.value()) { + case topology_request::join: { + assert(!node.rs->ring); + auto num_tokens = std::get(node.req_param.value()); + // A node just joined and does not have tokens assigned yet + // Need to assign random tokens to the node + auto tmptr = get_token_metadata_ptr(); + auto bootstrap_tokens = boot_strapper::get_random_bootstrap_tokens(tmptr, num_tokens, dht::check_token_endpoint::yes); + + // Write choosen tokens through raft. + builder.set("node_state", node_state::bootstrapping) + .del("topology_request") + .set("tokens", bootstrap_tokens) + .set("replication_state", ring_slice::replication_state::write_both_read_old); + co_await update_replica_state(std::move(node), {builder.build()}, "bootstrap: assign tokens"); + break; + } + case topology_request::leave: + assert(node.rs->ring); + // start decommission and put tokens of decommissioning nodes into write_both_read_old state + // meaning that reads will go to the replica being decommissioned but writes will go to new owner as well + builder.set("node_state", node_state::decommissioning) + .del("topology_request") + .set("replication_state", ring_slice::replication_state::write_both_read_old); + co_await update_replica_state(std::move(node), {builder.build()}, "start decommission"); + break; + case topology_request::remove: + assert(node.rs->ring); + // start removing and put tokens of a node been removed into write_both_read_old state + // meaning that reads will go to the replica being removed (it is dead though) but writes will go to new owner as well + builder.set("node_state", node_state::removing) + .del("topology_request") + .set("replication_state", ring_slice::replication_state::write_both_read_old); + co_await update_replica_state(std::move(node), {builder.build()}, "start removenode"); + break; + case topology_request::replace: { + assert(!node.rs->ring); + auto replaced_id = std::get(node.req_param.value()); + auto it = _topology_state_machine._topology.normal_nodes.find(replaced_id); + assert(it != _topology_state_machine._topology.normal_nodes.end()); + assert(it->second.ring && it->second.state == node_state::normal); + // start replacing and take ownership of the tokens of a node been replaced and put them into write_both_read_old state + // meaning that reads will go to the replica being removed (it is dead though) but writes will go to new owner as well + builder.set("node_state", node_state::replacing) + .del("topology_request") + .set("tokens", it->second.ring->tokens) + .set("replication_state", ring_slice::replication_state::write_both_read_old); + co_await update_replica_state(std::move(node), {builder.build()}, "start replace"); + break; + } + case topology_request::rebuild: { + db::system_keyspace::topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + builder.set("node_state", node_state::rebuilding) + .del("topology_request"); + co_await update_replica_state(std::move(node), {builder.build()}, "start rebuilding"); + break; + } + } + break; + } + case node_state::bootstrapping: + case node_state::decommissioning: + case node_state::removing: + case node_state::replacing: + co_await handle_ring_transition(std::move(node)); + break; + case node_state::rebuilding: { + node = co_await exec_direct_command(std::move(node), raft_topology_cmd{raft_topology_cmd::command::stream_ranges}); + db::system_keyspace::topology_mutation_builder builder(node.guard.write_timestamp(), node.id); + builder.set("node_state", node_state::normal) + .del("rebuild_option"); + co_await update_replica_state(std::move(node), {builder.build()}, "rebuilding completed"); + } + break; + case node_state::left: + // Should not get here + on_fatal_internal_error(slogger, format("Topology coordinator is called for node {} in state 'left'", node.id)); + break; + } + }; + + auto get_node_to_work_on = [this] (abort_source& as) -> future> { + auto guard = co_await _group0->client().start_operation(&as); + + auto& topo = _topology_state_machine._topology; + + const raft::configuration& rconf = _group0->group0_server().get_configuration(); + + if (rconf.current.size() > topo.normal_nodes.size() + topo.new_nodes.size() + topo.transition_nodes.size()) { + // Raft config is larger than the sum of all nodes not in 'left' state. + // Find nodes that 'left' but still in the config and remove them + size_t found = 0; + co_await coroutine::parallel_for_each(topo.left_nodes + | boost::adaptors::filtered([&rconf] (raft::server_id id) { return rconf.contains(id); }), + [&] (raft::server_id id) -> future<> { + found++; + // Remove from group 0 nodes that left. They may failed to do so by themselves + try { + slogger.trace("raft topology: topology coordinator fiber removing {} from the raft since it is in `left` state", id); + co_await _group0->group0_server().modify_config({}, {id}, &as); + } catch (const raft::commit_status_unknown&) { + slogger.trace("raft topology: topology coordinator fiber got unknown status while removing {} from the raft", id); + } + }); + if (!found) { + slogger.warn("raft topology: raft config is larger then sum of all nodes in non left state but no nodes in left state were found"); + } + } + + const std::pair* e = nullptr; + + std::optional req; + if (topo.transition_nodes.size() != 0) { + // If there is a node that is the middle of topology operation continue with it + e = &*topo.transition_nodes.begin(); + } else if (topo.new_nodes.size() != 0) { + // Otherwise check if there is a new node that wants to be joined + e = &*topo.new_nodes.begin(); + req = topo.requests[e->first]; + } else if (!topo.requests.empty()) { + // If there is no new node but request queue is not empty there is a request for normal node + req = topo.requests.begin()->second; + e = &*topo.normal_nodes.find(topo.requests.begin()->first); + } + + if (!e) { + co_return std::nullopt; + } + + std::optional req_param; + auto rit = topo.req_param.find(e->first); + if (rit != topo.req_param.end()) { + req_param = rit->second; + } + co_return node_to_work_on{std::move(guard), &topo, e->first, &e->second, std::move(req), std::move(req_param)}; + }; + + bool wait_for_event = false; + + while (!as.abort_requested()) { + try { + if (wait_for_event) { + slogger.trace("raft topology: topology coordinator fiber has nothing to do. Sleeping."); + co_await _topology_state_machine.event.when(); + slogger.trace("raft topology: topology coordinator fiber got an event"); + wait_for_event = false; + } + + auto node = co_await get_node_to_work_on(as); + + if (!node) { + // No nodes to work on. Wait for topology change event. + wait_for_event = true; + continue; + } + + co_await handle_node_transition(std::move(*node)); + } catch (raft::request_aborted&) { + slogger.debug("raft topology: topology change coordinator fiber aborted"); + } catch (group0_concurrent_modification&) { + } catch (term_changed_error&) { + // Term changed. We may no longer be a leader + slogger.debug("raft topology: topology change coordinator fiber notices term change {} -> {}", term, raft.get_current_term()); + } catch (...) { + slogger.error("raft topology: topology change coordinator fiber got error {}", std::current_exception()); + } + co_await coroutine::maybe_yield(); + } +} + +future<> storage_service::raft_state_monitor_fiber(raft::server& raft, sharded& sys_dist_ks) { + std::optional as; + try { + while (!_abort_source.abort_requested()) { + // Wait for a state change in case we are not a leader yet, or we are are the leader + // and coordinator work is running (in which case 'as' is engaged) + while (!raft.is_leader() || as) { + co_await raft.wait_for_state_change(&_abort_source); + if (as) { + as->request_abort(); // we are no longer a leader, so abort the coordinator + co_await std::exchange(_topology_change_coordinator, make_ready_future<>()); + as = std::nullopt; + } + } + // We are the leader now but that can change any time! + as.emplace(); + // start topology change coordinator in the background + _topology_change_coordinator = topology_change_coordinator_fiber(raft, raft.get_current_term(), sys_dist_ks, *as); + } + } catch (...) { + slogger.info("raft_state_monitor_fiber aborted with {}", std::current_exception()); + } + if (as) { + as->request_abort(); // abort current coordinator if running + co_await std::move(_topology_change_coordinator); + } +} + + +future<> storage_service::raft_replace(raft::server& raft_server, raft::server_id replaced_id, gms::inet_address replaced_ip) { + // Read barrier to access the latest topology. Quorum of nodes has to be alive. + co_await raft_server.read_barrier(&_abort_source); + + auto it = _topology_state_machine._topology.find(raft_server.id()); + if (it && it->second.state != node_state::replacing) { + throw std::runtime_error(fmt::format("Cannot do \"replace address\" operation with a node that is in state: {}", it->second.state)); + } + + // add myself to topology with request to replace + while (!_topology_state_machine._topology.contains(raft_server.id())) { + slogger.info("raft topology: adding myself to topology for replace: {} replacing {}", raft_server.id(), replaced_id); + auto guard = co_await _group0->client().start_operation(&_abort_source); + + auto it = _topology_state_machine._topology.normal_nodes.find(replaced_id); + if (it == _topology_state_machine._topology.normal_nodes.end()) { + throw std::runtime_error(fmt::format("Cannot replace node {}/{} because it is not in the 'normal' state", replaced_ip, replaced_id)); + } + + auto& rs = it->second; + db::system_keyspace::topology_mutation_builder builder(guard.write_timestamp(), raft_server.id()); + builder.set("node_state", node_state::none) + .set("datacenter", rs.datacenter) + .set("rack", rs.rack) + .set("release_version", version::release()) + .set("topology_request", topology_request::replace) + .set("replaced_id", replaced_id) + .set("num_tokens", _db.local().get_config().num_tokens()); + topology_change change{{builder.build()}}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, fmt::format("replace {}/{}: add myself ({}) to topology", replaced_id, replaced_ip, raft_server.id())); + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: replace: concurrent operation is detected, retrying."); + } + } + + co_return; +} + +future<> storage_service::raft_bootstrap(raft::server& raft_server) { + // We try to find ourself in the topology without doing read barrier + // first to not require quorum of live nodes during regular boot. But + // if we are not in the topology it either means this is the first boot + // or we failed during bootstrap so do a read barrier (which requires + // quorum to be alive) and re-check. + if (!_topology_state_machine._topology.contains(raft_server.id())) { + co_await raft_server.read_barrier(&_abort_source); + } + + while (!_topology_state_machine._topology.contains(raft_server.id())) { + slogger.info("raft topology: adding myself to topology: {}", raft_server.id()); + // Current topology does not contains this node. Bootstrap is needed! + auto guard = co_await _group0->client().start_operation(&_abort_source); + db::system_keyspace::topology_mutation_builder builder(guard.write_timestamp(), raft_server.id()); + builder.set("node_state", node_state::none) + .set("datacenter", _snitch.local()->get_datacenter()) + .set("rack", _snitch.local()->get_rack()) + .set("release_version", version::release()) + .set("topology_request", topology_request::join) + .set("num_tokens", _db.local().get_config().num_tokens()); + topology_change change{{builder.build()}}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "bootstrap: add myself to topology"); + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: bootstrap: concurrent operation is detected, retrying."); + } + } +} + future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_service, sharded& sys_dist_ks, sharded& proxy, @@ -327,21 +970,23 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi throw std::runtime_error("Cannot replace address with a node that is already bootstrapped"); } ri = co_await prepare_replacement_info(initial_contact_nodes, loaded_peer_features); - bootstrap_tokens = std::move(ri->tokens); replace_address = ri->address; - replacing_a_node_with_same_ip = *replace_address == get_broadcast_address(); - replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address(); - - slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}", - get_broadcast_address() == *replace_address ? "the same" : "a different", - get_broadcast_address(), *replace_address); - tmptr->update_topology(*replace_address, std::move(ri->dc_rack)); - co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address); - replaced_host_id = ri->host_id; raft_replace_info = raft_group0::replace_info { .ip_addr = *replace_address, .raft_id = raft::server_id{ri->host_id.uuid()}, }; + if (!_raft_topology_change_enabled) { + bootstrap_tokens = std::move(ri->tokens); + replacing_a_node_with_same_ip = *replace_address == get_broadcast_address(); + replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address(); + + slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}", + get_broadcast_address() == *replace_address ? "the same" : "a different", + get_broadcast_address(), *replace_address); + tmptr->update_topology(*replace_address, std::move(ri->dc_rack)); + co_await tmptr->update_normal_tokens(bootstrap_tokens, *replace_address); + replaced_host_id = ri->host_id; + } } else if (should_bootstrap()) { co_await check_for_endpoint_collision(initial_contact_nodes, loaded_peer_features); } else { @@ -454,6 +1099,21 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi assert(_group0); co_await _group0->setup_group0(_sys_ks.local(), initial_contact_nodes, raft_replace_info); + raft::server* raft_server = co_await [this] () -> future { + if (!_raft_topology_change_enabled) { + co_return nullptr; + } else if (_sys_ks.local().bootstrap_complete()) { + auto [upgrade_lock_holder, upgrade_state] = co_await _group0->client().get_group0_upgrade_state(); + co_return upgrade_state == group0_upgrade_state::use_post_raft_procedures ? &_group0->group0_server() : nullptr; + } else { + auto upgrade_state = (co_await _group0->client().get_group0_upgrade_state()).second; + if (upgrade_state != group0_upgrade_state::use_post_raft_procedures) { + on_internal_error(slogger, "raft topology: cluster not upgraded to use group 0 after setup_group0"); + } + co_return &_group0->group0_server(); + } + } (); + auto schema_change_announce = _db.local().observable_schema_version().observe([this] (table_schema_version schema_version) mutable { _migration_manager.local().passive_announce(std::move(schema_version)); }); @@ -462,6 +1122,49 @@ future<> storage_service::join_token_ring(cdc::generation_service& cdc_gen_servi set_mode(mode::JOINING); + if (raft_server) { // Raft is enabled. Check if we need to bootstrap ourself using raft + slogger.info("topology changes are using raft"); + + // start topology coordinator fiber + _raft_state_monitor = raft_state_monitor_fiber(*raft_server, sys_dist_ks); + + // Need to start system_distributed_keyspace before bootstrap because bootstraping + // process may access those tables. + supervisor::notify("starting system distributed keyspace"); + co_await sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start); + + if (is_replacing()) { + assert(raft_replace_info); + co_await raft_replace(*raft_server, raft_replace_info->raft_id, raft_replace_info->ip_addr); + } else { + co_await raft_bootstrap(*raft_server); + } + + // Wait until we enter one of the final states + co_await _topology_state_machine.event.when([this, raft_server] { + return _topology_state_machine._topology.normal_nodes.contains(raft_server->id()) || + _topology_state_machine._topology.left_nodes.contains(raft_server->id()); + }); + + if (_topology_state_machine._topology.left_nodes.contains(raft_server->id())) { + throw std::runtime_error("A node that already left the cluster cannot be restarted"); + } + + // Node state is enough to know that bootstrap has completed, but to make legacy code happy + // let it know that the bootstrap is completed as well + co_await _sys_ks.local().set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED); + set_mode(mode::NORMAL); + + if (get_token_metadata().sorted_tokens().empty()) { + auto err = format("join_token_ring: Sorted token in token_metadata is empty"); + slogger.error("{}", err); + throw std::runtime_error(err); + } + + co_await _group0->finish_setup_after_join(); + co_return; + } + // We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed. // If we are a seed, or if the user manually sets auto_bootstrap to false, // we'll skip streaming data from other nodes and jump directly into the ring. @@ -837,6 +1540,12 @@ future<> storage_service::handle_state_bootstrap(inet_address endpoint) { future<> storage_service::handle_state_normal(inet_address endpoint) { slogger.debug("endpoint={} handle_state_normal", endpoint); + + if (_raft_topology_change_enabled) { + slogger.debug("ignore handle_state_normal since topology change are using raft"); + co_return; + } + auto tokens = get_tokens_for(endpoint); slogger.debug("Node {} state normal, token {}", endpoint, tokens); @@ -1036,6 +1745,12 @@ future<> storage_service::handle_state_leaving(inet_address endpoint) { } future<> storage_service::handle_state_left(inet_address endpoint, std::vector pieces) { + + if (_raft_topology_change_enabled) { + slogger.debug("ignore handle_state_left since topology change are using raft"); + co_return; + } + slogger.debug("endpoint={} handle_state_left", endpoint); if (pieces.size() < 2) { slogger.warn("Fail to handle_state_left endpoint={} pieces={}", endpoint, pieces); @@ -1394,6 +2109,8 @@ future<> storage_service::join_cluster(cdc::generation_service& cdc_gen_service, assert(this_shard_id() == 0); _group0 = &group0; + _raft_topology_change_enabled = _group0->is_raft_enabled() && _db.local().get_config().check_experimental(db::experimental_features_t::feature::RAFT); + return seastar::async([this, &cdc_gen_service, &sys_dist_ks, &proxy] { set_mode(mode::STARTING); @@ -1550,8 +2267,9 @@ future<> storage_service::stop() { // make sure nobody uses the semaphore node_ops_signal_abort(std::nullopt); _listeners.clear(); + _topology_state_machine.event.broken(make_exception_ptr(abort_requested_exception())); co_await _async_gate.close(); - co_await std::move(_node_ops_abort_thread); + co_await when_all(std::move(_node_ops_abort_thread), std::move(_raft_state_monitor)); } future<> storage_service::check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features) { @@ -1571,7 +2289,9 @@ future<> storage_service::check_for_endpoint_collision(std::unordered_set throw std::runtime_error(format("Cannot replace_address {} because it has left the ring, status={}", replace_address, status)); } - auto tokens = get_tokens_for(replace_address); - if (tokens.empty()) { - throw std::runtime_error(format("Could not find tokens for {} to replace", replace_address)); + std::unordered_set tokens; + if (!_raft_topology_change_enabled) { + tokens = get_tokens_for(replace_address); + if (tokens.empty()) { + throw std::runtime_error(format("Could not find tokens for {} to replace", replace_address)); + } } auto dc_rack = get_dc_rack_for(replace_address); @@ -2088,103 +2811,155 @@ void on_streaming_finished() { utils::get_local_injector().inject("storage_service_streaming_sleep3", std::chrono::seconds{3}).get(); } +future<> storage_service::raft_decomission() { + auto& raft_server = _group0->group0_server(); + + while (true) { + auto guard = co_await _group0->client().start_operation(&_abort_source); + + auto it = _topology_state_machine._topology.find(raft_server.id()); + if (!it) { + throw std::runtime_error(fmt::format("local node {} is not a member of the cluster", raft_server.id())); + } + + const auto& rs = it->second; + + if (rs.state != node_state::normal) { + throw std::runtime_error(fmt::format("local node is not in the normal state (current state: {})", rs.state)); + } + + if (_topology_state_machine._topology.normal_nodes.size() == 1) { + throw std::runtime_error("Cannot decomission last node in the cluster"); + } + + slogger.info("raft topology: request decomission for: {}", raft_server.id()); + db::system_keyspace::topology_mutation_builder builder(guard.write_timestamp(), raft_server.id()); + builder.set("topology_request", topology_request::leave); + topology_change change{{builder.build()}}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, fmt::format("decomission: request decomission for {}", raft_server.id())); + + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: decomission: concurrent operation is detected, retrying."); + continue; + } + break; + } + + // Wait until we enter left state + co_await _topology_state_machine.event.when([this, &raft_server] { + return _topology_state_machine._topology.left_nodes.contains(raft_server.id()); + }); + + // Need to set it otherwise gossiper will try to send shutdown on exit + co_await _gossiper.add_local_application_state({{ gms::application_state::STATUS, gms::versioned_value::left({}, _gossiper.now().time_since_epoch().count()) }}); +} + future<> storage_service::decommission() { return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) { return seastar::async([&ss] { - auto& db = ss._db.local(); - node_ops_ctl ctl(ss, node_ops_cmd::decommission_prepare, db.get_config().host_id, ss.get_broadcast_address()); - auto stop_ctl = deferred_stop(ctl); - auto tmptr = ss.get_token_metadata_ptr(); - const auto& uuid = ctl.uuid(); - auto endpoint = ctl.endpoint; - if (!tmptr->is_normal_token_owner(endpoint)) { - throw std::runtime_error("local node is not a member of the token ring yet"); - } - // We assume that we're a member of group 0 if we're in decommission()` and Raft is enabled. - // We have no way to check that we're not a member: attempting to perform group 0 operations - // would simply hang in that case, the leader would refuse to talk to us. - // If we aren't a member then we shouldn't be here anyway, since it means that either - // an earlier decommission finished (leave_group0 is the last operation in decommission) - // or that we were removed using `removenode`. - // - // For handling failure scenarios such as a group 0 member that is not a token ring member, - // there's `removenode`. - - auto temp = tmptr->clone_after_all_left().get0(); - auto num_tokens_after_all_left = temp.sorted_tokens().size(); - temp.clear_gently().get(); - if (num_tokens_after_all_left < 2) { - throw std::runtime_error("no other normal nodes in the ring; decommission would be pointless"); - } - - if (ss._operation_mode != mode::NORMAL) { - throw std::runtime_error(format("Node in {} state; wait for status to become normal or restart", ss._operation_mode)); - } - - ss.update_pending_ranges(format("decommission {}", endpoint)).get(); - - auto non_system_keyspaces = db.get_non_local_strategy_keyspaces(); - for (const auto& keyspace_name : non_system_keyspaces) { - if (ss.get_token_metadata().has_pending_ranges(keyspace_name, ss.get_broadcast_address())) { - throw std::runtime_error("data is currently moving to this node; unable to leave the ring"); - } - } - - slogger.info("DECOMMISSIONING: starts"); - ctl.req.leaving_nodes = std::list{endpoint}; - // TODO: wire ignore_nodes provided by user - - // Step 1: Decide who needs to sync data - for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) { - seastar::thread::maybe_yield(); - if (!ctl.ignore_nodes.contains(node)) { - ctl.sync_nodes.insert(node); - } - } - - ctl.start("decommission"); - - assert(ss._group0); - bool raft_available = ss._group0->wait_for_raft().get(); + bool raft_available = false; bool left_token_ring = false; - - try { - // Step 2: Start heartbeat updater - ctl.start_heartbeat_updater(node_ops_cmd::decommission_heartbeat); - - // Step 3: Prepare to sync data - ctl.prepare(node_ops_cmd::decommission_prepare).get(); - - // Step 4: Start to sync data - slogger.info("DECOMMISSIONING: unbootstrap starts"); - ss.unbootstrap().get(); - on_streaming_finished(); - slogger.info("DECOMMISSIONING: unbootstrap done"); - - // Step 5: Become a group 0 non-voter before leaving the token ring. + auto uuid = node_ops_id::create_random_id(); + if (ss._raft_topology_change_enabled) { + ss.raft_decomission().get(); + raft_available = true; + } else { + auto& db = ss._db.local(); + node_ops_ctl ctl(ss, node_ops_cmd::decommission_prepare, db.get_config().host_id, ss.get_broadcast_address()); + auto stop_ctl = deferred_stop(ctl); + auto tmptr = ss.get_token_metadata_ptr(); + uuid = ctl.uuid(); + auto endpoint = ctl.endpoint; + if (!tmptr->is_normal_token_owner(endpoint)) { + throw std::runtime_error("local node is not a member of the token ring yet"); + } + // We assume that we're a member of group 0 if we're in decommission()` and Raft is enabled. + // We have no way to check that we're not a member: attempting to perform group 0 operations + // would simply hang in that case, the leader would refuse to talk to us. + // If we aren't a member then we shouldn't be here anyway, since it means that either + // an earlier decommission finished (leave_group0 is the last operation in decommission) + // or that we were removed using `removenode`. // - // Thanks to this, even if we fail after leaving the token ring but before leaving group 0, - // group 0's availability won't be reduced. - if (raft_available) { - slogger.info("decommission[{}]: becoming a group 0 non-voter", uuid); - ss._group0->become_nonvoter().get(); - slogger.info("decommission[{}]: became a group 0 non-voter", uuid); + // For handling failure scenarios such as a group 0 member that is not a token ring member, + // there's `removenode`. + + auto temp = tmptr->clone_after_all_left().get0(); + auto num_tokens_after_all_left = temp.sorted_tokens().size(); + temp.clear_gently().get(); + if (num_tokens_after_all_left < 2) { + throw std::runtime_error("no other normal nodes in the ring; decommission would be pointless"); } - // Step 6: Verify that other nodes didn't abort in the meantime. - // See https://github.com/scylladb/scylladb/issues/12989. - ctl.query_pending_op().get(); + if (ss._operation_mode != mode::NORMAL) { + throw std::runtime_error(format("Node in {} state; wait for status to become normal or restart", ss._operation_mode)); + } - // Step 7: Leave the token ring - slogger.info("decommission[{}]: leaving token ring", uuid); - ss.leave_ring().get(); - left_token_ring = true; - slogger.info("decommission[{}]: left token ring", uuid); + ss.update_pending_ranges(format("decommission {}", endpoint)).get(); - // Step 8: Finish token movement - ctl.done(node_ops_cmd::decommission_done).get(); - } catch (...) { - ctl.abort_on_error(node_ops_cmd::decommission_abort, std::current_exception()).get(); + auto non_system_keyspaces = db.get_non_local_strategy_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + if (ss.get_token_metadata().has_pending_ranges(keyspace_name, ss.get_broadcast_address())) { + throw std::runtime_error("data is currently moving to this node; unable to leave the ring"); + } + } + + slogger.info("DECOMMISSIONING: starts"); + ctl.req.leaving_nodes = std::list{endpoint}; + // TODO: wire ignore_nodes provided by user + + // Step 1: Decide who needs to sync data + for (const auto& [node, host_id] : tmptr->get_endpoint_to_host_id_map_for_reading()) { + seastar::thread::maybe_yield(); + if (!ctl.ignore_nodes.contains(node)) { + ctl.sync_nodes.insert(node); + } + } + + ctl.start("decommission"); + + assert(ss._group0); + raft_available = ss._group0->wait_for_raft().get(); + + try { + // Step 2: Start heartbeat updater + ctl.start_heartbeat_updater(node_ops_cmd::decommission_heartbeat); + + // Step 3: Prepare to sync data + ctl.prepare(node_ops_cmd::decommission_prepare).get(); + + // Step 4: Start to sync data + slogger.info("DECOMMISSIONING: unbootstrap starts"); + ss.unbootstrap().get(); + on_streaming_finished(); + slogger.info("DECOMMISSIONING: unbootstrap done"); + + // Step 5: Become a group 0 non-voter before leaving the token ring. + // + // Thanks to this, even if we fail after leaving the token ring but before leaving group 0, + // group 0's availability won't be reduced. + if (raft_available) { + slogger.info("decommission[{}]: becoming a group 0 non-voter", uuid); + ss._group0->become_nonvoter().get(); + slogger.info("decommission[{}]: became a group 0 non-voter", uuid); + } + + // Step 6: Verify that other nodes didn't abort in the meantime. + // See https://github.com/scylladb/scylladb/issues/12989. + ctl.query_pending_op().get(); + + // Step 7: Leave the token ring + slogger.info("decommission[{}]: leaving token ring", uuid); + ss.leave_ring().get(); + left_token_ring = true; + slogger.info("decommission[{}]: left token ring", uuid); + + // Step 8: Finish token movement + ctl.done(node_ops_cmd::decommission_done).get(); + } catch (...) { + ctl.abort_on_error(node_ops_cmd::decommission_abort, std::current_exception()).get(); + } } // Step 8: Leave group 0 @@ -2199,24 +2974,30 @@ future<> storage_service::decommission() { if (raft_available && left_token_ring) { slogger.info("decommission[{}]: leaving Raft group 0", uuid); assert(ss._group0); - ss._group0->leave_group0().get(); + try { + ss._group0->leave_group0().get(); + } catch (raft::not_a_member& err) { + slogger.info("DECOMMISSIONING: already removed from the raft config by the topology coordinator"); + } slogger.info("decommission[{}]: left Raft group 0", uuid); } } catch (...) { - // Even though leave_group0 failed, we will finish decommission and shut down everything. - // There's nothing smarter we could do. We should not continue operating in this broken - // state (we're not a member of the token ring any more). - // - // If we didn't manage to leave group 0, we will stay as a non-voter - // (which is not too bad - non-voters at least do not reduce group 0's availability). - // It's possible to remove the garbage member using `removenode`. - slogger.error( - "decommission[{}]: FAILED when trying to leave Raft group 0: \"{}\". This node" - " is no longer a member of the token ring, so it will finish shutting down its services." - " It may still be a member of Raft group 0. To remove it, shut it down and use `removenode`." - " Consult the `decommission` and `removenode` documentation for more details.", - ctl.uuid(), std::current_exception()); - leave_group0_ex = std::current_exception(); + if (!ss._raft_topology_change_enabled) { + // Even though leave_group0 failed, we will finish decommission and shut down everything. + // There's nothing smarter we could do. We should not continue operating in this broken + // state (we're not a member of the token ring any more). + // + // If we didn't manage to leave group 0, we will stay as a non-voter + // (which is not too bad - non-voters at least do not reduce group 0's availability). + // It's possible to remove the garbage member using `removenode`. + slogger.error( + "decommission[{}]: FAILED when trying to leave Raft group 0: \"{}\". This node" + " is no longer a member of the token ring, so it will finish shutting down its services." + " It may still be a member of Raft group 0. To remove it, shut it down and use `removenode`." + " Consult the `decommission` and `removenode` documentation for more details.", + uuid, std::current_exception()); + leave_group0_ex = std::current_exception(); + } } ss.stop_transport().get(); @@ -2384,9 +3165,73 @@ void storage_service::run_replace_ops(std::unordered_set& bootstrap_token } } +future<> storage_service::raft_removenode(locator::host_id host_id) { + auto id = raft::server_id{host_id.uuid()}; + while (true) { + auto guard = co_await _group0->client().start_operation(&_abort_source); + + auto it = _topology_state_machine._topology.find(id); + + if (!it) { + throw std::runtime_error(format("raft topology removenode: host id {} is not found in the cluster", host_id)); + } + + auto& rs = it->second; // not usable after yeild + + if (rs.state != node_state::normal) { + throw std::runtime_error(format("raft topology removenode: node {} is in '{}' state. Wait for it to be in 'normal' state", id, rs.state)); + } + const auto& am = _group0->address_map(); + auto ip = am.find(id); + if (!ip) { + // What to do if there is no mapping? Wait and retry? + on_fatal_internal_error(slogger, format("Remove node cannot find a mapping from node id {} to its ip", id)); + } + + if (_gossiper.is_alive(*ip)) { + const std::string message = format( + "raft topology removenode: Rejected removenode operation for node {} ip {} " + "the node being removed is alive, maybe you should use decommission instead?", + id, *ip); + slogger.warn(std::string_view(message)); + throw std::runtime_error(message); + } + + slogger.info("raft topology: request removenode for: {}", id); + db::system_keyspace::topology_mutation_builder builder(guard.write_timestamp(), id); + builder.set("topology_request", topology_request::remove); + topology_change change{{builder.build()}}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, fmt::format("removenode: request remove for {}", id)); + + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: removenode: concurrent operation is detected, retrying."); + continue; + } + + break; + } + + // Wait the node we are removing to enter left state + co_await _topology_state_machine.event.when([this, id] { + return _topology_state_machine._topology.left_nodes.contains(id); + }); + + try { + co_await _group0->remove_from_raft_config(id); + } catch (raft::not_a_member&) { + slogger.info("raft topology removenode: already removed from the raft config by the topology coordinator"); + } +} + future<> storage_service::removenode(locator::host_id host_id, std::list ignore_nodes_params) { return run_with_api_lock(sstring("removenode"), [host_id, ignore_nodes_params = std::move(ignore_nodes_params)] (storage_service& ss) mutable { return seastar::async([&ss, host_id, ignore_nodes_params = std::move(ignore_nodes_params)] () mutable { + if (ss._raft_topology_change_enabled) { + ss.raft_removenode(host_id).get(); + return; + } auto uuid = node_ops_id::create_random_id(); auto tmptr = ss.get_token_metadata_ptr(); auto endpoint_opt = tmptr->get_endpoint_for_host_id(host_id); @@ -2852,31 +3697,78 @@ future<> storage_service::do_drain() { co_await _sys_ks.invoke_on_all(&db::system_keyspace::shutdown); } +future<> storage_service::raft_rebuild(sstring source_dc) { + auto& raft_server = _group0->group0_server(); + + while (true) { + auto guard = co_await _group0->client().start_operation(&_abort_source); + + auto it = _topology_state_machine._topology.find(raft_server.id()); + if (!it) { + throw std::runtime_error(fmt::format("local node {} is not a member of the cluster", raft_server.id())); + } + + const auto& rs = it->second; + + if (rs.state != node_state::normal) { + throw std::runtime_error(fmt::format("local node is not in the normal state (current state: {})", rs.state)); + } + + if (_topology_state_machine._topology.normal_nodes.size() == 1) { + throw std::runtime_error("Cannot rebuild a single node"); + } + + slogger.info("raft topology: request rebuild for: {}", raft_server.id()); + db::system_keyspace::topology_mutation_builder builder(guard.write_timestamp(), raft_server.id()); + builder.set("topology_request", topology_request::rebuild) + .set("rebuild_option", source_dc); + topology_change change{{builder.build()}}; + group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, fmt::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc)); + + try { + co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source); + } catch (group0_concurrent_modification&) { + slogger.info("raft topology: rebuild: concurrent operation is detected, retrying."); + continue; + } + break; + } + + // Wait until rebuild completes. We know it completes when the request parameter is empty + co_await _topology_state_machine.event.when([this, &raft_server] { + return !_topology_state_machine._topology.req_param.contains(raft_server.id()); + }); +} + future<> storage_service::rebuild(sstring source_dc) { return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) -> future<> { - slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); - auto tmptr = ss.get_token_metadata_ptr(); - if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { - co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); + if (ss._raft_topology_change_enabled) { + co_await ss.raft_rebuild(source_dc); } else { - auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr, ss._abort_source, - ss.get_broadcast_address(), ss._sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild); - streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); - if (source_dc != "") { - streamer->add_source_filter(std::make_unique(source_dc)); - } - auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms(); - for (const auto& [keyspace_name, erm] : ks_erms) { - co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false); - } - try { - co_await streamer->stream_async(); - slogger.info("Streaming for rebuild successful"); - } catch (...) { - auto ep = std::current_exception(); - // This is used exclusively through JMX, so log the full trace but only throw a simple RTE - slogger.warn("Error while rebuilding node: {}", ep); - std::rethrow_exception(std::move(ep)); + slogger.info("rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); + auto tmptr = ss.get_token_metadata_ptr(); + if (ss.is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { + co_await ss._repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); + } else { + auto streamer = make_lw_shared(ss._db, ss._stream_manager, tmptr, ss._abort_source, + ss.get_broadcast_address(), ss._sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild); + streamer->add_source_filter(std::make_unique(ss._gossiper.get_unreachable_members())); + if (source_dc != "") { + streamer->add_source_filter(std::make_unique(source_dc)); + } + auto ks_erms = ss._db.local().get_non_local_strategy_keyspaces_erms(); + for (const auto& [keyspace_name, erm] : ks_erms) { + co_await streamer->add_ranges(keyspace_name, erm, ss.get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), ss._gossiper, false); + } + try { + co_await streamer->stream_async(); + slogger.info("Streaming for rebuild successful"); + } catch (...) { + auto ep = std::current_exception(); + // This is used exclusively through JMX, so log the full trace but only throw a simple RTE + slogger.warn("Error while rebuilding node: {}", ep); + std::rethrow_exception(std::move(ep)); + } } } }); @@ -3393,6 +4285,187 @@ future<> storage_service::snitch_reconfigured() { } } +future storage_service::raft_topology_cmd_handler(sharded& sys_dist_ks, raft::term_t term, const raft_topology_cmd& cmd) { + raft_topology_cmd_result result; + slogger.trace("raft topology: topology cmd rpc {} is called", cmd.cmd); + + // The retrier does: + // If no operation was previously started - start it now + // If previous operation still running - wait for it an return its result + // If previous operation completed sucessfully - return immediately + // If previous opertaion failed - restart it + auto retrier = [] (std::optional>& f, auto&& func) -> future<> { + if (!f || f->failed()) { + if (f) { + slogger.info("raft topology: retry streaming after previous attempt failed with {}", f->get_future().get_exception()); + } else { + slogger.info("raft topology: start streaming"); + } + f = func(); + } else { + slogger.debug("raft topology: already streaming"); + } + co_await f.value().get_future(); + slogger.info("raft topology: streaming completed"); + }; + + try { + auto& raft_server = _group0->group0_server(); + // do barrier to make sure we always see the latest topology + co_await raft_server.read_barrier(&_abort_source); + if (raft_server.get_current_term() != term) { + // Return an error since the command is from outdated leader + co_return result; + } + switch (cmd.cmd) { + case raft_topology_cmd::command::barrier: + // we already did read barrier above + result.status = raft_topology_cmd_result::command_status::success; + break; + case raft_topology_cmd::command::stream_ranges: { + const auto& rs = _topology_state_machine._topology.find(raft_server.id())->second; + if (!rs.ring || + (rs.ring->state != ring_slice::replication_state::write_both_read_old && rs.state != node_state::normal && rs.state != node_state::rebuilding)) { + slogger.warn("raft topology: got stream_ranges request while my tokens state is {} and node state is {}", + rs.ring ? fmt::format("{}", rs.ring->state) : "'missing ring'", rs.state); + break; + } + switch(rs.state) { + case node_state::bootstrapping: + case node_state::replacing: { + set_mode(mode::BOOTSTRAP); + // See issue #4001 + co_await mark_existing_views_as_built(sys_dist_ks); + co_await _db.invoke_on_all([] (replica::database& db) { + for (auto& cf : db.get_non_system_column_families()) { + cf->notify_bootstrap_or_replace_start(); + } + }); + if (rs.state == node_state::bootstrapping) { + if (!_topology_state_machine._topology.normal_nodes.empty()) { // stream only if there is a node in normal state + co_await retrier(_bootstrap_result, coroutine::lambda([&] () -> future<> { + if (is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap)) { + co_await _repair.local().bootstrap_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens); + } else { + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), + locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()); + co_await bs.bootstrap(streaming::stream_reason::bootstrap, _gossiper); + } + })); + } + // Bootstrap did not complete yet, but streaming did + } else { + co_await retrier(_bootstrap_result, coroutine::lambda([&] () ->future<> { + if (is_repair_based_node_ops_enabled(streaming::stream_reason::replace)) { + co_await _repair.local().replace_with_repair(get_token_metadata_ptr(), rs.ring.value().tokens, {}); + } else { + dht::boot_strapper bs(_db, _stream_manager, _abort_source, get_broadcast_address(), + locator::endpoint_dc_rack{rs.datacenter, rs.rack}, rs.ring.value().tokens, get_token_metadata_ptr()); + assert(_topology_state_machine._topology.req_param.contains(raft_server.id())); + auto replaced_id = std::get(_topology_state_machine._topology.req_param[raft_server.id()]); + auto existing_ip = _group0->address_map().find(replaced_id); + assert(existing_ip); + co_await bs.bootstrap(streaming::stream_reason::replace, _gossiper, *existing_ip); + } + })); + } + co_await _db.invoke_on_all([] (replica::database& db) { + for (auto& cf : db.get_non_system_column_families()) { + cf->notify_bootstrap_or_replace_end(); + } + }); + result.status = raft_topology_cmd_result::command_status::success; + } + break; + case node_state::decommissioning: + co_await retrier(_decomission_result, coroutine::lambda([&] () { return unbootstrap(); })); + result.status = raft_topology_cmd_result::command_status::success; + break; + case node_state::normal: { + // If asked to stream a node in normal state it means that remove operation is running + // Find the node that is been removed + auto it = boost::find_if(_topology_state_machine._topology.transition_nodes, [] (auto& e) { return e.second.state == node_state::removing; }); + if (it == _topology_state_machine._topology.transition_nodes.end()) { + slogger.warn("raft topology: got stream_ranges request while my state is normal but cannot find a node that is been removed"); + break; + } + slogger.debug("raft topology: streaming to remove node {}", it->first); + const auto& am = _group0->address_map(); + auto ip = am.find(it->first); // map node id to ip + assert (ip); // what to do if address is unknown? + co_await retrier(_remove_result[it->first], coroutine::lambda([&] () { + auto as = make_shared(); + auto sub = _abort_source.subscribe([as] () noexcept { + if (!as->abort_requested()) { + as->request_abort(); + } + }); + if (is_repair_based_node_ops_enabled(streaming::stream_reason::removenode)) { + auto ops = seastar::make_shared(node_ops_id::create_random_id(), as, std::list()); + // FIXME: ignore node list support + return _repair.local().removenode_with_repair(get_token_metadata_ptr(), *ip, ops); + } else { + return removenode_with_stream(*ip, as); + } + })); + result.status = raft_topology_cmd_result::command_status::success; + } + break; + case node_state::rebuilding: { + auto source_dc = std::get(_topology_state_machine._topology.req_param[raft_server.id()]); + slogger.info("raft topology: rebuild from dc: {}", source_dc == "" ? "(any dc)" : source_dc); + co_await retrier(_rebuild_result, [&] () -> future<> { + auto tmptr = get_token_metadata_ptr(); + if (is_repair_based_node_ops_enabled(streaming::stream_reason::rebuild)) { + co_await _repair.local().rebuild_with_repair(tmptr, std::move(source_dc)); + } else { + auto streamer = make_lw_shared(_db, _stream_manager, tmptr, _abort_source, + get_broadcast_address(), _sys_ks.local().local_dc_rack(), "Rebuild", streaming::stream_reason::rebuild); + streamer->add_source_filter(std::make_unique(_gossiper.get_unreachable_members())); + if (source_dc != "") { + streamer->add_source_filter(std::make_unique(source_dc)); + } + auto ks_erms = _db.local().get_non_local_strategy_keyspaces_erms(); + for (const auto& [keyspace_name, erm] : ks_erms) { + co_await streamer->add_ranges(keyspace_name, erm, get_ranges_for_endpoint(erm, utils::fb_utilities::get_broadcast_address()), _gossiper, false); + } + try { + co_await streamer->stream_async(); + slogger.info("raft topology: streaming for rebuild successful"); + } catch (...) { + auto ep = std::current_exception(); + // This is used exclusively through JMX, so log the full trace but only throw a simple RTE + slogger.warn("raft topology: error while rebuilding node: {}", ep); + std::rethrow_exception(std::move(ep)); + } + } + }); + _rebuild_result.reset(); + result.status = raft_topology_cmd_result::command_status::success; + } + break; + case node_state::left: + case node_state::none: + case node_state::removing: + on_fatal_internal_error(slogger, fmt::format("Node {} got streaming request in state {}. It should be either dead or not part of the cluster", + raft_server.id(), rs.state)); + break; + } + } + break; + case raft_topology_cmd::command::fence_old_reads: + // We need to make sure all reads that used old topology are completed + // The simplest way to do it for now is to sleep for read timeout + //co_await sleep_abortable(_db.local().get_config().read_request_timeout_in_ms() * std::chrono::milliseconds(1), _abort_source); + result.status = raft_topology_cmd_result::command_status::success; + break; + } + } catch (...) { + slogger.error("raft topology: raft_topology_cmd failed with: {}", std::current_exception()); + } + co_return result; +} + void storage_service::init_messaging_service(sharded& proxy, sharded& sys_dist_ks) { _messaging.local().register_replication_finished([] (gms::inet_address from) { slogger.info("Got confirm_replication from {}", from); @@ -3405,12 +4478,33 @@ void storage_service::init_messaging_service(sharded& pr return ss.node_ops_cmd_handler(coordinator, std::move(req)); }); }); + ser::storage_service_rpc_verbs::register_raft_topology_cmd(&_messaging.local(), [this, &sys_dist_ks] (raft::term_t term, raft_topology_cmd cmd) { + return container().invoke_on(0, [&sys_dist_ks, cmd = std::move(cmd), term] (auto& ss) { + return ss.raft_topology_cmd_handler(sys_dist_ks, term, cmd); + }); + }); + ser::storage_service_rpc_verbs::register_raft_pull_topology_snapshot(&_messaging.local(), [this, &proxy] (raft_topology_pull_params params) { + return container().invoke_on(0, [&proxy] (auto& ss) -> future { + if (!ss._raft_topology_change_enabled) { + co_return raft_topology_snapshot{}; + } + auto rs = co_await db::system_keyspace::query_mutations(proxy, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY); + std::vector results; + results.reserve(rs->partitions().size()); + boost::range::transform(rs->partitions(), std::back_inserter(results), [s] (const partition& p) { + return canonical_mutation{p.mut().unfreeze(s)}; + }); + co_return raft_topology_snapshot{std::move(results)}; + }); + }); } future<> storage_service::uninit_messaging_service() { return when_all_succeed( _messaging.local().unregister_replication_finished(), - _messaging.local().unregister_node_ops_cmd() + _messaging.local().unregister_node_ops_cmd(), + ser::storage_service_rpc_verbs::unregister(&_messaging.local()) ).discard_result(); } diff --git a/service/storage_service.hh b/service/storage_service.hh index 6406ce87ff..f190354dff 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -43,6 +43,8 @@ #include "cdc/generation_id.hh" #include "raft/raft.hh" #include "repair/id.hh" +#include "raft/server.hh" +#include "service/topology_state_machine.hh" class node_ops_cmd_request; class node_ops_cmd_response; @@ -757,6 +759,40 @@ private: std::unordered_set _normal_state_handled_on_boot; bool is_normal_state_handled_on_boot(gms::inet_address); future<> wait_for_normal_state_handled_on_boot(const std::unordered_set& nodes, sstring ops, node_ops_id uuid); + + friend class group0_state_machine; + bool _raft_topology_change_enabled = false; + future<> _raft_state_monitor = make_ready_future<>(); + // This fibers monitors raft state and start/stops the topology change + // coordinator fiber + future<> raft_state_monitor_fiber(raft::server&, sharded& sys_dist_ks); + + // State machine that is responsible for topology change + topology_state_machine _topology_state_machine; + + future<> _topology_change_coordinator = make_ready_future<>(); + future<> topology_change_coordinator_fiber(raft::server&, raft::term_t, sharded&, abort_source&); + + // Those futures hold results of streaming for various operations + std::optional> _bootstrap_result; + std::optional> _decomission_result; + std::optional> _rebuild_result; + std::unordered_map>> _remove_result; + + future raft_topology_cmd_handler(sharded& sys_dist_ks, raft::term_t term, const raft_topology_cmd& cmd); + + future<> raft_bootstrap(raft::server&); + future<> raft_decomission(); + future<> raft_removenode(locator::host_id host_id); + future<> raft_replace(raft::server&, raft::server_id, gms::inet_address); + future<> raft_rebuild(sstring source_dc); + + // This is called on all nodes for each new command received through raft + future<> topology_transition(storage_proxy& proxy, gms::inet_address, std::vector); + // load topology state machine snapshot into memory + future<> topology_state_load(); + // Applies received raft snapshot to local state machine persistent storage + future<> merge_topology_snapshot(raft_topology_snapshot snp); }; } diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc new file mode 100644 index 0000000000..91e92eaf20 --- /dev/null +++ b/service/topology_state_machine.cc @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2022-present ScyllaDB + * + */ + +/* + * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0) + */ + +#include "topology_state_machine.hh" + +namespace service { + +const std::pair* topology::find(raft::server_id id) { + auto it = normal_nodes.find(id); + if (it != normal_nodes.end()) { + return &*it; + } + it = transition_nodes.find(id); + if (it != transition_nodes.end()) { + return &*it; + } + it = new_nodes.find(id); + if (it != new_nodes.end()) { + return &*it; + } + return nullptr; +} + +bool topology::contains(raft::server_id id) { + return normal_nodes.contains(id) || + transition_nodes.contains(id) || + new_nodes.contains(id) || + left_nodes.contains(id); +} + +static std::unordered_map replication_state_to_name_map = { + {ring_slice::replication_state::write_both_read_old, "write both read old"}, + {ring_slice::replication_state::write_both_read_new, "write both read new"}, + {ring_slice::replication_state::owner, "owner"}, +}; + +std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s) { + os << replication_state_to_name_map[s]; + return os; +} + +ring_slice::replication_state replication_state_from_string(const sstring& s) { + for (auto&& e : replication_state_to_name_map) { + if (e.second == s) { + return e.first; + } + } + throw std::runtime_error(fmt::format("cannot map name {} to token_state", s)); +} + +static std::unordered_map node_state_to_name_map = { + {node_state::bootstrapping, "bootstrapping"}, + {node_state::decommissioning, "decommissioning"}, + {node_state::removing, "removing"}, + {node_state::normal, "normal"}, + {node_state::left, "left"}, + {node_state::replacing, "replacing"}, + {node_state::rebuilding, "rebuilding"}, + {node_state::none, "none"} +}; + +std::ostream& operator<<(std::ostream& os, node_state s) { + os << node_state_to_name_map[s]; + return os; +} + +node_state node_state_from_string(const sstring& s) { + for (auto&& e : node_state_to_name_map) { + if (e.second == s) { + return e.first; + } + } + throw std::runtime_error(fmt::format("cannot map name {} to node_state", s)); +} + +static std::unordered_map topology_request_to_name_map = { + {topology_request::join, "join"}, + {topology_request::leave, "leave"}, + {topology_request::remove, "remove"}, + {topology_request::replace, "replace"}, + {topology_request::rebuild, "rebuild"} +}; + +std::ostream& operator<<(std::ostream& os, const topology_request& req) { + os << topology_request_to_name_map[req]; + return os; +} + +topology_request topology_request_from_string(const sstring& s) { + for (auto&& e : topology_request_to_name_map) { + if (e.second == s) { + return e.first; + } + } + throw std::runtime_error(fmt::format("cannot map name {} to topology_request", s)); +} + +std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd) { + switch (cmd) { + case raft_topology_cmd::command::barrier: + os << "barrier"; + break; + case raft_topology_cmd::command::stream_ranges: + os << "stream_ranges"; + break; + case raft_topology_cmd::command::fence_old_reads: + os << "fence_old_reads"; + break; + } + return os; +} +} diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh new file mode 100644 index 0000000000..df254a8d97 --- /dev/null +++ b/service/topology_state_machine.hh @@ -0,0 +1,133 @@ +/* + * Copyright (C) 2022-present ScyllaDB + * + */ + +/* + * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0) + */ + +#pragma once + +#include +#include "boost/range/join.hpp" +#include +#include +#include +#include +#include +#include +#include "dht/token.hh" +#include "raft/raft.hh" +#include "utils/UUID.hh" +#include "dht/i_partitioner.hh" +#include "mutation/canonical_mutation.hh" + +namespace service { + +enum class node_state: uint8_t { + none, // the new node joined group0 but did not bootstraped yet (has no tokens and data to serve) + bootstrapping, // the node is currently in the process of streaming its part of the ring + decommissioning, // the node is being decomissioned and stream its data to nodes that took over + removing, // the node is being removed and its data is streamed to nodes that took over from still alive owners + replacing, // the node replaces another dead node in the cluster and it data is being streamed to it + rebuilding, // the node is being rebuild and is streaming data from other replicas + normal, // the node does not do any streaming and serves the slice of the ring that belongs to it + left // the node left the cluster and group0 +}; + +enum class topology_request: uint8_t { + join, + leave, + remove, + replace, + rebuild +}; + +using request_param = std::variant; + +struct ring_slice { + enum class replication_state: uint8_t { + write_both_read_old, + write_both_read_new, + owner + }; + + replication_state state; + std::unordered_set tokens; +}; + +struct replica_state { + node_state state; + seastar::sstring datacenter; + seastar::sstring rack; + seastar::sstring release_version; + std::optional ring; // if engaged contain the set of tokens the node owns together with their state +}; + +struct topology { + // Nodes that are normal members of the ring + std::unordered_map normal_nodes; + // Nodes that are left + std::unordered_set left_nodes; + // Nodes that are waiting to be joined by the topology coordinator + std::unordered_map new_nodes; + // Nodes that are in the process to be added to the ring + // Currently only at most one node at a time will be here + std::unordered_map transition_nodes; + + // Pending topology requests + std::unordered_map requests; + + // Holds parameters for a request per node and valid during entire + // operation untill the node becomes normal + std::unordered_map req_param; + + // Find only nodes in non 'left' state + const std::pair* find(raft::server_id id); + // Return true if node exists in any state including 'left' one + bool contains(raft::server_id id); +}; + +struct raft_topology_snapshot { + std::vector mutations; +}; + +struct raft_topology_pull_params { +}; + +// State machine that is responsible for topology change +struct topology_state_machine { + using topology_type = topology; + topology_type _topology; + condition_variable event; +}; + +// Raft leader uses this command to drive bootstrap process on other nodes +struct raft_topology_cmd { + enum class command: uint8_t { + barrier, // request to wait for the latest topology + stream_ranges, // reqeust to stream data, return when streaming is + // done + fence_old_reads // wait for all reads started before to complete + }; + command cmd; +}; + +// returned as a result of raft_bootstrap_cmd +struct raft_topology_cmd_result { + enum class command_status: uint8_t { + fail, + success + }; + command_status status = command_status::fail; +}; + +std::ostream& operator<<(std::ostream& os, ring_slice::replication_state s); +ring_slice::replication_state replication_state_from_string(const sstring& s); +std::ostream& operator<<(std::ostream& os, node_state s); +node_state node_state_from_string(const sstring& s); +std::ostream& operator<<(std::ostream& os, const topology_request& req); +topology_request topology_request_from_string(const sstring& s); +std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd); +}