readers: Update tests to use make_queue_reader_v2.

Closes #10220
This commit is contained in:
Mikołaj Sielużycki
2022-03-15 12:39:24 +01:00
committed by Botond Dénes
parent 5d7b2c6515
commit 7ce0d380d4
3 changed files with 61 additions and 29 deletions

View File

@@ -47,6 +47,7 @@
#include "mutation_rebuilder.hh"
#include <boost/range/algorithm/sort.hpp>
#include "readers/from_mutations_v2.hh"
#include "readers/from_mutations.hh"
#include "readers/from_mutations_v2.hh"
#include "readers/forwardable_v2.hh"
@@ -2470,7 +2471,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
// Simultaneous read and write
{
auto read_all = [] (flat_mutation_reader& reader, std::vector<mutation>& muts) {
auto read_all = [] (flat_mutation_reader_v2& reader, std::vector<mutation>& muts) {
return async([&reader, &muts] {
auto close_reader = deferred_close(reader);
while (auto mut_opt = read_mutation_from_flat_mutation_reader(reader).get0()) {
@@ -2479,9 +2480,9 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
});
};
auto write_all = [&semaphore] (queue_reader_handle& handle, const std::vector<mutation>& muts) {
auto write_all = [&semaphore] (queue_reader_handle_v2& handle, const std::vector<mutation>& muts) {
return async([&] {
auto reader = make_flat_mutation_reader_from_mutations(muts.front().schema(), semaphore.make_permit(), muts);
auto reader = make_flat_mutation_reader_from_mutations_v2(muts.front().schema(), semaphore.make_permit(), muts);
auto close_reader = deferred_close(reader);
while (auto mf_opt = reader().get0()) {
handle.push(std::move(*mf_opt)).get();
@@ -2493,7 +2494,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
auto actual_muts = std::vector<mutation>{};
actual_muts.reserve(20);
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
@@ -2506,13 +2507,13 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
// abort() -- check that consumer is aborted
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
auto fill_buffer_fut = reader.fill_buffer();
auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto close_expected_reader = deferred_close(expected_reader);
handle.push(std::move(*expected_reader().get0())).get();
@@ -2522,19 +2523,19 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
handle.abort(std::make_exception_ptr<std::runtime_error>(std::runtime_error("error")));
BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE(!reader.is_end_of_stream());
}
// abort() -- check that producer is aborted
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
reader.set_max_buffer_size(1);
auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto close_expected_reader = deferred_close(expected_reader);
auto push_fut = make_ready_future<>();
@@ -2553,7 +2554,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
// Detached handle
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto fill_buffer_fut = reader.fill_buffer();
@@ -2563,20 +2564,20 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
throwaway_reader.close().get();
}
BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push_end_of_stream(), std::runtime_error);
BOOST_REQUIRE_NO_THROW(fill_buffer_fut.get());
}
// Abandoned handle aborts, move-assignment
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
auto fill_buffer_fut = reader.fill_buffer();
auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto close_expected_reader = deferred_close(expected_reader);
handle.push(std::move(*expected_reader().get0())).get();
@@ -2584,7 +2585,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
BOOST_REQUIRE(!fill_buffer_fut.available());
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& throwaway_reader = std::get<0>(p);
auto& throwaway_handle = std::get<1>(p);
auto close_throwaway_reader = deferred_close(throwaway_reader);
@@ -2593,18 +2594,18 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
}
BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
}
// Abandoned handle aborts, destructor
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
auto fill_buffer_fut = reader.fill_buffer();
auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts);
auto close_expected_reader = deferred_close(expected_reader);
handle.push(std::move(*expected_reader().get0())).get();
@@ -2613,28 +2614,28 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
{
// Destroy handle
queue_reader_handle throwaway_handle(std::move(handle));
queue_reader_handle_v2 throwaway_handle(std::move(handle));
}
BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error);
}
// Life-cycle, relies on ASAN for error reporting
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
{
auto throwaway_p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto throwaway_p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& throwaway_reader = std::get<0>(throwaway_p);
auto& throwaway_handle = std::get<1>(throwaway_p);
auto close_throwaway_reader = deferred_close(throwaway_reader);
// Overwrite handle
handle = std::move(throwaway_handle);
auto another_throwaway_p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto another_throwaway_p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& another_throwaway_reader = std::get<0>(another_throwaway_p);
auto& another_throwaway_handle = std::get<1>(another_throwaway_p);
auto close_another_throwaway_reader = deferred_close(another_throwaway_reader);
@@ -2643,13 +2644,13 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
another_throwaway_handle = std::move(throwaway_handle);
// Overwrite with moved-from handle (move constructor)
queue_reader_handle yet_another_throwaway_handle(std::move(throwaway_handle));
queue_reader_handle_v2 yet_another_throwaway_handle(std::move(throwaway_handle));
}
}
// push_end_of_stream() detaches handle from reader, relies on ASAN for error reporting
{
auto p = make_queue_reader(gen.schema(), semaphore.make_permit());
auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit());
auto& reader = std::get<0>(p);
auto& handle = std::get<1>(p);
auto close_reader = deferred_close(reader);
@@ -3862,9 +3863,9 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
auto make_authority = [s = g._s, &semaphore] (std::optional<mutation> mut, streamed_mutation::forwarding fwd) {
auto permit = semaphore.make_permit();
if (mut) {
return make_flat_mutation_reader_from_mutations(s, permit, {std::move(*mut)}, fwd);
return make_flat_mutation_reader_from_mutations_v2(s, permit, {std::move(*mut)}, fwd);
}
return make_empty_flat_reader(s, permit);
return make_empty_flat_reader_v2(s, permit);
};
auto make_tested = [s = g._s, &semaphore] (std::vector<mutation_bounds> ms, streamed_mutation::forwarding fwd) {
@@ -3874,7 +3875,7 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
return make_reader_bounds(s, permit, std::move(mb), fwd);
}));
auto q = std::make_unique<simple_position_reader_queue>(*s, std::move(rs));
return downgrade_to_v1(make_clustering_combined_reader(s, permit, fwd, std::move(q)));
return make_clustering_combined_reader(s, permit, fwd, std::move(q));
};
auto seed = tests::random::get_int<uint32_t>();
@@ -3928,7 +3929,7 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) {
}
auto make_authority = [&env, &query_schema, &query_slice] (mutation mut, streamed_mutation::forwarding fwd) {
return make_flat_mutation_reader_from_mutations(query_schema, env.make_reader_permit(), {std::move(mut)}, query_slice, fwd);
return make_flat_mutation_reader_from_mutations_v2(query_schema, env.make_reader_permit(), {std::move(mut)}, query_slice, fwd);
};
auto pr = dht::partition_range::make_singular(dht::ring_position(dht::decorate_key(*query_schema, g._pk)));
@@ -3943,7 +3944,7 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) {
},
[included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); },
pk, query_schema, permit, fwd, reversed);
return downgrade_to_v1(make_clustering_combined_reader(query_schema, permit, fwd, std::move(q)));
return make_clustering_combined_reader(query_schema, permit, fwd, std::move(q));
};
auto seed = tests::random::get_int<uint32_t>();

