test: add test for repair and resize finalization

Add a test that checks that repair does not start while a
resize finalization is in progress.

(cherry picked from commit 83c9af9670)
This commit is contained in:
Aleksandra Martyniuk
2025-05-16 18:08:29 +02:00
committed by GitHub Action
parent eb96ef8ce7
commit cbce0ed911
3 changed files with 47 additions and 1 deletions

View File

@@ -2283,7 +2283,8 @@ future<> repair_service::repair_tablets(repair_uniq_id rid, sstring keyspace_nam
}
table_id tid = t->schema()->id();
// Invoke group0 read barrier before obtaining erm pointer so that it sees all prior metadata changes
auto dropped = co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
auto dropped = !utils::get_local_injector().enter("repair_tablets_no_sync") &&
co_await streaming::table_sync_and_check(_db.local(), _mm, tid);
if (dropped) {
rlogger.debug("repair[{}] Table {}.{} does not exist anymore", rid.uuid(), keyspace_name, table_name);
continue;

View File

@@ -1706,6 +1706,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}
future<> handle_tablet_resize_finalization(group0_guard g) {
co_await utils::get_local_injector().inject("handle_tablet_resize_finalization_wait", [] (auto& handler) -> future<> {
rtlogger.info("handle_tablet_resize_finalization: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60});
});
// Executes a global barrier to guarantee that any process (e.g. repair) holding stale version
// of token metadata will complete before we update topology.
auto guard = co_await global_tablet_token_metadata_barrier(std::move(g));

View File

@@ -1088,6 +1088,46 @@ async def test_tablet_split_finalization_with_migrations(manager: ManagerClient)
logger.info("Waiting for migrations to complete")
await log.wait_for("Tablet load balancer did not make any plan", migration_mark)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_tablet_split_finalization_with_repair(manager: ManagerClient):
    """Check that a repair request waits while tablet resize finalization is in progress.

    Scenario:
      1. Start two servers with the finalization-pausing injection enabled.
      2. Create a table with 4 initial tablets, write and flush some rows.
      3. Raise ``min_tablet_count`` to 8 and wait until one of the servers
         logs that resize finalization is paused on the injection.
      4. Request a repair on the first server and wait for its log to report
         that the topology is busy.
      5. Message the injection on the first server to release the wait.
    """
    injection = "handle_tablet_resize_finalization_wait"
    cfg = {
        'enable_tablets': True,
        'error_injections_at_startup': [
            # Pauses resize finalization until a message arrives (or 60 s pass).
            injection,
            # Makes repair skip the table sync-and-check step, so it does not
            # run the group0 read barrier before looking at the topology.
            "repair_tablets_no_sync",
            'short_tablet_stats_refresh_interval',
        ]
    }
    servers = await manager.servers_add(2, config=cfg)

    cql = manager.get_cql()
    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH compaction = {'class': 'NullCompactionStrategy'};")
    await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
    await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")

    # Remember the current end of every server log so that later searches
    # only match lines produced after the split is triggered.
    logs = [await manager.server_open_log(s.server_id) for s in servers]
    marks = [await log.mark() for log in logs]

    logger.info("Trigger split in table")
    await cql.run_async("ALTER TABLE test.test WITH tablets = {'min_tablet_count': 8};")

    logger.info("Wait for tablets to split")
    # Either server may be the one that reaches the injection, so watch both
    # logs and continue as soon as the message shows up in one of them.
    finalization_waiters = [
        asyncio.create_task(log.wait_for('handle_tablet_resize_finalization: waiting', from_mark=mark))
        for log, mark in zip(logs, marks)
    ]
    _, pending = await asyncio.wait(finalization_waiters, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()

    async def repair():
        # Ask the first server to repair keyspace "test".
        await manager.api.client.post("/storage_service/repair_async/test", host=servers[0].ip_addr)

    async def check_repair_waits():
        # Repair must report that it is waiting for the topology ...
        await logs[0].wait_for("Topology is busy, waiting for it to quiesce", from_mark=marks[0])
        # ... and only then is the paused finalization released.
        # NOTE(review): the release message is sent to servers[0] only, while
        # the wait above accepts the pause on either server; if the other
        # server is the one paused, it is presumably left to its 60 s timeout.
        # Confirm this is intended.
        await manager.api.message_injection(servers[0].ip_addr, injection)

    await asyncio.gather(repair(), check_repair_waits())
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_two_tablets_concurrent_repair_and_migration_repair_writer_level(manager: ManagerClient):