topology_state_machine: extract features-related fields to a struct
`enabled_features` and `supported_features` are now moved to a new `topology::features` struct. This will allow to move load this information independently from the `topology` struct, which will be needed for feature checking during start.
This commit is contained in:
@@ -2531,9 +2531,8 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
rebuild_option = row.get_as<sstring>("rebuild_option");
|
||||
}
|
||||
|
||||
std::set<sstring> supported_features;
|
||||
if (row.has("supported_features")) {
|
||||
supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
|
||||
if (row.has("supported_features") && nstate == service::node_state::normal) {
|
||||
ret.features.normal_supported_features.emplace(host_id, decode_features(deserialize_set_column(*topology(), row, "supported_features")));
|
||||
}
|
||||
|
||||
if (row.has("topology_request")) {
|
||||
@@ -2602,7 +2601,7 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
if (map) {
|
||||
map->emplace(host_id, service::replica_state{
|
||||
nstate, std::move(datacenter), std::move(rack), std::move(release_version),
|
||||
ring_slice, shard_count, ignore_msb, std::move(supported_features)});
|
||||
ring_slice, shard_count, ignore_msb});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2677,7 +2676,7 @@ future<service::topology> system_keyspace::load_topology_state() {
|
||||
}
|
||||
|
||||
if (some_row.has("enabled_features")) {
|
||||
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
ret.features.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -257,8 +257,8 @@ future<> feature_service::enable_features_on_startup(db::system_keyspace& sys_ks
|
||||
persisted_features = co_await sys_ks.load_local_enabled_features();
|
||||
} else {
|
||||
auto topo = co_await sys_ks.load_topology_state();
|
||||
persisted_unsafe_to_disable_features = topo.calculate_not_yet_enabled_features();
|
||||
persisted_features = std::move(topo.enabled_features);
|
||||
persisted_unsafe_to_disable_features = topo.features.calculate_not_yet_enabled_features();
|
||||
persisted_features = std::move(topo.features.enabled_features);
|
||||
}
|
||||
|
||||
if (persisted_features.empty() && persisted_unsafe_to_disable_features.empty()) {
|
||||
|
||||
@@ -314,13 +314,13 @@ future<> storage_service::topology_state_load(cdc::generation_service& cdc_gen_s
|
||||
_topology_state_machine._topology = co_await _sys_ks.local().load_topology_state();
|
||||
|
||||
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
|
||||
return fs.enable(boost::copy_range<std::set<std::string_view>>(_topology_state_machine._topology.enabled_features));
|
||||
return fs.enable(boost::copy_range<std::set<std::string_view>>(_topology_state_machine._topology.features.enabled_features));
|
||||
});
|
||||
|
||||
// Update the legacy `enabled_features` key in `system.scylla_local`.
|
||||
// It's OK to update it after enabling features because `system.topology` now
|
||||
// is the source of truth about enabled features.
|
||||
co_await _sys_ks.local().save_local_enabled_features(_topology_state_machine._topology.enabled_features);
|
||||
co_await _sys_ks.local().save_local_enabled_features(_topology_state_machine._topology.features.enabled_features);
|
||||
|
||||
const auto& am = _group0->address_map();
|
||||
auto id2ip = [this, &am] (raft::server_id id) -> future<gms::inet_address> {
|
||||
@@ -1477,7 +1477,7 @@ class topology_coordinator {
|
||||
co_return true;
|
||||
}
|
||||
|
||||
if (auto feats = _topo_sm._topology.calculate_not_yet_enabled_features(); !feats.empty()) {
|
||||
if (auto feats = _topo_sm._topology.features.calculate_not_yet_enabled_features(); !feats.empty()) {
|
||||
co_await enable_features(std::move(guard), std::move(feats));
|
||||
co_return true;
|
||||
}
|
||||
@@ -2133,11 +2133,12 @@ future<> storage_service::do_update_topology_with_local_metadata(raft::server& r
|
||||
}
|
||||
|
||||
auto& replica_state = it->second;
|
||||
const auto& replica_supported_features = _topology_state_machine._topology.features.normal_supported_features[raft_server.id()];
|
||||
|
||||
return replica_state.shard_count == local_shard_count
|
||||
&& replica_state.ignore_msb == local_ignore_msb
|
||||
&& replica_state.release_version == local_release_version
|
||||
&& replica_state.supported_features == local_supported_features;
|
||||
&& replica_supported_features == local_supported_features;
|
||||
};
|
||||
|
||||
// We avoid performing a read barrier if we're sure that our metadata stored in topology
|
||||
|
||||
@@ -36,11 +36,11 @@ bool topology::contains(raft::server_id id) {
|
||||
left_nodes.contains(id);
|
||||
}
|
||||
|
||||
std::set<sstring> topology::calculate_not_yet_enabled_features() const {
|
||||
std::set<sstring> topology_features::calculate_not_yet_enabled_features() const {
|
||||
std::set<sstring> to_enable;
|
||||
bool first = true;
|
||||
|
||||
for (const auto& [_, rs] : normal_nodes) {
|
||||
for (const auto& [id, supported_features] : normal_supported_features) {
|
||||
if (!first && to_enable.empty()) {
|
||||
break;
|
||||
}
|
||||
@@ -49,11 +49,13 @@ std::set<sstring> topology::calculate_not_yet_enabled_features() const {
|
||||
// This is the first node that we process.
|
||||
// Calculate the set of features that this node understands
|
||||
// but are not enabled yet.
|
||||
std::ranges::set_difference(rs.supported_features, enabled_features, std::inserter(to_enable, to_enable.begin()));
|
||||
std::ranges::set_difference(supported_features, enabled_features, std::inserter(to_enable, to_enable.begin()));
|
||||
first = false;
|
||||
} else {
|
||||
// Erase the elements that this node doesn't support
|
||||
std::erase_if(to_enable, [&rs = rs] (const sstring& f) { return !rs.supported_features.contains(f); });
|
||||
std::erase_if(to_enable, [&supported_features = supported_features] (const sstring& f) {
|
||||
return !supported_features.contains(f);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -64,7 +64,17 @@ struct replica_state {
|
||||
std::optional<ring_slice> ring; // if engaged contain the set of tokens the node owns together with their state
|
||||
size_t shard_count;
|
||||
uint8_t ignore_msb;
|
||||
std::set<sstring> supported_features;
|
||||
};
|
||||
|
||||
struct topology_features {
|
||||
// Supported features, for normal nodes
|
||||
std::unordered_map<raft::server_id, std::set<sstring>> normal_supported_features;
|
||||
|
||||
// Features that are considered enabled by the cluster
|
||||
std::set<sstring> enabled_features;
|
||||
|
||||
// Calculates a set of features that are supported by all normal nodes but not yet enabled.
|
||||
std::set<sstring> calculate_not_yet_enabled_features() const;
|
||||
};
|
||||
|
||||
struct topology {
|
||||
@@ -110,15 +120,13 @@ struct topology {
|
||||
// It's used as partition key in CDC_GENERATIONS_V3 table.
|
||||
std::optional<utils::UUID> new_cdc_generation_data_uuid;
|
||||
|
||||
// Features that are considered enabled by the cluster
|
||||
std::set<sstring> enabled_features;
|
||||
// Describes the state of the features of normal nodes
|
||||
topology_features features;
|
||||
|
||||
// Find only nodes in non 'left' state
|
||||
const std::pair<const raft::server_id, replica_state>* find(raft::server_id id) const;
|
||||
// Return true if node exists in any state including 'left' one
|
||||
bool contains(raft::server_id id);
|
||||
// Calculates a set of features that are supported by all normal nodes but not yet enabled
|
||||
std::set<sstring> calculate_not_yet_enabled_features() const;
|
||||
// Number of nodes that are not in the 'left' state
|
||||
size_t size() const;
|
||||
// Are there any non-left nodes?
|
||||
|
||||
Reference in New Issue
Block a user