diff --git a/db/hints/manager.cc b/db/hints/manager.cc index c4458d2778..56504ba8da 100644 --- a/db/hints/manager.cc +++ b/db/hints/manager.cc @@ -848,9 +848,9 @@ void manager::end_point_hints_manager::sender::start() { } future<> manager::end_point_hints_manager::sender::send_one_mutation(frozen_mutation_and_schema m) { - replica::keyspace& ks = _db.find_keyspace(m.s->ks_name()); + auto erm = _db.find_column_family(m.s).get_effective_replication_map(); auto token = dht::get_token(*m.s, m.fm.key()); - inet_address_vector_replica_set natural_endpoints = ks.get_effective_replication_map()->get_natural_endpoints(std::move(token)); + inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints(std::move(token)); return do_send_one_mutation(std::move(m), natural_endpoints); } diff --git a/locator/abstract_replication_strategy.cc b/locator/abstract_replication_strategy.cc index 4b35583483..e6f27e277c 100644 --- a/locator/abstract_replication_strategy.cc +++ b/locator/abstract_replication_strategy.cc @@ -107,6 +107,13 @@ inet_address_vector_topology_change vnode_effective_replication_map::get_pending return _tmptr->pending_endpoints_for(search_token, ks_name); } +const per_table_replication_strategy* abstract_replication_strategy::maybe_as_per_table() const { + if (!_per_table) { + return nullptr; + } + return dynamic_cast(this); +} + void abstract_replication_strategy::validate_replication_factor(sstring rf) { if (rf.empty() || std::any_of(rf.begin(), rf.end(), [] (char c) {return !isdigit(c);})) { diff --git a/locator/abstract_replication_strategy.hh b/locator/abstract_replication_strategy.hh index 090fd89f7e..7c8c399c2d 100644 --- a/locator/abstract_replication_strategy.hh +++ b/locator/abstract_replication_strategy.hh @@ -51,12 +51,15 @@ using endpoint_set = utils::basic_sequenced_set void err(const char* fmt, Args&&... args) const { @@ -117,6 +120,15 @@ public: replication_strategy_type get_type() const noexcept { return _my_type; } const replication_strategy_config_options get_config_options() const noexcept { return _config_options; } + // If returns true then tables governed by this replication strategy have separate + // effective_replication_maps. + // If returns false, they share the same effective_replication_map, which is per keyspace. + // If returns true, then this replication strategy extends per_table_replication_strategy. + // Note, a replication strategy may extend per_table_replication_strategy while !is_per_table(), + // depending on actual strategy options. + bool is_per_table() const { return _per_table; } + const per_table_replication_strategy* maybe_as_per_table() const; + // Use the token_metadata provided by the caller instead of _token_metadata // Note: must be called with initialized, non-empty token_metadata. future get_ranges(inet_address ep, token_metadata_ptr tmptr) const; @@ -177,6 +189,21 @@ public: using effective_replication_map_ptr = seastar::shared_ptr; using mutable_effective_replication_map_ptr = seastar::shared_ptr; +/// Replication strategies which support per-table replication extend this trait. +/// +/// It will be accessed only if the replication strategy actually works in per-table mode, +/// that is after mark_as_per_table() is called, and as a result +/// abstract_replication_strategy::is_per_table() returns true. +class per_table_replication_strategy { +protected: + void mark_as_per_table(abstract_replication_strategy& self) { + self._per_table = true; + } +public: + virtual ~per_table_replication_strategy() = default; + virtual effective_replication_map_ptr make_replication_map(table_id, token_metadata_ptr) const = 0; +}; + // Holds the full replication_map resulting from applying the // effective replication strategy over the given token_metadata // and replication_strategy_config_options. diff --git a/replica/database.cc b/replica/database.cc index a9f5a25be0..cc64dd8dfd 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -401,6 +401,14 @@ const data_dictionary::user_types_storage& database::user_types() const noexcept return *_user_types; } +locator::vnode_effective_replication_map_ptr keyspace::get_effective_replication_map() const { + // FIXME: Examine all users. + if (get_replication_strategy().is_per_table()) { + on_internal_error(dblog, format("Tried to obtain per-keyspace effective replication map of {} but it's per-table", _metadata->name())); + } + return _effective_replication_map; +} + } // namespace replica void backlog_controller::adjust() { @@ -943,6 +951,13 @@ void database::maybe_init_schema_commitlog() { void database::add_column_family(keyspace& ks, schema_ptr schema, column_family::config cfg) { schema = local_schema_registry().learn(schema); schema->registry_entry()->mark_synced(); + auto&& rs = ks.get_replication_strategy(); + locator::effective_replication_map_ptr erm; + if (auto pt_rs = rs.maybe_as_per_table()) { + erm = pt_rs->make_replication_map(schema->id(), _shared_token_metadata.get()); + } else { + erm = ks.get_effective_replication_map(); + } // avoid self-reporting auto& sst_manager = is_system_table(*schema) ? get_system_sstables_manager() : get_user_sstables_manager(); lw_shared_ptr cf; @@ -950,9 +965,9 @@ void database::add_column_family(keyspace& ks, schema_ptr schema, column_family: db::commitlog& cl = schema->static_props().use_schema_commitlog && _uses_schema_commitlog ? *_schema_commitlog : *_commitlog; - cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), cl, _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker); + cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), cl, _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker, erm); } else { - cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), column_family::no_commitlog(), _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker); + cf = make_lw_shared(schema, std::move(cfg), ks.metadata()->get_storage_options_ptr(), column_family::no_commitlog(), _compaction_manager, sst_manager, *_cl_stats, _row_cache_tracker, erm); } cf->set_durable_writes(ks.metadata()->durable_writes()); diff --git a/replica/database.hh b/replica/database.hh index 1c7359edb8..35551d8f01 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -131,6 +131,8 @@ class column_family_test; class table_for_tests; class database_test; +extern logging::logger dblog; + namespace replica { using shared_memtable = lw_shared_ptr; @@ -395,6 +397,7 @@ public: private: schema_ptr _schema; config _config; + locator::effective_replication_map_ptr _erm; lw_shared_ptr _storage_opts; mutable table_stats _stats; mutable db::view::stats _view_stats; @@ -757,17 +760,19 @@ public: logalloc::occupancy_stats occupancy() const; private: - table(schema_ptr schema, config cfg, lw_shared_ptr, db::commitlog* cl, compaction_manager&, sstables::sstables_manager&, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker); + table(schema_ptr schema, config cfg, lw_shared_ptr, db::commitlog* cl, compaction_manager&, sstables::sstables_manager&, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm); public: - table(schema_ptr schema, config cfg, lw_shared_ptr sopts, db::commitlog& cl, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker) - : table(schema, std::move(cfg), std::move(sopts), &cl, cm, sm, cl_stats, row_cache_tracker) {} - table(schema_ptr schema, config cfg, lw_shared_ptr sopts, no_commitlog, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker) - : table(schema, std::move(cfg), std::move(sopts), nullptr, cm, sm, cl_stats, row_cache_tracker) {} + table(schema_ptr schema, config cfg, lw_shared_ptr sopts, db::commitlog& cl, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm) + : table(schema, std::move(cfg), std::move(sopts), &cl, cm, sm, cl_stats, row_cache_tracker, std::move(erm)) {} + table(schema_ptr schema, config cfg, lw_shared_ptr sopts, no_commitlog, compaction_manager& cm, sstables::sstables_manager& sm, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, locator::effective_replication_map_ptr erm) + : table(schema, std::move(cfg), std::move(sopts), nullptr, cm, sm, cl_stats, row_cache_tracker, std::move(erm)) {} table(column_family&&) = delete; // 'this' is being captured during construction ~table(); const schema_ptr& schema() const { return _schema; } void set_schema(schema_ptr); db::commitlog* commitlog() { return _commitlog; } + const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; } + void update_effective_replication_map(locator::effective_replication_map_ptr); future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; future find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const; future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; @@ -1204,9 +1209,7 @@ public: return _replication_strategy; } - locator::vnode_effective_replication_map_ptr get_effective_replication_map() const { - return _effective_replication_map; - } + locator::vnode_effective_replication_map_ptr get_effective_replication_map() const; column_family::config make_column_family_config(const schema& s, const database& db) const; future<> make_directory_for_column_family(const sstring& name, table_id uuid); diff --git a/replica/table.cc b/replica/table.cc index 5ecc3c1a74..89f7193ef1 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1526,9 +1526,11 @@ static inline unsigned get_x_log2_compaction_groups(unsigned x_log2_compaction_g } table::table(schema_ptr schema, config config, lw_shared_ptr sopts, db::commitlog* cl, compaction_manager& compaction_manager, - sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker) + sstables::sstables_manager& sst_manager, cell_locker_stats& cl_stats, cache_tracker& row_cache_tracker, + locator::effective_replication_map_ptr erm) : _schema(std::move(schema)) , _config(std::move(config)) + , _erm(std::move(erm)) , _storage_opts(std::move(sopts)) , _view_stats(format("{}_{}_view_replica_update", _schema->ks_name(), _schema->cf_name()), keyspace_label(_schema->ks_name()), @@ -1554,6 +1556,10 @@ table::table(schema_ptr schema, config config, lw_shared_ptr sstables) { auto sel = make_lw_shared(sstables->make_incremental_selector()); diff --git a/service/forward_service.cc b/service/forward_service.cc index 43d24efd99..8a2e871d96 100644 --- a/service/forward_service.cc +++ b/service/forward_service.cc @@ -508,7 +508,8 @@ future<> forward_service::uninit_messaging_service() { future forward_service::dispatch(query::forward_request req, tracing::trace_state_ptr tr_state) { schema_ptr schema = local_schema_registry().get(req.cmd.schema_version); - replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); + replica::table& cf = _db.local().find_column_family(schema); + auto erm = cf.get_effective_replication_map(); // next_vnode is used to iterate through all vnodes produced by // query_ranges_to_vnodes_generator. auto next_vnode = [ @@ -524,7 +525,6 @@ future forward_service::dispatch(query::forward_request r std::map vnodes_per_addr; const auto& topo = get_token_metadata_ptr()->get_topology(); while (std::optional vnode = next_vnode()) { - auto erm = ks.get_effective_replication_map(); inet_address_vector_replica_set live_endpoints = _proxy.get_live_endpoints(*erm, end_token(*vnode)); // Do not choose an endpoint outside the current datacenter if a request has a local consistency if (db::is_datacenter_local(req.cl)) { diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 63f1e7c0b6..8b333f7cf4 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -1639,8 +1639,8 @@ paxos_response_handler::paxos_response_handler(shared_ptr proxy_a , _permit(std::move(permit_arg)) , tr_state(tr_state_arg) { auto ks_name = _schema->ks_name(); - replica::keyspace& ks = _proxy->_db.local().find_keyspace(ks_name); - _effective_replication_map_ptr = ks.get_effective_replication_map(); + replica::table& table = _proxy->_db.local().find_column_family(_schema->id()); + _effective_replication_map_ptr = table.get_effective_replication_map(); storage_proxy::paxos_participants pp = _proxy->get_paxos_participants(ks_name, *_effective_replication_map_ptr, _key.token(), _cl_for_paxos); _live_endpoints = std::move(pp.endpoints); _required_participants = pp.required_participants; @@ -2843,8 +2843,8 @@ result storage_proxy::create_write_response_handler_helper(schema_ptr s, const dht::token& token, std::unique_ptr mh, db::consistency_level cl, db::write_type type, tracing::trace_state_ptr tr_state, service_permit permit, db::allow_per_partition_rate_limit allow_limit) { auto keyspace_name = s->ks_name(); - replica::keyspace& ks = _db.local().find_keyspace(keyspace_name); - auto erm = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(s->id()); + auto erm = table.get_effective_replication_map(); inet_address_vector_replica_set natural_endpoints = erm->get_natural_endpoints_without_node_being_replaced(token); inet_address_vector_topology_change pending_endpoints = erm->get_pending_endpoints(token, keyspace_name); @@ -2961,8 +2961,8 @@ storage_proxy::create_write_response_handler(const std::tupleks_name(); - replica::keyspace& ks = _db.local().find_keyspace(keyspace_name); - auto ermp = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(s->id()); + auto ermp = table.get_effective_replication_map(); // No rate limiting for paxos (yet) return create_write_response_handler(std::move(ermp), cl, db::write_type::CAS, std::make_unique(std::move(commit), s, nullptr), std::move(endpoints), @@ -3145,8 +3145,8 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level std::set erms; for (auto& m : mutations) { - auto& ks = _db.local().find_keyspace(m.schema()->ks_name()); - auto erm = ks.get_effective_replication_map(); + auto& table = _db.local().find_column_family(m.schema()->id()); + auto erm = table.get_effective_replication_map(); erms.insert(erm); auto leader = find_leader_for_counter_update(m, *erm, cl); leaders[leader].emplace_back(frozen_mutation_and_schema { freeze(m), m.schema() }); @@ -3190,8 +3190,8 @@ future<> storage_proxy::mutate_counters(Range&& mutations, db::consistency_level // - we only use this to calculate some infomation for the error message // - the topology coordinator should prevent incompatible changes while requests // (like this one) are in flight - auto& ks = _db.local().find_keyspace(s->ks_name()); - auto erm = ks.get_effective_replication_map(); + auto& table = _db.local().find_column_family(s->id()); + auto erm = table.get_effective_replication_map(); try { std::rethrow_exception(std::move(exp)); } catch (rpc::timeout_error&) { @@ -3492,8 +3492,8 @@ storage_proxy::mutate_atomically_result(std::vector mutations, db::con future> send_batchlog_mutation(mutation m, db::consistency_level cl = db::consistency_level::ONE) { return _p.mutate_prepare<>(std::array{std::move(m)}, cl, db::write_type::BATCH_LOG, _permit, [this] (const mutation& m, db::consistency_level cl, db::write_type type, service_permit permit) { - auto& ks = _p._db.local().find_keyspace(m.schema()->ks_name()); - auto ermp = ks.get_effective_replication_map(); + auto& table = _p._db.local().find_column_family(m.schema()->id()); + auto ermp = table.get_effective_replication_map(); return _p.create_write_response_handler(std::move(ermp), cl, type, std::make_unique(m), _batchlog_endpoints, {}, {}, _trace_state, _stats, std::move(permit), std::monostate()); }).then(utils::result_wrap([this, cl] (unique_response_handler_vector ids) { _p.register_cdc_operation_result_tracker(ids, _cdc_tracker); @@ -3623,8 +3623,8 @@ future<> storage_proxy::send_to_endpoint( std::inserter(targets, targets.begin()), std::back_inserter(dead_endpoints), std::bind_front(&storage_proxy::is_alive, this)); - auto& ks = _db.local().find_keyspace(m->schema()->ks_name()); - auto erm = ks.get_effective_replication_map(); + auto& table = _db.local().find_column_family(m->schema()->id()); + auto erm = table.get_effective_replication_map(); slogger.trace("Creating write handler with live: {}; dead: {}", targets, dead_endpoints); db::assure_sufficient_live_nodes(cl, *erm, targets, pending_endpoints); return create_write_response_handler( @@ -5246,8 +5246,8 @@ storage_proxy::query_singular(lw_shared_ptr cmd, schema_ptr schema = local_schema_registry().get(cmd->schema_version); - replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); - auto erm = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(schema->id()); + auto erm = table.get_effective_replication_map(); db::read_repair_decision repair_decision = query_options.read_repair_decision ? *query_options.read_repair_decision : new_read_repair_decision(*schema); @@ -5574,12 +5574,12 @@ storage_proxy::query_partition_key_range(lw_shared_ptr cmd, db::consistency_level cl, storage_proxy::coordinator_query_options query_options) { schema_ptr schema = local_schema_registry().get(cmd->schema_version); - replica::keyspace& ks = _db.local().find_keyspace(schema->ks_name()); - auto erm = ks.get_effective_replication_map(); + replica::table& table = _db.local().find_column_family(schema->id()); + auto erm = table.get_effective_replication_map(); // when dealing with LocalStrategy and EverywhereStrategy keyspaces, we can skip the range splitting and merging // (which can be expensive in clusters with vnodes) - auto merge_tokens = !ks.get_replication_strategy().natural_endpoints_depend_on_token(); + auto merge_tokens = !erm->get_replication_strategy().natural_endpoints_depend_on_token(); query_ranges_to_vnodes_generator ranges_to_vnodes(erm->get_token_metadata_ptr(), schema, std::move(partition_ranges), merge_tokens); diff --git a/service/storage_service.cc b/service/storage_service.cc index 1a89a133aa..866a95ce9e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2533,6 +2533,8 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt pending_token_metadata_ptr.resize(smp::count); std::vector> pending_effective_replication_maps; pending_effective_replication_maps.resize(smp::count); + std::vector> pending_table_erms; + pending_table_erms.resize(smp::count); try { auto base_shard = this_shard_id(); @@ -2552,6 +2554,9 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt auto keyspaces = db.get_all_keyspaces(); for (auto& ks_name : keyspaces) { auto rs = db.find_keyspace(ks_name).get_replication_strategy_ptr(); + if (rs->is_per_table()) { + continue; + } auto erm = co_await get_erm_factory().create_effective_replication_map(rs, tmptr); pending_effective_replication_maps[base_shard].emplace(ks_name, std::move(erm)); } @@ -2559,10 +2564,27 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt auto& db = ss._db.local(); for (auto& ks_name : keyspaces) { auto rs = db.find_keyspace(ks_name).get_replication_strategy_ptr(); + if (rs->is_per_table()) { + continue; + } auto tmptr = pending_token_metadata_ptr[this_shard_id()]; auto erm = co_await ss.get_erm_factory().create_effective_replication_map(rs, std::move(tmptr)); pending_effective_replication_maps[this_shard_id()].emplace(ks_name, std::move(erm)); - + } + }); + // Prepare per-table erms. + co_await container().invoke_on_all([&] (storage_service& ss) { + auto& db = ss._db.local(); + auto tmptr = pending_token_metadata_ptr[this_shard_id()]; + for (auto&& [id, cf] : db.get_column_families()) { // Safe because we iterate without preemption + auto rs = db.find_keyspace(cf->schema()->keypace_name()).get_replication_strategy_ptr(); + locator::effective_replication_map_ptr erm; + if (auto pt_rs = rs->maybe_as_per_table()) { + erm = pt_rs->make_replication_map(id, tmptr); + } else { + erm = pending_effective_replication_maps[this_shard_id()][cf->schema()->keypace_name()]; + } + pending_table_erms[this_shard_id()].emplace(id, std::move(erm)); } }); } catch (...) { @@ -2575,6 +2597,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt co_await smp::invoke_on_all([&] () -> future<> { auto tmptr = std::move(pending_token_metadata_ptr[this_shard_id()]); auto erms = std::move(pending_effective_replication_maps[this_shard_id()]); + auto table_erms = std::move(pending_table_erms[this_shard_id()]); co_await utils::clear_gently(erms); co_await utils::clear_gently(tmptr); @@ -2590,14 +2613,21 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt try { co_await container().invoke_on_all([&] (storage_service& ss) { ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()])); + auto& db = ss._db.local(); auto& erms = pending_effective_replication_maps[this_shard_id()]; for (auto it = erms.begin(); it != erms.end(); ) { - auto& db = ss._db.local(); auto& ks = db.find_keyspace(it->first); ks.update_effective_replication_map(std::move(it->second)); it = erms.erase(it); } + + auto& table_erms = pending_table_erms[this_shard_id()]; + for (auto it = table_erms.begin(); it != table_erms.end(); ) { + auto& cf = db.find_column_family(it->first); + cf.update_effective_replication_map(std::move(it->second)); + it = table_erms.erase(it); + } }); } catch (...) { // applying the changes on all shards should never fail diff --git a/sstables_loader.cc b/sstables_loader.cc index 0cbd253029..ffe34e1c0c 100644 --- a/sstables_loader.cc +++ b/sstables_loader.cc @@ -123,7 +123,7 @@ future<> sstables_loader::load_and_stream(sstring ks_name, sstring cf_name, auto s = table.schema(); const auto cf_id = s->id(); const auto reason = streaming::stream_reason::repair; - auto erm = _db.local().find_keyspace(ks_name).get_effective_replication_map(); + auto erm = _db.local().find_column_family(s).get_effective_replication_map(); // By sorting SSTables by their primary key, we allow SSTable runs to be // incrementally streamed. diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index d6e4a8070c..8040d21819 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -103,7 +103,7 @@ with_column_family(schema_ptr s, replica::column_family::config cfg, sstables::s auto cm = make_lw_shared(tm, compaction_manager::for_testing_tag{}); auto cl_stats = make_lw_shared(); auto s_opts = make_lw_shared(); - auto cf = make_lw_shared(s, cfg, s_opts, replica::column_family::no_commitlog(), *cm, sm, *cl_stats, *tracker); + auto cf = make_lw_shared(s, cfg, s_opts, replica::column_family::no_commitlog(), *cm, sm, *cl_stats, *tracker, nullptr); cf->mark_ready_for_writes(); co_await func(*cf); co_await cf->stop(); diff --git a/test/boost/sstable_3_x_test.cc b/test/boost/sstable_3_x_test.cc index 4e1b1c0b5a..892ee3908e 100644 --- a/test/boost/sstable_3_x_test.cc +++ b/test/boost/sstable_3_x_test.cc @@ -3025,7 +3025,7 @@ static flat_mutation_reader_v2 compacted_sstable_reader(test_env& env, schema_pt auto cm = make_lw_shared(false); auto cl_stats = make_lw_shared(); auto tracker = make_lw_shared(); - auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), **cm, env.manager(), *cl_stats, *tracker); + auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), **cm, env.manager(), *cl_stats, *tracker, nullptr); cf->mark_ready_for_writes(); lw_shared_ptr mt = make_lw_shared(s); diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index d57268650a..7638b4913b 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -4205,7 +4205,7 @@ SEASTAR_TEST_CASE(max_ongoing_compaction_test) { cfg.enable_commitlog = false; cfg.enable_incremental_backups = false; - auto cf = make_lw_shared(s, cfg, make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), *cl_stats, *tracker); + auto cf = make_lw_shared(s, cfg, make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), *cl_stats, *tracker, nullptr); cf->start(); cf->mark_ready_for_writes(); tables.push_back(cf); diff --git a/test/lib/test_services.cc b/test/lib/test_services.cc index 01210be55c..8f3a41b870 100644 --- a/test/lib/test_services.cc +++ b/test/lib/test_services.cc @@ -128,7 +128,7 @@ table_for_tests::table_for_tests(sstables::sstables_manager& sstables_manager, s _data->cfg.cf_stats = &_data->cf_stats; _data->cfg.enable_commitlog = false; _data->cm.enable(); - _data->cf = make_lw_shared(_data->s, _data->cfg, make_lw_shared(), replica::column_family::no_commitlog(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker); + _data->cf = make_lw_shared(_data->s, _data->cfg, make_lw_shared(), replica::column_family::no_commitlog(), _data->cm, sstables_manager, _data->cl_stats, _data->tracker, nullptr); _data->cf->mark_ready_for_writes(); _data->table_s = std::make_unique(*_data, sstables_manager); _data->cm.add(*_data->table_s); diff --git a/test/perf/perf_sstable.hh b/test/perf/perf_sstable.hh index f243b7dcf1..e14b7a45e8 100644 --- a/test/perf/perf_sstable.hh +++ b/test/perf/perf_sstable.hh @@ -228,7 +228,7 @@ public: cell_locker_stats cl_stats; tasks::task_manager tm; auto cm = make_lw_shared(tm, compaction_manager::for_testing_tag{}); - auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), cl_stats, tracker); + auto cf = make_lw_shared(s, env.make_table_config(), make_lw_shared(), replica::column_family::no_commitlog(), *cm, env.manager(), cl_stats, tracker, nullptr); auto start = perf_sstable_test_env::now(); diff --git a/tombstone_gc.cc b/tombstone_gc.cc index 41e53492ed..bb873a9f31 100644 --- a/tombstone_gc.cc +++ b/tombstone_gc.cc @@ -167,9 +167,8 @@ static bool needs_repair_before_gc(const replica::database& db, sstring ks_name) // need to run repair even if tombstone_gc mode = repair. auto& ks = db.find_keyspace(ks_name); auto& rs = ks.get_replication_strategy(); - auto erm = ks.get_effective_replication_map(); bool needs_repair = rs.get_type() != locator::replication_strategy_type::local - && erm->get_replication_factor() != 1; + && rs.get_replication_factor(db.get_token_metadata()) != 1; return needs_repair; }