mutation/mutation_compactor: add tombstone_gc_state to query ctor

So tombstones can be purged correctly based on the tombstone gc mode.
Currently, if repair-mode is used, tombstones are not purged at all,
which can lead to purged tombstones being re-replicated, via read-repair,
to replicas which already purged them.
This is not a correctness problem: tombstones are not included in the data
query result or digest; these purgeable tombstones are only a nuisance
for read repair, where they can create extra differences between
replicas. Note that for the read repair to trigger, some difference
other than in purgeable tombstones has to exist, because as mentioned
above, these are not included in digests.

Fixes: scylladb/scylladb#24332

Closes scylladb/scylladb#26351
This commit is contained in:
Botond Dénes
2025-05-30 16:47:40 +03:00
committed by Avi Kivity
parent d9c3772e20
commit 24c6476f73
13 changed files with 104 additions and 28 deletions

View File

@@ -3339,7 +3339,8 @@ void view_builder::execute(build_step& step, exponential_backoff_retry r) {
now,
step.pslice,
batch_size,
query::max_partitions);
query::max_partitions,
tombstone_gc_state(nullptr));
auto consumer = compact_for_query<view_builder::consumer>(compaction_state, view_builder::consumer{*this, _vug.shared_from_this(), step, now});
auto built = step.reader.consume_in_thread(std::move(consumer));
if (auto ds = std::move(*compaction_state).detach_state()) {

View File

@@ -787,7 +787,8 @@ future<> view_building_worker::do_build_range(table_id base_id, std::vector<tabl
now,
slice,
query::max_rows,
query::max_partitions);
query::max_partitions,
base_cf->get_compaction_manager().get_tombstone_gc_state());
auto consumer = compact_for_query<view_building_worker::consumer>(compaction_state, view_building_worker::consumer(
_db,
views_ids,

View File

@@ -345,8 +345,14 @@ private:
public:
compact_mutation_state(compact_mutation_state&&) = delete; // Because 'this' is captured
compact_mutation_state(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit,
uint32_t partition_limit, mutation_fragment_stream_validation_level validation_level = mutation_fragment_stream_validation_level::token)
compact_mutation_state(
const schema& s,
gc_clock::time_point query_time,
const query::partition_slice& slice,
uint64_t limit,
uint32_t partition_limit,
const tombstone_gc_state& gc_state,
mutation_fragment_stream_validation_level validation_level = mutation_fragment_stream_validation_level::token)
: _schema(s)
, _query_time(query_time)
, _can_gc(always_gc)
@@ -354,7 +360,7 @@ public:
, _row_limit(limit)
, _partition_limit(partition_limit)
, _partition_row_limit(_slice.options.contains(query::partition_slice::option::distinct) ? 1 : slice.partition_row_limit())
, _tombstone_gc_state(nullptr)
, _tombstone_gc_state(gc_state)
, _last_pos(position_in_partition::for_partition_end())
, _validator("mutation_compactor for read", _schema, validation_level)
{
@@ -694,9 +700,9 @@ class compact_mutation {
public:
// Can only be used for compact_for_sstables::no
compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint64_t limit,
uint32_t partition_limit,
uint32_t partition_limit, const tombstone_gc_state& gc_state,
Consumer consumer, GCConsumer gc_consumer = GCConsumer())
: _state(make_lw_shared<compact_mutation_state<SSTableCompaction>>(s, query_time, slice, limit, partition_limit))
: _state(make_lw_shared<compact_mutation_state<SSTableCompaction>>(s, query_time, slice, limit, partition_limit, gc_state))
, _consumer(std::move(consumer))
, _gc_consumer(std::move(gc_consumer)) {
}

View File

@@ -2157,7 +2157,7 @@ to_data_query_result(const reconcilable_result& r, schema_ptr s, const query::pa
// This result was already built with a limit, don't apply another one.
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
auto consumer = compact_for_query<query_result_builder>(*s, gc_clock::time_point::min(), slice, max_rows,
max_partitions, query_result_builder(*s, builder));
max_partitions, tombstone_gc_state(nullptr), query_result_builder(*s, builder));
auto compaction_state = consumer.get_state();
frozen_mutation_consumer_adaptor adaptor(s, consumer);
for (const partition& p : r.partitions()) {
@@ -2176,7 +2176,7 @@ query::result
query_mutation(mutation&& m, const query::partition_slice& slice, uint64_t row_limit, gc_clock::time_point now, query::result_options opts) {
query::result::builder builder(slice, opts, query::result_memory_accounter{ query::result_memory_limiter::unlimited_result_size }, query::max_tombstones);
auto consumer = compact_for_query<query_result_builder>(*m.schema(), now, slice, row_limit,
query::max_partitions, query_result_builder(*m.schema(), builder));
query::max_partitions, tombstone_gc_state(nullptr), query_result_builder(*m.schema(), builder));
auto compaction_state = consumer.get_state();
std::move(m).consume(consumer, consume_in_reverse::no);
return builder.build(compaction_state->current_full_position());
@@ -2405,7 +2405,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
auto r_a_r = std::make_unique<range_and_reader>(s, source, std::move(permit), dk, slice, std::move(trace_ptr));
auto cwqrb = counter_write_query_result_builder(*s);
auto cfq = compact_for_query<counter_write_query_result_builder>(
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, std::move(cwqrb));
*s, gc_clock::now(), slice, query::max_rows, query::max_partitions, tombstone_gc_state(nullptr), std::move(cwqrb));
auto f = r_a_r->reader.consume(std::move(cfq));
return f.finally([r_a_r = std::move(r_a_r)] {
return r_a_r->reader.close();

View File

@@ -13,6 +13,7 @@
#include "query/query-result-writer.hh"
#include "query/query_result_merger.hh"
#include "readers/multishard.hh"
#include "compaction/compaction_manager.hh"
#include <fmt/core.h>
#include <seastar/core/coroutine.hh>
@@ -720,9 +721,10 @@ future<page_consume_result<ResultBuilder>> read_page(
const query::read_command& cmd,
const dht::partition_range_vector& ranges,
tracing::trace_state_ptr trace_state,
const tombstone_gc_state& gc_state,
noncopyable_function<ResultBuilder()> result_builder_factory) {
auto compaction_state = make_lw_shared<compact_for_query_state>(*s, cmd.timestamp, cmd.slice, cmd.get_row_limit(),
cmd.partition_limit);
cmd.partition_limit, gc_state);
auto reader = make_multishard_combining_reader(ctx, s, ctx->erm(), ctx->permit(), ranges.front(), cmd.slice,
trace_state, mutation_reader::forwarding(ranges.size() > 1));
@@ -774,7 +776,7 @@ future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> do_query
// Use coroutine::as_future to prevent exception on timesout.
auto f = co_await coroutine::as_future(ctx->lookup_readers(timeout).then([&, result_builder_factory = std::move(result_builder_factory)] () mutable {
return read_page<ResultBuilder>(ctx, s, cmd, ranges, trace_state, std::move(result_builder_factory));
return read_page<ResultBuilder>(ctx, s, cmd, ranges, trace_state, table.get_compaction_manager().get_tombstone_gc_state(), std::move(result_builder_factory));
}).then([&] (page_consume_result<ResultBuilder> r) -> future<foreign_ptr<lw_shared_ptr<typename ResultBuilder::result_type>>> {
if (r.compaction_state->are_limits_reached() || r.result.is_short_read()) {
// Must call before calling `detach_state()`.

View File

@@ -605,7 +605,7 @@ future<foreign_ptr<lw_shared_ptr<query::result>>> dump_mutations(
auto accounter = co_await db.local().get_result_memory_limiter().new_data_read(permit.max_result_size(), short_read_allowed);
query_state qs(output_schema, cmd, opts, prs, std::move(accounter));
auto compaction_state = make_lw_shared<compact_for_query_state>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions());
auto compaction_state = make_lw_shared<compact_for_query_state>(*output_schema, qs.cmd.timestamp, qs.cmd.slice, qs.remaining_rows(), qs.remaining_partitions(), tombstone_gc_state(nullptr));
auto partition_key_generator = make_partition_key_generator(db, underlying_schema, prs, ts, timeout);
auto dk_opt = co_await partition_key_generator();

View File

@@ -160,9 +160,10 @@ public:
dht::partition_range range,
query::partition_slice slice,
tracing::trace_state_ptr trace_ptr,
const tombstone_gc_state& gc_state,
querier_config config = {})
: querier_base(schema, permit, std::move(range), std::move(slice), ms, std::move(trace_ptr), std::move(config))
, _compaction_state(make_lw_shared<compact_for_query_state>(*schema, gc_clock::time_point{}, *_slice, 0, 0)) {
, _compaction_state(make_lw_shared<compact_for_query_state>(*schema, gc_clock::time_point{}, *_slice, 0, 0, gc_state)) {
}
bool are_limits_reached() const {

View File

@@ -3920,7 +3920,7 @@ table::query(schema_ptr query_schema,
if (!querier_opt) {
querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, conf);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, qs.cmd.slice, trace_state, get_compaction_manager().get_tombstone_gc_state(), conf);
}
auto& q = *querier_opt;
@@ -3976,7 +3976,7 @@ table::mutation_query(schema_ptr query_schema,
}
if (!querier_opt) {
querier_base::querier_config conf(_config.tombstone_warn_threshold);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, conf);
querier_opt = querier(as_mutation_source(), query_schema, permit, range, cmd.slice, trace_state, get_compaction_manager().get_tombstone_gc_state(), conf);
}
auto& q = *querier_opt;

View File

@@ -1514,7 +1514,8 @@ SEASTAR_TEST_CASE(database_drop_column_family_clears_querier_cache) {
database_test_wrapper(db).get_user_read_concurrency_semaphore().make_tracking_only_permit(s, "test", db::no_timeout, {}),
query::full_partition_range,
s->full_slice(),
nullptr);
nullptr,
tombstone_gc_state(nullptr));
auto f = replica::database::legacy_drop_table_on_all_shards(e.db(), e.get_system_keyspace(), "ks", "cf");
@@ -1867,4 +1868,67 @@ SEASTAR_TEST_CASE(test_max_purgeable_can_purge) {
return make_ready_future<>();
}
// Checks that the query paths return no partitions for a partition that holds
// only a row tombstone, once a repair time has been recorded for a table that
// uses tombstone_gc mode 'repair'. Both table::mutation_query() on the owning
// shard and replica::query_mutations_on_all_shards() are exercised.
SEASTAR_TEST_CASE(test_query_tombstone_gc) {
    return do_with_cql_env_thread([] (cql_test_env& env) {
        const auto keyspace_name = get_name();
        const auto table_name = "tbl";
        // Can use tablets and RF=1 after #21623 is fixed.
        env.execute_cql(std::format("CREATE KEYSPACE {} WITH"
                " replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}} AND"
                " tablets = {{'enabled': 'false'}}", keyspace_name)).get();
        // NullCompactionStrategy plus repair-mode tombstone GC with a zero
        // propagation delay, as spelled out in the CQL options below.
        env.execute_cql(std::format("CREATE TABLE {}.{} (pk int, ck int, v int, PRIMARY KEY (pk, ck))"
                " WITH compaction = {{'class': 'NullCompactionStrategy'}}"
                " AND tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': 0}}", keyspace_name, table_name)).get();
        auto& db = env.local_db();
        auto& tbl = db.find_column_family(keyspace_name, table_name);
        const auto schema = tbl.schema();
        const auto tid = schema->id();
        const auto pk_value = 1;
        const auto pk = partition_key::from_exploded(*schema, {data_value(pk_value).serialize_nonnull()});
        const auto dk = dht::decorate_key(*schema, pk);
        const auto key_shard = tbl.shard_for_reads(dk.token());
        // Write a row tombstone into the partition identified by pk_value, so
        // that the key queried below is the one that was deleted.
        env.execute_cql(format("DELETE FROM {}.{} WHERE pk = {} AND ck = 1", keyspace_name, table_name, pk_value)).get();
        env.db().invoke_on(key_shard, [] (replica::database& db) {
            return db.flush_commitlog();
        }).get();
        // Record a repair time one hour in the future for the full token range
        // on every shard.
        // NOTE(review): presumably this is what makes the tombstone written
        // above purgeable in repair mode -- confirm against tombstone_gc_state.
        const auto repair_range = dht::token_range::make(dht::first_token(), dht::last_token());
        const auto repair_time = gc_clock::now() + gc_clock::duration(std::chrono::hours(1));
        env.db().invoke_on_all([tid, &repair_range, &repair_time] (replica::database& db) {
            auto& tbl = db.find_column_family(tid);
            tbl.get_compaction_manager().get_shared_tombstone_gc_state().update_repair_time(tid, repair_range, repair_time);
        }).get();
        testlog.info("repair_time: {}", repair_time);
        // Read with the bypass_cache option set on the slice.
        auto slice = partition_slice_builder(*schema, schema->full_slice())
                .with_option<query::partition_slice::option::bypass_cache>()
                .build();
        const auto cmd = query::read_command(schema->id(), schema->version(), slice, db.get_query_max_result_size(), query::tombstone_limit::max);
        const auto pr = dht::partition_range::make_singular(dk);
        // Single-shard mutation query on the shard that owns the key: the
        // result is expected to contain no partitions.
        env.db().invoke_on(key_shard, [tid, &cmd, &pr] (replica::database& db) -> future<> {
            auto& tbl = db.find_column_family(tid);
            const auto schema = tbl.schema();
            auto permit = co_await db.obtain_reader_permit(tbl, "read", db::no_timeout, {});
            auto accounter = co_await db.get_result_memory_limiter().new_mutation_read(*cmd.max_result_size, query::short_read::no);
            const auto res = co_await tbl.mutation_query(schema, std::move(permit), cmd, pr, {}, std::move(accounter), db::no_timeout);
            BOOST_CHECK_EQUAL(res.partitions().size(), 0);
        }).get();
        // Same expectation for the all-shards mutation query path.
        {
            const auto res = replica::query_mutations_on_all_shards(env.db(), schema, cmd, {pr}, {}, db::no_timeout).get();
            BOOST_CHECK_EQUAL(std::get<0>(res)->partitions().size(), 0);
        }
        return make_ready_future<>();
    });
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -84,7 +84,7 @@ query::result_set to_result_set(const reconcilable_result& r, schema_ptr s, cons
static reconcilable_result mutation_query(schema_ptr s, reader_permit permit, const mutation_source& source, const dht::partition_range& range,
const query::partition_slice& slice, uint64_t row_limit, uint32_t partition_limit, gc_clock::time_point query_time) {
auto querier = replica::querier(source, s, std::move(permit), range, slice, {});
auto querier = replica::querier(source, s, std::move(permit), range, slice, {}, tombstone_gc_state(nullptr));
auto close_querier = deferred_close(querier);
auto rrb = reconcilable_result_builder(*s, slice, make_accounter());
return querier.consume_page(std::move(rrb), row_limit, partition_limit, query_time).get();
@@ -538,7 +538,7 @@ SEASTAR_TEST_CASE(test_partition_limit) {
static void data_query(schema_ptr s, reader_permit permit, const mutation_source& source, const dht::partition_range& range,
const query::partition_slice& slice, query::result::builder& builder) {
auto querier = replica::querier(source, s, std::move(permit), range, slice, {});
auto querier = replica::querier(source, s, std::move(permit), range, slice, {}, tombstone_gc_state(nullptr));
auto close_querier = deferred_close(querier);
auto qrb = query_result_builder(*s, builder);
querier.consume_page(std::move(qrb), std::numeric_limits<uint32_t>::max(), std::numeric_limits<uint32_t>::max(), gc_clock::now()).get();

View File

@@ -3499,7 +3499,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
testlog.info("non-paged v2");
{
mutation res_mut(s, pk);
auto c = compact_for_query<consumer>(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer{permit, res_mut, max_rows});
auto c = compact_for_query<consumer>(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr), consumer{permit, res_mut, max_rows});
auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
@@ -3511,7 +3511,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
testlog.info("limited pages v2");
{
mutation res_mut(s, pk);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions, tombstone_gc_state(nullptr));
auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
@@ -3527,7 +3527,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
testlog.info("short pages v2");
{
mutation res_mut(s, pk);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr));
auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
@@ -3552,7 +3552,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
if (detached_state) {
restore_state(reader, std::move(*detached_state));
}
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), 1, max_partitions, tombstone_gc_state(nullptr));
auto c = consumer{permit, res_mut, max_rows};
reader.consume(compact_for_query<consumer>(compaction_state, std::move(c))).get();
detached_state = std::move(*compaction_state).detach_state();
@@ -3573,7 +3573,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
if (detached_state) {
restore_state(reader, std::move(*detached_state));
}
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr));
auto c = consumer{permit, res_mut, 2};
reader.consume(compact_for_query<consumer>(compaction_state, std::move(c))).get();
detached_state = std::move(*compaction_state).detach_state();
@@ -3662,7 +3662,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_detach_state) {
auto check = [&] (uint64_t stop_at, bool final_stop) {
testlog.debug("stop_at={}, final_stop={}", stop_at, final_stop);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions);
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, query_time, s->full_slice(), max_rows, max_partitions, tombstone_gc_state(nullptr));
auto reader = make_mutation_reader_from_fragments(s, permit, make_frags());
auto close_reader = deferred_close(reader);
reader.consume(compact_for_query<consumer>(compaction_state, consumer(stop_at, final_stop))).get();
@@ -3751,7 +3751,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_validator) {
}
auto compaction_state = make_lw_shared<compact_mutation_state<compact_for_sstables::no>>(*s, gc_clock::now(), s->full_slice(),
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), mutation_fragment_stream_validation_level::clustering_key);
std::numeric_limits<uint64_t>::max(), std::numeric_limits<uint64_t>::max(), tombstone_gc_state(nullptr), mutation_fragment_stream_validation_level::clustering_key);
auto reader = make_mutation_reader_from_fragments(s, permit, std::move(frags));
auto close_reader = deferred_close(reader);
bool is_valid = true;

View File

@@ -116,7 +116,8 @@ private:
_sem.make_tracking_only_permit(_s.schema(), "make-querier", timeout, {}),
range,
_s.schema()->full_slice(),
nullptr);
nullptr,
tombstone_gc_state(nullptr));
}
static query_id make_cache_key(unsigned key) {

View File

@@ -190,7 +190,7 @@ void test_scan_with_range_delete_over_rows() {
auto d = duration_in_seconds([&] {
auto slice = partition_slice_builder(*s).build();
auto q = replica::querier(cache_ms, s, semaphore.make_permit(), pr, slice, nullptr);
auto q = replica::querier(cache_ms, s, semaphore.make_permit(), pr, slice, nullptr, tombstone_gc_state(nullptr));
auto close_q = deferred_close(q);
q.consume_page(noop_compacted_fragments_consumer(),
std::numeric_limits<uint32_t>::max(),