sstables: move sstable reader creation functions to sstable_set

Lower level functions such as `create_single_key_sstable_reader`
were made methods of `sstable_set`.

The motivation is that each concrete sstable_set
may decide to use a better sstable reading algorithm specific to the
data structures used by this sstable_set. For this it needs to access
the set's internals.

A nice side effect is that we moved some code out of table.cc
and database.hh which are huge files.
This commit is contained in:
Kamil Braun
2020-10-08 16:48:38 +02:00
parent 708093884c
commit 40d8bfa394
10 changed files with 334 additions and 320 deletions

View File

@@ -1001,50 +1001,6 @@ public:
friend class distributed_loader;
};
using sstable_reader_factory_type = std::function<flat_mutation_reader(sstables::shared_sstable&, const dht::partition_range& pr)>;
// Filters out mutation that doesn't belong to current shard.
flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
/// Read a range from the passed-in sstables.
///
/// The reader is unrestricted, but will account its resource usage on the
/// semaphore belonging to the passed-in permit.
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
/// Read a range from the passed-in sstables.
///
/// The reader is restricted, that is it will wait for admission on the semaphore
/// belonging to the passed-in permit, before starting to read.
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
class user_types_metadata;
class keyspace_metadata final {

View File

@@ -1430,10 +1430,9 @@ view_builder::build_step& view_builder::get_or_create_build_step(utils::UUID bas
void view_builder::initialize_reader_at_current_token(build_step& step) {
step.pslice = make_partition_slice(*step.base->schema());
step.prange = dht::partition_range(dht::ring_position::starting_at(step.current_token()), dht::ring_position::max());
step.reader = make_local_shard_sstable_reader(
step.reader = step.base->get_sstable_set().make_local_shard_sstable_reader(
step.base->schema(),
_permit,
make_lw_shared<sstables::sstable_set>(step.base->get_sstable_set()),
step.prange,
step.pslice,
default_priority_class(),

View File

@@ -28,7 +28,6 @@
#include "query-request.hh"
#include "service/migration_listener.hh"
#include "service/migration_manager.hh"
#include "sstables/sstable_set.hh"
#include "utils/exponential_backoff_retry.hh"
#include "utils/serialized_action.hh"
#include "utils/UUID.hh"

View File

@@ -82,7 +82,7 @@ future<> view_update_generator::start() {
tracing::trace_state_ptr ts,
streamed_mutation::forwarding fwd_ms,
mutation_reader::forwarding fwd_mr) {
return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
return make_restricted_range_sstable_reader(std::move(ssts), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
});
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader(
std::move(ms),

View File

@@ -822,9 +822,8 @@ public:
}
flat_mutation_reader make_sstable_reader() const override {
return ::make_local_shard_sstable_reader(_schema,
return _compacting->make_local_shard_sstable_reader(_schema,
_permit,
_compacting,
query::full_partition_range,
_schema->full_slice(),
_io_priority,
@@ -869,9 +868,8 @@ public:
}
flat_mutation_reader make_sstable_reader() const override {
return ::make_local_shard_sstable_reader(_schema,
return _compacting->make_local_shard_sstable_reader(_schema,
_permit,
_compacting,
query::full_partition_range,
_schema->full_slice(),
_io_priority,
@@ -1341,9 +1339,8 @@ public:
// Use reader that makes sure no non-local mutation will not be filtered out.
flat_mutation_reader make_sstable_reader() const override {
return ::make_range_sstable_reader(_schema,
return _compacting->make_range_sstable_reader(_schema,
_permit,
_compacting,
query::full_partition_range,
_schema->full_slice(),
_io_priority,

View File

@@ -375,7 +375,7 @@ std::unique_ptr<sstable_set_impl> leveled_compaction_strategy::make_sstable_set(
}
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata) {
return sstables::sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
return sstable_set(std::make_unique<partitioned_sstable_set>(schema, use_level_metadata), schema, std::move(all));
}
sstable_set
@@ -386,4 +386,266 @@ compaction_strategy::make_sstable_set(schema_ptr schema) const {
make_lw_shared<sstable_list>());
}
using sstable_reader_factory_type = std::function<flat_mutation_reader(shared_sstable&, const dht::partition_range& pr)>;
static logging::logger irclogger("incremental_reader_selector");
// Incremental selector implementation for combined_mutation_reader that
// selects readers on-demand as the read progresses through the token
// range.
class incremental_reader_selector : public reader_selector {
const dht::partition_range* _pr;
lw_shared_ptr<const sstable_set> _sstables;
tracing::trace_state_ptr _trace_state;
std::optional<sstable_set::incremental_selector> _selector;
std::unordered_set<int64_t> _read_sstable_gens;
sstable_reader_factory_type _fn;
flat_mutation_reader create_reader(shared_sstable sst) {
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
return _fn(sst, *_pr);
}
public:
explicit incremental_reader_selector(schema_ptr s,
lw_shared_ptr<const sstable_set> sstables,
const dht::partition_range& pr,
tracing::trace_state_ptr trace_state,
sstable_reader_factory_type fn)
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min())
, _pr(&pr)
, _sstables(std::move(sstables))
, _trace_state(std::move(trace_state))
, _selector(_sstables->make_incremental_selector())
, _fn(std::move(fn)) {
irclogger.trace("{}: created for range: {} with {} sstables",
fmt::ptr(this),
*_pr,
_sstables->all()->size());
}
incremental_reader_selector(const incremental_reader_selector&) = delete;
incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;
incremental_reader_selector(incremental_reader_selector&&) = delete;
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
virtual std::vector<flat_mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
irclogger.trace("{}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos));
auto readers = std::vector<flat_mutation_reader>();
do {
auto selection = _selector->select(_selector_position);
_selector_position = selection.next_position;
irclogger.trace("{}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
_selector_position);
readers = boost::copy_range<std::vector<flat_mutation_reader>>(selection.sstables
| boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; })
| boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
irclogger.trace("{}: created {} new readers", fmt::ptr(this), readers.size());
// prevents sstable_set::incremental_selector::_current_sstables from holding reference to
// sstables when done selecting.
if (_selector_position.is_max()) {
_selector.reset();
}
return readers;
}
virtual std::vector<flat_mutation_reader> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_pr = &pr;
auto pos = dht::ring_position_view::for_range_start(*_pr);
if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
return create_new_readers(pos);
}
return {};
}
};
// Filter out sstables for reader using bloom filter
static std::vector<shared_sstable>
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const dht::partition_range& pr, const key& key) {
const dht::ring_position& pr_key = pr.start()->value();
auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const shared_sstable& sst) {
return cmp(pr_key, sst->get_first_decorated_key()) < 0 ||
cmp(pr_key, sst->get_last_decorated_key()) > 0 ||
!sst->filter_has_key(key);
};
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
return sstables;
}
// Filter out sstables for reader using sstable metadata that keeps track
// of a range for each clustering component.
static std::vector<shared_sstable>
filter_sstable_for_reader_by_ck(std::vector<shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const query::partition_slice& slice) {
// no clustering filtering is applied if schema defines no clustering key or
// compaction strategy thinks it will not benefit from such an optimization,
// or the partition_slice includes static columns.
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
return sstables;
}
::cf_stats* stats = cf.cf_stats();
stats->clustering_filter_count++;
stats->sstables_checked_by_clustering_filter += sstables.size();
auto ck_filtering_all_ranges = slice.get_all_ranges();
// fast path to include all sstables if only one full range was specified.
// For example, this happens if query only specifies a partition key.
if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) {
stats->clustering_filter_fast_path_count++;
stats->surviving_sstables_after_clustering_filter += sstables.size();
return sstables;
}
auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const shared_sstable& sst) {
return sst->may_contain_rows(ranges);
});
sstables.erase(skipped, sstables.end());
stats->surviving_sstables_after_clustering_filter += sstables.size();
return sstables;
}
flat_mutation_reader
sstable_set::create_single_key_sstable_reader(
column_family* cf,
schema_ptr schema,
reader_permit permit,
utils::estimated_histogram& sstable_histogram,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const
{
auto& pk = pr.start()->value().key();
auto key = key::from_partition_key(*schema, *pk);
auto selected_sstables = filter_sstable_for_reader_by_pk(select(pr), *cf, schema, pr, key);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader(schema, permit);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
| boost::adaptors::transformed([&] (const shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd);
})
);
// If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
// we want to emit partition_start/end if no rows were found,
// to prevent https://github.com/scylladb/scylla/issues/3552.
//
// Use `flat_mutation_reader_from_mutations` with an empty mutation to emit
// the partition_start/end pair and append it to the list of readers passed
// to make_combined_reader to ensure partition_start/end are emitted even if
// all sstables actually containing the partition were filtered.
auto num_readers = readers.size();
if (num_readers != num_sstables) {
readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd));
}
sstable_histogram.add(num_readers);
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
}
flat_mutation_reader
sstable_set::make_range_sstable_reader(
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor_generator& monitor_generator) const
{
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
(shared_sstable& sst, const dht::partition_range& pr) mutable {
return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
shared_from_this(),
pr,
std::move(trace_state),
std::move(reader_factory_fn)),
fwd,
fwd_mr);
}
flat_mutation_reader
sstable_set::make_local_shard_sstable_reader(
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor_generator& monitor_generator) const
{
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
(shared_sstable& sst, const dht::partition_range& pr) mutable {
flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc,
trace_state, fwd, fwd_mr, monitor_generator(sst));
if (sst->is_shared()) {
auto filter = [&s = *s](const dht::decorated_key& dk) -> bool {
return dht::shard_of(s, dk.token()) == this_shard_id();
};
reader = make_filtering_reader(std::move(reader), std::move(filter));
}
return reader;
};
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
shared_from_this(),
pr,
std::move(trace_state),
std::move(reader_factory_fn)),
fwd,
fwd_mr);
}
flat_mutation_reader make_restricted_range_sstable_reader(
lw_shared_ptr<sstable_set> sstables,
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor_generator& monitor_generator)
{
auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return sstables->make_range_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr, monitor_generator);
});
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
} // namespace sstables

