Extend system.topology with 3 new columns to store data required to process alter ks global topo req

Because ALTER KS will result in creating a global topo req, we'll have
to pass the req data to topology coordinator's state machine, and the
easiest way to do it is through sytem.topology table, which is going to
be extended with 3 extra columns carrying all the data required to
execute ALTER KS from within topology coordinator.
This commit is contained in:
Piotr Smaron
2024-04-19 13:49:04 +02:00
parent 6fd0a49b63
commit 59d3fd615f
5 changed files with 42 additions and 0 deletions

View File

@@ -236,12 +236,15 @@ schema_ptr system_keyspace::topology() {
.with_column("request_id", timeuuid_type)
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
.with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("global_topology_request", utf8_type, column_kind::static_column)
.with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
.with_column("session", uuid_type, column_kind::static_column)
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
@@ -3070,6 +3073,11 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.committed_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "committed_cdc_generations"));
}
if (some_row.has("new_keyspace_rf_change_data")) {
ret.new_keyspace_rf_change_ks_name = some_row.get_as<sstring>("new_keyspace_rf_change_ks_name");
ret.new_keyspace_rf_change_data = some_row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}
if (!ret.committed_cdc_generations.empty()) {
// Sanity check for CDC generation data consistency.
auto gen_id = ret.committed_cdc_generations.back();
@@ -3101,6 +3109,10 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.global_request.emplace(req);
}
if (some_row.has("global_topology_request_id")) {
ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
}
if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}

View File

@@ -549,7 +549,10 @@ CREATE TABLE system.topology (
committed_cdc_generations set<tuple<timestamp, timeuuid>> static,
unpublished_cdc_generations set<tuple<timestamp, timeuuid>> static,
global_topology_request text static,
global_topology_request_id timeuuid static,
new_cdc_generation_data_uuid timeuuid static,
new_keyspace_rf_change_ks_name text static,
new_keyspace_rf_change_data frozen<map<text, text>> static,
PRIMARY KEY (key, host_id)
)
```
@@ -575,8 +578,11 @@ There are also a few static columns for cluster-global properties:
- `committed_cdc_generations` - the IDs of the committed CDC generations
- `unpublished_cdc_generations` - the IDs of the committed yet unpublished CDC generations
- `global_topology_request` - if set, contains one of the supported global topology requests
- `global_topology_request_id` - if set, contains global topology request's id, which is a new group0's state id
- `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the time UUID of the generation to be committed
- `upgrade_state` - describes the progress of the upgrade to raft-based topology.
- 'new_keyspace_rf_change_ks_name' - the name of the KS that is being the target of the scheduled ALTER KS statement
- 'new_keyspace_rf_change_data' - the KS options to be used when executing the scheduled ALTER KS statement
# Join procedure

View File

@@ -11,6 +11,7 @@
#include "types/tuple.hh"
#include "types/types.hh"
#include "types/set.hh"
#include "types/map.hh"
namespace db {
extern thread_local data_type cdc_generation_ts_id_type;
@@ -213,6 +214,15 @@ topology_mutation_builder& topology_mutation_builder::set_committed_cdc_generati
return apply_set("committed_cdc_generations", collection_apply_mode::overwrite, std::move(dv));
}
topology_mutation_builder& topology_mutation_builder::set_new_keyspace_rf_change_data(
const sstring& ks_name, const std::map<sstring, sstring>& rf_per_dc) {
apply_atomic("new_keyspace_rf_change_ks_name", ks_name);
apply_atomic("new_keyspace_rf_change_data",
make_map_value(schema().get_column_definition("new_keyspace_rf_change_data")->type,
map_type_impl::native_type(rf_per_dc.begin(), rf_per_dc.end())));
return *this;
}
topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
auto dv = values | boost::adaptors::transformed([&] (const auto& v) {
return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}}));
@@ -224,6 +234,10 @@ topology_mutation_builder& topology_mutation_builder::set_global_topology_reques
return apply_atomic("global_topology_request", ::format("{}", value));
}
topology_mutation_builder& topology_mutation_builder::set_global_topology_request_id(const utils::UUID& value) {
return apply_atomic("global_topology_request_id", value);
}
topology_mutation_builder& topology_mutation_builder::set_upgrade_state(topology::upgrade_state_type value) {
return apply_atomic("upgrade_state", ::format("{}", value));
}

View File

@@ -114,8 +114,10 @@ public:
topology_mutation_builder& set_tablet_balancing_enabled(bool);
topology_mutation_builder& set_new_cdc_generation_data_uuid(const utils::UUID& value);
topology_mutation_builder& set_committed_cdc_generations(const std::vector<cdc::generation_id_v2>& values);
topology_mutation_builder& set_new_keyspace_rf_change_data(const sstring &ks_name, const std::map<sstring, sstring> &rf_per_dc);
topology_mutation_builder& set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values);
topology_mutation_builder& set_global_topology_request(global_topology_request);
topology_mutation_builder& set_global_topology_request_id(const utils::UUID&);
topology_mutation_builder& set_upgrade_state(topology::upgrade_state_type);
topology_mutation_builder& add_enabled_features(const std::set<sstring>& value);
topology_mutation_builder& add_ignored_nodes(const std::unordered_set<raft::server_id>& value);

View File

@@ -154,6 +154,9 @@ struct topology {
// Pending global topology request (i.e. not related to any specific node).
std::optional<global_topology_request> global_request;
// Pending global topology request's id, which is a new group0's state id
std::optional<utils::UUID> global_request_id;
// The IDs of the committed CDC generations sorted by timestamps.
// The obsolete generations may not be in this list as they are continually deleted.
std::vector<cdc::generation_id_v2> committed_cdc_generations;
@@ -163,6 +166,11 @@ struct topology {
// It's used as the first column of the clustering key in CDC_GENERATIONS_V3 table.
std::optional<utils::UUID> new_cdc_generation_data_uuid;
// The name of the KS that is being the target of the scheduled ALTER KS statement
std::optional<sstring> new_keyspace_rf_change_ks_name;
// The KS options to be used when executing the scheduled ALTER KS statement
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_data;
// The IDs of the committed yet unpublished CDC generations sorted by timestamps.
std::vector<cdc::generation_id_v2> unpublished_cdc_generations;