test: test_topology_recovery_basic: test CDC during recovery
In topology on raft, management of CDC generations is moved to the topology coordinator. We extend the topology recovery test to verify that the CDC keeps working correctly during the whole recovery process. In particular, we test that after restarting nodes in the recovery mode, they correctly use the active CDC generation created by the topology coordinator. A node restarting in the recovery mode should learn about the active generation from `system.cdc_local` (or from gossip, but we don't want to rely on it). Then, it should load its data from `system.cdc_generations_v3`. Fixes scylladb/scylladb#17409
This commit is contained in:
@@ -14,41 +14,72 @@ from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.topology.util import reconnect_driver, enter_recovery_state, \
|
||||
delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \
|
||||
wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \
|
||||
check_system_topology_and_cdc_generations_v3_consistency
|
||||
check_system_topology_and_cdc_generations_v3_consistency, start_writes_to_cdc_table, wait_until_last_generation_is_in_use
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
async def test_topology_recovery_basic(request, manager: ManagerClient):
|
||||
servers = await manager.servers_add(3)
|
||||
async def test_topology_recovery_basic(request, mode: str, manager: ManagerClient):
|
||||
# Increase ring delay to ensure nodes learn about CDC generations before they start operating.
|
||||
cfg = {'ring_delay_ms': 15000 if mode == 'debug' else 5000}
|
||||
|
||||
servers = await manager.servers_add(3, config=cfg)
|
||||
cql = manager.cql
|
||||
assert(cql)
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
restart_writes, stop_writes_and_verify = await start_writes_to_cdc_table(cql)
|
||||
|
||||
logging.info(f"Restarting hosts {hosts} in recovery mode")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
|
||||
# If we restarted nodes before the last generation was in use, some writes
|
||||
# could fail. After restart, nodes load only the last generation. If it's
|
||||
# not active yet, writes with lower timestamps would fail.
|
||||
await wait_until_last_generation_is_in_use(cql)
|
||||
|
||||
logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in all 3 generations")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
# Restart sequentially, as it tests how nodes operating in legacy mode
|
||||
# react to raft topology mode nodes and vice versa
|
||||
await manager.rolling_restart(servers)
|
||||
|
||||
await stop_writes_and_verify()
|
||||
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}")
|
||||
|
||||
restart_writes(cql)
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_topology_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting hosts {hosts}")
|
||||
await manager.rolling_restart(servers)
|
||||
|
||||
# FIXME: We must reconnect the driver before performing CQL queries below, for example
|
||||
# in wait_until_schema_upgrade_finishes. Unfortunately, it forces us to stop writing to
|
||||
# a CDC table first. Reconnecting the driver would close the session used to send the
|
||||
# writes, and some writes could time out on the client.
|
||||
# Once https://github.com/scylladb/python-driver/issues/295 is fixed, we can remove
|
||||
# all calls to reconnect_driver, restart_writes and leave only the last call to
|
||||
# stop_writes_and_verify.
|
||||
await stop_writes_and_verify()
|
||||
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
restart_writes(cql)
|
||||
|
||||
logging.info("Waiting until upgrade to raft schema finishes")
|
||||
await asyncio.gather(*(wait_until_schema_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
@@ -73,7 +104,7 @@ async def test_topology_recovery_basic(request, manager: ManagerClient):
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
|
||||
logging.info("Booting new node")
|
||||
servers += [await manager.server_add()]
|
||||
servers += [await manager.server_add(config=cfg)]
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info("Waiting for the new CDC generation publishing")
|
||||
@@ -81,3 +112,11 @@ async def test_topology_recovery_basic(request, manager: ManagerClient):
|
||||
|
||||
logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
|
||||
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
|
||||
|
||||
await wait_until_last_generation_is_in_use(cql)
|
||||
|
||||
logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in the last generation")
|
||||
await asyncio.sleep(1)
|
||||
|
||||
logging.info("Checking correctness of data in system_distributed.cdc_streams_descriptions_v2")
|
||||
await stop_writes_and_verify()
|
||||
|
||||
Reference in New Issue
Block a user