Merge 'raft topology: implement check_and_repair_cdc_streams API' from Kamil Braun

`check_and_repair_cdc_streams` is an existing API which you can use when the
current CDC generation is suboptimal, e.g. after you decommissioned a node the
current generation has more stream IDs than you need. In that case you can do
`nodetool checkAndRepairCdcStreams` to create a new generation with fewer
streams.

It also works when you change number of shards on some node. We don't
automatically introduce a new generation in that case but you can use
`checkAndRepairCdcStreams` to create a new generation with restored
shard-colocation.

This PR implements the API on top of raft topology, it was originally
implemented using gossiper.  It uses the `commit_cdc_generation` topology
transition state and a new `publish_cdc_generation` state to create new CDC
generations in a cluster without any nodes changing their `node_state`s in the
process.

Closes #13683

* github.com:scylladb/scylladb:
  docs: update topology-over-raft.md
  test: topology_experimental_raft: test `check_and_repair_cdc` API
  raft topology: implement `check_and_repair_cdc_streams` API
  raft topology: implement global request handling
  raft topology: introduce `prepare_new_cdc_generation_data`
  raft_topology: `get_node_to_work_on_opt`: return guard if no node found
  raft topology: remove `node_to_work_on` from `commit_cdc_generation` transition
  raft topology: separate `publish_cdc_generation` state
  raft topology: non-node-specific `exec_global_command`
  raft topology: introduce `start_operation()`
  raft topology: non-node-specific `topology_mutation_builder`
  topology_state_machine: introduce `global_topology_request`
  topology_state_machine: use `uint16_t` for `enum_class`es
  raft topology: make `new_cdc_generation_data_uuid` topology-global
This commit is contained in:
Tomasz Grabiec
2023-05-22 11:33:58 +02:00
9 changed files with 560 additions and 253 deletions

View File

