From c8ee7d4499cb5a15d3e66b51f2e8b6ee24e03791 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20J=C4=99drzejczak?= Date: Fri, 1 Dec 2023 12:29:34 +0100 Subject: [PATCH] db: make schema commitlog feature mandatory Using consistent cluster management and not using schema commitlog ends with a bad configuration throw during bootstrap. Soon, we will make consistent cluster management mandatory. This forces us to also make schema commitlog mandatory, which we do in this patch. A booting node decides to use schema commitlog if at least one of the two statements below is true: - the node has `force_schema_commitlog=true` config, - the node knows that the cluster supports the `SCHEMA_COMMITLOG` cluster feature. The `SCHEMA_COMMITLOG` cluster feature has been added in version 5.1. This patch is supposed to be a part of version 6.0. We don't support a direct upgrade from 5.1 to 6.0 because it skips two versions - 5.2 and 5.4. So, in a supported upgrade we can assume that the version which we upgrade from has schema commitlog. This means that we don't need to check the `SCHEMA_COMMITLOG` feature during an upgrade. The reasoning above also applies to Scylla Enterprise. Version 2024.2 will be based on 6.0. Probably, we will only support an upgrade to 2024.2 from 2024.1, which is based on 5.4. But even if we support an upgrade from 2023.x, this patch won't break anything because 2023.1 is based on 5.2, which has schema commitlog. Upgrades from 2022.x definitely won't be supported. When we populate a new cluster, we can use the `force_schema_commitlog=true` config to use schema commitlog unconditionally. Then, the cluster feature check is irrelevant. This check could fail because we initiate schema commitlog before we learn about the features. The `force_schema_commitlog=true` config is especially useful when we want to use consistent cluster management. Failing feature checks would lead to crashes during initial bootstraps. Moreover, there is no point in creating a new cluster with `consistent_cluster_management=true` and `force_schema_commitlog=false`. It would just cause some initial bootstraps to fail, and after successful restarts, the result would be the same as if we used `force_schema_commitlog=true` from the start. In conclusion, we can unconditionally use schema commitlog without any checks in 6.0 because we can always safely upgrade a cluster and start a new cluster. Apart from making schema commitlog mandatory, this patch adds two changes that are its consequences: - making the unneeded `force_schema_commitlog` config unused, - deprecating the `SCHEMA_COMMITLOG` feature, which is always assumed to be true. Closes scylladb/scylladb#16254 --- conf/scylla.yaml | 12 ------------ db/config.cc | 2 +- db/schema_tables.cc | 13 +------------ db/system_keyspace.cc | 20 +------------------- gms/feature_service.cc | 1 + gms/feature_service.hh | 1 - main.cc | 6 +----- replica/database.cc | 28 ++-------------------------- replica/database.hh | 5 +---- test/lib/cql_test_env.cc | 2 +- test/pylib/scylla_cluster.py | 2 -- 11 files changed, 9 insertions(+), 83 deletions(-) diff --git a/conf/scylla.yaml b/conf/scylla.yaml index 5b5ec32320..91987c5bdc 100644 --- a/conf/scylla.yaml +++ b/conf/scylla.yaml @@ -559,18 +559,6 @@ murmur3_partitioner_ignore_msb_bits: 12 # Set to `false` to fall-back to the old algorithm. # enable_parallelized_aggregation: true -# When enabled, the node will start using separate commit log for schema changes -# right from the boot. Without this, it only happens following a restart after -# all nodes in the cluster were upgraded. -# -# Having this option ensures that new installations don't need a rolling restart -# to use the feature, but upgrades do. -# -# WARNING: It's unsafe to set this to false if the node previously booted -# with the schema commit log enabled. In such case, some schema changes -# may be lost if the node was not cleanly stopped. -force_schema_commit_log: true - # Time for which task manager task is kept in memory after it completes. task_ttl_in_seconds: 10 diff --git a/db/config.cc b/db/config.cc index cc9d1c67b7..22dfa27119 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1087,7 +1087,7 @@ db::config::config(std::shared_ptr exts) , restrict_future_timestamp(this, "restrict_future_timestamp",liveness::LiveUpdate, value_status::Used, true, "Controls whether to detect and forbid unreasonable USING TIMESTAMP, more than 3 days into the future.") , ignore_truncation_record(this, "unsafe_ignore_truncation_record", value_status::Used, false, "Ignore truncation record stored in system tables as if tables were never truncated.") - , force_schema_commit_log(this, "force_schema_commit_log", value_status::Used, false, + , force_schema_commit_log(this, "force_schema_commit_log", value_status::Unused, false, "Use separate schema commit log unconditionally rater than after restart following discovery of cluster-wide support for it.") , task_ttl_seconds(this, "task_ttl_in_seconds", liveness::LiveUpdate, value_status::Used, 0, "Time for which information about finished task stays in memory.") , nodeops_watchdog_timeout_seconds(this, "nodeops_watchdog_timeout_seconds", liveness::LiveUpdate, value_status::Used, 120, "Time in seconds after which node operations abort when not hearing from the coordinator") diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 6a09253fac..bcb015c4c3 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -1277,18 +1277,7 @@ static future<> do_merge_schema(distributed& proxy, shar auto old_aggregates = co_await read_schema_for_keyspaces(proxy, AGGREGATES, keyspaces); auto old_scylla_aggregates = co_await read_schema_for_keyspaces(proxy, SCYLLA_AGGREGATES, keyspaces); - if (proxy.local().get_db().local().uses_schema_commitlog()) { - co_await proxy.local().get_db().local().apply(freeze(mutations), db::no_timeout); - } else { - co_await proxy.local().mutate_locally(std::move(mutations), tracing::trace_state_ptr()); - - if (do_flush) { - auto& db = proxy.local().get_db(); - co_await max_concurrent_for_each(column_families, max_concurrent, [&db] (const table_id& id) -> future<> { - return replica::database::flush_table_on_all_shards(db, id); - }); - } - } + co_await proxy.local().get_db().local().apply(freeze(mutations), db::no_timeout); // with new data applied auto&& new_keyspaces = co_await read_schema_for_keyspaces(proxy, KEYSPACES, keyspaces); diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index d5943af6a2..b039adff1f 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -1571,9 +1571,6 @@ future<> system_keyspace::update_tokens(gms::inet_address ep, const std::unorder slogger.debug("INSERT INTO system.{} (peer, tokens) VALUES ({}, {})", PEERS, ep, tokens); auto set_type = set_type_impl::get_instance(utf8_type, true); co_await execute_cql(req, ep.addr(), make_set_value(set_type, prepare_tokens(tokens))).discard_result(); - if (!_db.uses_schema_commitlog()) { - co_await force_blocking_flush(PEERS); - } } @@ -1679,7 +1676,7 @@ future<> system_keyspace::set_scylla_local_param_as(const sstring& key, const T& sstring req = format("UPDATE system.{} SET value = ? WHERE key = ?", system_keyspace::SCYLLA_LOCAL); auto type = data_type_for(); co_await execute_cql(req, type->to_string_impl(data_value(value)), key).discard_result(); - if (visible_before_cl_replay || !_db.uses_schema_commitlog()) { + if (visible_before_cl_replay) { co_await force_blocking_flush(SCYLLA_LOCAL); } } @@ -1718,9 +1715,6 @@ future<> system_keyspace::remove_endpoint(gms::inet_address ep) { sstring req = format("DELETE FROM system.{} WHERE peer = ?", PEERS); slogger.debug("DELETE FROM system.{} WHERE peer = {}", PEERS, ep); co_await execute_cql(req, ep.addr()).discard_result(); - if (!_db.uses_schema_commitlog()) { - co_await force_blocking_flush(PEERS); - } } future<> system_keyspace::update_tokens(const std::unordered_set& tokens) { @@ -1731,9 +1725,6 @@ future<> system_keyspace::update_tokens(const std::unordered_set& to sstring req = format("INSERT INTO system.{} (key, tokens) VALUES (?, ?)", LOCAL); auto set_type = set_type_impl::get_instance(utf8_type, true); co_await execute_cql(req, sstring(LOCAL), make_set_value(set_type, prepare_tokens(tokens))); - if (!_db.uses_schema_commitlog()) { - co_await force_blocking_flush(PEERS); - } } future<> system_keyspace::force_blocking_flush(sstring cfname) { @@ -1779,9 +1770,6 @@ future<> system_keyspace::update_cdc_generation_id(cdc::generation_id gen_id) { sstring(v3::CDC_LOCAL), id.ts, id.id); } ), gen_id); - if (!_db.uses_schema_commitlog()) { - co_await force_blocking_flush(v3::CDC_LOCAL); - } } future> system_keyspace::get_cdc_generation_id() { @@ -1863,9 +1851,6 @@ future<> system_keyspace::set_bootstrap_state(bootstrap_state state) { sstring req = format("INSERT INTO system.{} (key, bootstrapped) VALUES (?, ?)", LOCAL); co_await execute_cql(req, sstring(LOCAL), state_name).discard_result(); - if (!_db.uses_schema_commitlog()) { - co_await force_blocking_flush(LOCAL); - } co_await container().invoke_on_all([state] (auto& sys_ks) { sys_ks._cache->_state = state; }); @@ -2065,9 +2050,6 @@ future system_keyspace::increment_and_get_generation() { } req = format("INSERT INTO system.{} (key, gossip_generation) VALUES ('{}', ?)", LOCAL, LOCAL); co_await _qp.execute_internal(req, {generation.value()}, cql3::query_processor::cache_internal::yes); - if (!_db.uses_schema_commitlog()) { - co_await force_blocking_flush(LOCAL); - } co_return generation; } diff --git a/gms/feature_service.cc b/gms/feature_service.cc index a823fb0103..feb4d1fcf8 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -139,6 +139,7 @@ std::set feature_service::supported_feature_set() const { "UNBOUNDED_RANGE_TOMBSTONES"sv, "MC_SSTABLE_FORMAT"sv, "COMPUTED_COLUMNS"sv, + "SCHEMA_COMMITLOG"sv, }; if (is_test_only_feature_deprecated()) { diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 8c492ff6d2..03c4823d47 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -121,7 +121,6 @@ public: gms::feature parallelized_aggregation { *this, "PARALLELIZED_AGGREGATION"sv }; gms::feature keyspace_storage_options { *this, "KEYSPACE_STORAGE_OPTIONS"sv }; gms::feature typed_errors_in_read_rpc { *this, "TYPED_ERRORS_IN_READ_RPC"sv }; - gms::feature schema_commitlog { *this, "SCHEMA_COMMITLOG"sv }; gms::feature uda_native_parallelized_aggregation { *this, "UDA_NATIVE_PARALLELIZED_AGGREGATION"sv }; gms::feature aggregate_storage_options { *this, "AGGREGATE_STORAGE_OPTIONS"sv }; gms::feature collection_indexing { *this, "COLLECTION_INDEXING"sv }; diff --git a/main.cc b/main.cc index 63777e55b6..c03652e0bc 100644 --- a/main.cc +++ b/main.cc @@ -1161,7 +1161,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl when_all_succeed(feature_service.local().on_system_tables_loaded(sys_ks.local()), sst_format_selector.on_system_tables_loaded(sys_ks.local())).get(); - db.local().maybe_init_schema_commitlog(); + db.local().init_schema_commitlog(); // Mark all the system tables writable and assign the proper commitlog to them. sys_ks.invoke_on_all(&db::system_keyspace::mark_writable).get(); @@ -1410,10 +1410,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl }); if (raft_gr.local().is_enabled()) { - if (!db.local().uses_schema_commitlog()) { - startlog.error("Bad configuration: consistent_cluster_management requires schema commit log to be enabled"); - throw bad_configuration_error(); - } supervisor::notify("starting Raft Group Registry service"); raft_gr.invoke_on_all(&service::raft_group_registry::start).get(); } else { diff --git a/replica/database.cc b/replica/database.cc index 9564880d3d..3015574c99 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -392,11 +392,6 @@ database::database(const db::config& cfg, database_config dbcfg, service::migrat if (_dbcfg.sstables_format) { set_format(*_dbcfg.sstables_format); } - - // Schema commitlog can only be initialized on the null shard. - if (this_shard_id() != 0) { - _uses_schema_commitlog = false; - } } const db::extensions& database::extensions() const { @@ -942,21 +937,9 @@ static bool is_system_table(const schema& s) { || s.ks_name() == db::system_distributed_keyspace::NAME_EVERYWHERE; } -void database::maybe_init_schema_commitlog() { +void database::init_schema_commitlog() { assert(this_shard_id() == 0); - if (!_feat.schema_commitlog && !_cfg.force_schema_commit_log()) { - dblog.info("Not using schema commit log."); - _listeners.push_back(_feat.schema_commitlog.when_enabled([] { - dblog.warn("All nodes can now switch to use the schema commit log. Restart is needed for this to take effect."); - })); - _uses_schema_commitlog = false; - return; - } - - dblog.info("Using schema commit log."); - _uses_schema_commitlog = true; - db::commitlog::config c; c.commit_log_location = _cfg.schema_commitlog_directory(); c.fname_prefix = db::schema_tables::COMMITLOG_FILENAME_PREFIX; @@ -1005,7 +988,7 @@ future<> database::create_local_system_table( } db::commitlog* database::commitlog_for(const schema_ptr& schema) { - return schema->static_props().use_schema_commitlog && uses_schema_commitlog() + return schema->static_props().use_schema_commitlog ? _schema_commitlog.get() : _commitlog.get(); } @@ -1837,13 +1820,6 @@ future database::obtain_reader_permit(schema_ptr schema, const ch return obtain_reader_permit(find_column_family(std::move(schema)), op_name, timeout, std::move(trace_ptr)); } -bool database::uses_schema_commitlog() const { - if (!_uses_schema_commitlog.has_value()) [[unlikely]] { - on_internal_error(dblog, format("schema commitlog is not initialized yet")); - } - return *_uses_schema_commitlog; -} - bool database::is_user_semaphore(const reader_concurrency_semaphore& semaphore) const { return &semaphore != &_streaming_concurrency_sem && &semaphore != &_compaction_concurrency_sem diff --git a/replica/database.hh b/replica/database.hh index 8e33fde659..bff06cbd54 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -1419,7 +1419,6 @@ private: bool _enable_incremental_backups = false; bool _shutdown = false; bool _enable_autocompaction_toggle = false; - std::optional _uses_schema_commitlog; query::querier_cache _querier_cache; std::unique_ptr _large_data_handler; @@ -1578,7 +1577,7 @@ public: future<> create_local_system_table( schema_ptr table, bool write_in_user_memory, locator::effective_replication_map_factory&); - void maybe_init_schema_commitlog(); + void init_schema_commitlog(); using is_new_cf = bool_class; future<> add_column_family_and_make_directory(schema_ptr schema, is_new_cf is_new); @@ -1819,8 +1818,6 @@ public: return _sst_dir_semaphore; } - bool uses_schema_commitlog() const; - bool is_user_semaphore(const reader_concurrency_semaphore& semaphore) const; }; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 8f5f517f6a..72e6eb2d8f 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -621,7 +621,7 @@ private: auto stop_sys_kd = defer([this] { _sys_ks.stop().get(); }); replica::distributed_loader::init_system_keyspace(_sys_ks, _erm_factory, _db).get(); - _db.local().maybe_init_schema_commitlog(); + _db.local().init_schema_commitlog(); _sys_ks.invoke_on_all(&db::system_keyspace::mark_writable).get(); auto host_id = cfg_in.host_id; diff --git a/test/pylib/scylla_cluster.py b/test/pylib/scylla_cluster.py index e7b5abb3ed..085065e042 100644 --- a/test/pylib/scylla_cluster.py +++ b/test/pylib/scylla_cluster.py @@ -110,8 +110,6 @@ def make_scylla_conf(workdir: pathlib.Path, host_addr: str, seed_addrs: List[str 'reader_concurrency_semaphore_serialize_limit_multiplier': 0, 'reader_concurrency_semaphore_kill_limit_multiplier': 0, - - 'force_schema_commit_log': True, } # Seastar options can not be passed through scylla.yaml, use command line