diff --git a/test/cluster/test_incremental_repair.py b/test/cluster/test_incremental_repair.py index 97dedd9ba1..f8a1119b9a 100644 --- a/test/cluster/test_incremental_repair.py +++ b/test/cluster/test_incremental_repair.py @@ -5,10 +5,11 @@ # from test.pylib.manager_client import ManagerClient -from test.pylib.repair import load_tablet_sstables_repaired_at, create_table_insert_data_for_repair +from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair from test.pylib.tablets import get_all_tablet_replicas from test.cluster.tasks.task_manager_client import TaskManagerClient from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown +from test.pylib.util import wait_for_cql_and_get_hosts from cassandra.query import ConsistencyLevel @@ -1050,3 +1051,262 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager: f"on servers[1] after restart lost the being_repaired markers during the race window. " \ f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \ f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}" + +# ---------------------------------------------------------------------------- +# Tombstone GC safety tests +# +# These tests verify that incremental repair with tombstone_gc=repair never +# causes data resurrection via premature tombstone GC. The key invariant is: +# +# mark_sstable_as_repaired() completes on ALL replicas BEFORE the coordinator +# commits repair_time to Raft. Therefore, by the time gc_before advances and +# a tombstone T becomes GC-eligible, any data D that T shadows has already +# been promoted from the repairing set to the repaired set. Tombstone GC +# running on the repaired set will always see D there and will prevent T from +# being purged prematurely. +# +# Three scenarios are tested: +# +# 1. 
#    Basic ordering guarantee: D and T are written to every replica and flushed
#    before repair starts. Both are captured in the repairing snapshot and
#    promoted to repaired before repair_time advances. GC then runs on the
#    repaired set; T must not be purged while D is still present.
#
# 2. Hints flush failure: the hints/batchlog flush is forced to fail on one
#    replica via error injection. repair_time must NOT advance (the guard in
#    repair that skips the repair_history update when hints_batchlog_flushed is
#    false). Therefore T does not become GC-eligible from this repair, and no
#    resurrection can occur.
#
# 3. Propagation delay: D is written with an old USING TIMESTAMP (simulating a
#    write that arrives within the propagation delay window) and T with a
#    higher one, both while one replica is down, so that replica receives them
#    only through the hint flush at the start of repair. After repair completes
#    D and T are in repaired everywhere. GC on the repaired set must not purge
#    T because D has a lower timestamp but is still present.
# ----------------------------------------------------------------------------


async def _setup_tombstone_gc_cluster(manager, *, tablets=2, extra_cmdline=None):
    """Create the test cluster and a tombstone_gc=repair table.

    The cluster and the ``{ks}.test`` table are built by
    ``create_table_insert_data_for_repair`` with no pre-inserted keys; the
    tests in this section index ``servers[0..2]`` and therefore rely on it
    providing at least three servers.

    The table is then altered to ``propagation_delay_in_seconds=0`` so that
    gc_before = repair_time: tombstones become GC-eligible immediately after a
    successful repair rather than after the default delay, which lets the
    tests exercise the GC eligibility path without sleeping.

    Args:
        manager: cluster manager fixture.
        tablets: forwarded to ``create_table_insert_data_for_repair``.
        extra_cmdline: optional extra server command-line arguments, appended
            after the ``repair=debug`` logger setting. Not mutated.

    Returns:
        Tuple ``(servers, cql, hosts, ks, table_id, logs)`` where ``logs``
        holds one opened server log per entry of ``servers``.
    """
    cmdline = ['--logger-log-level', 'repair=debug'] + list(extra_cmdline or [])
    servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(
        manager, nr_keys=0, cmdline=cmdline, tablets=tablets,
        disable_flush_cache_time=True)
    # Lower propagation_delay to 0 so gc_before = repair_time.
    await cql.run_async(
        f"ALTER TABLE {ks}.test WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': '0'}}"
    )
    logs = [await manager.server_open_log(s.server_id) for s in servers]
    return servers, cql, hosts, ks, table_id, logs


async def _trigger_repaired_compaction(manager, server, ks):
    """Request a compaction of ``ks.test`` on ``server`` through the REST API.

    NOTE(review): callers rely on this compaction covering the repaired
    sstable set; that is a property of the server, not visible here.
    """
    await manager.api.keyspace_compaction(server.ip_addr, ks, "test")


async def _select_key_on_host(cql, ks, key, host):
    """Return the rows of ``ks.test`` matching ``pk = key`` as seen by ``host``."""
    return await cql.run_async(
        f"SELECT pk FROM {ks}.test WHERE pk = {key}",
        host=host)


async def _assert_key_visible(cql, ks, key, hosts, *, msg=""):
    """Assert that ``key`` is readable when querying each host in ``hosts``.

    ``msg`` is appended to the assertion message for extra context.
    """
    for host in hosts:
        rows = await _select_key_on_host(cql, ks, key, host)
        assert rows, f"Key {key} not found on host {host} after tombstone GC compaction. {msg}"


