test/cluster: fix flaky test_hints_consistency_during_replace
The test creates a sync point immediately after writing 100 rows
with CL=ANY, without waiting for pending hint writes to complete.
store_hint() is fire-and-forget: it submits do_store_hint() to a gate
and returns immediately. do_store_hint() updates _last_written_rp only
after writing to the commitlog. If create_sync_point() is called before
all do_store_hint() coroutines complete, the captured replay position
is stale, and await_sync_point() returns DONE before all hints are
replayed, leaving some rows missing.
Fix by waiting for the size_of_hints_in_progress metric to reach zero
before creating the sync point, ensuring all in-flight hint writes have
completed and _last_written_rp is up to date. This follows the same
pattern already used in test_sync_point.
Fixes: SCYLLADB-1709
Closes scylladb/scylladb#29623
(cherry picked from commit 7634d3f7d4)
Closes scylladb/scylladb#29632
This commit is contained in:
committed by
Botond Dénes
parent
60d5b3959d
commit
593a6da09c
@@ -283,6 +283,18 @@ async def test_hints_consistency_during_replace(manager: ManagerClient):
|
||||
# Write 100 rows with CL=ANY. Some of the rows will only be stored as hints because of RF=1
|
||||
for i in range(100):
|
||||
await cql.run_async(SimpleStatement(f"INSERT INTO {table} (pk, v) VALUES ({i}, {i + 1})", consistency_level=ConsistencyLevel.ANY))
|
||||
|
||||
# Hint writes are fire-and-forget (store_hint() submits do_store_hint()
|
||||
# asynchronously via a gate). Wait for all pending hint writes to complete
|
||||
# before creating the sync point, otherwise it may capture a stale
|
||||
# replay position and miss some hints.
|
||||
async def no_pending_hint_writes():
|
||||
size = await get_hint_metrics(manager.metrics, servers[0].ip_addr, "size_of_hints_in_progress")
|
||||
if size == 0:
|
||||
return True
|
||||
return None
|
||||
await wait_for(no_pending_hint_writes, time.time() + 30)
|
||||
|
||||
sync_point = await create_sync_point(manager.api.client, servers[0].ip_addr)
|
||||
|
||||
await manager.server_add(replace_cfg=ReplaceConfig(replaced_id = servers[2].server_id, reuse_ip_addr = False, use_host_id = True))
|
||||
|
||||
Reference in New Issue
Block a user