Merge 'readers/nonforwarding: don't emit partition_end on next_partition,fast_forward_to' from Gusev Petr
The series fixes the `make_nonforwardable` reader, it shouldn't emit `partition_end` for previous partition after `next_partition()` and `fast_forward_to()` Fixes: #12249 Closes #12978 * github.com:scylladb/scylladb: flat_mutation_reader_test: cleanup, seastar::async -> SEASTAR_THREAD_TEST_CASE make_nonforwardable: test through run_mutation_source_tests make_nonforwardable: next_partition and fast_forward_to when single_partition is true make_forwardable: fix next_partition flat_mutation_reader_v2: drop forward_buffer_to nonforwardable reader: fix indentation nonforwardable reader: refactor, extract reset_partition nonforwardable reader: add more tests nonforwardable reader: no partition_end after fast_forward_to() nonforwardable reader: no partition_end after next_partition() nonforwardable reader: no partition_end for empty reader row_cache: pass partition_start though nonforwardable reader
This commit is contained in:
@@ -59,7 +59,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
return _underlying->fast_forward_to(std::move(pr));
|
return _underlying->fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -295,7 +295,7 @@ future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_ra
|
|||||||
}
|
}
|
||||||
|
|
||||||
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr) {
|
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr) {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
if (_partition_reader) {
|
if (_partition_reader) {
|
||||||
return _partition_reader->fast_forward_to(std::move(pr));
|
return _partition_reader->fast_forward_to(std::move(pr));
|
||||||
|
|||||||
@@ -172,7 +172,7 @@ class build_progress_virtual_reader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
virtual future<> fast_forward_to(position_range range) override {
|
virtual future<> fast_forward_to(position_range range) override {
|
||||||
forward_buffer_to(range.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return _underlying.fast_forward_to(std::move(range));
|
return _underlying.fast_forward_to(std::move(range));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ class built_indexes_virtual_reader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
virtual future<> fast_forward_to(position_range range) override {
|
virtual future<> fast_forward_to(position_range range) override {
|
||||||
forward_buffer_to(range.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
// range contains index names (e.g., xyz) but the underlying table
|
// range contains index names (e.g., xyz) but the underlying table
|
||||||
// contains view names (e.g., xyz_index) so we need to add the
|
// contains view names (e.g., xyz_index) so we need to add the
|
||||||
|
|||||||
@@ -658,7 +658,7 @@ future<> merging_reader<P>::fast_forward_to(const dht::partition_range& pr) {
|
|||||||
|
|
||||||
template <FragmentProducer P>
|
template <FragmentProducer P>
|
||||||
future<> merging_reader<P>::fast_forward_to(position_range pr) {
|
future<> merging_reader<P>::fast_forward_to(position_range pr) {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return _merger.fast_forward_to(std::move(pr));
|
return _merger.fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ public:
|
|||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(position_range pr) override {
|
virtual future<> fast_forward_to(position_range pr) override {
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
return _underlying->fast_forward_to(std::move(pr));
|
return _underlying->fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
virtual future<> next_partition() override {
|
virtual future<> next_partition() override {
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ public:
|
|||||||
return _rd.fast_forward_to(pr);
|
return _rd.fast_forward_to(pr);
|
||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(position_range pr) override {
|
virtual future<> fast_forward_to(position_range pr) override {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return _rd.fast_forward_to(std::move(pr));
|
return _rd.fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -153,7 +153,6 @@ public:
|
|||||||
void reserve_additional(size_t n) {
|
void reserve_additional(size_t n) {
|
||||||
_buffer.reserve(_buffer.size() + n);
|
_buffer.reserve(_buffer.size() + n);
|
||||||
}
|
}
|
||||||
void forward_buffer_to(const position_in_partition& pos);
|
|
||||||
void clear_buffer_to_next_partition();
|
void clear_buffer_to_next_partition();
|
||||||
template<typename Source>
|
template<typename Source>
|
||||||
future<bool> fill_buffer_from(Source&);
|
future<bool> fill_buffer_from(Source&);
|
||||||
@@ -722,7 +721,7 @@ flat_mutation_reader_v2 transform(flat_mutation_reader_v2 r, T t) {
|
|||||||
return _reader.fast_forward_to(pr);
|
return _reader.fast_forward_to(pr);
|
||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(position_range pr) override {
|
virtual future<> fast_forward_to(position_range pr) override {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return _reader.fast_forward_to(std::move(pr));
|
return _reader.fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -158,7 +158,7 @@ future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
future<> foreign_reader::fast_forward_to(position_range pr) {
|
future<> foreign_reader::fast_forward_to(position_range pr) {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return forward_operation([reader = _reader.get(), pr = std::move(pr)] () {
|
return forward_operation([reader = _reader.get(), pr = std::move(pr)] () {
|
||||||
return reader->fast_forward_to(std::move(pr));
|
return reader->fast_forward_to(std::move(pr));
|
||||||
|
|||||||
@@ -385,11 +385,6 @@ flat_mutation_reader_v2::~flat_mutation_reader_v2() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void flat_mutation_reader_v2::impl::forward_buffer_to(const position_in_partition& pos) {
|
|
||||||
clear_buffer();
|
|
||||||
_buffer_size = compute_buffer_size(*_schema, _buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
void flat_mutation_reader_v2::impl::clear_buffer_to_next_partition() {
|
void flat_mutation_reader_v2::impl::clear_buffer_to_next_partition() {
|
||||||
auto next_partition_start = std::find_if(_buffer.begin(), _buffer.end(), [] (const mutation_fragment_v2& mf) {
|
auto next_partition_start = std::find_if(_buffer.begin(), _buffer.end(), [] (const mutation_fragment_v2& mf) {
|
||||||
return mf.is_partition_start();
|
return mf.is_partition_start();
|
||||||
|
|||||||
@@ -167,16 +167,19 @@ flat_mutation_reader_v2 make_forwardable(flat_mutation_reader_v2 m) {
|
|||||||
_current = std::move(pr);
|
_current = std::move(pr);
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
_current_has_content = false;
|
_current_has_content = false;
|
||||||
forward_buffer_to(_current.start());
|
clear_buffer();
|
||||||
return make_ready_future<>();
|
return make_ready_future<>();
|
||||||
}
|
}
|
||||||
virtual future<> next_partition() override {
|
virtual future<> next_partition() override {
|
||||||
|
clear_buffer_to_next_partition();
|
||||||
|
if (!is_buffer_empty()) {
|
||||||
|
co_return;
|
||||||
|
}
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
if (!_next || !_next->is_partition_start()) {
|
if (!_next || !_next->is_partition_start()) {
|
||||||
co_await _underlying.next_partition();
|
co_await _underlying.next_partition();
|
||||||
_next = {};
|
_next = {};
|
||||||
}
|
}
|
||||||
clear_buffer_to_next_partition();
|
|
||||||
_current = {
|
_current = {
|
||||||
position_in_partition::for_partition_start(),
|
position_in_partition::for_partition_start(),
|
||||||
position_in_partition(position_in_partition::after_static_row_tag_t())
|
position_in_partition(position_in_partition::after_static_row_tag_t())
|
||||||
@@ -267,7 +270,7 @@ flat_mutation_reader_v2 make_slicing_filtering_reader(flat_mutation_reader_v2 rd
|
|||||||
}
|
}
|
||||||
|
|
||||||
virtual future<> fast_forward_to(position_range pr) override {
|
virtual future<> fast_forward_to(position_range pr) override {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return _rd.fast_forward_to(std::move(pr));
|
return _rd.fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
@@ -411,25 +414,32 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
|
|||||||
flat_mutation_reader_v2 _underlying;
|
flat_mutation_reader_v2 _underlying;
|
||||||
bool _single_partition;
|
bool _single_partition;
|
||||||
bool _static_row_done = false;
|
bool _static_row_done = false;
|
||||||
|
bool _partition_is_open = false;
|
||||||
bool is_end_end_of_underlying_stream() const {
|
bool is_end_end_of_underlying_stream() const {
|
||||||
return _underlying.is_buffer_empty() && _underlying.is_end_of_stream();
|
return _underlying.is_buffer_empty() && _underlying.is_end_of_stream();
|
||||||
}
|
}
|
||||||
future<> on_end_of_underlying_stream() {
|
future<> on_end_of_underlying_stream() {
|
||||||
if (!_static_row_done) {
|
if (_partition_is_open) {
|
||||||
_static_row_done = true;
|
if (!_static_row_done) {
|
||||||
return _underlying.fast_forward_to(position_range::all_clustered_rows());
|
_static_row_done = true;
|
||||||
|
return _underlying.fast_forward_to(position_range::all_clustered_rows());
|
||||||
|
}
|
||||||
|
push_mutation_fragment(*_schema, _permit, partition_end());
|
||||||
|
reset_partition();
|
||||||
}
|
}
|
||||||
push_mutation_fragment(*_schema, _permit, partition_end());
|
|
||||||
if (_single_partition) {
|
if (_single_partition) {
|
||||||
_end_of_stream = true;
|
_end_of_stream = true;
|
||||||
return make_ready_future<>();
|
return make_ready_future<>();
|
||||||
}
|
}
|
||||||
return _underlying.next_partition().then([this] {
|
return _underlying.next_partition().then([this] {
|
||||||
_static_row_done = false;
|
return _underlying.fill_buffer().then([this] {
|
||||||
return _underlying.fill_buffer().then([this] {
|
_end_of_stream = is_end_end_of_underlying_stream();
|
||||||
_end_of_stream = is_end_end_of_underlying_stream();
|
});
|
||||||
});
|
});
|
||||||
});
|
}
|
||||||
|
void reset_partition() {
|
||||||
|
_partition_is_open = false;
|
||||||
|
_static_row_done = false;
|
||||||
}
|
}
|
||||||
public:
|
public:
|
||||||
reader(flat_mutation_reader_v2 r, bool single_partition)
|
reader(flat_mutation_reader_v2 r, bool single_partition)
|
||||||
@@ -440,6 +450,9 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
|
|||||||
virtual future<> fill_buffer() override {
|
virtual future<> fill_buffer() override {
|
||||||
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
|
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
|
||||||
return fill_buffer_from(_underlying).then([this] (bool underlying_finished) {
|
return fill_buffer_from(_underlying).then([this] (bool underlying_finished) {
|
||||||
|
if (!_partition_is_open && !is_buffer_empty()) {
|
||||||
|
_partition_is_open = true;
|
||||||
|
}
|
||||||
if (underlying_finished) {
|
if (underlying_finished) {
|
||||||
return on_end_of_underlying_stream();
|
return on_end_of_underlying_stream();
|
||||||
}
|
}
|
||||||
@@ -452,17 +465,27 @@ flat_mutation_reader_v2 make_nonforwardable(flat_mutation_reader_v2 r, bool sing
|
|||||||
}
|
}
|
||||||
virtual future<> next_partition() override {
|
virtual future<> next_partition() override {
|
||||||
clear_buffer_to_next_partition();
|
clear_buffer_to_next_partition();
|
||||||
auto maybe_next_partition = make_ready_future<>();;
|
auto maybe_next_partition = make_ready_future<>();
|
||||||
if (is_buffer_empty()) {
|
if (is_buffer_empty()) {
|
||||||
|
if (_end_of_stream || (_partition_is_open && _single_partition)) {
|
||||||
|
_end_of_stream = true;
|
||||||
|
return maybe_next_partition;
|
||||||
|
}
|
||||||
|
reset_partition();
|
||||||
maybe_next_partition = _underlying.next_partition();
|
maybe_next_partition = _underlying.next_partition();
|
||||||
}
|
}
|
||||||
return maybe_next_partition.then([this] {
|
return maybe_next_partition.then([this] {
|
||||||
_end_of_stream = is_end_end_of_underlying_stream();
|
_end_of_stream = is_end_end_of_underlying_stream();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
|
||||||
_end_of_stream = false;
|
|
||||||
clear_buffer();
|
clear_buffer();
|
||||||
|
if (_single_partition) {
|
||||||
|
_end_of_stream = true;
|
||||||
|
return make_ready_future<>();
|
||||||
|
}
|
||||||
|
reset_partition();
|
||||||
|
_end_of_stream = false;
|
||||||
return _underlying.fast_forward_to(pr);
|
return _underlying.fast_forward_to(pr);
|
||||||
}
|
}
|
||||||
virtual future<> close() noexcept override {
|
virtual future<> close() noexcept override {
|
||||||
@@ -1532,7 +1555,7 @@ public:
|
|||||||
return _reader.fast_forward_to(pr);
|
return _reader.fast_forward_to(pr);
|
||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(position_range pr) override {
|
virtual future<> fast_forward_to(position_range pr) override {
|
||||||
forward_buffer_to(pr.start());
|
clear_buffer();
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return _reader.fast_forward_to(std::move(pr));
|
return _reader.fast_forward_to(std::move(pr));
|
||||||
}
|
}
|
||||||
|
|||||||
24
row_cache.cc
24
row_cache.cc
@@ -354,8 +354,9 @@ future<> read_context::create_underlying() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader) {
|
static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader, mutation_fragment_v2 partition_start) {
|
||||||
auto res = make_delegating_reader(reader.underlying().underlying());
|
auto res = make_delegating_reader(reader.underlying().underlying());
|
||||||
|
res.unpop_mutation_fragment(std::move(partition_start));
|
||||||
res.upgrade_schema(reader.schema());
|
res.upgrade_schema(reader.schema());
|
||||||
return make_nonforwardable(std::move(res), true);
|
return make_nonforwardable(std::move(res), true);
|
||||||
}
|
}
|
||||||
@@ -388,8 +389,7 @@ private:
|
|||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
_cache._tracker.on_mispopulate();
|
_cache._tracker.on_mispopulate();
|
||||||
_reader = read_directly_from_underlying(*_read_context);
|
_reader = read_directly_from_underlying(*_read_context, std::move(*mfopt));
|
||||||
this->push_mutation_fragment(std::move(*mfopt));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -514,15 +514,13 @@ public:
|
|||||||
, _read_context(ctx)
|
, _read_context(ctx)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
using read_result = std::tuple<flat_mutation_reader_v2_opt, mutation_fragment_v2_opt>;
|
future<flat_mutation_reader_v2_opt> operator()() {
|
||||||
|
|
||||||
future<read_result> operator()() {
|
|
||||||
return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable {
|
return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable {
|
||||||
{
|
{
|
||||||
if (!mfopt) {
|
if (!mfopt) {
|
||||||
return _cache._read_section(_cache._tracker.region(), [&] {
|
return _cache._read_section(_cache._tracker.region(), [&] {
|
||||||
this->handle_end_of_stream();
|
this->handle_end_of_stream();
|
||||||
return make_ready_future<read_result>(read_result(std::nullopt, std::nullopt));
|
return make_ready_future<flat_mutation_reader_v2_opt>(std::nullopt);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
_cache.on_partition_miss();
|
_cache.on_partition_miss();
|
||||||
@@ -533,14 +531,12 @@ public:
|
|||||||
cache_entry& e = _cache.find_or_create_incomplete(ps, _reader.creation_phase(),
|
cache_entry& e = _cache.find_or_create_incomplete(ps, _reader.creation_phase(),
|
||||||
this->can_set_continuity() ? &*_last_key : nullptr);
|
this->can_set_continuity() ? &*_last_key : nullptr);
|
||||||
_last_key = row_cache::previous_entry_pointer(key);
|
_last_key = row_cache::previous_entry_pointer(key);
|
||||||
return make_ready_future<read_result>(
|
return make_ready_future<flat_mutation_reader_v2_opt>(e.read(_cache, _read_context, _reader.creation_phase()));
|
||||||
read_result(e.read(_cache, _read_context, _reader.creation_phase()), std::nullopt));
|
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
_cache._tracker.on_mispopulate();
|
_cache._tracker.on_mispopulate();
|
||||||
_last_key = row_cache::previous_entry_pointer(key);
|
_last_key = row_cache::previous_entry_pointer(key);
|
||||||
return make_ready_future<read_result>(
|
return make_ready_future<flat_mutation_reader_v2_opt>(read_directly_from_underlying(_read_context, std::move(*mfopt)));
|
||||||
read_result(read_directly_from_underlying(_read_context), std::move(mfopt)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -644,12 +640,8 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
future<flat_mutation_reader_v2_opt> read_from_secondary() {
|
future<flat_mutation_reader_v2_opt> read_from_secondary() {
|
||||||
return _secondary_reader().then([this] (range_populating_reader::read_result&& res) {
|
return _secondary_reader().then([this] (flat_mutation_reader_v2_opt&& fropt) {
|
||||||
auto&& [fropt, ps] = res;
|
|
||||||
if (fropt) {
|
if (fropt) {
|
||||||
if (ps) {
|
|
||||||
push_mutation_fragment(std::move(*ps));
|
|
||||||
}
|
|
||||||
return make_ready_future<flat_mutation_reader_v2_opt>(std::move(fropt));
|
return make_ready_future<flat_mutation_reader_v2_opt>(std::move(fropt));
|
||||||
} else {
|
} else {
|
||||||
_secondary_in_progress = false;
|
_secondary_in_progress = false;
|
||||||
|
|||||||
@@ -1465,7 +1465,7 @@ public:
|
|||||||
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
|
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
|
||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(position_range cr) override {
|
virtual future<> fast_forward_to(position_range cr) override {
|
||||||
forward_buffer_to(cr.start());
|
clear_buffer();
|
||||||
if (!_partition_finished) {
|
if (!_partition_finished) {
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return advance_context(_consumer.fast_forward_to(std::move(cr)));
|
return advance_context(_consumer.fast_forward_to(std::move(cr)));
|
||||||
|
|||||||
@@ -1653,7 +1653,7 @@ public:
|
|||||||
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
|
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
|
||||||
}
|
}
|
||||||
virtual future<> fast_forward_to(position_range cr) override {
|
virtual future<> fast_forward_to(position_range cr) override {
|
||||||
forward_buffer_to(cr.start());
|
clear_buffer();
|
||||||
if (!_partition_finished) {
|
if (!_partition_finished) {
|
||||||
_end_of_stream = false;
|
_end_of_stream = false;
|
||||||
return advance_context(_consumer.fast_forward_to(std::move(cr)));
|
return advance_context(_consumer.fast_forward_to(std::move(cr)));
|
||||||
|
|||||||
@@ -38,6 +38,7 @@
|
|||||||
#include "readers/from_fragments_v2.hh"
|
#include "readers/from_fragments_v2.hh"
|
||||||
#include "readers/forwardable_v2.hh"
|
#include "readers/forwardable_v2.hh"
|
||||||
#include "readers/compacting.hh"
|
#include "readers/compacting.hh"
|
||||||
|
#include "readers/nonforwardable.hh"
|
||||||
|
|
||||||
struct mock_consumer {
|
struct mock_consumer {
|
||||||
struct result {
|
struct result {
|
||||||
@@ -110,193 +111,187 @@ static size_t count_fragments(mutation m) {
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
|
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
|
||||||
return seastar::async([] {
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
for_each_mutation([&] (const mutation& m) {
|
||||||
for_each_mutation([&] (const mutation& m) {
|
size_t fragments_in_m = count_fragments(m);
|
||||||
size_t fragments_in_m = count_fragments(m);
|
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
|
||||||
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
|
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
|
||||||
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m);
|
auto close_reader = deferred_close(r);
|
||||||
auto close_reader = deferred_close(r);
|
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
|
||||||
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
|
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||||
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
|
||||||
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), m));
|
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
|
||||||
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
|
if (result._fragments.empty()) {
|
||||||
if (result._fragments.empty()) {
|
continue;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
for (auto& mf : result._fragments) {
|
|
||||||
r2.produces(*m.schema(), mf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
for (auto& mf : result._fragments) {
|
||||||
|
r2.produces(*m.schema(), mf);
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
|
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
|
||||||
return seastar::async([] {
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
auto test = [&semaphore] (mutation m1, mutation m2) {
|
||||||
auto test = [&semaphore] (mutation m1, mutation m2) {
|
size_t fragments_in_m1 = count_fragments(m1);
|
||||||
size_t fragments_in_m1 = count_fragments(m1);
|
size_t fragments_in_m2 = count_fragments(m2);
|
||||||
size_t fragments_in_m2 = count_fragments(m2);
|
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
|
||||||
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
|
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
auto close_r = deferred_close(r);
|
||||||
auto close_r = deferred_close(r);
|
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
||||||
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
|
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
||||||
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
|
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
||||||
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
|
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
auto close_r2 = deferred_close(r2);
|
||||||
auto close_r2 = deferred_close(r2);
|
auto start = r2().get0();
|
||||||
auto start = r2().get0();
|
BOOST_REQUIRE(start);
|
||||||
BOOST_REQUIRE(start);
|
BOOST_REQUIRE(start->is_partition_start());
|
||||||
BOOST_REQUIRE(start->is_partition_start());
|
for (auto& mf : result._fragments) {
|
||||||
for (auto& mf : result._fragments) {
|
auto mfopt = r2().get0();
|
||||||
auto mfopt = r2().get0();
|
BOOST_REQUIRE(mfopt);
|
||||||
BOOST_REQUIRE(mfopt);
|
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
|
}
|
||||||
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
|
||||||
auto close_r = deferred_close(r);
|
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||||
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
auto close_r = deferred_close(r);
|
||||||
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
|
||||||
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
|
BOOST_REQUIRE(result._consume_end_of_stream_called);
|
||||||
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
|
BOOST_REQUIRE_EQUAL(2, result._consume_new_partition_call_count);
|
||||||
size_t tombstones_count = 0;
|
BOOST_REQUIRE_EQUAL(2, result._consume_end_of_partition_call_count);
|
||||||
if (m1.partition().partition_tombstone()) {
|
size_t tombstones_count = 0;
|
||||||
++tombstones_count;
|
if (m1.partition().partition_tombstone()) {
|
||||||
}
|
++tombstones_count;
|
||||||
if (m2.partition().partition_tombstone()) {
|
|
||||||
++tombstones_count;
|
|
||||||
}
|
|
||||||
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
|
|
||||||
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
|
||||||
auto close_r2 = deferred_close(r2);
|
|
||||||
auto start = r2().get0();
|
|
||||||
BOOST_REQUIRE(start);
|
|
||||||
BOOST_REQUIRE(start->is_partition_start());
|
|
||||||
for (auto& mf : result._fragments) {
|
|
||||||
auto mfopt = r2().get0();
|
|
||||||
BOOST_REQUIRE(mfopt);
|
|
||||||
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
|
|
||||||
mfopt = r2().get0();
|
|
||||||
}
|
|
||||||
BOOST_REQUIRE(mfopt);
|
|
||||||
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
if (m2.partition().partition_tombstone()) {
|
||||||
for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
|
++tombstones_count;
|
||||||
if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
|
|
||||||
test(m, m2);
|
|
||||||
} else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
|
|
||||||
test(m2, m);
|
|
||||||
}
|
}
|
||||||
});
|
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
|
||||||
|
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
|
||||||
|
auto close_r2 = deferred_close(r2);
|
||||||
|
auto start = r2().get0();
|
||||||
|
BOOST_REQUIRE(start);
|
||||||
|
BOOST_REQUIRE(start->is_partition_start());
|
||||||
|
for (auto& mf : result._fragments) {
|
||||||
|
auto mfopt = r2().get0();
|
||||||
|
BOOST_REQUIRE(mfopt);
|
||||||
|
if (mfopt->is_partition_start() || mfopt->is_end_of_partition()) {
|
||||||
|
mfopt = r2().get0();
|
||||||
|
}
|
||||||
|
BOOST_REQUIRE(mfopt);
|
||||||
|
BOOST_REQUIRE(mf.equal(*m1.schema(), *mfopt));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for_each_mutation_pair([&] (auto&& m, auto&& m2, are_equal) {
|
||||||
|
if (m.decorated_key().less_compare(*m.schema(), m2.decorated_key())) {
|
||||||
|
test(m, m2);
|
||||||
|
} else if (m2.decorated_key().less_compare(*m.schema(), m.decorated_key())) {
|
||||||
|
test(m2, m);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
|
SEASTAR_THREAD_TEST_CASE(test_fragmenting_and_freezing) {
|
||||||
return seastar::async([] {
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
for_each_mutation([&] (const mutation& m) {
|
||||||
for_each_mutation([&] (const mutation& m) {
|
std::vector<frozen_mutation> fms;
|
||||||
std::vector<frozen_mutation> fms;
|
|
||||||
|
|
||||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||||
|
BOOST_REQUIRE(!frag);
|
||||||
|
fms.emplace_back(std::move(fm));
|
||||||
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
|
}, std::numeric_limits<size_t>::max()).get0();
|
||||||
|
|
||||||
|
BOOST_REQUIRE_EQUAL(fms.size(), 1);
|
||||||
|
|
||||||
|
auto m1 = fms.back().unfreeze(m.schema());
|
||||||
|
BOOST_REQUIRE_EQUAL(m, m1);
|
||||||
|
|
||||||
|
fms.clear();
|
||||||
|
|
||||||
|
std::optional<bool> fragmented;
|
||||||
|
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
||||||
|
BOOST_REQUIRE(!fragmented || *fragmented == frag);
|
||||||
|
*fragmented = frag;
|
||||||
|
fms.emplace_back(std::move(fm));
|
||||||
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
|
}, 1).get0();
|
||||||
|
|
||||||
|
auto&& rows = m.partition().non_dummy_rows();
|
||||||
|
auto expected_fragments = std::distance(rows.begin(), rows.end())
|
||||||
|
+ m.partition().row_tombstones().size()
|
||||||
|
+ !m.partition().static_row().empty();
|
||||||
|
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
|
||||||
|
BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
|
||||||
|
|
||||||
|
auto m2 = fms.back().unfreeze(m.schema());
|
||||||
|
fms.pop_back();
|
||||||
|
mutation_application_stats app_stats;
|
||||||
|
while (!fms.empty()) {
|
||||||
|
m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
|
||||||
|
fms.pop_back();
|
||||||
|
}
|
||||||
|
BOOST_REQUIRE_EQUAL(m, m2);
|
||||||
|
});
|
||||||
|
|
||||||
|
auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
|
||||||
|
for (auto i = 0; i < 4; i++) {
|
||||||
|
auto muts = gen(4);
|
||||||
|
auto s = muts[0].schema();
|
||||||
|
|
||||||
|
std::vector<frozen_mutation> frozen;
|
||||||
|
|
||||||
|
// Freeze all
|
||||||
|
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||||
BOOST_REQUIRE(!frag);
|
BOOST_REQUIRE(!frag);
|
||||||
fms.emplace_back(std::move(fm));
|
frozen.emplace_back(fm);
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
}, std::numeric_limits<size_t>::max()).get0();
|
}, std::numeric_limits<size_t>::max()).get0();
|
||||||
|
BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
|
||||||
|
for (auto j = 0u; j < muts.size(); j++) {
|
||||||
|
BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
|
||||||
|
}
|
||||||
|
|
||||||
BOOST_REQUIRE_EQUAL(fms.size(), 1);
|
// Freeze first
|
||||||
|
frozen.clear();
|
||||||
|
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||||
|
BOOST_REQUIRE(!frag);
|
||||||
|
frozen.emplace_back(fm);
|
||||||
|
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||||
|
}, std::numeric_limits<size_t>::max()).get0();
|
||||||
|
BOOST_REQUIRE_EQUAL(frozen.size(), 1);
|
||||||
|
BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
|
||||||
|
|
||||||
auto m1 = fms.back().unfreeze(m.schema());
|
// Fragment and freeze all
|
||||||
BOOST_REQUIRE_EQUAL(m, m1);
|
frozen.clear();
|
||||||
|
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
||||||
fms.clear();
|
frozen.emplace_back(fm);
|
||||||
|
|
||||||
std::optional<bool> fragmented;
|
|
||||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
|
|
||||||
BOOST_REQUIRE(!fragmented || *fragmented == frag);
|
|
||||||
*fragmented = frag;
|
|
||||||
fms.emplace_back(std::move(fm));
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||||
}, 1).get0();
|
}, 1).get0();
|
||||||
|
std::vector<mutation> unfrozen;
|
||||||
auto&& rows = m.partition().non_dummy_rows();
|
while (!frozen.empty()) {
|
||||||
auto expected_fragments = std::distance(rows.begin(), rows.end())
|
auto m = frozen.front().unfreeze(s);
|
||||||
+ m.partition().row_tombstones().size()
|
frozen.erase(frozen.begin());
|
||||||
+ !m.partition().static_row().empty();
|
if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
|
||||||
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));
|
unfrozen.emplace_back(std::move(m));
|
||||||
BOOST_REQUIRE(expected_fragments < 2 || *fragmented);
|
} else {
|
||||||
|
unfrozen.back().apply(std::move(m));
|
||||||
auto m2 = fms.back().unfreeze(m.schema());
|
|
||||||
fms.pop_back();
|
|
||||||
mutation_application_stats app_stats;
|
|
||||||
while (!fms.empty()) {
|
|
||||||
m2.partition().apply(*m.schema(), fms.back().partition(), *m.schema(), app_stats);
|
|
||||||
fms.pop_back();
|
|
||||||
}
|
|
||||||
BOOST_REQUIRE_EQUAL(m, m2);
|
|
||||||
});
|
|
||||||
|
|
||||||
auto test_random_streams = [&semaphore] (random_mutation_generator&& gen) {
|
|
||||||
for (auto i = 0; i < 4; i++) {
|
|
||||||
auto muts = gen(4);
|
|
||||||
auto s = muts[0].schema();
|
|
||||||
|
|
||||||
std::vector<frozen_mutation> frozen;
|
|
||||||
|
|
||||||
// Freeze all
|
|
||||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
|
||||||
BOOST_REQUIRE(!frag);
|
|
||||||
frozen.emplace_back(fm);
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
||||||
}, std::numeric_limits<size_t>::max()).get0();
|
|
||||||
BOOST_REQUIRE_EQUAL(muts.size(), frozen.size());
|
|
||||||
for (auto j = 0u; j < muts.size(); j++) {
|
|
||||||
BOOST_REQUIRE_EQUAL(muts[j], frozen[j].unfreeze(s));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Freeze first
|
|
||||||
frozen.clear();
|
|
||||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
|
||||||
BOOST_REQUIRE(!frag);
|
|
||||||
frozen.emplace_back(fm);
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
|
||||||
}, std::numeric_limits<size_t>::max()).get0();
|
|
||||||
BOOST_REQUIRE_EQUAL(frozen.size(), 1);
|
|
||||||
BOOST_REQUIRE_EQUAL(muts[0], frozen[0].unfreeze(s));
|
|
||||||
|
|
||||||
// Fragment and freeze all
|
|
||||||
frozen.clear();
|
|
||||||
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
|
|
||||||
frozen.emplace_back(fm);
|
|
||||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
|
||||||
}, 1).get0();
|
|
||||||
std::vector<mutation> unfrozen;
|
|
||||||
while (!frozen.empty()) {
|
|
||||||
auto m = frozen.front().unfreeze(s);
|
|
||||||
frozen.erase(frozen.begin());
|
|
||||||
if (unfrozen.empty() || !unfrozen.back().decorated_key().equal(*s, m.decorated_key())) {
|
|
||||||
unfrozen.emplace_back(std::move(m));
|
|
||||||
} else {
|
|
||||||
unfrozen.back().apply(std::move(m));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
BOOST_REQUIRE_EQUAL(muts, unfrozen);
|
|
||||||
}
|
}
|
||||||
};
|
BOOST_REQUIRE_EQUAL(muts, unfrozen);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
||||||
@@ -371,111 +366,109 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
|
|||||||
.is_equal_to(mut_orig);
|
.is_equal_to(mut_orig);
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_multi_range_reader) {
|
SEASTAR_THREAD_TEST_CASE(test_multi_range_reader) {
|
||||||
return seastar::async([] {
|
simple_schema s;
|
||||||
simple_schema s;
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
auto permit = semaphore.make_permit();
|
||||||
auto permit = semaphore.make_permit();
|
|
||||||
|
|
||||||
auto keys = s.make_pkeys(10);
|
auto keys = s.make_pkeys(10);
|
||||||
auto ring = s.to_ring_positions(keys);
|
auto ring = s.to_ring_positions(keys);
|
||||||
|
|
||||||
auto crs = boost::copy_range<std::vector<mutation_fragment>>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
|
auto crs = boost::copy_range<std::vector<mutation_fragment>>(boost::irange(0, 3) | boost::adaptors::transformed([&] (auto n) {
|
||||||
return s.make_row(permit, s.make_ckey(n), "value");
|
return s.make_row(permit, s.make_ckey(n), "value");
|
||||||
}));
|
}));
|
||||||
|
|
||||||
auto ms = boost::copy_range<std::vector<mutation>>(keys | boost::adaptors::transformed([&] (auto& key) {
|
auto ms = boost::copy_range<std::vector<mutation>>(keys | boost::adaptors::transformed([&] (auto& key) {
|
||||||
auto m = mutation(s.schema(), key);
|
auto m = mutation(s.schema(), key);
|
||||||
for (auto& mf : crs) {
|
for (auto& mf : crs) {
|
||||||
m.apply(mf);
|
m.apply(mf);
|
||||||
}
|
}
|
||||||
return m;
|
return m;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
|
||||||
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
|
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
|
||||||
});
|
|
||||||
|
|
||||||
const auto empty_ranges = dht::partition_range_vector{};
|
|
||||||
const auto single_ranges = dht::partition_range_vector{
|
|
||||||
dht::partition_range::make(ring[1], ring[2]),
|
|
||||||
};
|
|
||||||
const auto multiple_ranges = dht::partition_range_vector {
|
|
||||||
dht::partition_range::make(ring[1], ring[2]),
|
|
||||||
dht::partition_range::make_singular(ring[4]),
|
|
||||||
dht::partition_range::make(ring[6], ring[8]),
|
|
||||||
};
|
|
||||||
const auto empty_generator = [] { return std::optional<dht::partition_range>{}; };
|
|
||||||
const auto single_generator = [r = std::optional<dht::partition_range>(single_ranges.front())] () mutable {
|
|
||||||
return std::exchange(r, {});
|
|
||||||
};
|
|
||||||
const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional<dht::partition_range> {
|
|
||||||
if (it == end) {
|
|
||||||
return std::nullopt;
|
|
||||||
}
|
|
||||||
return *(it++);
|
|
||||||
};
|
|
||||||
auto fft_range = dht::partition_range::make_starting_with(ring[9]);
|
|
||||||
|
|
||||||
// Generator ranges are single pass, so we need a new range each time they are used.
|
|
||||||
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
|
|
||||||
testlog.info("empty ranges");
|
|
||||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
|
|
||||||
.produces_end_of_stream()
|
|
||||||
.fast_forward_to(fft_range)
|
|
||||||
.produces(ms[9])
|
|
||||||
.produces_end_of_stream();
|
|
||||||
|
|
||||||
testlog.info("single range");
|
|
||||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
|
|
||||||
.produces(ms[1])
|
|
||||||
.produces(ms[2])
|
|
||||||
.produces_end_of_stream()
|
|
||||||
.fast_forward_to(fft_range)
|
|
||||||
.produces(ms[9])
|
|
||||||
.produces_end_of_stream();
|
|
||||||
|
|
||||||
testlog.info("read full partitions and fast forward");
|
|
||||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
|
||||||
.produces(ms[1])
|
|
||||||
.produces(ms[2])
|
|
||||||
.produces(ms[4])
|
|
||||||
.produces(ms[6])
|
|
||||||
.fast_forward_to(fft_range)
|
|
||||||
.produces(ms[9])
|
|
||||||
.produces_end_of_stream();
|
|
||||||
|
|
||||||
testlog.info("read, skip partitions and fast forward");
|
|
||||||
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
|
||||||
.produces_partition_start(keys[1])
|
|
||||||
.next_partition()
|
|
||||||
.produces_partition_start(keys[2])
|
|
||||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
|
||||||
.next_partition()
|
|
||||||
.produces(ms[4])
|
|
||||||
.next_partition()
|
|
||||||
.produces_partition_start(keys[6])
|
|
||||||
.produces_row_with_key(crs[0].as_clustering_row().key())
|
|
||||||
.produces_row_with_key(crs[1].as_clustering_row().key())
|
|
||||||
.fast_forward_to(fft_range)
|
|
||||||
.next_partition()
|
|
||||||
.produces_partition_start(keys[9])
|
|
||||||
.next_partition()
|
|
||||||
.produces_end_of_stream();
|
|
||||||
};
|
|
||||||
|
|
||||||
testlog.info("vector version");
|
|
||||||
run_test(
|
|
||||||
[&] { return empty_ranges; },
|
|
||||||
[&] { return single_ranges; },
|
|
||||||
[&] { return multiple_ranges; });
|
|
||||||
|
|
||||||
testlog.info("generator version");
|
|
||||||
run_test(
|
|
||||||
[&] { return empty_generator; },
|
|
||||||
[&] { return single_generator; },
|
|
||||||
[&] { return multiple_generator; });
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const auto empty_ranges = dht::partition_range_vector{};
|
||||||
|
const auto single_ranges = dht::partition_range_vector{
|
||||||
|
dht::partition_range::make(ring[1], ring[2]),
|
||||||
|
};
|
||||||
|
const auto multiple_ranges = dht::partition_range_vector {
|
||||||
|
dht::partition_range::make(ring[1], ring[2]),
|
||||||
|
dht::partition_range::make_singular(ring[4]),
|
||||||
|
dht::partition_range::make(ring[6], ring[8]),
|
||||||
|
};
|
||||||
|
const auto empty_generator = [] { return std::optional<dht::partition_range>{}; };
|
||||||
|
const auto single_generator = [r = std::optional<dht::partition_range>(single_ranges.front())] () mutable {
|
||||||
|
return std::exchange(r, {});
|
||||||
|
};
|
||||||
|
const auto multiple_generator = [it = multiple_ranges.cbegin(), end = multiple_ranges.cend()] () mutable -> std::optional<dht::partition_range> {
|
||||||
|
if (it == end) {
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
return *(it++);
|
||||||
|
};
|
||||||
|
auto fft_range = dht::partition_range::make_starting_with(ring[9]);
|
||||||
|
|
||||||
|
// Generator ranges are single pass, so we need a new range each time they are used.
|
||||||
|
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
|
||||||
|
testlog.info("empty ranges");
|
||||||
|
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
|
||||||
|
.produces_end_of_stream()
|
||||||
|
.fast_forward_to(fft_range)
|
||||||
|
.produces(ms[9])
|
||||||
|
.produces_end_of_stream();
|
||||||
|
|
||||||
|
testlog.info("single range");
|
||||||
|
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
|
||||||
|
.produces(ms[1])
|
||||||
|
.produces(ms[2])
|
||||||
|
.produces_end_of_stream()
|
||||||
|
.fast_forward_to(fft_range)
|
||||||
|
.produces(ms[9])
|
||||||
|
.produces_end_of_stream();
|
||||||
|
|
||||||
|
testlog.info("read full partitions and fast forward");
|
||||||
|
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||||
|
.produces(ms[1])
|
||||||
|
.produces(ms[2])
|
||||||
|
.produces(ms[4])
|
||||||
|
.produces(ms[6])
|
||||||
|
.fast_forward_to(fft_range)
|
||||||
|
.produces(ms[9])
|
||||||
|
.produces_end_of_stream();
|
||||||
|
|
||||||
|
testlog.info("read, skip partitions and fast forward");
|
||||||
|
assert_that(make_flat_multi_range_reader(s.schema(), semaphore.make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
|
||||||
|
.produces_partition_start(keys[1])
|
||||||
|
.next_partition()
|
||||||
|
.produces_partition_start(keys[2])
|
||||||
|
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||||
|
.next_partition()
|
||||||
|
.produces(ms[4])
|
||||||
|
.next_partition()
|
||||||
|
.produces_partition_start(keys[6])
|
||||||
|
.produces_row_with_key(crs[0].as_clustering_row().key())
|
||||||
|
.produces_row_with_key(crs[1].as_clustering_row().key())
|
||||||
|
.fast_forward_to(fft_range)
|
||||||
|
.next_partition()
|
||||||
|
.produces_partition_start(keys[9])
|
||||||
|
.next_partition()
|
||||||
|
.produces_end_of_stream();
|
||||||
|
};
|
||||||
|
|
||||||
|
testlog.info("vector version");
|
||||||
|
run_test(
|
||||||
|
[&] { return empty_ranges; },
|
||||||
|
[&] { return single_ranges; },
|
||||||
|
[&] { return multiple_ranges; });
|
||||||
|
|
||||||
|
testlog.info("generator version");
|
||||||
|
run_test(
|
||||||
|
[&] { return empty_generator; },
|
||||||
|
[&] { return single_generator; },
|
||||||
|
[&] { return multiple_generator; });
|
||||||
}
|
}
|
||||||
|
|
||||||
using reversed_partitions = seastar::bool_class<class reversed_partitions_tag>;
|
using reversed_partitions = seastar::bool_class<class reversed_partitions_tag>;
|
||||||
@@ -648,95 +641,290 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_consume_flat) {
|
SEASTAR_THREAD_TEST_CASE(test_consume_flat) {
|
||||||
return seastar::async([] {
|
auto test_random_streams = [&] (random_mutation_generator&& gen) {
|
||||||
auto test_random_streams = [&] (random_mutation_generator&& gen) {
|
for (auto i = 0; i < 4; i++) {
|
||||||
for (auto i = 0; i < 4; i++) {
|
auto muts = gen(4);
|
||||||
auto muts = gen(4);
|
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
|
||||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::no);
|
test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
|
||||||
test_flat_stream(gen.schema(), muts, reversed_partitions::yes, in_thread::no);
|
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
|
||||||
test_flat_stream(gen.schema(), muts, reversed_partitions::no, in_thread::yes);
|
}
|
||||||
}
|
};
|
||||||
};
|
|
||||||
|
|
||||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::no));
|
||||||
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
test_random_streams(random_mutation_generator(random_mutation_generator::generate_counters::yes));
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_make_forwardable) {
|
SEASTAR_THREAD_TEST_CASE(test_make_forwardable) {
|
||||||
return seastar::async([] {
|
simple_schema s;
|
||||||
simple_schema s;
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
auto permit = semaphore.make_permit();
|
||||||
auto permit = semaphore.make_permit();
|
|
||||||
|
|
||||||
auto keys = s.make_pkeys(10);
|
auto keys = s.make_pkeys(10);
|
||||||
|
|
||||||
auto crs = boost::copy_range < std::vector <
|
auto crs = boost::copy_range < std::vector <
|
||||||
mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
|
mutation_fragment >> (boost::irange(0, 3) | boost::adaptors::transformed([&](auto n) {
|
||||||
return s.make_row(permit, s.make_ckey(n), "value");
|
return s.make_row(permit, s.make_ckey(n), "value");
|
||||||
}));
|
}));
|
||||||
|
|
||||||
auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
|
auto ms = boost::copy_range < std::vector < mutation >> (keys | boost::adaptors::transformed([&](auto &key) {
|
||||||
auto m = mutation(s.schema(), key);
|
auto m = mutation(s.schema(), key);
|
||||||
for (auto &mf : crs) {
|
for (auto &mf : crs) {
|
||||||
m.apply(mf);
|
m.apply(mf);
|
||||||
}
|
}
|
||||||
return m;
|
return m;
|
||||||
}));
|
}));
|
||||||
|
|
||||||
auto make_reader = [&] (auto& range) {
|
auto make_reader = [&] (auto& range) {
|
||||||
return assert_that(
|
return assert_that(
|
||||||
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
|
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
|
||||||
};
|
};
|
||||||
|
|
||||||
auto test = [&] (auto& rd, auto& partition) {
|
auto test = [&] (auto& rd, auto& partition) {
|
||||||
rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
|
rd.produces_partition_start(partition.decorated_key(), partition.partition().partition_tombstone());
|
||||||
rd.produces_end_of_stream();
|
rd.produces_end_of_stream();
|
||||||
rd.fast_forward_to(position_range::all_clustered_rows());
|
rd.fast_forward_to(position_range::all_clustered_rows());
|
||||||
for (auto &row : partition.partition().clustered_rows()) {
|
for (auto &row : partition.partition().clustered_rows()) {
|
||||||
rd.produces_row_with_key(row.key());
|
rd.produces_row_with_key(row.key());
|
||||||
}
|
}
|
||||||
rd.produces_end_of_stream();
|
rd.produces_end_of_stream();
|
||||||
|
rd.next_partition();
|
||||||
|
};
|
||||||
|
|
||||||
|
auto rd = make_reader(query::full_partition_range);
|
||||||
|
|
||||||
|
for (auto& partition : ms) {
|
||||||
|
test(rd, partition);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
|
||||||
|
|
||||||
|
auto rd2 = make_reader(single_range);
|
||||||
|
|
||||||
|
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
|
||||||
|
rd2.produces_end_of_stream();
|
||||||
|
rd2.fast_forward_to(position_range::all_clustered_rows());
|
||||||
|
rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
|
||||||
|
rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
|
||||||
|
|
||||||
|
auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
|
||||||
|
|
||||||
|
rd2.fast_forward_to(remaining_range);
|
||||||
|
|
||||||
|
for (auto i = size_t(1); i < ms.size(); ++i) {
|
||||||
|
test(rd2, ms[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_make_forwardable_next_partition) {
|
||||||
|
simple_schema s;
|
||||||
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
|
const auto permit = semaphore.make_permit();
|
||||||
|
|
||||||
|
auto make_reader = [&](std::vector<mutation> mutations, const dht::partition_range& pr) {
|
||||||
|
auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
|
||||||
|
permit,
|
||||||
|
std::move(mutations),
|
||||||
|
pr,
|
||||||
|
streamed_mutation::forwarding::yes);
|
||||||
|
return assert_that(std::move(result)).exact();
|
||||||
|
};
|
||||||
|
|
||||||
|
const auto pk1 = s.make_pkey(1);
|
||||||
|
auto m1 = mutation(s.schema(), pk1);
|
||||||
|
s.add_static_row(m1, "test-static-1");
|
||||||
|
|
||||||
|
const auto pk2 = s.make_pkey(2);
|
||||||
|
auto m2 = mutation(s.schema(), pk2);
|
||||||
|
s.add_static_row(m2, "test-static-2");
|
||||||
|
|
||||||
|
dht::ring_position_comparator cmp{*s.schema()};
|
||||||
|
BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less);
|
||||||
|
|
||||||
|
auto rd = make_reader({m1, m2}, query::full_partition_range);
|
||||||
|
rd.fill_buffer().get();
|
||||||
|
rd.next_partition();
|
||||||
|
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||||
|
rd.produces_static_row(
|
||||||
|
{{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-1")}});
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
|
||||||
|
rd.next_partition();
|
||||||
|
rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
|
||||||
|
rd.produces_static_row(
|
||||||
|
{{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static-2")}});
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
|
||||||
|
rd.next_partition();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable) {
|
||||||
|
simple_schema s;
|
||||||
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
|
const auto permit = semaphore.make_permit();
|
||||||
|
|
||||||
|
auto make_reader = [&](std::vector<mutation> mutations,
|
||||||
|
bool single_partition,
|
||||||
|
const dht::partition_range& pr)
|
||||||
|
{
|
||||||
|
auto result = make_flat_mutation_reader_from_mutations_v2(s.schema(),
|
||||||
|
permit,
|
||||||
|
std::move(mutations),
|
||||||
|
pr,
|
||||||
|
streamed_mutation::forwarding::yes);
|
||||||
|
result = make_nonforwardable(std::move(result), single_partition);
|
||||||
|
return assert_that(std::move(result)).exact();
|
||||||
|
};
|
||||||
|
|
||||||
|
const auto pk1 = s.make_pkey(1);
|
||||||
|
auto m1 = mutation(s.schema(), pk1);
|
||||||
|
m1.apply(s.make_row(permit, s.make_ckey(11), "value1"));
|
||||||
|
|
||||||
|
const auto pk2 = s.make_pkey(2);
|
||||||
|
auto m2 = mutation(s.schema(), pk2);
|
||||||
|
m2.apply(s.make_row(permit, s.make_ckey(22), "value2"));
|
||||||
|
|
||||||
|
const auto pk3 = s.make_pkey(3);
|
||||||
|
auto m3 = mutation(s.schema(), pk3);
|
||||||
|
m3.apply(s.make_row(permit, s.make_ckey(33), "value3"));
|
||||||
|
|
||||||
|
dht::ring_position_comparator cmp{*s.schema()};
|
||||||
|
BOOST_CHECK_EQUAL(cmp(m1.decorated_key(), m2.decorated_key()), std::strong_ordering::less);
|
||||||
|
BOOST_CHECK_EQUAL(cmp(m2.decorated_key(), m3.decorated_key()), std::strong_ordering::less);
|
||||||
|
|
||||||
|
// no input -> no output
|
||||||
|
{
|
||||||
|
auto rd = make_reader({}, false, query::full_partition_range);
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
}
|
||||||
|
|
||||||
|
// next_partition()
|
||||||
|
{
|
||||||
|
auto check = [&] (flat_reader_assertions_v2 rd) {
|
||||||
|
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||||
rd.next_partition();
|
rd.next_partition();
|
||||||
|
rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
|
||||||
|
rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key());
|
||||||
|
rd.produces_partition_end();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
};
|
};
|
||||||
|
|
||||||
auto rd = make_reader(query::full_partition_range);
|
// buffer is not empty
|
||||||
|
check(make_reader({m1, m2}, false, query::full_partition_range));
|
||||||
|
|
||||||
for (auto& partition : ms) {
|
// buffer is empty
|
||||||
test(rd, partition);
|
{
|
||||||
|
auto rd = make_reader({m1, m2}, false, query::full_partition_range);
|
||||||
|
rd.set_max_buffer_size(1);
|
||||||
|
check(std::move(rd));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto single_range = dht::partition_range::make_singular(ms[0].decorated_key());
|
// fast_forward_to()
|
||||||
|
{
|
||||||
|
const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
|
||||||
|
auto rd = make_reader({m1, m2}, false, m1_range);
|
||||||
|
rd.set_max_buffer_size(1);
|
||||||
|
|
||||||
auto rd2 = make_reader(single_range);
|
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||||
|
|
||||||
rd2.produces_partition_start(ms[0].decorated_key(), ms[0].partition().partition_tombstone());
|
const auto m2_range = dht::partition_range::make_singular(m2.decorated_key());
|
||||||
rd2.produces_end_of_stream();
|
rd.fast_forward_to(m2_range);
|
||||||
rd2.fast_forward_to(position_range::all_clustered_rows());
|
rd.produces_partition_start(m2.decorated_key(), m2.partition().partition_tombstone());
|
||||||
rd2.produces_row_with_key(ms[0].partition().clustered_rows().begin()->key());
|
rd.produces_row_with_key(m2.partition().clustered_rows().begin()->key());
|
||||||
rd2.produces_row_with_key(std::next(ms[0].partition().clustered_rows().begin())->key());
|
rd.produces_partition_end();
|
||||||
|
|
||||||
auto remaining_range = dht::partition_range::make_starting_with({ms[0].decorated_key(), false});
|
rd.next_partition();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
}
|
||||||
|
|
||||||
rd2.fast_forward_to(remaining_range);
|
// single_partition
|
||||||
|
{
|
||||||
|
auto rd = make_reader({m1, m2}, true, query::full_partition_range);
|
||||||
|
rd.set_max_buffer_size(1);
|
||||||
|
|
||||||
for (auto i = size_t(1); i < ms.size(); ++i) {
|
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||||
test(rd2, ms[i]);
|
rd.produces_row_with_key(m1.partition().clustered_rows().begin()->key());
|
||||||
}
|
|
||||||
});
|
rd.next_partition();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
|
||||||
|
rd.next_partition();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
}
|
||||||
|
|
||||||
|
// single_partition with fast_forward_to
|
||||||
|
{
|
||||||
|
const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
|
||||||
|
auto rd = make_reader({m1, m2}, true, m1_range);
|
||||||
|
rd.set_max_buffer_size(1);
|
||||||
|
|
||||||
|
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||||
|
|
||||||
|
const auto m2_range = dht::partition_range::make_singular(m2.decorated_key());
|
||||||
|
rd.fast_forward_to(m2_range);
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
|
||||||
|
rd.next_partition();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
}
|
||||||
|
|
||||||
|
// static row
|
||||||
|
{
|
||||||
|
s.add_static_row(m1, "test-static");
|
||||||
|
const auto m1_range = dht::partition_range::make_singular(m1.decorated_key());
|
||||||
|
auto rd = make_reader({m1, m2}, false, m1_range);
|
||||||
|
rd.set_max_buffer_size(1);
|
||||||
|
rd.produces_partition_start(m1.decorated_key(), m1.partition().partition_tombstone());
|
||||||
|
rd.produces_static_row(
|
||||||
|
{{s.schema()->get_column_definition(to_bytes("s1")), to_bytes("test-static")}});
|
||||||
|
rd.produces_row(
|
||||||
|
m1.partition().clustered_rows().begin()->key(),
|
||||||
|
{{s.schema()->get_column_definition(to_bytes("v")), to_bytes("value1")}}
|
||||||
|
);
|
||||||
|
rd.produces_partition_end();
|
||||||
|
rd.produces_end_of_stream();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
|
SEASTAR_THREAD_TEST_CASE(test_make_nonforwardable_from_mutations_as_mutation_source) {
|
||||||
return seastar::async([] {
|
auto populate = [] (schema_ptr, const std::vector<mutation> &muts) {
|
||||||
tests::reader_concurrency_semaphore_wrapper semaphore;
|
return mutation_source([=] (
|
||||||
for_each_mutation([&] (const mutation& m) {
|
schema_ptr schema,
|
||||||
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
|
reader_permit permit,
|
||||||
auto close_rd = deferred_close(rd);
|
const dht::partition_range& range,
|
||||||
rd().get();
|
const query::partition_slice& slice,
|
||||||
rd().get();
|
const io_priority_class&,
|
||||||
// We rely on AddressSanitizer telling us if nothing was leaked.
|
tracing::trace_state_ptr,
|
||||||
|
streamed_mutation::forwarding fwd_sm,
|
||||||
|
mutation_reader::forwarding) mutable {
|
||||||
|
auto squashed_muts = squash_mutations(muts);
|
||||||
|
const auto single_partition = squashed_muts.size() == 1;
|
||||||
|
auto reader = make_flat_mutation_reader_from_mutations_v2(schema,
|
||||||
|
std::move(permit),
|
||||||
|
std::move(squashed_muts),
|
||||||
|
range,
|
||||||
|
slice,
|
||||||
|
streamed_mutation::forwarding::yes);
|
||||||
|
reader = make_nonforwardable(std::move(reader), single_partition);
|
||||||
|
if (fwd_sm) {
|
||||||
|
reader = make_forwardable(std::move(reader));
|
||||||
|
}
|
||||||
|
return reader;
|
||||||
});
|
});
|
||||||
|
};
|
||||||
|
run_mutation_source_tests(populate);
|
||||||
|
}
|
||||||
|
|
||||||
|
SEASTAR_THREAD_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
|
||||||
|
tests::reader_concurrency_semaphore_wrapper semaphore;
|
||||||
|
for_each_mutation([&] (const mutation& m) {
|
||||||
|
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
|
||||||
|
auto close_rd = deferred_close(rd);
|
||||||
|
rd().get();
|
||||||
|
rd().get();
|
||||||
|
// We rely on AddressSanitizer telling us if nothing was leaked.
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user