async def _assert_key_deleted(cql, ks, key, hosts, *, msg=""):
    """Assert that ``key`` is not visible (tombstone wins) on each host in ``hosts``.

    ``msg`` is appended to the assertion message for extra context.
    """
    for host in hosts:
        rows = await _select_key_on_host(cql, ks, key, host)
        assert not rows, f"Key {key} unexpectedly visible on host {host} — data resurrection! {msg}"


# NOTE(review): this test enables no error injection itself; confirm that the
# release-mode skip is still required (e.g. by the cluster setup helper).
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tombstone_gc_no_resurrection_basic_ordering(manager: ManagerClient):
    """Verify that the ordering guarantee prevents premature tombstone GC.

    With propagation_delay=0, gc_before = repair_time. T.deletion_time (wall-clock
    time of DELETE, just before repair starts) < repair_time = gc_before, so T IS
    GC-eligible after repair. D (CQL timestamp=1) is in the repaired set because it
    was captured in the repairing snapshot and promoted before repair_time was committed.
    Compaction on the repaired set must NOT purge T because D (min_live_timestamp=1)
    makes T non-purgeable (T.timestamp=2 > max_purgeable=1).

    Scenario:
      - D written at ts=1 to all replicas, flushed.
      - T (row deletion, ts=2) written to all replicas, flushed.
      - Repair runs: both D and T move repairing → repaired. repair_time advances.
      - gc_before = repair_time > T.deletion_time → T is GC-eligible.
      - Repaired compaction must NOT purge T because D (in repaired) prevents it.
      - Key must remain deleted.
    """
    servers, cql, hosts, ks, _table_id, _logs = await _setup_tombstone_gc_cluster(manager)

    # D at ts=1, T at ts=2 (T wins over D, correctly deletes the row).
    key = 42
    await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1) USING TIMESTAMP 1")
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)

    await cql.run_async(f"DELETE FROM {ks}.test USING TIMESTAMP 2 WHERE pk = {key}")
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)

    # Both D and T captured in repairing, then promoted to repaired. repair_time advances.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", incremental_mode='incremental')

    # With propagation_delay=0: gc_before = repair_time > T.deletion_time → T is GC-eligible.
    # Compaction on the repaired set: D is also in repaired (promoted before repair_time
    # was committed), so max_purgeable = D.timestamp = 1 < T.timestamp = 2 → T not purgeable.
    for s in servers:
        await _trigger_repaired_compaction(manager, s, ks)

    # T must not have been GC'd; key must remain deleted (not resurrected).
    await _assert_key_deleted(cql, ks, key, hosts,
                              msg="T was prematurely GC'd on repaired compaction (D in repaired should prevent it)")

    logger.info("test_tombstone_gc_no_resurrection_basic_ordering: PASSED")


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tombstone_gc_no_resurrection_hints_flush_failure(manager: ManagerClient):
    """Verify that repair_time does not advance when hints flush fails, so tombstones
    are never GC-eligible after such a repair and data resurrection cannot occur.

    With propagation_delay=0, gc_before = repair_time. When hints flush fails,
    repair_time is expected to stay at epoch (gc_clock::time_point{}) because
    flush_time stays at epoch while hints_batchlog_flushed=False. Therefore
    gc_before = epoch, T.deletion_time ≈ now >> epoch, T is never GC-eligible, and
    compaction cannot purge T regardless of what is in the repaired set.

    Scenario:
      - D and T written to all replicas and flushed.
      - Repair runs with injection that makes hints flush fail on servers[2].
      - repair_time must not advance (guard: hints_batchlog_flushed=False).
      - Compaction on the repaired set: T not GC-eligible → key stays deleted.
    """
    servers, cql, hosts, ks, table_id, _logs = await _setup_tombstone_gc_cluster(manager)

    key = 99
    # Makes the batchlog manager appear uninitialized so the hints flush fails.
    injection = "repair_flush_hints_batchlog_handler_bm_uninitialized"

    # Write D at ts=1 and T at ts=2 to all nodes so everyone has the tombstone.
    await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1) USING TIMESTAMP 1")
    await cql.run_async(f"DELETE FROM {ks}.test USING TIMESTAMP 2 WHERE pk = {key}")
    for s in servers:
        await manager.api.flush_keyspace(s.ip_addr, ks)

    # Read the initial repair_time so we can verify it doesn't meaningfully advance.
    initial_repair_times = await load_tablet_repair_time(cql, hosts, table_id)

    await manager.api.enable_injection(servers[2].ip_addr, injection, one_shot=False)
    try:
        # Repair is expected to warn about the hints flush failure but continue.
        await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", incremental_mode='incremental')
    except Exception as exc:
        # Deliberately best-effort: repair may report failure and only its
        # side effects matter here. Record the error instead of hiding it so
        # that unrelated failures remain diagnosable from the test log.
        logger.info("tablet_repair failed under the hints flush injection (tolerated): %s", exc)
    finally:
        await manager.api.disable_injection(servers[2].ip_addr, injection)

    # repair_time must NOT have advanced to a meaningful time.
    # NOTE(review): the check accepts an unchanged value or None; it does not
    # compare against an explicit epoch value.
    new_repair_times = await load_tablet_repair_time(cql, hosts, table_id)
    for token, old_time in initial_repair_times.items():
        new_time = new_repair_times.get(token)
        assert new_time == old_time or new_time is None, \
            f"repair_time advanced for token {token} despite hints flush failure: " \
            f"{old_time} → {new_time}"

    # Trigger compaction on the repaired set; T is not GC-eligible (gc_before = epoch).
    for s in servers:
        await _trigger_repaired_compaction(manager, s, ks)

    # Key must remain deleted (T not GC'd).
    await _assert_key_deleted(cql, ks, key, hosts,
                              msg="T was prematurely GC'd despite hints flush failure (repair_time should be epoch)")

    logger.info("test_tombstone_gc_no_resurrection_hints_flush_failure: PASSED")


