mutation_reader: convert make_combined_reader() overloads to v2

Just sprinkle the right amount downgrade_to_v1() and upgrade_to_v2() to
call sites, no attempts at optimization was done.
This commit is contained in:
Botond Dénes
2021-12-13 14:27:51 +02:00
parent 1554b94b78
commit aeddcf50a1
15 changed files with 159 additions and 166 deletions

View File

@@ -377,14 +377,6 @@ class list_reader_selector : public reader_selector {
std::vector<flat_mutation_reader_v2> _readers;
public:
explicit list_reader_selector(schema_ptr s, std::vector<flat_mutation_reader> readers)
: reader_selector(s, dht::ring_position_view::min()) {
_readers.reserve(readers.size());
for (auto&& rd : readers) {
_readers.push_back(upgrade_to_v2(std::move(rd)));
}
}
explicit list_reader_selector(schema_ptr s, std::vector<flat_mutation_reader_v2> readers)
: reader_selector(s, dht::ring_position_view::min())
, _readers(std::move(readers)) {
@@ -770,24 +762,24 @@ future<> merging_reader<P>::close() noexcept {
return _merger.close();
}
flat_mutation_reader make_combined_reader(schema_ptr schema,
flat_mutation_reader_v2 make_combined_reader(schema_ptr schema,
reader_permit permit,
std::unique_ptr<reader_selector> selector,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
return downgrade_to_v1(make_flat_mutation_reader_v2<merging_reader<mutation_reader_merger>>(schema,
return make_flat_mutation_reader_v2<merging_reader<mutation_reader_merger>>(schema,
std::move(permit),
fwd_sm,
mutation_reader_merger(schema, std::move(selector), fwd_sm, fwd_mr)));
mutation_reader_merger(schema, std::move(selector), fwd_sm, fwd_mr));
}
flat_mutation_reader make_combined_reader(schema_ptr schema,
flat_mutation_reader_v2 make_combined_reader(schema_ptr schema,
reader_permit permit,
std::vector<flat_mutation_reader> readers,
std::vector<flat_mutation_reader_v2> readers,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
if (readers.empty()) {
return make_empty_flat_reader(std::move(schema), std::move(permit));
return make_empty_flat_reader_v2(std::move(schema), std::move(permit));
}
if (readers.size() == 1) {
return std::move(readers.front());
@@ -799,13 +791,13 @@ flat_mutation_reader make_combined_reader(schema_ptr schema,
fwd_mr);
}
flat_mutation_reader make_combined_reader(schema_ptr schema,
flat_mutation_reader_v2 make_combined_reader(schema_ptr schema,
reader_permit permit,
flat_mutation_reader&& a,
flat_mutation_reader&& b,
flat_mutation_reader_v2&& a,
flat_mutation_reader_v2&& b,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
std::vector<flat_mutation_reader> v;
std::vector<flat_mutation_reader_v2> v;
v.reserve(2);
v.push_back(std::move(a));
v.push_back(std::move(b));
@@ -842,13 +834,14 @@ mutation_source make_combined_mutation_source(std::vector<mutation_source> adden
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd) {
std::vector<flat_mutation_reader> rd;
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
std::vector<flat_mutation_reader_v2> rd;
rd.reserve(addends.size());
for (auto&& ms : addends) {
rd.emplace_back(ms.make_reader(s, permit, pr, slice, pc, tr, fwd));
rd.emplace_back(ms.make_reader_v2(s, permit, pr, slice, pc, tr, fwd_sm, fwd_mr));
}
return make_combined_reader(s, std::move(permit), std::move(rd), fwd);
return make_combined_reader(s, std::move(permit), std::move(rd), fwd_sm, fwd_mr);
});
}

View File

@@ -53,20 +53,20 @@ public:
// Creates a mutation reader which combines data return by supplied readers.
// Returns mutation of the same schema only when all readers return mutations
// of the same schema.
flat_mutation_reader make_combined_reader(schema_ptr schema,
flat_mutation_reader_v2 make_combined_reader(schema_ptr schema,
reader_permit permit,
std::vector<flat_mutation_reader>,
std::vector<flat_mutation_reader_v2>,
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);
flat_mutation_reader make_combined_reader(schema_ptr schema,
flat_mutation_reader_v2 make_combined_reader(schema_ptr schema,
reader_permit permit,
std::unique_ptr<reader_selector>,
streamed_mutation::forwarding,
mutation_reader::forwarding);
flat_mutation_reader make_combined_reader(schema_ptr schema,
flat_mutation_reader_v2 make_combined_reader(schema_ptr schema,
reader_permit permit,
flat_mutation_reader&& a,
flat_mutation_reader&& b,
flat_mutation_reader_v2&& a,
flat_mutation_reader_v2&& b,
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);

View File

@@ -788,11 +788,11 @@ sstable_set_impl::create_single_key_sstable_reader(
if (!num_sstables) {
return make_empty_flat_reader(schema, permit);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
filter_sstable_for_reader_by_ck(std::move(selected_sstables), *cf, schema, slice)
| boost::adaptors::transformed([&] (const shared_sstable& sstable) {
tracing::trace(trace_state, "Reading key {} from sstable {}", pos, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->make_reader_v1(schema, permit, pr, slice, pc, trace_state, fwd);
return sstable->make_reader(schema, permit, pr, slice, pc, trace_state, fwd);
})
);
@@ -806,10 +806,10 @@ sstable_set_impl::create_single_key_sstable_reader(
// all sstables actually containing the partition were filtered.
auto num_readers = readers.size();
if (num_readers != num_sstables) {
readers.push_back(make_flat_mutation_reader_from_mutations(schema, permit, {mutation(schema, *pos.key())}, slice, fwd));
readers.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema, permit, {mutation(schema, *pos.key())}, slice, fwd)));
}
sstable_histogram.add(num_readers);
return make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr);
return downgrade_to_v1(make_combined_reader(schema, std::move(permit), std::move(readers), fwd, fwd_mr));
}
flat_mutation_reader
@@ -1058,13 +1058,13 @@ compound_sstable_set::create_single_key_sstable_reader(
return non_empty_set->create_single_key_sstable_reader(cf, std::move(schema), std::move(permit), sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
}
auto readers = boost::copy_range<std::vector<flat_mutation_reader>>(
auto readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(
boost::make_iterator_range(sets.begin(), it)
| boost::adaptors::transformed([&] (const lw_shared_ptr<sstable_set>& non_empty_set) {
return non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr);
return upgrade_to_v2(non_empty_set->create_single_key_sstable_reader(cf, schema, permit, sstable_histogram, pr, slice, pc, trace_state, fwd, fwd_mr));
})
);
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
return downgrade_to_v1(make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr));
}
flat_mutation_reader
@@ -1100,13 +1100,13 @@ sstable_set::make_range_sstable_reader(
(shared_sstable& sst, const dht::partition_range& pr) mutable {
return sst->make_reader(s, permit, pr, slice, pc, trace_state, fwd, fwd_mr, monitor_generator(sst));
};
return upgrade_to_v2(make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
shared_from_this(),
pr,
std::move(trace_state),
std::move(reader_factory_fn)),
fwd,
fwd_mr));
fwd_mr);
}
flat_mutation_reader_v2
@@ -1130,13 +1130,13 @@ sstable_set::make_local_shard_sstable_reader(
auto sst = *sstables->begin();
return reader_factory_fn(sst, pr);
}
return upgrade_to_v2(make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
return make_combined_reader(s, std::move(permit), std::make_unique<incremental_reader_selector>(s,
shared_from_this(),
pr,
std::move(trace_state),
std::move(reader_factory_fn)),
fwd,
fwd_mr));
fwd_mr);
}
flat_mutation_reader_v2 sstable_set::make_crawling_reader(
@@ -1145,11 +1145,11 @@ flat_mutation_reader_v2 sstable_set::make_crawling_reader(
const io_priority_class& pc,
tracing::trace_state_ptr trace_ptr,
read_monitor_generator& monitor_generator) const {
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
_impl->for_each_sstable([&] (const shared_sstable& sst) mutable {
readers.emplace_back(downgrade_to_v1(sst->make_crawling_reader(schema, permit, pc, trace_ptr, monitor_generator(sst))));
readers.emplace_back(sst->make_crawling_reader(schema, permit, pc, trace_ptr, monitor_generator(sst)));
});
return upgrade_to_v2(make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
return make_combined_reader(schema, std::move(permit), std::move(readers), streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
}
unsigned sstable_set_overlapping_count(const schema_ptr& schema, const std::vector<shared_sstable>& sstables) {

View File

@@ -154,7 +154,7 @@ table::make_reader(schema_ptr s,
return (*_virtual_reader).make_reader(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr);
}
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_memtables->size() + 1);
// We're assuming that cache and memtables are both read atomically
@@ -178,7 +178,7 @@ table::make_reader(schema_ptr s,
// https://github.com/scylladb/scylla/issues/185
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr));
readers.emplace_back(upgrade_to_v2(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
@@ -198,12 +198,12 @@ table::make_reader(schema_ptr s,
// FIXME: remove this workaround (and the `reversed_reads_auto_bypass_cache` option) after:
// - support for reversed reads is implemented in the cache,
// - fast forwarding is implemented in reversed sstable readers.
readers.emplace_back(_cache.make_reader(s, permit, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
readers.emplace_back(upgrade_to_v2(_cache.make_reader(s, permit, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
} else {
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
readers.emplace_back(upgrade_to_v2(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
}
auto comb_reader = make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
auto comb_reader = downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
if (_config.data_listeners && !_config.data_listeners->empty()) {
return _config.data_listeners->on_read(s, range, slice, std::move(comb_reader));
} else {
@@ -233,13 +233,13 @@ table::make_streaming_reader(schema_ptr s, reader_permit permit,
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_memtables->size() + 1);
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr));
readers.emplace_back(upgrade_to_v2(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
readers.emplace_back(upgrade_to_v2(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
return downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
});
return make_flat_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
@@ -251,13 +251,13 @@ flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_perm
auto trace_state = tracing::trace_state_ptr();
const auto fwd = streamed_mutation::forwarding::no;
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_memtables->size() + 1);
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_flat_reader(schema, permit, range, slice, pc, trace_state, fwd, fwd_mr));
readers.emplace_back(upgrade_to_v2(mt->make_flat_reader(schema, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr);
readers.emplace_back(upgrade_to_v2(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
return downgrade_to_v1(make_combined_reader(std::move(schema), std::move(permit), std::move(readers), fwd, fwd_mr));
}
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range,
@@ -2228,11 +2228,11 @@ table::make_reader_excluding_sstables(schema_ptr s,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) const {
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(_memtables->size() + 1);
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr));
readers.emplace_back(upgrade_to_v2(mt->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
auto excluded_ssts = boost::copy_range<std::unordered_set<sstables::shared_sstable>>(excluded);
@@ -2244,8 +2244,8 @@ table::make_reader_excluding_sstables(schema_ptr s,
effective_sstables->insert(sst);
});
readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
readers.emplace_back(upgrade_to_v2(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)));
return downgrade_to_v1(make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr));
}
future<> table::move_sstables_from_staging(std::vector<sstables::shared_sstable> sstables) {

View File

@@ -133,9 +133,9 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
for (int i = 0; i < n; ++i) {
readers.push_back(memtables[i]->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr));
readers.push_back(upgrade_to_v2(memtables[i]->make_flat_reader(s, permit, range, slice, pc, trace_state, fwd, fwd_mr)));
}
return make_combined_reader(s, std::move(permit), std::move(readers), fwd, fwd_mr);
});

View File

@@ -79,7 +79,7 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_the_same_row) {
mutation m2(s, partition_key::from_single_value(*s, "key1"));
m2.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2);
assert_that(make_combined_reader(s, permit, make_flat_mutation_reader_from_mutations(s, permit, {m1}), make_flat_mutation_reader_from_mutations(s, permit, {m2})))
assert_that(make_combined_reader(s, permit, upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m1})), upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m2}))))
.produces(m2)
.produces_end_of_stream();
});
@@ -97,7 +97,7 @@ SEASTAR_TEST_CASE(test_combining_two_non_overlapping_readers) {
mutation m2(s, partition_key::from_single_value(*s, "keyA"));
m2.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v2")), 2);
auto cr = make_combined_reader(s, permit, make_flat_mutation_reader_from_mutations(s, permit, {m1}), make_flat_mutation_reader_from_mutations(s, permit, {m2}));
auto cr = make_combined_reader(s, permit, upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m1})), upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m2})));
assert_that(std::move(cr))
.produces(m2)
.produces(m1)
@@ -120,7 +120,7 @@ SEASTAR_TEST_CASE(test_combining_two_partially_overlapping_readers) {
mutation m3(s, partition_key::from_single_value(*s, "keyC"));
m3.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1);
assert_that(make_combined_reader(s, permit, make_flat_mutation_reader_from_mutations(s, permit, {m1, m2}), make_flat_mutation_reader_from_mutations(s, permit, {m2, m3})))
assert_that(make_combined_reader(s, permit, upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m1, m2})), upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m2, m3}))))
.produces(m1)
.produces(m2)
.produces(m3)
@@ -143,8 +143,8 @@ SEASTAR_TEST_CASE(test_combining_one_reader_with_many_partitions) {
mutation m3(s, partition_key::from_single_value(*s, "keyC"));
m3.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v3")), 1);
std::vector<flat_mutation_reader> v;
v.push_back(make_flat_mutation_reader_from_mutations(s, permit, {m1, m2, m3}));
std::vector<flat_mutation_reader_v2> v;
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m1, m2, m3})));
assert_that(make_combined_reader(s, permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(m1)
.produces(m2)
@@ -169,9 +169,9 @@ SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_within_partition_test) {
return mut;
};
std::vector<flat_mutation_reader> v;
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, {make_partition(boost::irange(0, 5))}));
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, {make_partition(boost::irange(5, 10))}));
std::vector<flat_mutation_reader_v2> v;
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, {make_partition(boost::irange(0, 5))})));
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, {make_partition(boost::irange(5, 10))})));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(make_partition(boost::irange(0, 10)))
.produces_end_of_stream();
@@ -193,15 +193,15 @@ SEASTAR_THREAD_TEST_CASE(combined_mutation_reader_galloping_over_multiple_partit
const auto k = s.make_pkeys(2);
std::vector<flat_mutation_reader> v;
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
std::vector<flat_mutation_reader_v2> v;
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], boost::irange(5, 10)),
make_partition_with_clustering_rows(s, k[1], boost::irange(0, 5))
}));
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
})));
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], boost::irange(0, 5)),
make_partition_with_clustering_rows(s, k[1], boost::irange(5, 10))
}));
})));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(make_partition_with_clustering_rows(s, k[0], boost::irange(0, 10)))
.produces(make_partition_with_clustering_rows(s, k[1], boost::irange(0, 10)))
@@ -215,15 +215,15 @@ SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_changing_multiple_partitions_
const auto k = s.make_pkeys(2);
std::vector<flat_mutation_reader> v;
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
std::vector<flat_mutation_reader_v2> v;
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], boost::irange(0, 5)),
make_partition_with_clustering_rows(s, k[1], boost::irange(0, 5))
}));
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
})));
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, {
make_partition_with_clustering_rows(s, k[0], boost::irange(5, 10)),
make_partition_with_clustering_rows(s, k[1], boost::irange(5, 10)),
}));
})));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces(make_partition_with_clustering_rows(s, k[0], boost::irange(0, 10)))
.produces(make_partition_with_clustering_rows(s, k[1], boost::irange(0, 10)))
@@ -316,7 +316,7 @@ SEASTAR_TEST_CASE(test_combining_two_readers_with_one_reader_empty) {
mutation m1(s, partition_key::from_single_value(*s, "key1"));
m1.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);
assert_that(make_combined_reader(s, permit, make_flat_mutation_reader_from_mutations(s, permit, {m1}), make_empty_flat_reader(s, permit)))
assert_that(make_combined_reader(s, permit, upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m1})), make_empty_flat_reader_v2(s, permit)))
.produces(m1)
.produces_end_of_stream();
});
@@ -327,18 +327,18 @@ SEASTAR_TEST_CASE(test_combining_two_empty_readers) {
auto s = make_schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
assert_that(make_combined_reader(s, permit, make_empty_flat_reader(s, permit), make_empty_flat_reader(s, permit)))
assert_that(make_combined_reader(s, permit, make_empty_flat_reader_v2(s, permit), make_empty_flat_reader_v2(s, permit)))
.produces_end_of_stream();
});
}
SEASTAR_TEST_CASE(test_combining_one_empty_reader) {
return seastar::async([] {
std::vector<flat_mutation_reader> v;
std::vector<flat_mutation_reader_v2> v;
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
auto s = make_schema();
v.push_back(make_empty_flat_reader(s, permit));
v.push_back(make_empty_flat_reader_v2(s, permit));
assert_that(make_combined_reader(s, permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::no))
.produces_end_of_stream();
});
@@ -392,8 +392,8 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combining_reader) {
};
auto make_reader = [&] (reader_permit permit, const dht::partition_range& pr) {
return make_combined_reader(s, permit, ranges::to<std::vector<flat_mutation_reader>>(mutations | boost::adaptors::transformed([&pr, s, permit] (auto& ms) {
return make_flat_mutation_reader_from_mutations(s, permit, {ms}, pr);
return make_combined_reader(s, permit, ranges::to<std::vector<flat_mutation_reader_v2>>(mutations | boost::adaptors::transformed([&pr, s, permit] (auto& ms) {
return upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {ms}, pr));
})));
};
@@ -444,9 +444,9 @@ SEASTAR_THREAD_TEST_CASE(test_fast_forwarding_combining_reader_with_galloping) {
};
auto pr = dht::partition_range::make(ring[0], ring[0]);
std::vector<flat_mutation_reader> v;
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(0, 5), 7), pr));
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(5, 10), 7), pr));
std::vector<flat_mutation_reader_v2> v;
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(0, 5), 7), pr)));
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(5, 10), 7), pr)));
assert_that(make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
.produces(make_partition_with_clustering_rows(s, pkeys[0], boost::irange(0, 10)))
@@ -494,9 +494,9 @@ SEASTAR_TEST_CASE(test_sm_fast_forwarding_combining_reader) {
{make_mutation(2)},
};
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
for (auto& mutations : readers_mutations) {
readers.emplace_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, mutations, streamed_mutation::forwarding::yes));
readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, mutations, streamed_mutation::forwarding::yes)));
}
assert_that(make_combined_reader(s.schema(), permit, std::move(readers), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no))
@@ -542,9 +542,9 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping
};
auto pr = dht::partition_range::make(ring[0], ring[0]);
std::vector<flat_mutation_reader> v;
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(0, 5), 3), streamed_mutation::forwarding::yes));
v.push_back(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(5, 10), 3), streamed_mutation::forwarding::yes));
std::vector<flat_mutation_reader_v2> v;
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(0, 5), 3), streamed_mutation::forwarding::yes)));
v.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s.schema(), permit, make_n_mutations(boost::irange(5, 10), 3), streamed_mutation::forwarding::yes)));
auto reader = make_combined_reader(s.schema(), permit, std::move(v), streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
auto assertions = assert_that(std::move(reader));
@@ -656,14 +656,14 @@ SEASTAR_TEST_CASE(combined_mutation_reader_test) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, {});
auto sstable_set = make_lw_shared<sstables::sstable_set>(cs.make_sstable_set(s.schema()));
std::vector<flat_mutation_reader> sstable_mutation_readers;
std::vector<flat_mutation_reader_v2> sstable_mutation_readers;
auto list_permit = env.make_reader_permit();
for (auto sst : sstable_list) {
sstable_set->insert(sst);
sstable_mutation_readers.emplace_back(
sst->as_mutation_source().make_reader(
sst->as_mutation_source().make_reader_v2(
s.schema(),
list_permit,
query::full_partition_range,
@@ -954,7 +954,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
auto keys = gen.make_partition_keys(3);
std::vector<mutation> combined;
std::list<dht::partition_range> reader_ranges;
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
for (int i = 0; i < n_readers; ++i) {
std::vector<mutation> muts;
for (auto&& key : keys) {
@@ -971,7 +971,7 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
}
mutation_source ds = create_sstable(env, s, muts)->as_mutation_source();
reader_ranges.push_back(dht::partition_range::make({keys[0]}, {keys[0]}));
readers.push_back(ds.make_reader(s,
readers.push_back(ds.make_reader_v2(s,
permit,
reader_ranges.back(),
s->full_slice(), default_priority_class(), nullptr,
@@ -979,9 +979,9 @@ SEASTAR_TEST_CASE(test_fast_forwarding_combined_reader_is_consistent_with_slicin
mutation_reader::forwarding::yes));
}
flat_mutation_reader rd = make_combined_reader(s, permit, std::move(readers),
flat_mutation_reader rd = downgrade_to_v1(make_combined_reader(s, permit, std::move(readers),
streamed_mutation::forwarding::yes,
mutation_reader::forwarding::yes);
mutation_reader::forwarding::yes));
auto close_rd = deferred_close(rd);
std::vector<query::clustering_range> ranges = gen.make_random_ranges(3);
@@ -1037,7 +1037,7 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
m2.partition().apply_delete(*s, rt2);
ss.add_row(m2, ss.make_ckey(4), "v2"); // position after rt2.position() but before rt2.end_position().
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
mutation_source ds1 = create_sstable(env, s, {m1})->as_mutation_source();
mutation_source ds2 = create_sstable(env, s, {m2})->as_mutation_source();
@@ -1048,11 +1048,11 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
{
auto permit = env.make_reader_permit();
auto slice = partition_slice_builder(*s).with_range(range).build();
readers.push_back(ds1.make_reader(s, permit, query::full_partition_range, slice));
readers.push_back(ds2.make_reader(s, permit, query::full_partition_range, slice));
readers.push_back(ds1.make_reader_v2(s, permit, query::full_partition_range, slice));
readers.push_back(ds2.make_reader_v2(s, permit, query::full_partition_range, slice));
auto rd = make_combined_reader(s, permit, std::move(readers),
streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
auto rd = downgrade_to_v1(make_combined_reader(s, permit, std::move(readers),
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
auto close_rd = deferred_close(rd);
auto prange = position_range(range);
@@ -1072,13 +1072,13 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
// Check fast_forward_to()
{
auto permit = env.make_reader_permit();
readers.push_back(ds1.make_reader(s, permit, query::full_partition_range, s->full_slice(), default_priority_class(),
readers.push_back(ds1.make_reader_v2(s, permit, query::full_partition_range, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::yes));
readers.push_back(ds2.make_reader(s, permit, query::full_partition_range, s->full_slice(), default_priority_class(),
readers.push_back(ds2.make_reader_v2(s, permit, query::full_partition_range, s->full_slice(), default_priority_class(),
nullptr, streamed_mutation::forwarding::yes));
auto rd = make_combined_reader(s, permit, std::move(readers),
streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
auto rd = downgrade_to_v1(make_combined_reader(s, permit, std::move(readers),
streamed_mutation::forwarding::yes, mutation_reader::forwarding::no));
auto close_rd = deferred_close(rd);
auto prange = position_range(range);
@@ -4115,11 +4115,11 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
good_readers.emplace(k, make_clustering_combined_reader(s, permit, fwd_sm, std::move(q)));
}
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
for (auto& m: bad) {
readers.push_back(make_flat_mutation_reader_from_mutations(s, permit, {m}, range, slice, fwd_sm));
readers.push_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(s, permit, {m}, range, slice, fwd_sm)));
}
readers.push_back(make_flat_mutation_reader<multi_partition_reader>(s, permit, std::move(good_readers), range));
readers.push_back(upgrade_to_v2(make_flat_mutation_reader<multi_partition_reader>(s, permit, std::move(good_readers), range)));
return make_combined_reader(std::move(s), std::move(permit), std::move(readers), fwd_sm, fwd_mr);
});

View File

@@ -363,10 +363,10 @@ SEASTAR_THREAD_TEST_CASE(test_timestamp_based_splitting_mutation_writer) {
testlog.debug("Data split into {} buckets: {}", buckets.size(), boost::copy_range<std::vector<int64_t>>(buckets | boost::adaptors::map_keys));
auto permit = semaphore.make_permit();
auto bucket_readers = boost::copy_range<std::vector<flat_mutation_reader>>(buckets | boost::adaptors::map_values |
boost::adaptors::transformed([&random_schema, &permit] (std::vector<mutation> muts) { return make_flat_mutation_reader_from_mutations(random_schema.schema(), permit, std::move(muts)); }));
auto reader = make_combined_reader(random_schema.schema(), permit, std::move(bucket_readers), streamed_mutation::forwarding::no,
mutation_reader::forwarding::no);
auto bucket_readers = boost::copy_range<std::vector<flat_mutation_reader_v2>>(buckets | boost::adaptors::map_values |
boost::adaptors::transformed([&random_schema, &permit] (std::vector<mutation> muts) { return upgrade_to_v2(make_flat_mutation_reader_from_mutations(random_schema.schema(), permit, std::move(muts))); }));
auto reader = downgrade_to_v1(make_combined_reader(random_schema.schema(), permit, std::move(bucket_readers), streamed_mutation::forwarding::no,
mutation_reader::forwarding::no));
auto close_reader = deferred_close(reader);
const auto now = gc_clock::now();
@@ -488,14 +488,14 @@ SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer) {
});
};
auto check_and_reset = [&] {
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
auto close_readers = defer([&] {
for (auto& rd : readers) {
rd.close().get();
}
});
for (auto muts : output_mutations) {
readers.emplace_back(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), std::move(muts)));
readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(random_schema.schema(), semaphore.make_permit(), std::move(muts))));
}
auto rd = assert_that(make_combined_reader(random_schema.schema(), semaphore.make_permit(), std::move(readers)));
for (const auto& mut : input_mutations) {

View File

@@ -3233,10 +3233,10 @@ static tmpdir write_sstables(test_env& env, schema_ptr s, lw_shared_ptr<memtable
tmpdir tmp;
auto sst = env.make_sstable(s, tmp.path().string(), 1, version, sstable::format_types::big, 4096);
sst->write_components(make_combined_reader(s,
sst->write_components(downgrade_to_v1(make_combined_reader(s,
env.make_reader_permit(),
mt1->make_flat_reader(s, env.make_reader_permit()),
mt2->make_flat_reader(s, env.make_reader_permit())), 1, s, env.manager().configure_writer(), mt1->get_encoding_stats()).get();
upgrade_to_v2(mt1->make_flat_reader(s, env.make_reader_permit())),
upgrade_to_v2(mt2->make_flat_reader(s, env.make_reader_permit())))), 1, s, env.manager().configure_writer(), mt1->get_encoding_stats()).get();
return tmp;
}

View File

@@ -2856,11 +2856,11 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
testlog.info("Checking position monotonicity of re-combined stream");
{
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(segregated_fragment_streams.size());
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream)));
readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream))));
}
assert_that(make_combined_reader(schema, permit, std::move(readers))).has_monotonic_positions();
@@ -2868,11 +2868,11 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
testlog.info("Checking content of re-combined stream");
{
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(segregated_fragment_streams.size());
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream)));
readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream))));
}
auto rd = assert_that(make_combined_reader(schema, permit, std::move(readers)));

