From a46df5af63ca62f3cb53b220b56276ad1394765f Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 21:51:26 +0400 Subject: [PATCH 01/12] row_cache: pass partition_start through nonforwardable reader Now the nonforwardable reader unconditionally produces a partition_end, even if the input reader was empty. This is strange in itself, but it also makes it hard to properly fix its next_partition() method, which is our ultimate goal. So we are going to change this and produce partition_end only if there was some data in the stream. However, this creates a problem: currently we pop partition_start from the underlying reader in autoupdating_underlying_reader::move_to_next_partition and manually push it back to downstream readers, bypassing the nonforwardable reader. This means that if we change the logic in the nonforwardable reader as described, we will end up with a partition_start without a partition_end in the downstream readers. This patch rectifies this by making sure that the nonforwardable reader will see the initial partition_start. We inject this partition_start just before the nonforwardable reader, into delegating_reader. This also makes the result type of range_populating_reader::operator() a bit simpler, as we don't need to pass partition_start anymore. 
--- row_cache.cc | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/row_cache.cc b/row_cache.cc index aa6909f867..7f3c68912c 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -354,8 +354,9 @@ future<> read_context::create_underlying() { }); } -static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader) { +static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader, mutation_fragment_v2 partition_start) { auto res = make_delegating_reader(reader.underlying().underlying()); + res.unpop_mutation_fragment(std::move(partition_start)); res.upgrade_schema(reader.schema()); return make_nonforwardable(std::move(res), true); } @@ -388,8 +389,7 @@ private: }); } else { _cache._tracker.on_mispopulate(); - _reader = read_directly_from_underlying(*_read_context); - this->push_mutation_fragment(std::move(*mfopt)); + _reader = read_directly_from_underlying(*_read_context, std::move(*mfopt)); } }); }); @@ -514,15 +514,13 @@ public: , _read_context(ctx) {} - using read_result = std::tuple; - - future operator()() { + future operator()() { return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable { { if (!mfopt) { return _cache._read_section(_cache._tracker.region(), [&] { this->handle_end_of_stream(); - return make_ready_future(read_result(std::nullopt, std::nullopt)); + return make_ready_future(std::nullopt); }); } _cache.on_partition_miss(); @@ -533,14 +531,12 @@ public: cache_entry& e = _cache.find_or_create_incomplete(ps, _reader.creation_phase(), this->can_set_continuity() ? 
&*_last_key : nullptr); _last_key = row_cache::previous_entry_pointer(key); - return make_ready_future( - read_result(e.read(_cache, _read_context, _reader.creation_phase()), std::nullopt)); + return make_ready_future(e.read(_cache, _read_context, _reader.creation_phase())); }); } else { _cache._tracker.on_mispopulate(); _last_key = row_cache::previous_entry_pointer(key); - return make_ready_future( - read_result(read_directly_from_underlying(_read_context), std::move(mfopt))); + return make_ready_future(read_directly_from_underlying(_read_context, std::move(*mfopt))); } } }); @@ -644,12 +640,8 @@ private: } future read_from_secondary() { - return _secondary_reader().then([this] (range_populating_reader::read_result&& res) { - auto&& [fropt, ps] = res; + return _secondary_reader().then([this] (flat_mutation_reader_v2_opt&& fropt) { if (fropt) { - if (ps) { - push_mutation_fragment(std::move(*ps)); - } return make_ready_future(std::move(fropt)); } else { _secondary_in_progress = false; From 9c5c380b0b3e21b1392cb7b724e33763269936b3 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:03:23 +0400 Subject: [PATCH 02/12] nonforwardable reader: no partition_end for empty reader The patch introduces the _partition_is_open flag and injects partition_end only if there was some data in the input reader. A simple unit test has been added for the nonforwardable reader which checks this new behaviour. 
--- readers/mutation_readers.cc | 15 ++++++++++---- test/boost/flat_mutation_reader_test.cc | 26 +++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index 282842cc42..eaf1a2b288 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -411,15 +411,19 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing flat_mutation_reader_v2 _underlying; bool _single_partition; bool _static_row_done = false; + bool _partition_is_open = false; bool is_end_end_of_underlying_stream() const { return _underlying.is_buffer_empty() && _underlying.is_end_of_stream(); } future<> on_end_of_underlying_stream() { - if (!_static_row_done) { - _static_row_done = true; - return _underlying.fast_forward_to(position_range::all_clustered_rows()); + if (_partition_is_open) { + if (!_static_row_done) { + _static_row_done = true; + return _underlying.fast_forward_to(position_range::all_clustered_rows()); + } + push_mutation_fragment(*_schema, _permit, partition_end()); + _partition_is_open = false; } - push_mutation_fragment(*_schema, _permit, partition_end()); if (_single_partition) { _end_of_stream = true; return make_ready_future<>(); @@ -440,6 +444,9 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing virtual future<> fill_buffer() override { return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] { return fill_buffer_from(_underlying).then([this] (bool underlying_finished) { + if (!_partition_is_open && !is_buffer_empty()) { + _partition_is_open = true; + } if (underlying_finished) { return on_end_of_underlying_stream(); } diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 72e21a0f27..7f75d3902f 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -38,6 +38,7 @@ #include 
"readers/from_fragments_v2.hh" #include "readers/forwardable_v2.hh" #include "readers/compacting.hh" +#include "readers/nonforwardable.hh" struct mock_consumer { struct result { @@ -727,6 +728,31 @@ SEASTAR_TEST_CASE(test_make_forwardable) { }); } +SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { + simple_schema s; + tests::reader_concurrency_semaphore_wrapper semaphore; + const auto permit = semaphore.make_permit(); + + auto make_reader = [&](std::vector mutations, + bool single_partition, + const dht::partition_range& pr) + { + auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(), + permit, + std::move(mutations), + pr, + streamed_mutation::forwarding::yes); + result = make_nonforwardable(std::move(result), single_partition); + return assert_that(std::move(result)).exact(); + }; + + // no input -> no output + { + auto rd = make_reader({}, false, query::full_partition_range); + rd.produces_end_of_stream(); + } +} + SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { return seastar::async([] { tests::reader_concurrency_semaphore_wrapper semaphore; From 8ff96e1bce11d3e7c1969ab8b1ff84c495ddce4f Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:18:11 +0400 Subject: [PATCH 03/12] nonforwardable reader: no partition_end after next_partition() Before the patch, nonforwardable reader injected partition_end unconditionally. This caused problems in case next_partition() was called, the downstream reader might have already injected its own partition_end marker, and the one from nonforwardable reader was a duplicate. 
Fixes: #12249 --- readers/mutation_readers.cc | 2 ++ test/boost/flat_mutation_reader_test.cc | 38 +++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index eaf1a2b288..deb5c860ee 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -461,6 +461,8 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing clear_buffer_to_next_partition(); auto maybe_next_partition = make_ready_future<>();; if (is_buffer_empty()) { + _partition_is_open = false; + _static_row_done = false; maybe_next_partition = _underlying.next_partition(); } return maybe_next_partition.then([this] { diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 7f75d3902f..0a57face56 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -746,11 +746,49 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { return assert_that(std::move(result)).exact(); }; + const auto pk1 = s.make_pkey(1); + auto m1 = mutation(s.schema(), pk1); + m1.apply(s.make_row(permit, s.make_ckey(11), "value1")); + + const auto pk2 = s.make_pkey(2); + auto m2 = mutation(s.schema(), pk2); + m2.apply(s.make_row(permit, s.make_ckey(22), "value2")); + + const auto pk3 = s.make_pkey(3); + auto m3 = mutation(s.schema(), pk3); + m3.apply(s.make_row(permit, s.make_ckey(33), "value3")); + + dht::ring_position_comparator cmp{*s.schema()}; + BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less); + BOOST_CHECK_EQUAL(cmp(m2.decorated_key(), m3.decorated_key()), std::strong_ordering::less); + // no input -> no output { auto rd = make_reader({}, false, query::full_partition_range); rd.produces_end_of_stream(); } + + // next_partition() + { + auto check = [&] (flat_reader_assertions_v2 rd) { + rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); + 
rd.next_partition(); + rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone()); + rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key()); + rd.produces_partition_end(); + rd.produces_end_of_stream(); + }; + + // buffer is not empty + check(make_reader({m1, m2}, false, query::full_partition_range)); + + // buffer is empty + { + auto rd = make_reader({m1, m2}, false, query::full_partition_range); + rd.set_max_buffer_size(1); + check(std::move(rd)); + } + } } SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { From 88cd1c370057d2292bf914038f94f2d190b6a407 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:25:45 +0400 Subject: [PATCH 04/12] nonforwardable reader: no partition_end after fast_forward_to() This patch fixes the problem with method fast_forward_to which is similar to the one with next_partition, no partition_end should be injected for the partition if fast_forward_to was called inside it. --- readers/mutation_readers.cc | 2 ++ test/boost/flat_mutation_reader_test.cc | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index deb5c860ee..e3111eea02 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -472,6 +472,8 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing virtual future<> fast_forward_to(const dht::partition_range& pr) override { _end_of_stream = false; clear_buffer(); + _partition_is_open = false; + _static_row_done = false; return _underlying.fast_forward_to(pr); } virtual future<> close() noexcept override { diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 0a57face56..22312d3404 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -789,6 +789,24 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { check(std::move(rd)); } } + + // 
fast_forward_to() + { + const auto m1_range = dht::partition_range::make_singular(m1.decorated_key()); + auto rd = make_reader({m1, m2}, false, m1_range); + rd.set_max_buffer_size(1); + + rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); + + const auto m2_range = dht::partition_range::make_singular(m2.decorated_key()); + rd.fast_forward_to(m2_range); + rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone()); + rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key()); + rd.produces_partition_end(); + + rd.next_partition(); + rd.produces_end_of_stream(); + } } SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { From 023ed0ad003f287a14da493c4a1848ae83d786f6 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:29:03 +0400 Subject: [PATCH 05/12] nonforwardable reader: add more tests Add more test cases for completeness. --- test/boost/flat_mutation_reader_test.cc | 37 +++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 22312d3404..4f15dca1ca 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -807,6 +807,43 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { rd.next_partition(); rd.produces_end_of_stream(); } + + // single_partition + { + auto rd = make_reader({m1, m2}, true, query::full_partition_range); + rd.set_max_buffer_size(1); + + rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); + rd.produces_row_with_key(m1.partition().clustered_rows().begin()->key()); + rd.produces_partition_end(); + rd.produces_end_of_stream(); + + rd.next_partition(); + rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone()); + rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key()); + rd.produces_partition_end(); + 
rd.produces_end_of_stream(); + + rd.next_partition(); + rd.produces_end_of_stream(); + } + + // static row + { + s.add_static_row(m1, "test-static"); + const auto m1_range = dht::partition_range::make_singular(m1.decorated_key()); + auto rd = make_reader({m1, m2}, false, m1_range); + rd.set_max_buffer_size(1); + rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); + rd.produces_static_row( + {{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static")}}); + rd.produces_row( + m1.partition().clustered_rows().begin()->key(), + {{s.schema()->get_column_definition(to_bytes("v")), to_bytes("value1")}} + ); + rd.produces_partition_end(); + rd.produces_end_of_stream(); + } } SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { From beeffb899fd8e9ec892016e8ae51e5eafa0d77fb Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:34:36 +0400 Subject: [PATCH 06/12] nonforwardable reader: refactor, extract reset_partition No observable behaviour changes, just refactor the code. 
--- readers/mutation_readers.cc | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index e3111eea02..b4f410159c 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -422,19 +422,22 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing return _underlying.fast_forward_to(position_range::all_clustered_rows()); } push_mutation_fragment(*_schema, _permit, partition_end()); - _partition_is_open = false; + reset_partition(); } if (_single_partition) { _end_of_stream = true; return make_ready_future<>(); } return _underlying.next_partition().then([this] { - _static_row_done = false; return _underlying.fill_buffer().then([this] { _end_of_stream = is_end_end_of_underlying_stream(); }); }); } + void reset_partition() { + _partition_is_open = false; + _static_row_done = false; + } public: reader(flat_mutation_reader_v2 r, bool single_partition) : impl(r.schema(), r.permit()) @@ -461,8 +464,7 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing clear_buffer_to_next_partition(); auto maybe_next_partition = make_ready_future<>();; if (is_buffer_empty()) { - _partition_is_open = false; - _static_row_done = false; + reset_partition(); maybe_next_partition = _underlying.next_partition(); } return maybe_next_partition.then([this] { @@ -472,8 +474,7 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing virtual future<> fast_forward_to(const dht::partition_range& pr) override { _end_of_stream = false; clear_buffer(); - _partition_is_open = false; - _static_row_done = false; + reset_partition(); return _underlying.fast_forward_to(pr); } virtual future<> close() noexcept override { From a517e1d6ad6a69482089587883e2e989cd9f1d4b Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:35:43 +0400 Subject: [PATCH 07/12] nonforwardable reader: fix indentation --- 
readers/mutation_readers.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index b4f410159c..fe767517cc 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -428,11 +428,11 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing _end_of_stream = true; return make_ready_future<>(); } - return _underlying.next_partition().then([this] { - return _underlying.fill_buffer().then([this] { - _end_of_stream = is_end_end_of_underlying_stream(); + return _underlying.next_partition().then([this] { + return _underlying.fill_buffer().then([this] { + _end_of_stream = is_end_end_of_underlying_stream(); + }); }); - }); } void reset_partition() { _partition_is_open = false; @@ -467,9 +467,9 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing reset_partition(); maybe_next_partition = _underlying.next_partition(); } - return maybe_next_partition.then([this] { - _end_of_stream = is_end_end_of_underlying_stream(); - }); + return maybe_next_partition.then([this] { + _end_of_stream = is_end_end_of_underlying_stream(); + }); } virtual future<> fast_forward_to(const dht::partition_range& pr) override { _end_of_stream = false; From 64427b9164f7e6a5d5be6a4c883a0d22553a29b6 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Thu, 23 Feb 2023 22:52:16 +0400 Subject: [PATCH 08/12] flat_mutation_reader_v2: drop forward_buffer_to This is just a strange method I came across. It effectively does nothing but clear_buffer(). 
--- db/chained_delegating_reader.hh | 2 +- db/size_estimates_virtual_reader.cc | 2 +- db/view/build_progress_virtual_reader.hh | 2 +- index/built_indexes_virtual_reader.hh | 2 +- readers/combined.cc | 2 +- readers/delegating_v2.hh | 2 +- readers/filtering.hh | 2 +- readers/flat_mutation_reader_v2.hh | 3 +-- readers/multishard.cc | 2 +- readers/mutation_reader.cc | 5 ----- readers/mutation_readers.cc | 6 +++--- sstables/kl/reader.cc | 2 +- sstables/mx/reader.cc | 2 +- 13 files changed, 14 insertions(+), 20 deletions(-) diff --git a/db/chained_delegating_reader.hh b/db/chained_delegating_reader.hh index 489fb272a4..a973400a08 100644 --- a/db/chained_delegating_reader.hh +++ b/db/chained_delegating_reader.hh @@ -59,7 +59,7 @@ public: } _end_of_stream = false; - forward_buffer_to(pr.start()); + clear_buffer(); return _underlying->fast_forward_to(std::move(pr)); } diff --git a/db/size_estimates_virtual_reader.cc b/db/size_estimates_virtual_reader.cc index 4976252b78..5ea2868f26 100644 --- a/db/size_estimates_virtual_reader.cc +++ b/db/size_estimates_virtual_reader.cc @@ -295,7 +295,7 @@ future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_ra } future<> size_estimates_mutation_reader::fast_forward_to(position_range pr) { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; if (_partition_reader) { return _partition_reader->fast_forward_to(std::move(pr)); diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh index ddc90a2f12..d2473c9e6c 100644 --- a/db/view/build_progress_virtual_reader.hh +++ b/db/view/build_progress_virtual_reader.hh @@ -172,7 +172,7 @@ class build_progress_virtual_reader { } virtual future<> fast_forward_to(position_range range) override { - forward_buffer_to(range.start()); + clear_buffer(); _end_of_stream = false; return _underlying.fast_forward_to(std::move(range)); } diff --git a/index/built_indexes_virtual_reader.hh b/index/built_indexes_virtual_reader.hh 
index 6669cfd9d4..c74e12cb37 100644 --- a/index/built_indexes_virtual_reader.hh +++ b/index/built_indexes_virtual_reader.hh @@ -175,7 +175,7 @@ class built_indexes_virtual_reader { } virtual future<> fast_forward_to(position_range range) override { - forward_buffer_to(range.start()); + clear_buffer(); _end_of_stream = false; // range contains index names (e.g., xyz) but the underlying table // contains view names (e.g., xyz_index) so we need to add the diff --git a/readers/combined.cc b/readers/combined.cc index c795dfdd82..1fb0b25760 100644 --- a/readers/combined.cc +++ b/readers/combined.cc @@ -658,7 +658,7 @@ future<> merging_reader

::fast_forward_to(const dht::partition_range& pr) { template future<> merging_reader

::fast_forward_to(position_range pr) { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; return _merger.fast_forward_to(std::move(pr)); } diff --git a/readers/delegating_v2.hh b/readers/delegating_v2.hh index 810f7c9ba8..cb47de681e 100644 --- a/readers/delegating_v2.hh +++ b/readers/delegating_v2.hh @@ -40,7 +40,7 @@ public: } virtual future<> fast_forward_to(position_range pr) override { _end_of_stream = false; - forward_buffer_to(pr.start()); + clear_buffer(); return _underlying->fast_forward_to(std::move(pr)); } virtual future<> next_partition() override { diff --git a/readers/filtering.hh b/readers/filtering.hh index 331b3611d6..48259b123d 100644 --- a/readers/filtering.hh +++ b/readers/filtering.hh @@ -54,7 +54,7 @@ public: return _rd.fast_forward_to(pr); } virtual future<> fast_forward_to(position_range pr) override { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; return _rd.fast_forward_to(std::move(pr)); } diff --git a/readers/flat_mutation_reader_v2.hh b/readers/flat_mutation_reader_v2.hh index 2180d1fc9d..d8425fae28 100644 --- a/readers/flat_mutation_reader_v2.hh +++ b/readers/flat_mutation_reader_v2.hh @@ -153,7 +153,6 @@ public: void reserve_additional(size_t n) { _buffer.reserve(_buffer.size() + n); } - void forward_buffer_to(const position_in_partition& pos); void clear_buffer_to_next_partition(); template future fill_buffer_from(Source&); @@ -722,7 +721,7 @@ flat_mutation_reader_v2 transform(flat_mutation_reader_v2 r, T t) { return _reader.fast_forward_to(pr); } virtual future<> fast_forward_to(position_range pr) override { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; return _reader.fast_forward_to(std::move(pr)); } diff --git a/readers/multishard.cc b/readers/multishard.cc index f17e0a1f94..b34f78757f 100644 --- a/readers/multishard.cc +++ b/readers/multishard.cc @@ -158,7 +158,7 @@ future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) { } future<> 
foreign_reader::fast_forward_to(position_range pr) { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; return forward_operation([reader = _reader.get(), pr = std::move(pr)] () { return reader->fast_forward_to(std::move(pr)); diff --git a/readers/mutation_reader.cc b/readers/mutation_reader.cc index d7bf452d82..36b391afb2 100644 --- a/readers/mutation_reader.cc +++ b/readers/mutation_reader.cc @@ -385,11 +385,6 @@ flat_mutation_reader_v2::~flat_mutation_reader_v2() { } } -void flat_mutation_reader_v2::impl::forward_buffer_to(const position_in_partition& pos) { - clear_buffer(); - _buffer_size = compute_buffer_size(*_schema, _buffer); -} - void flat_mutation_reader_v2::impl::clear_buffer_to_next_partition() { auto next_partition_start = std::find_if(_buffer.begin(), _buffer.end(), [] (const mutation_fragment_v2& mf) { return mf.is_partition_start(); diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index fe767517cc..50e84f3e91 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -167,7 +167,7 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) { _current = std::move(pr); _end_of_stream = false; _current_has_content = false; - forward_buffer_to(_current.start()); + clear_buffer(); return make_ready_future<>(); } virtual future<> next_partition() override { @@ -267,7 +267,7 @@ flat_mutation_reader_v2 make_slicing_filtering_reader(flat_mutation_reader_v2 rd } virtual future<> fast_forward_to(position_range pr) override { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; return _rd.fast_forward_to(std::move(pr)); } @@ -1544,7 +1544,7 @@ public: return _reader.fast_forward_to(pr); } virtual future<> fast_forward_to(position_range pr) override { - forward_buffer_to(pr.start()); + clear_buffer(); _end_of_stream = false; return _reader.fast_forward_to(std::move(pr)); } diff --git a/sstables/kl/reader.cc b/sstables/kl/reader.cc index 2d8e85560b..b68c7ab969 100644 
--- a/sstables/kl/reader.cc +++ b/sstables/kl/reader.cc @@ -1465,7 +1465,7 @@ public: // If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet. } virtual future<> fast_forward_to(position_range cr) override { - forward_buffer_to(cr.start()); + clear_buffer(); if (!_partition_finished) { _end_of_stream = false; return advance_context(_consumer.fast_forward_to(std::move(cr))); diff --git a/sstables/mx/reader.cc b/sstables/mx/reader.cc index fbe0b1452d..bc90290f39 100644 --- a/sstables/mx/reader.cc +++ b/sstables/mx/reader.cc @@ -1653,7 +1653,7 @@ public: // If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet. } virtual future<> fast_forward_to(position_range cr) override { - forward_buffer_to(cr.start()); + clear_buffer(); if (!_partition_finished) { _end_of_stream = false; return advance_context(_consumer.fast_forward_to(std::move(cr))); From a67776b750d163a251363711218e32a5c02a1053 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 28 Feb 2023 22:31:46 +0400 Subject: [PATCH 09/12] make_forwardable: fix next_partition When next_partition is called, the buffer could contain partition_start and possibly static_row. In this case clear_buffer_to_next_partition will not remove anything from the buffer and the reader position should not change. Before this patch, however, we used to set _end_of_stream=false, which violated the forwardable-reader contract - the data of the next partition was emitted after the data of the first partition without intermediate EOS. This bug was found when debugging test_make_nonforwardable_from_mutations_as_mutation_source flakiness. A corresponding focused test_make_forwardable_next_partition has been added to exercise this problem. 
--- readers/mutation_readers.cc | 5 ++- test/boost/flat_mutation_reader_test.cc | 43 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index 50e84f3e91..02d3d75897 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -171,12 +171,15 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) { return make_ready_future<>(); } virtual future<> next_partition() override { + clear_buffer_to_next_partition(); + if (!is_buffer_empty()) { + co_return; + } _end_of_stream = false; if (!_next || !_next->is_partition_start()) { co_await _underlying.next_partition(); _next = {}; } - clear_buffer_to_next_partition(); _current = { position_in_partition::for_partition_start(), position_in_partition(position_in_partition::after_static_row_tag_t()) diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 4f15dca1ca..e0b16496e1 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -728,6 +728,49 @@ SEASTAR_TEST_CASE(test_make_forwardable) { }); } +SEASTAR_THREAD_TEST_CASE(test_make_forwardable_next_partition) { + simple_schema s; + tests::reader_concurrency_semaphore_wrapper semaphore; + const auto permit = semaphore.make_permit(); + + auto make_reader = [&](std::vector mutations, const dht::partition_range& pr) { + auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(), + permit, + std::move(mutations), + pr, + streamed_mutation::forwarding::yes); + return assert_that(std::move(result)).exact(); + }; + + const auto pk1 = s.make_pkey(1); + auto m1 = mutation(s.schema(), pk1); + s.add_static_row(m1, "test-static-1"); + + const auto pk2 = s.make_pkey(2); + auto m2 = mutation(s.schema(), pk2); + s.add_static_row(m2, "test-static-2"); + + dht::ring_position_comparator cmp{*s.schema()}; + BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), 
std::strong_ordering::less); + + auto rd = make_reader({m1, m2}, query::full_partition_range); + rd.fill_buffer().get(); + rd.next_partition(); + rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); + rd.produces_static_row( + {{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-1")}}); + rd.produces_end_of_stream(); + + rd.next_partition(); + rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone()); + rd.produces_static_row( + {{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-2")}}); + rd.produces_end_of_stream(); + + rd.next_partition(); + rd.produces_end_of_stream(); +} + SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { simple_schema s; tests::reader_concurrency_semaphore_wrapper semaphore; From 989ef9d358ab10f446691d96040be2582bd97e9a Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 27 Feb 2023 13:58:04 +0400 Subject: [PATCH 10/12] make_nonforwardable: next_partition and fast_forward_to when single_partition is true This flag designates that we should consume only one partition from the underlying reader. This means that attempts to move to another partition should cause an EOS. 
--- readers/mutation_readers.cc | 12 ++++++++++-- test/boost/flat_mutation_reader_test.cc | 19 +++++++++++++++---- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc index 02d3d75897..3bcf18c3b7 100644 --- a/readers/mutation_readers.cc +++ b/readers/mutation_readers.cc @@ -465,8 +465,12 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing } virtual future<> next_partition() override { clear_buffer_to_next_partition(); - auto maybe_next_partition = make_ready_future<>();; + auto maybe_next_partition = make_ready_future<>(); if (is_buffer_empty()) { + if (_end_of_stream || (_partition_is_open && _single_partition)) { + _end_of_stream = true; + return maybe_next_partition; + } reset_partition(); maybe_next_partition = _underlying.next_partition(); } @@ -475,9 +479,13 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing }); } virtual future<> fast_forward_to(const dht::partition_range& pr) override { - _end_of_stream = false; clear_buffer(); + if (_single_partition) { + _end_of_stream = true; + return make_ready_future<>(); + } reset_partition(); + _end_of_stream = false; return _underlying.fast_forward_to(pr); } virtual future<> close() noexcept override { diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index e0b16496e1..5cc00ae2f8 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -858,13 +858,24 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); rd.produces_row_with_key(m1.partition().clustered_rows().begin()->key()); - rd.produces_partition_end(); + + rd.next_partition(); rd.produces_end_of_stream(); rd.next_partition(); - rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone()); - 
rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key()); - rd.produces_partition_end(); + rd.produces_end_of_stream(); + } + + // single_partition with fast_forward_to + { + const auto m1_range = dht::partition_range::make_singular(m1.decorated_key()); + auto rd = make_reader({m1, m2}, true, m1_range); + rd.set_max_buffer_size(1); + + rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone()); + + const auto m2_range = dht::partition_range::make_singular(m2.decorated_key()); + rd.fast_forward_to(m2_range); rd.produces_end_of_stream(); rd.next_partition(); From 992ccb62551d30e2fe7daaa9f2771f20c54395fe Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Mon, 27 Feb 2023 13:35:48 +0400 Subject: [PATCH 11/12] make_nonforwardable: test through run_mutation_source_tests --- test/boost/flat_mutation_reader_test.cc | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 5cc00ae2f8..578de460b4 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -900,6 +900,35 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) { } } +SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable_from_mutations_as_mutation_source) { + auto populate = [] (schema_ptr, const std::vector &muts) { + return mutation_source([=] ( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class&, + tracing::trace_state_ptr, + streamed_mutation::forwarding fwd_sm, + mutation_reader::forwarding) mutable { + auto squashed_muts = squash_mutations(muts); + const auto single_partition = squashed_muts.size() == 1; + auto reader = make_flat_mutation_reader_from_mutations_v2(schema, + std::move(permit), + std::move(squashed_muts), + range, + slice, + streamed_mutation::forwarding::yes); + reader = make_nonforwardable(std::move(reader), 
single_partition); + if (fwd_sm) { + reader = make_forwardable(std::move(reader)); + } + return reader; + }); + }; + run_mutation_source_tests(populate); +} + SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { return seastar::async([] { tests::reader_concurrency_semaphore_wrapper semaphore; From 1709a17c380cda437fefcd88459ccb32833b7bc9 Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 28 Feb 2023 22:36:12 +0400 Subject: [PATCH 12/12] flat_mutation_reader_test: cleanup, seastar::async -> SEASTAR_THREAD_TEST_CASE --- test/boost/flat_mutation_reader_test.cc | 672 ++++++++++++------------ 1 file changed, 329 insertions(+), 343 deletions(-) diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 578de460b4..f9317a7c24 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -111,193 +111,187 @@ static size_t count_fragments(mutation m) { return res; } -SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) { - return seastar::async([] { - tests::reader_concurrency_semaphore_wrapper semaphore; - for_each_mutation([&] (const mutation& m) { - size_t fragments_in_m = count_fragments(m); - for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) { - auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m); - auto close_reader = deferred_close(r); - auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0(); - BOOST_REQUIRE(result._consume_end_of_stream_called); - BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); - BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); - BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 
1 : 0, result._consume_tombstone_call_count); - auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m)); - r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone()); - if (result._fragments.empty()) { - continue; - } - for (auto& mf : result._fragments) { - r2.produces(*m.schema(), mf); - } +SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_single_partition) { + tests::reader_concurrency_semaphore_wrapper semaphore; + for_each_mutation([&] (const mutation& m) { + size_t fragments_in_m = count_fragments(m); + for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) { + auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m); + auto close_reader = deferred_close(r); + auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0(); + BOOST_REQUIRE(result._consume_end_of_stream_called); + BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); + BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); + BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 
1 : 0, result._consume_tombstone_call_count); + auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m)); + r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone()); + if (result._fragments.empty()) { + continue; } - }); + for (auto& mf : result._fragments) { + r2.produces(*m.schema(), mf); + } + } }); } -SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { - return seastar::async([] { - tests::reader_concurrency_semaphore_wrapper semaphore; - auto test = [&semaphore] (mutation m1, mutation m2) { - size_t fragments_in_m1 = count_fragments(m1); - size_t fragments_in_m2 = count_fragments(m2); - for (size_t depth = 1; depth < fragments_in_m1; ++depth) { - auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); - auto close_r = deferred_close(r); - auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0(); - BOOST_REQUIRE(result._consume_end_of_stream_called); - BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); - BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); - BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 
1 : 0, result._consume_tombstone_call_count); - auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); - auto close_r2 = deferred_close(r2); - auto start = r2().get0(); - BOOST_REQUIRE(start); - BOOST_REQUIRE(start->is_partition_start()); - for (auto& mf : result._fragments) { - auto mfopt = r2().get0(); - BOOST_REQUIRE(mfopt); - BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt)); - } +SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) { + tests::reader_concurrency_semaphore_wrapper semaphore; + auto test = [&semaphore] (mutation m1, mutation m2) { + size_t fragments_in_m1 = count_fragments(m1); + size_t fragments_in_m2 = count_fragments(m2); + for (size_t depth = 1; depth < fragments_in_m1; ++depth) { + auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); + auto close_r = deferred_close(r); + auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0(); + BOOST_REQUIRE(result._consume_end_of_stream_called); + BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count); + BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count); + BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 
1 : 0, result._consume_tombstone_call_count); + auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); + auto close_r2 = deferred_close(r2); + auto start = r2().get0(); + BOOST_REQUIRE(start); + BOOST_REQUIRE(start->is_partition_start()); + for (auto& mf : result._fragments) { + auto mfopt = r2().get0(); + BOOST_REQUIRE(mfopt); + BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt)); } - for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) { - auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); - auto close_r = deferred_close(r); - auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0(); - BOOST_REQUIRE(result._consume_end_of_stream_called); - BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count); - BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count); - size_t tombstones_count = 0; - if (m1.partition().partition_tombstone()) { - ++tombstones_count; - } - if (m2.partition().partition_tombstone()) { - ++tombstones_count; - } - BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count); - auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); - auto close_r2 = deferred_close(r2); - auto start = r2().get0(); - BOOST_REQUIRE(start); - BOOST_REQUIRE(start->is_partition_start()); - for (auto& mf : result._fragments) { - auto mfopt = r2().get0(); - BOOST_REQUIRE(mfopt); - if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) { - mfopt = r2().get0(); - } - BOOST_REQUIRE(mfopt); - BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt)); - } + } + for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) { + auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); + auto close_r = deferred_close(r); + auto result = 
r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0(); + BOOST_REQUIRE(result._consume_end_of_stream_called); + BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count); + BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count); + size_t tombstones_count = 0; + if (m1.partition().partition_tombstone()) { + ++tombstones_count; } - }; - for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) { - if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) { - test(m, m2); - } else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) { - test(m2, m); + if (m2.partition().partition_tombstone()) { + ++tombstones_count; } - }); + BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count); + auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2}); + auto close_r2 = deferred_close(r2); + auto start = r2().get0(); + BOOST_REQUIRE(start); + BOOST_REQUIRE(start->is_partition_start()); + for (auto& mf : result._fragments) { + auto mfopt = r2().get0(); + BOOST_REQUIRE(mfopt); + if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) { + mfopt = r2().get0(); + } + BOOST_REQUIRE(mfopt); + BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt)); + } + } + }; + for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) { + if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) { + test(m, m2); + } else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) { + test(m2, m); + } }); } -SEASTAR_TEST_CASE(test_fragmenting_and_freezing) { - return seastar::async([] { - tests::reader_concurrency_semaphore_wrapper semaphore; - for_each_mutation([&] (const mutation& m) { - std::vector fms; +SEASTAR_THREAD_TEST_CASE(test_fragmenting_and_freezing) { + tests::reader_concurrency_semaphore_wrapper semaphore; + for_each_mutation([&] (const mutation& m) { + std::vector fms; - 
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) { + fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!frag); + fms.emplace_back(std::move(fm)); + return make_ready_future(stop_iteration::no); + }, std::numeric_limits::max()).get0(); + + BOOST_REQUIRE_EQUAL(fms.size(), 1); + + auto m1 = fms.back().unfreeze(m.schema()); + BOOST_REQUIRE_EQUAL(m, m1); + + fms.clear(); + + std::optional fragmented; + fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!fragmented || *fragmented == frag); + *fragmented = frag; + fms.emplace_back(std::move(fm)); + return make_ready_future(stop_iteration::no); + }, 1).get0(); + + auto&& rows = m.partition().non_dummy_rows(); + auto expected_fragments = std::distance(rows.begin(), rows.end()) + + m.partition().row_tombstones().size() + + !m.partition().static_row().empty(); + BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1))); + BOOST_REQUIRE(expected_fragments < 2 || *fragmented); + + auto m2 = fms.back().unfreeze(m.schema()); + fms.pop_back(); + mutation_application_stats app_stats; + while (!fms.empty()) { + m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats); + fms.pop_back(); + } + BOOST_REQUIRE_EQUAL(m, m2); + }); + + auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) { + for (auto i = 0; i < 4; i++) { + auto muts = gen(4); + auto s = muts[0].schema(); + + std::vector frozen; + + // Freeze all + fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) { BOOST_REQUIRE(!frag); - fms.emplace_back(std::move(fm)); + frozen.emplace_back(fm); return 
make_ready_future(stop_iteration::no); }, std::numeric_limits::max()).get0(); + BOOST_REQUIRE_EQUAL(muts.size(), frozen.size()); + for (auto j = 0u; j < muts.size(); j++) { + BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s)); + } - BOOST_REQUIRE_EQUAL(fms.size(), 1); + // Freeze first + frozen.clear(); + fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) { + BOOST_REQUIRE(!frag); + frozen.emplace_back(fm); + return make_ready_future(stop_iteration::yes); + }, std::numeric_limits::max()).get0(); + BOOST_REQUIRE_EQUAL(frozen.size(), 1); + BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s)); - auto m1 = fms.back().unfreeze(m.schema()); - BOOST_REQUIRE_EQUAL(m, m1); - - fms.clear(); - - std::optional fragmented; - fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) { - BOOST_REQUIRE(!fragmented || *fragmented == frag); - *fragmented = frag; - fms.emplace_back(std::move(fm)); + // Fragment and freeze all + frozen.clear(); + fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) { + frozen.emplace_back(fm); return make_ready_future(stop_iteration::no); }, 1).get0(); - - auto&& rows = m.partition().non_dummy_rows(); - auto expected_fragments = std::distance(rows.begin(), rows.end()) - + m.partition().row_tombstones().size() - + !m.partition().static_row().empty(); - BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1))); - BOOST_REQUIRE(expected_fragments < 2 || *fragmented); - - auto m2 = fms.back().unfreeze(m.schema()); - fms.pop_back(); - mutation_application_stats app_stats; - while (!fms.empty()) { - m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats); - fms.pop_back(); - } - BOOST_REQUIRE_EQUAL(m, m2); - }); - - auto test_random_streams = [&semaphore] 
(random_mutation_generator&& gen) { - for (auto i = 0; i < 4; i++) { - auto muts = gen(4); - auto s = muts[0].schema(); - - std::vector frozen; - - // Freeze all - fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) { - BOOST_REQUIRE(!frag); - frozen.emplace_back(fm); - return make_ready_future(stop_iteration::no); - }, std::numeric_limits::max()).get0(); - BOOST_REQUIRE_EQUAL(muts.size(), frozen.size()); - for (auto j = 0u; j < muts.size(); j++) { - BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s)); + std::vector unfrozen; + while (!frozen.empty()) { + auto m = frozen.front().unfreeze(s); + frozen.erase(frozen.begin()); + if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) { + unfrozen.emplace_back(std::move(m)); + } else { + unfrozen.back().apply(std::move(m)); } - - // Freeze first - frozen.clear(); - fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) { - BOOST_REQUIRE(!frag); - frozen.emplace_back(fm); - return make_ready_future(stop_iteration::yes); - }, std::numeric_limits::max()).get0(); - BOOST_REQUIRE_EQUAL(frozen.size(), 1); - BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s)); - - // Fragment and freeze all - frozen.clear(); - fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) { - frozen.emplace_back(fm); - return make_ready_future(stop_iteration::no); - }, 1).get0(); - std::vector unfrozen; - while (!frozen.empty()) { - auto m = frozen.front().unfreeze(s); - frozen.erase(frozen.begin()); - if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) { - unfrozen.emplace_back(std::move(m)); - } else { - unfrozen.back().apply(std::move(m)); - } - } - BOOST_REQUIRE_EQUAL(muts, unfrozen); } - }; + BOOST_REQUIRE_EQUAL(muts, unfrozen); + } + }; - 
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no)); - test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes)); - }); + test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no)); + test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes)); } SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) { @@ -372,111 +366,109 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) { .is_equal_to(mut_orig); } -SEASTAR_TEST_CASE(test_multi_range_reader) { - return seastar::async([] { - simple_schema s; - tests::reader_concurrency_semaphore_wrapper semaphore; - auto permit = semaphore.make_permit(); +SEASTAR_THREAD_TEST_CASE(test_multi_range_reader) { + simple_schema s; + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); - auto keys = s.make_pkeys(10); - auto ring = s.to_ring_positions(keys); + auto keys = s.make_pkeys(10); + auto ring = s.to_ring_positions(keys); - auto crs = boost::copy_range>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) { - return s.make_row(permit, s.make_ckey(n), "value"); - })); + auto crs = boost::copy_range>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) { + return s.make_row(permit, s.make_ckey(n), "value"); + })); - auto ms = boost::copy_range>(keys | boost::adaptors::transformed([&] (auto& key) { - auto m = mutation(s.schema(), key); - for (auto& mf : crs) { - m.apply(mf); - } - return m; - })); + auto ms = boost::copy_range>(keys | boost::adaptors::transformed([&] (auto& key) { + auto m = mutation(s.schema(), key); + for (auto& mf : crs) { + m.apply(mf); + } + return m; + })); - auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) { - return make_flat_mutation_reader_from_mutations_v2(s.schema(), 
std::move(permit), ms, range); - }); - - const auto empty_ranges = dht::partition_range_vector{}; - const auto single_ranges = dht::partition_range_vector{ - dht::partition_range::make(ring[1], ring[2]), - }; - const auto multiple_ranges = dht::partition_range_vector { - dht::partition_range::make(ring[1], ring[2]), - dht::partition_range::make_singular(ring[4]), - dht::partition_range::make(ring[6], ring[8]), - }; - const auto empty_generator = [] { return std::optional{}; }; - const auto single_generator = [r = std::optional(single_ranges.front())] () mutable { - return std::exchange(r, {}); - }; - const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional { - if (it == end) { - return std::nullopt; - } - return *(it++); - }; - auto fft_range = dht::partition_range::make_starting_with(ring[9]); - - // Generator ranges are single pass, so we need a new range each time they are used. - auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) { - testlog.info("empty ranges"); - assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice())) - .produces_end_of_stream() - .fast_forward_to(fft_range) - .produces(ms[9]) - .produces_end_of_stream(); - - testlog.info("single range"); - assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice())) - .produces(ms[1]) - .produces(ms[2]) - .produces_end_of_stream() - .fast_forward_to(fft_range) - .produces(ms[9]) - .produces_end_of_stream(); - - testlog.info("read full partitions and fast forward"); - assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice())) - .produces(ms[1]) - .produces(ms[2]) - .produces(ms[4]) - .produces(ms[6]) - .fast_forward_to(fft_range) - .produces(ms[9]) - 
.produces_end_of_stream(); - - testlog.info("read, skip partitions and fast forward"); - assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice())) - .produces_partition_start(keys[1]) - .next_partition() - .produces_partition_start(keys[2]) - .produces_row_with_key(crs[0].as_clustering_row().key()) - .next_partition() - .produces(ms[4]) - .next_partition() - .produces_partition_start(keys[6]) - .produces_row_with_key(crs[0].as_clustering_row().key()) - .produces_row_with_key(crs[1].as_clustering_row().key()) - .fast_forward_to(fft_range) - .next_partition() - .produces_partition_start(keys[9]) - .next_partition() - .produces_end_of_stream(); - }; - - testlog.info("vector version"); - run_test( - [&] { return empty_ranges; }, - [&] { return single_ranges; }, - [&] { return multiple_ranges; }); - - testlog.info("generator version"); - run_test( - [&] { return empty_generator; }, - [&] { return single_generator; }, - [&] { return multiple_generator; }); + auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) { + return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range); }); + + const auto empty_ranges = dht::partition_range_vector{}; + const auto single_ranges = dht::partition_range_vector{ + dht::partition_range::make(ring[1], ring[2]), + }; + const auto multiple_ranges = dht::partition_range_vector { + dht::partition_range::make(ring[1], ring[2]), + dht::partition_range::make_singular(ring[4]), + dht::partition_range::make(ring[6], ring[8]), + }; + const auto empty_generator = [] { return std::optional{}; }; + const auto single_generator = [r = std::optional(single_ranges.front())] () mutable { + return std::exchange(r, {}); + }; + const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional { + if (it == end) { + return std::nullopt; + } + 
return *(it++); + }; + auto fft_range = dht::partition_range::make_starting_with(ring[9]); + + // Generator ranges are single pass, so we need a new range each time they are used. + auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) { + testlog.info("empty ranges"); + assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice())) + .produces_end_of_stream() + .fast_forward_to(fft_range) + .produces(ms[9]) + .produces_end_of_stream(); + + testlog.info("single range"); + assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice())) + .produces(ms[1]) + .produces(ms[2]) + .produces_end_of_stream() + .fast_forward_to(fft_range) + .produces(ms[9]) + .produces_end_of_stream(); + + testlog.info("read full partitions and fast forward"); + assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice())) + .produces(ms[1]) + .produces(ms[2]) + .produces(ms[4]) + .produces(ms[6]) + .fast_forward_to(fft_range) + .produces(ms[9]) + .produces_end_of_stream(); + + testlog.info("read, skip partitions and fast forward"); + assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice())) + .produces_partition_start(keys[1]) + .next_partition() + .produces_partition_start(keys[2]) + .produces_row_with_key(crs[0].as_clustering_row().key()) + .next_partition() + .produces(ms[4]) + .next_partition() + .produces_partition_start(keys[6]) + .produces_row_with_key(crs[0].as_clustering_row().key()) + .produces_row_with_key(crs[1].as_clustering_row().key()) + .fast_forward_to(fft_range) + .next_partition() + .produces_partition_start(keys[9]) + .next_partition() + .produces_end_of_stream(); + }; + + testlog.info("vector version"); + run_test( + [&] { return 
empty_ranges; }, + [&] { return single_ranges; }, + [&] { return multiple_ranges; }); + + testlog.info("generator version"); + run_test( + [&] { return empty_generator; }, + [&] { return single_generator; }, + [&] { return multiple_generator; }); } using reversed_partitions = seastar::bool_class; @@ -649,83 +641,79 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti } } -SEASTAR_TEST_CASE(test_consume_flat) { - return seastar::async([] { - auto test_random_streams = [&] (random_mutation_generator&& gen) { - for (auto i = 0; i < 4; i++) { - auto muts = gen(4); - test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no); - test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no); - test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes); - } - }; +SEASTAR_THREAD_TEST_CASE(test_consume_flat) { + auto test_random_streams = [&] (random_mutation_generator&& gen) { + for (auto i = 0; i < 4; i++) { + auto muts = gen(4); + test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no); + test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no); + test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes); + } + }; - test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no)); - test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes)); - }); + test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no)); + test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes)); } -SEASTAR_TEST_CASE(test_make_forwardable) { - return seastar::async([] { - simple_schema s; - tests::reader_concurrency_semaphore_wrapper semaphore; - auto permit = semaphore.make_permit(); +SEASTAR_THREAD_TEST_CASE(test_make_forwardable) { + simple_schema s; + tests::reader_concurrency_semaphore_wrapper semaphore; + auto 
permit = semaphore.make_permit(); - auto keys = s.make_pkeys(10); + auto keys = s.make_pkeys(10); - auto crs = boost::copy_range < std::vector < - mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) { - return s.make_row(permit, s.make_ckey(n), "value"); - })); + auto crs = boost::copy_range < std::vector < + mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) { + return s.make_row(permit, s.make_ckey(n), "value"); + })); - auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) { - auto m = mutation(s.schema(), key); - for (auto &mf : crs) { - m.apply(mf); - } - return m; - })); - - auto make_reader = [&] (auto& range) { - return assert_that( - make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no))); - }; - - auto test = [&] (auto& rd, auto& partition) { - rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone()); - rd.produces_end_of_stream(); - rd.fast_forward_to(position_range::all_clustered_rows()); - for (auto &row : partition.partition().clustered_rows()) { - rd.produces_row_with_key(row.key()); - } - rd.produces_end_of_stream(); - rd.next_partition(); - }; - - auto rd = make_reader(query::full_partition_range); - - for (auto& partition : ms) { - test(rd, partition); + auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) { + auto m = mutation(s.schema(), key); + for (auto &mf : crs) { + m.apply(mf); } + return m; + })); - auto single_range = dht::partition_range::make_singular(ms[0].decorated_key()); + auto make_reader = [&] (auto& range) { + return assert_that( + make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no))); + }; - auto rd2 = make_reader(single_range); - - 
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone()); - rd2.produces_end_of_stream(); - rd2.fast_forward_to(position_range::all_clustered_rows()); - rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key()); - rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key()); - - auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false}); - - rd2.fast_forward_to(remaining_range); - - for (auto i = size_t(1); i < ms.size(); ++i) { - test(rd2, ms[i]); + auto test = [&] (auto& rd, auto& partition) { + rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone()); + rd.produces_end_of_stream(); + rd.fast_forward_to(position_range::all_clustered_rows()); + for (auto &row : partition.partition().clustered_rows()) { + rd.produces_row_with_key(row.key()); } - }); + rd.produces_end_of_stream(); + rd.next_partition(); + }; + + auto rd = make_reader(query::full_partition_range); + + for (auto& partition : ms) { + test(rd, partition); + } + + auto single_range = dht::partition_range::make_singular(ms[0].decorated_key()); + + auto rd2 = make_reader(single_range); + + rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone()); + rd2.produces_end_of_stream(); + rd2.fast_forward_to(position_range::all_clustered_rows()); + rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key()); + rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key()); + + auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false}); + + rd2.fast_forward_to(remaining_range); + + for (auto i = size_t(1); i < ms.size(); ++i) { + test(rd2, ms[i]); + } } SEASTAR_THREAD_TEST_CASE(test_make_forwardable_next_partition) { @@ -929,16 +917,14 @@ SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable_from_mutations_as_mutation_sou 
run_mutation_source_tests(populate); } -SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { - return seastar::async([] { - tests::reader_concurrency_semaphore_wrapper semaphore; - for_each_mutation([&] (const mutation& m) { - auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)}); - auto close_rd = deferred_close(rd); - rd().get(); - rd().get(); - // We rely on AddressSanitizer telling us if nothing was leaked. - }); +SEASTAR_THREAD_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) { + tests::reader_concurrency_semaphore_wrapper semaphore; + for_each_mutation([&] (const mutation& m) { + auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)}); + auto close_rd = deferred_close(rd); + rd().get(); + rd().get(); + // We rely on AddressSanitizer telling us if nothing was leaked. }); }