mutation_reader: update make_filtering_reader() to flat_mutation_reader_v2

As part of the drive to move over to flat_mutation_reader_v2, update
make_filtering_reader(). Since it doesn't examine range tombstones
(only the partition_start, to filter the key) the entire patch
is just glue code upgrading and downgrading users in the pipeline
(or removing a conversion, in one case).

Test: unit (dev)

Closes #9723
This commit is contained in:
Avi Kivity
2021-12-03 13:13:28 +02:00
parent 6737c88045
commit 395b30bca8
6 changed files with 25 additions and 25 deletions

View File

@@ -1135,7 +1135,7 @@ public:
: cleanup_compaction(table_s, std::move(descriptor), cdata, opts.owned_ranges) {}
flat_mutation_reader make_sstable_reader() const override {
return make_filtering_reader(regular_compaction::make_sstable_reader(), make_partition_filter());
return downgrade_to_v1(make_filtering_reader(upgrade_to_v2(regular_compaction::make_sstable_reader()), make_partition_filter()));
}
std::string_view report_start_desc() const override {

View File

@@ -85,13 +85,13 @@ flat_mutation_reader toppartitions_data_listener::on_read(const schema_ptr& s, c
if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) {
dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
return downgrade_to_v1(make_filtering_reader(upgrade_to_v2(std::move(rd)), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
// The data query may be executing after the toppartitions_data_listener object has been removed, so check
if (zis) {
zis->_top_k_read.append(toppartitions_item_key{s, dk});
}
return true;
});
}));
}
return std::move(rd);

View File

@@ -92,9 +92,9 @@ mutation_source memtable_filling_virtual_table::as_mutation_source() {
auto rd = mt->as_data_source().make_reader_v2(s, units->units.permit(), range, slice, pc, trace_state, fwd, fwd_mr);
if (!_shard_aware) {
rd = upgrade_to_v2(make_filtering_reader(downgrade_to_v1(std::move(rd)), [this] (const dht::decorated_key& dk) -> bool {
rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool {
return this_shard_owns(dk);
}));
});
}
return rd;
@@ -175,9 +175,9 @@ mutation_source streaming_virtual_table::as_mutation_source() {
auto rd = make_slicing_filtering_reader(std::move(reader_and_handle.first), pr, slice);
if (!_shard_aware) {
rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool {
rd = downgrade_to_v1(make_filtering_reader(upgrade_to_v2(std::move(rd)), [this] (const dht::decorated_key& dk) -> bool {
return this_shard_owns(dk);
});
}));
}
if (reversed) {

View File

@@ -74,12 +74,12 @@ template <typename MutationFilter>
requires requires(MutationFilter mf, const dht::decorated_key& dk) {
{ mf(dk) } -> std::same_as<bool>;
}
class filtering_reader : public flat_mutation_reader::impl {
flat_mutation_reader _rd;
class filtering_reader : public flat_mutation_reader_v2::impl {
flat_mutation_reader_v2 _rd;
MutationFilter _filter;
static_assert(std::is_same<bool, std::result_of_t<MutationFilter(const dht::decorated_key&)>>::value, "bad MutationFilter signature");
public:
filtering_reader(flat_mutation_reader rd, MutationFilter&& filter)
filtering_reader(flat_mutation_reader_v2 rd, MutationFilter&& filter)
: impl(rd.schema(), rd.permit())
, _rd(std::move(rd))
, _filter(std::forward<MutationFilter>(filter)) {
@@ -132,8 +132,8 @@ public:
// accepts mutation const& and returns a bool. The mutation stays in the
// stream if and only if the filter returns true.
template <typename MutationFilter>
flat_mutation_reader make_filtering_reader(flat_mutation_reader rd, MutationFilter&& filter) {
return make_flat_mutation_reader<filtering_reader<MutationFilter>>(std::move(rd), std::forward<MutationFilter>(filter));
flat_mutation_reader_v2 make_filtering_reader(flat_mutation_reader_v2 rd, MutationFilter&& filter) {
return make_flat_mutation_reader_v2<filtering_reader<MutationFilter>>(std::move(rd), std::forward<MutationFilter>(filter));
}
/// Create a wrapper that filters fragments according to partition range and slice.

View File

@@ -41,11 +41,11 @@ public:
virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) override {
if (s->cf_name() == _cf_name) {
return make_filtering_reader(std::move(rd), [this, &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
return downgrade_to_v1(make_filtering_reader(upgrade_to_v2(std::move(rd)), [this, &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
testlog.info("listener {}: read {}", fmt::ptr(this), dk);
++read;
return true;
});
}));
}
return std::move(rd);
}

View File

@@ -251,7 +251,7 @@ SEASTAR_TEST_CASE(test_filtering) {
auto m4 = make_mutation_with_key(s, "key4");
// All pass
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[] (const dht::decorated_key& dk) { return true; }))
.produces(m1)
.produces(m2)
@@ -260,47 +260,47 @@ SEASTAR_TEST_CASE(test_filtering) {
.produces_end_of_stream();
// None pass
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[] (const dht::decorated_key& dk) { return false; }))
.produces_end_of_stream();
// Trim front
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m1.key()); }))
.produces(m2)
.produces(m3)
.produces(m4)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m1.key()) && !dk.key().equal(*s, m2.key()); }))
.produces(m3)
.produces(m4)
.produces_end_of_stream();
// Trim back
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m4.key()); }))
.produces(m1)
.produces(m2)
.produces(m3)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m4.key()) && !dk.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m2)
.produces_end_of_stream();
// Trim middle
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m2)
.produces(m4)
.produces_end_of_stream();
assert_that(make_filtering_reader(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4}),
assert_that(make_filtering_reader(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m1, m2, m3, m4})),
[&] (const dht::decorated_key& dk) { return !dk.key().equal(*s, m2.key()) && !dk.key().equal(*s, m3.key()); }))
.produces(m1)
.produces(m4)
@@ -2376,12 +2376,12 @@ SEASTAR_THREAD_TEST_CASE(test_multishard_streaming_reader) {
return table.as_mutation_source().make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state),
streamed_mutation::forwarding::no, fwd_mr);
};
auto reference_reader = make_filtering_reader(
auto reference_reader = downgrade_to_v1(make_filtering_reader(upgrade_to_v2(
make_multishard_combining_reader(seastar::make_shared<test_reader_lifecycle_policy>(std::move(reader_factory)),
schema, make_reader_permit(env), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority()),
schema, make_reader_permit(env), partition_range, schema->full_slice(), service::get_local_sstable_query_read_priority())),
[&remote_partitioner] (const dht::decorated_key& pkey) {
return remote_partitioner.shard_of(pkey.token()) == 0;
});
}));
auto close_reference_reader = deferred_close(reference_reader);
std::vector<mutation> reference_muts;