db/row_cache: add overlap-check for cache tombstone garbage collection

The cache should not garbage-collect tombstones that cover data in the
memtable. Add overlap checks (get_max_purgeable) to the cache's garbage
collection to detect tombstones that cover data in the memtable and to
prevent them from being garbage-collected.

(cherry picked from commit 6b5b563ef7)
This commit is contained in:
Botond Dénes
2025-03-12 07:06:36 -04:00
parent 4bb1969a7f
commit b43d024ffb
6 changed files with 45 additions and 14 deletions

View File

@@ -123,6 +123,9 @@ class cache_mutation_reader final : public mutation_reader::impl {
gc_clock::time_point _read_time;
gc_clock::time_point _gc_before;
api::timestamp_type _max_purgeable_timestamp = api::missing_timestamp;
api::timestamp_type _max_purgeable_timestamp_shadowable = api::missing_timestamp;
future<> do_fill_buffer();
future<> ensure_underlying();
void copy_from_cache_to_buffer();
@@ -207,6 +210,11 @@ class cache_mutation_reader final : public mutation_reader::impl {
return gc_clock::time_point::min();
}
// Decides whether tombstone `t` may be garbage-collected by this reader.
// The tombstone's timestamp is compared against the max-purgeable limit
// selected by `is`: the shadowable limit when `is` is set, the regular
// limit otherwise. Only a strictly lower timestamp qualifies.
bool can_gc(tombstone t, is_shadowable is) const {
    if (is) {
        return t.timestamp < _max_purgeable_timestamp_shadowable;
    }
    return t.timestamp < _max_purgeable_timestamp;
}
public:
cache_mutation_reader(schema_ptr s,
dht::decorated_key dk,
@@ -228,8 +236,19 @@ public:
, _read_time(get_read_time())
, _gc_before(get_gc_before(*_schema, dk, _read_time))
{
clogger.trace("csm {}: table={}.{}, reversed={}, snap={}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name(), _read_context.is_reversed(),
fmt::ptr(&*_snp));
_max_purgeable_timestamp = ctx.get_max_purgeable(dk, is_shadowable::no);
_max_purgeable_timestamp_shadowable = ctx.get_max_purgeable(dk, is_shadowable::yes);
clogger.trace("csm {}: table={}.{}, dk={}, gc-before={}, max-purgeable-regular={}, max-purgeable-shadowable={}, reversed={}, snap={}",
fmt::ptr(this),
_schema->ks_name(),
_schema->cf_name(),
dk,
_gc_before,
_max_purgeable_timestamp,
_max_purgeable_timestamp_shadowable,
_read_context.is_reversed(),
fmt::ptr(&*_snp));
push_mutation_fragment(*_schema, _permit, partition_start(std::move(dk), _snp->partition_tombstone()));
}
cache_mutation_reader(schema_ptr s,
@@ -787,12 +806,12 @@ void cache_mutation_reader::copy_from_cache_to_buffer() {
t.apply(range_tomb);
auto row_tomb_expired = [&](row_tombstone tomb) {
return (tomb && tomb.max_deletion_time() < _gc_before);
return (tomb && tomb.max_deletion_time() < _gc_before && can_gc(tomb.tomb(), tomb.is_shadowable()));
};
auto is_row_dead = [&](const deletable_row& row) {
auto& m = row.marker();
return (!m.is_missing() && m.is_dead(_read_time) && m.deletion_time() < _gc_before);
return (!m.is_missing() && m.is_dead(_read_time) && m.deletion_time() < _gc_before && can_gc(tombstone(m.timestamp(), m.deletion_time()), is_shadowable::no));
};
if (row_tomb_expired(t) || is_row_dead(row)) {
@@ -800,9 +819,11 @@ void cache_mutation_reader::copy_from_cache_to_buffer() {
_read_context.cache()._tracker.on_row_compacted();
auto mutation_can_gc = can_gc_fn([this] (tombstone t, is_shadowable is) { return can_gc(t, is); });
with_allocator(_snp->region().allocator(), [&] {
deletable_row row_copy(row_schema, row);
row_copy.compact_and_expire(row_schema, t.tomb(), _read_time, always_gc, _gc_before, nullptr);
row_copy.compact_and_expire(row_schema, t.tomb(), _read_time, mutation_can_gc, _gc_before, nullptr);
std::swap(row, row_copy);
});
remove_row = row.empty();

View File

@@ -126,6 +126,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
mutation_reader::forwarding _fwd_mr;
bool _range_query;
const tombstone_gc_state* _tombstone_gc_state;
max_purgeable_fn _get_max_purgeable;
// When reader enters a partition, it must be set up for reading that
// partition from the underlying mutation source (_underlying) in one of two ways:
//
@@ -149,6 +150,7 @@ public:
const dht::partition_range& range,
const query::partition_slice& slice,
const tombstone_gc_state* gc_state,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: _cache(cache)
@@ -160,6 +162,7 @@ public:
, _fwd_mr(fwd_mr)
, _range_query(!query::is_single_partition(range))
, _tombstone_gc_state(gc_state)
, _get_max_purgeable(std::move(get_max_purgeable))
, _underlying(_cache, *this)
{
++_cache._tracker._stats.reads;
@@ -195,6 +198,7 @@ public:
void on_underlying_created() { ++_underlying_created; }
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; }
// Forwards to the stored `_get_max_purgeable` callback and returns the
// timestamp it yields for the given key and shadowable flag.
api::timestamp_type get_max_purgeable(const dht::decorated_key& dk, is_shadowable is) const {
    return _get_max_purgeable(dk, is);
}
public:
future<> ensure_underlying() {
if (_underlying_snapshot) {

View File

@@ -249,7 +249,8 @@ table::make_reader_v2(schema_ptr s,
const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
if (cache_enabled() && !bypass_cache) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(), std::move(trace_state), fwd, fwd_mr)) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(),
get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) {
readers.emplace_back(std::move(*reader_opt));
}
} else {

View File

@@ -775,12 +775,13 @@ row_cache::make_reader_opt(schema_ptr s,
const dht::partition_range& range,
const query::partition_slice& slice,
const tombstone_gc_state* gc_state,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto make_context = [&] {
return std::make_unique<read_context>(*this, s, permit, range, slice, gc_state, trace_state, fwd_mr);
return std::make_unique<read_context>(*this, s, permit, range, slice, gc_state, get_max_purgeable, trace_state, fwd_mr);
};
if (query::is_single_partition(range) && !fwd_mr) {

View File

@@ -24,6 +24,7 @@
#include "db/cache_tracker.hh"
#include "readers/empty_v2.hh"
#include "readers/mutation_source.hh"
#include "compaction/compaction_garbage_collector.hh"
namespace bi = boost::intrusive;
@@ -376,8 +377,9 @@ public:
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no,
const tombstone_gc_state* gc_state = nullptr) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(trace_state), fwd, fwd_mr)) {
const tombstone_gc_state* gc_state = nullptr,
max_purgeable_fn get_max_purgeable = can_never_purge) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(get_max_purgeable), std::move(trace_state), fwd, fwd_mr)) {
return std::move(*reader_opt);
}
[[unlikely]] return make_empty_flat_reader_v2(std::move(s), std::move(permit));
@@ -389,6 +391,7 @@ public:
const dht::partition_range&,
const query::partition_slice&,
const tombstone_gc_state*,
max_purgeable_fn get_max_purgeable,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
@@ -396,10 +399,11 @@ public:
mutation_reader make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range = query::full_partition_range,
const tombstone_gc_state* gc_state = nullptr) {
const tombstone_gc_state* gc_state = nullptr,
max_purgeable_fn get_max_purgeable = can_never_purge) {
auto& full_slice = s->full_slice();
return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state);
streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state, std::move(get_max_purgeable));
}
// Only reads what is in the cache, doesn't populate.

View File

@@ -4582,7 +4582,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) {
return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600));
});
auto rd1 = cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, &gc_state);
auto rd1 = cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, &gc_state, can_always_purge);
auto close_rd = deferred_close(rd1);
rd1.fill_buffer().get(); // cache_mutation_reader compacts cache on fill buffer
@@ -4660,7 +4660,7 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
set_cells_timestamp_to_min(cp.clustered_row(*s.schema(), ck3));
{
auto rd1 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state);
auto rd1 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state, can_always_purge);
auto close_rd1 = deferred_close(rd1);
rd1.fill_buffer().get();
@@ -4672,7 +4672,7 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
}
{
auto rd2 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state);
auto rd2 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state, can_always_purge);
auto close_rd2 = deferred_close(rd2);
rd2.fill_buffer().get();