From dc77d128e935a183a7c297d539a149d35ee82fc4 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 2 Dec 2020 20:14:15 +0200 Subject: [PATCH] Revert "Merge "raft: fix replication if existing log on leader" from Gleb" This reverts commit 0aa1f7c70a49be47761ab8467ad1d35ca80faa87, reversing changes made to 72c59e8000397873fcd2907a0e533e36f13c1159. The diff is strange, including unrelated commits. There is no understanding of the cause, so to be safe, revert and try again. --- configure.py | 1 - database.hh | 44 ++ db/view/view.cc | 3 +- db/view/view_builder.hh | 1 + db/view/view_update_generator.cc | 2 +- flat_mutation_reader.hh | 17 +- mutation_reader.hh | 15 + position_in_partition.hh | 6 - raft/fsm.cc | 5 - repair/repair.cc | 12 +- sstables/compaction.cc | 9 +- sstables/compaction_strategy.cc | 390 +++++++++++++++- sstables/sstable_set.cc | 659 ---------------------------- sstables/sstable_set.hh | 63 +-- sstables/sstable_set_impl.hh | 98 ----- sstables/sstables.hh | 6 - table.cc | 265 ++++++++++- test/boost/mutation_reader_test.cc | 3 +- test/boost/raft_fsm_test.cc | 6 +- test/boost/sstable_datafile_test.cc | 43 +- test/raft/replication_test.cc | 4 +- 21 files changed, 733 insertions(+), 919 deletions(-) delete mode 100644 sstables/sstable_set.cc delete mode 100644 sstables/sstable_set_impl.hh diff --git a/configure.py b/configure.py index e903117415..201599d992 100755 --- a/configure.py +++ b/configure.py @@ -575,7 +575,6 @@ scylla_core = (['database.cc', 'sstables/mp_row_consumer.cc', 'sstables/sstables.cc', 'sstables/sstables_manager.cc', - 'sstables/sstable_set.cc', 'sstables/mx/writer.cc', 'sstables/kl/writer.cc', 'sstables/sstable_version.cc', diff --git a/database.hh b/database.hh index d77ec86923..88084fee72 100644 --- a/database.hh +++ b/database.hh @@ -1001,6 +1001,50 @@ public: friend class distributed_loader; }; +using sstable_reader_factory_type = std::function; + +// Filters out mutation that doesn't belong to current shard. +flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); + +/// Read a range from the passed-in sstables. +/// +/// The reader is unrestricted, but will account its resource usage on the +/// semaphore belonging to the passed-in permit. +flat_mutation_reader make_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); + +/// Read a range from the passed-in sstables. +/// +/// The reader is restricted, that is it will wait for admission on the semaphore +/// belonging to the passed-in permit, before starting to read. +flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); + class user_types_metadata; class keyspace_metadata final { diff --git a/db/view/view.cc b/db/view/view.cc index cf264a4f0c..fbdaff03fb 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1438,9 +1438,10 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas void view_builder::initialize_reader_at_current_token(build_step& step) { step.pslice = make_partition_slice(*step.base->schema()); step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); - step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader( + step.reader = make_local_shard_sstable_reader( step.base->schema(), _permit, + make_lw_shared(step.base->get_sstable_set()), step.prange, step.pslice, default_priority_class(), diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 1364571806..0e4d997934 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -28,6 +28,7 @@ #include "query-request.hh" #include "service/migration_listener.hh" #include "service/migration_manager.hh" +#include "sstables/sstable_set.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/serialized_action.hh" #include "utils/UUID.hh" diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 703d8ce454..a43c98097e 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -82,7 +82,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 769132d704..813324abee 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -310,7 +310,7 @@ private: friend class optimized_optional; void do_upgrade_schema(const schema_ptr&); public: - // Documented in mutation_reader::forwarding. + // Documented in mutation_reader::forwarding in mutation_reader.hh. class partition_range_forwarding_tag; using partition_range_forwarding = bool_class; @@ -519,21 +519,6 @@ flat_mutation_reader make_flat_mutation_reader(Args &&... args) { return flat_mutation_reader(std::make_unique(std::forward(args)...)); } -namespace mutation_reader { - // mutation_reader::forwarding determines whether fast_forward_to() may - // be used on the mutation reader to change the partition range being - // read. Enabling forwarding also changes read policy: forwarding::no - // means we will stop reading from disk at the end of the given range, - // but with forwarding::yes we may read ahead, anticipating the user to - // make a small skip with fast_forward_to() and continuing to read. - // - // Note that mutation_reader::forwarding is similarly name but different - // from streamed_mutation::forwarding - the former is about skipping to - // a different partition range, while the latter is about skipping - // inside a large partition. - using forwarding = flat_mutation_reader::partition_range_forwarding; -} - // Consumes mutation fragments until StopCondition is true. // The consumer will stop iff StopCondition returns true, in particular // reaching the end of stream alone won't stop the reader. diff --git a/mutation_reader.hh b/mutation_reader.hh index 3a3dd50b25..9f7bcb82f7 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -31,6 +31,21 @@ #include "flat_mutation_reader.hh" #include "reader_concurrency_semaphore.hh" +namespace mutation_reader { + // mutation_reader::forwarding determines whether fast_forward_to() may + // be used on the mutation reader to change the partition range being + // read. Enabling forwarding also changes read policy: forwarding::no + // means we will stop reading from disk at the end of the given range, + // but with forwarding::yes we may read ahead, anticipating the user to + // make a small skip with fast_forward_to() and continuing to read. + // + // Note that mutation_reader::forwarding is similarly name but different + // from streamed_mutation::forwarding - the former is about skipping to + // a different partition range, while the latter is about skipping + // inside a large partition. + using forwarding = flat_mutation_reader::partition_range_forwarding; +} + class reader_selector { protected: schema_ptr _s; diff --git a/position_in_partition.hh b/position_in_partition.hh index 6deeaa2c6e..7dbbd987cc 100644 --- a/position_in_partition.hh +++ b/position_in_partition.hh @@ -593,7 +593,6 @@ public: position_in_partition&& end() && { return std::move(_end); } bool contains(const schema& s, position_in_partition_view pos) const; bool overlaps(const schema& s, position_in_partition_view start, position_in_partition_view end) const; - bool is_all_clustered_rows(const schema&) const; friend std::ostream& operator<<(std::ostream&, const position_range&); }; @@ -611,8 +610,3 @@ bool position_range::overlaps(const schema& s, position_in_partition_view start, position_in_partition::less_compare less(s); return !less(end, _start) && less(start, _end); } - -inline -bool position_range::is_all_clustered_rows(const schema& s) const { - return _start.is_before_all_clustered_rows(s) && _end.is_after_all_clustered_rows(s); -} diff --git a/raft/fsm.cc b/raft/fsm.cc index 7a4680c9a7..cd291a2bf7 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -98,11 +98,6 @@ void fsm::become_leader() { _log_limiter_semaphore->sem.consume(_log.non_snapshoted_length()); _tracker->set_configuration(_current_config.servers, _log.next_idx()); _last_election_time = _clock.now(); - // a new leader needs to commit at lease one entry to make sure that - // all existing entries in its log are commited as well. Also it should - // send append entries rpc as soon as possible to establish its leqdership - // (3.4). Do both of those by commiting a dummy entry. - add_entry(log_entry::dummy()); replicate(); } diff --git a/repair/repair.cc b/repair/repair.cc index 2633e1201f..262b31d2ad 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -60,17 +60,17 @@ public: auto ops_label_type = sm::label("ops"); _metrics.add_group("node_ops", { sm::make_gauge("finished_percentage", [this] { return bootstrap_finished_percentage(); }, - sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}), + sm::description("Number of finished percentage for bootstrap operation on this shard."), {ops_label_type("bootstrap")}), sm::make_gauge("finished_percentage", [this] { return replace_finished_percentage(); }, - sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}), + sm::description("Number of finished percentage for replace operation on this shard."), {ops_label_type("replace")}), sm::make_gauge("finished_percentage", [this] { return rebuild_finished_percentage(); }, - sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}), + sm::description("Number of finished percentage for rebuild operation on this shard."), {ops_label_type("rebuild")}), sm::make_gauge("finished_percentage", [this] { return decommission_finished_percentage(); }, - sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}), + sm::description("Number of finished percentage for decommission operation on this shard."), {ops_label_type("decommission")}), sm::make_gauge("finished_percentage", [this] { return removenode_finished_percentage(); }, - sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}), + sm::description("Number of finished percentage for removenode operation on this shard."), {ops_label_type("removenode")}), sm::make_gauge("finished_percentage", [this] { return repair_finished_percentage(); }, - sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}), + sm::description("Number of finished percentage for repair operation on this shard."), {ops_label_type("repair")}), }); } uint64_t bootstrap_total_ranges{0}; diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 791c18597e..b5094bb6ed 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -822,8 +822,9 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return _compacting->make_local_shard_sstable_reader(_schema, + return ::make_local_shard_sstable_reader(_schema, _permit, + _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -868,8 +869,9 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return _compacting->make_local_shard_sstable_reader(_schema, + return ::make_local_shard_sstable_reader(_schema, _permit, + _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -1342,8 +1344,9 @@ public: // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { - return _compacting->make_range_sstable_reader(_schema, + return ::make_range_sstable_reader(_schema, _permit, + _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 63a51b7395..599af7e5ae 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -47,8 +47,10 @@ #include "compaction_strategy_impl.hh" #include "schema.hh" #include "sstable_set.hh" +#include "compatible_ring_position.hh" #include #include +#include #include #include "size_tiered_compaction_strategy.hh" #include "date_tiered_compaction_strategy.hh" @@ -62,6 +64,384 @@ logging::logger leveled_manifest::logger("LeveledManifest"); namespace sstables { +extern logging::logger clogger; + +void sstable_run::insert(shared_sstable sst) { + _all.insert(std::move(sst)); +} + +void sstable_run::erase(shared_sstable sst) { + _all.erase(sst); +} + +uint64_t sstable_run::data_size() const { + return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0)); +} + +std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { + os << "Run = {\n"; + if (run.all().empty()) { + os << " Identifier: not found\n"; + } else { + os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier()); + } + + auto frags = boost::copy_range>(run.all()); + boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) { + return x->get_first_decorated_key().token() < y->get_first_decorated_key().token(); + }); + os << " Fragments = {\n"; + for (auto& frag : frags) { + os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token()); + } + os << " }\n}\n"; + return os; +} + +class incremental_selector_impl { +public: + virtual ~incremental_selector_impl() {} + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) = 0; +}; + +class sstable_set_impl { +public: + virtual ~sstable_set_impl() {} + virtual std::unique_ptr clone() const = 0; + virtual std::vector select(const dht::partition_range& range) const = 0; + virtual void insert(shared_sstable sst) = 0; + virtual void erase(shared_sstable sst) = 0; + virtual std::unique_ptr make_incremental_selector() const = 0; +}; + +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) + : _impl(std::move(impl)) + , _schema(std::move(s)) + , _all(std::move(all)) { +} + +sstable_set::sstable_set(const sstable_set& x) + : _impl(x._impl->clone()) + , _schema(x._schema) + , _all(make_lw_shared(*x._all)) + , _all_runs(x._all_runs) { +} + +sstable_set::sstable_set(sstable_set&&) noexcept = default; + +sstable_set& +sstable_set::operator=(const sstable_set& x) { + if (this != &x) { + auto tmp = sstable_set(x); + *this = std::move(tmp); + } + return *this; +} + +sstable_set& +sstable_set::operator=(sstable_set&&) noexcept = default; + +std::vector +sstable_set::select(const dht::partition_range& range) const { + return _impl->select(range); +} + +std::vector +sstable_set::select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return _all_runs.at(run_id); + })); +} + +void +sstable_set::insert(shared_sstable sst) { + _impl->insert(sst); + try { + _all->insert(sst); + try { + _all_runs[sst->run_identifier()].insert(sst); + } catch (...) { + _all->erase(sst); + throw; + } + } catch (...) { + _impl->erase(sst); + throw; + } +} + +void +sstable_set::erase(shared_sstable sst) { + _impl->erase(sst); + _all->erase(sst); + _all_runs[sst->run_identifier()].erase(sst); +} + +sstable_set::~sstable_set() = default; + +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) + : _impl(std::move(impl)) + , _cmp(s) { +} + +sstable_set::incremental_selector::~incremental_selector() = default; + +sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; + +sstable_set::incremental_selector::selection +sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { + if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { + std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); + _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); + } + return {_current_sstables, _current_next_position}; +} + +sstable_set::incremental_selector +sstable_set::make_incremental_selector() const { + return incremental_selector(_impl->make_incremental_selector(), *_schema); +} + +// default sstable_set, not specialized for anything +class bag_sstable_set : public sstable_set_impl { + // erasing is slow, but select() is fast + std::vector _sstables; +public: + virtual std::unique_ptr clone() const override { + return std::make_unique(*this); + } + virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override { + return _sstables; + } + virtual void insert(shared_sstable sst) override { + _sstables.push_back(std::move(sst)); + } + virtual void erase(shared_sstable sst) override { + auto it = boost::range::find(_sstables, sst); + if (it != _sstables.end()){ + _sstables.erase(it); + } + } + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; +}; + +class bag_sstable_set::incremental_selector : public incremental_selector_impl { + const std::vector& _sstables; +public: + incremental_selector(const std::vector& sstables) + : _sstables(sstables) { + } + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) override { + return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max()); + } +}; + +std::unique_ptr bag_sstable_set::make_incremental_selector() const { + return std::make_unique(_sstables); +} + +// specialized when sstables are partitioned in the token range space +// e.g. leveled compaction strategy +class partitioned_sstable_set : public sstable_set_impl { + using value_set = std::unordered_set; + using interval_map_type = boost::icl::interval_map; + using interval_type = interval_map_type::interval_type; + using map_iterator = interval_map_type::const_iterator; +private: + schema_ptr _schema; + std::vector _unleveled_sstables; + interval_map_type _leveled_sstables; + // Change counter on interval map for leveled sstables which is used by + // incremental selector to determine whether or not to invalidate iterators. + uint64_t _leveled_sstables_change_cnt = 0; + bool _use_level_metadata = false; +private: + static interval_type make_interval(const schema& s, const dht::partition_range& range) { + return interval_type::closed( + compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())), + compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value()))); + } + interval_type make_interval(const dht::partition_range& range) const { + return make_interval(*_schema, range); + } + static interval_type make_interval(const schema_ptr& s, const sstable& sst) { + return interval_type::closed( + compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())), + compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key()))); + } + interval_type make_interval(const sstable& sst) { + return make_interval(_schema, sst); + } + interval_type singular(const dht::ring_position& rp) const { + // We should use the view here, since this is used for queries. + auto rpv = dht::ring_position_view(rp); + auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv)); + return interval_type::closed(crp, crp); + } + std::pair query(const dht::partition_range& range) const { + if (range.start() && range.end()) { + return _leveled_sstables.equal_range(make_interval(range)); + } + else if (range.start() && !range.end()) { + auto start = singular(range.start()->value()); + return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; + } else if (!range.start() && range.end()) { + auto end = singular(range.end()->value()); + return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; + } else { + return { _leveled_sstables.begin(), _leveled_sstables.end() }; + } + } + // SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind. + bool store_as_unleveled(const shared_sstable& sst) const { + return _use_level_metadata && sst->get_sstable_level() == 0; + } +public: + static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp) { + // Ring position views, representing bounds of sstable intervals are + // guaranteed to have key() != nullptr; + const auto& pos = crp.position(); + return dht::ring_position(pos.token(), *pos.key()); + } + static dht::partition_range to_partition_range(const interval_type& i) { + return dht::partition_range::make( + {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())}, + {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())}); + } + static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i) { + auto lower_bound = [&] { + if (pos.key()) { + return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()), + pos.is_after_key() == dht::ring_position_view::after_key::no); + } else { + return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true); + } + }(); + auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds())); + return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound)); + } + explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true) + : _schema(std::move(schema)) + , _use_level_metadata(use_level_metadata) { + } + virtual std::unique_ptr clone() const override { + return std::make_unique(*this); + } + virtual std::vector select(const dht::partition_range& range) const override { + auto ipair = query(range); + auto b = std::move(ipair.first); + auto e = std::move(ipair.second); + value_set result; + while (b != e) { + boost::copy(b++->second, std::inserter(result, result.end())); + } + auto r = _unleveled_sstables; + r.insert(r.end(), result.begin(), result.end()); + return r; + } + virtual void insert(shared_sstable sst) override { + if (store_as_unleveled(sst)) { + _unleveled_sstables.push_back(std::move(sst)); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.add({make_interval(*sst), value_set({sst})}); + } + } + virtual void erase(shared_sstable sst) override { + if (store_as_unleveled(sst)) { + _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); + } + } + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; +}; + +class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { + schema_ptr _schema; + const std::vector& _unleveled_sstables; + const interval_map_type& _leveled_sstables; + const uint64_t& _leveled_sstables_change_cnt; + uint64_t _last_known_leveled_sstables_change_cnt; + map_iterator _it; + // Only to back the dht::ring_position_view returned from select(). + dht::ring_position _next_position; +private: + dht::ring_position_view next_position(map_iterator it) { + if (it == _leveled_sstables.end()) { + _next_position = dht::ring_position::max(); + return dht::ring_position_view::max(); + } else { + _next_position = partitioned_sstable_set::to_ring_position(it->first.lower()); + return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds()))); + } + } + static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) { + if (boost::icl::is_left_closed(interval.bounds())) { + return crp < interval.lower(); + } else { + return crp <= interval.lower(); + } + } + void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) { + if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { + _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp)); + _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; + } + } +public: + incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, + const uint64_t& leveled_sstables_change_cnt) + : _schema(std::move(schema)) + , _unleveled_sstables(unleveled_sstables) + , _leveled_sstables(leveled_sstables) + , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _it(leveled_sstables.begin()) + , _next_position(dht::ring_position::min()) { + } + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { + auto crp = compatible_ring_position_or_view(*_schema, pos); + auto ssts = _unleveled_sstables; + using namespace dht; + + maybe_invalidate_iterator(crp); + + while (_it != _leveled_sstables.end()) { + if (boost::icl::contains(_it->first, crp)) { + ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); + return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); + } + // We don't want to skip current interval if pos lies before it. + if (is_before_interval(crp, _it->first)) { + return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it)); + } + _it++; + } + return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max()); + } +}; + +std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { + return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); +} + +std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { + return std::make_unique(); +} + +std::unique_ptr leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const { + return std::make_unique(std::move(schema)); +} + +sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { + return sstables::sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); +} + compaction_descriptor compaction_strategy_impl::get_major_compaction_job(column_family& cf, std::vector candidates) { return compaction_descriptor(std::move(candidates), cf.get_sstable_set(), service::get_local_compaction_priority()); } @@ -170,8 +550,6 @@ void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { namespace sstables { -extern logging::logger clogger; - // The backlog for TWCS is just the sum of the individual backlogs in each time window. // We'll keep various SizeTiered backlog tracker objects-- one per window for the static SSTables. // We then scan the current compacting and in-progress writes and matching them to existing time @@ -653,6 +1031,14 @@ bool compaction_strategy::use_clustering_key_filter() const { return _compaction_strategy_impl->use_clustering_key_filter(); } +sstable_set +compaction_strategy::make_sstable_set(schema_ptr schema) const { + return sstable_set( + _compaction_strategy_impl->make_sstable_set(schema), + schema, + make_lw_shared()); +} + compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() { return _compaction_strategy_impl->get_backlog_tracker(); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc deleted file mode 100644 index 4e9b9f0d17..0000000000 --- a/sstables/sstable_set.cc +++ /dev/null @@ -1,659 +0,0 @@ -/* - * Copyright (C) 2020 ScyllaDB - */ - -/* - * This file is part of Scylla. - * - * Scylla is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Scylla is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Scylla. If not, see . - */ - -#include - -#include "compatible_ring_position.hh" -#include "compaction_strategy_impl.hh" -#include "leveled_compaction_strategy.hh" - -#include "sstable_set_impl.hh" - -namespace sstables { - -void sstable_run::insert(shared_sstable sst) { - _all.insert(std::move(sst)); -} - -void sstable_run::erase(shared_sstable sst) { - _all.erase(sst); -} - -uint64_t sstable_run::data_size() const { - return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0)); -} - -std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { - os << "Run = {\n"; - if (run.all().empty()) { - os << " Identifier: not found\n"; - } else { - os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier()); - } - - auto frags = boost::copy_range>(run.all()); - boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) { - return x->get_first_decorated_key().token() < y->get_first_decorated_key().token(); - }); - os << " Fragments = {\n"; - for (auto& frag : frags) { - os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token()); - } - os << " }\n}\n"; - return os; -} - -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) - : _impl(std::move(impl)) - , _schema(std::move(s)) - , _all(std::move(all)) { -} - -sstable_set::sstable_set(const sstable_set& x) - : _impl(x._impl->clone()) - , _schema(x._schema) - , _all(make_lw_shared(*x._all)) - , _all_runs(x._all_runs) { -} - -sstable_set::sstable_set(sstable_set&&) noexcept = default; - -sstable_set& -sstable_set::operator=(const sstable_set& x) { - if (this != &x) { - auto tmp = sstable_set(x); - *this = std::move(tmp); - } - return *this; -} - -sstable_set& -sstable_set::operator=(sstable_set&&) noexcept = default; - -std::vector -sstable_set::select(const dht::partition_range& range) const { - return _impl->select(range); -} - -std::vector -sstable_set::select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return _all_runs.at(run_id); - })); -} - -void -sstable_set::insert(shared_sstable sst) { - _impl->insert(sst); - try { - _all->insert(sst); - try { - _all_runs[sst->run_identifier()].insert(sst); - } catch (...) { - _all->erase(sst); - throw; - } - } catch (...) { - _impl->erase(sst); - throw; - } -} - -void -sstable_set::erase(shared_sstable sst) { - _impl->erase(sst); - _all->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); -} - -sstable_set::~sstable_set() = default; - -sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) - : _impl(std::move(impl)) - , _cmp(s) { -} - -sstable_set::incremental_selector::~incremental_selector() = default; - -sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; - -sstable_set::incremental_selector::selection -sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { - if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { - std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); - _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); - } - return {_current_sstables, _current_next_position}; -} - -sstable_set::incremental_selector -sstable_set::make_incremental_selector() const { - return incremental_selector(_impl->make_incremental_selector(), *_schema); -} - -std::unique_ptr bag_sstable_set::clone() const { - return std::make_unique(*this); -} - -std::vector bag_sstable_set::select(const dht::partition_range& range) const { - return _sstables; -} - -void bag_sstable_set::insert(shared_sstable sst) { - _sstables.push_back(std::move(sst)); -} - -void bag_sstable_set::erase(shared_sstable sst) { - auto it = boost::range::find(_sstables, sst); - if (it != _sstables.end()){ - _sstables.erase(it); - } -} - -class bag_sstable_set::incremental_selector : public incremental_selector_impl { - const std::vector& _sstables; -public: - incremental_selector(const std::vector& sstables) - : _sstables(sstables) { - } - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) override { - return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max()); - } -}; - -std::unique_ptr bag_sstable_set::make_incremental_selector() const { - return std::make_unique(_sstables); -} - -partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema& s, const dht::partition_range& range) { - return interval_type::closed( - compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())), - compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value()))); -} - -partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const dht::partition_range& range) const { - return make_interval(*_schema, range); -} - -partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema_ptr& s, const sstable& sst) { - return interval_type::closed( - compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())), - compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key()))); -} - -partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const sstable& sst) { - return make_interval(_schema, sst); -} - -partitioned_sstable_set::interval_type partitioned_sstable_set::singular(const dht::ring_position& rp) const { - // We should use the view here, since this is used for queries. - auto rpv = dht::ring_position_view(rp); - auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv)); - return interval_type::closed(crp, crp); -} - -std::pair -partitioned_sstable_set::query(const dht::partition_range& range) const { - if (range.start() && range.end()) { - return _leveled_sstables.equal_range(make_interval(range)); - } - else if (range.start() && !range.end()) { - auto start = singular(range.start()->value()); - return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; - } else if (!range.start() && range.end()) { - auto end = singular(range.end()->value()); - return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; - } else { - return { _leveled_sstables.begin(), _leveled_sstables.end() }; - } -} - -bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const { - return _use_level_metadata && sst->get_sstable_level() == 0; -} - -dht::ring_position partitioned_sstable_set::to_ring_position(const compatible_ring_position_or_view& crp) { - // Ring position views, representing bounds of sstable intervals are - // guaranteed to have key() != nullptr; - const auto& pos = crp.position(); - return dht::ring_position(pos.token(), *pos.key()); -} - -dht::partition_range partitioned_sstable_set::to_partition_range(const interval_type& i) { - return dht::partition_range::make( - {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())}, - {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())}); -} - -dht::partition_range partitioned_sstable_set::to_partition_range(const dht::ring_position_view& pos, const interval_type& i) { - auto lower_bound = [&] { - if (pos.key()) { - return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()), - pos.is_after_key() == dht::ring_position_view::after_key::no); - } else { - return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true); - } - }(); - auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds())); - return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound)); -} - -partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) - : _schema(std::move(schema)) - , _use_level_metadata(use_level_metadata) { -} - -std::unique_ptr partitioned_sstable_set::clone() const { - return std::make_unique(*this); -} - -std::vector partitioned_sstable_set::select(const dht::partition_range& range) const { - auto ipair = query(range); - auto b = std::move(ipair.first); - auto e = std::move(ipair.second); - value_set result; - while (b != e) { - boost::copy(b++->second, std::inserter(result, result.end())); - } - auto r = _unleveled_sstables; - r.insert(r.end(), result.begin(), result.end()); - return r; -} - -void partitioned_sstable_set::insert(shared_sstable sst) { - if (store_as_unleveled(sst)) { - _unleveled_sstables.push_back(std::move(sst)); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.add({make_interval(*sst), value_set({sst})}); - } -} - -void partitioned_sstable_set::erase(shared_sstable sst) { - if (store_as_unleveled(sst)) { - _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); - } -} - -class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { - schema_ptr _schema; - const std::vector& _unleveled_sstables; - const interval_map_type& _leveled_sstables; - const uint64_t& _leveled_sstables_change_cnt; - uint64_t _last_known_leveled_sstables_change_cnt; - map_iterator _it; - // Only to back the dht::ring_position_view returned from select(). - dht::ring_position _next_position; -private: - dht::ring_position_view next_position(map_iterator it) { - if (it == _leveled_sstables.end()) { - _next_position = dht::ring_position::max(); - return dht::ring_position_view::max(); - } else { - _next_position = partitioned_sstable_set::to_ring_position(it->first.lower()); - return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds()))); - } - } - static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) { - if (boost::icl::is_left_closed(interval.bounds())) { - return crp < interval.lower(); - } else { - return crp <= interval.lower(); - } - } - void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) { - if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { - _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp)); - _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; - } - } -public: - incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, - const uint64_t& leveled_sstables_change_cnt) - : _schema(std::move(schema)) - , _unleveled_sstables(unleveled_sstables) - , _leveled_sstables(leveled_sstables) - , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) - , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) - , _it(leveled_sstables.begin()) - , _next_position(dht::ring_position::min()) { - } - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { - auto crp = compatible_ring_position_or_view(*_schema, pos); - auto ssts = _unleveled_sstables; - using namespace dht; - - maybe_invalidate_iterator(crp); - - while (_it != _leveled_sstables.end()) { - if (boost::icl::contains(_it->first, crp)) { - ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); - return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); - } - // We don't want to skip current interval if pos lies before it. - if (is_before_interval(crp, _it->first)) { - return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it)); - } - _it++; - } - return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max()); - } -}; - -std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { - return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); -} - -std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { - return std::make_unique(); -} - -std::unique_ptr leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const { - return std::make_unique(std::move(schema)); -} - -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); -} - -sstable_set -compaction_strategy::make_sstable_set(schema_ptr schema) const { - return sstable_set( - _compaction_strategy_impl->make_sstable_set(schema), - schema, - make_lw_shared()); -} - -using sstable_reader_factory_type = std::function; - -static logging::logger irclogger("incremental_reader_selector"); - -// Incremental selector implementation for combined_mutation_reader that -// selects readers on-demand as the read progresses through the token -// range. -class incremental_reader_selector : public reader_selector { - const dht::partition_range* _pr; - lw_shared_ptr _sstables; - tracing::trace_state_ptr _trace_state; - std::optional _selector; - std::unordered_set _read_sstable_gens; - sstable_reader_factory_type _fn; - - flat_mutation_reader create_reader(shared_sstable sst) { - tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); - return _fn(sst, *_pr); - } - -public: - explicit incremental_reader_selector(schema_ptr s, - lw_shared_ptr sstables, - const dht::partition_range& pr, - tracing::trace_state_ptr trace_state, - sstable_reader_factory_type fn) - : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) - , _pr(&pr) - , _sstables(std::move(sstables)) - , _trace_state(std::move(trace_state)) - , _selector(_sstables->make_incremental_selector()) - , _fn(std::move(fn)) { - - irclogger.trace("{}: created for range: {} with {} sstables", - fmt::ptr(this), - *_pr, - _sstables->all()->size()); - } - - incremental_reader_selector(const incremental_reader_selector&) = delete; - incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; - - incremental_reader_selector(incremental_reader_selector&&) = delete; - incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; - - virtual std::vector create_new_readers(const std::optional& pos) override { - irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); - - auto readers = std::vector(); - - do { - auto selection = _selector->select(_selector_position); - _selector_position = selection.next_position; - - irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), - _selector_position); - - readers = boost::copy_range>(selection.sstables - | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) - | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); - } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); - - irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size()); - - // prevents sstable_set::incremental_selector::_current_sstables from holding reference to - // sstables when done selecting. - if (_selector_position.is_max()) { - _selector.reset(); - } - - return readers; - } - - virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { - _pr = ≺ - - auto pos = dht::ring_position_view::for_range_start(*_pr); - if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { - return create_new_readers(pos); - } - - return {}; - } -}; - -// The returned function uses the bloom filter to check whether the given sstable -// may have a partition given by the ring position `pos`. -// -// Returning `false` means the sstable doesn't have such a partition. -// Returning `true` means it may, i.e. we don't know whether or not it does. -// -// Assumes the given `pos` and `schema` are alive during the function's lifetime. -static std::predicate auto -make_pk_filter(const dht::ring_position& pos, const schema& schema) { - return [&pos, key = key::from_partition_key(schema, *pos.key()), cmp = dht::ring_position_comparator(schema)] (const sstable& sst) { - return cmp(pos, sst.get_first_decorated_key()) >= 0 && - cmp(pos, sst.get_last_decorated_key()) <= 0 && - sst.filter_has_key(key); - }; -} - -// Filter out sstables for reader using bloom filter -static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { - auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; - sstables.erase(boost::remove_if(sstables, filter), sstables.end()); - return sstables; -} - -// Filter out sstables for reader using sstable metadata that keeps track -// of a range for each clustering component. -static std::vector -filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const query::partition_slice& slice) { - // no clustering filtering is applied if schema defines no clustering key or - // compaction strategy thinks it will not benefit from such an optimization, - // or the partition_slice includes static columns. - if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { - return sstables; - } - - ::cf_stats* stats = cf.cf_stats(); - stats->clustering_filter_count++; - stats->sstables_checked_by_clustering_filter += sstables.size(); - - auto ck_filtering_all_ranges = slice.get_all_ranges(); - // fast path to include all sstables if only one full range was specified. - // For example, this happens if query only specifies a partition key. - if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { - stats->clustering_filter_fast_path_count++; - stats->surviving_sstables_after_clustering_filter += sstables.size(); - return sstables; - } - - auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) { - return sst->may_contain_rows(ranges); - }); - sstables.erase(skipped, sstables.end()); - stats->surviving_sstables_after_clustering_filter += sstables.size(); - - return sstables; -} - -flat_mutation_reader -sstable_set::create_single_key_sstable_reader( - column_family* cf, - schema_ptr schema, - reader_permit permit, - utils::estimated_histogram& sstable_histogram, - const dht::ring_position& pos, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) const -{ - auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *schema, pos); - auto num_sstables = selected_sstables.size(); - if (!num_sstables) { - return make_empty_flat_reader(schema, permit); - } - auto readers = boost::copy_range>( - filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) - | boost::adaptors::transformed([&] (const shared_sstable& sstable) { - tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); })); - return sstable->read_row_flat(schema, permit, pos, slice, pc, trace_state, fwd); - }) - ); - - // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition - // we want to emit partition_start/end if no rows were found, - // to prevent https://github.com/scylladb/scylla/issues/3552. - // - // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit - // the partition_start/end pair and append it to the list of readers passed - // to make_combined_reader to ensure partition_start/end are emitted even if - // all sstables actually containing the partition were filtered. - auto num_readers = readers.size(); - if (num_readers != num_sstables) { - readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pos.key())}, slice, fwd)); - } - sstable_histogram.add(num_readers); - return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); -} - -flat_mutation_reader -sstable_set::make_range_sstable_reader( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) const -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (shared_sstable& sst, const dht::partition_range& pr) mutable { - return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - shared_from_this(), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - -flat_mutation_reader -sstable_set::make_local_shard_sstable_reader( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) const -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (shared_sstable& sst, const dht::partition_range& pr) mutable { - flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, - trace_state, fwd, fwd_mr, monitor_generator(sst)); - if (sst->is_shared()) { - auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { - return dht::shard_of(s, dk.token()) == this_shard_id(); - }; - reader = make_filtering_reader(std::move(reader), std::move(filter)); - } - return reader; - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - shared_from_this(), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - -flat_mutation_reader make_restricted_range_sstable_reader( - lw_shared_ptr sstables, - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - read_monitor_generator& monitor_generator) -{ - auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) { - return sstables->make_range_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, - std::move(trace_state), fwd, fwd_mr, monitor_generator); - }); - return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); -} - -} // namespace sstables diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index f0f5a86c73..28a953e7bc 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -21,17 +21,11 @@ #pragma once -#include "flat_mutation_reader.hh" -#include "sstables/progress_monitor.hh" #include "shared_sstable.hh" #include "dht/i_partitioner.hh" #include #include -namespace utils { -class estimated_histogram; -} - namespace sstables { class sstable_set_impl; @@ -49,7 +43,7 @@ public: const sstable_list& all() const { return _all; } }; -class sstable_set : public enable_lw_shared_from_this { +class sstable_set { std::unique_ptr _impl; schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list @@ -105,63 +99,8 @@ public: selection select(const dht::ring_position_view& pos) const; }; incremental_selector make_incremental_selector() const; - - flat_mutation_reader create_single_key_sstable_reader( - column_family*, - schema_ptr, - reader_permit, - utils::estimated_histogram&, - const dht::ring_position&, // must contain a key - const query::partition_slice&, - const io_priority_class&, - tracing::trace_state_ptr, - streamed_mutation::forwarding, - mutation_reader::forwarding) const; - - /// Read a range from the sstable set. - /// - /// The reader is unrestricted, but will account its resource usage on the - /// semaphore belonging to the passed-in permit. - flat_mutation_reader make_range_sstable_reader( - schema_ptr, - reader_permit, - const dht::partition_range&, - const query::partition_slice&, - const io_priority_class&, - tracing::trace_state_ptr, - streamed_mutation::forwarding, - mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()) const; - - // Filters out mutations that don't belong to the current shard. - flat_mutation_reader make_local_shard_sstable_reader( - schema_ptr, - reader_permit, - const dht::partition_range&, - const query::partition_slice&, - const io_priority_class&, - tracing::trace_state_ptr, - streamed_mutation::forwarding, - mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()) const; }; -/// Read a range from the passed-in sstables. -/// -/// The reader is restricted, that is it will wait for admission on the semaphore -/// belonging to the passed-in permit, before starting to read. -flat_mutation_reader make_restricted_range_sstable_reader( - lw_shared_ptr sstables, - schema_ptr, - reader_permit, - const dht::partition_range&, - const query::partition_slice&, - const io_priority_class&, - tracing::trace_state_ptr, - streamed_mutation::forwarding, - mutation_reader::forwarding, - read_monitor_generator& rmg = default_read_monitor_generator()); - sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh deleted file mode 100644 index e0113a5a06..0000000000 --- a/sstables/sstable_set_impl.hh +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (C) 2020 ScyllaDB - */ - -/* - * This file is part of Scylla. - * - * Scylla is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Scylla is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Scylla. If not, see . - */ - -#pragma once - -#include - -#include "compatible_ring_position.hh" -#include "sstable_set.hh" - -namespace sstables { - -class incremental_selector_impl { -public: - virtual ~incremental_selector_impl() {} - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) = 0; -}; - -class sstable_set_impl { -public: - virtual ~sstable_set_impl() {} - virtual std::unique_ptr clone() const = 0; - virtual std::vector select(const dht::partition_range& range) const = 0; - virtual void insert(shared_sstable sst) = 0; - virtual void erase(shared_sstable sst) = 0; - virtual std::unique_ptr make_incremental_selector() const = 0; -}; - -// default sstable_set, not specialized for anything -class bag_sstable_set : public sstable_set_impl { - // erasing is slow, but select() is fast - std::vector _sstables; -public: - virtual std::unique_ptr clone() const override; - virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; - virtual void insert(shared_sstable sst) override; - virtual void erase(shared_sstable sst) override; - virtual std::unique_ptr make_incremental_selector() const override; - class incremental_selector; -}; - -// specialized when sstables are partitioned in the token range space -// e.g. leveled compaction strategy -class partitioned_sstable_set : public sstable_set_impl { - using value_set = std::unordered_set; - using interval_map_type = boost::icl::interval_map; - using interval_type = interval_map_type::interval_type; - using map_iterator = interval_map_type::const_iterator; -private: - schema_ptr _schema; - std::vector _unleveled_sstables; - interval_map_type _leveled_sstables; - // Change counter on interval map for leveled sstables which is used by - // incremental selector to determine whether or not to invalidate iterators. - uint64_t _leveled_sstables_change_cnt = 0; - bool _use_level_metadata = false; -private: - static interval_type make_interval(const schema& s, const dht::partition_range& range); - interval_type make_interval(const dht::partition_range& range) const; - static interval_type make_interval(const schema_ptr& s, const sstable& sst); - interval_type make_interval(const sstable& sst); - interval_type singular(const dht::ring_position& rp) const; - std::pair query(const dht::partition_range& range) const; - // SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind. - bool store_as_unleveled(const shared_sstable& sst) const; -public: - static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp); - static dht::partition_range to_partition_range(const interval_type& i); - static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i); - explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); - - virtual std::unique_ptr clone() const override; - virtual std::vector select(const dht::partition_range& range) const override; - virtual void insert(shared_sstable sst) override; - virtual void erase(shared_sstable sst) override; - virtual std::unique_ptr make_incremental_selector() const override; - class incremental_selector; -}; - -} // namespace sstables diff --git a/sstables/sstables.hh b/sstables/sstables.hh index c4843c0457..c4fb4056b0 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -805,12 +805,6 @@ public: // Return true if this sstable possibly stores clustering row(s) specified by ranges. bool may_contain_rows(const query::clustering_row_ranges& ranges) const; - // false => there are no partition tombstones, true => we don't know - bool may_have_partition_tombstones() const { - return !has_correct_min_max_column_names() - || _position_range.is_all_clustered_rows(*_schema); - } - // Return the large_data_stats_entry identified by large_data_type // iff _large_data_stats is available and the requested entry is in // the map. Otherwise, return a disengaged optional. diff --git a/table.cc b/table.cc index 703879a93e..0be1ff8146 100644 --- a/table.cc +++ b/table.cc @@ -25,7 +25,10 @@ #include "service/priority_manager.hh" #include "db/schema_tables.hh" #include "cell_locking.hh" +#include "mutation_fragment.hh" +#include "mutation_partition.hh" #include "utils/logalloc.hh" +#include "sstables/progress_monitor.hh" #include "checked-file-impl.hh" #include "view_info.hh" #include "db/data_listeners.hh" @@ -49,6 +52,229 @@ static seastar::metrics::label keyspace_label("ks"); using namespace std::chrono_literals; +// Filter out sstables for reader using bloom filter +static std::vector +filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const dht::partition_range& pr, const sstables::key& key) { + const dht::ring_position& pr_key = pr.start()->value(); + auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) { + return cmp(pr_key, sst->get_first_decorated_key()) < 0 || + cmp(pr_key, sst->get_last_decorated_key()) > 0 || + !sst->filter_has_key(key); + }; + sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); + return sstables; +} + +// Filter out sstables for reader using sstable metadata that keeps track +// of a range for each clustering component. +static std::vector +filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const query::partition_slice& slice) { + // no clustering filtering is applied if schema defines no clustering key or + // compaction strategy thinks it will not benefit from such an optimization, + // or the partition_slice includes static columns. + if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { + return sstables; + } + + ::cf_stats* stats = cf.cf_stats(); + stats->clustering_filter_count++; + stats->sstables_checked_by_clustering_filter += sstables.size(); + + auto ck_filtering_all_ranges = slice.get_all_ranges(); + // fast path to include all sstables if only one full range was specified. + // For example, this happens if query only specifies a partition key. + if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { + stats->clustering_filter_fast_path_count++; + stats->surviving_sstables_after_clustering_filter += sstables.size(); + return sstables; + } + + auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) { + return sst->may_contain_rows(ranges); + }); + sstables.erase(skipped, sstables.end()); + stats->surviving_sstables_after_clustering_filter += sstables.size(); + + return sstables; +} + +// Incremental selector implementation for combined_mutation_reader that +// selects readers on-demand as the read progresses through the token +// range. +class incremental_reader_selector : public reader_selector { + const dht::partition_range* _pr; + lw_shared_ptr _sstables; + tracing::trace_state_ptr _trace_state; + std::optional _selector; + std::unordered_set _read_sstable_gens; + sstable_reader_factory_type _fn; + + flat_mutation_reader create_reader(sstables::shared_sstable sst) { + tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); + return _fn(sst, *_pr); + } + +public: + explicit incremental_reader_selector(schema_ptr s, + lw_shared_ptr sstables, + const dht::partition_range& pr, + tracing::trace_state_ptr trace_state, + sstable_reader_factory_type fn) + : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) + , _pr(&pr) + , _sstables(std::move(sstables)) + , _trace_state(std::move(trace_state)) + , _selector(_sstables->make_incremental_selector()) + , _fn(std::move(fn)) { + + tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables", + fmt::ptr(this), + *_pr, + _sstables->all()->size()); + } + + incremental_reader_selector(const incremental_reader_selector&) = delete; + incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; + + incremental_reader_selector(incremental_reader_selector&&) = delete; + incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; + + virtual std::vector create_new_readers(const std::optional& pos) override { + tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); + + auto readers = std::vector(); + + do { + auto selection = _selector->select(_selector_position); + _selector_position = selection.next_position; + + tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), + _selector_position); + + readers = boost::copy_range>(selection.sstables + | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) + | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); + } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); + + tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size()); + + // prevents sstable_set::incremental_selector::_current_sstables from holding reference to + // sstables when done selecting. + if (_selector_position.is_max()) { + _selector.reset(); + } + + return readers; + } + + virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + _pr = ≺ + + auto pos = dht::ring_position_view::for_range_start(*_pr); + if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { + return create_new_readers(pos); + } + + return {}; + } +}; + +static flat_mutation_reader +create_single_key_sstable_reader(column_family* cf, + schema_ptr schema, + reader_permit permit, + lw_shared_ptr sstables, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, // must be singular + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) +{ + auto& pk = pr.start()->value().key(); + auto key = sstables::key::from_partition_key(*schema, *pk); + auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key); + auto num_sstables = selected_sstables.size(); + if (!num_sstables) { + return make_empty_flat_reader(schema, permit); + } + auto readers = boost::copy_range>( + filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) + | boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) { + tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); + return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); + }) + ); + + // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition + // we want to emit partition_start/end if no rows were found, + // to prevent https://github.com/scylladb/scylla/issues/3552. + // + // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit + // the partition_start/end pair and append it to the list of readers passed + // to make_combined_reader to ensure partition_start/end are emitted even if + // all sstables actually containing the partition were filtered. + auto num_readers = readers.size(); + if (num_readers != num_sstables) { + readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); + } + sstable_histogram.add(num_readers); + return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); +} + +flat_mutation_reader make_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator) +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { + return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + std::move(sstables), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator) +{ + auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, + std::move(trace_state), fwd, fwd_mr, monitor_generator); + }); + return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + flat_mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, @@ -89,9 +315,8 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - assert(pr.is_singular() && pr.start()->value().has_key()); - return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), - _stats.estimated_sstable_per_read, pr.start()->value(), slice, pc, std::move(trace_state), fwd, fwd_mr); + return create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), std::move(sstables), + _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } else { return mutation_source([sstables=std::move(sstables)] ( @@ -103,7 +328,7 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, + return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } @@ -294,6 +519,38 @@ static bool belongs_to_other_shard(const std::vector& shards) { return shards.size() != size_t(belongs_to_current_shard(shards)); } +flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator) +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { + flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, + trace_state, fwd, fwd_mr, monitor_generator(sst)); + if (sst->is_shared()) { + auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { + return dht::shard_of(s, dk.token()) == this_shard_id(); + }; + reader = make_filtering_reader(std::move(reader), std::move(filter)); + } + return reader; + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + std::move(sstables), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f, io_error_handler_gen error_handler_gen) { return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 12b35f8bd7..0e650d8771 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -648,9 +648,10 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) { auto list_reader = make_combined_reader(s.schema(), tests::make_permit(), std::move(sstable_mutation_readers)); - auto incremental_reader = sstable_set->make_local_shard_sstable_reader( + auto incremental_reader = make_local_shard_sstable_reader( s.schema(), tests::make_permit(), + sstable_set, query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), diff --git a/test/boost/raft_fsm_test.cc b/test/boost/raft_fsm_test.cc index d9cf85a81a..f2b8ec1158 100644 --- a/test/boost/raft_fsm_test.cc +++ b/test/boost/raft_fsm_test.cc @@ -69,8 +69,7 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) { BOOST_CHECK(output.term); BOOST_CHECK(output.vote); BOOST_CHECK(output.messages.empty()); - // A new leader applies one dummy entry - BOOST_CHECK(output.log_entries.size() == 1 && std::holds_alternative(output.log_entries[0]->data)); + BOOST_CHECK(output.log_entries.empty()); BOOST_CHECK(output.committed.empty()); // The leader does not become candidate simply because // a timeout has elapsed, i.e. there are no spurious @@ -82,8 +81,7 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) { BOOST_CHECK(!output.vote); BOOST_CHECK(output.messages.empty()); BOOST_CHECK(output.log_entries.empty()); - // Dummy entry is now commited - BOOST_CHECK(output.committed.size() == 1 && std::holds_alternative(output.committed[0]->data)); + BOOST_CHECK(output.committed.empty()); } // Test that adding an entry to a single-node cluster diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 37c67f6a10..472291072c 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6083,8 +6083,9 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { for (auto&& sst : all) { compacting->insert(std::move(sst)); } - auto reader = compacting->make_range_sstable_reader(s, + auto reader = ::make_range_sstable_reader(s, tests::make_permit(), + compacting, query::full_partition_range, s->full_slice(), service::get_local_compaction_priority(), @@ -6684,43 +6685,3 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) { return make_ready_future<>(); }); } - -SEASTAR_TEST_CASE(test_may_have_partition_tombstones) { - return test_env::do_with_async([] (test_env& env) { - storage_service_for_tests ssft; - simple_schema ss; - auto s = ss.schema(); - auto pks = ss.make_pkeys(2); - - auto tmp = tmpdir(); - unsigned gen = 0; - for (auto version : all_sstable_versions) { - if (version < sstable_version_types::md) { - continue; - } - - auto mut1 = mutation(s, pks[0]); - auto mut2 = mutation(s, pks[1]); - mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); - mut1.partition().apply_delete(*s, ss.make_ckey(1), ss.new_tombstone()); - ss.add_row(mut1, ss.make_ckey(2), "val"); - ss.delete_range(mut1, query::clustering_range::make({ss.make_ckey(3)}, {ss.make_ckey(5)})); - ss.add_row(mut2, ss.make_ckey(6), "val"); - - auto sst_gen = [&env, s, &tmp, &gen, version] () { - return env.make_sstable(s, tmp.path().string(), ++gen, version, big); - }; - - { - auto sst = make_sstable_containing(sst_gen, {mut1, mut2}); - sst->load().get(); - BOOST_REQUIRE(!sst->may_have_partition_tombstones()); - } - - mut2.partition().apply(ss.new_tombstone()); - auto sst = make_sstable_containing(sst_gen, {mut1, mut2}); - sst->load().get(); - BOOST_REQUIRE(sst->may_have_partition_tombstones()); - } - }); -} diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index 529ba05f2c..a6d26d6ee0 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -586,7 +586,7 @@ future run_test(test_case test) { for (auto& s : persisted_snapshots) { auto& [snp, val] = s.second; auto digest = val.value.get_value(); - expected = sm_value_for(val.idx).get_value(); + expected = sm_value_for(snp.idx).get_value(); if (digest != expected) { tlogger.debug("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, expected); fail = -1; @@ -615,8 +615,6 @@ int main(int argc, char* argv[]) { {.name = "non_empty_leader_log", .nodes = 2, .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}, .updates = {entries{4}}}, - {.name = "non_empty_leader_log_no_new_entries", .nodes = 2, .total_values = 4, - .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}}, // 1 nodes, 12 client entries {.name = "simple_1_auto_12", .nodes = 1, .initial_states = {}, .updates = {entries{12}}},