From 7ce0d380d4ebbd127da9c33322cf0e563424d6f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20Sielu=C5=BCycki?= Date: Tue, 15 Mar 2022 12:39:24 +0100 Subject: [PATCH] readers: Update tests to use make_queue_reader_v2. Closes #10220 --- test/boost/mutation_reader_test.cc | 59 +++++++++++++++--------------- test/lib/mutation_source_test.cc | 29 +++++++++++++++ test/lib/mutation_source_test.hh | 2 + 3 files changed, 61 insertions(+), 29 deletions(-) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index e1a0834329..635959b138 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -47,6 +47,7 @@ #include "mutation_rebuilder.hh" #include +#include "readers/from_mutations_v2.hh" #include "readers/from_mutations.hh" #include "readers/from_mutations_v2.hh" #include "readers/forwardable_v2.hh" @@ -2470,7 +2471,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { // Simultaneous read and write { - auto read_all = [] (flat_mutation_reader& reader, std::vector& muts) { + auto read_all = [] (flat_mutation_reader_v2& reader, std::vector& muts) { return async([&reader, &muts] { auto close_reader = deferred_close(reader); while (auto mut_opt = read_mutation_from_flat_mutation_reader(reader).get0()) { @@ -2479,9 +2480,9 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { }); }; - auto write_all = [&semaphore] (queue_reader_handle& handle, const std::vector& muts) { + auto write_all = [&semaphore] (queue_reader_handle_v2& handle, const std::vector& muts) { return async([&] { - auto reader = make_flat_mutation_reader_from_mutations(muts.front().schema(), semaphore.make_permit(), muts); + auto reader = make_flat_mutation_reader_from_mutations_v2(muts.front().schema(), semaphore.make_permit(), muts); auto close_reader = deferred_close(reader); while (auto mf_opt = reader().get0()) { handle.push(std::move(*mf_opt)).get(); @@ -2493,7 +2494,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { auto actual_muts = std::vector{}; actual_muts.reserve(20); - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); @@ -2506,13 +2507,13 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { // abort() -- check that consumer is aborted { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); auto fill_buffer_fut = reader.fill_buffer(); - auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); + auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); auto close_expected_reader = deferred_close(expected_reader); handle.push(std::move(*expected_reader().get0())).get(); @@ -2522,19 +2523,19 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { handle.abort(std::make_exception_ptr(std::runtime_error("error"))); BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error); - BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); + BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); BOOST_REQUIRE(!reader.is_end_of_stream()); } // abort() -- check that producer is aborted { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); reader.set_max_buffer_size(1); - auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); + auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); auto close_expected_reader = deferred_close(expected_reader); auto push_fut = make_ready_future<>(); @@ -2553,7 +2554,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { // Detached handle { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto fill_buffer_fut = reader.fill_buffer(); @@ -2563,20 +2564,20 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { throwaway_reader.close().get(); } - BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); + BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); BOOST_REQUIRE_THROW(handle.push_end_of_stream(), std::runtime_error); BOOST_REQUIRE_NO_THROW(fill_buffer_fut.get()); } // Abandoned handle aborts, move-assignment { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); auto fill_buffer_fut = reader.fill_buffer(); - auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); + auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); auto close_expected_reader = deferred_close(expected_reader); handle.push(std::move(*expected_reader().get0())).get(); @@ -2584,7 +2585,7 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { BOOST_REQUIRE(!fill_buffer_fut.available()); { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& throwaway_reader = std::get<0>(p); auto& throwaway_handle = std::get<1>(p); auto close_throwaway_reader = deferred_close(throwaway_reader); @@ -2593,18 +2594,18 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { } BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error); - BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); + BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); } // Abandoned handle aborts, destructor { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); auto fill_buffer_fut = reader.fill_buffer(); - auto expected_reader = make_flat_mutation_reader_from_mutations(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); + auto expected_reader = make_flat_mutation_reader_from_mutations_v2(expected_muts.front().schema(), semaphore.make_permit(), expected_muts); auto close_expected_reader = deferred_close(expected_reader); handle.push(std::move(*expected_reader().get0())).get(); @@ -2613,28 +2614,28 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { { // Destroy handle - queue_reader_handle throwaway_handle(std::move(handle)); + queue_reader_handle_v2 throwaway_handle(std::move(handle)); } BOOST_REQUIRE_THROW(fill_buffer_fut.get(), std::runtime_error); - BOOST_REQUIRE_THROW(handle.push(mutation_fragment(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); + BOOST_REQUIRE_THROW(handle.push(mutation_fragment_v2(*gen.schema(), semaphore.make_permit(), partition_end{})).get(), std::runtime_error); } // Life-cycle, relies on ASAN for error reporting { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); { - auto throwaway_p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto throwaway_p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& throwaway_reader = std::get<0>(throwaway_p); auto& throwaway_handle = std::get<1>(throwaway_p); auto close_throwaway_reader = deferred_close(throwaway_reader); // Overwrite handle handle = std::move(throwaway_handle); - auto another_throwaway_p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto another_throwaway_p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& another_throwaway_reader = std::get<0>(another_throwaway_p); auto& another_throwaway_handle = std::get<1>(another_throwaway_p); auto close_another_throwaway_reader = deferred_close(another_throwaway_reader); @@ -2643,13 +2644,13 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) { another_throwaway_handle = std::move(throwaway_handle); // Overwrite with moved-from handle (move constructor) - queue_reader_handle yet_another_throwaway_handle(std::move(throwaway_handle)); + queue_reader_handle_v2 yet_another_throwaway_handle(std::move(throwaway_handle)); } } // push_end_of_stream() detaches handle from reader, relies on ASAN for error reporting { - auto p = make_queue_reader(gen.schema(), semaphore.make_permit()); + auto p = make_queue_reader_v2(gen.schema(), semaphore.make_permit()); auto& reader = std::get<0>(p); auto& handle = std::get<1>(p); auto close_reader = deferred_close(reader); @@ -3862,9 +3863,9 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) { auto make_authority = [s = g._s, &semaphore] (std::optional mut, streamed_mutation::forwarding fwd) { auto permit = semaphore.make_permit(); if (mut) { - return make_flat_mutation_reader_from_mutations(s, permit, {std::move(*mut)}, fwd); + return make_flat_mutation_reader_from_mutations_v2(s, permit, {std::move(*mut)}, fwd); } - return make_empty_flat_reader(s, permit); + return make_empty_flat_reader_v2(s, permit); }; auto make_tested = [s = g._s, &semaphore] (std::vector ms, streamed_mutation::forwarding fwd) { @@ -3874,7 +3875,7 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) { return make_reader_bounds(s, permit, std::move(mb), fwd); })); auto q = std::make_unique(*s, std::move(rs)); - return downgrade_to_v1(make_clustering_combined_reader(s, permit, fwd, std::move(q))); + return make_clustering_combined_reader(s, permit, fwd, std::move(q)); }; auto seed = tests::random::get_int(); @@ -3928,7 +3929,7 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) { } auto make_authority = [&env, &query_schema, &query_slice] (mutation mut, streamed_mutation::forwarding fwd) { - return make_flat_mutation_reader_from_mutations(query_schema, env.make_reader_permit(), {std::move(mut)}, query_slice, fwd); + return make_flat_mutation_reader_from_mutations_v2(query_schema, env.make_reader_permit(), {std::move(mut)}, query_slice, fwd); }; auto pr = dht::partition_range::make_singular(dht::ring_position(dht::decorate_key(*query_schema, g._pk))); @@ -3943,7 +3944,7 @@ static future<> do_test_clustering_order_merger_sstable_set(bool reversed) { }, [included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); }, pk, query_schema, permit, fwd, reversed); - return downgrade_to_v1(make_clustering_combined_reader(query_schema, permit, fwd, std::move(q))); + return make_clustering_combined_reader(query_schema, permit, fwd, std::move(q)); }; auto seed = tests::random::get_int(); diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 6db7594195..9d390c7bd5 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -2710,6 +2710,35 @@ void compare_readers(const schema& s, flat_mutation_reader authority, flat_mutat } } +static bool compare_readers(const schema& s, flat_mutation_reader_v2& authority, flat_reader_assertions_v2& tested) { + bool empty = true; + while (auto expected = authority().get()) { + tested.produces(s, *expected); + empty = false; + } + tested.produces_end_of_stream(); + return !empty; +} + +void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested) { + auto close_authority = deferred_close(authority); + auto assertions = assert_that(std::move(tested)); + compare_readers(s, authority, assertions); +} + +// Assumes that the readers return fragments from (at most) a single (and the same) partition. +void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector& fwd_ranges) { + auto close_authority = deferred_close(authority); + auto assertions = assert_that(std::move(tested)); + if (compare_readers(s, authority, assertions)) { + for (auto& r: fwd_ranges) { + authority.fast_forward_to(r).get(); + assertions.fast_forward_to(r); + compare_readers(s, authority, assertions); + } + } +} + mutation forwardable_reader_to_mutation(flat_mutation_reader r, const std::vector& fwd_ranges) { auto close_reader = deferred_close(r); diff --git a/test/lib/mutation_source_test.hh b/test/lib/mutation_source_test.hh index 146f86c2e0..2ce2c2bfd0 100644 --- a/test/lib/mutation_source_test.hh +++ b/test/lib/mutation_source_test.hh @@ -70,6 +70,8 @@ void for_each_schema_change(std::function& fwd_ranges); +void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested); +void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector& fwd_ranges); // Forward `r` to each range in `fwd_ranges` and consume all fragments produced by `r` in these ranges. // Build a mutation out of these fragments.