diff --git a/db/view/view.cc b/db/view/view.cc index 4523d33742..74521934cc 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -2254,17 +2254,20 @@ public: // Called in the context of a seastar::thread. void view_builder::execute(build_step& step, exponential_backoff_retry r) { gc_clock::time_point now = gc_clock::now(); - auto consumer = compact_for_query_v2( + auto compaction_state = make_lw_shared( *step.reader.schema(), now, step.pslice, batch_size, - query::max_partitions, - view_builder::consumer{*this, step, now}); - if (auto mfp = step.reader.peek().get(); mfp && !mfp->is_partition_start()) { - consumer.consume_new_partition(step.current_key); // Initialize the state in case we're resuming a partition - } + query::max_partitions); + auto consumer = compact_for_query_v2(compaction_state, view_builder::consumer{*this, step, now}); auto built = step.reader.consume_in_thread(std::move(consumer)); + if (auto ds = std::move(*compaction_state).detach_state()) { + if (ds->current_tombstone) { + step.reader.unpop_mutation_fragment(mutation_fragment_v2(*step.reader.schema(), step.reader.permit(), std::move(*ds->current_tombstone))); + } + step.reader.unpop_mutation_fragment(mutation_fragment_v2(*step.reader.schema(), step.reader.permit(), std::move(ds->partition_start))); + } _as.check(); diff --git a/test/cql-pytest/test_materialized_view.py b/test/cql-pytest/test_materialized_view.py index 270ad3ba9e..0a11ea9f10 100644 --- a/test/cql-pytest/test_materialized_view.py +++ b/test/cql-pytest/test_materialized_view.py @@ -369,3 +369,77 @@ def test_oversized_base_regular_view_key_build(cql, test_keyspace, cassandra_bug break time.sleep(0.1) assert results == {(str(i), i) for i in range(30)} + +# Reproduces #11668 +# When the view builder resumes building a partition, it reuses the reader +# used from the previous step but re-creates the compactor. This means that any +# range tombstone changes active at the time of suspending the step, have to be +# explicitly re-opened on when resuming. Without that, already deleted base rows +# can be resurrected as demonstrated by this test. +# The view-builder suspends processing a base-table after +# `view_builder::batch_size` (that is 128) rows. So in this test we create a +# table which has at least 2X that many rows and add a range tombstone so that +# it covers half of the rows (even rows are covered why odd rows aren't). +def test_view_builder_suspend_with_active_range_tombstone(cql, test_keyspace, scylla_only): + with new_test_table(cql, test_keyspace, "pk int, ck int, v int, PRIMARY KEY(pk, ck)", "WITH compaction = {'class': 'NullCompactionStrategy'}") as table: + stmt = cql.prepare(f'INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)') + + # sstable 1 - even rows + for ck in range(0, 512, 2): + cql.execute(stmt, (0, ck, ck)) + nodetool.flush(cql, table) + + # sstable 2 - odd rows and a range tombstone covering even rows + # we need two sstables so memtable doesn't compact away the shadowed rows + cql.execute(f"DELETE FROM {table} WHERE pk = 0 AND ck >= 0 AND ck < 512") + for ck in range(1, 512, 2): + cql.execute(stmt, (0, ck, ck)) + nodetool.flush(cql, table) + + # we should not see any even rows here - they are covered by the range tombstone + res = [r.ck for r in cql.execute(f"SELECT ck FROM {table} WHERE pk = 0")] + assert res == list(range(1, 512, 2)) + + with new_materialized_view(cql, table, select='*', pk='v,pk,ck', where='v is not null and pk is not null and ck is not null') as mv: + start_time = time.time() + while time.time() < start_time + 30: + res = sorted([r.v for r in cql.execute(f"SELECT * FROM {mv}")]) + if len(res) >= 512/2: + break + time.sleep(0.1) + # again, we should not see any even rows in the materialized-view, + # they are covered with a range tombstone in the base-table + assert res == list(range(1, 512, 2)) + +# A variant of the above using a partition-tombstone, which is also lost similar +# to range tombstones. +def test_view_builder_suspend_with_partition_tombstone(cql, test_keyspace, scylla_only): + with new_test_table(cql, test_keyspace, "pk int, ck int, v int, PRIMARY KEY(pk, ck)", "WITH compaction = {'class': 'NullCompactionStrategy'}") as table: + stmt = cql.prepare(f'INSERT INTO {table} (pk, ck, v) VALUES (?, ?, ?)') + + # sstable 1 - even rows + for ck in range(0, 512, 2): + cql.execute(stmt, (0, ck, ck)) + nodetool.flush(cql, table) + + # sstable 2 - odd rows and a partition covering even rows + # we need two sstables so memtable doesn't compact away the shadowed rows + cql.execute(f"DELETE FROM {table} WHERE pk = 0") + for ck in range(1, 512, 2): + cql.execute(stmt, (0, ck, ck)) + nodetool.flush(cql, table) + + # we should not see any even rows here - they are covered by the partition tombstone + res = [r.ck for r in cql.execute(f"SELECT ck FROM {table} WHERE pk = 0")] + assert res == list(range(1, 512, 2)) + + with new_materialized_view(cql, table, select='*', pk='v,pk,ck', where='v is not null and pk is not null and ck is not null') as mv: + start_time = time.time() + while time.time() < start_time + 30: + res = sorted([r.v for r in cql.execute(f"SELECT * FROM {mv}")]) + if len(res) >= 512/2: + break + time.sleep(0.1) + # again, we should not see any even rows in the materialized-view, + # they are covered with a partition tombstone in the base-table + assert res == list(range(1, 512, 2))