db/view/view_builder: don't drop partition and range tombstones when resuming

The view builder builds the views from a given base table in
view_builder::batch_size batches of rows. After processing this many
rows, it suspends so the view builder can switch to building views for
other base tables in the name of fairness. When resuming the build step
for a given base table, it reuses the reader used previously (also
serving the role of a snapshot, pinning sstables read from). The
compactor however is created anew. As the reader can be in the middle of
a partition, the view builder injects a partition start into the
compactor to prime it for continuing the partition. This however only
included the partition-key, crucially missing any active tombstones:
partition tombstone or -- since the v2 transition -- active range
tombstone. This can result in base rows covered by either of this to be
resurrected and the view builder to generate view updates for them.
This patch solves this by using the detach-state mechanism of the
compactor which was explicitly developed for situations like this (in
the range scan code) -- resuming a read with the readers kept but the
compactor recreated.
Also included are two test cases reproducing the problem, one with a
range tombstone, the other with a partition tombstone.

Fixes: #11668

Closes #11671
This commit is contained in:
Botond Dénes
2022-09-30 11:13:19 +03:00
committed by Nadav Har'El
parent 2c744628ae
commit 5621cdd7f9
2 changed files with 83 additions and 6 deletions

View File

@@ -2254,17 +2254,20 @@ public:
// Called in the context of a seastar::thread.
void view_builder::execute(build_step& step, exponential_backoff_retry r) {
gc_clock::time_point now = gc_clock::now();
auto consumer = compact_for_query_v2<view_builder::consumer>(
auto compaction_state = make_lw_shared<compact_for_query_state_v2>(
*step.reader.schema(),
now,
step.pslice,
batch_size,
query::max_partitions,
view_builder::consumer{*this, step, now});
if (auto mfp = step.reader.peek().get(); mfp && !mfp->is_partition_start()) {
consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition
}
query::max_partitions);
auto consumer = compact_for_query_v2<view_builder::consumer>(compaction_state, view_builder::consumer{*this, step, now});
auto built = step.reader.consume_in_thread(std::move(consumer));
if (auto ds = std::move(*compaction_state).detach_state()) {
if (ds->current_tombstone) {
step.reader.unpop_mutation_fragment(mutation_fragment_v2(*step.reader.schema(), step.reader.permit(), std::move(*ds->current_tombstone)));
}
step.reader.unpop_mutation_fragment(mutation_fragment_v2(*step.reader.schema(), step.reader.permit(), std::move(ds->partition_start)));
}
_as.check();

View File

@@ -369,3 +369,77 @@ def test_oversized_base_regular_view_key_build(cql, test_keyspace, cassandra_bug
break
time.sleep(0.1)
assert results == {(str(i), i) for i in range(30)}
# Reproduces #11668
# When the view builder resumes building a partition, it reuses the reader
# used from the previous step but re-creates the compactor. This means that any
# range tombstone changes active at the time of suspending the step, have to be
# explicitly re-opened on when resuming. Without that, already deleted base rows
# can be resurrected as demonstrated by this test.
# The view-builder suspends processing a base-table after
# `view_builder::batch_size` (that is 128) rows. So in this test we create a
# table which has at least 2X that many rows and add a range tombstone so that
# it covers half of the rows (even rows are covered why odd rows aren't).
def test_view_builder_suspend_with_active_range_tombstone(cql, test_keyspace, scylla_only):
with new_test_table(cql, test_keyspace, "pk int, ck int, v int, PRIMARY KEY(pk, ck)", "WITH compaction = {'class': 'NullCompactionStrategy'}") as table:
stmt = cql.prepare(f'INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)')
# sstable 1 - even rows
for ck in range(0, 512, 2):
cql.execute(stmt, (0, ck, ck))
nodetool.flush(cql, table)
# sstable 2 - odd rows and a range tombstone covering even rows
# we need two sstables so memtable doesn't compact away the shadowed rows
cql.execute(f"DELETE FROM {table} WHERE pk = 0 AND ck >= 0 AND ck < 512")
for ck in range(1, 512, 2):
cql.execute(stmt, (0, ck, ck))
nodetool.flush(cql, table)
# we should not see any even rows here - they are covered by the range tombstone
res = [r.ck for r in cql.execute(f"SELECT ck FROM {table} WHERE pk = 0")]
assert res == list(range(1, 512, 2))
with new_materialized_view(cql, table, select='*', pk='v,pk,ck', where='v is not null and pk is not null and ck is not null') as mv:
start_time = time.time()
while time.time() < start_time + 30:
res = sorted([r.v for r in cql.execute(f"SELECT * FROM {mv}")])
if len(res) >= 512/2:
break
time.sleep(0.1)
# again, we should not see any even rows in the materialized-view,
# they are covered with a range tombstone in the base-table
assert res == list(range(1, 512, 2))
# A variant of the above using a partition-tombstone, which is also lost similar
# to range tombstones.
def test_view_builder_suspend_with_partition_tombstone(cql, test_keyspace, scylla_only):
with new_test_table(cql, test_keyspace, "pk int, ck int, v int, PRIMARY KEY(pk, ck)", "WITH compaction = {'class': 'NullCompactionStrategy'}") as table:
stmt = cql.prepare(f'INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)')
# sstable 1 - even rows
for ck in range(0, 512, 2):
cql.execute(stmt, (0, ck, ck))
nodetool.flush(cql, table)
# sstable 2 - odd rows and a partition covering even rows
# we need two sstables so memtable doesn't compact away the shadowed rows
cql.execute(f"DELETE FROM {table} WHERE pk = 0")
for ck in range(1, 512, 2):
cql.execute(stmt, (0, ck, ck))
nodetool.flush(cql, table)
# we should not see any even rows here - they are covered by the partition tombstone
res = [r.ck for r in cql.execute(f"SELECT ck FROM {table} WHERE pk = 0")]
assert res == list(range(1, 512, 2))
with new_materialized_view(cql, table, select='*', pk='v,pk,ck', where='v is not null and pk is not null and ck is not null') as mv:
start_time = time.time()
while time.time() < start_time + 30:
res = sorted([r.v for r in cql.execute(f"SELECT * FROM {mv}")])
if len(res) >= 512/2:
break
time.sleep(0.1)
# again, we should not see any even rows in the materialized-view,
# they are covered with a partition tombstone in the base-table
assert res == list(range(1, 512, 2))