diff --git a/cache_mutation_reader.hh b/cache_mutation_reader.hh index f04e55c452..94d971138f 100644 --- a/cache_mutation_reader.hh +++ b/cache_mutation_reader.hh @@ -123,6 +123,9 @@ class cache_mutation_reader final : public mutation_reader::impl { gc_clock::time_point _read_time; gc_clock::time_point _gc_before; + api::timestamp_type _max_purgeable_timestamp = api::missing_timestamp; + api::timestamp_type _max_purgeable_timestamp_shadowable = api::missing_timestamp; + future<> do_fill_buffer(); future<> ensure_underlying(); void copy_from_cache_to_buffer(); @@ -207,6 +210,11 @@ class cache_mutation_reader final : public mutation_reader::impl { return gc_clock::time_point::min(); } + bool can_gc(tombstone t, is_shadowable is) const { + const auto max_purgeable = is ? _max_purgeable_timestamp_shadowable : _max_purgeable_timestamp; + return t.timestamp < max_purgeable; + } + public: cache_mutation_reader(schema_ptr s, dht::decorated_key dk, @@ -228,8 +236,19 @@ public: , _read_time(get_read_time()) , _gc_before(get_gc_before(*_schema, dk, _read_time)) { - clogger.trace("csm {}: table={}.{}, reversed={}, snap={}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name(), _read_context.is_reversed(), - fmt::ptr(&*_snp)); + _max_purgeable_timestamp = ctx.get_max_purgeable(dk, is_shadowable::no); + _max_purgeable_timestamp_shadowable = ctx.get_max_purgeable(dk, is_shadowable::yes); + + clogger.trace("csm {}: table={}.{}, dk={}, gc-before={}, max-purgeable-regular={}, max-purgeable-shadowable={}, reversed={}, snap={}", + fmt::ptr(this), + _schema->ks_name(), + _schema->cf_name(), + dk, + _gc_before, + _max_purgeable_timestamp, + _max_purgeable_timestamp_shadowable, + _read_context.is_reversed(), + fmt::ptr(&*_snp)); push_mutation_fragment(*_schema, _permit, partition_start(std::move(dk), _snp->partition_tombstone())); } cache_mutation_reader(schema_ptr s, @@ -787,12 +806,12 @@ void cache_mutation_reader::copy_from_cache_to_buffer() { t.apply(range_tomb); auto row_tomb_expired = [&](row_tombstone tomb) { - return (tomb && tomb.max_deletion_time() < _gc_before); + return (tomb && tomb.max_deletion_time() < _gc_before && can_gc(tomb.tomb(), tomb.is_shadowable())); }; auto is_row_dead = [&](const deletable_row& row) { auto& m = row.marker(); - return (!m.is_missing() && m.is_dead(_read_time) && m.deletion_time() < _gc_before); + return (!m.is_missing() && m.is_dead(_read_time) && m.deletion_time() < _gc_before && can_gc(tombstone(m.timestamp(), m.deletion_time()), is_shadowable::no)); }; if (row_tomb_expired(t) || is_row_dead(row)) { @@ -800,9 +819,11 @@ void cache_mutation_reader::copy_from_cache_to_buffer() { _read_context.cache()._tracker.on_row_compacted(); + auto mutation_can_gc = can_gc_fn([this] (tombstone t, is_shadowable is) { return can_gc(t, is); }); + with_allocator(_snp->region().allocator(), [&] { deletable_row row_copy(row_schema, row); - row_copy.compact_and_expire(row_schema, t.tomb(), _read_time, always_gc, _gc_before, nullptr); + row_copy.compact_and_expire(row_schema, t.tomb(), _read_time, mutation_can_gc, _gc_before, nullptr); std::swap(row, row_copy); }); remove_row = row.empty(); diff --git a/read_context.hh b/read_context.hh index d7910f8f96..237e27cf95 100644 --- a/read_context.hh +++ b/read_context.hh @@ -126,6 +126,7 @@ class read_context final : public enable_lw_shared_from_this { mutation_reader::forwarding _fwd_mr; bool _range_query; const tombstone_gc_state* _tombstone_gc_state; + max_purgeable_fn _get_max_purgeable; // When reader enters a partition, it must be set up for reading that // partition from the underlying mutation source (_underlying) in one of two ways: // @@ -149,6 +150,7 @@ public: const dht::partition_range& range, const query::partition_slice& slice, const tombstone_gc_state* gc_state, + max_purgeable_fn get_max_purgeable, tracing::trace_state_ptr trace_state, mutation_reader::forwarding fwd_mr) : _cache(cache) @@ -160,6 +162,7 @@ public: , _fwd_mr(fwd_mr) , _range_query(!query::is_single_partition(range)) , _tombstone_gc_state(gc_state) + , _get_max_purgeable(std::move(get_max_purgeable)) , _underlying(_cache, *this) { ++_cache._tracker._stats.reads; @@ -195,6 +198,7 @@ public: void on_underlying_created() { ++_underlying_created; } bool digest_requested() const { return _slice.options.contains(); } const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; } + api::timestamp_type get_max_purgeable(const dht::decorated_key& dk, is_shadowable is) const { return _get_max_purgeable(dk, is); } public: future<> ensure_underlying() { if (_underlying_snapshot) { diff --git a/replica/table.cc b/replica/table.cc index b4e3548310..d5da834161 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -249,7 +249,8 @@ table::make_reader_v2(schema_ptr s, const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache); if (cache_enabled() && !bypass_cache) { - if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(), std::move(trace_state), fwd, fwd_mr)) { + if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(), + get_max_purgeable_fn_for_cache_underlying_reader(), std::move(trace_state), fwd, fwd_mr)) { readers.emplace_back(std::move(*reader_opt)); } } else { diff --git a/row_cache.cc b/row_cache.cc index 9d31a6d1c6..5a907b280d 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -775,12 +775,13 @@ row_cache::make_reader_opt(schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice, const tombstone_gc_state* gc_state, + max_purgeable_fn get_max_purgeable, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { auto make_context = [&] { - return std::make_unique(*this, s, permit, range, slice, gc_state, trace_state, fwd_mr); + return std::make_unique(*this, s, permit, range, slice, gc_state, get_max_purgeable, trace_state, fwd_mr); }; if (query::is_single_partition(range) && !fwd_mr) { diff --git a/row_cache.hh b/row_cache.hh index e64b504ffc..8a3912b13d 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -24,6 +24,7 @@ #include "db/cache_tracker.hh" #include "readers/empty_v2.hh" #include "readers/mutation_source.hh" +#include "compaction/compaction_garbage_collector.hh" namespace bi = boost::intrusive; @@ -376,8 +377,9 @@ public: tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no, - const tombstone_gc_state* gc_state = nullptr) { - if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(trace_state), fwd, fwd_mr)) { + const tombstone_gc_state* gc_state = nullptr, + max_purgeable_fn get_max_purgeable = can_never_purge) { + if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(get_max_purgeable), std::move(trace_state), fwd, fwd_mr)) { return std::move(*reader_opt); } [[unlikely]] return make_empty_flat_reader_v2(std::move(s), std::move(permit)); @@ -389,6 +391,7 @@ public: const dht::partition_range&, const query::partition_slice&, const tombstone_gc_state*, + max_purgeable_fn get_max_purgeable, tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); @@ -396,10 +399,11 @@ public: mutation_reader make_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range = query::full_partition_range, - const tombstone_gc_state* gc_state = nullptr) { + const tombstone_gc_state* gc_state = nullptr, + max_purgeable_fn get_max_purgeable = can_never_purge) { auto& full_slice = s->full_slice(); return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr, - streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state); + streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state, std::move(get_max_purgeable)); } // Only reads what is in the cache, doesn't populate. diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index 4a5ef39067..82ff88b732 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -4582,7 +4582,7 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) { return gc_clock::now() - (std::chrono::seconds(s->gc_grace_seconds().count() + 600)); }); - auto rd1 = cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, &gc_state); + auto rd1 = cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, &gc_state, can_always_purge); auto close_rd = deferred_close(rd1); rd1.fill_buffer().get(); // cache_mutation_reader compacts cache on fill buffer @@ -4660,7 +4660,7 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) { set_cells_timestamp_to_min(cp.clustered_row(*s.schema(), ck3)); { - auto rd1 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state); + auto rd1 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state, can_always_purge); auto close_rd1 = deferred_close(rd1); rd1.fill_buffer().get(); @@ -4672,7 +4672,7 @@ SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) { } { - auto rd2 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state); + auto rd2 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state, can_always_purge); auto close_rd2 = deferred_close(rd2); rd2.fill_buffer().get();