View File

@@ -1220,8 +1220,8 @@ SEASTAR_TEST_CASE(test_writing_combined_stream_with_tombstones_at_the_same_posit
auto mt2 = make_lw_shared<memtable>(s);
mt2->apply(m2);
auto combined_permit = env.make_reader_permit();
auto mr = make_combined_reader(s, combined_permit,
mt1->make_flat_reader(s, combined_permit), mt2->make_flat_reader(s, combined_permit));
auto mr = downgrade_to_v1(make_combined_reader(s, combined_permit,
upgrade_to_v2(mt1->make_flat_reader(s, combined_permit)), upgrade_to_v2(mt2->make_flat_reader(s, combined_permit))));
auto sst = make_sstable_easy(env, dir.path(), std::move(mr), env.manager().configure_writer(), 1, version);
assert_that(sst->as_mutation_source().make_reader(s, env.make_reader_permit()))

View File

@@ -67,19 +67,19 @@ private:
auto new_mt = make_lw_shared<memtable>(_s);
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
for (auto&& mt : _memtables) {
readers.push_back(mt->make_flat_reader(new_mt->schema(),
readers.push_back(upgrade_to_v2(mt->make_flat_reader(new_mt->schema(),
permit,
query::full_partition_range,
new_mt->schema()->full_slice(),
default_priority_class(),
nullptr,
streamed_mutation::forwarding::no,
mutation_reader::forwarding::yes));
mutation_reader::forwarding::yes)));
}
_memtables.push_back(new_memtable());
auto&& rd = make_combined_reader(new_mt->schema(), permit, std::move(readers));
auto&& rd = downgrade_to_v1(make_combined_reader(new_mt->schema(), permit, std::move(readers)));
auto close_rd = deferred_close(rd);
consume_partitions(rd, [&] (mutation&& m) {
new_mt->apply(std::move(m));

View File

@@ -65,7 +65,7 @@ protected:
const std::vector<std::vector<mutation>>& overlapping_partitions_disjoint_rows_streams() const {
return _overlapping_partitions_disjoint_rows;
}
future<> consume_all(flat_mutation_reader mr) const;
future<> consume_all(flat_mutation_reader_v2 mr) const;
public:
combined()
: _semaphore("combined")
@@ -150,9 +150,9 @@ std::vector<std::vector<mutation>> combined::create_overlapping_partitions_disjo
return mss;
}
future<> combined::consume_all(flat_mutation_reader mr) const
future<> combined::consume_all(flat_mutation_reader_v2 mr) const
{
return with_closeable(std::move(mr), [] (auto& mr) {
return with_closeable(downgrade_to_v1(std::move(mr)), [] (auto& mr) {
perf_tests::start_measuring_time();
return mr.consume_pausable([] (mutation_fragment mf) {
perf_tests::do_not_optimize(mf);
@@ -165,28 +165,28 @@ future<> combined::consume_all(flat_mutation_reader mr) const
PERF_TEST_F(combined, one_row)
{
std::vector<flat_mutation_reader> mrs;
mrs.emplace_back(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), one_row_stream()));
std::vector<flat_mutation_reader_v2> mrs;
mrs.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), one_row_stream())));
return consume_all(make_combined_reader(schema().schema(), permit(), std::move(mrs)));
}
PERF_TEST_F(combined, single_active)
{
std::vector<flat_mutation_reader> mrs;
std::vector<flat_mutation_reader_v2> mrs;
mrs.reserve(4);
mrs.emplace_back(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), single_stream()));
mrs.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), single_stream())));
for (auto i = 0; i < 3; i++) {
mrs.emplace_back(make_empty_flat_reader(schema().schema(), permit()));
mrs.emplace_back(make_empty_flat_reader_v2(schema().schema(), permit()));
}
return consume_all(make_combined_reader(schema().schema(), permit(), std::move(mrs)));
}
PERF_TEST_F(combined, many_overlapping)
{
std::vector<flat_mutation_reader> mrs;
std::vector<flat_mutation_reader_v2> mrs;
mrs.reserve(4);
for (auto i = 0; i < 4; i++) {
mrs.emplace_back(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), single_stream()));
mrs.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), single_stream())));
}
return consume_all(make_combined_reader(schema().schema(), permit(), std::move(mrs)));
}
@@ -194,10 +194,10 @@ PERF_TEST_F(combined, many_overlapping)
PERF_TEST_F(combined, disjoint_interleaved)
{
return consume_all(make_combined_reader(schema().schema(), permit(),
boost::copy_range<std::vector<flat_mutation_reader>>(
boost::copy_range<std::vector<flat_mutation_reader_v2>>(
disjoint_interleaved_streams()
| boost::adaptors::transformed([this] (auto&& ms) {
return schema().schema(), make_flat_mutation_reader_from_mutations(schema().schema(), permit(), std::move(ms));
return schema().schema(), upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), std::move(ms)));
})
)
));
@@ -206,10 +206,10 @@ PERF_TEST_F(combined, disjoint_interleaved)
PERF_TEST_F(combined, disjoint_ranges)
{
return consume_all(make_combined_reader(schema().schema(), permit(),
boost::copy_range<std::vector<flat_mutation_reader>>(
boost::copy_range<std::vector<flat_mutation_reader_v2>>(
disjoint_ranges_streams()
| boost::adaptors::transformed([this] (auto&& ms) {
return make_flat_mutation_reader_from_mutations(schema().schema(), permit(), std::move(ms));
return upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), std::move(ms)));
})
)
));
@@ -218,10 +218,10 @@ PERF_TEST_F(combined, disjoint_ranges)
PERF_TEST_F(combined, overlapping_partitions_disjoint_rows)
{
return consume_all(make_combined_reader(schema().schema(), permit(),
boost::copy_range<std::vector<flat_mutation_reader>>(
boost::copy_range<std::vector<flat_mutation_reader_v2>>(
overlapping_partitions_disjoint_rows_streams()
| boost::adaptors::transformed([this] (auto&& ms) {
return make_flat_mutation_reader_from_mutations(schema().schema(), permit(), std::move(ms));
return upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), std::move(ms)));
})
)
));
@@ -246,7 +246,7 @@ protected:
const std::vector<mutation_bounds>& almost_disjoint_clustering_ranges() const {
return _almost_disjoint_ranges;
}
future<size_t> consume_all(flat_mutation_reader mr) const;
future<size_t> consume_all(flat_mutation_reader_v2 mr) const;
public:
clustering_combined()
: _semaphore("clustering_combined")
@@ -270,9 +270,9 @@ std::vector<mutation_bounds> clustering_combined::create_almost_disjoint_ranges(
return mbs;
}
future<size_t> clustering_combined::consume_all(flat_mutation_reader mr) const
future<size_t> clustering_combined::consume_all(flat_mutation_reader_v2 mr) const
{
return do_with(std::move(mr), size_t(0), [] (auto& mr, size_t& num_mfs) {
return do_with(downgrade_to_v1(std::move(mr)), size_t(0), [] (auto& mr, size_t& num_mfs) {
perf_tests::start_measuring_time();
return mr.consume_pausable([&num_mfs] (mutation_fragment mf) {
++num_mfs;
@@ -290,10 +290,10 @@ future<size_t> clustering_combined::consume_all(flat_mutation_reader mr) const
PERF_TEST_F(clustering_combined, ranges_generic)
{
return consume_all(make_combined_reader(schema().schema(), permit(),
boost::copy_range<std::vector<flat_mutation_reader>>(
boost::copy_range<std::vector<flat_mutation_reader_v2>>(
almost_disjoint_clustering_ranges()
| boost::adaptors::transformed([this] (auto&& mb) {
return make_flat_mutation_reader_from_mutations(schema().schema(), permit(), {std::move(mb.m)});
return upgrade_to_v2(make_flat_mutation_reader_from_mutations(schema().schema(), permit(), {std::move(mb.m)}));
})
)
));
@@ -308,8 +308,8 @@ PERF_TEST_F(clustering_combined, ranges_specialized)
std::move(mb.lower), std::move(mb.upper)};
}));
auto q = std::make_unique<simple_position_reader_queue>(*schema().schema(), std::move(rbs));
return consume_all(make_clustering_combined_reader(
schema().schema(), permit(), streamed_mutation::forwarding::no, std::move(q)));
return consume_all(upgrade_to_v2(make_clustering_combined_reader(
schema().schema(), permit(), streamed_mutation::forwarding::no, std::move(q))));
}
class memtable {

View File

@@ -85,7 +85,7 @@ void run_test(const sstring& name, schema_ptr s, MutationGenerator&& gen) {
// Create a reader which tests the case of memtable snapshots
// going away after memtable was merged to cache.
auto rd = std::make_unique<flat_mutation_reader>(
make_combined_reader(s, permit, cache.make_reader(s, permit), mt->make_flat_reader(s, permit)));
downgrade_to_v1(make_combined_reader(s, permit, upgrade_to_v2(cache.make_reader(s, permit)), upgrade_to_v2(mt->make_flat_reader(s, permit)))));
auto close_rd = defer([&rd] { rd->close().get(); });
rd->set_max_buffer_size(1);
rd->fill_buffer().get();

View File

@@ -164,17 +164,17 @@ struct table {
std::unique_ptr<reader> make_reader(dht::partition_range pr, query::partition_slice slice) {
testlog.trace("making reader, pk={} ck={}", pr, slice);
auto r = std::make_unique<reader>(std::move(pr), std::move(slice));
std::vector<flat_mutation_reader> rd;
std::vector<flat_mutation_reader_v2> rd;
auto permit = make_permit();
if (prev_mt) {
rd.push_back(prev_mt->make_flat_reader(s.schema(), permit, r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
rd.push_back(upgrade_to_v2(prev_mt->make_flat_reader(s.schema(), permit, r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no)));
}
rd.push_back(mt->make_flat_reader(s.schema(), permit, r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
rd.push_back(cache.make_reader(s.schema(), permit, r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
r->rd = make_combined_reader(s.schema(), permit, std::move(rd), streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
rd.push_back(upgrade_to_v2(mt->make_flat_reader(s.schema(), permit, r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no)));
rd.push_back(upgrade_to_v2(cache.make_reader(s.schema(), permit, r->pr, r->slice, default_priority_class(), nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no)));
r->rd = downgrade_to_v1(make_combined_reader(s.schema(), permit, std::move(rd), streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
return r;
}

View File

@@ -535,16 +535,16 @@ void consume_sstables(schema_ptr schema, reader_permit permit, std::vector<sstab
std::function<stop_iteration(flat_mutation_reader&, sstables::sstable*)> reader_consumer) {
sst_log.trace("consume_sstables(): {} sstables, merge={}, no_skips={}", sstables.size(), merge, no_skips);
if (merge) {
std::vector<flat_mutation_reader> readers;
std::vector<flat_mutation_reader_v2> readers;
readers.reserve(sstables.size());
for (const auto& sst : sstables) {
if (no_skips) {
readers.emplace_back(sst->make_crawling_reader_v1(schema, permit));
readers.emplace_back(sst->make_crawling_reader(schema, permit));
} else {
readers.emplace_back(sst->make_reader_v1(schema, permit, query::full_partition_range, schema->full_slice()));
readers.emplace_back(sst->make_reader(schema, permit, query::full_partition_range, schema->full_slice()));
}
}
auto rd = make_combined_reader(schema, permit, std::move(readers));
auto rd = downgrade_to_v1(make_combined_reader(schema, permit, std::move(readers)));
reader_consumer(rd, nullptr);
} else {