row_cache: cache_entry::read(): return v2 reader

Push the conversion down one level. Soon we will make cache flat
mutation reader a v2 reader, this keeps the related noise separate.
This commit is contained in:
Botond Dénes
2022-04-05 14:28:26 +03:00
parent 0b035c9099
commit 2a0d7e8a1d
2 changed files with 18 additions and 18 deletions

View File

@@ -378,7 +378,7 @@ private:
} else if (phase == _cache.phase_of(_read_context->range().start()->value())) {
_reader = _cache._read_section(_cache._tracker.region(), [&] {
cache_entry& e = _cache.find_or_create_incomplete(mfopt->as_partition_start(), phase);
return e.read(_cache, *_read_context, phase);
return downgrade_to_v1(e.read(_cache, *_read_context, phase));
});
} else {
_cache._tracker.on_mispopulate();
@@ -528,7 +528,7 @@ public:
this->can_set_continuity() ? &*_last_key : nullptr);
_last_key = row_cache::previous_entry_pointer(key);
return make_ready_future<read_result>(
read_result(e.read(_cache, _read_context, _reader.creation_phase()), std::nullopt));
read_result(downgrade_to_v1(e.read(_cache, _read_context, _reader.creation_phase())), std::nullopt));
});
} else {
_cache._tracker.on_mispopulate();
@@ -573,7 +573,7 @@ private:
flat_mutation_reader read_from_entry(cache_entry& ce) {
_cache.upgrade_entry(ce);
_cache.on_partition_hit();
return ce.read(_cache, *_read_context);
return downgrade_to_v1(ce.read(_cache, *_read_context));
}
static dht::ring_position_view as_ring_position_view(const std::optional<dht::partition_range::bound>& lower_bound) {
@@ -748,7 +748,7 @@ row_cache::make_reader_opt(schema_ptr s,
cache_entry& e = *i;
upgrade_entry(e);
on_partition_hit();
return upgrade_to_v2(e.read(*this, make_context()));
return e.read(*this, make_context());
} else if (i->continuous()) {
return {};
} else {
@@ -1255,40 +1255,40 @@ void rows_entry::on_evicted() noexcept {
on_evicted(*current_tracker);
}
flat_mutation_reader cache_entry::read(row_cache& rc, read_context& reader) {
flat_mutation_reader_v2 cache_entry::read(row_cache& rc, read_context& reader) {
auto source_and_phase = rc.snapshot_of(_key);
reader.enter_partition(_key, source_and_phase.snapshot, source_and_phase.phase);
return do_read(rc, reader);
}
flat_mutation_reader cache_entry::read(row_cache& rc, read_context& reader, row_cache::phase_type phase) {
flat_mutation_reader_v2 cache_entry::read(row_cache& rc, read_context& reader, row_cache::phase_type phase) {
reader.enter_partition(_key, phase);
return do_read(rc, reader);
}
flat_mutation_reader cache_entry::read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
flat_mutation_reader_v2 cache_entry::read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
auto source_and_phase = rc.snapshot_of(_key);
unique_ctx->enter_partition(_key, source_and_phase.snapshot, source_and_phase.phase);
return do_read(rc, std::move(unique_ctx));
}
flat_mutation_reader cache_entry::read(row_cache& rc, std::unique_ptr<read_context> unique_ctx, row_cache::phase_type phase) {
flat_mutation_reader_v2 cache_entry::read(row_cache& rc, std::unique_ptr<read_context> unique_ctx, row_cache::phase_type phase) {
unique_ctx->enter_partition(_key, phase);
return do_read(rc, std::move(unique_ctx));
}
// Assumes reader is in the corresponding partition
flat_mutation_reader cache_entry::do_read(row_cache& rc, read_context& reader) {
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, read_context& reader) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, reader.phase());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, reader.native_slice(), _key.key());
schema_ptr entry_schema = to_query_domain(reader.slice(), _schema);
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, reader, std::move(snp));
r.upgrade_schema(to_query_domain(reader.slice(), rc.schema()));
r.upgrade_schema(reader.schema());
return r;
return upgrade_to_v2(std::move(r));
}
flat_mutation_reader cache_entry::do_read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
flat_mutation_reader_v2 cache_entry::do_read(row_cache& rc, std::unique_ptr<read_context> unique_ctx) {
auto snp = _pe.read(rc._tracker.region(), rc._tracker.cleaner(), _schema, &rc._tracker, unique_ctx->phase());
auto ckr = query::clustering_key_filter_ranges::get_native_ranges(*_schema, unique_ctx->native_slice(), _key.key());
schema_ptr reader_schema = unique_ctx->schema();
@@ -1297,7 +1297,7 @@ flat_mutation_reader cache_entry::do_read(row_cache& rc, std::unique_ptr<read_co
auto r = make_cache_flat_mutation_reader(entry_schema, _key, std::move(ckr), rc, std::move(unique_ctx), std::move(snp));
r.upgrade_schema(rc_schema);
r.upgrade_schema(reader_schema);
return r;
return upgrade_to_v2(std::move(r));
}
const schema_ptr& row_cache::schema() const {

View File

@@ -63,8 +63,8 @@ class cache_entry {
} _flags{};
friend class size_calculator;
flat_mutation_reader do_read(row_cache&, cache::read_context& ctx);
flat_mutation_reader do_read(row_cache&, std::unique_ptr<cache::read_context> unique_ctx);
flat_mutation_reader_v2 do_read(row_cache&, cache::read_context& ctx);
flat_mutation_reader_v2 do_read(row_cache&, std::unique_ptr<cache::read_context> unique_ctx);
public:
friend class row_cache;
friend class cache_tracker;
@@ -132,10 +132,10 @@ public:
partition_entry& partition() { return _pe; }
const schema_ptr& schema() const noexcept { return _schema; }
schema_ptr& schema() noexcept { return _schema; }
flat_mutation_reader read(row_cache&, cache::read_context&);
flat_mutation_reader read(row_cache&, std::unique_ptr<cache::read_context>);
flat_mutation_reader read(row_cache&, cache::read_context&, utils::phased_barrier::phase_type);
flat_mutation_reader read(row_cache&, std::unique_ptr<cache::read_context>, utils::phased_barrier::phase_type);
flat_mutation_reader_v2 read(row_cache&, cache::read_context&);
flat_mutation_reader_v2 read(row_cache&, std::unique_ptr<cache::read_context>);
flat_mutation_reader_v2 read(row_cache&, cache::read_context&, utils::phased_barrier::phase_type);
flat_mutation_reader_v2 read(row_cache&, std::unique_ptr<cache::read_context>, utils::phased_barrier::phase_type);
bool continuous() const noexcept { return _flags._continuous; }
void set_continuous(bool value) noexcept { _flags._continuous = value; }