db: make schema commitlog feature mandatory

Using consistent cluster management and not using schema commitlog
ends with a bad configuration throw during bootstrap. Soon, we
will make consistent cluster management mandatory. This forces us
to also make schema commitlog mandatory, which we do in this patch.

A booting node decides to use schema commitlog if at least one of
the two statements below is true:
- the node has `force_schema_commitlog=true` config,
- the node knows that the cluster supports the `SCHEMA_COMMITLOG`
  cluster feature.

The `SCHEMA_COMMITLOG` cluster feature has been added in version
5.1. This patch is supposed to be a part of version 6.0. We don't
support a direct upgrade from 5.1 to 6.0 because it skips two
versions - 5.2 and 5.4. So, in a supported upgrade we can assume
that the version which we upgrade from has schema commitlog. This
means that we don't need to check the `SCHEMA_COMMITLOG` feature
during an upgrade.

The reasoning above also applies to Scylla Enterprise. Version
2024.2 will be based on 6.0. Probably, we will only support
an upgrade to 2024.2 from 2024.1, which is based on 5.4. But even
if we support an upgrade from 2023.x, this patch won't break
anything because 2023.1 is based on 5.2, which has schema
commitlog. Upgrades from 2022.x definitely won't be supported.

When we populate a new cluster, we can use the
`force_schema_commitlog=true` config to use schema commitlog
unconditionally. Then, the cluster feature check is irrelevant.
This check could fail because we initiate schema commitlog before
we learn about the features. The `force_schema_commitlog=true`
config is especially useful when we want to use consistent cluster
management. Failing feature checks would lead to crashes during
initial bootstraps. Moreover, there is no point in creating a new
cluster with `consistent_cluster_management=true` and
`force_schema_commitlog=false`. It would just cause some initial
bootstraps to fail, and after successful restarts, the result would
be the same as if we used `force_schema_commitlog=true` from the
start.

In conclusion, we can unconditionally use schema commitlog without
any checks in 6.0 because we can always safely upgrade a cluster
and start a new cluster.

Apart from making schema commitlog mandatory, this patch adds two
changes that are its consequences:
- making the unneeded `force_schema_commitlog` config unused,
- deprecating the `SCHEMA_COMMITLOG` feature, which is always
  assumed to be true.

Closes scylladb/scylladb#16254
This commit is contained in:
Patryk Jędrzejczak
2023-12-01 12:29:34 +01:00
committed by Avi Kivity
parent 75a8be5b87
commit c8ee7d4499
11 changed files with 9 additions and 83 deletions

View File

@@ -559,18 +559,6 @@ murmur3_partitioner_ignore_msb_bits: 12
# Set to `false` to fall-back to the old algorithm.
# enable_parallelized_aggregation: true
# When enabled, the node will start using separate commit log for schema changes
# right from the boot. Without this, it only happens following a restart after
# all nodes in the cluster were upgraded.
#
# Having this option ensures that new installations don't need a rolling restart
# to use the feature, but upgrades do.
#
# WARNING: It's unsafe to set this to false if the node previously booted
# with the schema commit log enabled. In such case, some schema changes
# may be lost if the node was not cleanly stopped.
force_schema_commit_log: true
# Time for which task manager task is kept in memory after it completes.
task_ttl_in_seconds: 10

View File

@@ -1087,7 +1087,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, restrict_future_timestamp(this, "restrict_future_timestamp",liveness::LiveUpdate, value_status::Used, true, "Controls whether to detect and forbid unreasonable USING TIMESTAMP, more than 3 days into the future.")
, ignore_truncation_record(this, "unsafe_ignore_truncation_record", value_status::Used, false,
"Ignore truncation record stored in system tables as if tables were never truncated.")
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false,
, force_schema_commit_log(this, "force_schema_commit_log", value_status::Unused, false,
"Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.")
, task_ttl_seconds(this, "task_ttl_in_seconds", liveness::LiveUpdate, value_status::Used, 0, "Time for which information about finished task stays in memory.")
, nodeops_watchdog_timeout_seconds(this, "nodeops_watchdog_timeout_seconds", liveness::LiveUpdate, value_status::Used, 120, "Time in seconds after which node operations abort when not hearing from the coordinator")

View File

@@ -1277,18 +1277,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
auto old_aggregates = co_await read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces);
auto old_scylla_aggregates = co_await read_schema_for_keyspaces(proxy, SCYLLA_AGGREGATES, keyspaces);
if (proxy.local().get_db().local().uses_schema_commitlog()) {
co_await proxy.local().get_db().local().apply(freeze(mutations), db::no_timeout);
} else {
co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr());
if (do_flush) {
auto& db = proxy.local().get_db();
co_await max_concurrent_for_each(column_families, max_concurrent, [&db] (const table_id& id) -> future<> {
return replica::database::flush_table_on_all_shards(db, id);
});
}
}
co_await proxy.local().get_db().local().apply(freeze(mutations), db::no_timeout);
// with new data applied
auto&& new_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces);

