sstable_set: refactor filter_sstable_for_reader_by_pk
Introduce a `make_pk_filter` function, which given a ring position, returns a boolean function (a filter) that given a sstable, tells whether the sstable may contain rows with the given position. The logic has been extracted from `filter_sstable_for_reader_by_pk`.
This commit is contained in:
@@ -471,16 +471,27 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
// The returned function uses the bloom filter to check whether the given sstable
|
||||
// may have a partition given by the ring position `pos`.
|
||||
//
|
||||
// Returning `false` means the sstable doesn't have such a partition.
|
||||
// Returning `true` means it may, i.e. we don't know whether or not it does.
|
||||
//
|
||||
// Assumes the given `pos` and `schema` are alive during the function's lifetime.
|
||||
static std::predicate<const sstable&> auto
|
||||
make_pk_filter(const dht::ring_position& pos, const schema& schema) {
|
||||
return [&pos, key = key::from_partition_key(schema, *pos.key()), cmp = dht::ring_position_comparator(schema)] (const sstable& sst) {
|
||||
return cmp(pos, sst.get_first_decorated_key()) >= 0 &&
|
||||
cmp(pos, sst.get_last_decorated_key()) <= 0 &&
|
||||
sst.filter_has_key(key);
|
||||
};
|
||||
}
|
||||
|
||||
// Filter out sstables for reader using bloom filter
|
||||
static std::vector<shared_sstable>
|
||||
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, column_family& cf, const schema_ptr& schema,
|
||||
const dht::ring_position& pos, const key& key) {
|
||||
auto sstable_has_not_key = [&, cmp = dht::ring_position_comparator(*schema)] (const shared_sstable& sst) {
|
||||
return cmp(pos, sst->get_first_decorated_key()) < 0 ||
|
||||
cmp(pos, sst->get_last_decorated_key()) > 0 ||
|
||||
!sst->filter_has_key(key);
|
||||
};
|
||||
sstables.erase(boost::remove_if(sstables, sstable_has_not_key), sstables.end());
|
||||
filter_sstable_for_reader_by_pk(std::vector<shared_sstable>&& sstables, const schema& schema, const dht::ring_position& pos) {
|
||||
auto filter = [_filter = make_pk_filter(pos, schema)] (const shared_sstable& sst) { return !_filter(*sst); };
|
||||
sstables.erase(boost::remove_if(sstables, filter), sstables.end());
|
||||
return sstables;
|
||||
}
|
||||
|
||||
@@ -531,9 +542,7 @@ sstable_set::create_single_key_sstable_reader(
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const
|
||||
{
|
||||
auto& pk = pos.key();
|
||||
auto key = key::from_partition_key(*schema, *pk);
|
||||
auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *cf, schema, pos, key);
|
||||
auto selected_sstables = filter_sstable_for_reader_by_pk(select({pos}), *schema, pos);
|
||||
auto num_sstables = selected_sstables.size();
|
||||
if (!num_sstables) {
|
||||
return make_empty_flat_reader(schema, permit);
|
||||
@@ -556,7 +565,7 @@ sstable_set::create_single_key_sstable_reader(
|
||||
// all sstables actually containing the partition were filtered.
|
||||
auto num_readers = readers.size();
|
||||
if (num_readers != num_sstables) {
|
||||
readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pk)}, slice, fwd));
|
||||
readers.push_back(flat_mutation_reader_from_mutations(permit, {mutation(schema, *pos.key())}, slice, fwd));
|
||||
}
|
||||
sstable_histogram.add(num_readers);
|
||||
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
|
||||
|
||||
Reference in New Issue
Block a user