compact and remove expired rows from cache on read

when read from cache compact and expire row tombstones
remove expired empty rows from cache
do not expire range tombstones in this patch

Refs #2252, #6033

Closes #12917
This commit is contained in:
Alexey Novikov
2023-02-16 14:59:36 +03:00
committed by Tomasz Grabiec
parent b23361977b
commit ca4e7f91c6
8 changed files with 168 additions and 10 deletions

View File

@@ -110,6 +110,9 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
flat_mutation_reader_v2* _underlying = nullptr;
flat_mutation_reader_v2_opt _underlying_holder;
gc_clock::time_point _read_time;
gc_clock::time_point _gc_before;
future<> do_fill_buffer();
future<> ensure_underlying();
void copy_from_cache_to_buffer();
@@ -178,6 +181,20 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
const schema& table_schema() {
return *_snp->schema();
}
gc_clock::time_point get_read_time() {
return _read_context.tombstone_gc_state() ? gc_clock::now() : gc_clock::time_point::min();
}
gc_clock::time_point get_gc_before(const schema& schema, dht::decorated_key dk, const gc_clock::time_point query_time) {
auto gc_state = _read_context.tombstone_gc_state();
if (gc_state) {
return gc_state->get_gc_before_for_key(schema.shared_from_this(), dk, query_time);
}
return gc_clock::time_point::min();
}
public:
cache_flat_mutation_reader(schema_ptr s,
dht::decorated_key dk,
@@ -196,6 +213,8 @@ public:
, _read_context_holder()
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
, _read_time(get_read_time())
, _gc_before(get_gc_before(*_schema, dk, _read_time))
{
clogger.trace("csm {}: table={}.{}, reversed={}, snap={}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name(), _read_context.is_reversed(),
fmt::ptr(&*_snp));
@@ -730,9 +749,40 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
}
}
// We add the row to the buffer even when it's full.
// This simplifies the code. For more info see #3139.
if (_next_row_in_range) {
bool remove_row = false;
if (_read_context.tombstone_gc_state() // do not compact rows when tombstone_gc_state is not set (used in some unit tests)
&& !_next_row.dummy()
&& _snp->at_latest_version()
&& _snp->at_oldest_version()) {
deletable_row& row = _next_row.latest_row();
auto t = row.deleted_at();
auto row_tomb_expired = [&](row_tombstone tomb) {
return (tomb && tomb.max_deletion_time() < _gc_before);
};
auto is_row_dead = [&](const deletable_row& row) {
auto& m = row.marker();
return (!m.is_missing() && m.is_dead(_read_time) && m.deletion_time() < _gc_before);
};
if (row_tomb_expired(t) || is_row_dead(row)) {
can_gc_fn always_gc = [&](tombstone) { return true; };
const schema& row_schema = _next_row.latest_row_schema();
_read_context.cache()._tracker.on_row_compacted();
with_allocator(_snp->region().allocator(), [&] {
deletable_row row_copy(row_schema, row);
row_copy.compact_and_expire(row_schema, t.tomb(), _read_time, always_gc, _gc_before, nullptr);
std::swap(row, row_copy);
});
remove_row = row.empty();
}
}
if (_next_row.range_tombstone_for_row() != _current_tombstone) [[unlikely]] {
auto tomb = _next_row.range_tombstone_for_row();
auto new_lower_bound = position_in_partition::before_key(_next_row.position());
@@ -742,8 +792,31 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
_current_tombstone = tomb;
_read_context.cache()._tracker.on_range_tombstone_read();
}
if (remove_row) {
_read_context.cache()._tracker.on_row_compacted_away();
_lower_bound = position_in_partition::after_key(*_schema, _next_row.position());
partition_snapshot_row_weakref row_ref(_next_row);
move_to_next_entry();
with_allocator(_snp->region().allocator(), [&] {
cache_tracker& tracker = _read_context.cache()._tracker;
if (row_ref->is_linked()) {
tracker.get_lru().remove(*row_ref);
}
row_ref->on_evicted(tracker);
});
_snp->region().allocator().invalidate_references();
_next_row.force_valid();
} else {
// We add the row to the buffer even when it's full.
// This simplifies the code. For more info see #3139.
add_to_buffer(_next_row);
move_to_next_entry();
}
} else {
move_to_next_range();
}

View File