View File

@@ -1571,9 +1571,6 @@ future<> system_keyspace::update_tokens(gms::inet_address ep, const std::unorder
slogger.debug("INSERT INTO system.{} (peer, tokens) VALUES ({}, {})", PEERS, ep, tokens);
auto set_type = set_type_impl::get_instance(utf8_type, true);
co_await execute_cql(req, ep.addr(), make_set_value(set_type, prepare_tokens(tokens))).discard_result();
if (!_db.uses_schema_commitlog()) {
co_await force_blocking_flush(PEERS);
}
}
@@ -1679,7 +1676,7 @@ future<> system_keyspace::set_scylla_local_param_as(const sstring& key, const T&
sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", system_keyspace::SCYLLA_LOCAL);
auto type = data_type_for<T>();
co_await execute_cql(req, type->to_string_impl(data_value(value)), key).discard_result();
if (visible_before_cl_replay || !_db.uses_schema_commitlog()) {
if (visible_before_cl_replay) {
co_await force_blocking_flush(SCYLLA_LOCAL);
}
}
@@ -1718,9 +1715,6 @@ future<> system_keyspace::remove_endpoint(gms::inet_address ep) {
sstring req = format("DELETE FROM system.{} WHERE peer = ?", PEERS);
slogger.debug("DELETE FROM system.{} WHERE peer = {}", PEERS, ep);
co_await execute_cql(req, ep.addr()).discard_result();
if (!_db.uses_schema_commitlog()) {
co_await force_blocking_flush(PEERS);
}
}
future<> system_keyspace::update_tokens(const std::unordered_set<dht::token>& tokens) {
@@ -1731,9 +1725,6 @@ future<> system_keyspace::update_tokens(const std::unordered_set<dht::token>& to
sstring req = format("INSERT INTO system.{} (key, tokens) VALUES (?, ?)", LOCAL);
auto set_type = set_type_impl::get_instance(utf8_type, true);
co_await execute_cql(req, sstring(LOCAL), make_set_value(set_type, prepare_tokens(tokens)));
if (!_db.uses_schema_commitlog()) {
co_await force_blocking_flush(PEERS);
}
}
future<> system_keyspace::force_blocking_flush(sstring cfname) {
@@ -1779,9 +1770,6 @@ future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) {
sstring(v3::CDC_LOCAL), id.ts, id.id);
}
), gen_id);
if (!_db.uses_schema_commitlog()) {
co_await force_blocking_flush(v3::CDC_LOCAL);
}
}
future<std::optional<cdc::generation_id>> system_keyspace::get_cdc_generation_id() {
@@ -1863,9 +1851,6 @@ future<> system_keyspace::set_bootstrap_state(bootstrap_state state) {
sstring req = format("INSERT INTO system.{} (key, bootstrapped) VALUES (?, ?)", LOCAL);
co_await execute_cql(req, sstring(LOCAL), state_name).discard_result();
if (!_db.uses_schema_commitlog()) {
co_await force_blocking_flush(LOCAL);
}
co_await container().invoke_on_all([state] (auto& sys_ks) {
sys_ks._cache->_state = state;
});
@@ -2065,9 +2050,6 @@ future<int> system_keyspace::increment_and_get_generation() {
}
req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL);
co_await _qp.execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes);
if (!_db.uses_schema_commitlog()) {
co_await force_blocking_flush(LOCAL);
}
co_return generation;
}

View File