View File

@@ -2710,6 +2710,35 @@ void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutat
}
}
static bool compare_readers(const schema& s, flat_mutation_reader_v2& authority, flat_reader_assertions_v2& tested) {
bool empty = true;
while (auto expected = authority().get()) {
tested.produces(s, *expected);
empty = false;
}
tested.produces_end_of_stream();
return !empty;
}
void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested) {
auto close_authority = deferred_close(authority);
auto assertions = assert_that(std::move(tested));
compare_readers(s, authority, assertions);
}
// Assumes that the readers return fragments from (at most) a single (and the same) partition.
void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector<position_range>& fwd_ranges) {
auto close_authority = deferred_close(authority);
auto assertions = assert_that(std::move(tested));
if (compare_readers(s, authority, assertions)) {
for (auto& r: fwd_ranges) {
authority.fast_forward_to(r).get();
assertions.fast_forward_to(r);
compare_readers(s, authority, assertions);
}
}
}
mutation forwardable_reader_to_mutation(flat_mutation_reader r, const std::vector<position_range>& fwd_ranges) {
auto close_reader = deferred_close(r);

View File

@@ -70,6 +70,8 @@ void for_each_schema_change(std::function<void(schema_ptr, const std::vector<mut
void compare_readers(const schema&, flat_mutation_reader authority, flat_mutation_reader tested);
void compare_readers(const schema&, flat_mutation_reader authority, flat_mutation_reader tested, const std::vector<position_range>& fwd_ranges);
void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested);
void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector<position_range>& fwd_ranges);
// Forward `r` to each range in `fwd_ranges` and consume all fragments produced by `r` in these ranges.
// Build a mutation out of these fragments.