diff --git a/configure.py b/configure.py index 35b8828fa8..c2d44f24ee 100755 --- a/configure.py +++ b/configure.py @@ -575,6 +575,7 @@ scylla_core = (['database.cc', 'sstables/mp_row_consumer.cc', 'sstables/sstables.cc', 'sstables/sstables_manager.cc', + 'sstables/sstable_set.cc', 'sstables/mx/writer.cc', 'sstables/kl/writer.cc', 'sstables/sstable_version.cc', diff --git a/database.hh b/database.hh index 1f40320823..2a5f615764 100644 --- a/database.hh +++ b/database.hh @@ -1001,50 +1001,6 @@ public: friend class distributed_loader; }; -using sstable_reader_factory_type = std::function; - -// Filters out mutation that doesn't belong to current shard. -flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - -/// Read a range from the passed-in sstables. -/// -/// The reader is unrestricted, but will account its resource usage on the -/// semaphore belonging to the passed-in permit. -flat_mutation_reader make_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - -/// Read a range from the passed-in sstables. -/// -/// The reader is restricted, that is it will wait for admission on the semaphore -/// belonging to the passed-in permit, before starting to read. -flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - class user_types_metadata; class keyspace_metadata final { diff --git a/db/view/view.cc b/db/view/view.cc index fbdaff03fb..cf264a4f0c 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1438,10 +1438,9 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas void view_builder::initialize_reader_at_current_token(build_step& step) { step.pslice = make_partition_slice(*step.base->schema()); step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); - step.reader = make_local_shard_sstable_reader( + step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader( step.base->schema(), _permit, - make_lw_shared(step.base->get_sstable_set()), step.prange, step.pslice, default_priority_class(), diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 0e4d997934..1364571806 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -28,7 +28,6 @@ #include "query-request.hh" #include "service/migration_listener.hh" #include "service/migration_manager.hh" -#include "sstables/sstable_set.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/serialized_action.hh" #include "utils/UUID.hh" diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index a43c98097e..703d8ce454 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -82,7 +82,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 813324abee..769132d704 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -310,7 +310,7 @@ private: friend class optimized_optional; void do_upgrade_schema(const schema_ptr&); public: - // Documented in mutation_reader::forwarding in mutation_reader.hh. + // Documented in mutation_reader::forwarding. class partition_range_forwarding_tag; using partition_range_forwarding = bool_class; @@ -519,6 +519,21 @@ flat_mutation_reader make_flat_mutation_reader(Args &&... args) { return flat_mutation_reader(std::make_unique(std::forward(args)...)); } +namespace mutation_reader { + // mutation_reader::forwarding determines whether fast_forward_to() may + // be used on the mutation reader to change the partition range being + // read. Enabling forwarding also changes read policy: forwarding::no + // means we will stop reading from disk at the end of the given range, + // but with forwarding::yes we may read ahead, anticipating the user to + // make a small skip with fast_forward_to() and continuing to read. + // + // Note that mutation_reader::forwarding is similarly name but different + // from streamed_mutation::forwarding - the former is about skipping to + // a different partition range, while the latter is about skipping + // inside a large partition. + using forwarding = flat_mutation_reader::partition_range_forwarding; +} + // Consumes mutation fragments until StopCondition is true. // The consumer will stop iff StopCondition returns true, in particular // reaching the end of stream alone won't stop the reader. diff --git a/mutation_reader.hh b/mutation_reader.hh index 9f7bcb82f7..3a3dd50b25 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -31,21 +31,6 @@ #include "flat_mutation_reader.hh" #include "reader_concurrency_semaphore.hh" -namespace mutation_reader { - // mutation_reader::forwarding determines whether fast_forward_to() may - // be used on the mutation reader to change the partition range being - // read. Enabling forwarding also changes read policy: forwarding::no - // means we will stop reading from disk at the end of the given range, - // but with forwarding::yes we may read ahead, anticipating the user to - // make a small skip with fast_forward_to() and continuing to read. - // - // Note that mutation_reader::forwarding is similarly name but different - // from streamed_mutation::forwarding - the former is about skipping to - // a different partition range, while the latter is about skipping - // inside a large partition. - using forwarding = flat_mutation_reader::partition_range_forwarding; -} - class reader_selector { protected: schema_ptr _s; diff --git a/position_in_partition.hh b/position_in_partition.hh index 7dbbd987cc..6deeaa2c6e 100644 --- a/position_in_partition.hh +++ b/position_in_partition.hh @@ -593,6 +593,7 @@ public: position_in_partition&& end() && { return std::move(_end); } bool contains(const schema& s, position_in_partition_view pos) const; bool overlaps(const schema& s, position_in_partition_view start, position_in_partition_view end) const; + bool is_all_clustered_rows(const schema&) const; friend std::ostream& operator<<(std::ostream&, const position_range&); }; @@ -610,3 +611,8 @@ bool position_range::overlaps(const schema& s, position_in_partition_view start, position_in_partition::less_compare less(s); return !less(end, _start) && less(start, _end); } + +inline +bool position_range::is_all_clustered_rows(const schema& s) const { + return _start.is_before_all_clustered_rows(s) && _end.is_after_all_clustered_rows(s); +} diff --git a/raft/fsm.cc b/raft/fsm.cc index cd291a2bf7..7a4680c9a7 100644 --- a/raft/fsm.cc +++ b/raft/fsm.cc @@ -98,6 +98,11 @@ void fsm::become_leader() { _log_limiter_semaphore->sem.consume(_log.non_snapshoted_length()); _tracker->set_configuration(_current_config.servers, _log.next_idx()); _last_election_time = _clock.now(); + // a new leader needs to commit at lease one entry to make sure that + // all existing entries in its log are commited as well. Also it should + // send append entries rpc as soon as possible to establish its leqdership + // (3.4). Do both of those by commiting a dummy entry. + add_entry(log_entry::dummy()); replicate(); } diff --git a/repair/repair.cc b/repair/repair.cc index 262b31d2ad..2633e1201f 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -60,17 +60,17 @@ public: auto ops_label_type = sm::label("ops"); _metrics.add_group("node_ops", { sm::make_gauge("finished_percentage", [this] { return bootstrap_finished_percentage(); }, - sm::description("Number of finished percentage for bootstrap operation on this shard."), {ops_label_type("bootstrap")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("bootstrap")}), sm::make_gauge("finished_percentage", [this] { return replace_finished_percentage(); }, - sm::description("Number of finished percentage for replace operation on this shard."), {ops_label_type("replace")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("replace")}), sm::make_gauge("finished_percentage", [this] { return rebuild_finished_percentage(); }, - sm::description("Number of finished percentage for rebuild operation on this shard."), {ops_label_type("rebuild")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("rebuild")}), sm::make_gauge("finished_percentage", [this] { return decommission_finished_percentage(); }, - sm::description("Number of finished percentage for decommission operation on this shard."), {ops_label_type("decommission")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("decommission")}), sm::make_gauge("finished_percentage", [this] { return removenode_finished_percentage(); }, - sm::description("Number of finished percentage for removenode operation on this shard."), {ops_label_type("removenode")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("removenode")}), sm::make_gauge("finished_percentage", [this] { return repair_finished_percentage(); }, - sm::description("Number of finished percentage for repair operation on this shard."), {ops_label_type("repair")}), + sm::description("Finished percentage of node operation on this shard"), {ops_label_type("repair")}), }); } uint64_t bootstrap_total_ranges{0}; diff --git a/sstables/compaction.cc b/sstables/compaction.cc index b5094bb6ed..791c18597e 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -822,9 +822,8 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_schema, + return _compacting->make_local_shard_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -869,9 +868,8 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_schema, + return _compacting->make_local_shard_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -1344,9 +1342,8 @@ public: // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { - return ::make_range_sstable_reader(_schema, + return _compacting->make_range_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 599af7e5ae..63a51b7395 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -47,10 +47,8 @@ #include "compaction_strategy_impl.hh" #include "schema.hh" #include "sstable_set.hh" -#include "compatible_ring_position.hh" #include #include -#include #include #include "size_tiered_compaction_strategy.hh" #include "date_tiered_compaction_strategy.hh" @@ -64,384 +62,6 @@ logging::logger leveled_manifest::logger("LeveledManifest"); namespace sstables { -extern logging::logger clogger; - -void sstable_run::insert(shared_sstable sst) { - _all.insert(std::move(sst)); -} - -void sstable_run::erase(shared_sstable sst) { - _all.erase(sst); -} - -uint64_t sstable_run::data_size() const { - return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0)); -} - -std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { - os << "Run = {\n"; - if (run.all().empty()) { - os << " Identifier: not found\n"; - } else { - os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier()); - } - - auto frags = boost::copy_range>(run.all()); - boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) { - return x->get_first_decorated_key().token() < y->get_first_decorated_key().token(); - }); - os << " Fragments = {\n"; - for (auto& frag : frags) { - os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token()); - } - os << " }\n}\n"; - return os; -} - -class incremental_selector_impl { -public: - virtual ~incremental_selector_impl() {} - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) = 0; -}; - -class sstable_set_impl { -public: - virtual ~sstable_set_impl() {} - virtual std::unique_ptr clone() const = 0; - virtual std::vector select(const dht::partition_range& range) const = 0; - virtual void insert(shared_sstable sst) = 0; - virtual void erase(shared_sstable sst) = 0; - virtual std::unique_ptr make_incremental_selector() const = 0; -}; - -sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) - : _impl(std::move(impl)) - , _schema(std::move(s)) - , _all(std::move(all)) { -} - -sstable_set::sstable_set(const sstable_set& x) - : _impl(x._impl->clone()) - , _schema(x._schema) - , _all(make_lw_shared(*x._all)) - , _all_runs(x._all_runs) { -} - -sstable_set::sstable_set(sstable_set&&) noexcept = default; - -sstable_set& -sstable_set::operator=(const sstable_set& x) { - if (this != &x) { - auto tmp = sstable_set(x); - *this = std::move(tmp); - } - return *this; -} - -sstable_set& -sstable_set::operator=(sstable_set&&) noexcept = default; - -std::vector -sstable_set::select(const dht::partition_range& range) const { - return _impl->select(range); -} - -std::vector -sstable_set::select_sstable_runs(const std::vector& sstables) const { - auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); - return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { - return _all_runs.at(run_id); - })); -} - -void -sstable_set::insert(shared_sstable sst) { - _impl->insert(sst); - try { - _all->insert(sst); - try { - _all_runs[sst->run_identifier()].insert(sst); - } catch (...) { - _all->erase(sst); - throw; - } - } catch (...) { - _impl->erase(sst); - throw; - } -} - -void -sstable_set::erase(shared_sstable sst) { - _impl->erase(sst); - _all->erase(sst); - _all_runs[sst->run_identifier()].erase(sst); -} - -sstable_set::~sstable_set() = default; - -sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) - : _impl(std::move(impl)) - , _cmp(s) { -} - -sstable_set::incremental_selector::~incremental_selector() = default; - -sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; - -sstable_set::incremental_selector::selection -sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { - if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { - std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); - _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); - } - return {_current_sstables, _current_next_position}; -} - -sstable_set::incremental_selector -sstable_set::make_incremental_selector() const { - return incremental_selector(_impl->make_incremental_selector(), *_schema); -} - -// default sstable_set, not specialized for anything -class bag_sstable_set : public sstable_set_impl { - // erasing is slow, but select() is fast - std::vector _sstables; -public: - virtual std::unique_ptr clone() const override { - return std::make_unique(*this); - } - virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override { - return _sstables; - } - virtual void insert(shared_sstable sst) override { - _sstables.push_back(std::move(sst)); - } - virtual void erase(shared_sstable sst) override { - auto it = boost::range::find(_sstables, sst); - if (it != _sstables.end()){ - _sstables.erase(it); - } - } - virtual std::unique_ptr make_incremental_selector() const override; - class incremental_selector; -}; - -class bag_sstable_set::incremental_selector : public incremental_selector_impl { - const std::vector& _sstables; -public: - incremental_selector(const std::vector& sstables) - : _sstables(sstables) { - } - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) override { - return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max()); - } -}; - -std::unique_ptr bag_sstable_set::make_incremental_selector() const { - return std::make_unique(_sstables); -} - -// specialized when sstables are partitioned in the token range space -// e.g. leveled compaction strategy -class partitioned_sstable_set : public sstable_set_impl { - using value_set = std::unordered_set; - using interval_map_type = boost::icl::interval_map; - using interval_type = interval_map_type::interval_type; - using map_iterator = interval_map_type::const_iterator; -private: - schema_ptr _schema; - std::vector _unleveled_sstables; - interval_map_type _leveled_sstables; - // Change counter on interval map for leveled sstables which is used by - // incremental selector to determine whether or not to invalidate iterators. - uint64_t _leveled_sstables_change_cnt = 0; - bool _use_level_metadata = false; -private: - static interval_type make_interval(const schema& s, const dht::partition_range& range) { - return interval_type::closed( - compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())), - compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value()))); - } - interval_type make_interval(const dht::partition_range& range) const { - return make_interval(*_schema, range); - } - static interval_type make_interval(const schema_ptr& s, const sstable& sst) { - return interval_type::closed( - compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())), - compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key()))); - } - interval_type make_interval(const sstable& sst) { - return make_interval(_schema, sst); - } - interval_type singular(const dht::ring_position& rp) const { - // We should use the view here, since this is used for queries. - auto rpv = dht::ring_position_view(rp); - auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv)); - return interval_type::closed(crp, crp); - } - std::pair query(const dht::partition_range& range) const { - if (range.start() && range.end()) { - return _leveled_sstables.equal_range(make_interval(range)); - } - else if (range.start() && !range.end()) { - auto start = singular(range.start()->value()); - return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; - } else if (!range.start() && range.end()) { - auto end = singular(range.end()->value()); - return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; - } else { - return { _leveled_sstables.begin(), _leveled_sstables.end() }; - } - } - // SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind. - bool store_as_unleveled(const shared_sstable& sst) const { - return _use_level_metadata && sst->get_sstable_level() == 0; - } -public: - static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp) { - // Ring position views, representing bounds of sstable intervals are - // guaranteed to have key() != nullptr; - const auto& pos = crp.position(); - return dht::ring_position(pos.token(), *pos.key()); - } - static dht::partition_range to_partition_range(const interval_type& i) { - return dht::partition_range::make( - {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())}, - {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())}); - } - static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i) { - auto lower_bound = [&] { - if (pos.key()) { - return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()), - pos.is_after_key() == dht::ring_position_view::after_key::no); - } else { - return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true); - } - }(); - auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds())); - return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound)); - } - explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true) - : _schema(std::move(schema)) - , _use_level_metadata(use_level_metadata) { - } - virtual std::unique_ptr clone() const override { - return std::make_unique(*this); - } - virtual std::vector select(const dht::partition_range& range) const override { - auto ipair = query(range); - auto b = std::move(ipair.first); - auto e = std::move(ipair.second); - value_set result; - while (b != e) { - boost::copy(b++->second, std::inserter(result, result.end())); - } - auto r = _unleveled_sstables; - r.insert(r.end(), result.begin(), result.end()); - return r; - } - virtual void insert(shared_sstable sst) override { - if (store_as_unleveled(sst)) { - _unleveled_sstables.push_back(std::move(sst)); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.add({make_interval(*sst), value_set({sst})}); - } - } - virtual void erase(shared_sstable sst) override { - if (store_as_unleveled(sst)) { - _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); - } else { - _leveled_sstables_change_cnt++; - _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); - } - } - virtual std::unique_ptr make_incremental_selector() const override; - class incremental_selector; -}; - -class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { - schema_ptr _schema; - const std::vector& _unleveled_sstables; - const interval_map_type& _leveled_sstables; - const uint64_t& _leveled_sstables_change_cnt; - uint64_t _last_known_leveled_sstables_change_cnt; - map_iterator _it; - // Only to back the dht::ring_position_view returned from select(). - dht::ring_position _next_position; -private: - dht::ring_position_view next_position(map_iterator it) { - if (it == _leveled_sstables.end()) { - _next_position = dht::ring_position::max(); - return dht::ring_position_view::max(); - } else { - _next_position = partitioned_sstable_set::to_ring_position(it->first.lower()); - return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds()))); - } - } - static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) { - if (boost::icl::is_left_closed(interval.bounds())) { - return crp < interval.lower(); - } else { - return crp <= interval.lower(); - } - } - void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) { - if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { - _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp)); - _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; - } - } -public: - incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, - const uint64_t& leveled_sstables_change_cnt) - : _schema(std::move(schema)) - , _unleveled_sstables(unleveled_sstables) - , _leveled_sstables(leveled_sstables) - , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) - , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) - , _it(leveled_sstables.begin()) - , _next_position(dht::ring_position::min()) { - } - virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { - auto crp = compatible_ring_position_or_view(*_schema, pos); - auto ssts = _unleveled_sstables; - using namespace dht; - - maybe_invalidate_iterator(crp); - - while (_it != _leveled_sstables.end()) { - if (boost::icl::contains(_it->first, crp)) { - ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); - return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); - } - // We don't want to skip current interval if pos lies before it. - if (is_before_interval(crp, _it->first)) { - return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it)); - } - _it++; - } - return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max()); - } -}; - -std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { - return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); -} - -std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { - return std::make_unique(); -} - -std::unique_ptr leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const { - return std::make_unique(std::move(schema)); -} - -sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstables::sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); -} - compaction_descriptor compaction_strategy_impl::get_major_compaction_job(column_family& cf, std::vector candidates) { return compaction_descriptor(std::move(candidates), cf.get_sstable_set(), service::get_local_compaction_priority()); } @@ -550,6 +170,8 @@ void size_tiered_backlog_tracker::remove_sstable(sstables::shared_sstable sst) { namespace sstables { +extern logging::logger clogger; + // The backlog for TWCS is just the sum of the individual backlogs in each time window. // We'll keep various SizeTiered backlog tracker objects-- one per window for the static SSTables. // We then scan the current compacting and in-progress writes and matching them to existing time @@ -1031,14 +653,6 @@ bool compaction_strategy::use_clustering_key_filter() const { return _compaction_strategy_impl->use_clustering_key_filter(); } -sstable_set -compaction_strategy::make_sstable_set(schema_ptr schema) const { - return sstable_set( - _compaction_strategy_impl->make_sstable_set(schema), - schema, - make_lw_shared()); -} - compaction_backlog_tracker& compaction_strategy::get_backlog_tracker() { return _compaction_strategy_impl->get_backlog_tracker(); } diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc new file mode 100644 index 0000000000..4e9b9f0d17 --- /dev/null +++ b/sstables/sstable_set.cc @@ -0,0 +1,659 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#include + +#include "compatible_ring_position.hh" +#include "compaction_strategy_impl.hh" +#include "leveled_compaction_strategy.hh" + +#include "sstable_set_impl.hh" + +namespace sstables { + +void sstable_run::insert(shared_sstable sst) { + _all.insert(std::move(sst)); +} + +void sstable_run::erase(shared_sstable sst) { + _all.erase(sst); +} + +uint64_t sstable_run::data_size() const { + return boost::accumulate(_all | boost::adaptors::transformed(std::mem_fn(&sstable::data_size)), uint64_t(0)); +} + +std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run) { + os << "Run = {\n"; + if (run.all().empty()) { + os << " Identifier: not found\n"; + } else { + os << format(" Identifier: {}\n", (*run.all().begin())->run_identifier()); + } + + auto frags = boost::copy_range>(run.all()); + boost::sort(frags, [] (const shared_sstable& x, const shared_sstable& y) { + return x->get_first_decorated_key().token() < y->get_first_decorated_key().token(); + }); + os << " Fragments = {\n"; + for (auto& frag : frags) { + os << format(" {}={}:{}\n", frag->generation(), frag->get_first_decorated_key().token(), frag->get_last_decorated_key().token()); + } + os << " }\n}\n"; + return os; +} + +sstable_set::sstable_set(std::unique_ptr impl, schema_ptr s, lw_shared_ptr all) + : _impl(std::move(impl)) + , _schema(std::move(s)) + , _all(std::move(all)) { +} + +sstable_set::sstable_set(const sstable_set& x) + : _impl(x._impl->clone()) + , _schema(x._schema) + , _all(make_lw_shared(*x._all)) + , _all_runs(x._all_runs) { +} + +sstable_set::sstable_set(sstable_set&&) noexcept = default; + +sstable_set& +sstable_set::operator=(const sstable_set& x) { + if (this != &x) { + auto tmp = sstable_set(x); + *this = std::move(tmp); + } + return *this; +} + +sstable_set& +sstable_set::operator=(sstable_set&&) noexcept = default; + +std::vector +sstable_set::select(const dht::partition_range& range) const { + return _impl->select(range); +} + +std::vector +sstable_set::select_sstable_runs(const std::vector& sstables) const { + auto run_ids = boost::copy_range>(sstables | boost::adaptors::transformed(std::mem_fn(&sstable::run_identifier))); + return boost::copy_range>(run_ids | boost::adaptors::transformed([this] (utils::UUID run_id) { + return _all_runs.at(run_id); + })); +} + +void +sstable_set::insert(shared_sstable sst) { + _impl->insert(sst); + try { + _all->insert(sst); + try { + _all_runs[sst->run_identifier()].insert(sst); + } catch (...) { + _all->erase(sst); + throw; + } + } catch (...) { + _impl->erase(sst); + throw; + } +} + +void +sstable_set::erase(shared_sstable sst) { + _impl->erase(sst); + _all->erase(sst); + _all_runs[sst->run_identifier()].erase(sst); +} + +sstable_set::~sstable_set() = default; + +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl, const schema& s) + : _impl(std::move(impl)) + , _cmp(s) { +} + +sstable_set::incremental_selector::~incremental_selector() = default; + +sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; + +sstable_set::incremental_selector::selection +sstable_set::incremental_selector::select(const dht::ring_position_view& pos) const { + if (!_current_range_view || !_current_range_view->contains(pos, _cmp)) { + std::tie(_current_range, _current_sstables, _current_next_position) = _impl->select(pos); + _current_range_view = _current_range->transform([] (const dht::ring_position& rp) { return dht::ring_position_view(rp); }); + } + return {_current_sstables, _current_next_position}; +} + +sstable_set::incremental_selector +sstable_set::make_incremental_selector() const { + return incremental_selector(_impl->make_incremental_selector(), *_schema); +} + +std::unique_ptr bag_sstable_set::clone() const { + return std::make_unique(*this); +} + +std::vector bag_sstable_set::select(const dht::partition_range& range) const { + return _sstables; +} + +void bag_sstable_set::insert(shared_sstable sst) { + _sstables.push_back(std::move(sst)); +} + +void bag_sstable_set::erase(shared_sstable sst) { + auto it = boost::range::find(_sstables, sst); + if (it != _sstables.end()){ + _sstables.erase(it); + } +} + +class bag_sstable_set::incremental_selector : public incremental_selector_impl { + const std::vector& _sstables; +public: + incremental_selector(const std::vector& sstables) + : _sstables(sstables) { + } + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) override { + return std::make_tuple(dht::partition_range::make_open_ended_both_sides(), _sstables, dht::ring_position_view::max()); + } +}; + +std::unique_ptr bag_sstable_set::make_incremental_selector() const { + return std::make_unique(_sstables); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema& s, const dht::partition_range& range) { + return interval_type::closed( + compatible_ring_position_or_view(s, dht::ring_position_view(range.start()->value())), + compatible_ring_position_or_view(s, dht::ring_position_view(range.end()->value()))); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const dht::partition_range& range) const { + return make_interval(*_schema, range); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const schema_ptr& s, const sstable& sst) { + return interval_type::closed( + compatible_ring_position_or_view(s, dht::ring_position(sst.get_first_decorated_key())), + compatible_ring_position_or_view(s, dht::ring_position(sst.get_last_decorated_key()))); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::make_interval(const sstable& sst) { + return make_interval(_schema, sst); +} + +partitioned_sstable_set::interval_type partitioned_sstable_set::singular(const dht::ring_position& rp) const { + // We should use the view here, since this is used for queries. + auto rpv = dht::ring_position_view(rp); + auto crp = compatible_ring_position_or_view(*_schema, std::move(rpv)); + return interval_type::closed(crp, crp); +} + +std::pair +partitioned_sstable_set::query(const dht::partition_range& range) const { + if (range.start() && range.end()) { + return _leveled_sstables.equal_range(make_interval(range)); + } + else if (range.start() && !range.end()) { + auto start = singular(range.start()->value()); + return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; + } else if (!range.start() && range.end()) { + auto end = singular(range.end()->value()); + return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; + } else { + return { _leveled_sstables.begin(), _leveled_sstables.end() }; + } +} + +bool partitioned_sstable_set::store_as_unleveled(const shared_sstable& sst) const { + return _use_level_metadata && sst->get_sstable_level() == 0; +} + +dht::ring_position partitioned_sstable_set::to_ring_position(const compatible_ring_position_or_view& crp) { + // Ring position views, representing bounds of sstable intervals are + // guaranteed to have key() != nullptr; + const auto& pos = crp.position(); + return dht::ring_position(pos.token(), *pos.key()); +} + +dht::partition_range partitioned_sstable_set::to_partition_range(const interval_type& i) { + return dht::partition_range::make( + {to_ring_position(i.lower()), boost::icl::is_left_closed(i.bounds())}, + {to_ring_position(i.upper()), boost::icl::is_right_closed(i.bounds())}); +} + +dht::partition_range partitioned_sstable_set::to_partition_range(const dht::ring_position_view& pos, const interval_type& i) { + auto lower_bound = [&] { + if (pos.key()) { + return dht::partition_range::bound(dht::ring_position(pos.token(), *pos.key()), + pos.is_after_key() == dht::ring_position_view::after_key::no); + } else { + return dht::partition_range::bound(dht::ring_position(pos.token(), pos.get_token_bound()), true); + } + }(); + auto upper_bound = dht::partition_range::bound(to_ring_position(i.lower()), !boost::icl::is_left_closed(i.bounds())); + return dht::partition_range::make(std::move(lower_bound), std::move(upper_bound)); +} + +partitioned_sstable_set::partitioned_sstable_set(schema_ptr schema, bool use_level_metadata) + : _schema(std::move(schema)) + , _use_level_metadata(use_level_metadata) { +} + +std::unique_ptr partitioned_sstable_set::clone() const { + return std::make_unique(*this); +} + +std::vector partitioned_sstable_set::select(const dht::partition_range& range) const { + auto ipair = query(range); + auto b = std::move(ipair.first); + auto e = std::move(ipair.second); + value_set result; + while (b != e) { + boost::copy(b++->second, std::inserter(result, result.end())); + } + auto r = _unleveled_sstables; + r.insert(r.end(), result.begin(), result.end()); + return r; +} + +void partitioned_sstable_set::insert(shared_sstable sst) { + if (store_as_unleveled(sst)) { + _unleveled_sstables.push_back(std::move(sst)); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.add({make_interval(*sst), value_set({sst})}); + } +} + +void partitioned_sstable_set::erase(shared_sstable sst) { + if (store_as_unleveled(sst)) { + _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); + } else { + _leveled_sstables_change_cnt++; + _leveled_sstables.subtract({make_interval(*sst), value_set({sst})}); + } +} + +class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { + schema_ptr _schema; + const std::vector& _unleveled_sstables; + const interval_map_type& _leveled_sstables; + const uint64_t& _leveled_sstables_change_cnt; + uint64_t _last_known_leveled_sstables_change_cnt; + map_iterator _it; + // Only to back the dht::ring_position_view returned from select(). + dht::ring_position _next_position; +private: + dht::ring_position_view next_position(map_iterator it) { + if (it == _leveled_sstables.end()) { + _next_position = dht::ring_position::max(); + return dht::ring_position_view::max(); + } else { + _next_position = partitioned_sstable_set::to_ring_position(it->first.lower()); + return dht::ring_position_view(_next_position, dht::ring_position_view::after_key(!boost::icl::is_left_closed(it->first.bounds()))); + } + } + static bool is_before_interval(const compatible_ring_position_or_view& crp, const interval_type& interval) { + if (boost::icl::is_left_closed(interval.bounds())) { + return crp < interval.lower(); + } else { + return crp <= interval.lower(); + } + } + void maybe_invalidate_iterator(const compatible_ring_position_or_view& crp) { + if (_last_known_leveled_sstables_change_cnt != _leveled_sstables_change_cnt) { + _it = _leveled_sstables.lower_bound(interval_type::closed(crp, crp)); + _last_known_leveled_sstables_change_cnt = _leveled_sstables_change_cnt; + } + } +public: + incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables, + const uint64_t& leveled_sstables_change_cnt) + : _schema(std::move(schema)) + , _unleveled_sstables(unleveled_sstables) + , _leveled_sstables(leveled_sstables) + , _leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _last_known_leveled_sstables_change_cnt(leveled_sstables_change_cnt) + , _it(leveled_sstables.begin()) + , _next_position(dht::ring_position::min()) { + } + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view& pos) override { + auto crp = compatible_ring_position_or_view(*_schema, pos); + auto ssts = _unleveled_sstables; + using namespace dht; + + maybe_invalidate_iterator(crp); + + while (_it != _leveled_sstables.end()) { + if (boost::icl::contains(_it->first, crp)) { + ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); + return std::make_tuple(partitioned_sstable_set::to_partition_range(_it->first), std::move(ssts), next_position(std::next(_it))); + } + // We don't want to skip current interval if pos lies before it. + if (is_before_interval(crp, _it->first)) { + return std::make_tuple(partitioned_sstable_set::to_partition_range(pos, _it->first), std::move(ssts), next_position(_it)); + } + _it++; + } + return std::make_tuple(partition_range::make_open_ended_both_sides(), std::move(ssts), ring_position_view::max()); + } +}; + +std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { + return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables, _leveled_sstables_change_cnt); +} + +std::unique_ptr compaction_strategy_impl::make_sstable_set(schema_ptr schema) const { + return std::make_unique(); +} + +std::unique_ptr leveled_compaction_strategy::make_sstable_set(schema_ptr schema) const { + return std::make_unique(std::move(schema)); +} + +sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { + return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); +} + +sstable_set +compaction_strategy::make_sstable_set(schema_ptr schema) const { + return sstable_set( + _compaction_strategy_impl->make_sstable_set(schema), + schema, + make_lw_shared()); +} + +using sstable_reader_factory_type = std::function; + +static logging::logger irclogger("incremental_reader_selector"); + +// Incremental selector implementation for combined_mutation_reader that +// selects readers on-demand as the read progresses through the token +// range. +class incremental_reader_selector : public reader_selector { + const dht::partition_range* _pr; + lw_shared_ptr _sstables; + tracing::trace_state_ptr _trace_state; + std::optional _selector; + std::unordered_set _read_sstable_gens; + sstable_reader_factory_type _fn; + + flat_mutation_reader create_reader(shared_sstable sst) { + tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); + return _fn(sst, *_pr); + } + +public: + explicit incremental_reader_selector(schema_ptr s, + lw_shared_ptr sstables, + const dht::partition_range& pr, + tracing::trace_state_ptr trace_state, + sstable_reader_factory_type fn) + : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) + , _pr(&pr) + , _sstables(std::move(sstables)) + , _trace_state(std::move(trace_state)) + , _selector(_sstables->make_incremental_selector()) + , _fn(std::move(fn)) { + + irclogger.trace("{}: created for range: {} with {} sstables", + fmt::ptr(this), + *_pr, + _sstables->all()->size()); + } + + incremental_reader_selector(const incremental_reader_selector&) = delete; + incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; + + incremental_reader_selector(incremental_reader_selector&&) = delete; + incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; + + virtual std::vector create_new_readers(const std::optional& pos) override { + irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); + + auto readers = std::vector(); + + do { + auto selection = _selector->select(_selector_position); + _selector_position = selection.next_position; + + irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), + _selector_position); + + readers = boost::copy_range>(selection.sstables + | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) + | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); + } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); + + irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size()); + + // prevents sstable_set::incremental_selector::_current_sstables from holding reference to + // sstables when done selecting. + if (_selector_position.is_max()) { + _selector.reset(); + } + + return readers; + } + + virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + _pr = ≺ + + auto pos = dht::ring_position_view::for_range_start(*_pr); + if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { + return create_new_readers(pos); + } + + return {}; + } +}; + +// The returned function uses the bloom filter to check whether the given sstable +// may have a partition given by the ring position `pos`. +// +// Returning `false` means the sstable doesn't have such a partition. +// Returning `true` means it may, i.e. we don't know whether or not it does. +// +// Assumes the given `pos` and `schema` are alive during the function's lifetime. +static std::predicate auto +make_pk_filter(const dht::ring_position& pos, const schema& schema) { + return [&pos, key = key::from_partition_key(schema, *pos.key()), cmp = dht::ring_position_comparator(schema)] (const sstable& sst) { + return cmp(pos, sst.get_first_decorated_key()) >= 0 && + cmp(pos, sst.get_last_decorated_key()) <= 0 && + sst.filter_has_key(key); + }; +} + +// Filter out sstables for reader using bloom filter +static std::vector +filter_sstable_for_reader_by_pk(std::vector&& sstables, const schema& schema, const dht::ring_position& pos) { + auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); }; + sstables.erase(boost::remove_if(sstables, filter), sstables.end()); + return sstables; +} + +// Filter out sstables for reader using sstable metadata that keeps track +// of a range for each clustering component. +static std::vector +filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const query::partition_slice& slice) { + // no clustering filtering is applied if schema defines no clustering key or + // compaction strategy thinks it will not benefit from such an optimization, + // or the partition_slice includes static columns. + if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { + return sstables; + } + + ::cf_stats* stats = cf.cf_stats(); + stats->clustering_filter_count++; + stats->sstables_checked_by_clustering_filter += sstables.size(); + + auto ck_filtering_all_ranges = slice.get_all_ranges(); + // fast path to include all sstables if only one full range was specified. + // For example, this happens if query only specifies a partition key. + if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { + stats->clustering_filter_fast_path_count++; + stats->surviving_sstables_after_clustering_filter += sstables.size(); + return sstables; + } + + auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) { + return sst->may_contain_rows(ranges); + }); + sstables.erase(skipped, sstables.end()); + stats->surviving_sstables_after_clustering_filter += sstables.size(); + + return sstables; +} + +flat_mutation_reader +sstable_set::create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::ring_position& pos, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const +{ + auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *schema, pos); + auto num_sstables = selected_sstables.size(); + if (!num_sstables) { + return make_empty_flat_reader(schema, permit); + } + auto readers = boost::copy_range>( + filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) + | boost::adaptors::transformed([&] (const shared_sstable& sstable) { + tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); })); + return sstable->read_row_flat(schema, permit, pos, slice, pc, trace_state, fwd); + }) + ); + + // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition + // we want to emit partition_start/end if no rows were found, + // to prevent https://github.com/scylladb/scylla/issues/3552. + // + // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit + // the partition_start/end pair and append it to the list of readers passed + // to make_combined_reader to ensure partition_start/end are emitted even if + // all sstables actually containing the partition were filtered. + auto num_readers = readers.size(); + if (num_readers != num_sstables) { + readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pos.key())}, slice, fwd)); + } + sstable_histogram.add(num_readers); + return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); +} + +flat_mutation_reader +sstable_set::make_range_sstable_reader( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) const +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (shared_sstable& sst, const dht::partition_range& pr) mutable { + return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + shared_from_this(), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader +sstable_set::make_local_shard_sstable_reader( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) const +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (shared_sstable& sst, const dht::partition_range& pr) mutable { + flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, + trace_state, fwd, fwd_mr, monitor_generator(sst)); + if (sst->is_shared()) { + auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { + return dht::shard_of(s, dk.token()) == this_shard_id(); + }; + reader = make_filtering_reader(std::move(reader), std::move(filter)); + } + return reader; + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + shared_from_this(), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader make_restricted_range_sstable_reader( + lw_shared_ptr sstables, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) +{ + auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return sstables->make_range_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, + std::move(trace_state), fwd, fwd_mr, monitor_generator); + }); + return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + +} // namespace sstables diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 28a953e7bc..f0f5a86c73 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -21,11 +21,17 @@ #pragma once +#include "flat_mutation_reader.hh" +#include "sstables/progress_monitor.hh" #include "shared_sstable.hh" #include "dht/i_partitioner.hh" #include #include +namespace utils { +class estimated_histogram; +} + namespace sstables { class sstable_set_impl; @@ -43,7 +49,7 @@ public: const sstable_list& all() const { return _all; } }; -class sstable_set { +class sstable_set : public enable_lw_shared_from_this { std::unique_ptr _impl; schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list @@ -99,8 +105,63 @@ public: selection select(const dht::ring_position_view& pos) const; }; incremental_selector make_incremental_selector() const; + + flat_mutation_reader create_single_key_sstable_reader( + column_family*, + schema_ptr, + reader_permit, + utils::estimated_histogram&, + const dht::ring_position&, // must contain a key + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding) const; + + /// Read a range from the sstable set. + /// + /// The reader is unrestricted, but will account its resource usage on the + /// semaphore belonging to the passed-in permit. + flat_mutation_reader make_range_sstable_reader( + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()) const; + + // Filters out mutations that don't belong to the current shard. + flat_mutation_reader make_local_shard_sstable_reader( + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()) const; }; +/// Read a range from the passed-in sstables. +/// +/// The reader is restricted, that is it will wait for admission on the semaphore +/// belonging to the passed-in permit, before starting to read. +flat_mutation_reader make_restricted_range_sstable_reader( + lw_shared_ptr sstables, + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()); + sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/sstables/sstable_set_impl.hh b/sstables/sstable_set_impl.hh new file mode 100644 index 0000000000..e0113a5a06 --- /dev/null +++ b/sstables/sstable_set_impl.hh @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2020 ScyllaDB + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ + +#pragma once + +#include + +#include "compatible_ring_position.hh" +#include "sstable_set.hh" + +namespace sstables { + +class incremental_selector_impl { +public: + virtual ~incremental_selector_impl() {} + virtual std::tuple, dht::ring_position_view> select(const dht::ring_position_view&) = 0; +}; + +class sstable_set_impl { +public: + virtual ~sstable_set_impl() {} + virtual std::unique_ptr clone() const = 0; + virtual std::vector select(const dht::partition_range& range) const = 0; + virtual void insert(shared_sstable sst) = 0; + virtual void erase(shared_sstable sst) = 0; + virtual std::unique_ptr make_incremental_selector() const = 0; +}; + +// default sstable_set, not specialized for anything +class bag_sstable_set : public sstable_set_impl { + // erasing is slow, but select() is fast + std::vector _sstables; +public: + virtual std::unique_ptr clone() const override; + virtual std::vector select(const dht::partition_range& range = query::full_partition_range) const override; + virtual void insert(shared_sstable sst) override; + virtual void erase(shared_sstable sst) override; + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; +}; + +// specialized when sstables are partitioned in the token range space +// e.g. leveled compaction strategy +class partitioned_sstable_set : public sstable_set_impl { + using value_set = std::unordered_set; + using interval_map_type = boost::icl::interval_map; + using interval_type = interval_map_type::interval_type; + using map_iterator = interval_map_type::const_iterator; +private: + schema_ptr _schema; + std::vector _unleveled_sstables; + interval_map_type _leveled_sstables; + // Change counter on interval map for leveled sstables which is used by + // incremental selector to determine whether or not to invalidate iterators. + uint64_t _leveled_sstables_change_cnt = 0; + bool _use_level_metadata = false; +private: + static interval_type make_interval(const schema& s, const dht::partition_range& range); + interval_type make_interval(const dht::partition_range& range) const; + static interval_type make_interval(const schema_ptr& s, const sstable& sst); + interval_type make_interval(const sstable& sst); + interval_type singular(const dht::ring_position& rp) const; + std::pair query(const dht::partition_range& range) const; + // SSTables are stored separately to avoid interval map's fragmentation issue when level 0 falls behind. + bool store_as_unleveled(const shared_sstable& sst) const; +public: + static dht::ring_position to_ring_position(const compatible_ring_position_or_view& crp); + static dht::partition_range to_partition_range(const interval_type& i); + static dht::partition_range to_partition_range(const dht::ring_position_view& pos, const interval_type& i); + explicit partitioned_sstable_set(schema_ptr schema, bool use_level_metadata = true); + + virtual std::unique_ptr clone() const override; + virtual std::vector select(const dht::partition_range& range) const override; + virtual void insert(shared_sstable sst) override; + virtual void erase(shared_sstable sst) override; + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; +}; + +} // namespace sstables diff --git a/sstables/sstables.hh b/sstables/sstables.hh index c4fb4056b0..c4843c0457 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -805,6 +805,12 @@ public: // Return true if this sstable possibly stores clustering row(s) specified by ranges. bool may_contain_rows(const query::clustering_row_ranges& ranges) const; + // false => there are no partition tombstones, true => we don't know + bool may_have_partition_tombstones() const { + return !has_correct_min_max_column_names() + || _position_range.is_all_clustered_rows(*_schema); + } + // Return the large_data_stats_entry identified by large_data_type // iff _large_data_stats is available and the requested entry is in // the map. Otherwise, return a disengaged optional. diff --git a/table.cc b/table.cc index 0be1ff8146..703879a93e 100644 --- a/table.cc +++ b/table.cc @@ -25,10 +25,7 @@ #include "service/priority_manager.hh" #include "db/schema_tables.hh" #include "cell_locking.hh" -#include "mutation_fragment.hh" -#include "mutation_partition.hh" #include "utils/logalloc.hh" -#include "sstables/progress_monitor.hh" #include "checked-file-impl.hh" #include "view_info.hh" #include "db/data_listeners.hh" @@ -52,229 +49,6 @@ static seastar::metrics::label keyspace_label("ks"); using namespace std::chrono_literals; -// Filter out sstables for reader using bloom filter -static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const dht::partition_range& pr, const sstables::key& key) { - const dht::ring_position& pr_key = pr.start()->value(); - auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) { - return cmp(pr_key, sst->get_first_decorated_key()) < 0 || - cmp(pr_key, sst->get_last_decorated_key()) > 0 || - !sst->filter_has_key(key); - }; - sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); - return sstables; -} - -// Filter out sstables for reader using sstable metadata that keeps track -// of a range for each clustering component. -static std::vector -filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const query::partition_slice& slice) { - // no clustering filtering is applied if schema defines no clustering key or - // compaction strategy thinks it will not benefit from such an optimization, - // or the partition_slice includes static columns. - if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { - return sstables; - } - - ::cf_stats* stats = cf.cf_stats(); - stats->clustering_filter_count++; - stats->sstables_checked_by_clustering_filter += sstables.size(); - - auto ck_filtering_all_ranges = slice.get_all_ranges(); - // fast path to include all sstables if only one full range was specified. - // For example, this happens if query only specifies a partition key. - if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { - stats->clustering_filter_fast_path_count++; - stats->surviving_sstables_after_clustering_filter += sstables.size(); - return sstables; - } - - auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) { - return sst->may_contain_rows(ranges); - }); - sstables.erase(skipped, sstables.end()); - stats->surviving_sstables_after_clustering_filter += sstables.size(); - - return sstables; -} - -// Incremental selector implementation for combined_mutation_reader that -// selects readers on-demand as the read progresses through the token -// range. -class incremental_reader_selector : public reader_selector { - const dht::partition_range* _pr; - lw_shared_ptr _sstables; - tracing::trace_state_ptr _trace_state; - std::optional _selector; - std::unordered_set _read_sstable_gens; - sstable_reader_factory_type _fn; - - flat_mutation_reader create_reader(sstables::shared_sstable sst) { - tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); - return _fn(sst, *_pr); - } - -public: - explicit incremental_reader_selector(schema_ptr s, - lw_shared_ptr sstables, - const dht::partition_range& pr, - tracing::trace_state_ptr trace_state, - sstable_reader_factory_type fn) - : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) - , _pr(&pr) - , _sstables(std::move(sstables)) - , _trace_state(std::move(trace_state)) - , _selector(_sstables->make_incremental_selector()) - , _fn(std::move(fn)) { - - tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables", - fmt::ptr(this), - *_pr, - _sstables->all()->size()); - } - - incremental_reader_selector(const incremental_reader_selector&) = delete; - incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; - - incremental_reader_selector(incremental_reader_selector&&) = delete; - incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; - - virtual std::vector create_new_readers(const std::optional& pos) override { - tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); - - auto readers = std::vector(); - - do { - auto selection = _selector->select(_selector_position); - _selector_position = selection.next_position; - - tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), - _selector_position); - - readers = boost::copy_range>(selection.sstables - | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) - | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); - } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); - - tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size()); - - // prevents sstable_set::incremental_selector::_current_sstables from holding reference to - // sstables when done selecting. - if (_selector_position.is_max()) { - _selector.reset(); - } - - return readers; - } - - virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { - _pr = ≺ - - auto pos = dht::ring_position_view::for_range_start(*_pr); - if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { - return create_new_readers(pos); - } - - return {}; - } -}; - -static flat_mutation_reader -create_single_key_sstable_reader(column_family* cf, - schema_ptr schema, - reader_permit permit, - lw_shared_ptr sstables, - utils::estimated_histogram& sstable_histogram, - const dht::partition_range& pr, // must be singular - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) -{ - auto& pk = pr.start()->value().key(); - auto key = sstables::key::from_partition_key(*schema, *pk); - auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key); - auto num_sstables = selected_sstables.size(); - if (!num_sstables) { - return make_empty_flat_reader(schema, permit); - } - auto readers = boost::copy_range>( - filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) - | boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) { - tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); - return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); - }) - ); - - // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition - // we want to emit partition_start/end if no rows were found, - // to prevent https://github.com/scylladb/scylla/issues/3552. - // - // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit - // the partition_start/end pair and append it to the list of readers passed - // to make_combined_reader to ensure partition_start/end are emitted even if - // all sstables actually containing the partition were filtered. - auto num_readers = readers.size(); - if (num_readers != num_sstables) { - readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); - } - sstable_histogram.add(num_readers); - return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); -} - -flat_mutation_reader make_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - std::move(sstables), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - -flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) { - return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, - std::move(trace_state), fwd, fwd_mr, monitor_generator); - }); - return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); -} - flat_mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, @@ -315,8 +89,9 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), std::move(sstables), - _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); + assert(pr.is_singular() && pr.start()->value().has_key()); + return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), + _stats.estimated_sstable_per_read, pr.start()->value(), slice, pc, std::move(trace_state), fwd, fwd_mr); }); } else { return mutation_source([sstables=std::move(sstables)] ( @@ -328,7 +103,7 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, + return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } @@ -519,38 +294,6 @@ static bool belongs_to_other_shard(const std::vector& shards) { return shards.size() != size_t(belongs_to_current_shard(shards)); } -flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, - trace_state, fwd, fwd_mr, monitor_generator(sst)); - if (sst->is_shared()) { - auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { - return dht::shard_of(s, dk.token()) == this_shard_id(); - }; - reader = make_filtering_reader(std::move(reader), std::move(filter)); - } - return reader; - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - std::move(sstables), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f, io_error_handler_gen error_handler_gen) { return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 0e650d8771..12b35f8bd7 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -648,10 +648,9 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) { auto list_reader = make_combined_reader(s.schema(), tests::make_permit(), std::move(sstable_mutation_readers)); - auto incremental_reader = make_local_shard_sstable_reader( + auto incremental_reader = sstable_set->make_local_shard_sstable_reader( s.schema(), tests::make_permit(), - sstable_set, query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), diff --git a/test/boost/raft_fsm_test.cc b/test/boost/raft_fsm_test.cc index f2b8ec1158..d9cf85a81a 100644 --- a/test/boost/raft_fsm_test.cc +++ b/test/boost/raft_fsm_test.cc @@ -69,7 +69,8 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) { BOOST_CHECK(output.term); BOOST_CHECK(output.vote); BOOST_CHECK(output.messages.empty()); - BOOST_CHECK(output.log_entries.empty()); + // A new leader applies one dummy entry + BOOST_CHECK(output.log_entries.size() == 1 && std::holds_alternative(output.log_entries[0]->data)); BOOST_CHECK(output.committed.empty()); // The leader does not become candidate simply because // a timeout has elapsed, i.e. there are no spurious @@ -81,7 +82,8 @@ BOOST_AUTO_TEST_CASE(test_election_single_node) { BOOST_CHECK(!output.vote); BOOST_CHECK(output.messages.empty()); BOOST_CHECK(output.log_entries.empty()); - BOOST_CHECK(output.committed.empty()); + // Dummy entry is now commited + BOOST_CHECK(output.committed.size() == 1 && std::holds_alternative(output.committed[0]->data)); } // Test that adding an entry to a single-node cluster diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 472291072c..37c67f6a10 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6083,9 +6083,8 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { for (auto&& sst : all) { compacting->insert(std::move(sst)); } - auto reader = ::make_range_sstable_reader(s, + auto reader = compacting->make_range_sstable_reader(s, tests::make_permit(), - compacting, query::full_partition_range, s->full_slice(), service::get_local_compaction_priority(), @@ -6685,3 +6684,43 @@ SEASTAR_TEST_CASE(test_zero_estimated_partitions) { return make_ready_future<>(); }); } + +SEASTAR_TEST_CASE(test_may_have_partition_tombstones) { + return test_env::do_with_async([] (test_env& env) { + storage_service_for_tests ssft; + simple_schema ss; + auto s = ss.schema(); + auto pks = ss.make_pkeys(2); + + auto tmp = tmpdir(); + unsigned gen = 0; + for (auto version : all_sstable_versions) { + if (version < sstable_version_types::md) { + continue; + } + + auto mut1 = mutation(s, pks[0]); + auto mut2 = mutation(s, pks[1]); + mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp()); + mut1.partition().apply_delete(*s, ss.make_ckey(1), ss.new_tombstone()); + ss.add_row(mut1, ss.make_ckey(2), "val"); + ss.delete_range(mut1, query::clustering_range::make({ss.make_ckey(3)}, {ss.make_ckey(5)})); + ss.add_row(mut2, ss.make_ckey(6), "val"); + + auto sst_gen = [&env, s, &tmp, &gen, version] () { + return env.make_sstable(s, tmp.path().string(), ++gen, version, big); + }; + + { + auto sst = make_sstable_containing(sst_gen, {mut1, mut2}); + sst->load().get(); + BOOST_REQUIRE(!sst->may_have_partition_tombstones()); + } + + mut2.partition().apply(ss.new_tombstone()); + auto sst = make_sstable_containing(sst_gen, {mut1, mut2}); + sst->load().get(); + BOOST_REQUIRE(sst->may_have_partition_tombstones()); + } + }); +} diff --git a/test/raft/replication_test.cc b/test/raft/replication_test.cc index a6d26d6ee0..529ba05f2c 100644 --- a/test/raft/replication_test.cc +++ b/test/raft/replication_test.cc @@ -586,7 +586,7 @@ future run_test(test_case test) { for (auto& s : persisted_snapshots) { auto& [snp, val] = s.second; auto digest = val.value.get_value(); - expected = sm_value_for(snp.idx).get_value(); + expected = sm_value_for(val.idx).get_value(); if (digest != expected) { tlogger.debug("Persisted snapshot {} doesn't match {} != {}", snp.id, digest, expected); fail = -1; @@ -615,6 +615,8 @@ int main(int argc, char* argv[]) { {.name = "non_empty_leader_log", .nodes = 2, .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}, .updates = {entries{4}}}, + {.name = "non_empty_leader_log_no_new_entries", .nodes = 2, .total_values = 4, + .initial_states = {{.le = {{1,0},{1,1},{1,2},{1,3}}}}}, // 1 nodes, 12 client entries {.name = "simple_1_auto_12", .nodes = 1, .initial_states = {}, .updates = {entries{12}}},