@@ -681,11 +681,14 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
req.get_query_param("key")));
});
ss::cdc_streams_check_and_repair.set(r, [&cdc_gs] (std::unique_ptr<http::request> req) {
if (!cdc_gs.local_is_initialized()) {
throw std::runtime_error("get_cdc_generation_service: not initialized yet");
}
return cdc_gs.local().check_and_repair_cdc_streams().then([] {
ss::cdc_streams_check_and_repair.set(r, [&cdc_gs, &ss] (std::unique_ptr<http::request> req) {
return ss.invoke_on(0, [&cdc_gs] (service::storage_service& ss) {
if (!cdc_gs.local_is_initialized()) {
throw std::runtime_error("CDC generation service not initialized yet");
}
return ss.check_and_repair_cdc_streams(cdc_gs.local());
}).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

View File

@@ -256,10 +256,11 @@ schema_ptr system_keyspace::topology() {
.with_column("num_tokens", int32_type)
.with_column("shard_count", int32_type)
.with_column("ignore_msb", int32_type)
.with_column("new_cdc_generation_data_uuid", uuid_type)
.with_column("new_cdc_generation_data_uuid", uuid_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
.with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column)
.with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column)
.with_column("global_topology_request", utf8_type, column_kind::static_column)
.set_comment("Current state of topology change machine")
.with_version(generate_schema_version(id))
.build();
@@ -3504,14 +3505,8 @@ future<service::topology> system_keyspace::load_topology_state() {
host_id));
}
utils::UUID new_cdc_gen_uuid;
if (row.has("new_cdc_generation_data_uuid")) {
new_cdc_gen_uuid = row.get_as<utils::UUID>("new_cdc_generation_data_uuid");
}
ring_slice = service::ring_slice {
.tokens = std::move(tokens),
.new_cdc_generation_data_uuid = new_cdc_gen_uuid,
};
}
@@ -3606,6 +3601,10 @@ future<service::topology> system_keyspace::load_topology_state() {
"load_topology_state: topology not in transition state but transition nodes are present");
}
if (some_row.has("new_cdc_generation_data_uuid")) {
ret.new_cdc_generation_data_uuid = some_row.get_as<utils::UUID>("new_cdc_generation_data_uuid");
}
if (some_row.has("current_cdc_generation_uuid")) {
auto gen_uuid = some_row.get_as<utils::UUID>("current_cdc_generation_uuid");
if (!some_row.has("current_cdc_generation_timestamp")) {
@@ -3644,6 +3643,12 @@ future<service::topology> system_keyspace::load_topology_state() {
"load_topology_state: normal nodes present but no current CDC generation ID");
}
}
if (some_row.has("global_topology_request")) {
auto req = service::global_topology_request_from_string(
some_row.get_as<sstring>("global_topology_request"));
ret.global_request.emplace(req);
}
}
co_return ret;

View File

@@ -4,76 +4,119 @@ The topology state machine tracks all the nodes in a cluster,
their state, properties (topology, tokens, etc) and requested actions.
Node state can be one of those:
none - the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
bootstrapping - the node is currently in the process of streaming its part of the ring
decommissioning - the node is being decomissioned and stream its data to nodes that took over
removing - the node is being removed and its data is streamed to nodes that took over from still alive owners
replacing - the node replaces another dead node in the cluster and it data is being streamed to it
rebuilding - the node is being rebuild and is streaming data from other replicas
normal - the node does not do any streaming and serves the slice of the ring that belongs to it
left - the node left the cluster and group0
- `none` - the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
- `bootstrapping` - the node is currently in the process of streaming its part of the ring
- `decommissioning` - the node is being decomissioned and stream its data to nodes that took over
- `removing` - the node is being removed and its data is streamed to nodes that took over from still alive owners
- `replacing` - the node replaces another dead node in the cluster and it data is being streamed to it
- `rebuilding` - the node is being rebuild and is streaming data from other replicas
- `normal` - the node does not do any streaming and serves the slice of the ring that belongs to it
- `left` - the node left the cluster and group0
Nodes in state left are never removed from the state.
State transition diagram:
State transition diagram for nodes:
```
{none} ------> {bootstrapping|replacing} ------> {normal} <---> {rebuilding}
| | |
| | |
| V V
----------------> {left} <-------- {decommissioning|removing}
```
A state may have additional parameters associated with it. For instance
A node state may have additional parameters associated with it. For instance
'replacing' state has host id of a node been replaced as a parameter.
Tokens also can be in one of the states:
Additionally to specific node states, there entire topology can also be in a transitioning state:
write_both_read_old - writes are going to new and old replica, but reads are from
old replicas still
write_both_read_new - writes still going to old and new replicas but reads are
from new replica
owner - tokens are owned by the node and reads and write go to new
replica set only
- `commit_cdc_generation` - a new CDC generation data was written to internal tables earlier
and now we need to commit the generation - create a timestamp for it and tell every node
to start using it for CDC log table writes.
- `publish_cdc_generation` - a new CDC generation was committed and now we need to publish it
to user-facing description tables.
- `write_both_read_old` - one of the nodes is in a bootstrapping/decommissioning/removing/replacing state.
Writes are going to both new and old replicas (new replicas means calculated according to modified
token ring), reads are using old replicas.
- `write_both_read_new` - as above, but reads are using new replicas.
Tokens that needs to be move start in 'write_both_read_old' state. After entire
cluster learns about it streaming start. After the streaming tokens move
to 'write_both_read_new' state and again the whole cluster needs to learn about it
and make sure no reads started before that point exist in the system.
After that tokens may move to the 'owner' state.
When a node bootstraps, we create new tokens for it and a new CDC generation
and enter the `commit_cdc_generation` state. After committing the generation we
move to `publish_cdc_generation`. Once the generation is published, we enter
`write_both_read_old` state. After the entire cluster learns about it,
streaming starts. When streaming finishes, we move to `write_both_read_new`
state and again the whole cluster needs to learn about it and make sure that no
reads that started before this point exist in the system. Finally we remove the
transitioning state.
Decommission, removenode and replace work similarly, except they don't go through
`commit_cdc_generation` and `publish_cdc_generation`.
The state machine may also go only through `commit_cdc_generation` and
`publish_cdc_generation` states after getting a request from the user to create
a new CDC generation if the current one is suboptimal (e.g. after a
decommission).
The state machine also maintains a map of topology requests per node.
When a request is issued to a node the entry is added to the map. A
request is one of the topology operation currently supported: join,
leave, replace, remove and rebuild. A request may also have parameters
request is one of the topology operation currently supported: `join`,
`leave`, `replace`, `remove` and `rebuild`. A request may also have parameters
associated with it which are also stored in a separate map.
Note that some nodes may require work but the topology as a whole does not
transition. An example of this is the `rebuilding` state which does not change
the topology but requires streaming data.
Separately from the per-node requests, there is also a 'global' request field
for operations that don't affect any specific node but the entire cluster,
such as `check_and_repair_cdc_streams`.
# Topology state persistence table
The in memory state's machine state is persisted in a local table system.topology.
The schema of the tables is:
The in memory state's machine state is persisted in a local table `system.topology`.
The schema of the table is:
```
CREATE TABLE system.topology (
host_id uuid PRIMARY KEY,
key text,
host_id uuid,
datacenter text,
ignore_msb int,
node_state text,
num_tokens int,
rack text,
rebuild_option text,
release_version text,
replaced_id uuid,
shard_count int,
tokens set<text>,
replication_state text,
topology_request text
rebuild_option text
topology_request text,
transition_state text static,
current_cdc_generation_timestamp timestamp static,
current_cdc_generation_uuid uuid static,
global_topology_request text static,
new_cdc_generation_data_uuid uuid static,
PRIMARY KEY (key, host_id)
)
```
This is a single-partition table, with `key = 'topology'`.
Each node has a row in the table where its host_id is the primary key. The row contains:
host_id - id of the node
datacenter - a name of the datacenter the node belongs to
rack - a name of the rack the node belongs to
release_version - release version of the Scylla on the node
node_state - current state of the node
topology_request - if set contains one of the supported topology requests
tokens - if set contains a list of tokens that belongs to the node
replication_state - if set contains a state the state the token replication is now in
replaced_id - if the node replacing or replaced another node here will be the id of that node
rebuild_option - if the node is being rebuild contains datacenter name that is used as a rebuild source
Each node has a clustering row in the table where its `host_id` is the clustering key. The row contains:
- `host_id` - id of the node
- `datacenter` - a name of the datacenter the node belongs to
- `rack` - a name of the rack the node belongs to
- `ignore_msb` - the value of the node's `murmur3_partitioner_ignore_msb_bits` parameter
- `shard_count` - the node's `smp::count`
- `release_version` - the node's `version::current()` (corresponding to a Cassandra version, used by drivers)
- `node_state` - current state of the node (as described earlier)
- `topology_request` - if set contains one of the supported node-specific topology requests
- `tokens` - if set contains a list of tokens that belongs to the node
- `replaced_id` - if the node replacing or replaced another node here will be the id of that node
- `rebuild_option` - if the node is being rebuild contains datacenter name that is used as a rebuild source
- `num_tokens` - the requested number of tokens when the node bootstraps
There are also a few static columns for cluster-global properties:
- `transition_state` - the transitioning state of the cluster (as described earlier), may be null
- `current_cdc_generation_timestamp` - the timestamp of the last introduced CDC generation
- `current_cdc_generation_uuid` - the UUID of the last introduced CDC generation (used to access its data)
- `global_topology_request` - if set, contains one of the supported global topology requests
- `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the UUID of the generation to be committed

View File

@@ -468,81 +468,104 @@ future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
co_await _db.local().apply(freeze(muts), db::no_timeout);
}
class topology_mutation_builder;
class topology_node_mutation_builder {
topology_mutation_builder& _builder;
deletable_row& _r;
public:
topology_node_mutation_builder(topology_mutation_builder&, raft::server_id);
template<typename T>
topology_node_mutation_builder& set(const char* cell, const T& value) {
return set(cell, sstring{::format("{}", value)});
}
topology_node_mutation_builder& set(const char* cell, const sstring& value);
topology_node_mutation_builder& set(const char* cell, const raft::server_id& value);
topology_node_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
topology_node_mutation_builder& set(const char* cell, const uint32_t& value);
topology_node_mutation_builder& set(const char* cell, const utils::UUID& value);
topology_node_mutation_builder& del(const char* cell);
canonical_mutation build();
};
class topology_mutation_builder {
friend class topology_node_mutation_builder;
schema_ptr _s;
mutation _m;
api::timestamp_type _ts;
deletable_row& _r;
std::optional<topology_node_mutation_builder> _node_builder;
public:
topology_mutation_builder(api::timestamp_type ts, raft::server_id);
template<typename T>
topology_mutation_builder& set(const char* cell, const T& value) {
return set(cell, ::format("{}", value));
}
topology_mutation_builder& set(const char* cell, const sstring& value);
topology_mutation_builder& set(const char* cell, const raft::server_id& value);
topology_mutation_builder& set(const char* cell, const std::unordered_set<dht::token>& value);
topology_mutation_builder& set(const char* cell, const uint32_t& value);
topology_mutation_builder& set(const char* cell, const utils::UUID& value);
topology_mutation_builder(api::timestamp_type ts);
topology_mutation_builder& set_transition_state(topology::transition_state);
topology_mutation_builder& set_current_cdc_generation_id(const cdc::generation_id_v2&);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
topology_mutation_builder& set_global_topology_request(global_topology_request);
topology_mutation_builder& del_transition_state();
topology_mutation_builder& del(const char* cell);
topology_mutation_builder& del_global_topology_request();
topology_node_mutation_builder& with_node(raft::server_id);
canonical_mutation build() { return canonical_mutation{std::move(_m)}; }
};
topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts, raft::server_id id) :
topology_mutation_builder::topology_mutation_builder(api::timestamp_type ts) :
_s(db::system_keyspace::topology()),
_m(_s, partition_key::from_singular(*_s, db::system_keyspace::TOPOLOGY)),
_ts(ts),
_r(_m.partition().clustered_row(*_s, clustering_key::from_singular(*_s, id.uuid()))) {
_r.apply(row_marker(_ts));
_ts(ts) {
}
topology_mutation_builder& topology_mutation_builder::set(const char* cell, const sstring& value) {
auto cdef = _s->get_column_definition(cell);
topology_node_mutation_builder::topology_node_mutation_builder(topology_mutation_builder& builder, raft::server_id id) :
_builder(builder),
_r(_builder._m.partition().clustered_row(*_builder._s, clustering_key::from_singular(*_builder._s, id.uuid()))) {
_r.apply(row_marker(_builder._ts));
}
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const sstring& value) {
auto cdef = _builder._s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value)));
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value)));
return *this;
}
topology_mutation_builder& topology_mutation_builder::set(const char* cell, const raft::server_id& value) {
auto cdef = _s->get_column_definition(cell);
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const raft::server_id& value) {
auto cdef = _builder._s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value.uuid())));
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value.uuid())));
return *this;
}
topology_mutation_builder& topology_mutation_builder::set(const char* cell, const uint32_t& value) {
auto cdef = _s->get_column_definition(cell);
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const uint32_t& value) {
auto cdef = _builder._s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(int32_t(value))));
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(int32_t(value))));
return *this;
}
topology_mutation_builder& topology_mutation_builder::set(
topology_node_mutation_builder& topology_node_mutation_builder::set(
const char* cell, const utils::UUID& value) {
auto cdef = _s->get_column_definition(cell);
auto cdef = _builder._s->get_column_definition(cell);
assert(cdef);
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _ts, cdef->type->decompose(value)));
_r.cells().apply(*cdef, atomic_cell::make_live(*cdef->type, _builder._ts, cdef->type->decompose(value)));
return *this;
}
topology_mutation_builder& topology_mutation_builder::del(const char* cell) {
auto cdef = _s->get_column_definition(cell);
topology_node_mutation_builder& topology_node_mutation_builder::del(const char* cell) {
auto cdef = _builder._s->get_column_definition(cell);
assert(cdef);
if (!cdef->type->is_multi_cell()) {
_r.cells().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now()));
_r.cells().apply(*cdef, atomic_cell::make_dead(_builder._ts, gc_clock::now()));
} else {
collection_mutation_description cm;
cm.tomb = tombstone{_ts, gc_clock::now()};
cm.tomb = tombstone{_builder._ts, gc_clock::now()};
_r.cells().apply(*cdef, cm.serialize(*cdef->type));
}
return *this;
}
topology_mutation_builder& topology_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
auto cdef = _s->get_column_definition(cell);
topology_node_mutation_builder& topology_node_mutation_builder::set(const char* cell, const std::unordered_set<dht::token>& tokens) {
auto cdef = _builder._s->get_column_definition(cell);
assert(cdef);
collection_mutation_description cm;
if (tokens.size()) {
@@ -551,7 +574,7 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons
cm.cells.reserve(tokens.size());
for (auto&& value : tokens) {
cm.cells.emplace_back(vtype->decompose(value.to_sstring()), atomic_cell::make_live(*bytes_type, _ts, bytes_view()));
cm.cells.emplace_back(vtype->decompose(value.to_sstring()), atomic_cell::make_live(*bytes_type, _builder._ts, bytes_view()));
}
_r.cells().apply(*cdef, cm.serialize(*cdef->type));
@@ -561,6 +584,10 @@ topology_mutation_builder& topology_mutation_builder::set(const char* cell, cons
return *this;
}
canonical_mutation topology_node_mutation_builder::build() {
return canonical_mutation{std::move(_builder._m)};
}
topology_mutation_builder& topology_mutation_builder::set_transition_state(topology::transition_state value) {
_m.set_static_cell("transition_state", ::format("{}", value), _ts);
return *this;
@@ -580,6 +607,29 @@ topology_mutation_builder& topology_mutation_builder::set_current_cdc_generation
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_data_uuid(
const utils::UUID& value) {
_m.set_static_cell("new_cdc_generation_data_uuid", value, _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_global_topology_request(global_topology_request value) {
_m.set_static_cell("global_topology_request", ::format("{}", value), _ts);
return *this;
}
topology_mutation_builder& topology_mutation_builder::del_global_topology_request() {
auto cdef = _s->get_column_definition("global_topology_request");
assert(cdef);
_m.partition().static_row().apply(*cdef, atomic_cell::make_dead(_ts, gc_clock::now()));
return *this;
}
topology_node_mutation_builder& topology_mutation_builder::with_node(raft::server_id n) {
_node_builder.emplace(*this, n);
return *_node_builder;
}
using raft_topology_cmd_handler_type = noncopyable_function<future<raft_topology_cmd_result>(
sharded<db::system_distributed_keyspace>&, raft::term_t, const raft_topology_cmd&)>;
@@ -658,7 +708,8 @@ class topology_coordinator {
}
}
std::optional<node_to_work_on> get_node_to_work_on_opt(group0_guard guard) {
// Returns the guard back if no node to work on is found.
std::variant<group0_guard, node_to_work_on> get_node_to_work_on_opt(group0_guard guard) {
auto& topo = _topo_sm._topology;
const std::pair<const raft::server_id, replica_state>* e = nullptr;
@@ -677,7 +728,7 @@ class topology_coordinator {
}
if (!e) {
return std::nullopt;
return guard;
}
std::optional<request_param> req_param;
@@ -689,26 +740,30 @@ class topology_coordinator {
};
node_to_work_on get_node_to_work_on(group0_guard guard) {
auto node_opt = get_node_to_work_on_opt(std::move(guard));
if (!node_opt) {
on_internal_error(slogger, ::format(
"raft topology: could not find node to work on"
" even though the state requires it (state: {})", _topo_sm._topology.tstate));
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
return std::move(*node);
}
return std::move(*node_opt);
on_internal_error(slogger, ::format(
"raft topology: could not find node to work on"
" even though the state requires it (state: {})", _topo_sm._topology.tstate));
};
void release_node(std::optional<node_to_work_on> node) {
// Leaving the scope destroys the object and releases the guard.
}
future<node_to_work_on> retake_node(raft::server_id id) {
future<group0_guard> start_operation() {
auto guard = co_await _group0.client().start_operation(&_as);
if (_term != _raft.get_current_term()) {
throw term_changed_error{};
}
co_return std::move(guard);
}
void release_node(std::optional<node_to_work_on> node) {
// Leaving the scope destroys the object and releases the guard.
}
node_to_work_on retake_node(group0_guard guard, raft::server_id id) {
auto& topo = _topo_sm._topology;
auto it = topo.find(id);
@@ -724,7 +779,7 @@ class topology_coordinator {
if (pit != topo.req_param.end()) {
req_param = pit->second;
}
co_return node_to_work_on{std::move(guard), &topo, id, &it->second, std::move(req), std::move(req_param)};
return node_to_work_on{std::move(guard), &topo, id, &it->second, std::move(req), std::move(req_param)};
}
group0_guard take_guard(node_to_work_on&& node) {
@@ -774,7 +829,7 @@ class topology_coordinator {
auto id = node.id;
release_node(std::move(node));
co_await exec_direct_command_helper(id, cmd);
co_return co_await retake_node(id);
co_return retake_node(co_await start_operation(), id);
};
future<bool> exec_global_command_helper(auto nodes, const raft_topology_cmd& cmd) {
@@ -791,67 +846,177 @@ class topology_coordinator {
}
};
future<std::pair<group0_guard, bool>> exec_global_command(
group0_guard guard, const raft_topology_cmd& cmd,
const utils::small_vector<raft::server_id, 2>& exclude_nodes) {
auto nodes = _topo_sm._topology.normal_nodes | boost::adaptors::filtered(
[&exclude_nodes] (const std::pair<const raft::server_id, replica_state>& n) {
return std::none_of(exclude_nodes.begin(), exclude_nodes.end(),
[&n] (const raft::server_id& m) { return n.first == m; });
}) | boost::adaptors::map_keys;
{
// release guard
auto _ = std::move(guard);
}
bool res = co_await exec_global_command_helper(std::move(nodes), cmd);
co_return std::pair{co_await start_operation(), res};
}
future<std::pair<node_to_work_on, bool>> exec_global_command(
node_to_work_on&& node, const raft_topology_cmd& cmd, bool include_local) {
auto my_id = _raft.id();
auto id = node.id;
auto exclude_node = parse_replaced_node(node);
auto nodes = node.topology->normal_nodes | boost::adaptors::filtered(
[my_id, include_local, exclude_node] (const std::pair<const raft::server_id, replica_state>& n) {
return (include_local || n.first != my_id) && n.first != exclude_node;
}) | boost::adaptors::map_keys;
release_node(std::move(node));
bool res = co_await exec_global_command_helper(nodes, cmd);
co_return std::make_pair(co_await retake_node(id), res);
utils::small_vector<raft::server_id, 2> exclude_nodes{parse_replaced_node(node)};
if (!include_local) {
exclude_nodes.push_back(_raft.id());
}
auto [guard, res] = co_await exec_global_command(std::move(node.guard), cmd, exclude_nodes);
co_return std::pair{retake_node(std::move(guard), node.id), res};
};
struct bootstrapping_info {
const std::unordered_set<token>& bootstrap_tokens;
const replica_state& rs;
};
// Returns data for a new CDC generation in the form of mutations for the CDC_GENERATIONS_V3 table
// and the generation's UUID.
//
// If there's a bootstrapping node, its tokens should be included in the new generation.
// Pass them and a reference to the bootstrapping node's replica_state through `binfo`.
future<std::pair<utils::UUID, utils::chunked_vector<mutation>>> prepare_new_cdc_generation_data(
locator::token_metadata_ptr tmptr, const group0_guard& guard, std::optional<bootstrapping_info> binfo) {
auto get_sharding_info = [&] (dht::token end) -> std::pair<size_t, uint8_t> {
if (binfo && binfo->bootstrap_tokens.contains(end)) {
return {binfo->rs.shard_count, binfo->rs.ignore_msb};
} else {
// FIXME: token metadata should directly return host ID for given token. See #12279
auto ep = tmptr->get_endpoint(end);
if (!ep) {
// get_sharding_info is only called for bootstrap tokens
// or for tokens present in token_metadata
on_internal_error(slogger, ::format(
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
" can't find endpoint for token {}", end));
}
auto id = tmptr->get_host_id_if_known(*ep);
if (!id) {
on_internal_error(slogger, ::format(
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
" can't find host ID for endpoint {}, owner of token {}", *ep, end));
}
auto ptr = _topo_sm._topology.find(raft::server_id{id->uuid()});
if (!ptr) {
on_internal_error(slogger, ::format(
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
" couldn't find node {} in topology, owner of token {}", *id, end));
}
auto& rs = ptr->second;
return {rs.shard_count, rs.ignore_msb};
}
};
auto [gen_uuid, gen_desc] = cdc::make_new_generation_data(
binfo ? binfo->bootstrap_tokens : std::unordered_set<token>{}, get_sharding_info, tmptr);
auto gen_table_schema = _db.find_schema(
db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
// FIXME: the CDC generation data can be large and not fit in a single command
// (for large clusters, it will introduce reactor stalls and go over commitlog entry
// size limit). We need to split it into multiple mutations by smartly picking
// a `mutation_size_threshold` and sending each mutation as a separate group 0 command.
// We also don't want to serialize the commands - there may be many of them,
// and we don't want to wait for a network round-trip to a quorum between each command.
// So we need to introduce a mechanism for group 0 to send a sequence of commands
// that can be committed concurrently. Also we need to be careful with memory consumption
// with many large mutations.
// See `system_distributed_keyspace::insert_cdc_generation` for inspiration how it
// was done when the mutations were stored in a regular distributed table.
const size_t mutation_size_threshold = 2'000'000;
auto gen_mutations = co_await cdc::get_cdc_generation_mutations(
gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, guard.write_timestamp());
co_return std::pair{gen_uuid, std::move(gen_mutations)};
}
// Precondition: there is no node request and no ongoing topology transition
// (checked under the guard we're holding).
future<> handle_global_request(group0_guard guard) {
switch (_topo_sm._topology.global_request.value()) {
case global_topology_request::new_cdc_generation: {
slogger.info("raft topology: new CDC generation requested");
auto tmptr = get_token_metadata_ptr();
auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(tmptr, guard, std::nullopt);
std::vector<canonical_mutation> updates{gen_mutations.begin(), gen_mutations.end()};
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid)
.del_global_topology_request();
updates.push_back(builder.build());
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(updates)}, reason);
}
break;
}
}
// Returns `true` iff there was work to do.
future<bool> handle_topology_transition(group0_guard guard) {
auto tstate = _topo_sm._topology.tstate;
if (!tstate) {
auto node_opt = get_node_to_work_on_opt(std::move(guard));
if (!node_opt) {
co_return false;
auto node_or_guard = get_node_to_work_on_opt(std::move(guard));
if (auto* node = std::get_if<node_to_work_on>(&node_or_guard)) {
co_await handle_node_transition(std::move(*node));
co_return true;
}
co_await handle_node_transition(std::move(*node_opt));
co_return true;
guard = std::get<group0_guard>(std::move(node_or_guard));
if (_topo_sm._topology.global_request) {
co_await handle_global_request(std::move(guard));
co_return true;
}
co_return false;
}
bool exec_command_res;
switch (*tstate) {
case topology::transition_state::commit_cdc_generation: {
auto node = get_node_to_work_on(std::move(guard));
// make sure all nodes know about new topology and have the new CDC generation data
// (we require all nodes to be alive for topo change for now)
std::tie(node, exec_command_res) = co_await exec_global_command(
std::move(node), raft_topology_cmd{raft_topology_cmd::command::barrier}, false);
// Note: if there was a replace or removenode going on, we'd need to put the replaced/removed
// node into `exclude_nodes` parameter in `exec_global_command`, but CDC generations are never
// introduced during replace/remove.
std::tie(guard, exec_command_res) = co_await exec_global_command(
std::move(guard), raft_topology_cmd{raft_topology_cmd::command::barrier}, {_raft.id()});
if (!exec_command_res) {
break;
}
// We don't need to add delay to the generation timestamp if this is the first generation.
bool add_ts_delay = bool(node.topology->current_cdc_generation_id);
bool add_ts_delay = bool(_topo_sm._topology.current_cdc_generation_id);
// Begin the race.
// See the large FIXME below.
auto cdc_gen_ts = cdc::new_generation_timestamp(add_ts_delay, _ring_delay);
auto cdc_gen_uuid = node.rs->ring.value().new_cdc_generation_data_uuid;
auto cdc_gen_uuid = _topo_sm._topology.new_cdc_generation_data_uuid;
if (!cdc_gen_uuid) {
on_internal_error(slogger, ::format(
"raft topology: new CDC generation data UUID missing in `commit_cdc_generation` state"
", transitioning node: {}", node.id));
on_internal_error(slogger,
"raft topology: new CDC generation data UUID missing in `commit_cdc_generation` state");
}
cdc::generation_id_v2 cdc_gen_id {
.ts = cdc_gen_ts,
.id = cdc_gen_uuid,
.id = *cdc_gen_uuid,
};
{
// Sanity check.
// This could happen if the topology coordinator's clock is broken.
auto curr_gen_id = node.topology->current_cdc_generation_id;
auto curr_gen_id = _topo_sm._topology.current_cdc_generation_id;
if (curr_gen_id && curr_gen_id->ts >= cdc_gen_ts) {
on_internal_error(slogger, ::format(
"raft topology: new CDC generation has smaller timestamp than the previous one."
@@ -881,11 +1046,30 @@ class topology_coordinator {
// in the middle of a CDC generation switch (when they are prepared to switch but not
// committed) - they won't coordinate CDC-enabled writes until they reconnect to the
// majority and commit.
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
builder.set_transition_state(topology::transition_state::write_both_read_old)
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::publish_cdc_generation)
.set_current_cdc_generation_id(cdc_gen_id);
auto str = ::format("{}: committed new CDC generation, ID: {}", node.rs->state, cdc_gen_id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
}
break;
case topology::transition_state::publish_cdc_generation: {
// We just committed a new CDC generation in the commit_cdc_generation step.
// Publish it to the user-facing distributed CDC description tables.
auto curr_gen_id = _topo_sm._topology.current_cdc_generation_id.value();
auto gen_data = co_await _sys_ks.read_cdc_generation(curr_gen_id.id);
co_await _sys_dist_ks.local().create_cdc_desc(
curr_gen_id.ts, gen_data, { get_token_metadata().count_normal_token_owners() });
topology_mutation_builder builder(guard.write_timestamp());
if (_topo_sm._topology.transition_nodes.empty()) {
builder.del_transition_state();
} else {
builder.set_transition_state(topology::transition_state::write_both_read_old);
}
auto str = ::format("published CDC generation, ID: {}", curr_gen_id);
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
}
break;
case topology::transition_state::write_both_read_old: {
@@ -898,16 +1082,6 @@ class topology_coordinator {
break;
}
// If a node is bootstrapping, we just committed a new CDC generation in the commit_cdc_generation step.
// Publish it to the user-facing distributed CDC description tables.
if (node.rs->state == node_state::bootstrapping) {
auto curr_gen_id = node.topology->current_cdc_generation_id.value();
auto gen_data = co_await _sys_ks.read_cdc_generation(curr_gen_id.id);
co_await _sys_dist_ks.local().create_cdc_desc(
curr_gen_id.ts, gen_data, { get_token_metadata().count_normal_token_owners() });
}
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
if (node.rs->state == node_state::removing) {
// tell all nodes to stream data of the removed node to new range owners
@@ -929,7 +1103,7 @@ class topology_coordinator {
}
}
// Streaming completed. We can now move tokens state to topology::transition_state::write_both_read_new
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.set_transition_state(topology::transition_state::write_both_read_new);
auto str = ::format("{}: streaming completed for node {}", node.rs->state, node.id);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
@@ -947,8 +1121,9 @@ class topology_coordinator {
}
switch(node.rs->state) {
case node_state::bootstrapping: {
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
builder.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::normal);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
"bootstrap: read fence completed");
@@ -956,23 +1131,26 @@ class topology_coordinator {
break;
case node_state::decommissioning:
case node_state::removing: {
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
builder.del("tokens")
.set("node_state", node_state::left)
.del_transition_state();
topology_mutation_builder builder(node.guard.write_timestamp());
builder.del_transition_state()
.with_node(node.id)
.del("tokens")
.set("node_state", node_state::left);
auto str = ::format("{}: read fence completed", node.rs->state);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, std::move(str));
}
break;
case node_state::replacing: {
topology_mutation_builder builder1(node.guard.write_timestamp(), node.id);
topology_mutation_builder builder1(node.guard.write_timestamp());
// Move new node to 'normal'
builder1.del_transition_state()
.with_node(node.id)
.set("node_state", node_state::normal);
// Move old node to 'left'
topology_mutation_builder builder2(node.guard.write_timestamp(), parse_replaced_node(node));
builder2.del("tokens")
topology_mutation_builder builder2(node.guard.write_timestamp());
builder2.with_node(parse_replaced_node(node))
.del("tokens")
.set("node_state", node_state::left);
co_await update_topology_state(take_guard(std::move(node)), {builder1.build(), builder2.build()},
"replace: read fence completed");
@@ -1001,7 +1179,7 @@ class topology_coordinator {
case node_state::none: {
// if the state is none there have to be either 'join' or 'replace' request
// if the state is normal there have to be either 'leave', 'remove' or 'rebuild' request
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
topology_mutation_builder builder(node.guard.write_timestamp());
switch (node.request.value()) {
case topology_request::join: {
assert(!node.rs->ring);
@@ -1012,66 +1190,17 @@ class topology_coordinator {
auto bootstrap_tokens = dht::boot_strapper::get_random_bootstrap_tokens(
tmptr, num_tokens, dht::check_token_endpoint::yes);
auto get_sharding_info = [&] (dht::token end) -> std::pair<size_t, uint8_t> {
if (bootstrap_tokens.contains(end)) {
return {node.rs->shard_count, node.rs->ignore_msb};
} else {
// FIXME: token metadata should directly return host ID for given token. See #12279
auto ep = tmptr->get_endpoint(end);
if (!ep) {
// get_sharding_info is only called for bootstrap tokens
// or for tokens present in token_metadata
on_internal_error(slogger, ::format(
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
" can't find endpoint for token {}", end));
}
auto id = tmptr->get_host_id_if_known(*ep);
if (!id) {
on_internal_error(slogger, ::format(
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
" can't find host ID for endpoint {}, owner of token {}", *ep, end));
}
auto ptr = node.topology->find(raft::server_id{id->uuid()});
if (!ptr) {
on_internal_error(slogger, ::format(
"raft topology: make_new_cdc_generation_data: get_sharding_info:"
" couldn't find node {} in topology, owner of token {}", *id, end));
}
auto& rs = ptr->second;
return {rs.shard_count, rs.ignore_msb};
}
};
auto [gen_uuid, gen_desc] = cdc::make_new_generation_data(
bootstrap_tokens, get_sharding_info, tmptr);
auto gen_table_schema = _db.find_schema(
db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
// FIXME: the CDC generation data can be large and not fit in a single command
// (for large clusters, it will introduce reactor stalls and go over commitlog entry
// size limit). We need to split it into multiple mutations by smartly picking
// a `mutation_size_threshold` and sending each mutation as a separate group 0 command.
// We also don't want to serialize the commands - there may be many of them,
// and we don't want to wait for a network round-trip to a quorum between each command.
// So we need to introduce a mechanism for group 0 to send a sequence of commands
// that can be committed concurrently. Also we need to be careful with memory consumption
// with many large mutations.
// See `system_distributed_keyspace::insert_cdc_generation` for inspiration how it
// was done when the mutations were stored in a regular distributed table.
const size_t mutation_size_threshold = 2'000'000;
auto gen_mutations = co_await cdc::get_cdc_generation_mutations(
gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, node.guard.write_timestamp());
auto [gen_uuid, gen_mutations] = co_await prepare_new_cdc_generation_data(
tmptr, node.guard, bootstrapping_info{bootstrap_tokens, *node.rs});
std::vector<canonical_mutation> updates{gen_mutations.begin(), gen_mutations.end()};
// Write chosen tokens and CDC generation data through raft.
builder.set("node_state", node_state::bootstrapping)
.del("topology_request")
.set("tokens", bootstrap_tokens)
.set_transition_state(topology::transition_state::commit_cdc_generation)
.set("new_cdc_generation_data_uuid", gen_uuid);
builder.set_transition_state(topology::transition_state::commit_cdc_generation)
.set_new_cdc_generation_data_uuid(gen_uuid)
.with_node(node.id)
.set("node_state", node_state::bootstrapping)
.del("topology_request")
.set("tokens", bootstrap_tokens);
updates.push_back(builder.build());
auto reason = ::format(
"bootstrap: assign tokens and insert CDC generation data (UUID: {})", gen_uuid);
@@ -1083,9 +1212,10 @@ class topology_coordinator {
// start decommission and put tokens of decommissioning nodes into write_both_read_old state
// meaning that reads will go to the replica being decommissioned
// but writes will go to new owner as well
builder.set("node_state", node_state::decommissioning)
.del("topology_request")
.set_transition_state(topology::transition_state::write_both_read_old);
builder.set_transition_state(topology::transition_state::write_both_read_old)
.with_node(node.id)
.set("node_state", node_state::decommissioning)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
"start decommission");
break;
@@ -1094,9 +1224,10 @@ class topology_coordinator {
// start removing and put tokens of a node been removed into write_both_read_old state
// meaning that reads will go to the replica being removed (it is dead though)
// but writes will go to new owner as well
builder.set("node_state", node_state::removing)
.del("topology_request")
.set_transition_state(topology::transition_state::write_both_read_old);
builder.set_transition_state(topology::transition_state::write_both_read_old)
.with_node(node.id)
.set("node_state", node_state::removing)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
"start removenode");
break;
@@ -1109,17 +1240,19 @@ class topology_coordinator {
// start replacing and take ownership of the tokens of a node been replaced
// and put them into write_both_read_old state meaning that reads will go
// to the replica being removed (it is dead though) but writes will go to new owner as well
builder.set("node_state", node_state::replacing)
.del("topology_request")
.set("tokens", it->second.ring->tokens)
.set_transition_state(topology::transition_state::write_both_read_old);
builder.set_transition_state(topology::transition_state::write_both_read_old)
.with_node(node.id)
.set("node_state", node_state::replacing)
.del("topology_request")
.set("tokens", it->second.ring->tokens);
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "start replace");
break;
}
case topology_request::rebuild: {
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
builder.set("node_state", node_state::rebuilding)
.del("topology_request");
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(node.id)
.set("node_state", node_state::rebuilding)
.del("topology_request");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()},
"start rebuilding");
break;
@@ -1130,8 +1263,9 @@ class topology_coordinator {
case node_state::rebuilding: {
node = co_await exec_direct_command(
std::move(node), raft_topology_cmd{raft_topology_cmd::command::stream_ranges});
topology_mutation_builder builder(node.guard.write_timestamp(), node.id);
builder.set("node_state", node_state::normal)
topology_mutation_builder builder(node.guard.write_timestamp());
builder.with_node(node.id)
.set("node_state", node_state::normal)
.del("rebuild_option");
co_await update_topology_state(take_guard(std::move(node)), {builder.build()}, "rebuilding completed");
}
@@ -1188,7 +1322,7 @@ future<> topology_coordinator::run() {
wait_for_event = false;
}
auto guard = co_await _group0.client().start_operation(&_as);
auto guard = co_await start_operation();
co_await cleanup_group0_config_if_needed();
bool had_work = co_await handle_topology_transition(std::move(guard));
@@ -1264,8 +1398,9 @@ future<> storage_service::raft_replace(raft::server& raft_server, raft::server_i
}
auto& rs = it->second;
topology_mutation_builder builder(guard.write_timestamp(), raft_server.id());
builder.set("node_state", node_state::none)
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("node_state", node_state::none)
.set("datacenter", rs.datacenter)
.set("rack", rs.rack)
.set("release_version", version::release())
@@ -1300,8 +1435,9 @@ future<> storage_service::raft_bootstrap(raft::server& raft_server) {
slogger.info("raft topology: adding myself to topology: {}", raft_server.id());
// Current topology does not contains this node. Bootstrap is needed!
auto guard = co_await _group0->client().start_operation(&_abort_source);
topology_mutation_builder builder(guard.write_timestamp(), raft_server.id());
builder.set("node_state", node_state::none)
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("node_state", node_state::none)
.set("datacenter", _snitch.local()->get_datacenter())
.set("rack", _snitch.local()->get_rack())
.set("release_version", version::release())
@@ -1367,8 +1503,9 @@ future<> storage_service::update_topology_with_local_metadata(raft::server& raft
co_await _sys_ks.local().set_must_synchronize_topology(true);
topology_mutation_builder builder(guard.write_timestamp(), raft_server.id());
builder.set("shard_count", local_shard_count)
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("shard_count", local_shard_count)
.set("ignore_msb", local_ignore_msb)
.set("release_version", local_release_version);
topology_change change{{builder.build()}};
@@ -3362,8 +3499,9 @@ future<> storage_service::raft_decomission() {
}
slogger.info("raft topology: request decomission for: {}", raft_server.id());
topology_mutation_builder builder(guard.write_timestamp(), raft_server.id());
builder.set("topology_request", topology_request::leave);
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("topology_request", topology_request::leave);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("decomission: request decomission for {}", raft_server.id()));
@@ -3703,8 +3841,9 @@ future<> storage_service::raft_removenode(locator::host_id host_id) {
}
slogger.info("raft topology: request removenode for: {}", id);
topology_mutation_builder builder(guard.write_timestamp(), id);
builder.set("topology_request", topology_request::remove);
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(id)
.set("topology_request", topology_request::remove);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("removenode: request remove for {}", id));
@@ -3857,6 +3996,16 @@ future<> storage_service::removenode(locator::host_id host_id, std::list<locator
});
}
future<> storage_service::check_and_repair_cdc_streams(cdc::generation_service& cdc_gen_svc) {
assert(this_shard_id() == 0);
if (_raft_topology_change_enabled) {
return raft_check_and_repair_cdc_streams();
}
return cdc_gen_svc.check_and_repair_cdc_streams();
}
class node_ops_meta_data {
node_ops_id _ops_uuid;
gms::inet_address _coordinator;
@@ -4221,8 +4370,9 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
}
slogger.info("raft topology: request rebuild for: {}", raft_server.id());
topology_mutation_builder builder(guard.write_timestamp(), raft_server.id());
builder.set("topology_request", topology_request::rebuild)
topology_mutation_builder builder(guard.write_timestamp());
builder.with_node(raft_server.id())
.set("topology_request", topology_request::rebuild)
.set("rebuild_option", source_dc);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, ::format("rebuild: request rebuild for {} ({})", raft_server.id(), source_dc));
@@ -4242,6 +4392,44 @@ future<> storage_service::raft_rebuild(sstring source_dc) {
});
}
future<> storage_service::raft_check_and_repair_cdc_streams() {
std::optional<cdc::generation_id_v2> curr_gen;
while (true) {
slogger.info("raft topology: request check_and_repair_cdc_streams, refreshing topology");
auto guard = co_await _group0->client().start_operation(&_abort_source);
auto curr_req = _topology_state_machine._topology.global_request;
if (curr_req && *curr_req != global_topology_request::new_cdc_generation) {
// FIXME: replace this with a queue
throw std::runtime_error{
"check_and_repair_cdc_streams: a different topology request is already pending, try again later"};
}
curr_gen = _topology_state_machine._topology.current_cdc_generation_id;
// FIXME: check if the current generation is optimal, don't request new one if it isn't
topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::new_cdc_generation);
topology_change change{{builder.build()}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
::format("request check+repair CDC generation from {}", _group0->group0_server().id()));
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
} catch (group0_concurrent_modification&) {
slogger.info("raft topology: request check+repair CDC: concurrent operation is detected, retrying.");
continue;
}
break;
}
// Wait until the current CDC generation changes.
// This might happen due to a different reason than our request but we don't care.
co_await _topology_state_machine.event.when([this, &curr_gen] {
return curr_gen != _topology_state_machine._topology.current_cdc_generation_id;
});
}
future<> storage_service::rebuild(sstring source_dc) {
return run_with_api_lock(sstring("rebuild"), [source_dc] (storage_service& ss) -> future<> {
if (ss._raft_topology_change_enabled) {

View File

@@ -710,6 +710,9 @@ public:
future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name);
// Must run on shard 0.
future<> check_and_repair_cdc_streams(cdc::generation_service&);
private:
promise<> _drain_finished;
std::optional<shared_future<>> _transport_stopped;
@@ -789,6 +792,7 @@ private:
future<> raft_removenode(locator::host_id host_id);
future<> raft_replace(raft::server&, raft::server_id, gms::inet_address);
future<> raft_rebuild(sstring source_dc);
future<> raft_check_and_repair_cdc_streams();
future<> update_topology_with_local_metadata(raft::server&);
// This is called on all nodes for each new command received through raft

View File

@@ -38,6 +38,7 @@ bool topology::contains(raft::server_id id) {
static std::unordered_map<topology::transition_state, sstring> transition_state_to_name_map = {
{topology::transition_state::commit_cdc_generation, "commit cdc generation"},
{topology::transition_state::publish_cdc_generation, "publish cdc generation"},
{topology::transition_state::write_both_read_old, "write both read old"},
{topology::transition_state::write_both_read_new, "write both read new"},
};
@@ -109,6 +110,28 @@ topology_request topology_request_from_string(const sstring& s) {
throw std::runtime_error(fmt::format("cannot map name {} to topology_request", s));
}
static std::unordered_map<global_topology_request, sstring> global_topology_request_to_name_map = {
{global_topology_request::new_cdc_generation, "new_cdc_generation"},
};
std::ostream& operator<<(std::ostream& os, const global_topology_request& req) {
auto it = global_topology_request_to_name_map.find(req);
if (it == global_topology_request_to_name_map.end()) {
on_internal_error(tsmlogger, format("cannot print global topology request {}", static_cast<uint8_t>(req)));
}
return os << it->second;
}
global_topology_request global_topology_request_from_string(const sstring& s) {
for (auto&& e : global_topology_request_to_name_map) {
if (e.second == s) {
return e.first;
}
}
on_internal_error(tsmlogger, format("cannot map name {} to global_topology_request", s));
}
std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd) {
switch (cmd) {
case raft_topology_cmd::command::barrier:

View File

@@ -26,7 +26,7 @@
namespace service {
enum class node_state: uint8_t {
enum class node_state: uint16_t {
none, // the new node joined group0 but did not bootstraped yet (has no tokens and data to serve)
bootstrapping, // the node is currently in the process of streaming its part of the ring
decommissioning, // the node is being decomissioned and stream its data to nodes that took over
@@ -37,7 +37,7 @@ enum class node_state: uint8_t {
left // the node left the cluster and group0
};
enum class topology_request: uint8_t {
enum class topology_request: uint16_t {
join,
leave,
remove,
@@ -47,13 +47,12 @@ enum class topology_request: uint8_t {
using request_param = std::variant<raft::server_id, sstring, uint32_t>;
enum class global_topology_request: uint16_t {
new_cdc_generation,
};
struct ring_slice {
std::unordered_set<dht::token> tokens;
// When a new node joins the cluster, always a new CDC generation is created.
// This is the UUID used to access the data of the CDC generation introduced
// when the node owning this ring_slice joined (it's the partition key in CDC_GENERATIONS_V3 table).
utils::UUID new_cdc_generation_data_uuid;
};
struct replica_state {
@@ -67,8 +66,9 @@ struct replica_state {
};
struct topology {
enum class transition_state: uint8_t {
enum class transition_state: uint16_t {
commit_cdc_generation,
publish_cdc_generation,
write_both_read_old,
write_both_read_new,
};
@@ -92,8 +92,17 @@ struct topology {
// operation untill the node becomes normal
std::unordered_map<raft::server_id, request_param> req_param;
// Pending global topology request (i.e. not related to any specific node).
std::optional<global_topology_request> global_request;
// The ID of the last introduced CDC generation.
std::optional<cdc::generation_id_v2> current_cdc_generation_id;
// This is the UUID used to access the data of a new CDC generation introduced
// e.g. when a new node bootstraps, needed in `commit_cdc_generation` transition state.
// It's used as partition key in CDC_GENERATIONS_V3 table.
std::optional<utils::UUID> new_cdc_generation_data_uuid;
// Find only nodes in non 'left' state
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id) const;
// Return true if node exists in any state including 'left' one
@@ -120,7 +129,7 @@ struct topology_state_machine {
// Raft leader uses this command to drive bootstrap process on other nodes
struct raft_topology_cmd {
enum class command: uint8_t {
enum class command: uint16_t {
barrier, // request to wait for the latest topology
stream_ranges, // reqeust to stream data, return when streaming is
// done
@@ -131,7 +140,7 @@ struct raft_topology_cmd {
// returned as a result of raft_bootstrap_cmd
struct raft_topology_cmd_result {
enum class command_status: uint8_t {
enum class command_status: uint16_t {
fail,
success
};
@@ -144,5 +153,7 @@ std::ostream& operator<<(std::ostream& os, node_state s);
node_state node_state_from_string(const sstring& s);
std::ostream& operator<<(std::ostream& os, const topology_request& req);
topology_request topology_request_from_string(const sstring& s);
std::ostream& operator<<(std::ostream&, const global_topology_request&);
global_topology_request global_topology_request_from_string(const sstring&);
std::ostream& operator<<(std::ostream& os, const raft_topology_cmd::command& cmd);
}

View File

@@ -65,6 +65,11 @@ class ManagerClient():
self.ccluster = None
self.cql = None
def get_cql(self) -> CassandraSession:
"""Precondition: driver is connected"""
assert(self.cql)
return self.cql
# Make driver update endpoints from remote connection
def _driver_update(self) -> None:
if self.ccluster is not None:

View File

@@ -5,11 +5,17 @@
#
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.pylib.util import unique_name
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.topology.util import check_token_ring_and_group0_consistency
from cassandra.cluster import ConsistencyLevel # type: ignore # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement # type: ignore # pylint: disable=no-name-in-module
import pytest
import logging
import time
from datetime import datetime
from typing import Optional
logger = logging.getLogger(__name__)
@@ -35,7 +41,26 @@ async def test_topology_ops(request, manager: ManagerClient):
logger.info(f"Removing node {servers[0]} using {servers[1]}")
await manager.remove_node(servers[1].server_id, servers[0].server_id)
await check_token_ring_and_group0_consistency(manager)
servers = servers[1:]
logger.info(f"Decommissioning node {servers[1]}")
await manager.decommission_node(servers[1].server_id)
cql = manager.get_cql()
query = SimpleStatement(
"select time from system_distributed.cdc_generation_timestamps where key = 'timestamps'",
consistency_level = ConsistencyLevel.QUORUM)
await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
logger.info(f"Timestamps before check_and_repair: {gen_timestamps}")
await manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[1].ip_addr)
async def new_gen_appeared() -> Optional[set[datetime]]:
new_gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
assert(gen_timestamps <= new_gen_timestamps)
if gen_timestamps < new_gen_timestamps:
return new_gen_timestamps
return None
new_gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
logger.info(f"Timestamps after check_and_repair: {new_gen_timestamps}")
logger.info(f"Decommissioning node {servers[0]}")
await manager.decommission_node(servers[0].server_id)
await check_token_ring_and_group0_consistency(manager)