Merge "Remove reader from mutations v1" from Botond

"
First migrate all users to the v2 variant, all of which are tests.
However, to be able to properly migrate all tests off it, a v2 variant
of the restricted reader is also needed. All restricted reader users are
then migrated to the freshly introduced v2 variant and the v1 variant is
removed.
Users include:
* replica::table::make_reader_v2()
* streaming_virtual_table::as_mutation_source()
* sstables::make_reader()
* tests

This allows us to get rid of a bunch of conversions on the query path,
which was mostly v2 already.

With a few tests we did kick the can down the road by wrapping the v2
reader in `downgrade_to_v1()`, but this series is long enough already.

Tests: unit(dev), unit(boost/flat_mutation_reader_test:debug)
"

* 'remove-reader-from-mutations-v1/v3' of https://github.com/denesb/scylla:
  readers: remove now unused v1 reader from mutations
  test: move away from v1 reader from mutations
  test/boost/mutation_reader_test: use fragment_scatterer
  test/boost/mutation_fragment_test: extract fragment_scatterer into a separate hh
  test/boost: mutation_fragment_test: refactor fragment_scatterer
  readers: remove now unused v1 reversing reader
  test/boost/flat_mutation_reader_test: convert to v2
  frozen_mutation: fragment_and_freeze(): convert to v2
  frozen_mutation: coroutinize fragment_and_freeze()
  readers: migrate away from v1 reversing reader
  db/virtual_table: use v2 variant of reversing and forwardable readers
  replica/table: use v2 variant of reversing reader
  sstables/sstable: remove unused make_crawling_reader_v1()
  sstables/sstable: remove make_reader_v1()
  readers: add v2 variant of reversing reader
  readers/reversing: remove FIXME
  readers: reader from mutations: use mutation's own schema when slicing
This commit is contained in:
Avi Kivity
2022-03-31 13:29:11 +03:00
17 changed files with 355 additions and 694 deletions

View File

