From 9b17ad3771da950e97c3defb6f9208f9b5d3ca17 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 22 Mar 2023 00:24:10 +0100 Subject: [PATCH] locator: Introduce per-table replication strategy Will be used by tablet-based replication strategies, for which effective replication map is different per table. Also, this patch adapts existing users of effective replication map to use the per-table effective replication map. For simplicity, every table has an effective replication map, even if the erm is per keyspace. This way the client code can be uniform and doesn't have to check whether replication strategy is per table. Not all users of per-keyspace get_effective_replication_map() are adapted yet to work per-table. Those algorithms will throw an exception when invoked on a keyspace which uses per-table replication strategy. --- db/hints/manager.cc | 4 +-- locator/abstract_replication_strategy.cc | 7 +++++ locator/abstract_replication_strategy.hh | 27 +++++++++++++++++ replica/database.cc | 19 ++++++++++-- replica/database.hh | 19 +++++++----- replica/table.cc | 8 ++++- service/forward_service.cc | 4 +-- service/storage_proxy.cc | 38 ++++++++++++------------ service/storage_service.cc | 34 +++++++++++++++++++-- sstables_loader.cc | 2 +- test/boost/mutation_test.cc | 2 +- test/boost/sstable_3_x_test.cc | 2 +- test/boost/sstable_compaction_test.cc | 2 +- test/lib/test_services.cc | 2 +- test/perf/perf_sstable.hh | 2 +- tombstone_gc.cc | 3 +- 16 files changed, 131 insertions(+), 44 deletions(-) diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c4458d2778..56504ba8da 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -848,9 +848,9 @@ void manager::end_point_hints_manager::sender::start() { } future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_mutation_and_schema m) { - replica::keyspace& ks = _db.find_keyspace(m.s->ks_name()); + auto erm = _db.find_column_family(m.s).get_effective_replication_map(); auto token = dht::get_token(*m.s, m.fm.key()); - inet_address_vector_replica_set natural_endpoints = ks.get_effective_replication_map()->get_natural_endpoints(std::move(token)); + inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints(std::move(token)); return do_send_one_mutation(std::move(m), natural_endpoints); } diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 4b35583483..e6f27e277c 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -107,6 +107,13 @@ inet_address_vector_topology_change vnode_effective_replication_map::get_pending return _tmptr->pending_endpoints_for(search_token, ks_name); } +const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const { + if (!_per_table) { + return nullptr; + } + return dynamic_cast(this); +} + void abstract_replication_strategy::validate_replication_factor(sstring rf) { if (rf.empty() || std::any_of(rf.begin(), rf.end(), [] (char c) {return !isdigit(c);})) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 090fd89f7e..7c8c399c2d 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -51,12 +51,15 @@ using endpoint_set = utils::basic_sequenced_set void err(const char* fmt, Args&&... args) const { @@ -117,6 +120,15 @@ public: replication_strategy_type get_type() const noexcept { return _my_type; } const replication_strategy_config_options get_config_options() const noexcept { return _config_options; } + // If returns true then tables governed by this replication strategy have separate + // effective_replication_maps. + // If returns false, they share the same effective_replication_map, which is per keyspace. + // If returns true, then this replication strategy extends per_table_replication_strategy. + // Note, a replication strategy may extend per_table_replication_strategy while !is_per_table(), + // depending on actual strategy options. + bool is_per_table() const { return _per_table; } + const per_table_replication_strategy* maybe_as_per_table() const; + // Use the token_metadata provided by the caller instead of _token_metadata // Note: must be called with initialized, non-empty token_metadata. future get_ranges(inet_address ep, token_metadata_ptr tmptr) const; @@ -177,6 +189,21 @@ public: using effective_replication_map_ptr = seastar::shared_ptr; using mutable_effective_replication_map_ptr = seastar::shared_ptr; +/// Replication strategies which support per-table replication extend this trait. +/// +/// It will be accessed only if the replication strategy actually works in per-table mode, +/// that is after mark_as_per_table() is called, and as a result +/// abstract_replication_strategy::is_per_table() returns true. +class per_table_replication_strategy { +protected: + void mark_as_per_table(abstract_replication_strategy& self) { + self._per_table = true; + } +public: + virtual ~per_table_replication_strategy() = default; + virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const = 0; +}; + // Holds the full replication_map resulting from applying the // effective replication strategy over the given token_metadata // and replication_strategy_config_options. diff --git a/replica/database.cc b/replica/database.cc index a9f5a25be0..cc64dd8dfd 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -401,6 +401,14 @@ const data_dictionary::user_types_storage& database::user_types() const noexcept return *_user_types; } +locator::vnode_effective_replication_map_ptr keyspace::get_effective_replication_map() const { + // FIXME: Examine all users. + if (get_replication_strategy().is_per_table()) { + on_internal_error(dblog, format("Tried to obtain per-keyspace effective replication map of {} but it's per-table", _metadata->name())); + } + return _effective_replication_map; +} + } // namespace replica void backlog_controller::adjust() { @@ -943,6 +951,13 @@ void database::maybe_init_schema_commitlog() { void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg) { schema = local_schema_registry().learn(schema); schema->registry_entry()->mark_synced(); + auto&& rs = ks.get_replication_strategy(); + locator::effective_replication_map_ptr erm; + if (auto pt_rs = rs.maybe_as_per_table()) { + erm = pt_rs->make_replication_map(schema->id(), _shared_token_metadata.get()); + } else { + erm = ks.get_effective_replication_map(); + } // avoid self-reporting auto& sst_manager = is_system_table(*schema) ? get_system_sstables_manager() : get_user_sstables_manager(); lw_shared_ptr cf; @@ -950,9 +965,9 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family: db::commitlog& cl = schema->static_props().use_schema_commitlog && _uses_schema_commitlog ? *_schema_commitlog : *_commitlog; - cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), cl, _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker); + cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), cl, _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker, erm); } else { - cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), column_family::no_commitlog(), _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker); + cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), column_family::no_commitlog(), _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker, erm); } cf->set_durable_writes(ks.metadata()->durable_writes()); diff --git a/replica/database.hh b/replica/database.hh index 1c7359edb8..35551d8f01 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -131,6 +131,8 @@ class column_family_test; class table_for_tests; class database_test; +extern logging::logger dblog; + namespace replica { using shared_memtable = lw_shared_ptr; @@ -395,6 +397,7 @@ public: private: schema_ptr _schema; config _config; + locator::effective_replication_map_ptr _erm; lw_shared_ptr _storage_opts; mutable table_stats _stats; mutable db::view::stats _view_stats; @@ -757,17 +760,19 @@ public: logalloc::occupancy_stats occupancy() const; private: - table(schema_ptr schema, config cfg, lw_shared_ptr, db::commitlog* cl, compaction_manager&, sstables::sstables_manager&, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker); + table(schema_ptr schema, config cfg, lw_shared_ptr, db::commitlog* cl, compaction_manager&, sstables::sstables_manager&, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm); public: - table(schema_ptr schema, config cfg, lw_shared_ptr sopts, db::commitlog& cl, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker) - : table(schema, std::move(cfg), std::move(sopts), &cl, cm, sm, cl_stats, row_cache_tracker) {} - table(schema_ptr schema, config cfg, lw_shared_ptr sopts, no_commitlog, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker) - : table(schema, std::move(cfg), std::move(sopts), nullptr, cm, sm, cl_stats, row_cache_tracker) {} + table(schema_ptr schema, config cfg, lw_shared_ptr sopts, db::commitlog& cl, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm) + : table(schema, std::move(cfg), std::move(sopts), &cl, cm, sm, cl_stats, row_cache_tracker, std::move(erm)) {} + table(schema_ptr schema, config cfg, lw_shared_ptr sopts, no_commitlog, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm) + : table(schema, std::move(cfg), std::move(sopts), nullptr, cm, sm, cl_stats, row_cache_tracker, std::move(erm)) {} table(column_family&&) = delete; // 'this' is being captured during construction ~table(); const schema_ptr& schema() const { return _schema; } void set_schema(schema_ptr); db::commitlog* commitlog() { return _commitlog; } + const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } + void update_effective_replication_map(locator::effective_replication_map_ptr); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; @@ -1204,9 +1209,7 @@ public: return _replication_strategy; } - locator::vnode_effective_replication_map_ptr get_effective_replication_map() const { - return _effective_replication_map; - } + locator::vnode_effective_replication_map_ptr get_effective_replication_map() const; column_family::config make_column_family_config(const schema& s, const database& db) const; future<> make_directory_for_column_family(const sstring& name, table_id uuid); diff --git a/replica/table.cc b/replica/table.cc index 5ecc3c1a74..89f7193ef1 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1526,9 +1526,11 @@ static inline unsigned get_x_log2_compaction_groups(unsigned x_log2_compaction_g } table::table(schema_ptr schema, config config, lw_shared_ptr sopts, db::commitlog* cl, compaction_manager& compaction_manager, - sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker) + sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, + locator::effective_replication_map_ptr erm) : _schema(std::move(schema)) , _config(std::move(config)) + , _erm(std::move(erm)) , _storage_opts(std::move(sopts)) , _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()), keyspace_label(_schema->ks_name()), @@ -1554,6 +1556,10 @@ table::table(schema_ptr schema, config config, lw_shared_ptr sstables) { auto sel = make_lw_shared(sstables->make_incremental_selector()); diff --git a/service/forward_service.cc b/service/forward_service.cc index 43d24efd99..8a2e871d96 100644 --- a/service/forward_service.cc +++ b/service/forward_service.cc @@ -508,7 +508,8 @@ future<> forward_service::uninit_messaging_service() { future forward_service::dispatch(query::forward_request req, tracing::trace_state_ptr tr_state) { schema_ptr schema = local_schema_registry().get(req.cmd.schema_version); - replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); + replica::table& cf = _db.local().find_column_family(schema); + auto erm = cf.get_effective_replication_map(); // next_vnode is used to iterate through all vnodes produced by // query_ranges_to_vnodes_generator. auto next_vnode = [ @@ -524,7 +525,6 @@ future forward_service::dispatch(query::forward_request r std::map vnodes_per_addr; const auto& topo = get_token_metadata_ptr()->get_topology(); while (std::optional vnode = next_vnode()) { - auto erm = ks.get_effective_replication_map(); inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode)); // Do not choose an endpoint outside the current datacenter if a request has a local consistency if (db::is_datacenter_local(req.cl)) { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 63f1e7c0b6..8b333f7cf4 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1639,8 +1639,8 @@ paxos_response_handler::paxos_response_handler(shared_ptr proxy_a , _permit(std::move(permit_arg)) , tr_state(tr_state_arg) { auto ks_name = _schema->ks_name(); - replica::keyspace& ks = _proxy->_db.local().find_keyspace(ks_name); - _effective_replication_map_ptr = ks.get_effective_replication_map(); + replica::table& table = _proxy->_db.local().find_column_family(_schema->id()); + _effective_replication_map_ptr = table.get_effective_replication_map(); storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(ks_name, *_effective_replication_map_ptr, _key.token(), _cl_for_paxos); _live_endpoints = std::move(pp.endpoints); _required_participants = pp.required_participants; @@ -2843,8 +2843,8 @@ result storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { auto keyspace_name = s->ks_name(); - replica::keyspace& ks = _db.local().find_keyspace(keyspace_name); - auto erm = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(s->id()); + auto erm = table.get_effective_replication_map(); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token); inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token, keyspace_name); @@ -2961,8 +2961,8 @@ storage_proxy::create_write_response_handler(const std::tupleks_name(); - replica::keyspace& ks = _db.local().find_keyspace(keyspace_name); - auto ermp = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(s->id()); + auto ermp = table.get_effective_replication_map(); // No rate limiting for paxos (yet) return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), @@ -3145,8 +3145,8 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level std::set erms; for (auto& m : mutations) { - auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); - auto erm = ks.get_effective_replication_map(); + auto& table = _db.local().find_column_family(m.schema()->id()); + auto erm = table.get_effective_replication_map(); erms.insert(erm); auto leader = find_leader_for_counter_update(m, *erm, cl); leaders[leader].emplace_back(frozen_mutation_and_schema { freeze(m), m.schema() }); @@ -3190,8 +3190,8 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level // - we only use this to calculate some infomation for the error message // - the topology coordinator should prevent incompatible changes while requests // (like this one) are in flight - auto& ks = _db.local().find_keyspace(s->ks_name()); - auto erm = ks.get_effective_replication_map(); + auto& table = _db.local().find_column_family(s->id()); + auto erm = table.get_effective_replication_map(); try { std::rethrow_exception(std::move(exp)); } catch (rpc::timeout_error&) { @@ -3492,8 +3492,8 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con future> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) { - auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - auto ermp = ks.get_effective_replication_map(); + auto& table = _p._db.local().find_column_family(m.schema()->id()); + auto ermp = table.get_effective_replication_map(); return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate()); }).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) { _p.register_cdc_operation_result_tracker(ids, _cdc_tracker); @@ -3623,8 +3623,8 @@ future<> storage_proxy::send_to_endpoint( std::inserter(targets, targets.begin()), std::back_inserter(dead_endpoints), std::bind_front(&storage_proxy::is_alive, this)); - auto& ks = _db.local().find_keyspace(m->schema()->ks_name()); - auto erm = ks.get_effective_replication_map(); + auto& table = _db.local().find_column_family(m->schema()->id()); + auto erm = table.get_effective_replication_map(); slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints); db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints); return create_write_response_handler( @@ -5246,8 +5246,8 @@ storage_proxy::query_singular(lw_shared_ptr cmd, schema_ptr schema = local_schema_registry().get(cmd->schema_version); - replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); - auto erm = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(schema->id()); + auto erm = table.get_effective_replication_map(); db::read_repair_decision repair_decision = query_options.read_repair_decision ? *query_options.read_repair_decision : new_read_repair_decision(*schema); @@ -5574,12 +5574,12 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, db::consistency_level cl, storage_proxy::coordinator_query_options query_options) { schema_ptr schema = local_schema_registry().get(cmd->schema_version); - replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); - auto erm = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(schema->id()); + auto erm = table.get_effective_replication_map(); // when dealing with LocalStrategy and EverywhereStrategy keyspaces, we can skip the range splitting and merging // (which can be expensive in clusters with vnodes) - auto merge_tokens = !ks.get_replication_strategy().natural_endpoints_depend_on_token(); + auto merge_tokens = !erm->get_replication_strategy().natural_endpoints_depend_on_token(); query_ranges_to_vnodes_generator ranges_to_vnodes(erm->get_token_metadata_ptr(), schema, std::move(partition_ranges), merge_tokens); diff --git a/service/storage_service.cc b/service/storage_service.cc index 1a89a133aa..866a95ce9e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2533,6 +2533,8 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt pending_token_metadata_ptr.resize(smp::count); std::vector> pending_effective_replication_maps; pending_effective_replication_maps.resize(smp::count); + std::vector> pending_table_erms; + pending_table_erms.resize(smp::count); try { auto base_shard = this_shard_id(); @@ -2552,6 +2554,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt auto keyspaces = db.get_all_keyspaces(); for (auto& ks_name : keyspaces) { auto rs = db.find_keyspace(ks_name).get_replication_strategy_ptr(); + if (rs->is_per_table()) { + continue; + } auto erm = co_await get_erm_factory().create_effective_replication_map(rs, tmptr); pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm)); } @@ -2559,10 +2564,27 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt auto& db = ss._db.local(); for (auto& ks_name : keyspaces) { auto rs = db.find_keyspace(ks_name).get_replication_strategy_ptr(); + if (rs->is_per_table()) { + continue; + } auto tmptr = pending_token_metadata_ptr[this_shard_id()]; auto erm = co_await ss.get_erm_factory().create_effective_replication_map(rs, std::move(tmptr)); pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm)); - + } + }); + // Prepare per-table erms. + co_await container().invoke_on_all([&] (storage_service& ss) { + auto& db = ss._db.local(); + auto tmptr = pending_token_metadata_ptr[this_shard_id()]; + for (auto&& [id, cf] : db.get_column_families()) { // Safe because we iterate without preemption + auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr(); + locator::effective_replication_map_ptr erm; + if (auto pt_rs = rs->maybe_as_per_table()) { + erm = pt_rs->make_replication_map(id, tmptr); + } else { + erm = pending_effective_replication_maps[this_shard_id()][cf->schema()->keypace_name()]; + } + pending_table_erms[this_shard_id()].emplace(id, std::move(erm)); } }); } catch (...) { @@ -2575,6 +2597,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt co_await smp::invoke_on_all([&] () -> future<> { auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]); auto erms = std::move(pending_effective_replication_maps[this_shard_id()]); + auto table_erms = std::move(pending_table_erms[this_shard_id()]); co_await utils::clear_gently(erms); co_await utils::clear_gently(tmptr); @@ -2590,14 +2613,21 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt try { co_await container().invoke_on_all([&] (storage_service& ss) { ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()])); + auto& db = ss._db.local(); auto& erms = pending_effective_replication_maps[this_shard_id()]; for (auto it = erms.begin(); it != erms.end(); ) { - auto& db = ss._db.local(); auto& ks = db.find_keyspace(it->first); ks.update_effective_replication_map(std::move(it->second)); it = erms.erase(it); } + + auto& table_erms = pending_table_erms[this_shard_id()]; + for (auto it = table_erms.begin(); it != table_erms.end(); ) { + auto& cf = db.find_column_family(it->first); + cf.update_effective_replication_map(std::move(it->second)); + it = table_erms.erase(it); + } }); } catch (...) { // applying the changes on all shards should never fail diff --git a/sstables_loader.cc b/sstables_loader.cc index 0cbd253029..ffe34e1c0c 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -123,7 +123,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, auto s = table.schema(); const auto cf_id = s->id(); const auto reason = streaming::stream_reason::repair; - auto erm = _db.local().find_keyspace(ks_name).get_effective_replication_map(); + auto erm = _db.local().find_column_family(s).get_effective_replication_map(); // By sorting SSTables by their primary key, we allow SSTable runs to be // incrementally streamed. diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index d6e4a8070c..8040d21819 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -103,7 +103,7 @@ with_column_family(schema_ptr s, replica::column_family::config cfg, sstables::s auto cm = make_lw_shared(tm, compaction_manager::for_testing_tag{}); auto cl_stats = make_lw_shared(); auto s_opts = make_lw_shared(); - auto cf = make_lw_shared(s, cfg, s_opts, replica::column_family::no_commitlog(), *cm, sm, *cl_stats, *tracker); + auto cf = make_lw_shared(s, cfg, s_opts, replica::column_family::no_commitlog(), *cm, sm, *cl_stats, *tracker, nullptr); cf->mark_ready_for_writes(); co_await func(*cf); co_await cf->stop(); diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index 4e1b1c0b5a..892ee3908e 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -3025,7 +3025,7 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt auto cm = make_lw_shared(false); auto cl_stats = make_lw_shared(); auto tracker = make_lw_shared(); - auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), **cm, env.manager(), *cl_stats, *tracker); + auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), **cm, env.manager(), *cl_stats, *tracker, nullptr); cf->mark_ready_for_writes(); lw_shared_ptr mt = make_lw_shared(s); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index d57268650a..7638b4913b 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4205,7 +4205,7 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) { cfg.enable_commitlog = false; cfg.enable_incremental_backups = false; - auto cf = make_lw_shared(s, cfg, make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), *cl_stats, *tracker); + auto cf = make_lw_shared(s, cfg, make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), *cl_stats, *tracker, nullptr); cf->start(); cf->mark_ready_for_writes(); tables.push_back(cf); diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 01210be55c..8f3a41b870 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -128,7 +128,7 @@ table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, s _data->cfg.cf_stats = &_data->cf_stats; _data->cfg.enable_commitlog = false; _data->cm.enable(); - _data->cf = make_lw_shared(_data->s, _data->cfg, make_lw_shared(), replica::column_family::no_commitlog(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker); + _data->cf = make_lw_shared(_data->s, _data->cfg, make_lw_shared(), replica::column_family::no_commitlog(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker, nullptr); _data->cf->mark_ready_for_writes(); _data->table_s = std::make_unique(*_data, sstables_manager); _data->cm.add(*_data->table_s); diff --git a/test/perf/perf_sstable.hh b/test/perf/perf_sstable.hh index f243b7dcf1..e14b7a45e8 100644 --- a/test/perf/perf_sstable.hh +++ b/test/perf/perf_sstable.hh @@ -228,7 +228,7 @@ public: cell_locker_stats cl_stats; tasks::task_manager tm; auto cm = make_lw_shared(tm, compaction_manager::for_testing_tag{}); - auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), cl_stats, tracker); + auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), cl_stats, tracker, nullptr); auto start = perf_sstable_test_env::now(); diff --git a/tombstone_gc.cc b/tombstone_gc.cc index 41e53492ed..bb873a9f31 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -167,9 +167,8 @@ static bool needs_repair_before_gc(const replica::database& db, sstring ks_name) // need to run repair even if tombstone_gc mode = repair. auto& ks = db.find_keyspace(ks_name); auto& rs = ks.get_replication_strategy(); - auto erm = ks.get_effective_replication_map(); bool needs_repair = rs.get_type() != locator::replication_strategy_type::local - && erm->get_replication_factor() != 1; + && rs.get_replication_factor(db.get_token_metadata()) != 1; return needs_repair; }