@@ -68,6 +68,8 @@ public:
uint64_t pinned_dirty_memory_overload;
uint64_t range_tombstone_reads;
uint64_t row_tombstone_reads;
uint64_t rows_compacted;
uint64_t rows_compacted_away;
uint64_t active_reads() const {
return reads - reads_done;
@@ -115,6 +117,8 @@ public:
void on_row_merged_from_memtable() noexcept { ++_stats.rows_merged_from_memtable; }
void on_range_tombstone_read() noexcept { ++_stats.range_tombstone_reads; }
void on_row_tombstone_read() noexcept { ++_stats.row_tombstone_reads; }
void on_row_compacted() noexcept { ++_stats.rows_compacted; }
void on_row_compacted_away() noexcept { ++_stats.rows_compacted_away; }
void pinned_dirty_memory_overload(uint64_t bytes) noexcept;
allocation_strategy& allocator() noexcept;
logalloc::region& region() noexcept;

View File

@@ -564,6 +564,15 @@ public:
return cr;
}
const schema& latest_row_schema() const noexcept {
return *_current_row[0].schema;
}
// Can be called only when cursor is valid and pointing at a row.
deletable_row& latest_row() const noexcept {
return _current_row[0].it->row();
}
// Can be called only when cursor is valid and pointing at a row.
void latest_row_prepare_hash() const {
_current_row[0].it->row().cells().prepare_hash(*_current_row[0].schema, column_kind::regular_column);

View File

@@ -126,6 +126,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
tracing::trace_state_ptr _trace_state;
mutation_reader::forwarding _fwd_mr;
bool _range_query;
const tombstone_gc_state* _tombstone_gc_state;
// When reader enters a partition, it must be set up for reading that
// partition from the underlying mutation source (_underlying) in one of two ways:
//
@@ -148,6 +149,7 @@ public:
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const tombstone_gc_state* gc_state,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: _cache(cache)
@@ -158,6 +160,7 @@ public:
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _range_query(!query::is_single_partition(range))
, _tombstone_gc_state(gc_state)
, _underlying(_cache, *this)
{
if (_slice.options.contains(query::partition_slice::option::reversed)) {
@@ -195,6 +198,7 @@ public:
bool partition_exists() const { return _partition_exists; }
void on_underlying_created() { ++_underlying_created; }
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; }
public:
future<> ensure_underlying() {
if (_underlying_snapshot) {

View File

@@ -261,7 +261,7 @@ table::make_reader_v2(schema_ptr s,
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
if (cache_enabled() && !bypass_cache && !(reversed && _config.reversed_reads_auto_bypass_cache())) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, std::move(trace_state), fwd, fwd_mr)) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(), std::move(trace_state), fwd, fwd_mr)) {
readers.emplace_back(std::move(*reader_opt));
}
} else {

View File

@@ -140,6 +140,10 @@ cache_tracker::setup_metrics() {
sm::description("total amount of range tombstones processed during read")),
sm::make_counter("row_tombstone_reads", _stats.row_tombstone_reads,
sm::description("total amount of row tombstones processed during read")),
sm::make_counter("rows_compacted", _stats.rows_compacted,
sm::description("total amount of attempts to compact expired rows during read")),
sm::make_counter("rows_compacted_away", _stats.rows_compacted_away,
sm::description("total amount of compacted and removed rows during read")),
});
}
@@ -725,12 +729,13 @@ row_cache::make_reader_opt(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const tombstone_gc_state* gc_state,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto make_context = [&] {
return std::make_unique<read_context>(*this, s, std::move(permit), range, slice, trace_state, fwd_mr);
return std::make_unique<read_context>(*this, s, std::move(permit), range, slice, gc_state, trace_state, fwd_mr);
};
if (query::is_single_partition(range) && !fwd_mr) {

View File

@@ -358,14 +358,16 @@ public:
// User needs to ensure that the row_cache object stays alive
// as long as the reader is used.
// The range must not wrap around.
flat_mutation_reader_v2 make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, std::move(trace_state), fwd, fwd_mr)) {
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no,
const tombstone_gc_state* gc_state = nullptr) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(trace_state), fwd, fwd_mr)) {
return std::move(*reader_opt);
}
[[unlikely]] return make_empty_flat_reader_v2(std::move(s), std::move(permit));
@@ -376,13 +378,18 @@ public:
reader_permit permit,
const dht::partition_range&,
const query::partition_slice&,
const tombstone_gc_state*,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
flat_mutation_reader_v2 make_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range = query::full_partition_range) {
flat_mutation_reader_v2 make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range = query::full_partition_range,
const tombstone_gc_state* gc_state = nullptr) {
auto& full_slice = s->full_slice();
return make_reader(std::move(s), std::move(permit), range, full_slice);
return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state);
}
const stats& stats() const { return _stats; }

View File

@@ -4543,3 +4543,59 @@ SEASTAR_THREAD_TEST_CASE(test_digest_read_during_schema_upgrade) {
m2.upgrade(s2);
assert_that(std::move(rd)).produces(m2);
}
SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type)
.build();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto pkey = tests::generate_partition_key(s);
auto make_ck = [&s] (int v) {
return clustering_key::from_deeply_exploded(*s, {data_value{v}});
};
auto make_prefix = [&s] (int v) {
return clustering_key_prefix::from_deeply_exploded(*s, {data_value{v}});
};
auto ck1 = make_ck(1);
auto ck2 = make_ck(2);
auto ck3 = make_ck(3);
auto dt_noexp = gc_clock::now();
auto dt_exp = gc_clock::now() - std::chrono::seconds(s->gc_grace_seconds().count() + 1);
auto mt = make_lw_shared<replica::memtable>(s);
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker);
{
mutation m(s, pkey);
m.set_clustered_cell(ck1, "v", data_value(101), 1);
m.partition().apply_delete(*s, make_prefix(2), tombstone(1, dt_noexp)); // create non-expired tombstone
m.partition().apply_delete(*s, make_prefix(3), tombstone(2, dt_exp)); // create expired tombstone
cache.populate(m);
}
tombstone_gc_state gc_state(nullptr);
auto rd1 = cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, &gc_state);
auto close_rd = deferred_close(rd1);
rd1.fill_buffer().get(); // cache_flat_mutation_reader compacts cache on fill buffer
cache_entry& entry = cache.lookup(pkey);
auto& cp = entry.partition().version()->partition();
BOOST_REQUIRE(cp.find_row(*s, ck1) != nullptr); // live row is in cache
BOOST_REQUIRE_EQUAL(cp.clustered_row(*s, ck2).deleted_at(), row_tombstone(tombstone(1, dt_noexp))); // non-expired tombstone is in cache
BOOST_REQUIRE(cp.find_row(*s, ck3) == nullptr); // expired tombstone isn't in cache
// check tracker stats
auto &tracker_stats = tracker.get_stats();
BOOST_REQUIRE(tracker_stats.rows_compacted == 1);
BOOST_REQUIRE(tracker_stats.rows_compacted_away == 1);
});
}