# NOTE(review): this test enables no error injection itself; confirm that the
# release-mode skip is still required (e.g. by the cluster setup helper).
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tombstone_gc_no_resurrection_propagation_delay(manager: ManagerClient):
    """Verify the ordering guarantee when D arrives via hint flush just before repair.

    D has an old CQL timestamp (ts_d = now - 2h) simulating a write that was delayed
    by the propagation window. T has a higher CQL timestamp (ts_t = now - 90min) so T
    correctly shadows D. With propagation_delay=0, T becomes GC-eligible as soon as
    repair_time is committed (T.deletion_time < repair_time = gc_before).

    The invariant being tested: mark_sstable_as_repaired() runs on all replicas BEFORE
    the coordinator commits repair_time to Raft. So when gc_before advances (repair_time
    committed), D is already in the repaired set on all replicas. Repaired compaction
    must NOT purge T because D (min_live_timestamp = ts_d < ts_t) is present there.

    Scenario:
      - servers[2] stopped; D and T written (go to hints on coordinator for servers[2]).
      - D and T flushed on servers[0] and servers[1].
      - servers[2] restarted.
      - Repair: hints flush delivers D and T to servers[2] before the repairing snapshot.
        mark_sstable_as_repaired() promotes D and T to repaired on servers[2].
        repair_time then committed. gc_before = repair_time > T.deletion_time.
      - Repaired compaction: D (ts_d) in repaired prevents T (ts_t > ts_d) from being GC'd.
      - Key must remain deleted.
    """
    servers, cql, hosts, ks, _table_id, _logs = await _setup_tombstone_gc_cluster(
        manager, extra_cmdline=['--hinted-handoff-enabled', '1'])

    # Stop servers[2] so writes to it are queued as hints on the coordinator.
    await manager.server_stop_gracefully(servers[2].server_id)

    key = 77
    # CQL USING TIMESTAMP is in microseconds since epoch.
    now_us = int(time.time() * 1e6)
    ts_d = now_us - int(2 * 3600 * 1e6)  # 2 hours ago
    ts_t = now_us - int(90 * 60 * 1e6)   # 90 minutes ago; ts_t > ts_d

    # D and T go to servers[0] and servers[1] directly; servers[2] gets hints queued.
    await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1) USING TIMESTAMP {ts_d}")
    await cql.run_async(f"DELETE FROM {ks}.test USING TIMESTAMP {ts_t} WHERE pk = {key}")

    for s in servers[:2]:
        await manager.api.flush_keyspace(s.ip_addr, ks)

    # Restart servers[2]; D and T are not on its disk yet.
    await manager.server_start(servers[2].server_id)
    await manager.servers_see_each_other(servers)
    # The driver session and host list are refreshed after the restart.
    await reconnect_driver(manager)
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    # Repair: hints flush sends D and T to servers[2] before the repairing snapshot.
    # After row sync, mark_sstable_as_repaired() promotes D and T to repaired on servers[2].
    # Only then does the coordinator commit repair_time to Raft.
    await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", incremental_mode='incremental')

    # With propagation_delay=0: gc_before = repair_time > T.deletion_time → T is GC-eligible.
    # But D (ts_d < ts_t) is already in repaired on servers[2], so max_purgeable = ts_d
    # < ts_t = T.timestamp → T is NOT purgeable.
    for s in servers:
        await _trigger_repaired_compaction(manager, s, ks)

    # Key must remain deleted (T not GC'd, D not resurrected).
    await _assert_key_deleted(cql, ks, key, hosts,
                              msg="Data resurrection: T was GC'd despite D being present in repaired set")

    logger.info("test_tombstone_gc_no_resurrection_propagation_delay: PASSED")