row_cache: update reader implementations to v2
cache_flat_mutation_reader gets a native v2 implementation. The underlying mutation representation is not changed: range deletions are still stored as v1 range_tombstones in mutation_partition and are converted to range tombstone changes during reading. This decouples the switch to a native v2 reader implementation from the switch to a native v2 in-memory storage format, so the two changes can be made at separate times and incrementally.
This commit is contained in:
@@ -13,16 +13,17 @@
|
||||
#include "mutation_fragment.hh"
|
||||
#include "query-request.hh"
|
||||
#include "partition_snapshot_row_cursor.hh"
|
||||
#include "range_tombstone_assembler.hh"
|
||||
#include "read_context.hh"
|
||||
#include "readers/flat_mutation_reader.hh"
|
||||
#include "readers/delegating.hh"
|
||||
#include "readers/delegating_v2.hh"
|
||||
#include "clustering_key_filter.hh"
|
||||
|
||||
namespace cache {
|
||||
|
||||
extern logging::logger clogger;
|
||||
|
||||
class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
|
||||
enum class state {
|
||||
before_static_row,
|
||||
|
||||
@@ -51,6 +52,46 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
|
||||
end_of_stream
|
||||
};
|
||||
// Identifies which stream a range tombstone change came from.
// The numeric values are used (via static_cast<size_t>) as indexes into
// range_tombstone_change_merger::_tombstones, a two-element array, so they
// must remain 0 and 1.
enum class source {
|
||||
cache = 0, // changes produced from the cache side (fed through _rt_gen)
|
||||
underlying = 1, // changes carried by fragments read from the underlying reader
|
||||
};
|
||||
// Merges range tombstone change streams coming from underlying and the cache.
|
||||
// Ensures no range tombstone change fragment is emitted when there is no
|
||||
// actual change in the effective tombstone.
|
||||
class range_tombstone_change_merger {
|
||||
const schema& _schema;
|
||||
position_in_partition _pos;
|
||||
tombstone _current_tombstone;
|
||||
std::array<tombstone, 2> _tombstones;
|
||||
private:
|
||||
std::optional<range_tombstone_change> do_flush(position_in_partition pos, bool end_of_range) {
|
||||
std::optional<range_tombstone_change> ret;
|
||||
position_in_partition::tri_compare cmp(_schema);
|
||||
const auto res = cmp(_pos, pos);
|
||||
const auto should_flush = end_of_range ? res <= 0 : res < 0;
|
||||
if (should_flush) {
|
||||
auto merged_tomb = std::max(_tombstones.front(), _tombstones.back());
|
||||
if (merged_tomb != _current_tombstone) {
|
||||
_current_tombstone = merged_tomb;
|
||||
ret.emplace(_pos, _current_tombstone);
|
||||
}
|
||||
_pos = std::move(pos);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
public:
|
||||
range_tombstone_change_merger(const schema& s) : _schema(s), _pos(position_in_partition::before_all_clustered_rows()), _tombstones{}
|
||||
{ }
|
||||
std::optional<range_tombstone_change> apply(source src, range_tombstone_change&& rtc) {
|
||||
auto ret = do_flush(rtc.position(), false);
|
||||
_tombstones[static_cast<size_t>(src)] = rtc.tombstone();
|
||||
return ret;
|
||||
}
|
||||
std::optional<range_tombstone_change> flush(position_in_partition_view pos, bool end_of_range) {
|
||||
return do_flush(position_in_partition(pos), end_of_range);
|
||||
}
|
||||
};
|
||||
partition_snapshot_ptr _snp;
|
||||
|
||||
query::clustering_key_filter_ranges _ck_ranges; // Query schema domain, reversed reads use native order
|
||||
@@ -66,6 +107,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
// range_tombstones with positions <= _lower_bound.
|
||||
position_in_partition _lower_bound; // Query schema domain
|
||||
position_in_partition_view _upper_bound; // Query schema domain
|
||||
std::optional<position_in_partition> _underlying_upper_bound; // Query schema domain
|
||||
|
||||
// cache_flat_mutation_reader may be constructed either
|
||||
// with a read_context&, where it knows that the read_context
|
||||
@@ -80,6 +122,19 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
read_context& _read_context;
|
||||
partition_snapshot_row_cursor _next_row;
|
||||
|
||||
range_tombstone_change_generator _rt_gen; // cache -> reader
|
||||
range_tombstone_assembler _rt_assembler; // underlying -> cache
|
||||
range_tombstone_change_merger _rt_merger; // {cache, underlying} -> reader
|
||||
|
||||
// When the read moves to the underlying, the read range will be
|
||||
// (_lower_bound, x], where x is either _next_row.position() or _upper_bound.
|
||||
// In the former case (x is _next_row.position()), underlying can emit
|
||||
// a range tombstone change for after_key(x), which is outside the range.
|
||||
// We can't push this fragment into the buffer straight away, the cache may
|
||||
// have fragments with smaller position. So we save it here and flush it when
|
||||
// a fragment with a larger position is seen.
|
||||
std::optional<mutation_fragment_v2> _queued_underlying_fragment;
|
||||
|
||||
state _state = state::before_static_row;
|
||||
|
||||
bool _next_row_in_range = false;
|
||||
@@ -98,8 +153,8 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
|
||||
// Points to the underlying reader conforming to _schema,
|
||||
// either to *_underlying_holder or _read_context.underlying().underlying().
|
||||
flat_mutation_reader* _underlying = nullptr;
|
||||
flat_mutation_reader_opt _underlying_holder;
|
||||
flat_mutation_reader_v2* _underlying = nullptr;
|
||||
flat_mutation_reader_v2_opt _underlying_holder;
|
||||
|
||||
future<> do_fill_buffer();
|
||||
future<> ensure_underlying();
|
||||
@@ -110,11 +165,13 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
void move_to_range(query::clustering_row_ranges::const_iterator);
|
||||
void move_to_next_entry();
|
||||
void maybe_drop_last_entry() noexcept;
|
||||
void flush_tombstones(position_in_partition_view, bool end_of_range = false);
|
||||
void add_to_buffer(const partition_snapshot_row_cursor&);
|
||||
void add_clustering_row_to_buffer(mutation_fragment&&);
|
||||
void add_to_buffer(range_tombstone&&);
|
||||
void add_clustering_row_to_buffer(mutation_fragment_v2&&);
|
||||
void add_to_buffer(range_tombstone_change&&, source);
|
||||
void do_add_to_buffer(range_tombstone_change&&);
|
||||
void add_range_tombstone_to_buffer(range_tombstone&&);
|
||||
void add_to_buffer(mutation_fragment&&);
|
||||
void add_to_buffer(mutation_fragment_v2&&);
|
||||
future<> read_from_underlying();
|
||||
void start_reading_from_underlying();
|
||||
bool after_current_range(position_in_partition_view position);
|
||||
@@ -131,9 +188,9 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
|
||||
// if !_read_context.is_reversed() then _last_row is valid after this or the population lower bound
|
||||
// is before all rows (so _last_row doesn't point at any entry).
|
||||
bool ensure_population_lower_bound();
|
||||
void maybe_add_to_cache(const mutation_fragment& mf);
|
||||
void maybe_add_to_cache(const mutation_fragment_v2& mf);
|
||||
void maybe_add_to_cache(const clustering_row& cr);
|
||||
void maybe_add_to_cache(const range_tombstone& rt);
|
||||
void maybe_add_to_cache(const range_tombstone_change& rtc);
|
||||
void maybe_add_to_cache(const static_row& sr);
|
||||
void maybe_set_static_row_continuous();
|
||||
void finish_reader() {
|
||||
@@ -177,7 +234,7 @@ public:
|
||||
read_context& ctx,
|
||||
partition_snapshot_ptr snp,
|
||||
row_cache& cache)
|
||||
: flat_mutation_reader::impl(std::move(s), ctx.permit())
|
||||
: flat_mutation_reader_v2::impl(std::move(s), ctx.permit())
|
||||
, _snp(std::move(snp))
|
||||
, _ck_ranges(std::move(crr))
|
||||
, _ck_ranges_curr(_ck_ranges.begin())
|
||||
@@ -188,6 +245,8 @@ public:
|
||||
, _read_context_holder()
|
||||
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
|
||||
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
|
||||
, _rt_gen(*_schema)
|
||||
, _rt_merger(*_schema)
|
||||
{
|
||||
clogger.trace("csm {}: table={}.{}, reversed={}, snap={}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name(), _read_context.is_reversed(),
|
||||
fmt::ptr(&*_snp));
|
||||
@@ -238,13 +297,13 @@ future<> cache_flat_mutation_reader::process_static_row() {
|
||||
return _snp->static_row(_read_context.digest_requested());
|
||||
});
|
||||
if (!sr.empty()) {
|
||||
push_mutation_fragment(mutation_fragment(*_schema, _permit, std::move(sr)));
|
||||
push_mutation_fragment(*_schema, _permit, std::move(sr));
|
||||
}
|
||||
return make_ready_future<>();
|
||||
} else {
|
||||
_read_context.cache().on_row_miss();
|
||||
return ensure_underlying().then([this] {
|
||||
return (*_underlying)().then([this] (mutation_fragment_opt&& sr) {
|
||||
return (*_underlying)().then([this] (mutation_fragment_v2_opt&& sr) {
|
||||
if (sr) {
|
||||
assert(sr->is_static_row());
|
||||
maybe_add_to_cache(sr->as_static_row());
|
||||
@@ -294,7 +353,7 @@ future<> cache_flat_mutation_reader::ensure_underlying() {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _read_context.ensure_underlying().then([this] {
|
||||
flat_mutation_reader& ctx_underlying = _read_context.underlying().underlying();
|
||||
flat_mutation_reader_v2& ctx_underlying = _read_context.underlying().underlying();
|
||||
if (ctx_underlying.schema() != _schema) {
|
||||
_underlying_holder = make_delegating_reader(ctx_underlying);
|
||||
_underlying_holder->upgrade_schema(_schema);
|
||||
@@ -318,9 +377,9 @@ future<> cache_flat_mutation_reader::do_fill_buffer() {
|
||||
if (!_read_context.partition_exists()) {
|
||||
return read_from_underlying();
|
||||
}
|
||||
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
_underlying_upper_bound = _next_row_in_range ? position_in_partition(_next_row.position())
|
||||
: position_in_partition(_upper_bound);
|
||||
return _underlying->fast_forward_to(position_range{_lower_bound, std::move(end)}).then([this] {
|
||||
return _underlying->fast_forward_to(position_range{_lower_bound, *_underlying_upper_bound}).then([this] {
|
||||
return read_from_underlying();
|
||||
});
|
||||
}
|
||||
@@ -363,12 +422,13 @@ inline
|
||||
future<> cache_flat_mutation_reader::read_from_underlying() {
|
||||
return consume_mutation_fragments_until(*_underlying,
|
||||
[this] { return _state != state::reading_from_underlying || is_buffer_full(); },
|
||||
[this] (mutation_fragment mf) {
|
||||
[this] (mutation_fragment_v2 mf) {
|
||||
_read_context.cache().on_row_miss();
|
||||
maybe_add_to_cache(mf);
|
||||
add_to_buffer(std::move(mf));
|
||||
},
|
||||
[this] {
|
||||
_underlying_upper_bound.reset();
|
||||
_state = state::reading_from_cache;
|
||||
_lsa_manager.run_in_update_section([this] {
|
||||
auto same_pos = _next_row.maybe_refresh();
|
||||
@@ -495,9 +555,9 @@ void cache_flat_mutation_reader::maybe_update_continuity() {
|
||||
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const mutation_fragment& mf) {
|
||||
if (mf.is_range_tombstone()) {
|
||||
maybe_add_to_cache(mf.as_range_tombstone());
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const mutation_fragment_v2& mf) {
|
||||
if (mf.is_range_tombstone_change()) {
|
||||
maybe_add_to_cache(mf.as_range_tombstone_change());
|
||||
} else {
|
||||
assert(mf.is_clustering_row());
|
||||
const clustering_row& cr = mf.as_clustering_row();
|
||||
@@ -513,9 +573,16 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
_read_context.cache().on_mispopulate();
|
||||
return;
|
||||
}
|
||||
auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(cr.key()));
|
||||
clogger.trace("csm {}: populate({})", fmt::ptr(this), clustering_row::printer(*_schema, cr));
|
||||
_lsa_manager.run_in_update_section_with_allocator([this, &cr] {
|
||||
_lsa_manager.run_in_update_section_with_allocator([this, &cr, &rt_opt] {
|
||||
mutation_partition& mp = _snp->version()->partition();
|
||||
|
||||
if (rt_opt) {
|
||||
clogger.trace("csm {}: populate flushed rt({})", fmt::ptr(this), *rt_opt);
|
||||
mp.mutable_row_tombstones().apply_monotonically(table_schema(), to_table_domain(range_tombstone(*rt_opt)));
|
||||
}
|
||||
|
||||
rows_entry::tri_compare cmp(table_schema());
|
||||
|
||||
if (_read_context.digest_requested()) {
|
||||
@@ -571,11 +638,6 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
|
||||
position_in_partition_view next_lower_bound = _next_row.dummy() ? _next_row.position() : position_in_partition_view::after_key(_next_row.key());
|
||||
auto upper_bound = _next_row_in_range ? next_lower_bound : _upper_bound;
|
||||
if (_snp->range_tombstones(_lower_bound, upper_bound, [&] (range_tombstone rts) {
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
// Avoid emitting overlapping range tombstones for performance reasons.
|
||||
if (less(upper_bound, rts.end_position())) {
|
||||
rts.set_end(*_schema, upper_bound);
|
||||
}
|
||||
add_range_tombstone_to_buffer(std::move(rts));
|
||||
return stop_iteration(_lower_bound_changed && is_buffer_full());
|
||||
}, _read_context.is_reversed()) == stop_iteration::no) {
|
||||
@@ -599,6 +661,10 @@ void cache_flat_mutation_reader::move_to_end() {
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::move_to_next_range() {
|
||||
if (_queued_underlying_fragment) {
|
||||
add_to_buffer(*std::exchange(_queued_underlying_fragment, {}));
|
||||
}
|
||||
flush_tombstones(position_in_partition::for_range_end(*_ck_ranges_curr), true);
|
||||
auto next_it = std::next(_ck_ranges_curr);
|
||||
if (next_it == _ck_ranges_end) {
|
||||
move_to_end();
|
||||
@@ -615,6 +681,7 @@ void cache_flat_mutation_reader::move_to_range(query::clustering_row_ranges::con
|
||||
_last_row = nullptr;
|
||||
_lower_bound = std::move(lb);
|
||||
_upper_bound = std::move(ub);
|
||||
_rt_gen.trim(_lower_bound);
|
||||
_lower_bound_changed = true;
|
||||
_ck_ranges_curr = next_it;
|
||||
auto adjacent = _next_row.advance_to(_lower_bound);
|
||||
@@ -706,27 +773,49 @@ void cache_flat_mutation_reader::move_to_next_entry() {
|
||||
}
|
||||
}
|
||||
|
||||
// Flushes range tombstone changes that are due up to `pos`.
//
// First the cache-side generator (_rt_gen) is flushed; every change it yields
// is routed through add_to_buffer() tagged as source::cache. Afterwards the
// merger (_rt_merger) is flushed and the change it yields, if any, is pushed
// directly with do_add_to_buffer(). `end_of_range` is forwarded unchanged to
// both flush calls.
void cache_flat_mutation_reader::flush_tombstones(position_in_partition_view pos, bool end_of_range) {
    // Ensure position is appropriate for range tombstone bound
    pos = position_in_partition_view::after_key(pos);
    clogger.trace("csm {}: flush_tombstones({}) end_of_range: {}", fmt::ptr(this), pos, end_of_range);

    auto forward_cache_change = [this] (range_tombstone_change&& change) {
        add_to_buffer(std::move(change), source::cache);
    };
    _rt_gen.flush(pos, std::move(forward_cache_change), end_of_range);

    auto merged = _rt_merger.flush(pos, end_of_range);
    if (merged) {
        do_add_to_buffer(std::move(*merged));
    }
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_to_buffer(mutation_fragment&& mf) {
|
||||
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), mutation_fragment::printer(*_schema, mf));
|
||||
void cache_flat_mutation_reader::add_to_buffer(mutation_fragment_v2&& mf) {
|
||||
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), mutation_fragment_v2::printer(*_schema, mf));
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
if (_underlying_upper_bound && less(*_underlying_upper_bound, mf.position())) {
|
||||
_queued_underlying_fragment = std::move(mf);
|
||||
return;
|
||||
}
|
||||
flush_tombstones(mf.position());
|
||||
if (mf.is_clustering_row()) {
|
||||
add_clustering_row_to_buffer(std::move(mf));
|
||||
} else {
|
||||
assert(mf.is_range_tombstone());
|
||||
add_to_buffer(std::move(mf).as_range_tombstone());
|
||||
assert(mf.is_range_tombstone_change());
|
||||
add_to_buffer(std::move(mf).as_range_tombstone_change(), source::underlying);
|
||||
}
|
||||
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_cursor& row) {
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
if (_queued_underlying_fragment && less(_queued_underlying_fragment->position(), row.position())) {
|
||||
add_to_buffer(*std::exchange(_queued_underlying_fragment, {}));
|
||||
}
|
||||
if (!row.dummy()) {
|
||||
_read_context.cache().on_row_hit();
|
||||
if (_read_context.digest_requested()) {
|
||||
row.latest_row().cells().prepare_hash(table_schema(), column_kind::regular_column);
|
||||
}
|
||||
add_clustering_row_to_buffer(mutation_fragment(*_schema, _permit, row.row()));
|
||||
flush_tombstones(position_in_partition_view::for_key(row.key()));
|
||||
add_clustering_row_to_buffer(mutation_fragment_v2(*_schema, _permit, row.row()));
|
||||
} else {
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
if (less(_lower_bound, row.position())) {
|
||||
_lower_bound = row.position();
|
||||
_lower_bound_changed = true;
|
||||
@@ -739,8 +828,8 @@ void cache_flat_mutation_reader::add_to_buffer(const partition_snapshot_row_curs
|
||||
// (1) no fragment with position >= _lower_bound was pushed yet
|
||||
// (2) If _lower_bound > mf.position(), mf was emitted
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment&& mf) {
|
||||
clogger.trace("csm {}: add_clustering_row_to_buffer({})", fmt::ptr(this), mutation_fragment::printer(*_schema, mf));
|
||||
void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment_v2&& mf) {
|
||||
clogger.trace("csm {}: add_clustering_row_to_buffer({})", fmt::ptr(this), mutation_fragment_v2::printer(*_schema, mf));
|
||||
auto& row = mf.as_clustering_row();
|
||||
auto new_lower_bound = position_in_partition::after_key(row.key());
|
||||
push_mutation_fragment(std::move(mf));
|
||||
@@ -752,32 +841,45 @@ void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment&
|
||||
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_to_buffer(range_tombstone&& rt) {
|
||||
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), rt);
|
||||
// This guarantees that rt starts after any emitted clustering_row
|
||||
// and not before any emitted range tombstone.
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
if (less(_lower_bound, rt.end_position())) {
|
||||
add_range_tombstone_to_buffer(std::move(rt));
|
||||
// Feeds a range tombstone change originating from `src` into the merger.
// If the merger yields a change as a result, that change is pushed to the
// buffer via do_add_to_buffer(); otherwise nothing is pushed.
void cache_flat_mutation_reader::add_to_buffer(range_tombstone_change&& rtc, source src) {
    clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), rtc);
    auto merged = _rt_merger.apply(src, std::move(rtc));
    if (!merged) {
        return;
    }
    do_add_to_buffer(std::move(*merged));
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::do_add_to_buffer(range_tombstone_change&& rtc) {
|
||||
clogger.trace("csm {}: push({})", fmt::ptr(this), rtc);
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
_lower_bound = position_in_partition(rtc.position());
|
||||
_lower_bound_changed = less(_lower_bound, rtc.position());
|
||||
push_mutation_fragment(*_schema, _permit, std::move(rtc));
|
||||
_read_context.cache()._tracker.on_range_tombstone_read();
|
||||
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::add_range_tombstone_to_buffer(range_tombstone&& rt) {
|
||||
position_in_partition::less_compare less(*_schema);
|
||||
if (_queued_underlying_fragment && less(_queued_underlying_fragment->position(), rt.position())) {
|
||||
add_to_buffer(*std::exchange(_queued_underlying_fragment, {}));
|
||||
}
|
||||
clogger.trace("csm {}: add_to_buffer({})", fmt::ptr(this), rt);
|
||||
if (!less(_lower_bound, rt.position())) {
|
||||
rt.set_start(_lower_bound);
|
||||
} else {
|
||||
_lower_bound = position_in_partition(rt.position());
|
||||
_lower_bound_changed = true;
|
||||
}
|
||||
clogger.trace("csm {}: push({})", fmt::ptr(this), rt);
|
||||
push_mutation_fragment(*_schema, _permit, std::move(rt));
|
||||
_read_context.cache()._tracker.on_range_tombstone_read();
|
||||
flush_tombstones(rt.position());
|
||||
_rt_gen.consume(std::move(rt));
|
||||
}
|
||||
|
||||
inline
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone& rt) {
|
||||
void cache_flat_mutation_reader::maybe_add_to_cache(const range_tombstone_change& rtc) {
|
||||
clogger.trace("csm {}: maybe_add_to_cache({})", fmt::ptr(this), rtc);
|
||||
auto rt_opt = _rt_assembler.consume(*_schema, range_tombstone_change(rtc));
|
||||
if (!rt_opt) {
|
||||
return;
|
||||
}
|
||||
const auto& rt = *rt_opt;
|
||||
if (can_populate()) {
|
||||
clogger.trace("csm {}: maybe_add_to_cache({})", fmt::ptr(this), rt);
|
||||
_lsa_manager.run_in_update_section_with_allocator([&] {
|
||||
@@ -825,25 +927,25 @@ bool cache_flat_mutation_reader::can_populate() const {
|
||||
|
||||
// pass a reference to ctx to cache_flat_mutation_reader
|
||||
// keeping its ownership at caller's.
|
||||
inline flat_mutation_reader make_cache_flat_mutation_reader(schema_ptr s,
|
||||
inline flat_mutation_reader_v2 make_cache_flat_mutation_reader(schema_ptr s,
|
||||
dht::decorated_key dk,
|
||||
query::clustering_key_filter_ranges crr,
|
||||
row_cache& cache,
|
||||
cache::read_context& ctx,
|
||||
partition_snapshot_ptr snp)
|
||||
{
|
||||
return make_flat_mutation_reader<cache::cache_flat_mutation_reader>(
|
||||
return make_flat_mutation_reader_v2<cache::cache_flat_mutation_reader>(
|
||||
std::move(s), std::move(dk), std::move(crr), ctx, std::move(snp), cache);
|
||||
}
|
||||
|
||||
// transfer ownership of ctx to cache_flat_mutation_reader
|
||||
inline flat_mutation_reader make_cache_flat_mutation_reader(schema_ptr s,
|
||||
inline flat_mutation_reader_v2 make_cache_flat_mutation_reader(schema_ptr s,
|
||||
dht::decorated_key dk,
|
||||
query::clustering_key_filter_ranges crr,
|
||||
row_cache& cache,
|
||||
std::unique_ptr<cache::read_context> unique_ctx,
|
||||
partition_snapshot_ptr snp)
|
||||
{
|
||||
return make_flat_mutation_reader<cache::cache_flat_mutation_reader>(
|
||||
return make_flat_mutation_reader_v2<cache::cache_flat_mutation_reader>(
|
||||
std::move(s), std::move(dk), std::move(crr), std::move(unique_ctx), std::move(snp), cache);
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ namespace cache {
|
||||
class autoupdating_underlying_reader final {
|
||||
row_cache& _cache;
|
||||
read_context& _read_context;
|
||||
flat_mutation_reader_opt _reader;
|
||||
flat_mutation_reader_v2_opt _reader;
|
||||
utils::phased_barrier::phase_type _reader_creation_phase = 0;
|
||||
dht::partition_range _range = { };
|
||||
std::optional<dht::decorated_key> _last_key;
|
||||
@@ -39,7 +39,7 @@ public:
|
||||
: _cache(cache)
|
||||
, _read_context(context)
|
||||
{ }
|
||||
future<mutation_fragment_opt> move_to_next_partition() {
|
||||
future<mutation_fragment_v2_opt> move_to_next_partition() {
|
||||
_last_key = std::move(_new_last_key);
|
||||
auto start = population_range_start();
|
||||
auto phase = _cache.phase_of(start);
|
||||
@@ -109,7 +109,7 @@ public:
|
||||
const dht::partition_range& range() const {
|
||||
return _range;
|
||||
}
|
||||
flat_mutation_reader& underlying() { return *_reader; }
|
||||
flat_mutation_reader_v2& underlying() { return *_reader; }
|
||||
dht::ring_position_view population_range_start() const {
|
||||
return _last_key ? dht::ring_position_view::for_after_key(*_last_key)
|
||||
: dht::ring_position_view::for_range_start(_range);
|
||||
@@ -203,7 +203,7 @@ public:
|
||||
future<> ensure_underlying() {
|
||||
if (_underlying_snapshot) {
|
||||
return create_underlying().then([this] {
|
||||
return _underlying.underlying()().then([this] (mutation_fragment_opt&& mfopt) {
|
||||
return _underlying.underlying()().then([this] (mutation_fragment_v2_opt&& mfopt) {
|
||||
_partition_exists = bool(mfopt);
|
||||
});
|
||||
});
|
||||
|
||||
55
row_cache.cc
55
row_cache.cc
@@ -23,7 +23,6 @@
|
||||
#include "readers/empty.hh"
|
||||
#include "readers/forwardable_v2.hh"
|
||||
#include "readers/nonforwardable.hh"
|
||||
#include "readers/conversion.hh"
|
||||
|
||||
namespace cache {
|
||||
|
||||
@@ -41,10 +40,10 @@ static schema_ptr to_query_domain(const query::partition_slice& slice, schema_pt
|
||||
return table_domain_schema;
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
flat_mutation_reader_v2
|
||||
row_cache::create_underlying_reader(read_context& ctx, mutation_source& src, const dht::partition_range& pr) {
|
||||
schema_ptr entry_schema = to_query_domain(ctx.slice(), _schema);
|
||||
auto reader = src.make_reader(entry_schema, ctx.permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
|
||||
auto reader = src.make_reader_v2(entry_schema, ctx.permit(), pr, ctx.slice(), ctx.pc(), ctx.trace_state(), streamed_mutation::forwarding::yes);
|
||||
ctx.on_underlying_created();
|
||||
return reader;
|
||||
}
|
||||
@@ -349,17 +348,17 @@ future<> read_context::create_underlying() {
|
||||
});
|
||||
}
|
||||
|
||||
static flat_mutation_reader read_directly_from_underlying(read_context& reader) {
|
||||
flat_mutation_reader res = make_delegating_reader(reader.underlying().underlying());
|
||||
static flat_mutation_reader_v2 read_directly_from_underlying(read_context& reader) {
|
||||
auto res = make_delegating_reader(reader.underlying().underlying());
|
||||
res.upgrade_schema(reader.schema());
|
||||
return downgrade_to_v1(make_nonforwardable(upgrade_to_v2(std::move(res)), true));
|
||||
return make_nonforwardable(std::move(res), true);
|
||||
}
|
||||
|
||||
// Reader which populates the cache using data from the delegate.
|
||||
class single_partition_populating_reader final : public flat_mutation_reader::impl {
|
||||
class single_partition_populating_reader final : public flat_mutation_reader_v2::impl {
|
||||
row_cache& _cache;
|
||||
std::unique_ptr<read_context> _read_context;
|
||||
flat_mutation_reader_opt _reader;
|
||||
flat_mutation_reader_v2_opt _reader;
|
||||
private:
|
||||
future<> create_reader() {
|
||||
auto src_and_phase = _cache.snapshot_of(_read_context->range().start()->value());
|
||||
@@ -379,7 +378,7 @@ private:
|
||||
} else if (phase == _cache.phase_of(_read_context->range().start()->value())) {
|
||||
_reader = _cache._read_section(_cache._tracker.region(), [&] {
|
||||
cache_entry& e = _cache.find_or_create_incomplete(mfopt->as_partition_start(), phase);
|
||||
return downgrade_to_v1(e.read(_cache, *_read_context, phase));
|
||||
return e.read(_cache, *_read_context, phase);
|
||||
});
|
||||
} else {
|
||||
_cache._tracker.on_mispopulate();
|
||||
@@ -509,7 +508,7 @@ public:
|
||||
, _read_context(ctx)
|
||||
{}
|
||||
|
||||
using read_result = std::tuple<flat_mutation_reader_opt, mutation_fragment_opt>;
|
||||
using read_result = std::tuple<flat_mutation_reader_v2_opt, mutation_fragment_v2_opt>;
|
||||
|
||||
future<read_result> operator()() {
|
||||
return _reader.move_to_next_partition().then([this] (auto&& mfopt) mutable {
|
||||
@@ -529,7 +528,7 @@ public:
|
||||
this->can_set_continuity() ? &*_last_key : nullptr);
|
||||
_last_key = row_cache::previous_entry_pointer(key);
|
||||
return make_ready_future<read_result>(
|
||||
read_result(downgrade_to_v1(e.read(_cache, _read_context, _reader.creation_phase())), std::nullopt));
|
||||
read_result(e.read(_cache, _read_context, _reader.creation_phase()), std::nullopt));
|
||||
});
|
||||
} else {
|
||||
_cache._tracker.on_mispopulate();
|
||||
@@ -558,7 +557,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class scanning_and_populating_reader final : public flat_mutation_reader::impl {
|
||||
class scanning_and_populating_reader final : public flat_mutation_reader_v2::impl {
|
||||
const dht::partition_range* _pr;
|
||||
row_cache& _cache;
|
||||
std::unique_ptr<read_context> _read_context;
|
||||
@@ -569,12 +568,12 @@ class scanning_and_populating_reader final : public flat_mutation_reader::impl {
|
||||
bool _advance_primary = false;
|
||||
std::optional<dht::partition_range::bound> _lower_bound;
|
||||
dht::partition_range _secondary_range;
|
||||
flat_mutation_reader_opt _reader;
|
||||
flat_mutation_reader_v2_opt _reader;
|
||||
private:
|
||||
flat_mutation_reader read_from_entry(cache_entry& ce) {
|
||||
flat_mutation_reader_v2 read_from_entry(cache_entry& ce) {
|
||||
_cache.upgrade_entry(ce);
|
||||
_cache.on_partition_hit();
|
||||
return downgrade_to_v1(ce.read(_cache, *_read_context));
|
||||
return ce.read(_cache, *_read_context);
|
||||
}
|
||||
|
||||
static dht::ring_position_view as_ring_position_view(const std::optional<dht::partition_range::bound>& lower_bound) {
|
||||
@@ -582,8 +581,8 @@ private:
|
||||
: dht::ring_position_view::min();
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt do_read_from_primary() {
|
||||
return _cache._read_section(_cache._tracker.region(), [this] () -> flat_mutation_reader_opt {
|
||||
flat_mutation_reader_v2_opt do_read_from_primary() {
|
||||
return _cache._read_section(_cache._tracker.region(), [this] () -> flat_mutation_reader_v2_opt {
|
||||
bool not_moved = true;
|
||||
if (!_primary.valid()) {
|
||||
not_moved = _primary.advance_to(as_ring_position_view(_lower_bound));
|
||||
@@ -604,7 +603,7 @@ private:
|
||||
_lower_bound = dht::partition_range::bound{e.key(), false};
|
||||
// Delay the call to next() so that we don't see stale continuity on next invocation.
|
||||
_advance_primary = true;
|
||||
return flat_mutation_reader_opt(std::move(fr));
|
||||
return flat_mutation_reader_v2_opt(std::move(fr));
|
||||
} else {
|
||||
if (_primary.in_range()) {
|
||||
cache_entry& e = _primary.entry();
|
||||
@@ -628,24 +627,24 @@ private:
|
||||
});
|
||||
}
|
||||
|
||||
future<flat_mutation_reader_opt> read_from_primary() {
|
||||
future<flat_mutation_reader_v2_opt> read_from_primary() {
|
||||
auto fro = do_read_from_primary();
|
||||
if (!_secondary_in_progress) {
|
||||
return make_ready_future<flat_mutation_reader_opt>(std::move(fro));
|
||||
return make_ready_future<flat_mutation_reader_v2_opt>(std::move(fro));
|
||||
}
|
||||
return _secondary_reader.fast_forward_to(std::move(_secondary_range)).then([this] {
|
||||
return read_from_secondary();
|
||||
});
|
||||
}
|
||||
|
||||
future<flat_mutation_reader_opt> read_from_secondary() {
|
||||
future<flat_mutation_reader_v2_opt> read_from_secondary() {
|
||||
return _secondary_reader().then([this] (range_populating_reader::read_result&& res) {
|
||||
auto&& [fropt, ps] = res;
|
||||
if (fropt) {
|
||||
if (ps) {
|
||||
push_mutation_fragment(std::move(*ps));
|
||||
}
|
||||
return make_ready_future<flat_mutation_reader_opt>(std::move(fropt));
|
||||
return make_ready_future<flat_mutation_reader_v2_opt>(std::move(fropt));
|
||||
} else {
|
||||
_secondary_in_progress = false;
|
||||
return read_from_primary();
|
||||
@@ -718,9 +717,9 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
flat_mutation_reader
|
||||
flat_mutation_reader_v2
|
||||
row_cache::make_scanning_reader(const dht::partition_range& range, std::unique_ptr<read_context> context) {
|
||||
return make_flat_mutation_reader<scanning_and_populating_reader>(*this, range, std::move(context));
|
||||
return make_flat_mutation_reader_v2<scanning_and_populating_reader>(*this, range, std::move(context));
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2_opt
|
||||
@@ -755,7 +754,7 @@ row_cache::make_reader_opt(schema_ptr s,
|
||||
} else {
|
||||
tracing::trace(trace_state, "Range {} not found in cache", range);
|
||||
on_partition_miss();
|
||||
return upgrade_to_v2(make_flat_mutation_reader<single_partition_populating_reader>(*this, make_context()));
|
||||
return make_flat_mutation_reader_v2<single_partition_populating_reader>(*this, make_context());
|
||||
}
|
||||
});
|
||||
|
||||
@@ -768,7 +767,7 @@ row_cache::make_reader_opt(schema_ptr s,
|
||||
|
||||
tracing::trace(trace_state, "Scanning cache for range {} and slice {}",
|
||||
range, seastar::value_of([&slice] { return slice.get_all_ranges(); }));
|
||||
auto mr = upgrade_to_v2(make_scanning_reader(range, make_context()));
|
||||
auto mr = make_scanning_reader(range, make_context());
|
||||
if (fwd == streamed_mutation::forwarding::yes) {
|
||||
return make_forwardable(std::move(mr));
|
||||
} else {
|
||||
@@ -1286,7 +1285,7 @@ flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, read_context& reader
|
||||
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, reader, std::move(snp));
|
||||
r.upgrade_schema(to_query_domain(reader.slice(), rc.schema()));
|
||||
r.upgrade_schema(reader.schema());
|
||||
return upgrade_to_v2(std::move(r));
|
||||
return r;
|
||||
}
|
||||
|
||||
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
|
||||
@@ -1298,7 +1297,7 @@ flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, std::unique_ptr<read
|
||||
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp));
|
||||
r.upgrade_schema(rc_schema);
|
||||
r.upgrade_schema(reader_schema);
|
||||
return upgrade_to_v2(std::move(r));
|
||||
return r;
|
||||
}
|
||||
|
||||
const schema_ptr& row_cache::schema() const {
|
||||
|
||||
@@ -257,8 +257,8 @@ private:
|
||||
logalloc::allocating_section _update_section;
|
||||
logalloc::allocating_section _populate_section;
|
||||
logalloc::allocating_section _read_section;
|
||||
flat_mutation_reader create_underlying_reader(cache::read_context&, mutation_source&, const dht::partition_range&);
|
||||
flat_mutation_reader make_scanning_reader(const dht::partition_range&, std::unique_ptr<cache::read_context>);
|
||||
flat_mutation_reader_v2 create_underlying_reader(cache::read_context&, mutation_source&, const dht::partition_range&);
|
||||
flat_mutation_reader_v2 make_scanning_reader(const dht::partition_range&, std::unique_ptr<cache::read_context>);
|
||||
void on_partition_hit();
|
||||
void on_partition_miss();
|
||||
void on_row_hit();
|
||||
|
||||
@@ -2557,16 +2557,11 @@ SEASTAR_TEST_CASE(test_exception_safety_of_update_from_memtable) {
|
||||
.produces(muts2)
|
||||
.produces_end_of_stream();
|
||||
|
||||
/*FIXME: broken by upgrade_to_v2() used inside row_cache::make_reader()
|
||||
* because set_max_buffer_size() applies to the top level upgrading
|
||||
* reader, instead of to the row cache reader, as the test expects.
|
||||
* Will restore once the above has a native v2 implementation.
|
||||
rd1_v1.produces(muts[0])
|
||||
.produces(muts2[1])
|
||||
.produces(muts2[2])
|
||||
.produces(muts2[3])
|
||||
.produces_end_of_stream();
|
||||
*/
|
||||
});
|
||||
tracker.cleaner().drain().get();
|
||||
BOOST_REQUIRE_EQUAL(0, tracker.get_stats().rows);
|
||||
|
||||
Reference in New Issue
Block a user