diff --git a/db/chained_delegating_reader.hh b/db/chained_delegating_reader.hh
index 489fb272a4..a973400a08 100644
--- a/db/chained_delegating_reader.hh
+++ b/db/chained_delegating_reader.hh
@@ -59,7 +59,7 @@ public:
}
_end_of_stream = false;
- forward_buffer_to(pr.start());
+ clear_buffer();
return _underlying->fast_forward_to(std::move(pr));
}
diff --git a/db/size_estimates_virtual_reader.cc b/db/size_estimates_virtual_reader.cc
index 4976252b78..5ea2868f26 100644
--- a/db/size_estimates_virtual_reader.cc
+++ b/db/size_estimates_virtual_reader.cc
@@ -295,7 +295,7 @@ future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_ra
}
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr) {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
if (_partition_reader) {
return _partition_reader->fast_forward_to(std::move(pr));
diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh
index ddc90a2f12..d2473c9e6c 100644
--- a/db/view/build_progress_virtual_reader.hh
+++ b/db/view/build_progress_virtual_reader.hh
@@ -172,7 +172,7 @@ class build_progress_virtual_reader {
}
virtual future<> fast_forward_to(position_range range) override {
- forward_buffer_to(range.start());
+ clear_buffer();
_end_of_stream = false;
return _underlying.fast_forward_to(std::move(range));
}
diff --git a/index/built_indexes_virtual_reader.hh b/index/built_indexes_virtual_reader.hh
index 6669cfd9d4..c74e12cb37 100644
--- a/index/built_indexes_virtual_reader.hh
+++ b/index/built_indexes_virtual_reader.hh
@@ -175,7 +175,7 @@ class built_indexes_virtual_reader {
}
virtual future<> fast_forward_to(position_range range) override {
- forward_buffer_to(range.start());
+ clear_buffer();
_end_of_stream = false;
// range contains index names (e.g., xyz) but the underlying table
// contains view names (e.g., xyz_index) so we need to add the
diff --git a/readers/combined.cc b/readers/combined.cc
index 9a320da8b1..cf676e9f4c 100644
--- a/readers/combined.cc
+++ b/readers/combined.cc
@@ -658,7 +658,7 @@ future<> merging_reader
::fast_forward_to(const dht::partition_range& pr) {
template
future<> merging_reader::fast_forward_to(position_range pr) {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
return _merger.fast_forward_to(std::move(pr));
}
diff --git a/readers/delegating_v2.hh b/readers/delegating_v2.hh
index 810f7c9ba8..cb47de681e 100644
--- a/readers/delegating_v2.hh
+++ b/readers/delegating_v2.hh
@@ -40,7 +40,7 @@ public:
}
virtual future<> fast_forward_to(position_range pr) override {
_end_of_stream = false;
- forward_buffer_to(pr.start());
+ clear_buffer();
return _underlying->fast_forward_to(std::move(pr));
}
virtual future<> next_partition() override {
diff --git a/readers/filtering.hh b/readers/filtering.hh
index 331b3611d6..48259b123d 100644
--- a/readers/filtering.hh
+++ b/readers/filtering.hh
@@ -54,7 +54,7 @@ public:
return _rd.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range pr) override {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
return _rd.fast_forward_to(std::move(pr));
}
diff --git a/readers/flat_mutation_reader_v2.hh b/readers/flat_mutation_reader_v2.hh
index 2180d1fc9d..d8425fae28 100644
--- a/readers/flat_mutation_reader_v2.hh
+++ b/readers/flat_mutation_reader_v2.hh
@@ -153,7 +153,6 @@ public:
void reserve_additional(size_t n) {
_buffer.reserve(_buffer.size() + n);
}
- void forward_buffer_to(const position_in_partition& pos);
void clear_buffer_to_next_partition();
template
future fill_buffer_from(Source&);
@@ -722,7 +721,7 @@ flat_mutation_reader_v2 transform(flat_mutation_reader_v2 r, T t) {
return _reader.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range pr) override {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
return _reader.fast_forward_to(std::move(pr));
}
diff --git a/readers/multishard.cc b/readers/multishard.cc
index f17e0a1f94..b34f78757f 100644
--- a/readers/multishard.cc
+++ b/readers/multishard.cc
@@ -158,7 +158,7 @@ future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) {
}
future<> foreign_reader::fast_forward_to(position_range pr) {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
return forward_operation([reader = _reader.get(), pr = std::move(pr)] () {
return reader->fast_forward_to(std::move(pr));
diff --git a/readers/mutation_reader.cc b/readers/mutation_reader.cc
index d7bf452d82..36b391afb2 100644
--- a/readers/mutation_reader.cc
+++ b/readers/mutation_reader.cc
@@ -385,11 +385,6 @@ flat_mutation_reader_v2::~flat_mutation_reader_v2() {
}
}
-void flat_mutation_reader_v2::impl::forward_buffer_to(const position_in_partition& pos) {
- clear_buffer();
- _buffer_size = compute_buffer_size(*_schema, _buffer);
-}
-
void flat_mutation_reader_v2::impl::clear_buffer_to_next_partition() {
auto next_partition_start = std::find_if(_buffer.begin(), _buffer.end(), [] (const mutation_fragment_v2& mf) {
return mf.is_partition_start();
diff --git a/readers/mutation_readers.cc b/readers/mutation_readers.cc
index 282842cc42..3bcf18c3b7 100644
--- a/readers/mutation_readers.cc
+++ b/readers/mutation_readers.cc
@@ -167,16 +167,19 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
_current = std::move(pr);
_end_of_stream = false;
_current_has_content = false;
- forward_buffer_to(_current.start());
+ clear_buffer();
return make_ready_future<>();
}
virtual future<> next_partition() override {
+ clear_buffer_to_next_partition();
+ if (!is_buffer_empty()) {
+ co_return;
+ }
_end_of_stream = false;
if (!_next || !_next->is_partition_start()) {
co_await _underlying.next_partition();
_next = {};
}
- clear_buffer_to_next_partition();
_current = {
position_in_partition::for_partition_start(),
position_in_partition(position_in_partition::after_static_row_tag_t())
@@ -267,7 +270,7 @@ flat_mutation_reader_v2 make_slicing_filtering_reader(flat_mutation_reader_v2 rd
}
virtual future<> fast_forward_to(position_range pr) override {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
return _rd.fast_forward_to(std::move(pr));
}
@@ -411,25 +414,32 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
flat_mutation_reader_v2 _underlying;
bool _single_partition;
bool _static_row_done = false;
+ bool _partition_is_open = false;
bool is_end_end_of_underlying_stream() const {
return _underlying.is_buffer_empty() && _underlying.is_end_of_stream();
}
future<> on_end_of_underlying_stream() {
- if (!_static_row_done) {
- _static_row_done = true;
- return _underlying.fast_forward_to(position_range::all_clustered_rows());
+ if (_partition_is_open) {
+ if (!_static_row_done) {
+ _static_row_done = true;
+ return _underlying.fast_forward_to(position_range::all_clustered_rows());
+ }
+ push_mutation_fragment(*_schema, _permit, partition_end());
+ reset_partition();
}
- push_mutation_fragment(*_schema, _permit, partition_end());
if (_single_partition) {
_end_of_stream = true;
return make_ready_future<>();
}
- return _underlying.next_partition().then([this] {
- _static_row_done = false;
- return _underlying.fill_buffer().then([this] {
- _end_of_stream = is_end_end_of_underlying_stream();
+ return _underlying.next_partition().then([this] {
+ return _underlying.fill_buffer().then([this] {
+ _end_of_stream = is_end_end_of_underlying_stream();
+ });
});
- });
+ }
+ void reset_partition() {
+ _partition_is_open = false;
+ _static_row_done = false;
}
public:
reader(flat_mutation_reader_v2 r, bool single_partition)
@@ -440,6 +450,9 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
virtual future<> fill_buffer() override {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
return fill_buffer_from(_underlying).then([this] (bool underlying_finished) {
+ if (!_partition_is_open && !is_buffer_empty()) {
+ _partition_is_open = true;
+ }
if (underlying_finished) {
return on_end_of_underlying_stream();
}
@@ -452,17 +465,27 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
- auto maybe_next_partition = make_ready_future<>();;
+ auto maybe_next_partition = make_ready_future<>();
if (is_buffer_empty()) {
+ if (_end_of_stream || (_partition_is_open && _single_partition)) {
+ _end_of_stream = true;
+ return maybe_next_partition;
+ }
+ reset_partition();
maybe_next_partition = _underlying.next_partition();
}
- return maybe_next_partition.then([this] {
- _end_of_stream = is_end_end_of_underlying_stream();
- });
+ return maybe_next_partition.then([this] {
+ _end_of_stream = is_end_end_of_underlying_stream();
+ });
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
- _end_of_stream = false;
clear_buffer();
+ if (_single_partition) {
+ _end_of_stream = true;
+ return make_ready_future<>();
+ }
+ reset_partition();
+ _end_of_stream = false;
return _underlying.fast_forward_to(pr);
}
virtual future<> close() noexcept override {
@@ -1532,7 +1555,7 @@ public:
return _reader.fast_forward_to(pr);
}
virtual future<> fast_forward_to(position_range pr) override {
- forward_buffer_to(pr.start());
+ clear_buffer();
_end_of_stream = false;
return _reader.fast_forward_to(std::move(pr));
}
diff --git a/row_cache.cc b/row_cache.cc
index aa6909f867..7f3c68912c 100644
--- a/row_cache.cc
+++ b/row_cache.cc
@@ -354,8 +354,9 @@ future<> read_context::create_underlying() {
});
}
-static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader) {
+static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader, mutation_fragment_v2 partition_start) {
auto res = make_delegating_reader(reader.underlying().underlying());
+ res.unpop_mutation_fragment(std::move(partition_start));
res.upgrade_schema(reader.schema());
return make_nonforwardable(std::move(res), true);
}
@@ -388,8 +389,7 @@ private:
});
} else {
_cache._tracker.on_mispopulate();
- _reader = read_directly_from_underlying(*_read_context);
- this->push_mutation_fragment(std::move(*mfopt));
+ _reader = read_directly_from_underlying(*_read_context, std::move(*mfopt));
}
});
});
@@ -514,15 +514,13 @@ public:
, _read_context(ctx)
{}
- using read_result = std::tuple;
-
- future operator()() {
+ future operator()() {
return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable {
{
if (!mfopt) {
return _cache._read_section(_cache._tracker.region(), [&] {
this->handle_end_of_stream();
- return make_ready_future(read_result(std::nullopt, std::nullopt));
+ return make_ready_future(std::nullopt);
});
}
_cache.on_partition_miss();
@@ -533,14 +531,12 @@ public:
cache_entry& e = _cache.find_or_create_incomplete(ps, _reader.creation_phase(),
this->can_set_continuity() ? &*_last_key : nullptr);
_last_key = row_cache::previous_entry_pointer(key);
- return make_ready_future(
- read_result(e.read(_cache, _read_context, _reader.creation_phase()), std::nullopt));
+ return make_ready_future(e.read(_cache, _read_context, _reader.creation_phase()));
});
} else {
_cache._tracker.on_mispopulate();
_last_key = row_cache::previous_entry_pointer(key);
- return make_ready_future(
- read_result(read_directly_from_underlying(_read_context), std::move(mfopt)));
+ return make_ready_future(read_directly_from_underlying(_read_context, std::move(*mfopt)));
}
}
});
@@ -644,12 +640,8 @@ private:
}
future read_from_secondary() {
- return _secondary_reader().then([this] (range_populating_reader::read_result&& res) {
- auto&& [fropt, ps] = res;
+ return _secondary_reader().then([this] (flat_mutation_reader_v2_opt&& fropt) {
if (fropt) {
- if (ps) {
- push_mutation_fragment(std::move(*ps));
- }
return make_ready_future(std::move(fropt));
} else {
_secondary_in_progress = false;
diff --git a/sstables/kl/reader.cc b/sstables/kl/reader.cc
index 2d8e85560b..b68c7ab969 100644
--- a/sstables/kl/reader.cc
+++ b/sstables/kl/reader.cc
@@ -1465,7 +1465,7 @@ public:
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
}
virtual future<> fast_forward_to(position_range cr) override {
- forward_buffer_to(cr.start());
+ clear_buffer();
if (!_partition_finished) {
_end_of_stream = false;
return advance_context(_consumer.fast_forward_to(std::move(cr)));
diff --git a/sstables/mx/reader.cc b/sstables/mx/reader.cc
index fbe0b1452d..bc90290f39 100644
--- a/sstables/mx/reader.cc
+++ b/sstables/mx/reader.cc
@@ -1653,7 +1653,7 @@ public:
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
}
virtual future<> fast_forward_to(position_range cr) override {
- forward_buffer_to(cr.start());
+ clear_buffer();
if (!_partition_finished) {
_end_of_stream = false;
return advance_context(_consumer.fast_forward_to(std::move(cr)));
diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc
index 72271aadde..59c154221a 100644
--- a/test/boost/flat_mutation_reader_test.cc
+++ b/test/boost/flat_mutation_reader_test.cc
@@ -38,6 +38,7 @@
#include "readers/from_fragments_v2.hh"
#include "readers/forwardable_v2.hh"
#include "readers/compacting.hh"
+#include "readers/nonforwardable.hh"
struct mock_consumer {
struct result {
@@ -110,193 +111,187 @@ static size_t count_fragments(mutation m) {
return res;
}
-SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
- return seastar::async([] {
- tests::reader_concurrency_semaphore_wrapper semaphore;
- for_each_mutation([&] (const mutation& m) {
- size_t fragments_in_m = count_fragments(m);
- for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
- auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
- auto close_reader = deferred_close(r);
- auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
- BOOST_REQUIRE(result._consume_end_of_stream_called);
- BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
- BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
- BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
- auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
- r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
- if (result._fragments.empty()) {
- continue;
- }
- for (auto& mf : result._fragments) {
- r2.produces(*m.schema(), mf);
- }
+SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ for_each_mutation([&] (const mutation& m) {
+ size_t fragments_in_m = count_fragments(m);
+ for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
+ auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
+ auto close_reader = deferred_close(r);
+ auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
+ BOOST_REQUIRE(result._consume_end_of_stream_called);
+ BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
+ BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
+ BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
+ auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
+ r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
+ if (result._fragments.empty()) {
+ continue;
}
- });
+ for (auto& mf : result._fragments) {
+ r2.produces(*m.schema(), mf);
+ }
+ }
});
}
-SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
- return seastar::async([] {
- tests::reader_concurrency_semaphore_wrapper semaphore;
- auto test = [&semaphore] (mutation m1, mutation m2) {
- size_t fragments_in_m1 = count_fragments(m1);
- size_t fragments_in_m2 = count_fragments(m2);
- for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
- auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
- auto close_r = deferred_close(r);
- auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
- BOOST_REQUIRE(result._consume_end_of_stream_called);
- BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
- BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
- BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
- auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
- auto close_r2 = deferred_close(r2);
- auto start = r2().get0();
- BOOST_REQUIRE(start);
- BOOST_REQUIRE(start->is_partition_start());
- for (auto& mf : result._fragments) {
- auto mfopt = r2().get0();
- BOOST_REQUIRE(mfopt);
- BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
- }
+SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ auto test = [&semaphore] (mutation m1, mutation m2) {
+ size_t fragments_in_m1 = count_fragments(m1);
+ size_t fragments_in_m2 = count_fragments(m2);
+ for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
+ auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
+ auto close_r = deferred_close(r);
+ auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
+ BOOST_REQUIRE(result._consume_end_of_stream_called);
+ BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
+ BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
+ BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
+ auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
+ auto close_r2 = deferred_close(r2);
+ auto start = r2().get0();
+ BOOST_REQUIRE(start);
+ BOOST_REQUIRE(start->is_partition_start());
+ for (auto& mf : result._fragments) {
+ auto mfopt = r2().get0();
+ BOOST_REQUIRE(mfopt);
+ BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
}
- for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
- auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
- auto close_r = deferred_close(r);
- auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
- BOOST_REQUIRE(result._consume_end_of_stream_called);
- BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
- BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
- size_t tombstones_count = 0;
- if (m1.partition().partition_tombstone()) {
- ++tombstones_count;
- }
- if (m2.partition().partition_tombstone()) {
- ++tombstones_count;
- }
- BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
- auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
- auto close_r2 = deferred_close(r2);
- auto start = r2().get0();
- BOOST_REQUIRE(start);
- BOOST_REQUIRE(start->is_partition_start());
- for (auto& mf : result._fragments) {
- auto mfopt = r2().get0();
- BOOST_REQUIRE(mfopt);
- if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
- mfopt = r2().get0();
- }
- BOOST_REQUIRE(mfopt);
- BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
- }
+ }
+ for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
+ auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
+ auto close_r = deferred_close(r);
+ auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
+ BOOST_REQUIRE(result._consume_end_of_stream_called);
+ BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
+ BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
+ size_t tombstones_count = 0;
+ if (m1.partition().partition_tombstone()) {
+ ++tombstones_count;
}
- };
- for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
- if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
- test(m, m2);
- } else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
- test(m2, m);
+ if (m2.partition().partition_tombstone()) {
+ ++tombstones_count;
}
- });
+ BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
+ auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
+ auto close_r2 = deferred_close(r2);
+ auto start = r2().get0();
+ BOOST_REQUIRE(start);
+ BOOST_REQUIRE(start->is_partition_start());
+ for (auto& mf : result._fragments) {
+ auto mfopt = r2().get0();
+ BOOST_REQUIRE(mfopt);
+ if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
+ mfopt = r2().get0();
+ }
+ BOOST_REQUIRE(mfopt);
+ BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
+ }
+ }
+ };
+ for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
+ if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
+ test(m, m2);
+ } else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
+ test(m2, m);
+ }
});
}
-SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
- return seastar::async([] {
- tests::reader_concurrency_semaphore_wrapper semaphore;
- for_each_mutation([&] (const mutation& m) {
- std::vector fms;
+SEASTAR_THREAD_TEST_CASE(test_fragmenting_and_freezing) {
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ for_each_mutation([&] (const mutation& m) {
+ std::vector fms;
- fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
+ fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
+ BOOST_REQUIRE(!frag);
+ fms.emplace_back(std::move(fm));
+ return make_ready_future(stop_iteration::no);
+ }, std::numeric_limits::max()).get0();
+
+ BOOST_REQUIRE_EQUAL(fms.size(), 1);
+
+ auto m1 = fms.back().unfreeze(m.schema());
+ BOOST_REQUIRE_EQUAL(m, m1);
+
+ fms.clear();
+
+ std::optional fragmented;
+ fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
+ BOOST_REQUIRE(!fragmented || *fragmented == frag);
+ *fragmented = frag;
+ fms.emplace_back(std::move(fm));
+ return make_ready_future(stop_iteration::no);
+ }, 1).get0();
+
+ auto&& rows = m.partition().non_dummy_rows();
+ auto expected_fragments = std::distance(rows.begin(), rows.end())
+ + m.partition().row_tombstones().size()
+ + !m.partition().static_row().empty();
+ BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
+ BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
+
+ auto m2 = fms.back().unfreeze(m.schema());
+ fms.pop_back();
+ mutation_application_stats app_stats;
+ while (!fms.empty()) {
+ m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
+ fms.pop_back();
+ }
+ BOOST_REQUIRE_EQUAL(m, m2);
+ });
+
+ auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
+ for (auto i = 0; i < 4; i++) {
+ auto muts = gen(4);
+ auto s = muts[0].schema();
+
+ std::vector frozen;
+
+ // Freeze all
+ fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
- fms.emplace_back(std::move(fm));
+ frozen.emplace_back(fm);
return make_ready_future(stop_iteration::no);
}, std::numeric_limits::max()).get0();
+ BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
+ for (auto j = 0u; j < muts.size(); j++) {
+ BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
+ }
- BOOST_REQUIRE_EQUAL(fms.size(), 1);
+ // Freeze first
+ frozen.clear();
+ fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
+ BOOST_REQUIRE(!frag);
+ frozen.emplace_back(fm);
+ return make_ready_future(stop_iteration::yes);
+ }, std::numeric_limits::max()).get0();
+ BOOST_REQUIRE_EQUAL(frozen.size(), 1);
+ BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
- auto m1 = fms.back().unfreeze(m.schema());
- BOOST_REQUIRE_EQUAL(m, m1);
-
- fms.clear();
-
- std::optional fragmented;
- fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
- BOOST_REQUIRE(!fragmented || *fragmented == frag);
- *fragmented = frag;
- fms.emplace_back(std::move(fm));
+ // Fragment and freeze all
+ frozen.clear();
+ fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
+ frozen.emplace_back(fm);
return make_ready_future(stop_iteration::no);
}, 1).get0();
-
- auto&& rows = m.partition().non_dummy_rows();
- auto expected_fragments = std::distance(rows.begin(), rows.end())
- + m.partition().row_tombstones().size()
- + !m.partition().static_row().empty();
- BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
- BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
-
- auto m2 = fms.back().unfreeze(m.schema());
- fms.pop_back();
- mutation_application_stats app_stats;
- while (!fms.empty()) {
- m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
- fms.pop_back();
- }
- BOOST_REQUIRE_EQUAL(m, m2);
- });
-
- auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
- for (auto i = 0; i < 4; i++) {
- auto muts = gen(4);
- auto s = muts[0].schema();
-
- std::vector frozen;
-
- // Freeze all
- fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
- BOOST_REQUIRE(!frag);
- frozen.emplace_back(fm);
- return make_ready_future(stop_iteration::no);
- }, std::numeric_limits::max()).get0();
- BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
- for (auto j = 0u; j < muts.size(); j++) {
- BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
+ std::vector unfrozen;
+ while (!frozen.empty()) {
+ auto m = frozen.front().unfreeze(s);
+ frozen.erase(frozen.begin());
+ if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
+ unfrozen.emplace_back(std::move(m));
+ } else {
+ unfrozen.back().apply(std::move(m));
}
-
- // Freeze first
- frozen.clear();
- fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
- BOOST_REQUIRE(!frag);
- frozen.emplace_back(fm);
- return make_ready_future(stop_iteration::yes);
- }, std::numeric_limits::max()).get0();
- BOOST_REQUIRE_EQUAL(frozen.size(), 1);
- BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
-
- // Fragment and freeze all
- frozen.clear();
- fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
- frozen.emplace_back(fm);
- return make_ready_future(stop_iteration::no);
- }, 1).get0();
- std::vector unfrozen;
- while (!frozen.empty()) {
- auto m = frozen.front().unfreeze(s);
- frozen.erase(frozen.begin());
- if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
- unfrozen.emplace_back(std::move(m));
- } else {
- unfrozen.back().apply(std::move(m));
- }
- }
- BOOST_REQUIRE_EQUAL(muts, unfrozen);
}
- };
+ BOOST_REQUIRE_EQUAL(muts, unfrozen);
+ }
+ };
- test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
- test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
- });
+ test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
+ test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
}
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
@@ -371,111 +366,109 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
.is_equal_to(mut_orig);
}
-SEASTAR_TEST_CASE(test_multi_range_reader) {
- return seastar::async([] {
- simple_schema s;
- tests::reader_concurrency_semaphore_wrapper semaphore;
- auto permit = semaphore.make_permit();
+SEASTAR_THREAD_TEST_CASE(test_multi_range_reader) {
+ simple_schema s;
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ auto permit = semaphore.make_permit();
- auto keys = s.make_pkeys(10);
- auto ring = s.to_ring_positions(keys);
+ auto keys = s.make_pkeys(10);
+ auto ring = s.to_ring_positions(keys);
- auto crs = boost::copy_range>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
- return s.make_row(permit, s.make_ckey(n), "value");
- }));
+ auto crs = boost::copy_range>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
+ return s.make_row(permit, s.make_ckey(n), "value");
+ }));
- auto ms = boost::copy_range>(keys | boost::adaptors::transformed([&] (auto& key) {
- auto m = mutation(s.schema(), key);
- for (auto& mf : crs) {
- m.apply(mf);
- }
- return m;
- }));
+ auto ms = boost::copy_range>(keys | boost::adaptors::transformed([&] (auto& key) {
+ auto m = mutation(s.schema(), key);
+ for (auto& mf : crs) {
+ m.apply(mf);
+ }
+ return m;
+ }));
- auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
- return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
- });
-
- const auto empty_ranges = dht::partition_range_vector{};
- const auto single_ranges = dht::partition_range_vector{
- dht::partition_range::make(ring[1], ring[2]),
- };
- const auto multiple_ranges = dht::partition_range_vector {
- dht::partition_range::make(ring[1], ring[2]),
- dht::partition_range::make_singular(ring[4]),
- dht::partition_range::make(ring[6], ring[8]),
- };
- const auto empty_generator = [] { return std::optional{}; };
- const auto single_generator = [r = std::optional(single_ranges.front())] () mutable {
- return std::exchange(r, {});
- };
- const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional {
- if (it == end) {
- return std::nullopt;
- }
- return *(it++);
- };
- auto fft_range = dht::partition_range::make_starting_with(ring[9]);
-
- // Generator ranges are single pass, so we need a new range each time they are used.
- auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
- testlog.info("empty ranges");
- assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
- .produces_end_of_stream()
- .fast_forward_to(fft_range)
- .produces(ms[9])
- .produces_end_of_stream();
-
- testlog.info("single range");
- assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
- .produces(ms[1])
- .produces(ms[2])
- .produces_end_of_stream()
- .fast_forward_to(fft_range)
- .produces(ms[9])
- .produces_end_of_stream();
-
- testlog.info("read full partitions and fast forward");
- assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
- .produces(ms[1])
- .produces(ms[2])
- .produces(ms[4])
- .produces(ms[6])
- .fast_forward_to(fft_range)
- .produces(ms[9])
- .produces_end_of_stream();
-
- testlog.info("read, skip partitions and fast forward");
- assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
- .produces_partition_start(keys[1])
- .next_partition()
- .produces_partition_start(keys[2])
- .produces_row_with_key(crs[0].as_clustering_row().key())
- .next_partition()
- .produces(ms[4])
- .next_partition()
- .produces_partition_start(keys[6])
- .produces_row_with_key(crs[0].as_clustering_row().key())
- .produces_row_with_key(crs[1].as_clustering_row().key())
- .fast_forward_to(fft_range)
- .next_partition()
- .produces_partition_start(keys[9])
- .next_partition()
- .produces_end_of_stream();
- };
-
- testlog.info("vector version");
- run_test(
- [&] { return empty_ranges; },
- [&] { return single_ranges; },
- [&] { return multiple_ranges; });
-
- testlog.info("generator version");
- run_test(
- [&] { return empty_generator; },
- [&] { return single_generator; },
- [&] { return multiple_generator; });
+ auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
+ return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
});
+
+ const auto empty_ranges = dht::partition_range_vector{};
+ const auto single_ranges = dht::partition_range_vector{
+ dht::partition_range::make(ring[1], ring[2]),
+ };
+ const auto multiple_ranges = dht::partition_range_vector {
+ dht::partition_range::make(ring[1], ring[2]),
+ dht::partition_range::make_singular(ring[4]),
+ dht::partition_range::make(ring[6], ring[8]),
+ };
+ const auto empty_generator = [] { return std::optional{}; };
+ const auto single_generator = [r = std::optional(single_ranges.front())] () mutable {
+ return std::exchange(r, {});
+ };
+ const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional {
+ if (it == end) {
+ return std::nullopt;
+ }
+ return *(it++);
+ };
+ auto fft_range = dht::partition_range::make_starting_with(ring[9]);
+
+ // Generator ranges are single pass, so we need a new range each time they are used.
+ auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
+ testlog.info("empty ranges");
+ assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
+ .produces_end_of_stream()
+ .fast_forward_to(fft_range)
+ .produces(ms[9])
+ .produces_end_of_stream();
+
+ testlog.info("single range");
+ assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
+ .produces(ms[1])
+ .produces(ms[2])
+ .produces_end_of_stream()
+ .fast_forward_to(fft_range)
+ .produces(ms[9])
+ .produces_end_of_stream();
+
+ testlog.info("read full partitions and fast forward");
+ assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
+ .produces(ms[1])
+ .produces(ms[2])
+ .produces(ms[4])
+ .produces(ms[6])
+ .fast_forward_to(fft_range)
+ .produces(ms[9])
+ .produces_end_of_stream();
+
+ testlog.info("read, skip partitions and fast forward");
+ assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
+ .produces_partition_start(keys[1])
+ .next_partition()
+ .produces_partition_start(keys[2])
+ .produces_row_with_key(crs[0].as_clustering_row().key())
+ .next_partition()
+ .produces(ms[4])
+ .next_partition()
+ .produces_partition_start(keys[6])
+ .produces_row_with_key(crs[0].as_clustering_row().key())
+ .produces_row_with_key(crs[1].as_clustering_row().key())
+ .fast_forward_to(fft_range)
+ .next_partition()
+ .produces_partition_start(keys[9])
+ .next_partition()
+ .produces_end_of_stream();
+ };
+
+ testlog.info("vector version");
+ run_test(
+ [&] { return empty_ranges; },
+ [&] { return single_ranges; },
+ [&] { return multiple_ranges; });
+
+ testlog.info("generator version");
+ run_test(
+ [&] { return empty_generator; },
+ [&] { return single_generator; },
+ [&] { return multiple_generator; });
}
using reversed_partitions = seastar::bool_class;
@@ -648,95 +641,290 @@ void test_flat_stream(schema_ptr s, std::vector muts, reversed_partiti
}
}
-SEASTAR_TEST_CASE(test_consume_flat) {
- return seastar::async([] {
- auto test_random_streams = [&] (random_mutation_generator&& gen) {
- for (auto i = 0; i < 4; i++) {
- auto muts = gen(4);
- test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
- test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
- test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
- }
- };
+SEASTAR_THREAD_TEST_CASE(test_consume_flat) {
+ auto test_random_streams = [&] (random_mutation_generator&& gen) {
+ for (auto i = 0; i < 4; i++) {
+ auto muts = gen(4);
+ test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
+ test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
+ test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
+ }
+ };
- test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
- test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
- });
+ test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
+ test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
}
-SEASTAR_TEST_CASE(test_make_forwardable) {
- return seastar::async([] {
- simple_schema s;
- tests::reader_concurrency_semaphore_wrapper semaphore;
- auto permit = semaphore.make_permit();
+SEASTAR_THREAD_TEST_CASE(test_make_forwardable) {
+ simple_schema s;
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ auto permit = semaphore.make_permit();
- auto keys = s.make_pkeys(10);
+ auto keys = s.make_pkeys(10);
- auto crs = boost::copy_range < std::vector <
- mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
- return s.make_row(permit, s.make_ckey(n), "value");
- }));
+ auto crs = boost::copy_range < std::vector <
+ mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
+ return s.make_row(permit, s.make_ckey(n), "value");
+ }));
- auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
- auto m = mutation(s.schema(), key);
- for (auto &mf : crs) {
- m.apply(mf);
- }
- return m;
- }));
+ auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
+ auto m = mutation(s.schema(), key);
+ for (auto &mf : crs) {
+ m.apply(mf);
+ }
+ return m;
+ }));
- auto make_reader = [&] (auto& range) {
- return assert_that(
- make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
- };
+ auto make_reader = [&] (auto& range) {
+ return assert_that(
+ make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
+ };
- auto test = [&] (auto& rd, auto& partition) {
- rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
- rd.produces_end_of_stream();
- rd.fast_forward_to(position_range::all_clustered_rows());
- for (auto &row : partition.partition().clustered_rows()) {
- rd.produces_row_with_key(row.key());
- }
- rd.produces_end_of_stream();
+ auto test = [&] (auto& rd, auto& partition) {
+ rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
+ rd.produces_end_of_stream();
+ rd.fast_forward_to(position_range::all_clustered_rows());
+ for (auto &row : partition.partition().clustered_rows()) {
+ rd.produces_row_with_key(row.key());
+ }
+ rd.produces_end_of_stream();
+ rd.next_partition();
+ };
+
+ auto rd = make_reader(query::full_partition_range);
+
+ for (auto& partition : ms) {
+ test(rd, partition);
+ }
+
+ auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
+
+ auto rd2 = make_reader(single_range);
+
+ rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
+ rd2.produces_end_of_stream();
+ rd2.fast_forward_to(position_range::all_clustered_rows());
+ rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
+ rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
+
+ auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
+
+ rd2.fast_forward_to(remaining_range);
+
+ for (auto i = size_t(1); i < ms.size(); ++i) {
+ test(rd2, ms[i]);
+ }
+}
+
+SEASTAR_THREAD_TEST_CASE(test_make_forwardable_next_partition) {
+ simple_schema s;
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ const auto permit = semaphore.make_permit();
+
+ auto make_reader = [&](std::vector mutations, const dht::partition_range& pr) {
+ auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
+ permit,
+ std::move(mutations),
+ pr,
+ streamed_mutation::forwarding::yes);
+ return assert_that(std::move(result)).exact();
+ };
+
+ const auto pk1 = s.make_pkey(1);
+ auto m1 = mutation(s.schema(), pk1);
+ s.add_static_row(m1, "test-static-1");
+
+ const auto pk2 = s.make_pkey(2);
+ auto m2 = mutation(s.schema(), pk2);
+ s.add_static_row(m2, "test-static-2");
+
+ dht::ring_position_comparator cmp{*s.schema()};
+ BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less);
+
+ auto rd = make_reader({m1, m2}, query::full_partition_range);
+ rd.fill_buffer().get();
+ rd.next_partition();
+ rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
+ rd.produces_static_row(
+ {{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-1")}});
+ rd.produces_end_of_stream();
+
+ rd.next_partition();
+ rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
+ rd.produces_static_row(
+ {{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-2")}});
+ rd.produces_end_of_stream();
+
+ rd.next_partition();
+ rd.produces_end_of_stream();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) {
+ simple_schema s;
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ const auto permit = semaphore.make_permit();
+
+ auto make_reader = [&](std::vector mutations,
+ bool single_partition,
+ const dht::partition_range& pr)
+ {
+ auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
+ permit,
+ std::move(mutations),
+ pr,
+ streamed_mutation::forwarding::yes);
+ result = make_nonforwardable(std::move(result), single_partition);
+ return assert_that(std::move(result)).exact();
+ };
+
+ const auto pk1 = s.make_pkey(1);
+ auto m1 = mutation(s.schema(), pk1);
+ m1.apply(s.make_row(permit, s.make_ckey(11), "value1"));
+
+ const auto pk2 = s.make_pkey(2);
+ auto m2 = mutation(s.schema(), pk2);
+ m2.apply(s.make_row(permit, s.make_ckey(22), "value2"));
+
+ const auto pk3 = s.make_pkey(3);
+ auto m3 = mutation(s.schema(), pk3);
+ m3.apply(s.make_row(permit, s.make_ckey(33), "value3"));
+
+ dht::ring_position_comparator cmp{*s.schema()};
+ BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less);
+ BOOST_CHECK_EQUAL(cmp(m2.decorated_key(), m3.decorated_key()), std::strong_ordering::less);
+
+ // no input -> no output
+ {
+ auto rd = make_reader({}, false, query::full_partition_range);
+ rd.produces_end_of_stream();
+ }
+
+ // next_partition()
+ {
+ auto check = [&] (flat_reader_assertions_v2 rd) {
+ rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
rd.next_partition();
+ rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
+ rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key());
+ rd.produces_partition_end();
+ rd.produces_end_of_stream();
};
- auto rd = make_reader(query::full_partition_range);
+ // buffer is not empty
+ check(make_reader({m1, m2}, false, query::full_partition_range));
- for (auto& partition : ms) {
- test(rd, partition);
+ // buffer is empty
+ {
+ auto rd = make_reader({m1, m2}, false, query::full_partition_range);
+ rd.set_max_buffer_size(1);
+ check(std::move(rd));
}
+ }
- auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
+ // fast_forward_to()
+ {
+ const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
+ auto rd = make_reader({m1, m2}, false, m1_range);
+ rd.set_max_buffer_size(1);
- auto rd2 = make_reader(single_range);
+ rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
- rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
- rd2.produces_end_of_stream();
- rd2.fast_forward_to(position_range::all_clustered_rows());
- rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
- rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
+ const auto m2_range = dht::partition_range::make_singular(m2.decorated_key());
+ rd.fast_forward_to(m2_range);
+ rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
+ rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key());
+ rd.produces_partition_end();
- auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
+ rd.next_partition();
+ rd.produces_end_of_stream();
+ }
- rd2.fast_forward_to(remaining_range);
+ // single_partition
+ {
+ auto rd = make_reader({m1, m2}, true, query::full_partition_range);
+ rd.set_max_buffer_size(1);
- for (auto i = size_t(1); i < ms.size(); ++i) {
- test(rd2, ms[i]);
- }
- });
+ rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
+ rd.produces_row_with_key(m1.partition().clustered_rows().begin()->key());
+
+ rd.next_partition();
+ rd.produces_end_of_stream();
+
+ rd.next_partition();
+ rd.produces_end_of_stream();
+ }
+
+ // single_partition with fast_forward_to
+ {
+ const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
+ auto rd = make_reader({m1, m2}, true, m1_range);
+ rd.set_max_buffer_size(1);
+
+ rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
+
+ const auto m2_range = dht::partition_range::make_singular(m2.decorated_key());
+ rd.fast_forward_to(m2_range);
+ rd.produces_end_of_stream();
+
+ rd.next_partition();
+ rd.produces_end_of_stream();
+ }
+
+ // static row
+ {
+ s.add_static_row(m1, "test-static");
+ const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
+ auto rd = make_reader({m1, m2}, false, m1_range);
+ rd.set_max_buffer_size(1);
+ rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
+ rd.produces_static_row(
+ {{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static")}});
+ rd.produces_row(
+ m1.partition().clustered_rows().begin()->key(),
+ {{s.schema()->get_column_definition(to_bytes("v")), to_bytes("value1")}}
+ );
+ rd.produces_partition_end();
+ rd.produces_end_of_stream();
+ }
}
-SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
- return seastar::async([] {
- tests::reader_concurrency_semaphore_wrapper semaphore;
- for_each_mutation([&] (const mutation& m) {
- auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
- auto close_rd = deferred_close(rd);
- rd().get();
- rd().get();
- // We rely on AddressSanitizer telling us if nothing was leaked.
+SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable_from_mutations_as_mutation_source) {
+ auto populate = [] (schema_ptr, const std::vector &muts) {
+ return mutation_source([=] (
+ schema_ptr schema,
+ reader_permit permit,
+ const dht::partition_range& range,
+ const query::partition_slice& slice,
+ const io_priority_class&,
+ tracing::trace_state_ptr,
+ streamed_mutation::forwarding fwd_sm,
+ mutation_reader::forwarding) mutable {
+ auto squashed_muts = squash_mutations(muts);
+ const auto single_partition = squashed_muts.size() == 1;
+ auto reader = make_flat_mutation_reader_from_mutations_v2(schema,
+ std::move(permit),
+ std::move(squashed_muts),
+ range,
+ slice,
+ streamed_mutation::forwarding::yes);
+ reader = make_nonforwardable(std::move(reader), single_partition);
+ if (fwd_sm) {
+ reader = make_forwardable(std::move(reader));
+ }
+ return reader;
});
+ };
+ run_mutation_source_tests(populate);
+}
+
+SEASTAR_THREAD_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
+ tests::reader_concurrency_semaphore_wrapper semaphore;
+ for_each_mutation([&] (const mutation& m) {
+ auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
+ auto close_rd = deferred_close(rd);
+ rd().get();
+ rd().get();
+ // We rely on AddressSanitizer telling us if nothing was leaked.
});
}