diff --git a/compaction/table_state.hh b/compaction/table_state.hh index 5920391387..60c2b5089e 100644 --- a/compaction/table_state.hh +++ b/compaction/table_state.hh @@ -53,7 +53,7 @@ public: virtual bool tombstone_gc_enabled() const noexcept = 0; virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept = 0; virtual compaction_backlog_tracker& get_backlog_tracker() = 0; - virtual const std::string& get_group_id() const noexcept = 0; + virtual const std::string get_group_id() const noexcept = 0; virtual seastar::condition_variable& get_staging_done_condition() noexcept = 0; }; diff --git a/db/config.cc b/db/config.cc index 7fcec02766..da57ae4b8a 100644 --- a/db/config.cc +++ b/db/config.cc @@ -1002,7 +1002,6 @@ db::config::config(std::shared_ptr exts) , nodeops_heartbeat_interval_seconds(this, "nodeops_heartbeat_interval_seconds", liveness::LiveUpdate, value_status::Used, 10, "Period of heartbeat ticks in node operations") , cache_index_pages(this, "cache_index_pages", liveness::LiveUpdate, value_status::Used, false, "Keep SSTable index pages in the global cache after a SSTable read. Expected to improve performance for workloads with big partitions, but may degrade performance for workloads with small partitions.") - , x_log2_compaction_groups(this, "x_log2_compaction_groups", value_status::Used, 0, "Controls static number of compaction groups per table per shard. For X groups, set the option to log (base 2) of X. 
Example: Value of 3 implies 8 groups.") , consistent_cluster_management(this, "consistent_cluster_management", value_status::Used, true, "Use RAFT for cluster management and DDL") , wasm_cache_memory_fraction(this, "wasm_cache_memory_fraction", value_status::Used, 0.01, "Maximum total size of all WASM instances stored in the cache as fraction of total shard memory") , wasm_cache_timeout_in_ms(this, "wasm_cache_timeout_in_ms", value_status::Used, 5000, "Time after which an instance is evicted from the cache") diff --git a/db/config.hh b/db/config.hh index 503c5dcb8a..e080b6b045 100644 --- a/db/config.hh +++ b/db/config.hh @@ -425,8 +425,6 @@ public: named_value cache_index_pages; - named_value x_log2_compaction_groups; - named_value consistent_cluster_management; named_value wasm_cache_memory_fraction; diff --git a/locator/token_metadata.cc b/locator/token_metadata.cc index fcdb9cc193..921cca71bd 100644 --- a/locator/token_metadata.cc +++ b/locator/token_metadata.cc @@ -334,7 +334,7 @@ token_metadata::tokens_iterator& token_metadata::tokens_iterator::operator++() { } host_id token_metadata::get_my_id() const { - return get_host_id(utils::fb_utilities::get_broadcast_address()); + return get_topology().get_config().this_host_id; } inline diff --git a/locator/topology.hh b/locator/topology.hh index 3c2ef3945b..1432f76def 100644 --- a/locator/topology.hh +++ b/locator/topology.hh @@ -161,6 +161,7 @@ class topology { public: struct config { inet_address this_endpoint; + host_id this_host_id; endpoint_dc_rack local_dc_rack; bool disable_proximity_sorting = false; @@ -177,6 +178,10 @@ public: public: const config& get_config() const noexcept { return _cfg; } + void set_host_id_cfg(host_id this_host_id) { + _cfg.this_host_id = this_host_id; + } + const node* this_node() const noexcept { return _this_node; } diff --git a/main.cc b/main.cc index a281236ed7..4ffea3e95d 100644 --- a/main.cc +++ b/main.cc @@ -1103,6 +1103,15 @@ To start the scylla server proper, simply invoke 
as: scylla server (or just scyl replica::distributed_loader::init_system_keyspace(sys_ks, erm_factory, db, *cfg, system_table_load_phase::phase1).get(); cfg->host_id = sys_ks.local().load_local_host_id().get0(); + shared_token_metadata::mutate_on_all_shards(token_metadata, [hostid = cfg->host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) { + // Makes local host id available in topology cfg as soon as possible. + // Raft topology discards the endpoint-to-id map, so the local id can + // still be found in the config. + tm.get_topology().set_host_id_cfg(hostid); + tm.get_topology().add_or_update_endpoint(endpoint, hostid); + return make_ready_future<>(); + }).get(); + netw::messaging_service::config mscfg; mscfg.id = cfg->host_id; @@ -1291,11 +1300,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl // engine().at_exit([&qp] { return qp.stop(); }); sstables::init_metrics().get(); - shared_token_metadata::mutate_on_all_shards(token_metadata, [hostid = cfg->host_id, endpoint = utils::fb_utilities::get_broadcast_address()] (locator::token_metadata& tm) { - tm.get_topology().add_or_update_endpoint(endpoint, hostid); - return make_ready_future<>(); - }).get(); - supervisor::notify("initializing batchlog manager"); db::batchlog_manager_config bm_cfg; bm_cfg.write_request_timeout = cfg->write_request_timeout_in_ms() * 1ms; diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index b1c6d3b837..d4db06fda5 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -33,7 +33,7 @@ class compaction_group { table& _t; class table_state; std::unique_ptr _table_state; - std::string _group_id; + const size_t _group_id; // Tokens included in this compaction_groups dht::token_range _token_range; compaction::compaction_strategy_state _compaction_strategy_state; @@ -64,9 +64,9 @@ private: future<> delete_sstables_atomically(std::vector sstables_to_remove); public: - 
compaction_group(table& t, std::string gid, dht::token_range token_range); + compaction_group(table& t, size_t gid, dht::token_range token_range); - const std::string& get_group_id() const noexcept { + size_t group_id() const noexcept { return _group_id; } @@ -133,7 +133,14 @@ public: } }; -// Used by the tests to increase the default number of compaction groups by increasing the minimum to X. -void set_minimum_x_log2_compaction_groups(unsigned x_log2_compaction_groups); +using compaction_group_vector = utils::chunked_vector>; + +class compaction_group_manager { +public: + virtual ~compaction_group_manager() {} + virtual compaction_group_vector make_compaction_groups() const = 0; + virtual size_t compaction_group_of(dht::token) const = 0; + virtual size_t log2_compaction_groups() const = 0; +}; } diff --git a/replica/database.cc b/replica/database.cc index 7e68a9aa1e..4a854e13d6 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -1379,7 +1379,6 @@ keyspace::make_column_family_config(const schema& s, const database& db) const { cfg.view_update_concurrency_semaphore = _config.view_update_concurrency_semaphore; cfg.view_update_concurrency_semaphore_limit = _config.view_update_concurrency_semaphore_limit; cfg.data_listeners = &db.data_listeners(); - cfg.x_log2_compaction_groups = db_config.x_log2_compaction_groups(); cfg.enable_compacting_data_for_streaming_and_repair = db_config.enable_compacting_data_for_streaming_and_repair(); return cfg; diff --git a/replica/database.hh b/replica/database.hh index 2ac31fe336..c87bf71db5 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -323,6 +323,8 @@ using column_family_stats = table_stats; class database_sstable_write_monitor; class compaction_group; +class compaction_group_manager; +using compaction_group_vector = utils::chunked_vector>; using enable_backlog_tracker = bool_class; @@ -419,13 +421,10 @@ private: lw_shared_ptr make_memory_only_memtable_list(); lw_shared_ptr 
make_memtable_list(compaction_group& cg); - // The value of the parameter controls the number of compaction groups in this table. - // 0 (default) means 1 compaction group. 3 means 8 compaction groups. - const unsigned _x_log2_compaction_groups = 0; - compaction_manager& _compaction_manager; sstables::compaction_strategy _compaction_strategy; - std::vector> _compaction_groups; + std::unique_ptr _cg_manager; + compaction_group_vector _compaction_groups; // Compound SSTable set for all the compaction groups, which is useful for operations spanning all of them. lw_shared_ptr _sstables; // Control background fibers waiting for sstables to be deleted @@ -544,7 +543,7 @@ public: private: using compaction_group_ptr = std::unique_ptr; - std::vector> make_compaction_groups(); + std::unique_ptr make_compaction_group_manager(); // Return compaction group if table owns a single one. Otherwise, null is returned. compaction_group* single_compaction_group_if_available() const noexcept; // Select a compaction group from a given token. @@ -554,7 +553,7 @@ private: // Select a compaction group from a given sstable based on its token range. compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const noexcept; // Returns a list of all compaction groups. - const std::vector>& compaction_groups() const noexcept; + const compaction_group_vector& compaction_groups() const noexcept; // Safely iterate through compaction groups, while performing async operations on them. 
future<> parallel_foreach_compaction_group(std::function(compaction_group&)> action); @@ -790,6 +789,7 @@ public: db::commitlog* commitlog() { return _commitlog; } const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } void update_effective_replication_map(locator::effective_replication_map_ptr); + [[gnu::always_inline]] bool uses_tablets() const; future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; diff --git a/replica/table.cc b/replica/table.cc index 95fa9354d7..ddc74e07c3 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -48,6 +48,7 @@ #include "dht/token.hh" #include "dht/i_partitioner.hh" #include "replica/global_table_ptr.hh" +#include "locator/tablets.hh" #include #include @@ -534,17 +535,79 @@ void table::enable_off_strategy_trigger() { do_update_off_strategy_trigger(); } -std::vector> table::make_compaction_groups() { - std::vector> ret; - auto&& ranges = dht::split_token_range_msb(_x_log2_compaction_groups); - ret.reserve(ranges.size()); - tlogger.debug("Created {} compaction groups for {}.{}", ranges.size(), _schema->ks_name(), _schema->cf_name()); - size_t i = 0; - for (auto&& range : ranges) { - auto group_id = fmt::format("{}/{}", i++, ranges.size()); - ret.emplace_back(std::make_unique(*this, std::move(group_id), std::move(range))); +class single_compaction_group_manager final : public compaction_group_manager { + replica::table& _t; +public: + single_compaction_group_manager(replica::table& t) : _t(t) {} + + compaction_group_vector make_compaction_groups() const override { + compaction_group_vector r; + r.push_back(std::make_unique(_t, size_t(0), dht::token_range::make_open_ended_both_sides())); + return r; } - return ret; + size_t 
compaction_group_of(dht::token) const override { + return 0; + } + size_t log2_compaction_groups() const override { + return 0; + } +}; + +class tablet_compaction_group_manager final : public compaction_group_manager { + replica::table& _t; +private: + const locator::effective_replication_map_ptr& erm() const { + return _t.get_effective_replication_map(); + } + + const schema_ptr& schema() const { + return _t.schema(); + } + + const locator::tablet_map& tablet_map() const { + // FIXME: cheaper way to retrieve tablet_map than looking up every time in tablet_metadata's map. + auto& tm = erm()->get_token_metadata(); + return tm.tablets().get_tablet_map(schema()->id()); + } +public: + tablet_compaction_group_manager(replica::table& t) : _t(t) {} + + compaction_group_vector make_compaction_groups() const override { + compaction_group_vector ret; + + auto& tmap = tablet_map(); + auto& tm = erm()->get_token_metadata(); + ret.reserve(tmap.tablet_count()); + + for (auto tid : tmap.tablet_ids()) { + auto range = tmap.get_token_range(tid); + + auto shard = tmap.get_shard(tid, tm.get_my_id()); + if (shard && *shard == this_shard_id()) { + tlogger.debug("Tablet with id {} present for {}.{}", tid, schema()->ks_name(), schema()->cf_name()); + } + // FIXME: don't allocate compaction groups for tablets that aren't present in this shard. 
+ ret.emplace_back(std::make_unique(_t, tid.value(), std::move(range))); + } + return ret; + } + size_t compaction_group_of(dht::token t) const override { + return tablet_map().get_tablet_id(t).value(); + } + size_t log2_compaction_groups() const override { + return log2ceil(tablet_map().tablet_count()); + } +}; + +bool table::uses_tablets() const { + return _erm && _erm->get_replication_strategy().uses_tablets(); +} + +std::unique_ptr table::make_compaction_group_manager() { + if (uses_tablets()) { + return std::make_unique(*this); + } + return std::make_unique(*this); } compaction_group* table::single_compaction_group_if_available() const noexcept { @@ -552,9 +615,10 @@ compaction_group* table::single_compaction_group_if_available() const noexcept { } compaction_group& table::compaction_group_for_token(dht::token token) const noexcept { - auto idx = dht::compaction_group_of(_x_log2_compaction_groups, token); + auto idx = _cg_manager->compaction_group_of(token); if (idx >= _compaction_groups.size()) { - on_fatal_internal_error(tlogger, format("compaction_group_for_token: index out of range: idx={} size_log2={} size={} token={}", idx, _x_log2_compaction_groups, _compaction_groups.size(), token)); + on_fatal_internal_error(tlogger, format("compaction_group_for_token: index out of range: idx={} size_log2={} size={} token={}", + idx, _cg_manager->log2_compaction_groups(), _compaction_groups.size(), token)); } auto& ret = *_compaction_groups[idx]; if (!ret.token_range().contains(token, dht::token_comparator())) { @@ -577,7 +641,7 @@ compaction_group& table::compaction_group_for_sstable(const sstables::shared_sst return compaction_group_for_token(sst->get_first_decorated_key().token()); } -const std::vector>& table::compaction_groups() const noexcept { +const compaction_group_vector& table::compaction_groups() const noexcept { return _compaction_groups; } @@ -1513,10 +1577,10 @@ table::make_memtable_list(compaction_group& cg) { return make_lw_shared(std::move(seal), 
std::move(get_schema), _config.dirty_memory_manager, _stats, _config.memory_compaction_scheduling_group); } -compaction_group::compaction_group(table& t, std::string group_id, dht::token_range token_range) +compaction_group::compaction_group(table& t, size_t group_id, dht::token_range token_range) : _t(t) , _table_state(std::make_unique(t, *this)) - , _group_id(std::move(group_id)) + , _group_id(group_id) , _token_range(std::move(token_range)) , _compaction_strategy_state(compaction::compaction_strategy_state::make(_t._compaction_strategy)) , _memtables(_t._config.enable_disk_writes ? _t.make_memtable_list(*this) : _t.make_memory_only_memtable_list()) @@ -1542,16 +1606,6 @@ void compaction_group::clear_sstables() { _maintenance_sstables = _t.make_maintenance_sstable_set(); } -static std::atomic minimum_x_log2_compaction_groups{0}; - -void set_minimum_x_log2_compaction_groups(unsigned x_log2_compaction_groups) { - minimum_x_log2_compaction_groups.store(x_log2_compaction_groups, std::memory_order_relaxed); -} - -static inline unsigned get_x_log2_compaction_groups(unsigned x_log2_compaction_groups) { - return std::max(x_log2_compaction_groups, minimum_x_log2_compaction_groups.load(std::memory_order_relaxed)); -} - table::table(schema_ptr schema, config config, lw_shared_ptr sopts, db::commitlog* cl, compaction_manager& compaction_manager, sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm) @@ -1563,10 +1617,10 @@ table::table(schema_ptr schema, config config, lw_shared_ptrks_name()), column_family_label(_schema->cf_name()) ) - , _x_log2_compaction_groups(get_x_log2_compaction_groups(_config.x_log2_compaction_groups)) , _compaction_manager(compaction_manager) , _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options())) - , _compaction_groups(make_compaction_groups()) + , _cg_manager(make_compaction_group_manager()) 
+ , _compaction_groups(_cg_manager->make_compaction_groups()) , _sstables(make_compound_sstable_set()) , _cache(_schema, sstables_as_snapshot_source(), row_cache_tracker, is_continuous::yes) , _commitlog(cl) @@ -2819,8 +2873,8 @@ public: compaction_backlog_tracker& get_backlog_tracker() override { return _t._compaction_manager.get_backlog_tracker(*this); } - const std::string& get_group_id() const noexcept override { - return _cg.get_group_id(); + const std::string get_group_id() const noexcept override { + return fmt::format("{}/{}", _cg.group_id(), _t._compaction_groups.size()); } seastar::condition_variable& get_staging_done_condition() noexcept override { diff --git a/scylla-gdb.py b/scylla-gdb.py index 45c227f286..0852a8521e 100755 --- a/scylla-gdb.py +++ b/scylla-gdb.py @@ -4410,13 +4410,17 @@ class scylla_memtables(gdb.Command): for table in all_tables(db): gdb.write('table %s:\n' % schema_ptr(table['_schema']).table_name()) try: - for cg_ptr in std_vector(table["_compaction_groups"]): + for cg_ptr in chunked_vector(table["_compaction_groups"]): scylla_memtables.dump_compaction_group_memtables(std_unique_ptr(cg_ptr).get()) except gdb.error: try: - scylla_memtables.dump_compaction_group_memtables(std_unique_ptr(table["_compaction_group"]).get()) + for cg_ptr in std_vector(table["_compaction_groups"]): + scylla_memtables.dump_compaction_group_memtables(std_unique_ptr(cg_ptr).get()) except gdb.error: - scylla_memtables.dump_memtable_list(seastar_lw_shared_ptr(table['_memtables']).get()) # Scylla 5.1 compatibility + try: + scylla_memtables.dump_compaction_group_memtables(std_unique_ptr(table["_compaction_group"]).get()) + except gdb.error: + scylla_memtables.dump_memtable_list(seastar_lw_shared_ptr(table['_memtables']).get()) # Scylla 5.1 compatibility def escape_html(s): return s.replace('&', '&').replace('<', '<').replace('>', '>') diff --git a/test.py b/test.py index c7e90c3eb8..07abd2acbd 100755 --- a/test.py +++ b/test.py @@ -375,8 +375,6 @@ class 
PythonTestSuite(TestSuite): if type(cmdline_options) == str: cmdline_options = [cmdline_options] cmdline_options = merge_cmdline_options(cmdline_options, create_cfg.cmdline_from_test) - if options.x_log2_compaction_groups: - cmdline_options = merge_cmdline_options(cmdline_options, [ '--x-log2-compaction-groups={}'.format(options.x_log2_compaction_groups) ]) # There are multiple sources of config options, with increasing priority # (if two sources provide the same config option, the higher priority one wins): diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 9f5b3abef6..f101ddd204 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -81,12 +81,12 @@ future<> do_with_cql_env_and_compaction_groups_cgs(unsigned cgs, std::functiondata_file_directories()[0])); co_await recursive_touch_directory(cfg.db_config->data_file_directories()[0]); } - cfg.db_config->x_log2_compaction_groups(cgs); + // TODO: perhaps map log2_compaction_groups into initial_tablets when creating the testing keyspace. 
co_await do_with_cql_env_thread(func, cfg, thread_attr); } future<> do_with_cql_env_and_compaction_groups(std::function func, cql_test_config cfg = {}, thread_attributes thread_attr = {}) { - std::vector x_log2_compaction_group_values = { 0 /* 1 CG */, 1 /* 2 CGs */ }; + std::vector x_log2_compaction_group_values = { 0 /* 1 CG */ }; for (auto x_log2_compaction_groups : x_log2_compaction_group_values) { co_await do_with_cql_env_and_compaction_groups_cgs(x_log2_compaction_groups, func, cfg, thread_attr); } diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc index b66f24c351..f3ef0ab73d 100644 --- a/test/boost/tablets_test.cc +++ b/test/boost/tablets_test.cc @@ -433,7 +433,7 @@ SEASTAR_TEST_CASE(test_sharder) { auto table1 = table_id(utils::UUID_gen::get_time_UUID()); - token_metadata tokm(token_metadata::config{}); + token_metadata tokm(token_metadata::config{ .topo_cfg{ .this_host_id = h1 } }); tokm.get_topology().add_or_update_endpoint(utils::fb_utilities::get_broadcast_address(), h1); std::vector tablet_ids; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 1729e806da..c90e9aef14 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -669,6 +669,10 @@ private: if (!cfg->host_id) { cfg->host_id = _sys_ks.local().load_local_host_id().get0(); } + locator::shared_token_metadata::mutate_on_all_shards(_token_metadata, [hostid = cfg->host_id] (locator::token_metadata& tm) { + tm.get_topology().set_host_id_cfg(hostid); + return make_ready_future<>(); + }).get(); // don't start listening so tests can be run in parallel _ms.start(cfg->host_id, listen, std::move(7000)).get(); diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index c1fef962c9..f5abac3093 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -125,7 +125,7 @@ public: compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } - const std::string& get_group_id() const noexcept override { + 
const std::string get_group_id() const noexcept override { return _group_id; } seastar::condition_variable& get_staging_done_condition() noexcept override { @@ -303,7 +303,7 @@ std::pair scylla_tests_cmdline_options_processor::process_cmdline_o unsigned x_log2_compaction_groups = vm["x-log2-compaction-groups"].as(); if (x_log2_compaction_groups) { std::cout << "Setting x_log2_compaction_groups to " << x_log2_compaction_groups << std::endl; - replica::set_minimum_x_log2_compaction_groups(x_log2_compaction_groups); + // TODO: perhaps we can later map it into initial_tablets. auto [_new_argc, _new_argv] = rebuild_arg_list_without(argc, argv, "--x-log2-compaction-groups", true); return std::make_pair(_new_argc, _new_argv); } diff --git a/tools/scylla-sstable.cc b/tools/scylla-sstable.cc index c745e65e6c..5aae089702 100644 --- a/tools/scylla-sstable.cc +++ b/tools/scylla-sstable.cc @@ -853,7 +853,7 @@ public: virtual bool tombstone_gc_enabled() const noexcept override { return false; } virtual const tombstone_gc_state& get_tombstone_gc_state() const noexcept override { return _tombstone_gc_state; } virtual compaction_backlog_tracker& get_backlog_tracker() override { return _backlog_tracker; } - virtual const std::string& get_group_id() const noexcept override { return _group_id; } + virtual const std::string get_group_id() const noexcept override { return _group_id; } virtual seastar::condition_variable& get_staging_done_condition() noexcept override { return _staging_done_condition; } };