View File

@@ -21,11 +21,17 @@
#pragma once
#include "flat_mutation_reader.hh"
#include "sstables/progress_monitor.hh"
#include "shared_sstable.hh"
#include "dht/i_partitioner.hh"
#include <seastar/core/shared_ptr.hh>
#include <vector>
namespace utils {
class estimated_histogram;
}
namespace sstables {
class sstable_set_impl;
@@ -43,7 +49,7 @@ public:
const sstable_list& all() const { return _all; }
};
class sstable_set {
class sstable_set : public enable_lw_shared_from_this<sstable_set> {
std::unique_ptr<sstable_set_impl> _impl;
schema_ptr _schema;
// used to support column_family::get_sstable(), which wants to return an sstable_list
@@ -99,8 +105,63 @@ public:
selection select(const dht::ring_position_view& pos) const;
};
incremental_selector make_incremental_selector() const;
flat_mutation_reader create_single_key_sstable_reader(
column_family*,
schema_ptr,
reader_permit,
utils::estimated_histogram&,
const dht::partition_range&, // must be singular
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding) const;
/// Read a range from the sstable set.
///
/// The reader is unrestricted, but will account its resource usage on the
/// semaphore belonging to the passed-in permit.
flat_mutation_reader make_range_sstable_reader(
schema_ptr,
reader_permit,
const dht::partition_range&,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
// Filters out mutations that don't belong to the current shard.
flat_mutation_reader make_local_shard_sstable_reader(
schema_ptr,
reader_permit,
const dht::partition_range&,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator()) const;
};
/// Read a range from the passed-in sstables.
///
/// The reader is restricted, that is it will wait for admission on the semaphore
/// belonging to the passed-in permit, before starting to read.
flat_mutation_reader make_restricted_range_sstable_reader(
lw_shared_ptr<sstable_set> sstables,
schema_ptr,
reader_permit,
const dht::partition_range&,
const query::partition_slice&,
const io_priority_class&,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding,
read_monitor_generator& rmg = default_read_monitor_generator());
sstable_set make_partitioned_sstable_set(schema_ptr schema, lw_shared_ptr<sstable_list> all, bool use_level_metadata = true);
std::ostream& operator<<(std::ostream& os, const sstables::sstable_run& run);

262
table.cc
View File

@@ -25,10 +25,7 @@
#include "service/priority_manager.hh"
#include "db/schema_tables.hh"
#include "cell_locking.hh"
#include "mutation_fragment.hh"
#include "mutation_partition.hh"
#include "utils/logalloc.hh"
#include "sstables/progress_monitor.hh"
#include "checked-file-impl.hh"
#include "view_info.hh"
#include "db/data_listeners.hh"
@@ -52,229 +49,6 @@ static seastar::metrics::label keyspace_label("ks");
using namespace std::chrono_literals;
// Filter out sstables for reader using bloom filter
static std::vector<sstables::shared_sstable>
filter_sstable_for_reader_by_pk(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const dht::partition_range& pr, const sstables::key& key) {
const dht::ring_position& pr_key = pr.start()->value();
auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const sstables::shared_sstable& sst) {
return cmp(pr_key, sst->get_first_decorated_key()) < 0 ||
cmp(pr_key, sst->get_last_decorated_key()) > 0 ||
!sst->filter_has_key(key);
};
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
return sstables;
}
// Filter out sstables for reader using sstable metadata that keeps track
// of a range for each clustering component.
static std::vector<sstables::shared_sstable>
filter_sstable_for_reader_by_ck(std::vector<sstables::shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
const query::partition_slice& slice) {
// no clustering filtering is applied if schema defines no clustering key or
// compaction strategy thinks it will not benefit from such an optimization,
// or the partition_slice includes static columns.
if (!schema->clustering_key_size() || !cf.get_compaction_strategy().use_clustering_key_filter() || slice.static_columns.size()) {
return sstables;
}
::cf_stats* stats = cf.cf_stats();
stats->clustering_filter_count++;
stats->sstables_checked_by_clustering_filter += sstables.size();
auto ck_filtering_all_ranges = slice.get_all_ranges();
// fast path to include all sstables if only one full range was specified.
// For example, this happens if query only specifies a partition key.
if (ck_filtering_all_ranges.size() == 1 && ck_filtering_all_ranges[0].is_full()) {
stats->clustering_filter_fast_path_count++;
stats->surviving_sstables_after_clustering_filter += sstables.size();
return sstables;
}
auto skipped = std::partition(sstables.begin(), sstables.end(), [&ranges = ck_filtering_all_ranges] (const sstables::shared_sstable& sst) {
return sst->may_contain_rows(ranges);
});
sstables.erase(skipped, sstables.end());
stats->surviving_sstables_after_clustering_filter += sstables.size();
return sstables;
}
// Incremental selector implementation for combined_mutation_reader that
// selects readers on-demand as the read progresses through the token
// range.
class incremental_reader_selector : public reader_selector {
const dht::partition_range* _pr;
lw_shared_ptr<sstables::sstable_set> _sstables;
tracing::trace_state_ptr _trace_state;
std::optional<sstables::sstable_set::incremental_selector> _selector;
std::unordered_set<int64_t> _read_sstable_gens;
sstable_reader_factory_type _fn;
flat_mutation_reader create_reader(sstables::shared_sstable sst) {
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
return _fn(sst, *_pr);
}
public:
explicit incremental_reader_selector(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
tracing::trace_state_ptr trace_state,
sstable_reader_factory_type fn)
: reader_selector(s, pr.start() ? pr.start()->value() : dht::ring_position_view::min())
, _pr(&pr)
, _sstables(std::move(sstables))
, _trace_state(std::move(trace_state))
, _selector(_sstables->make_incremental_selector())
, _fn(std::move(fn)) {
tlogger.trace("incremental_reader_selector {}: created for range: {} with {} sstables",
fmt::ptr(this),
*_pr,
_sstables->all()->size());
}
incremental_reader_selector(const incremental_reader_selector&) = delete;
incremental_reader_selector& operator=(const incremental_reader_selector&) = delete;
incremental_reader_selector(incremental_reader_selector&&) = delete;
incremental_reader_selector& operator=(incremental_reader_selector&&) = delete;
virtual std::vector<flat_mutation_reader> create_new_readers(const std::optional<dht::ring_position_view>& pos) override {
tlogger.trace("incremental_reader_selector {}: {}({})", fmt::ptr(this), __FUNCTION__, seastar::lazy_deref(pos));
auto readers = std::vector<flat_mutation_reader>();
do {
auto selection = _selector->select(_selector_position);
_selector_position = selection.next_position;
tlogger.trace("incremental_reader_selector {}: {} sstables to consider, advancing selector to {}", fmt::ptr(this), selection.sstables.size(),
_selector_position);
readers = boost::copy_range<std::vector<flat_mutation_reader>>(selection.sstables
| boost::adaptors::filtered([this] (auto& sst) { return _read_sstable_gens.emplace(sst->generation()).second; })
| boost::adaptors::transformed([this] (auto& sst) { return this->create_reader(sst); }));
} while (!_selector_position.is_max() && readers.empty() && (!pos || dht::ring_position_tri_compare(*_s, *pos, _selector_position) >= 0));
tlogger.trace("incremental_reader_selector {}: created {} new readers", fmt::ptr(this), readers.size());
// prevents sstable_set::incremental_selector::_current_sstables from holding reference to
// sstables when done selecting.
if (_selector_position.is_max()) {
_selector.reset();
}
return readers;
}
virtual std::vector<flat_mutation_reader> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_pr = &pr;
auto pos = dht::ring_position_view::for_range_start(*_pr);
if (dht::ring_position_tri_compare(*_s, pos, _selector_position) >= 0) {
return create_new_readers(pos);
}
return {};
}
};
static flat_mutation_reader
create_single_key_sstable_reader(column_family* cf,
schema_ptr schema,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
utils::estimated_histogram& sstable_histogram,
const dht::partition_range& pr, // must be singular
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto& pk = pr.start()->value().key();
auto key = sstables::key::from_partition_key(*schema, *pk);
auto selected_sstables = filter_sstable_for_reader_by_pk(sstables->select(pr), *cf, schema, pr, key);
auto num_sstables = selected_sstables.size();
if (!num_sstables) {
return make_empty_flat_reader(schema, permit);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
| boost::adaptors::transformed([&] (const sstables::shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row_flat(schema, permit, pr.start()->value(), slice, pc, trace_state, fwd);
})
);
// If filter_sstable_for_reader_by_ck filtered any sstable that contains the partition
// we want to emit partition_start/end if no rows were found,
// to prevent https://github.com/scylladb/scylla/issues/3552.
//
// Use `flat_mutation_reader_from_mutations` with an empty mutation to emit
// the partition_start/end pair and append it to the list of readers passed
// to make_combined_reader to ensure partition_start/end are emitted even if
// all sstables actually containing the partition were filtered.
auto num_readers = readers.size();
if (num_readers != num_sstables) {
readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd));
}
sstable_histogram.add(num_readers);
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
}
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator)
{
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
return sst->read_range_rows_flat(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
std::move(sstables),
pr,
std::move(trace_state),
std::move(reader_factory_fn)),
fwd,
fwd_mr);
}
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator)
{
auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] (
schema_ptr s,
reader_permit permit,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr, monitor_generator);
});
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
flat_mutation_reader
table::make_sstable_reader(schema_ptr s,
reader_permit permit,
@@ -315,7 +89,7 @@ table::make_sstable_reader(schema_ptr s,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit), std::move(sstables),
return sstables->create_single_key_sstable_reader(const_cast<column_family*>(this), std::move(s), std::move(permit),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
} else {
@@ -328,7 +102,7 @@ table::make_sstable_reader(schema_ptr s,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return make_local_shard_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
return sstables->make_local_shard_sstable_reader(std::move(s), std::move(permit), pr, slice, pc,
std::move(trace_state), fwd, fwd_mr);
});
}
@@ -519,38 +293,6 @@ static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
return shards.size() != size_t(belongs_to_current_shard(shards));
}
flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
reader_permit permit,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
sstables::read_monitor_generator& monitor_generator)
{
auto reader_factory_fn = [s, permit, &slice, &pc, trace_state, fwd, fwd_mr, &monitor_generator]
(sstables::shared_sstable& sst, const dht::partition_range& pr) mutable {
flat_mutation_reader reader = sst->read_range_rows_flat(s, permit, pr, slice, pc,
trace_state, fwd, fwd_mr, monitor_generator(sst));
if (sst->is_shared()) {
auto filter = [&s = *s](const dht::decorated_key& dk) -> bool {
return dht::shard_of(s, dk.token()) == this_shard_id();
};
reader = make_filtering_reader(std::move(reader), std::move(filter));
}
return reader;
};
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
std::move(sstables),
pr,
std::move(trace_state),
std::move(reader_factory_fn)),
fwd,
fwd_mr);
}
sstables::shared_sstable table::make_sstable(sstring dir, int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f,
io_error_handler_gen error_handler_gen) {
return get_sstables_manager().make_sstable(_schema, dir, generation, v, f, gc_clock::now(), error_handler_gen);

View File

@@ -646,10 +646,9 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_test) {
auto list_reader = make_combined_reader(s.schema(), tests::make_permit(),
std::move(sstable_mutation_readers));
auto incremental_reader = make_local_shard_sstable_reader(
auto incremental_reader = sstable_set->make_local_shard_sstable_reader(
s.schema(),
tests::make_permit(),
sstable_set,
query::full_partition_range,
s.schema()->full_slice(),
seastar::default_priority_class(),

View File

@@ -6083,9 +6083,8 @@ SEASTAR_TEST_CASE(purged_tombstone_consumer_sstable_test) {
for (auto&& sst : all) {
compacting->insert(std::move(sst));
}
auto reader = ::make_range_sstable_reader(s,
auto reader = compacting->make_range_sstable_reader(s,
tests::make_permit(),
compacting,
query::full_partition_range,
s->full_slice(),
service::get_local_compaction_priority(),