From 40d8bfa394aa941507df20d1232396bc13c55a4f Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Thu, 8 Oct 2020 16:48:38 +0200 Subject: [PATCH] sstables: move sstable reader creation functions to `sstable_set` Lower level functions such as `create_single_key_sstable_reader` were made methods of `sstable_set`. The motivation is that each concrete sstable_set may decide to use a better sstable reading algorithm specific to the data structures used by this sstable_set. For this it needs to access the set's internals. A nice side effect is that we moved some code out of table.cc and database.hh which are huge files. --- database.hh | 44 ----- db/view/view.cc | 3 +- db/view/view_builder.hh | 1 - db/view/view_update_generator.cc | 2 +- sstables/compaction.cc | 9 +- sstables/sstable_set.cc | 264 +++++++++++++++++++++++++++- sstables/sstable_set.hh | 63 ++++++- table.cc | 262 +-------------------------- test/boost/mutation_reader_test.cc | 3 +- test/boost/sstable_datafile_test.cc | 3 +- 10 files changed, 334 insertions(+), 320 deletions(-) diff --git a/database.hh b/database.hh index 08158aa34c..5fd98b20da 100644 --- a/database.hh +++ b/database.hh @@ -1001,50 +1001,6 @@ public: friend class distributed_loader; }; -using sstable_reader_factory_type = std::function; - -// Filters out mutation that doesn't belong to current shard. -flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - -/// Read a range from the passed-in sstables. -/// -/// The reader is unrestricted, but will account its resource usage on the -/// semaphore belonging to the passed-in permit. -flat_mutation_reader make_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - -/// Read a range from the passed-in sstables. -/// -/// The reader is restricted, that is it will wait for admission on the semaphore -/// belonging to the passed-in permit, before starting to read. -flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); - class user_types_metadata; class keyspace_metadata final { diff --git a/db/view/view.cc b/db/view/view.cc index 2b59b0fc1e..bfdf14c5a7 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1430,10 +1430,9 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas void view_builder::initialize_reader_at_current_token(build_step& step) { step.pslice = make_partition_slice(*step.base->schema()); step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max()); - step.reader = make_local_shard_sstable_reader( + step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader( step.base->schema(), _permit, - make_lw_shared(step.base->get_sstable_set()), step.prange, step.pslice, default_priority_class(), diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh index 0e4d997934..1364571806 100644 --- a/db/view/view_builder.hh +++ b/db/view/view_builder.hh @@ -28,7 +28,6 @@ #include "query-request.hh" #include "service/migration_listener.hh" #include "service/migration_manager.hh" -#include "sstables/sstable_set.hh" #include "utils/exponential_backoff_retry.hh" #include "utils/serialized_action.hh" #include "utils/UUID.hh" diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index a43c98097e..703d8ce454 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -82,7 +82,7 @@ future<> view_update_generator::start() { tracing::trace_state_ptr ts, streamed_mutation::forwarding fwd_ms, mutation_reader::forwarding fwd_mr) { - return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); }); auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( std::move(ms), diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 8eff060c0c..8f795aa0a2 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -822,9 +822,8 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_schema, + return _compacting->make_local_shard_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -869,9 +868,8 @@ public: } flat_mutation_reader make_sstable_reader() const override { - return ::make_local_shard_sstable_reader(_schema, + return _compacting->make_local_shard_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, @@ -1341,9 +1339,8 @@ public: // Use reader that makes sure no non-local mutation will not be filtered out. flat_mutation_reader make_sstable_reader() const override { - return ::make_range_sstable_reader(_schema, + return _compacting->make_range_sstable_reader(_schema, _permit, - _compacting, query::full_partition_range, _schema->full_slice(), _io_priority, diff --git a/sstables/sstable_set.cc b/sstables/sstable_set.cc index 255f7c41a4..3d12d2df3f 100644 --- a/sstables/sstable_set.cc +++ b/sstables/sstable_set.cc @@ -375,7 +375,7 @@ std::unique_ptr leveled_compaction_strategy::make_sstable_set( } sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata) { - return sstables::sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); + return sstable_set(std::make_unique(schema, use_level_metadata), schema, std::move(all)); } sstable_set @@ -386,4 +386,266 @@ compaction_strategy::make_sstable_set(schema_ptr schema) const { make_lw_shared()); } +using sstable_reader_factory_type = std::function; + +static logging::logger irclogger("incremental_reader_selector"); + +// Incremental selector implementation for combined_mutation_reader that +// selects readers on-demand as the read progresses through the token +// range. +class incremental_reader_selector : public reader_selector { + const dht::partition_range* _pr; + lw_shared_ptr _sstables; + tracing::trace_state_ptr _trace_state; + std::optional _selector; + std::unordered_set _read_sstable_gens; + sstable_reader_factory_type _fn; + + flat_mutation_reader create_reader(shared_sstable sst) { + tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); + return _fn(sst, *_pr); + } + +public: + explicit incremental_reader_selector(schema_ptr s, + lw_shared_ptr sstables, + const dht::partition_range& pr, + tracing::trace_state_ptr trace_state, + sstable_reader_factory_type fn) + : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) + , _pr(&pr) + , _sstables(std::move(sstables)) + , _trace_state(std::move(trace_state)) + , _selector(_sstables->make_incremental_selector()) + , _fn(std::move(fn)) { + + irclogger.trace("{}: created for range: {} with {} sstables", + fmt::ptr(this), + *_pr, + _sstables->all()->size()); + } + + incremental_reader_selector(const incremental_reader_selector&) = delete; + incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; + + incremental_reader_selector(incremental_reader_selector&&) = delete; + incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; + + virtual std::vector create_new_readers(const std::optional& pos) override { + irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); + + auto readers = std::vector(); + + do { + auto selection = _selector->select(_selector_position); + _selector_position = selection.next_position; + + irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), + _selector_position); + + readers = boost::copy_range>(selection.sstables + | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) + | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); + } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); + + irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size()); + + // prevents sstable_set::incremental_selector::_current_sstables from holding reference to + // sstables when done selecting. + if (_selector_position.is_max()) { + _selector.reset(); + } + + return readers; + } + + virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { + _pr = ≺ + + auto pos = dht::ring_position_view::for_range_start(*_pr); + if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { + return create_new_readers(pos); + } + + return {}; + } +}; + +// Filter out sstables for reader using bloom filter +static std::vector +filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const dht::partition_range& pr, const key& key) { + const dht::ring_position& pr_key = pr.start()->value(); + auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const shared_sstable& sst) { + return cmp(pr_key, sst->get_first_decorated_key()) < 0 || + cmp(pr_key, sst->get_last_decorated_key()) > 0 || + !sst->filter_has_key(key); + }; + sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); + return sstables; +} + +// Filter out sstables for reader using sstable metadata that keeps track +// of a range for each clustering component. +static std::vector +filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, + const query::partition_slice& slice) { + // no clustering filtering is applied if schema defines no clustering key or + // compaction strategy thinks it will not benefit from such an optimization, + // or the partition_slice includes static columns. + if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { + return sstables; + } + + ::cf_stats* stats = cf.cf_stats(); + stats->clustering_filter_count++; + stats->sstables_checked_by_clustering_filter += sstables.size(); + + auto ck_filtering_all_ranges = slice.get_all_ranges(); + // fast path to include all sstables if only one full range was specified. + // For example, this happens if query only specifies a partition key. + if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { + stats->clustering_filter_fast_path_count++; + stats->surviving_sstables_after_clustering_filter += sstables.size(); + return sstables; + } + + auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) { + return sst->may_contain_rows(ranges); + }); + sstables.erase(skipped, sstables.end()); + stats->surviving_sstables_after_clustering_filter += sstables.size(); + + return sstables; +} + +flat_mutation_reader +sstable_set::create_single_key_sstable_reader( + column_family* cf, + schema_ptr schema, + reader_permit permit, + utils::estimated_histogram& sstable_histogram, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) const +{ + auto& pk = pr.start()->value().key(); + auto key = key::from_partition_key(*schema, *pk); + auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *cf, schema, pr, key); + auto num_sstables = selected_sstables.size(); + if (!num_sstables) { + return make_empty_flat_reader(schema, permit); + } + auto readers = boost::copy_range>( + filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) + | boost::adaptors::transformed([&] (const shared_sstable& sstable) { + tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); + return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); + }) + ); + + // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition + // we want to emit partition_start/end if no rows were found, + // to prevent https://github.com/scylladb/scylla/issues/3552. + // + // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit + // the partition_start/end pair and append it to the list of readers passed + // to make_combined_reader to ensure partition_start/end are emitted even if + // all sstables actually containing the partition were filtered. + auto num_readers = readers.size(); + if (num_readers != num_sstables) { + readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); + } + sstable_histogram.add(num_readers); + return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); +} + +flat_mutation_reader +sstable_set::make_range_sstable_reader( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) const +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (shared_sstable& sst, const dht::partition_range& pr) mutable { + return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + shared_from_this(), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader +sstable_set::make_local_shard_sstable_reader( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) const +{ + auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] + (shared_sstable& sst, const dht::partition_range& pr) mutable { + flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, + trace_state, fwd, fwd_mr, monitor_generator(sst)); + if (sst->is_shared()) { + auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { + return dht::shard_of(s, dk.token()) == this_shard_id(); + }; + reader = make_filtering_reader(std::move(reader), std::move(filter)); + } + return reader; + }; + return make_combined_reader(s, std::move(permit), std::make_unique(s, + shared_from_this(), + pr, + std::move(trace_state), + std::move(reader_factory_fn)), + fwd, + fwd_mr); +} + +flat_mutation_reader make_restricted_range_sstable_reader( + lw_shared_ptr sstables, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + read_monitor_generator& monitor_generator) +{ + auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return sstables->make_range_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, + std::move(trace_state), fwd, fwd_mr, monitor_generator); + }); + return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + } // namespace sstables diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index 28a953e7bc..69d9d006cb 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -21,11 +21,17 @@ #pragma once +#include "flat_mutation_reader.hh" +#include "sstables/progress_monitor.hh" #include "shared_sstable.hh" #include "dht/i_partitioner.hh" #include #include +namespace utils { +class estimated_histogram; +} + namespace sstables { class sstable_set_impl; @@ -43,7 +49,7 @@ public: const sstable_list& all() const { return _all; } }; -class sstable_set { +class sstable_set : public enable_lw_shared_from_this { std::unique_ptr _impl; schema_ptr _schema; // used to support column_family::get_sstable(), which wants to return an sstable_list @@ -99,8 +105,63 @@ public: selection select(const dht::ring_position_view& pos) const; }; incremental_selector make_incremental_selector() const; + + flat_mutation_reader create_single_key_sstable_reader( + column_family*, + schema_ptr, + reader_permit, + utils::estimated_histogram&, + const dht::partition_range&, // must be singular + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding) const; + + /// Read a range from the sstable set. + /// + /// The reader is unrestricted, but will account its resource usage on the + /// semaphore belonging to the passed-in permit. + flat_mutation_reader make_range_sstable_reader( + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()) const; + + // Filters out mutations that don't belong to the current shard. + flat_mutation_reader make_local_shard_sstable_reader( + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()) const; }; +/// Read a range from the passed-in sstables. +/// +/// The reader is restricted, that is it will wait for admission on the semaphore +/// belonging to the passed-in permit, before starting to read. +flat_mutation_reader make_restricted_range_sstable_reader( + lw_shared_ptr sstables, + schema_ptr, + reader_permit, + const dht::partition_range&, + const query::partition_slice&, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding, + mutation_reader::forwarding, + read_monitor_generator& rmg = default_read_monitor_generator()); + sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr all, bool use_level_metadata = true); std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run); diff --git a/table.cc b/table.cc index 0be1ff8146..97bc41f173 100644 --- a/table.cc +++ b/table.cc @@ -25,10 +25,7 @@ #include "service/priority_manager.hh" #include "db/schema_tables.hh" #include "cell_locking.hh" -#include "mutation_fragment.hh" -#include "mutation_partition.hh" #include "utils/logalloc.hh" -#include "sstables/progress_monitor.hh" #include "checked-file-impl.hh" #include "view_info.hh" #include "db/data_listeners.hh" @@ -52,229 +49,6 @@ static seastar::metrics::label keyspace_label("ks"); using namespace std::chrono_literals; -// Filter out sstables for reader using bloom filter -static std::vector -filter_sstable_for_reader_by_pk(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const dht::partition_range& pr, const sstables::key& key) { - const dht::ring_position& pr_key = pr.start()->value(); - auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) { - return cmp(pr_key, sst->get_first_decorated_key()) < 0 || - cmp(pr_key, sst->get_last_decorated_key()) > 0 || - !sst->filter_has_key(key); - }; - sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end()); - return sstables; -} - -// Filter out sstables for reader using sstable metadata that keeps track -// of a range for each clustering component. -static std::vector -filter_sstable_for_reader_by_ck(std::vector&& sstables, column_family& cf, const schema_ptr& schema, - const query::partition_slice& slice) { - // no clustering filtering is applied if schema defines no clustering key or - // compaction strategy thinks it will not benefit from such an optimization, - // or the partition_slice includes static columns. - if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) { - return sstables; - } - - ::cf_stats* stats = cf.cf_stats(); - stats->clustering_filter_count++; - stats->sstables_checked_by_clustering_filter += sstables.size(); - - auto ck_filtering_all_ranges = slice.get_all_ranges(); - // fast path to include all sstables if only one full range was specified. - // For example, this happens if query only specifies a partition key. - if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) { - stats->clustering_filter_fast_path_count++; - stats->surviving_sstables_after_clustering_filter += sstables.size(); - return sstables; - } - - auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) { - return sst->may_contain_rows(ranges); - }); - sstables.erase(skipped, sstables.end()); - stats->surviving_sstables_after_clustering_filter += sstables.size(); - - return sstables; -} - -// Incremental selector implementation for combined_mutation_reader that -// selects readers on-demand as the read progresses through the token -// range. -class incremental_reader_selector : public reader_selector { - const dht::partition_range* _pr; - lw_shared_ptr _sstables; - tracing::trace_state_ptr _trace_state; - std::optional _selector; - std::unordered_set _read_sstable_gens; - sstable_reader_factory_type _fn; - - flat_mutation_reader create_reader(sstables::shared_sstable sst) { - tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); })); - return _fn(sst, *_pr); - } - -public: - explicit incremental_reader_selector(schema_ptr s, - lw_shared_ptr sstables, - const dht::partition_range& pr, - tracing::trace_state_ptr trace_state, - sstable_reader_factory_type fn) - : reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min()) - , _pr(&pr) - , _sstables(std::move(sstables)) - , _trace_state(std::move(trace_state)) - , _selector(_sstables->make_incremental_selector()) - , _fn(std::move(fn)) { - - tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables", - fmt::ptr(this), - *_pr, - _sstables->all()->size()); - } - - incremental_reader_selector(const incremental_reader_selector&) = delete; - incremental_reader_selector& operator=(const incremental_reader_selector&) = delete; - - incremental_reader_selector(incremental_reader_selector&&) = delete; - incremental_reader_selector& operator=(incremental_reader_selector&&) = delete; - - virtual std::vector create_new_readers(const std::optional& pos) override { - tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos)); - - auto readers = std::vector(); - - do { - auto selection = _selector->select(_selector_position); - _selector_position = selection.next_position; - - tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(), - _selector_position); - - readers = boost::copy_range>(selection.sstables - | boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; }) - | boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); })); - } while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0)); - - tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size()); - - // prevents sstable_set::incremental_selector::_current_sstables from holding reference to - // sstables when done selecting. - if (_selector_position.is_max()) { - _selector.reset(); - } - - return readers; - } - - virtual std::vector fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { - _pr = ≺ - - auto pos = dht::ring_position_view::for_range_start(*_pr); - if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) { - return create_new_readers(pos); - } - - return {}; - } -}; - -static flat_mutation_reader -create_single_key_sstable_reader(column_family* cf, - schema_ptr schema, - reader_permit permit, - lw_shared_ptr sstables, - utils::estimated_histogram& sstable_histogram, - const dht::partition_range& pr, // must be singular - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) -{ - auto& pk = pr.start()->value().key(); - auto key = sstables::key::from_partition_key(*schema, *pk); - auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key); - auto num_sstables = selected_sstables.size(); - if (!num_sstables) { - return make_empty_flat_reader(schema, permit); - } - auto readers = boost::copy_range>( - filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice) - | boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) { - tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); })); - return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd); - }) - ); - - // If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition - // we want to emit partition_start/end if no rows were found, - // to prevent https://github.com/scylladb/scylla/issues/3552. - // - // Use `flat_mutation_reader_from_mutations` with an empty mutation to emit - // the partition_start/end pair and append it to the list of readers passed - // to make_combined_reader to ensure partition_start/end are emitted even if - // all sstables actually containing the partition were filtered. - auto num_readers = readers.size(); - if (num_readers != num_sstables) { - readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd)); - } - sstable_histogram.add(num_readers); - return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr); -} - -flat_mutation_reader make_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst)); - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - std::move(sstables), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - -flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr) { - return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, - std::move(trace_state), fwd, fwd_mr, monitor_generator); - }); - return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); -} - flat_mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, @@ -315,7 +89,7 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), std::move(sstables), + return sstables->create_single_key_sstable_reader(const_cast(this), std::move(s), std::move(permit), _stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } else { @@ -328,7 +102,7 @@ table::make_sstable_reader(schema_ptr s, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, + return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } @@ -519,38 +293,6 @@ static bool belongs_to_other_shard(const std::vector& shards) { return shards.size() != size_t(belongs_to_current_shard(shards)); } -flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, - reader_permit permit, - lw_shared_ptr sstables, - const dht::partition_range& pr, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, - mutation_reader::forwarding fwd_mr, - sstables::read_monitor_generator& monitor_generator) -{ - auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator] - (sstables::shared_sstable& sst, const dht::partition_range& pr) mutable { - flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc, - trace_state, fwd, fwd_mr, monitor_generator(sst)); - if (sst->is_shared()) { - auto filter = [&s = *s](const dht::decorated_key& dk) -> bool { - return dht::shard_of(s, dk.token()) == this_shard_id(); - }; - reader = make_filtering_reader(std::move(reader), std::move(filter)); - } - return reader; - }; - return make_combined_reader(s, std::move(permit), std::make_unique(s, - std::move(sstables), - pr, - std::move(trace_state), - std::move(reader_factory_fn)), - fwd, - fwd_mr); -} - sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f, io_error_handler_gen error_handler_gen) { return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen); diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index a74d413431..b69bc3c053 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -646,10 +646,9 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) { auto list_reader = make_combined_reader(s.schema(), tests::make_permit(), std::move(sstable_mutation_readers)); - auto incremental_reader = make_local_shard_sstable_reader( + auto incremental_reader = sstable_set->make_local_shard_sstable_reader( s.schema(), tests::make_permit(), - sstable_set, query::full_partition_range, s.schema()->full_slice(), seastar::default_priority_class(), diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 472291072c..be6697850f 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -6083,9 +6083,8 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) { for (auto&& sst : all) { compacting->insert(std::move(sst)); } - auto reader = ::make_range_sstable_reader(s, + auto reader = compacting->make_range_sstable_reader(s, tests::make_permit(), - compacting, query::full_partition_range, s->full_slice(), service::get_local_compaction_priority(),