test/repair: Add tombstone GC safety tests for incremental repair
Add three cluster tests that verify no data resurrection occurs when
tombstone GC runs on the repaired sstable set under incremental repair
with tombstone_gc=repair mode.
All tests use propagation_delay_in_seconds=0 so that tombstones become
GC-eligible immediately after repair_time is committed (gc_before =
repair_time), allowing the scenarios to exercise the actual GC eligibility
path without artificial sleeps.
(test_tombstone_gc_no_resurrection_basic_ordering)
Data D (ts=1) and tombstone T (ts=2) are written to all replicas and
flushed before repair. Repair captures both in the repairing snapshot
and promotes them to repaired. Once repair_time is committed, T is
GC-eligible (T.deletion_time < gc_before = repair_time).
The test verifies that compaction on the repaired set does NOT purge T,
because D is already in repaired (mark_sstable_as_repaired() completes
on all replicas before repair_time is committed to Raft) and clamps
max_purgeable to D.timestamp=1 < T.timestamp=2.
(test_tombstone_gc_no_resurrection_hints_flush_failure)
The repair_flush_hints_batchlog_handler_bm_uninitialized injection causes
hints flush to fail on one node. When hints flush fails, flush_time stays
at gc_clock::time_point{} (epoch). This propagates as repair_time=epoch
committed to system.tablets, so gc_before = epoch - propagation_delay is
effectively the minimum possible time. No tombstone has a deletion_time
older than epoch, so T is never GC-eligible from this repair.
The test verifies that repair_time does not advance to a meaningful value
after a failed hints flush, and that compaction on the repaired set does
not purge T (key remains deleted, no resurrection).
(test_tombstone_gc_no_resurrection_propagation_delay)
Simulates a write D carrying an old CQL USING TIMESTAMP (ts_d = now-2h)
that was stored as a hint while a replica was down, and a tombstone T
with a higher timestamp (ts_t = now-90min, ts_t > ts_d) that was written
to all live replicas. After the replica restarts, repair flushes hints
synchronously before taking the repairing snapshot, guaranteeing D is
delivered and captured in repairing before the snapshot.
After mark_sstable_as_repaired() promotes D to repaired, the coordinator
commits repair_time. gc_before = repair_time > T.deletion_time so T is
GC-eligible. The test verifies that compaction on the repaired set does
NOT purge T: D (ts_d < ts_t) is already in repaired, clamping
max_purgeable = ts_d < ts_t = T.timestamp, so T is not purgeable.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -5,10 +5,11 @@
|
||||
#
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.repair import load_tablet_sstables_repaired_at, create_table_insert_data_for_repair
|
||||
from test.pylib.repair import load_tablet_sstables_repaired_at, load_tablet_repair_time, create_table_insert_data_for_repair
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.cluster.tasks.task_manager_client import TaskManagerClient
|
||||
from test.cluster.util import reconnect_driver, find_server_by_host_id, get_topology_coordinator, new_test_keyspace, new_test_table, trigger_stepdown
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
|
||||
from cassandra.query import ConsistencyLevel
|
||||
|
||||
@@ -1050,3 +1051,262 @@ async def test_incremental_repair_race_window_promotes_unrepaired_data(manager:
|
||||
f"on servers[1] after restart lost the being_repaired markers during the race window. " \
|
||||
f"They are UNREPAIRED on servers[0] and servers[2] (classification divergence). " \
|
||||
f"Wrongly promoted (first 10): {sorted(wrongly_promoted)[:10]}"
|
||||
|
||||
# ----------------------------------------------------------------------------
|
||||
# Tombstone GC safety tests
|
||||
#
|
||||
# These tests verify that incremental repair with tombstone_gc=repair never
|
||||
# causes data resurrection via premature tombstone GC. The key invariant is:
|
||||
#
|
||||
# mark_sstable_as_repaired() completes on ALL replicas BEFORE the coordinator
|
||||
# commits repair_time to Raft. Therefore, by the time gc_before advances and
|
||||
# a tombstone T becomes GC-eligible, any data D that T shadows has already
|
||||
# been promoted from the repairing set to the repaired set. Tombstone GC
|
||||
# running on the repaired set will always see D there and will prevent T from
|
||||
# being purged prematurely.
|
||||
#
|
||||
# Three scenarios are tested:
|
||||
#
|
||||
# 1. Basic ordering guarantee: D arrives via hint flush at the start of repair,
|
||||
# is captured in the repairing snapshot, and is promoted to repaired before
|
||||
# repair_time advances. GC then runs on the repaired set; T must not be
|
||||
# purged while D is still present.
|
||||
#
|
||||
# 2. Hints flush failure: hints flush times out on one replica so D is not
|
||||
# delivered during that repair. repair_time must NOT advance (the guard in
|
||||
# repair that skips the repair_history update when hints_batchlog_flushed is
|
||||
# false). Therefore T does not become GC-eligible from this repair, and no
|
||||
# resurrection can occur even though D is not yet on that replica.
|
||||
#
|
||||
# 3. Propagation delay: D is written with an old USING TIMESTAMP (simulating a
|
||||
# write that arrives within the propagation delay window). T is written
|
||||
# later with a higher timestamp. Repair runs, D is captured in repairing
|
||||
# (delivered via hint flush), T is in repaired. After repair completes D is
|
||||
# in repaired too. GC on the repaired set must not purge T because D has a
|
||||
# lower timestamp but is visible.
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
async def _setup_tombstone_gc_cluster(manager, *, tablets=2, extra_cmdline=None):
|
||||
"""Create a 3-node cluster with a tombstone_gc=repair table and return
|
||||
(servers, cql, hosts, ks, table_id, logs).
|
||||
|
||||
Uses propagation_delay_in_seconds=0 so that tombstones become GC-eligible
|
||||
immediately after repair_time is committed (T.deletion_time < repair_time = gc_before),
|
||||
allowing the tests to actually exercise the GC eligibility path without sleeping.
|
||||
"""
|
||||
cmdline = ['--logger-log-level', 'repair=debug']
|
||||
if extra_cmdline:
|
||||
cmdline += extra_cmdline
|
||||
servers, cql, hosts, ks, table_id = await create_table_insert_data_for_repair(
|
||||
manager, nr_keys=0, cmdline=cmdline, tablets=tablets,
|
||||
disable_flush_cache_time=True)
|
||||
# Lower propagation_delay to 0 so gc_before = repair_time, making tombstones
|
||||
# GC-eligible immediately after a successful repair rather than 1h later.
|
||||
await cql.run_async(
|
||||
f"ALTER TABLE {ks}.test WITH tombstone_gc = {{'mode': 'repair', 'propagation_delay_in_seconds': '0'}}"
|
||||
)
|
||||
logs = [await manager.server_open_log(s.server_id) for s in servers]
|
||||
return servers, cql, hosts, ks, table_id, logs
|
||||
|
||||
|
||||
async def _trigger_repaired_compaction(manager, server, ks):
|
||||
"""Force a compaction that operates on the repaired sstable set."""
|
||||
await manager.api.keyspace_compaction(server.ip_addr, ks, "test")
|
||||
|
||||
|
||||
async def _assert_key_visible(cql, ks, key, hosts, *, msg=""):
|
||||
"""Assert that key is readable on every replica (by querying each host directly)."""
|
||||
for h in hosts:
|
||||
rows = await cql.run_async(
|
||||
f"SELECT pk FROM {ks}.test WHERE pk = {key}",
|
||||
host=h)
|
||||
assert rows, f"Key {key} not found on host {h} after tombstone GC compaction. {msg}"
|
||||
|
||||
|
||||
async def _assert_key_deleted(cql, ks, key, hosts, *, msg=""):
|
||||
"""Assert that key is not visible (tombstone wins) on every replica."""
|
||||
for h in hosts:
|
||||
rows = await cql.run_async(
|
||||
f"SELECT pk FROM {ks}.test WHERE pk = {key}",
|
||||
host=h)
|
||||
assert not rows, f"Key {key} unexpectedly visible on host {h} — data resurrection! {msg}"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tombstone_gc_no_resurrection_basic_ordering(manager: ManagerClient):
|
||||
"""Verify that the ordering guarantee prevents premature tombstone GC.
|
||||
|
||||
With propagation_delay=0, gc_before = repair_time. T.deletion_time (wall-clock
|
||||
time of DELETE, just before repair starts) < repair_time = gc_before, so T IS
|
||||
GC-eligible after repair. D (CQL timestamp=1) is in the repaired set because it
|
||||
was captured in the repairing snapshot and promoted before repair_time was committed.
|
||||
Compaction on the repaired set must NOT purge T because D (min_live_timestamp=1)
|
||||
makes T non-purgeable (T.timestamp=2 > max_purgeable=1).
|
||||
|
||||
Scenario:
|
||||
- D written at ts=1 to all replicas, flushed.
|
||||
- T (row deletion, ts=2) written to all replicas, flushed.
|
||||
- Repair runs: both D and T move repairing → repaired. repair_time advances.
|
||||
- gc_before = repair_time > T.deletion_time → T is GC-eligible.
|
||||
- Repaired compaction must NOT purge T because D (in repaired) prevents it.
|
||||
- Key must remain deleted.
|
||||
"""
|
||||
servers, cql, hosts, ks, table_id, logs = await _setup_tombstone_gc_cluster(manager)
|
||||
|
||||
# D at ts=1, T at ts=2 (T wins over D, correctly deletes the row).
|
||||
key = 42
|
||||
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1) USING TIMESTAMP 1")
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
|
||||
await cql.run_async(f"DELETE FROM {ks}.test USING TIMESTAMP 2 WHERE pk = {key}")
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
|
||||
# Both D and T captured in repairing, then promoted to repaired. repair_time advances.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", incremental_mode='incremental')
|
||||
|
||||
# With propagation_delay=0: gc_before = repair_time > T.deletion_time → T is GC-eligible.
|
||||
# Compaction on the repaired set: D is also in repaired (promoted before repair_time
|
||||
# was committed), so max_purgeable = D.timestamp = 1 < T.timestamp = 2 → T not purgeable.
|
||||
for s in servers:
|
||||
await _trigger_repaired_compaction(manager, s, ks)
|
||||
|
||||
# T must not have been GC'd; key must remain deleted (not resurrected).
|
||||
await _assert_key_deleted(cql, ks, key, hosts,
|
||||
msg="T was prematurely GC'd on repaired compaction (D in repaired should prevent it)")
|
||||
|
||||
logger.info("test_tombstone_gc_no_resurrection_basic_ordering: PASSED")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tombstone_gc_no_resurrection_hints_flush_failure(manager: ManagerClient):
|
||||
"""Verify that repair_time stays at epoch when hints flush fails, so tombstones
|
||||
are never GC-eligible after such a repair and data resurrection cannot occur.
|
||||
|
||||
With propagation_delay=0, gc_before = repair_time. When hints flush fails,
|
||||
repair_time is set to epoch (gc_clock::time_point{}) by the repair framework
|
||||
(flush_time stays at epoch because hints_batchlog_flushed=False). Therefore
|
||||
gc_before = epoch, T.deletion_time ≈ now >> epoch, T is never GC-eligible, and
|
||||
compaction cannot purge T regardless of what is in the repaired set.
|
||||
|
||||
Scenario:
|
||||
- D and T written to all replicas and flushed.
|
||||
- Repair runs with injection that makes hints flush fail on servers[2].
|
||||
- repair_time must stay at epoch (guard: hints_batchlog_flushed=False).
|
||||
- Compaction on the repaired set: T not GC-eligible → key stays deleted.
|
||||
"""
|
||||
servers, cql, hosts, ks, table_id, logs = await _setup_tombstone_gc_cluster(manager)
|
||||
|
||||
key = 99
|
||||
|
||||
# Write D at ts=1 and T at ts=2 to all nodes so everyone has the tombstone.
|
||||
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1) USING TIMESTAMP 1")
|
||||
await cql.run_async(f"DELETE FROM {ks}.test USING TIMESTAMP 2 WHERE pk = {key}")
|
||||
for s in servers:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
|
||||
# Read the initial repair_time so we can verify it doesn't meaningfully advance.
|
||||
initial_repair_times = await load_tablet_repair_time(cql, hosts, table_id)
|
||||
|
||||
# Inject: make batchlog manager appear uninitialized on servers[2] so hints flush fails.
|
||||
await manager.api.enable_injection(servers[2].ip_addr, "repair_flush_hints_batchlog_handler_bm_uninitialized", one_shot=False)
|
||||
|
||||
try:
|
||||
# Repair warns about hints flush failure but continues (repair.cc outer catch).
|
||||
# flush_time stays at epoch so repair_time will be set to epoch in system.tablets.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", incremental_mode='incremental')
|
||||
except Exception:
|
||||
pass # repair may report failure; we care about side-effects only
|
||||
finally:
|
||||
await manager.api.disable_injection(servers[2].ip_addr, "repair_flush_hints_batchlog_handler_bm_uninitialized")
|
||||
|
||||
# repair_time must NOT have advanced to a meaningful time (it stays at epoch when
|
||||
# hints flush fails, so gc_before = epoch and T is never GC-eligible).
|
||||
new_repair_times = await load_tablet_repair_time(cql, hosts, table_id)
|
||||
for token, old_time in initial_repair_times.items():
|
||||
new_time = new_repair_times.get(token)
|
||||
assert new_time == old_time or new_time is None, \
|
||||
f"repair_time advanced for token {token} despite hints flush failure: " \
|
||||
f"{old_time} → {new_time}"
|
||||
|
||||
# Trigger compaction on the repaired set; T is not GC-eligible (gc_before = epoch).
|
||||
for s in servers:
|
||||
await _trigger_repaired_compaction(manager, s, ks)
|
||||
|
||||
# Key must remain deleted (T not GC'd).
|
||||
await _assert_key_deleted(cql, ks, key, hosts,
|
||||
msg="T was prematurely GC'd despite hints flush failure (repair_time should be epoch)")
|
||||
|
||||
logger.info("test_tombstone_gc_no_resurrection_hints_flush_failure: PASSED")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_tombstone_gc_no_resurrection_propagation_delay(manager: ManagerClient):
|
||||
"""Verify the ordering guarantee when D arrives via hint flush just before repair.
|
||||
|
||||
D has an old CQL timestamp (ts_d = now - 2h) simulating a write that was delayed
|
||||
by the propagation window. T has a higher CQL timestamp (ts_t = now - 90min) so T
|
||||
correctly shadows D. With propagation_delay=0, T becomes GC-eligible as soon as
|
||||
repair_time is committed (T.deletion_time < repair_time = gc_before).
|
||||
|
||||
The invariant being tested: mark_sstable_as_repaired() runs on all replicas BEFORE
|
||||
the coordinator commits repair_time to Raft. So when gc_before advances (repair_time
|
||||
committed), D is already in the repaired set on all replicas. Repaired compaction
|
||||
must NOT purge T because D (min_live_timestamp = ts_d < ts_t) is present there.
|
||||
|
||||
Scenario:
|
||||
- servers[2] stopped; D and T written (go to hints on coordinator for servers[2]).
|
||||
- D and T flushed on servers[0] and servers[1].
|
||||
- servers[2] restarted.
|
||||
- Repair: hints flush delivers D and T to servers[2] before the repairing snapshot.
|
||||
mark_sstable_as_repaired() promotes D and T to repaired on servers[2].
|
||||
repair_time then committed. gc_before = repair_time > T.deletion_time.
|
||||
- Repaired compaction: D (ts_d) in repaired prevents T (ts_t > ts_d) from being GC'd.
|
||||
- Key must remain deleted.
|
||||
"""
|
||||
servers, cql, hosts, ks, table_id, logs = await _setup_tombstone_gc_cluster(
|
||||
manager, extra_cmdline=['--hinted-handoff-enabled', '1'])
|
||||
|
||||
# Stop servers[2] so writes to it are queued as hints on the coordinator.
|
||||
await manager.server_stop_gracefully(servers[2].server_id)
|
||||
|
||||
key = 77
|
||||
# CQL USING TIMESTAMP is in microseconds since epoch.
|
||||
now_us = int(time.time() * 1e6)
|
||||
ts_d = now_us - int(2 * 3600 * 1e6) # 2 hours ago (CQL µs timestamp)
|
||||
ts_t = now_us - int(90 * 60 * 1e6) # 90 minutes ago (CQL µs timestamp); ts_t > ts_d
|
||||
|
||||
# D and T go to servers[0] and servers[1] directly; servers[2] gets hints queued.
|
||||
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1) USING TIMESTAMP {ts_d}")
|
||||
await cql.run_async(f"DELETE FROM {ks}.test USING TIMESTAMP {ts_t} WHERE pk = {key}")
|
||||
|
||||
for s in [servers[0], servers[1]]:
|
||||
await manager.api.flush_keyspace(s.ip_addr, ks)
|
||||
|
||||
# Restart servers[2]; D and T are not on its disk yet.
|
||||
await manager.server_start(servers[2].server_id)
|
||||
await manager.servers_see_each_other(servers)
|
||||
await reconnect_driver(manager)
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Repair: hints flush sends D and T to servers[2] before the repairing snapshot.
|
||||
# After row sync, mark_sstable_as_repaired() promotes D and T to repaired on servers[2].
|
||||
# Only then does the coordinator commit repair_time to Raft.
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", "all", incremental_mode='incremental')
|
||||
|
||||
# With propagation_delay=0: gc_before = repair_time > T.deletion_time → T is GC-eligible.
|
||||
# But D (ts_d < ts_t) is already in repaired on servers[2], so max_purgeable = ts_d
|
||||
# < ts_t = T.timestamp → T is NOT purgeable.
|
||||
for s in servers:
|
||||
await _trigger_repaired_compaction(manager, s, ks)
|
||||
|
||||
# Key must remain deleted (T not GC'd, D not resurrected).
|
||||
await _assert_key_deleted(cql, ks, key, hosts,
|
||||
msg="Data resurrection: T was GC'd despite D being present in repaired set")
|
||||
|
||||
logger.info("test_tombstone_gc_no_resurrection_propagation_delay: PASSED")
|
||||
|
||||
Reference in New Issue
Block a user