@@ -139,6 +139,7 @@ std::set<std::string_view> feature_service::supported_feature_set() const {
"UNBOUNDED_RANGE_TOMBSTONES"sv,
"MC_SSTABLE_FORMAT"sv,
"COMPUTED_COLUMNS"sv,
"SCHEMA_COMMITLOG"sv,
};
if (is_test_only_feature_deprecated()) {

View File

@@ -121,7 +121,6 @@ public:
gms::feature parallelized_aggregation { *this, "PARALLELIZED_AGGREGATION"sv };
gms::feature keyspace_storage_options { *this, "KEYSPACE_STORAGE_OPTIONS"sv };
gms::feature typed_errors_in_read_rpc { *this, "TYPED_ERRORS_IN_READ_RPC"sv };
gms::feature schema_commitlog { *this, "SCHEMA_COMMITLOG"sv };
gms::feature uda_native_parallelized_aggregation { *this, "UDA_NATIVE_PARALLELIZED_AGGREGATION"sv };
gms::feature aggregate_storage_options { *this, "AGGREGATE_STORAGE_OPTIONS"sv };
gms::feature collection_indexing { *this, "COLLECTION_INDEXING"sv };

View File

@@ -1161,7 +1161,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
when_all_succeed(feature_service.local().on_system_tables_loaded(sys_ks.local()),
sst_format_selector.on_system_tables_loaded(sys_ks.local())).get();
db.local().maybe_init_schema_commitlog();
db.local().init_schema_commitlog();
// Mark all the system tables writable and assign the proper commitlog to them.
sys_ks.invoke_on_all(&db::system_keyspace::mark_writable).get();
@@ -1410,10 +1410,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});
if (raft_gr.local().is_enabled()) {
if (!db.local().uses_schema_commitlog()) {
startlog.error("Bad configuration: consistent_cluster_management requires schema commit log to be enabled");
throw bad_configuration_error();
}
supervisor::notify("starting Raft Group Registry service");
raft_gr.invoke_on_all(&service::raft_group_registry::start).get();
} else {

View File

@@ -392,11 +392,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat
if (_dbcfg.sstables_format) {
set_format(*_dbcfg.sstables_format);
}
// Schema commitlog can only be initialized on the null shard.
if (this_shard_id() != 0) {
_uses_schema_commitlog = false;
}
}
const db::extensions& database::extensions() const {
@@ -942,21 +937,9 @@ static bool is_system_table(const schema& s) {
|| s.ks_name() == db::system_distributed_keyspace::NAME_EVERYWHERE;
}
void database::maybe_init_schema_commitlog() {
void database::init_schema_commitlog() {
assert(this_shard_id() == 0);
if (!_feat.schema_commitlog && !_cfg.force_schema_commit_log()) {
dblog.info("Not using schema commit log.");
_listeners.push_back(_feat.schema_commitlog.when_enabled([] {
dblog.warn("All nodes can now switch to use the schema commit log. Restart is needed for this to take effect.");
}));
_uses_schema_commitlog = false;
return;
}
dblog.info("Using schema commit log.");
_uses_schema_commitlog = true;
db::commitlog::config c;
c.commit_log_location = _cfg.schema_commitlog_directory();
c.fname_prefix = db::schema_tables::COMMITLOG_FILENAME_PREFIX;
@@ -1005,7 +988,7 @@ future<> database::create_local_system_table(
}
db::commitlog* database::commitlog_for(const schema_ptr& schema) {
return schema->static_props().use_schema_commitlog && uses_schema_commitlog()
return schema->static_props().use_schema_commitlog
? _schema_commitlog.get()
: _commitlog.get();
}
@@ -1837,13 +1820,6 @@ future<reader_permit> database::obtain_reader_permit(schema_ptr schema, const ch
return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout, std::move(trace_ptr));
}
bool database::uses_schema_commitlog() const {
if (!_uses_schema_commitlog.has_value()) [[unlikely]] {
on_internal_error(dblog, format("schema commitlog is not initialized yet"));
}
return *_uses_schema_commitlog;
}
bool database::is_user_semaphore(const reader_concurrency_semaphore& semaphore) const {
return &semaphore != &_streaming_concurrency_sem
&& &semaphore != &_compaction_concurrency_sem

View File

@@ -1419,7 +1419,6 @@ private:
bool _enable_incremental_backups = false;
bool _shutdown = false;
bool _enable_autocompaction_toggle = false;
std::optional<bool> _uses_schema_commitlog;
query::querier_cache _querier_cache;
std::unique_ptr<db::large_data_handler> _large_data_handler;
@@ -1578,7 +1577,7 @@ public:
future<> create_local_system_table(
schema_ptr table, bool write_in_user_memory, locator::effective_replication_map_factory&);
void maybe_init_schema_commitlog();
void init_schema_commitlog();
using is_new_cf = bool_class<struct is_new_cf_tag>;
future<> add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new);
@@ -1819,8 +1818,6 @@ public:
return _sst_dir_semaphore;
}
bool uses_schema_commitlog() const;
bool is_user_semaphore(const reader_concurrency_semaphore& semaphore) const;
};

View File

@@ -621,7 +621,7 @@ private:
auto stop_sys_kd = defer([this] { _sys_ks.stop().get(); });
replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db).get();
_db.local().maybe_init_schema_commitlog();
_db.local().init_schema_commitlog();
_sys_ks.invoke_on_all(&db::system_keyspace::mark_writable).get();
auto host_id = cfg_in.host_id;

View File

@@ -110,8 +110,6 @@ def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str
'reader_concurrency_semaphore_serialize_limit_multiplier': 0,
'reader_concurrency_semaphore_kill_limit_multiplier': 0,
'force_schema_commit_log': True,
}
# Seastar options can not be passed through scylla.yaml, use command line