@@ -13,8 +13,8 @@
#include "db/virtual_table.hh"
#include "db/chained_delegating_reader.hh"
#include "readers/queue.hh"
#include "readers/reversing.hh"
#include "readers/forwardable.hh"
#include "readers/reversing_v2.hh"
#include "readers/forwardable_v2.hh"
#include "readers/slicing_filtering.hh"
namespace db {
@@ -148,12 +148,12 @@ mutation_source streaming_virtual_table::as_mutation_source() {
}
});
auto rd = make_slicing_filtering_reader(downgrade_to_v1(std::move(reader_and_handle.first)), pr, slice);
auto rd = upgrade_to_v2(make_slicing_filtering_reader(downgrade_to_v1(std::move(reader_and_handle.first)), pr, slice));
if (!_shard_aware) {
rd = downgrade_to_v1(make_filtering_reader(upgrade_to_v2(std::move(rd)), [this] (const dht::decorated_key& dk) -> bool {
rd = make_filtering_reader(std::move(rd), [this] (const dht::decorated_key& dk) -> bool {
return this_shard_owns(dk);
}));
});
}
if (reversed) {

View File

@@ -6,7 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/util/closeable.hh>
#include <seastar/core/coroutine.hh>
#include "frozen_mutation.hh"
#include "mutation_partition.hh"
#include "mutation.hh"
@@ -26,7 +26,7 @@
#include "idl/uuid.dist.impl.hh"
#include "idl/keys.dist.impl.hh"
#include "idl/mutation.dist.impl.hh"
#include "readers/flat_mutation_reader.hh"
#include "readers/flat_mutation_reader_v2.hh"
#include "converting_mutation_partition_applier.hh"
#include "mutation_partition_view.hh"
@@ -190,6 +190,8 @@ class fragmenting_mutation_freezer {
bool _fragmented = false;
size_t _dirty_size = 0;
size_t _fragment_size;
range_tombstone_change _current_rtc;
private:
future<stop_iteration> flush() {
bytes_ostream out;
@@ -219,7 +221,7 @@ private:
}
public:
fragmenting_mutation_freezer(const schema& s, frozen_mutation_consumer_fn c, size_t fragment_size)
: _schema(s), _rts(s), _consumer(c), _fragment_size(fragment_size) { }
: _schema(s), _rts(s), _consumer(c), _fragment_size(fragment_size), _current_rtc(position_in_partition::before_all_clustered_rows(), {}) { }
future<stop_iteration> consume(partition_start&& ps) {
_key = std::move(ps.key().key());
@@ -241,10 +243,16 @@ public:
return maybe_flush();
}
future<stop_iteration> consume(range_tombstone&& rt) {
_dirty_size += rt.memory_usage(_schema);
_rts.apply(_schema, std::move(rt));
return maybe_flush();
future<stop_iteration> consume(range_tombstone_change&& rtc) {
auto ret = make_ready_future<stop_iteration>(stop_iteration::no);
if (_current_rtc.tombstone()) {
auto rt = range_tombstone(_current_rtc.position(), rtc.position(), _current_rtc.tombstone());
_dirty_size += rt.memory_usage(_schema);
_rts.apply(_schema, std::move(rt));
ret = maybe_flush();
}
_current_rtc = std::move(rtc);
return ret;
}
future<stop_iteration> consume(partition_end&&) {
@@ -255,19 +263,20 @@ public:
}
};
future<> fragment_and_freeze(flat_mutation_reader mr, frozen_mutation_consumer_fn c, size_t fragment_size)
future<> fragment_and_freeze(flat_mutation_reader_v2 mr, frozen_mutation_consumer_fn c, size_t fragment_size)
{
return with_closeable(std::move(mr), [c = std::move(c), fragment_size] (flat_mutation_reader& mr) mutable {
fragmenting_mutation_freezer freezer(*mr.schema(), c, fragment_size);
return do_with(std::move(freezer), [&mr] (auto& freezer) {
return repeat([&] {
return mr().then([&] (auto mfopt) {
if (!mfopt) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return std::move(*mfopt).consume(freezer);
});
});
});
});
std::exception_ptr ex;
try {
fragmenting_mutation_freezer freezer(*mr.schema(), c, fragment_size);
mutation_fragment_v2_opt mfopt;
while ((mfopt = co_await mr()) && (co_await std::move(*mfopt).consume(freezer) == stop_iteration::no));
} catch (...) {
ex = std::current_exception();
}
co_await mr.close();
if (ex) {
std::rethrow_exception(std::move(ex));
}
}

View File

@@ -14,7 +14,7 @@
class mutation;
class mutation_partition_view;
class flat_mutation_reader;
class flat_mutation_reader_v2;
namespace ser {
class mutation_view;
@@ -103,7 +103,7 @@ public:
static constexpr size_t default_frozen_fragment_size = 128 * 1024;
using frozen_mutation_consumer_fn = std::function<future<stop_iteration>(frozen_mutation, bool)>;
future<> fragment_and_freeze(flat_mutation_reader mr, frozen_mutation_consumer_fn c,
future<> fragment_and_freeze(flat_mutation_reader_v2 mr, frozen_mutation_consumer_fn c,
size_t fragment_size = default_frozen_fragment_size);
class reader_permit;

View File

@@ -1,52 +0,0 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "schema_fwd.hh"
#include "dht/i_partitioner_fwd.hh"
#include <vector>
#include "mutation_fragment_fwd.hh"
#include "readers/flat_mutation_reader_fwd.hh"
class flat_mutation_reader;
class reader_permit;
class mutation;
namespace query {
class partition_slice;
extern const dht::partition_range full_partition_range;
}
// All mutations should have the same schema.
flat_mutation_reader make_flat_mutation_reader_from_mutations(
schema_ptr schema,
reader_permit permit,
std::vector<mutation>,
const dht::partition_range& pr = query::full_partition_range,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
// All mutations should have the same schema.
flat_mutation_reader make_flat_mutation_reader_from_mutations(schema_ptr schema, reader_permit permit, std::vector<mutation> ms, streamed_mutation::forwarding fwd);
// All mutations should have the same schema.
flat_mutation_reader
make_flat_mutation_reader_from_mutations(schema_ptr schema,
reader_permit permit,
std::vector<mutation> ms,
const query::partition_slice& slice,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
// All mutations should have the same schema.
flat_mutation_reader
make_flat_mutation_reader_from_mutations(schema_ptr schema,
reader_permit permit,
std::vector<mutation> ms,
const dht::partition_range& pr,
const query::partition_slice& slice,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);

View File

@@ -24,14 +24,13 @@
#include "readers/forwardable.hh"
#include "readers/forwardable_v2.hh"
#include "readers/from_fragments_v2.hh"
#include "readers/from_mutations.hh"
#include "readers/from_mutations_v2.hh"
#include "readers/generating_v2.hh"
#include "readers/multi_range.hh"
#include "readers/mutation_source.hh"
#include "readers/nonforwardable.hh"
#include "readers/queue.hh"
#include "readers/reversing.hh"
#include "readers/reversing_v2.hh"
#include "readers/slice_mutations.hh"
#include "readers/upgrading_consumer.hh"
#include <seastar/core/coroutine.hh>
@@ -389,145 +388,107 @@ std::vector<mutation> slice_mutations(schema_ptr schema, std::vector<mutation> m
std::vector<mutation> sliced_ms;
for (auto& m : ms) {
auto ck_ranges = query::clustering_key_filter_ranges::get_ranges(*schema, slice, m.key());
auto mp = mutation_partition(std::move(m.partition()), *schema, std::move(ck_ranges));
auto mp = mutation_partition(std::move(m.partition()), *m.schema(), std::move(ck_ranges));
sliced_ms.emplace_back(schema, m.decorated_key(), std::move(mp));
}
return sliced_ms;
}
flat_mutation_reader make_reversing_reader(flat_mutation_reader original, query::max_result_size max_size, std::unique_ptr<query::partition_slice> slice) {
class partition_reversing_mutation_reader final : public flat_mutation_reader::impl {
flat_mutation_reader _source;
range_tombstone_list _range_tombstones;
std::stack<mutation_fragment> _mutation_fragments;
mutation_fragment_opt _partition_end;
size_t _stack_size = 0;
const query::max_result_size _max_size;
flat_mutation_reader_v2 make_reversing_reader(flat_mutation_reader_v2 original, query::max_result_size max_size, std::unique_ptr<query::partition_slice> slice) {
class partition_reversing_mutation_reader final : public flat_mutation_reader_v2::impl {
flat_mutation_reader_v2 _source;
tombstone _current_tombstone;
query::max_result_size _max_size;
bool _below_soft_limit = true;
std::unique_ptr<query::partition_slice> _slice; // only stored, not used
private:
stop_iteration emit_partition() {
auto emit_range_tombstone = [&] {
// _range_tombstones uses the reverse schema already, so we can use `begin()`
auto it = _range_tombstones.begin();
push_mutation_fragment(*_schema, _permit, _range_tombstones.pop(it));
};
position_in_partition::tri_compare cmp(*_schema);
while (!_mutation_fragments.empty() && !is_buffer_full()) {
auto& mf = _mutation_fragments.top();
if (!_range_tombstones.empty() && cmp(_range_tombstones.begin()->position(), mf.position()) <= 0) {
emit_range_tombstone();
void check_buffer_size(const partition_key& key) {
const auto sz = buffer_size();
if (sz > _max_size.hard_limit || (sz > _max_size.soft_limit && _below_soft_limit)) [[unlikely]] {
if (buffer_size() > _max_size.hard_limit) {
throw std::runtime_error(fmt::format(
"Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}",
_max_size.hard_limit,
key.with_schema(*_schema)));
} else {
_stack_size -= mf.memory_usage();
push_mutation_fragment(std::move(mf));
_mutation_fragments.pop();
mrlog.warn(
"Memory usage of reversed read exceeds soft limit of {} (configured via max_memory_for_unlimited_query_soft_limit), while reading partition {}",
_max_size.soft_limit,
key.with_schema(*_schema));
_below_soft_limit = false;
}
}
while (!_range_tombstones.empty() && !is_buffer_full()) {
emit_range_tombstone();
}
if (is_buffer_full()) {
return stop_iteration::yes;
}
push_mutation_fragment(std::move(*std::exchange(_partition_end, std::nullopt)));
return stop_iteration::no;
}
future<stop_iteration> consume_partition_from_source() {
if (_source.is_buffer_empty()) {
if (_source.is_end_of_stream()) {
_end_of_stream = true;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
return _source.fill_buffer().then([] { return stop_iteration::no; });
}
while (!_source.is_buffer_empty() && !is_buffer_full()) {
auto mf = _source.pop_mutation_fragment();
if (mf.is_partition_start() || mf.is_static_row()) {
push_mutation_fragment(std::move(mf));
} else if (mf.is_end_of_partition()) {
_partition_end = std::move(mf);
if (emit_partition()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
} else if (mf.is_range_tombstone()) {
auto&& rt = std::move(mf).as_range_tombstone();
rt.reverse();
_range_tombstones.apply(*_schema, std::move(rt));
} else {
_mutation_fragments.emplace(std::move(mf));
_stack_size += _mutation_fragments.top().memory_usage();
if (_stack_size > _max_size.hard_limit || (_stack_size > _max_size.soft_limit && _below_soft_limit)) {
const partition_key* key = nullptr;
auto it = buffer().end();
--it;
if (it->is_partition_start()) {
key = &it->as_partition_start().key().key();
} else {
--it;
key = &it->as_partition_start().key().key();
}
if (_stack_size > _max_size.hard_limit) {
return make_exception_future<stop_iteration>(std::runtime_error(fmt::format(
"Memory usage of reversed read exceeds hard limit of {} (configured via max_memory_for_unlimited_query_hard_limit), while reading partition {}",
_max_size.hard_limit,
key->with_schema(*_schema))));
} else {
mrlog.warn(
"Memory usage of reversed read exceeds soft limit of {} (configured via max_memory_for_unlimited_query_soft_limit), while reading partition {}",
_max_size.soft_limit,
key->with_schema(*_schema));
_below_soft_limit = false;
}
}
}
}
return make_ready_future<stop_iteration>(is_buffer_full());
void push_front(mutation_fragment_v2&& mf) {
unpop_mutation_fragment(std::move(mf));
}
void push_back(mutation_fragment_v2&& mf) {
push_mutation_fragment(std::move(mf));
}
public:
explicit partition_reversing_mutation_reader(flat_mutation_reader mr, query::max_result_size max_size, std::unique_ptr<query::partition_slice> slice)
: flat_mutation_reader::impl(mr.schema()->make_reversed(), mr.permit())
explicit partition_reversing_mutation_reader(flat_mutation_reader_v2 mr, query::max_result_size max_size, std::unique_ptr<query::partition_slice> slice)
: flat_mutation_reader_v2::impl(mr.schema()->make_reversed(), mr.permit())
, _source(std::move(mr))
, _range_tombstones(*_schema)
, _max_size(max_size)
, _slice(std::move(slice))
{ }
virtual future<> fill_buffer() override {
return repeat([&] {
if (_partition_end) {
// We have consumed full partition from source, now it is
// time to emit it.
auto stop = emit_partition();
if (stop) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
if (!is_buffer_empty()) {
co_return;
}
mutation_fragment_v2_opt ps, sr, pe;
const partition_key* pk = nullptr;
bool have_partition = false;
while (!have_partition) {
auto mf_opt = co_await _source();
if (!mf_opt) {
break;
}
return consume_partition_from_source();
});
switch (mf_opt->mutation_fragment_kind()) {
case mutation_fragment_v2::kind::partition_start:
ps = std::move(mf_opt);
pk = &ps->as_partition_start().key().key();
break;
case mutation_fragment_v2::kind::partition_end:
pe = std::move(mf_opt);
have_partition = true;
break;
case mutation_fragment_v2::kind::static_row:
sr = std::move(mf_opt);
break;
case mutation_fragment_v2::kind::range_tombstone_change:
mf_opt->mutate_as_range_tombstone_change(*_schema, [this] (range_tombstone_change& rtc) {
rtc.set_position(std::move(rtc).position().reversed());
rtc.set_tombstone(std::exchange(_current_tombstone, rtc.tombstone()));
});
[[fallthrough]];
case mutation_fragment_v2::kind::clustering_row:
push_front(std::move(*mf_opt));
check_buffer_size(*pk);
break;
}
}
if (have_partition) {
if (sr) {
push_front(std::move(*sr));
}
push_front(std::move(*ps));
push_back(std::move(*pe));
}
_end_of_stream = _source.is_end_of_stream();
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
while (!_mutation_fragments.empty()) {
_stack_size -= _mutation_fragments.top().memory_usage();
_mutation_fragments.pop();
}
_range_tombstones.clear();
_partition_end = std::nullopt;
return _source.next_partition();
}
return make_ready_future<>();
_end_of_stream = false;
return _source.next_partition();
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
while (!_mutation_fragments.empty()) {
_mutation_fragments.pop();
}
_stack_size = 0;
_partition_end = std::nullopt;
_end_of_stream = false;
return _source.fast_forward_to(pr);
}
@@ -541,7 +502,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader original, query:
}
};
return make_flat_mutation_reader<partition_reversing_mutation_reader>(std::move(original), max_size, std::move(slice));
return make_flat_mutation_reader_v2<partition_reversing_mutation_reader>(std::move(original), max_size, std::move(slice));
}
flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_partition) {
@@ -1056,243 +1017,6 @@ flat_mutation_reader_v2 make_flat_mutation_reader_from_mutations_v2(
return make_flat_mutation_reader_from_mutations_v2(std::move(schema), std::move(permit), std::move(ms), query::full_partition_range, fwd);
}
// All mutations should have the same schema.
flat_mutation_reader make_flat_mutation_reader_from_mutations(schema_ptr schema, reader_permit permit, std::vector<mutation> ms, streamed_mutation::forwarding fwd) {
return make_flat_mutation_reader_from_mutations(std::move(schema), std::move(permit), std::move(ms), query::full_partition_range, fwd);
}
flat_mutation_reader
make_flat_mutation_reader_from_mutations(schema_ptr schema,
reader_permit permit,
std::vector<mutation> ms,
const dht::partition_range& pr,
const query::partition_slice& query_slice,
streamed_mutation::forwarding fwd) {
const auto reversed = query_slice.is_reversed();
if (reversed) {
schema = schema->make_reversed();
}
auto slice = reversed
? query::half_reverse_slice(*schema, query_slice)
: query_slice;
auto sliced_ms = slice_mutations(schema, std::move(ms), slice);
auto rd = make_flat_mutation_reader_from_mutations(schema, permit, sliced_ms, pr, reversed ? streamed_mutation::forwarding::no : fwd);
if (reversed) {
rd = make_reversing_reader(std::move(rd), permit.max_result_size());
if (fwd) {
rd = make_forwardable(std::move(rd));
}
}
return rd;
}
flat_mutation_reader
make_flat_mutation_reader_from_mutations(schema_ptr schema, reader_permit permit, std::vector<mutation> ms, const query::partition_slice& slice, streamed_mutation::forwarding fwd) {
return make_flat_mutation_reader_from_mutations(std::move(schema), std::move(permit), std::move(ms), query::full_partition_range, slice, fwd);
}
flat_mutation_reader
make_flat_mutation_reader_from_mutations(schema_ptr s, reader_permit permit, std::vector<mutation> mutations, const dht::partition_range& pr, streamed_mutation::forwarding fwd) {
class reader final : public flat_mutation_reader::impl {
std::vector<mutation> _mutations;
std::vector<mutation>::iterator _cur;
std::vector<mutation>::iterator _end;
position_in_partition::less_compare _cmp;
bool _static_row_done = false;
mutation_fragment_opt _rt;
mutation_fragment_opt _cr;
private:
void prepare_next_clustering_row() {
auto& crs = _cur->partition().mutable_clustered_rows();
while (true) {
auto re = crs.unlink_leftmost_without_rebalance();
if (!re) {
break;
}
auto re_deleter = defer([re] () noexcept { current_deleter<rows_entry>()(re); });
if (!re->dummy()) {
_cr = mutation_fragment(*_schema, _permit, std::move(*re));
break;
}
}
}
void prepare_next_range_tombstone() {
auto& rts = _cur->partition().mutable_row_tombstones();
if (!rts.empty()) {
_rt = mutation_fragment(*_schema, _permit, rts.pop_front_and_lock());
}
}
mutation_fragment_opt read_next() {
if (_cr && (!_rt || _cmp(_cr->position(), _rt->position()))) {
auto cr = std::exchange(_cr, { });
prepare_next_clustering_row();
return cr;
} else if (_rt) {
auto rt = std::exchange(_rt, { });
prepare_next_range_tombstone();
return rt;
}
return { };
}
private:
void do_fill_buffer() {
while (!is_end_of_stream() && !is_buffer_full()) {
if (!_static_row_done) {
_static_row_done = true;
if (!_cur->partition().static_row().empty()) {
push_mutation_fragment(*_schema, _permit, static_row(std::move(_cur->partition().static_row().get_existing())));
}
}
auto mfopt = read_next();
if (mfopt) {
push_mutation_fragment(std::move(*mfopt));
} else {
push_mutation_fragment(*_schema, _permit, partition_end());
++_cur;
if (_cur == _end) {
_end_of_stream = true;
} else {
start_new_partition();
}
}
}
}
void start_new_partition() {
_static_row_done = false;
push_mutation_fragment(*_schema, _permit, partition_start(_cur->decorated_key(),
_cur->partition().partition_tombstone()));
prepare_next_clustering_row();
prepare_next_range_tombstone();
}
void destroy_current_mutation() {
auto &crs = _cur->partition().mutable_clustered_rows();
auto deleter = current_deleter<rows_entry>();
crs.clear_and_dispose(deleter);
auto &rts = _cur->partition().mutable_row_tombstones();
// cannot use .clear() here, pop_front_and_lock leaves
// boost set in a sate that crashes if being cleared.
while (!rts.empty()) {
rts.pop_front_and_lock();
}
}
struct cmp {
bool operator()(const mutation& m, const dht::ring_position& p) const {
return m.decorated_key().tri_compare(*m.schema(), p) < 0;
}
bool operator()(const dht::ring_position& p, const mutation& m) const {
return m.decorated_key().tri_compare(*m.schema(), p) > 0;
}
};
static std::vector<mutation>::iterator find_first_partition(std::vector<mutation>& ms, const dht::partition_range& pr) {
if (!pr.start()) {
return std::begin(ms);
}
if (pr.is_singular()) {
return std::lower_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
} else {
if (pr.start()->is_inclusive()) {
return std::lower_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
} else {
return std::upper_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
}
}
}
static std::vector<mutation>::iterator find_last_partition(std::vector<mutation>& ms, const dht::partition_range& pr) {
if (!pr.end()) {
return std::end(ms);
}
if (pr.is_singular()) {
return std::upper_bound(std::begin(ms), std::end(ms), pr.start()->value(), cmp{});
} else {
if (pr.end()->is_inclusive()) {
return std::upper_bound(std::begin(ms), std::end(ms), pr.end()->value(), cmp{});
} else {
return std::lower_bound(std::begin(ms), std::end(ms), pr.end()->value(), cmp{});
}
}
}
public:
reader(schema_ptr s, reader_permit permit, std::vector<mutation>&& mutations, const dht::partition_range& pr)
: impl(s, std::move(permit))
, _mutations(std::move(mutations))
, _cur(find_first_partition(_mutations, pr))
, _end(find_last_partition(_mutations, pr))
, _cmp(*s)
{
_end_of_stream = _cur == _end;
if (!_end_of_stream) {
auto mutation_destroyer = defer([this] () noexcept { destroy_mutations(); });
start_new_partition();
do_fill_buffer();
mutation_destroyer.cancel();
}
}
void destroy_mutations() noexcept {
// After unlink_leftmost_without_rebalance() was called on a bi::set
// we need to complete destroying the tree using that function.
// clear_and_dispose() used by mutation_partition destructor won't
// work properly.
_cur = _mutations.begin();
while (_cur != _end) {
destroy_current_mutation();
++_cur;
}
}
~reader() {
destroy_mutations();
}
virtual future<> fill_buffer() override {
do_fill_buffer();
return make_ready_future<>();
}
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
destroy_current_mutation();
++_cur;
if (_cur == _end) {
_end_of_stream = true;
} else {
start_new_partition();
}
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
clear_buffer();
_cur = find_first_partition(_mutations, pr);
_end = find_last_partition(_mutations, pr);
_static_row_done = false;
_cr = {};
_rt = {};
_end_of_stream = _cur == _end;
if (!_end_of_stream) {
start_new_partition();
}
return make_ready_future<>();
};
virtual future<> fast_forward_to(position_range cr) override {
return make_exception_future<>(std::runtime_error("This reader can't be fast forwarded to another position."));
};
virtual future<> close() noexcept override {
return make_ready_future<>();
};
};
if (mutations.empty()) {
return make_empty_flat_reader(std::move(s), std::move(permit));
}
auto res = make_flat_mutation_reader<reader>(std::move(s), std::move(permit), std::move(mutations), pr);
if (fwd) {
return make_forwardable(std::move(res));
}
return res;
}
flat_mutation_reader_v2
make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit, std::deque<mutation_fragment_v2> fragments) {
return make_flat_mutation_reader_from_fragments(std::move(schema), std::move(permit), std::move(fragments), query::full_partition_range);

View File

@@ -9,10 +9,10 @@
#pragma once
#include <memory>
class flat_mutation_reader;
class flat_mutation_reader_v2;
namespace query {
struct max_result_size;
struct max_result_size;
class partition_slice;
}
@@ -22,10 +22,10 @@ namespace query {
/// 1. The reader's schema() method will return a reversed schema (see
/// \ref schema::make_reversed()).
/// 2. Static row is still emitted first.
/// 3. Range tombstones' bounds are reversed (see \ref range_tombstone::reverse()).
/// 4. Clustered rows and range tombstones are emitted in descending order.
/// Because of 3 and 4 the guarantee that a range tombstone is emitted before
/// any mutation fragment affected by it still holds.
/// 3. Clustering elements are emitted in reverse order.
/// 3. Range tombstones changes' tombstones are shifted by one to the left to
/// account for the implicit null tombstone at the start of the stream moving
/// from start to end (due to reversing).
/// Ordering of partitions themselves remains unchanged.
/// For more details see docs/design-notes/reverse-reads.md.
///
@@ -43,7 +43,5 @@ namespace query {
/// store an edited slice somewhere. This is common for reads that work
/// with a native-reversed slice and so have to convert the one used in the
/// query -- which is in half-reversed format.
///
/// FIXME: reversing should be done in the sstable layer, see #1413.
flat_mutation_reader
make_reversing_reader(flat_mutation_reader original, query::max_result_size max_size, std::unique_ptr<query::partition_slice> slice = {});
flat_mutation_reader_v2
make_reversing_reader(flat_mutation_reader_v2 original, query::max_result_size max_size, std::unique_ptr<query::partition_slice> slice = {});

View File

@@ -44,7 +44,7 @@
#include <boost/range/algorithm/remove_if.hpp>
#include <boost/range/algorithm.hpp>
#include "utils/error_injection.hh"
#include "readers/reversing.hh"
#include "readers/reversing_v2.hh"
#include "readers/from_mutations_v2.hh"
#include "readers/empty_v2.hh"
#include "readers/multi_range.hh"
@@ -207,7 +207,7 @@ table::make_reader_v2(schema_ptr s,
}
if (unreversed_slice) [[unlikely]] {
return upgrade_to_v2(make_reversing_reader(downgrade_to_v1(std::move(rd)), permit.max_result_size(), std::move(unreversed_slice)));
return make_reversing_reader(std::move(rd), permit.max_result_size(), std::move(unreversed_slice));
}
return rd;

View File

@@ -75,8 +75,8 @@
#include "utils/cached_file.hh"
#include "tombstone_gc.hh"
#include "reader_concurrency_semaphore.hh"
#include "readers/reversing.hh"
#include "readers/forwardable.hh"
#include "readers/reversing_v2.hh"
#include "readers/forwardable_v2.hh"
thread_local disk_error_signal_type sstable_read_error;
thread_local disk_error_signal_type sstable_write_error;
@@ -2145,56 +2145,35 @@ sstable::make_reader(
}
// Multi-partition reversed queries are not yet supported natively in the mx reader.
// Therefore in this case we delegate to make_reader_v1 which handles it (by using
// `make_reversing_reader` which right now works only with the v1 format).
// Therefore in this case we use `make_reversing_reader` over the forward reader.
// FIXME: remove this workaround eventually.
return upgrade_to_v2(make_reader_v1(std::move(schema), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon));
}
flat_mutation_reader
sstable::make_reader_v1(
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr,
read_monitor& mon) {
const auto reversed = slice.options.contains(query::partition_slice::option::reversed);
auto max_result_size = permit.max_result_size();
if (_version >= version_types::mc) {
if (reversed && !range.is_singular()) {
auto rd = make_reversing_reader(downgrade_to_v1(mx::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, half_reverse_slice(*schema, slice), pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon)),
max_result_size);
if (fwd) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
return downgrade_to_v1(mx::make_reader(shared_from_this(), schema, std::move(permit),
range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon));
}
if (reversed) {
// The kl reader does not support reversed queries at all.
// Perform a forward query on it, then reverse the result.
// Note: we can pass a half-reversed slice, the kl reader performs an unreversed query nevertheless.
auto rd = make_reversing_reader(kl::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon), max_result_size);
// The only mx case falling through here is reversed multi-partition reader
auto rd = make_reversing_reader(mx::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, half_reverse_slice(*schema, slice), pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon),
max_result_size);
if (fwd) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
return kl::make_reader(shared_from_this(), schema, std::move(permit),
range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon);
if (reversed) {
// The kl reader does not support reversed queries at all.
// Perform a forward query on it, then reverse the result.
// Note: we can pass a half-reversed slice, the kl reader performs an unreversed query nevertheless.
auto rd = make_reversing_reader(upgrade_to_v2(kl::make_reader(shared_from_this(), schema->make_reversed(), std::move(permit),
range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr, mon)), max_result_size);
if (fwd) {
rd = make_forwardable(std::move(rd));
}
return rd;
}
return upgrade_to_v2(kl::make_reader(shared_from_this(), schema, std::move(permit),
range, slice, pc, std::move(trace_state), fwd, fwd_mr, mon));
}
flat_mutation_reader_v2
@@ -2210,19 +2189,6 @@ sstable::make_crawling_reader(
return upgrade_to_v2(kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), pc, std::move(trace_state), monitor));
}
flat_mutation_reader
sstable::make_crawling_reader_v1(
schema_ptr schema,
reader_permit permit,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
read_monitor& monitor) {
if (_version >= version_types::mc) {
return downgrade_to_v1(mx::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), pc, std::move(trace_state), monitor));
}
return kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), pc, std::move(trace_state), monitor);
}
static entry_descriptor make_entry_descriptor(sstring sstdir, sstring fname, sstring* const provided_ks, sstring* const provided_cf) {
static std::regex la_mx("(la|m[cde])-(\\d+)-(\\w+)-(.*)");
static std::regex ka("(\\w+)-(\\w+)-ka-(\\d+)-(.*)");
@@ -3197,7 +3163,7 @@ mutation_source sstable::as_mutation_source() {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) mutable {
return sst->make_reader_v1(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
return sst->make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
}

View File

@@ -220,20 +220,6 @@ public:
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes,
read_monitor& monitor = default_read_monitor());
// Precondition: if the slice is reversed, the schema must be reversed as well.
// Reversed slices must be provided in the 'half-reversed' format (the order of ranges
// being reversed, but the ranges themselves are not).
flat_mutation_reader make_reader_v1(
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc = default_priority_class(),
tracing::trace_state_ptr trace_state = {},
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes,
read_monitor& monitor = default_read_monitor());
// A reader which doesn't use the index at all. It reads everything from the
// sstable and it doesn't support skipping.
flat_mutation_reader_v2 make_crawling_reader(
@@ -243,13 +229,6 @@ public:
tracing::trace_state_ptr trace_state = {},
read_monitor& monitor = default_read_monitor());
flat_mutation_reader make_crawling_reader_v1(
schema_ptr schema,
reader_permit permit,
const io_priority_class& pc = default_priority_class(),
tracing::trace_state_ptr trace_state = {},
read_monitor& monitor = default_read_monitor());
// Returns mutation_source containing all writes contained in this sstable.
// The mutation_source shares ownership of this sstable.
mutation_source as_mutation_source();

View File

@@ -16,17 +16,17 @@
#include "mutation_fragment.hh"
#include "test/lib/mutation_source_test.hh"
#include "readers/flat_mutation_reader.hh"
#include "readers/reversing.hh"
#include "readers/reversing_v2.hh"
#include "readers/forwardable.hh"
#include "readers/delegating.hh"
#include "readers/delegating_v2.hh"
#include "readers/multi_range.hh"
#include "readers/from_mutations.hh"
#include "schema_builder.hh"
#include "replica/memtable.hh"
#include "row_cache.hh"
#include "test/lib/tmpdir.hh"
#include "repair/repair.hh"
#include "mutation_partition_view.hh"
#include "mutation_rebuilder.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/flat_mutation_reader_assertions.hh"
@@ -46,7 +46,7 @@ struct mock_consumer {
size_t _consume_tombstone_call_count = 0;
size_t _consume_end_of_partition_call_count = 0;
bool _consume_end_of_stream_called = false;
std::vector<mutation_fragment> _fragments;
std::vector<mutation_fragment_v2> _fragments;
};
const schema& _schema;
reader_permit _permit;
@@ -66,15 +66,15 @@ struct mock_consumer {
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
_result._fragments.push_back(mutation_fragment(_schema, _permit, std::move(sr)));
_result._fragments.emplace_back(_schema, _permit, std::move(sr));
return update_depth();
}
stop_iteration consume(clustering_row&& cr) {
_result._fragments.push_back(mutation_fragment(_schema, _permit, std::move(cr)));
_result._fragments.emplace_back(_schema, _permit, std::move(cr));
return update_depth();
}
stop_iteration consume(range_tombstone&& rt) {
_result._fragments.push_back(mutation_fragment(_schema, _permit, std::move(rt)));
stop_iteration consume(range_tombstone_change&& rtc) {
_result._fragments.emplace_back(_schema, _permit, std::move(rtc));
return update_depth();
}
stop_iteration consume_end_of_partition() {
@@ -89,7 +89,7 @@ struct mock_consumer {
static size_t count_fragments(mutation m) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto r = make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), {m});
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {m});
auto close_reader = deferred_close(r);
size_t res = 0;
auto mfopt = r().get0();
@@ -106,26 +106,20 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_single_partition) {
for_each_mutation([&] (const mutation& m) {
size_t fragments_in_m = count_fragments(m);
for (size_t depth = 1; depth <= fragments_in_m + 1; ++depth) {
auto r = make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), {m});
auto r = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {m});
auto close_reader = deferred_close(r);
auto result = r.consume(mock_consumer(*m.schema(), semaphore.make_permit(), depth)).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
BOOST_REQUIRE_EQUAL(m.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
auto r2 = assert_that(make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), {m}));
auto r2 = assert_that(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {m}));
r2.produces_partition_start(m.decorated_key(), m.partition().partition_tombstone());
if (result._fragments.empty()) {
continue;
}
query::clustering_row_ranges ck_ranges = {};
if (depth > 1) {
const auto& mf = result._fragments.back();
auto ck_range = query::clustering_range::make_ending_with({mf.position().key(), mf.position().get_bound_weight() >= bound_weight::equal});
ck_ranges = {ck_range};
}
for (auto& mf : result._fragments) {
r2.produces(*m.schema(), mf, ck_ranges);
r2.produces(*m.schema(), mf);
}
}
});
@@ -139,14 +133,14 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
size_t fragments_in_m1 = count_fragments(m1);
size_t fragments_in_m2 = count_fragments(m2);
for (size_t depth = 1; depth < fragments_in_m1; ++depth) {
auto r = make_flat_mutation_reader_from_mutations(m1.schema(), semaphore.make_permit(), {m1, m2});
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
auto close_r = deferred_close(r);
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
BOOST_REQUIRE_EQUAL(1, result._consume_new_partition_call_count);
BOOST_REQUIRE_EQUAL(1, result._consume_end_of_partition_call_count);
BOOST_REQUIRE_EQUAL(m1.partition().partition_tombstone() ? 1 : 0, result._consume_tombstone_call_count);
auto r2 = make_flat_mutation_reader_from_mutations(m1.schema(), semaphore.make_permit(), {m1, m2});
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
auto close_r2 = deferred_close(r2);
auto start = r2().get0();
BOOST_REQUIRE(start);
@@ -158,7 +152,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
}
}
for (size_t depth = fragments_in_m1; depth < fragments_in_m1 + fragments_in_m2 + 1; ++depth) {
auto r = make_flat_mutation_reader_from_mutations(m1.schema(), semaphore.make_permit(), {m1, m2});
auto r = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
auto close_r = deferred_close(r);
auto result = r.consume(mock_consumer(*m1.schema(), semaphore.make_permit(), depth)).get0();
BOOST_REQUIRE(result._consume_end_of_stream_called);
@@ -172,7 +166,7 @@ SEASTAR_TEST_CASE(test_flat_mutation_reader_consume_two_partitions) {
++tombstones_count;
}
BOOST_REQUIRE_EQUAL(tombstones_count, result._consume_tombstone_call_count);
auto r2 = make_flat_mutation_reader_from_mutations(m1.schema(), semaphore.make_permit(), {m1, m2});
auto r2 = make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1, m2});
auto close_r2 = deferred_close(r2);
auto start = r2().get0();
BOOST_REQUIRE(start);
@@ -204,7 +198,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
for_each_mutation([&] (const mutation& m) {
std::vector<frozen_mutation> fms;
fragment_and_freeze(make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
fms.emplace_back(std::move(fm));
return make_ready_future<stop_iteration>(stop_iteration::no);
@@ -218,7 +212,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
fms.clear();
std::optional<bool> fragmented;
fragment_and_freeze(make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { mutation(m) }), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!fragmented || *fragmented == frag);
*fragmented = frag;
fms.emplace_back(std::move(fm));
@@ -250,7 +244,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
std::vector<frozen_mutation> frozen;
// Freeze all
fragment_and_freeze(make_flat_mutation_reader_from_mutations(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
frozen.emplace_back(fm);
return make_ready_future<stop_iteration>(stop_iteration::no);
@@ -262,7 +256,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
// Freeze first
frozen.clear();
fragment_and_freeze(make_flat_mutation_reader_from_mutations(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
BOOST_REQUIRE(!frag);
frozen.emplace_back(fm);
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -272,7 +266,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
// Fragment and freeze all
frozen.clear();
fragment_and_freeze(make_flat_mutation_reader_from_mutations(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
fragment_and_freeze(make_flat_mutation_reader_from_mutations_v2(gen.schema(), semaphore.make_permit(), muts), [&] (auto fm, bool frag) {
frozen.emplace_back(fm);
return make_ready_future<stop_iteration>(stop_iteration::no);
}, 1).get0();
@@ -296,8 +290,8 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing) {
}
SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
struct dummy_reader_impl : public flat_mutation_reader::impl {
using flat_mutation_reader::impl::impl;
struct dummy_reader_impl : public flat_mutation_reader_v2::impl {
using flat_mutation_reader_v2::impl::impl;
virtual future<> fill_buffer() override { return make_ready_future<>(); }
virtual future<> next_partition() override { return make_ready_future<>(); }
virtual future<> fast_forward_to(const dht::partition_range&) override { return make_ready_future<>(); }
@@ -322,7 +316,8 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
// having to have loads of data.
const auto max_buffer_size = size_t{100};
auto reader = make_flat_mutation_reader_from_mutations(s.schema(), semaphore.make_permit(), {mut_orig}, dht::partition_range::make_open_ended_both_sides());
const auto prange = dht::partition_range::make_open_ended_both_sides();
auto reader = make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), {mut_orig}, prange);
auto close_reader = deferred_close(reader);
auto dummy_impl = std::make_unique<dummy_reader_impl>(s.schema(), semaphore.make_permit());
reader.set_max_buffer_size(max_buffer_size);
@@ -357,7 +352,7 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
BOOST_CHECK_EQUAL(dummy_impl->buffer_size(), expected_buf_size);
}
auto dummy_reader = flat_mutation_reader(std::move(dummy_impl));
auto dummy_reader = flat_mutation_reader_v2(std::move(dummy_impl));
auto close_dummy_reader = deferred_close(dummy_reader);
auto mut_new = read_mutation_from_flat_mutation_reader(dummy_reader).get0();
@@ -388,7 +383,7 @@ SEASTAR_TEST_CASE(test_multi_range_reader) {
}));
auto source = mutation_source([&] (schema_ptr, reader_permit permit, const dht::partition_range& range) {
return make_flat_mutation_reader_from_mutations(s.schema(), std::move(permit), ms, range);
return make_flat_mutation_reader_from_mutations_v2(s.schema(), std::move(permit), ms, range);
});
const auto empty_ranges = dht::partition_range_vector{};
@@ -485,7 +480,10 @@ struct flat_stream_consumer {
skip_after_first_fragment _skip_partition;
skip_after_first_partition _skip_stream;
std::vector<mutation> _mutations;
std::optional<mutation_rebuilder_v2> _mut;
std::optional<position_in_partition> _previous_position;
tombstone _current_tombstone;
circular_buffer<range_tombstone_change> _reversed_rtcs;
bool _inside_partition = false;
private:
void verify_order(position_in_partition_view pos) {
@@ -506,48 +504,59 @@ public:
void consume_new_partition(dht::decorated_key dk) {
BOOST_REQUIRE(!_inside_partition);
BOOST_REQUIRE(!_previous_position);
_mutations.emplace_back(_schema, dk);
_mut.emplace(_schema);
_mut->consume_new_partition(dk);
_inside_partition = true;
}
void consume(tombstone pt) {
BOOST_REQUIRE(_inside_partition);
BOOST_REQUIRE(!_previous_position);
BOOST_REQUIRE_GE(_mutations.size(), 1);
_mutations.back().partition().apply(pt);
BOOST_REQUIRE(_mut);
_mut->consume(pt);
}
stop_iteration consume(static_row&& sr) {
BOOST_REQUIRE(_inside_partition);
BOOST_REQUIRE(!_previous_position);
BOOST_REQUIRE_GE(_mutations.size(), 1);
BOOST_REQUIRE(_mut);
_previous_position.emplace(sr.position());
_mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(sr)));
_mut->consume(std::move(sr));
return stop_iteration(bool(_skip_partition));
}
stop_iteration consume(clustering_row&& cr) {
BOOST_REQUIRE(_inside_partition);
verify_order(cr.position());
BOOST_REQUIRE_GE(_mutations.size(), 1);
BOOST_REQUIRE(_mut);
_previous_position.emplace(cr.position());
_mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(cr)));
_mut->consume(std::move(cr));
return stop_iteration(bool(_skip_partition));
}
stop_iteration consume(range_tombstone&& rt) {
stop_iteration consume(range_tombstone_change&& rtc) {
BOOST_REQUIRE(_inside_partition);
auto pos = rt.position();
auto pos = rtc.position();
verify_order(pos);
BOOST_REQUIRE_GE(_mutations.size(), 1);
BOOST_REQUIRE(_mut);
_previous_position.emplace(pos);
if (_reversed_schema) {
rt.reverse(); // undo the reversing
//FIXME: until mutation rebuilder has to do v1 conversion and hence requires rtc to be fed in schema order
_reversed_rtcs.emplace_front(std::move(pos).reversed(), std::exchange(_current_tombstone, rtc.tombstone()));
} else {
_mut->consume(std::move(rtc));
}
_mutations.back().partition().apply(*_schema, mutation_fragment(*_schema, _permit, std::move(rt)));
return stop_iteration(bool(_skip_partition));
}
stop_iteration consume_end_of_partition() {
BOOST_REQUIRE(_inside_partition);
BOOST_REQUIRE_GE(_mutations.size(), 1);
BOOST_REQUIRE(_mut);
BOOST_REQUIRE(!_current_tombstone);
_previous_position = std::nullopt;
_inside_partition = false;
for (auto&& rtc : _reversed_rtcs) {
_mut->consume(std::move(rtc));
}
_reversed_rtcs.clear();
auto mut_opt = _mut->consume_end_of_stream();
BOOST_REQUIRE(mut_opt);
_mutations.emplace_back(std::move(*mut_opt));
return stop_iteration(bool(_skip_stream));
}
std::vector<mutation> consume_end_of_stream() {
@@ -560,14 +569,14 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
tests::reader_concurrency_semaphore_wrapper semaphore;
auto reversed_msg = reversed ? ", reversed partitions" : "";
auto consume_fn = [&] (flat_mutation_reader& fmr, flat_stream_consumer fsc) {
auto consume_fn = [&] (flat_mutation_reader_v2& fmr, flat_stream_consumer fsc) {
if (thread) {
assert(bool(!reversed));
return fmr.consume_in_thread(std::move(fsc));
} else {
if (reversed) {
return with_closeable(make_reversing_reader(make_delegating_reader(fmr), query::max_result_size(size_t(1) << 20)),
[fsc = std::move(fsc)] (flat_mutation_reader& reverse_reader) mutable {
return with_closeable(make_reversing_reader(make_delegating_reader_v2(fmr), query::max_result_size(size_t(1) << 20)),
[fsc = std::move(fsc)] (flat_mutation_reader_v2& reverse_reader) mutable {
return reverse_reader.consume(std::move(fsc));
}).get0();
}
@@ -577,7 +586,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
{
testlog.info("Consume all{}", reversed_msg);
auto fmr = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), muts);
auto fmr = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), muts);
auto close_fmr = deferred_close(fmr);
auto muts2 = consume_fn(fmr, flat_stream_consumer(s, semaphore.make_permit(), reversed));
BOOST_REQUIRE_EQUAL(muts, muts2);
@@ -585,7 +594,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
{
testlog.info("Consume first fragment from partition{}", reversed_msg);
auto fmr = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), muts);
auto fmr = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), muts);
auto close_fmr = deferred_close(fmr);
auto muts2 = consume_fn(fmr, flat_stream_consumer(s, semaphore.make_permit(), reversed, skip_after_first_fragment::yes));
BOOST_REQUIRE_EQUAL(muts.size(), muts2.size());
@@ -601,7 +610,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
{
testlog.info("Consume first partition{}", reversed_msg);
auto fmr = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), muts);
auto fmr = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), muts);
auto close_fmr = deferred_close(fmr);
auto muts2 = consume_fn(fmr, flat_stream_consumer(s, semaphore.make_permit(), reversed, skip_after_first_fragment::no,
skip_after_first_partition::yes));
@@ -610,7 +619,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
}
if (thread) {
auto filter = flat_mutation_reader::filter([&] (const dht::decorated_key& dk) {
auto filter = flat_mutation_reader_v2::filter([&] (const dht::decorated_key& dk) {
for (auto j = size_t(0); j < muts.size(); j += 2) {
if (dk.equal(*s, muts[j].decorated_key())) {
return false;
@@ -619,7 +628,7 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
return true;
});
testlog.info("Consume all, filtered");
auto fmr = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), muts);
auto fmr = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), muts);
auto close_fmr = deferred_close(fmr);
auto muts2 = fmr.consume_in_thread(flat_stream_consumer(s, semaphore.make_permit(), reversed), std::move(filter));
BOOST_REQUIRE_EQUAL(muts.size() / 2, muts2.size());
@@ -668,7 +677,7 @@ SEASTAR_TEST_CASE(test_make_forwardable) {
auto make_reader = [&] (auto& range) {
return assert_that(
make_forwardable(make_flat_mutation_reader_from_mutations(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
make_forwardable(make_flat_mutation_reader_from_mutations_v2(s.schema(), semaphore.make_permit(), ms, range, streamed_mutation::forwarding::no)));
};
auto test = [&] (auto& rd, auto& partition) {
@@ -712,7 +721,7 @@ SEASTAR_TEST_CASE(test_abandoned_flat_mutation_reader_from_mutation) {
return seastar::async([] {
tests::reader_concurrency_semaphore_wrapper semaphore;
for_each_mutation([&] (const mutation& m) {
auto rd = make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), {mutation(m)});
auto rd = make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), {mutation(m)});
auto close_rd = deferred_close(rd);
rd().get();
rd().get();
@@ -732,7 +741,7 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_reader_from_mutations_as_mutation_source)
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding) mutable {
return make_flat_mutation_reader_from_mutations(schema, std::move(permit), squash_mutations(muts), range, slice, fwd_sm);
return make_flat_mutation_reader_from_mutations_v2(schema, std::move(permit), squash_mutations(muts), range, slice, fwd_sm);
});
};
run_mutation_source_tests(populate);
@@ -796,7 +805,7 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
void consume(tombstone) { }
stop_iteration consume(static_row&&) { return stop_iteration::no; }
stop_iteration consume(clustering_row&&) { return stop_iteration::no; }
stop_iteration consume(range_tombstone&&) { return stop_iteration::no; }
stop_iteration consume(range_tombstone_change&&) { return stop_iteration::no; }
stop_iteration consume_end_of_partition() { return stop_iteration::no; }
void consume_end_of_stream() { }
};
@@ -817,7 +826,7 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_memory_limit) {
}
const uint64_t hard_limit = size_t(1) << 18;
auto reverse_reader = make_reversing_reader(make_flat_mutation_reader_from_mutations(schema.schema(), semaphore.make_permit(), {mut}),
auto reverse_reader = make_reversing_reader(make_flat_mutation_reader_from_mutations_v2(schema.schema(), semaphore.make_permit(), {mut}),
query::max_result_size(size_t(1) << 10, hard_limit));
auto close_reverse_reader = deferred_close(reverse_reader);
@@ -889,9 +898,9 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_reads_in_native_reverse_order) {
reverse_mt->apply(mut.build(reverse_schema));
}
auto reversed_forward_reader = assert_that(make_reversing_reader(downgrade_to_v1(forward_mt->make_flat_reader(forward_schema, permit)), query::max_result_size(1 << 20)));
auto reversed_forward_reader = assert_that(make_reversing_reader(forward_mt->make_flat_reader(forward_schema, permit), query::max_result_size(1 << 20)));
auto reverse_reader = downgrade_to_v1(reverse_mt->make_flat_reader(reverse_schema, permit));
auto reverse_reader = reverse_mt->make_flat_reader(reverse_schema, permit);
auto deferred_reverse_close = deferred_close(reverse_reader);
while (auto mf_opt = reverse_reader().get()) {
@@ -901,16 +910,16 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_reads_in_native_reverse_order) {
reversed_forward_reader.produces_end_of_stream();
}
SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) {
std::list<query::partition_slice> reversed_slices;
auto populate = [&reversed_slices] (schema_ptr s, const std::vector<mutation> &muts) {
SEASTAR_THREAD_TEST_CASE(test_reverse_reader_v2_is_mutation_source) {
auto populate = [] (schema_ptr s, const std::vector<mutation> &muts) {
auto reverse_schema = s->make_reversed();
auto reverse_mt = make_lw_shared<replica::memtable>(reverse_schema);
auto reverse_muts = std::vector<mutation>();
reverse_muts.reserve(muts.size());
for (const auto& mut : muts) {
reverse_mt->apply(reverse(mut));
reverse_muts.emplace_back(reverse(mut));
}
return mutation_source([=, &reversed_slices] (
return mutation_source([muts = squash_mutations(muts), reverse_muts = squash_mutations(reverse_muts)] (
schema_ptr schema,
reader_permit permit,
const dht::partition_range& range,
@@ -919,22 +928,24 @@ SEASTAR_THREAD_TEST_CASE(test_reverse_reader_is_mutation_source) {
tracing::trace_state_ptr trace_ptr,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) mutable {
flat_mutation_reader rd(nullptr);
flat_mutation_reader_v2 rd(nullptr);
std::unique_ptr<query::partition_slice> reversed_slice;
std::vector<mutation>* selected_muts;
schema = schema->make_reversed();
const auto reversed = slice.options.contains(query::partition_slice::option::reversed);
if (reversed) {
reversed_slices.emplace_back(query::half_reverse_slice(*schema, slice));
rd = make_flat_mutation_reader_from_mutations(schema, std::move(permit), squash_mutations(muts), range, reversed_slices.back());
reversed_slice = std::make_unique<query::partition_slice>(query::half_reverse_slice(*schema, slice));
selected_muts = &muts;
} else {
reversed_slices.emplace_back(query::reverse_slice(*schema, slice));
reversed_slice = std::make_unique<query::partition_slice>(query::reverse_slice(*schema, slice));
// We don't want the memtable reader to read in reverse.
reversed_slices.back().options.remove(query::partition_slice::option::reversed);
rd = downgrade_to_v1(reverse_mt->make_flat_reader(schema, std::move(permit), range, reversed_slices.back(), pc, std::move(trace_ptr),
streamed_mutation::forwarding::no, fwd_mr));
reversed_slice->options.remove(query::partition_slice::option::reversed);
selected_muts = &reverse_muts;
}
rd = make_reversing_reader(std::move(rd), query::max_result_size(1 << 20));
rd = make_flat_mutation_reader_from_mutations_v2(schema, std::move(permit), *selected_muts, range, *reversed_slice);
rd = make_reversing_reader(std::move(rd), query::max_result_size(1 << 20), std::move(reversed_slice));
if (fwd_sm) {
return make_forwardable(std::move(rd));

View File

@@ -20,7 +20,7 @@
#include "test/lib/mutation_source_test.hh"
#include <seastar/core/thread.hh>
#include "readers/from_mutations.hh"
#include "readers/from_mutations_v2.hh"
static schema_builder new_table() {
return { "some_keyspace", "some_table" };
@@ -94,7 +94,7 @@ SEASTAR_THREAD_TEST_CASE(test_frozen_mutation_fragment) {
for_each_mutation([&] (const mutation& m) {
auto& s = *m.schema();
std::vector<mutation_fragment> mfs;
auto rd = make_flat_mutation_reader_from_mutations(m.schema(), semaphore.make_permit(), { m });
auto rd = downgrade_to_v1(make_flat_mutation_reader_from_mutations_v2(m.schema(), semaphore.make_permit(), { m }));
auto close_rd = deferred_close(rd);
rd.consume_pausable([&] (mutation_fragment mf) {
mfs.emplace_back(std::move(mf));

View File

@@ -22,70 +22,15 @@
#include "schema_upgrader.hh"
#include "readers/combined.hh"
#include "replica/memtable.hh"
#include "mutation_rebuilder.hh"
#include "test/lib/mutation_assertions.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/simple_schema.hh"
#include "test/lib/fragment_scatterer.hh"
#include <boost/range/algorithm/transform.hpp>
#include "readers/from_mutations.hh"
// A StreamedMutationConsumer which distributes fragments randomly into several mutations.
class fragment_scatterer {
std::vector<mutation>& _mutations;
size_t _next = 0;
private:
void for_each_target(noncopyable_function<void (mutation&)> func) {
// round-robin
func(_mutations[_next % _mutations.size()]);
++_next;
}
public:
fragment_scatterer(std::vector<mutation>& muts)
: _mutations(muts)
{ }
void consume_new_partition(const dht::decorated_key&) {}
stop_iteration consume(tombstone t) {
for_each_target([&] (mutation& m) {
m.partition().apply(t);
});
return stop_iteration::no;
}
stop_iteration consume(range_tombstone&& rt) {
for_each_target([&] (mutation& m) {
m.partition().apply_row_tombstone(*m.schema(), std::move(rt));
});
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
for_each_target([&] (mutation& m) {
m.partition().static_row().apply(*m.schema(), column_kind::static_column, std::move(sr.cells()));
});
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
for_each_target([&] (mutation& m) {
auto& dr = m.partition().clustered_row(*m.schema(), std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
dr.cells().apply(*m.schema(), column_kind::regular_column, std::move(cr.cells()));
});
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
return stop_iteration::no;
}
stop_iteration consume_end_of_stream() {
return stop_iteration::no;
}
};
#include "readers/from_mutations_v2.hh"
SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
return seastar::async([] {
@@ -102,13 +47,9 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
}
for (auto&& m : partitions) {
std::vector<mutation> muts;
for (int i = 0; i < n; ++i) {
muts.push_back(mutation(m.schema(), m.decorated_key()));
}
auto rd = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m});
auto rd = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), {m});
auto close_rd = deferred_close(rd);
rd.consume(fragment_scatterer{muts}).get();
auto muts = rd.consume(fragment_scatterer(s, n)).get();
for (int i = 0; i < n; ++i) {
memtables[i]->apply(std::move(muts[i]));
}
@@ -397,7 +338,7 @@ SEASTAR_TEST_CASE(test_schema_upgrader_is_equivalent_with_mutation_upgrade) {
if (m1.schema()->version() != m2.schema()->version()) {
// upgrade m1 to m2's schema
auto reader = transform(make_flat_mutation_reader_from_mutations(m1.schema(), semaphore.make_permit(), {m1}), schema_upgrader(m2.schema()));
auto reader = transform(make_flat_mutation_reader_from_mutations_v2(m1.schema(), semaphore.make_permit(), {m1}), schema_upgrader_v2(m2.schema()));
auto close_reader = deferred_close(reader);
auto from_upgrader = read_mutation_from_flat_mutation_reader(reader).get0();

View File

@@ -27,7 +27,7 @@
#include <seastar/core/thread.hh>
#include "schema_builder.hh"
#include "partition_slice_builder.hh"
#include "readers/from_mutations.hh"
#include "readers/from_mutations_v2.hh"
using namespace std::literals::chrono_literals;
@@ -59,7 +59,7 @@ mutation_source make_source(std::vector<mutation> mutations) {
assert(m.schema() == s);
}
}
return make_flat_mutation_reader_from_mutations(s, std::move(permit), mutations, slice, fwd);
return make_flat_mutation_reader_from_mutations_v2(s, std::move(permit), mutations, slice, fwd);
});
}

View File

@@ -32,6 +32,7 @@
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/simple_position_reader_queue.hh"
#include "test/lib/fragment_scatterer.hh"
#include "dht/sharder.hh"
#include "schema_builder.hh"
@@ -47,7 +48,6 @@
#include <boost/range/algorithm/sort.hpp>
#include "readers/from_mutations_v2.hh"
#include "readers/from_mutations.hh"
#include "readers/forwardable_v2.hh"
#include "readers/forwardable.hh"
#include "readers/from_fragments_v2.hh"
@@ -1216,16 +1216,13 @@ SEASTAR_TEST_CASE(test_combined_mutation_source_is_a_mutation_source) {
memtables.push_back(make_lw_shared<replica::memtable>(s));
}
int source_index = 0;
for (auto&& m : muts) {
auto rd = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m});
auto rd = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), {m});
auto close_rd = deferred_close(rd);
rd.consume_pausable([&] (mutation_fragment&& mf) {
mutation mf_m(m.schema(), m.decorated_key());
mf_m.partition().apply(*s, mf);
memtables[source_index++ % memtables.size()]->apply(mf_m);
return stop_iteration::no;
}).get();
auto muts = rd.consume(fragment_scatterer(s, n_sources)).get();
for (int i = 0; i < n_sources; ++i) {
memtables[i]->apply(std::move(muts[i]));
}
}
std::vector<mutation_source> sources;
@@ -4061,32 +4058,36 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
std::optional<std::pair<position_in_partition, position_in_partition>> bounds;
position_in_partition::less_compare less{*s};
mutation good(m.schema(), dk);
std::optional<mutation> bad;
auto rd = make_flat_mutation_reader_from_mutations(s, semaphore.make_permit(), {m});
mutation_rebuilder_v2 good(m.schema());
good.consume_new_partition(dk);
std::optional<mutation_rebuilder_v2> bad;
auto rd = make_flat_mutation_reader_from_mutations_v2(s, semaphore.make_permit(), {m});
auto close_rd = deferred_close(rd);
rd.consume_pausable([&] (mutation_fragment&& mf) {
if ((mf.is_partition_start() && mf.as_partition_start().partition_tombstone()) || mf.is_static_row()) {
rd.consume_pausable([&] (mutation_fragment_v2&& mf) {
if (mf.is_partition_start()) {
if (mf.as_partition_start().partition_tombstone()) {
bad.emplace(m.schema());
bad->consume(std::move(mf));
}
} else if (mf.is_static_row()) {
if (!bad) {
bad = mutation{m.schema(), dk};
bad.emplace(m.schema());
bad->consume_new_partition(dk);
}
bad->apply(std::move(mf));
} else {
if (!mf.is_partition_start() && !mf.is_end_of_partition()) {
auto upper = mf.is_range_tombstone() ? mf.as_range_tombstone().end : mf.position();
if (!bounds) {
bounds = std::pair{mf.position(), upper};
} else if (less(bounds->second, upper)) {
bounds->second = upper;
}
bad->consume(std::move(mf));
} else if (!mf.is_end_of_partition()) {
if (!bounds) {
bounds = std::pair{mf.position(), mf.position()};
} else {
bounds->second = mf.position();
}
good.apply(std::move(mf));
good.consume(std::move(mf));
}
return stop_iteration::no;
}).get();
mutation_bounds mb {
std::move(good),
std::move(*good.consume_end_of_stream()),
bounds ? std::move(bounds->first) : position_in_partition::before_all_clustered_rows(),
bounds ? std::move(bounds->second) : position_in_partition::after_all_clustered_rows()
};
@@ -4099,7 +4100,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
}
if (bad) {
bad_mutations.push_back(std::move(*bad));
bad_mutations.push_back(std::move(*bad->consume_end_of_stream()));
}
}
@@ -4192,7 +4193,7 @@ SEASTAR_THREAD_TEST_CASE(test_generating_reader_v1) {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd_sm,
mutation_reader::forwarding fwd_mr) {
auto underlying = make_flat_mutation_reader_from_mutations(schema, permit, squash_mutations(muts), range, slice);
auto underlying = downgrade_to_v1(make_flat_mutation_reader_from_mutations_v2(schema, permit, squash_mutations(muts), range, slice));
auto rd = make_next_partition_adaptor(make_generating_reader_v1(schema, permit, closing_holder(std::move(underlying))));
if (fwd_sm == streamed_mutation::forwarding::yes) {
return make_forwardable(std::move(rd));

View File

@@ -19,7 +19,7 @@
#include <seastar/util/closeable.hh>
#include <boost/range/algorithm/sort.hpp>
#include "readers/from_mutations.hh"
#include "readers/from_mutations_v2.hh"
#include "readers/empty_v2.hh"
using namespace std::chrono_literals;
@@ -156,7 +156,7 @@ public:
, _cache(entry_ttl)
, _mutations(make_mutations(_s, external_make_value))
, _mutation_source([this] (schema_ptr schema, reader_permit permit, const dht::partition_range& range) {
auto rd = make_flat_mutation_reader_from_mutations(schema, std::move(permit), _mutations, range);
auto rd = make_flat_mutation_reader_from_mutations_v2(schema, std::move(permit), _mutations, range);
rd.set_max_buffer_size(max_reader_buffer_size);
return rd;
}) {

View File

@@ -33,7 +33,6 @@
#include "test/lib/random_utils.hh"
#include <boost/range/algorithm/min_element.hpp>
#include "readers/from_mutations.hh"
#include "readers/from_mutations_v2.hh"
#include "readers/delegating.hh"
#include "readers/delegating_v2.hh"
@@ -84,7 +83,7 @@ snapshot_source make_decorated_snapshot_source(snapshot_source src, std::functio
mutation_source make_source_with(mutation m) {
return mutation_source([m] (schema_ptr s, reader_permit permit, const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
assert(m.schema() == s);
return make_flat_mutation_reader_from_mutations(s, std::move(permit), {m}, std::move(fwd));
return make_flat_mutation_reader_from_mutations_v2(s, std::move(permit), {m}, std::move(fwd));
});
}

View File

@@ -0,0 +1,85 @@
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "mutation.hh"
#include "mutation_rebuilder.hh"
// A StreamedMutationConsumer which distributes fragments randomly into several mutations.
class fragment_scatterer {
schema_ptr _schema;
size_t _n;
std::vector<mutation_rebuilder_v2> _mutations;
size_t _next = 0;
std::optional<size_t> _last_rt;
private:
void for_each_target(noncopyable_function<void (mutation_rebuilder_v2&)> func) {
// round-robin
func(_mutations[_next % _mutations.size()]);
++_next;
}
public:
explicit fragment_scatterer(schema_ptr schema, size_t n) : _schema(std::move(schema)), _n(n)
{ }
void consume_new_partition(const dht::decorated_key& dk) {
_mutations.reserve(_n);
for (size_t i = 0; i < _n; ++i) {
_mutations.emplace_back(_schema);
_mutations.back().consume_new_partition(dk);
}
}
stop_iteration consume(tombstone t) {
for_each_target([&] (mutation_rebuilder_v2& m) {
m.consume(t);
});
return stop_iteration::no;
}
stop_iteration consume(range_tombstone_change&& rtc) {
if (_last_rt) {
_mutations[*_last_rt].consume(range_tombstone_change(rtc.position(), {}));
}
if (rtc.tombstone()) {
const auto i = _next % _mutations.size();
_mutations[i].consume(std::move(rtc));
_last_rt = i;
} else {
_last_rt.reset();
}
++_next;
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
for_each_target([&] (mutation_rebuilder_v2& m) {
m.consume(std::move(sr));
});
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
for_each_target([&] (mutation_rebuilder_v2& m) {
m.consume(std::move(cr));
});
return stop_iteration::no;
}
stop_iteration consume_end_of_partition() {
return stop_iteration::no;
}
std::vector<mutation> consume_end_of_stream() {
std::vector<mutation> muts;
muts.reserve(_mutations.size());
for (auto& mut_builder : _mutations) {
muts.emplace_back(*mut_builder.consume_end_of_stream());
}
return muts;
}
};