From c098e9a327e00843d0d5c3b1cfc2ffd1e6aaef52 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Mon, 3 Feb 2025 10:23:03 +0200 Subject: [PATCH] test/test_view_build_status: fix flaky asserts In few test cases of test_view_build_status we create a view, wait for it and then query the view_build_status table and expect it to have all rows for each node and view. But it may fail because it could happen that the wait_for_view query and the following queries are done on different nodes, and some of the nodes didn't apply all the table updates yet, so they have missing rows. To fix it, we change the assert to work in the eventual consistency sense, retrying until the number of rows is as expectd. Fixes scylladb/scylladb#22644 Closes scylladb/scylladb#22654 --- .../topology_custom/test_view_build_status.py | 57 +++++++++---------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/test/topology_custom/test_view_build_status.py b/test/topology_custom/test_view_build_status.py index 492ca8fc11..5865dcb179 100644 --- a/test/topology_custom/test_view_build_status.py +++ b/test/topology_custom/test_view_build_status.py @@ -51,6 +51,12 @@ async def view_is_built_v2(cql, ks_name, view_name, node_count, **kwargs): async def wait_for_view_v2(cql, ks_name, view_name, node_count, **kwargs): await wait_for(lambda: view_is_built_v2(cql, ks_name, view_name, node_count, **kwargs), time.time() + 60) +async def wait_for_row_count(cql, table, n, host): + async def row_count_is_n(): + cnt = (await cql.run_async(f"SELECT count(*) FROM {table}", host=host))[0].count + return cnt == n or None + await wait_for(row_count_is_n, time.time() + 60) + # Verify a new cluster uses the view_build_status_v2 table. # Create a materialized view and check that the view's build status # is stored in view_build_status_v2 and all nodes see all the other @@ -143,11 +149,11 @@ async def test_view_build_status_virtual_table(manager: ManagerClient): await cql.run_async(f"DROP MATERIALIZED VIEW {ks_name}.vt1") - async def view_rows_removed(host): - r = await cql.run_async("SELECT * FROM system.view_build_status_v2", host=host) - return (len(r) == node_count) or None - await asyncio.gather(*(wait_for(lambda: view_rows_removed(h), time.time() + 60) for h in hosts)) - await assert_v1_eq_v2() + async def view_rows_removed_and_v1_eq_v2(): + r1, r2 = await select_v1(), await select_v2() + if len(r2) == node_count and r1 == r2: + return True + await wait_for(view_rows_removed_and_v1_eq_v2, time.time() + 60) # Cluster with 3 nodes. # Create materialized views. Start new server and it should get a snapshot on bootstrap. @@ -217,8 +223,7 @@ async def test_view_build_status_migration_to_v2(request, manager: ManagerClient v = await get_view_builder_version(cql) assert v == 1 - result = await cql.run_async("SELECT * FROM system_distributed.view_build_status") - assert len(result) == 3 + await wait_for_row_count(cql, "system_distributed.view_build_status", 3, hosts[0]) result = await cql.run_async("SELECT * FROM system.view_build_status_v2") assert len(result) == 0 @@ -237,8 +242,7 @@ async def test_view_build_status_migration_to_v2(request, manager: ManagerClient await create_mv(cql, "vt2") await asyncio.gather(*(wait_for_view_v2(cql, "ks", "vt2", 3, host=h) for h in hosts)) - result = await cql.run_async("SELECT * FROM system.view_build_status_v2") - assert len(result) == 6 + await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0]) # Migrate the view_build_status table to v2 and write to the table during the migration. # The migration process goes through an intermediate stage where it writes to @@ -360,11 +364,7 @@ async def test_view_build_status_cleanup_on_remove_node(manager: ManagerClient): await create_mv(cql, "vt1") await create_mv(cql, "vt2") - async def row_count_is(cql, n, host): - result = await cql.run_async("SELECT * FROM system.view_build_status_v2", host=host) - return (len(result) == n) or None - - await wait_for(lambda: row_count_is(cql, node_count * 2, hosts[0]), time.time() + 60) + await wait_for_row_count(cql, "system.view_build_status_v2", node_count*2, hosts[0]) await manager.server_stop_gracefully(servers[-1].server_id) await manager.remove_node(servers[0].server_id, servers[-1].server_id) @@ -373,7 +373,7 @@ async def test_view_build_status_cleanup_on_remove_node(manager: ManagerClient): # The 2 rows belonging to the node that was removed, one for each view, should # be deleted from the table. - await wait_for(lambda: row_count_is(cql, (node_count - 1) * 2, hosts[0]), time.time() + 60) + await wait_for_row_count(cql, "system.view_build_status_v2", (node_count-1)*2, hosts[0]) # Replace a node and verify that the view_build_status has rows for the new node and # no rows for the old node @@ -381,18 +381,14 @@ async def test_view_build_status_cleanup_on_remove_node(manager: ManagerClient): async def test_view_build_status_with_replace_node(manager: ManagerClient): node_count = 4 servers = await manager.servers_add(node_count) - cql, _ = await manager.get_ready_cql(servers) + cql, hosts = await manager.get_ready_cql(servers) await create_keyspace(cql) await create_table(cql) await create_mv(cql, "vt1") await create_mv(cql, "vt2") - await wait_for_view(cql, "vt1", node_count) - await wait_for_view(cql, "vt2", node_count) - - result = await cql.run_async("SELECT * FROM system.view_build_status_v2") - assert len(result) == node_count * 2 + await wait_for_row_count(cql, "system.view_build_status_v2", node_count*2, hosts[1]) # replace a node removed_host_id = await manager.get_host_id(servers[0].server_id) @@ -402,6 +398,9 @@ async def test_view_build_status_with_replace_node(manager: ManagerClient): servers = servers[1:] added_host_id = await manager.get_host_id(servers[-1].server_id) + await manager.driver_connect(server=servers[1]) + cql = manager.get_cql() + # wait for the old node rows to be removed and new node rows to be added async def node_rows_replaced(): result = await cql.run_async(f"SELECT * FROM system.view_build_status_v2 WHERE host_id={removed_host_id} ALLOW FILTERING") @@ -452,13 +451,13 @@ async def test_view_build_status_migration_to_v2_with_cleanup(request, manager: await wait_for_view_v1(cql, "vt1", 4) - result = await cql.run_async("SELECT * FROM system_distributed.view_build_status") - assert len(result) == 4 + await wait_for_row_count(cql, "system_distributed.view_build_status", 4, hosts[0]) # Insert a row that doesn't correspond to an existing view, but does correspond to a known host. # This row should get cleaned during migration. + s0_host_id = await manager.get_host_id(servers[0].server_id) await cql.run_async(f"INSERT INTO system_distributed.view_build_status(keyspace_name, view_name, host_id, status) \ - VALUES ('ks', 'view_doesnt_exist', {result[0].host_id}, 'SUCCESS')") + VALUES ('ks', 'view_doesnt_exist', {s0_host_id}, 'SUCCESS')") # Remove the last node. the entry for this node in the view build status remains and it # corresponds now to an unknown node. The migration should remove it. @@ -483,7 +482,7 @@ async def test_view_build_status_migration_to_v2_with_cleanup(request, manager: # Verify that after migration we kept only the entries for the known nodes and views. async def rows_migrated(): - result = await cql.run_async("SELECT * FROM system.view_build_status_v2") + result = await cql.run_async("SELECT * FROM system.view_build_status_v2", host=hosts[0]) return (len(result) == 3) or None await wait_for(rows_migrated, time.time() + 60) @@ -518,10 +517,7 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie v = await get_view_builder_version(cql) assert v == 1 - async def _view_build_finished(): - result = await cql.run_async("SELECT * FROM system_distributed.view_build_status") - return len(result) == 3 - await wait_for(_view_build_finished, time.time() + 10, period=.5) + await wait_for_row_count(cql, "system_distributed.view_build_status", 3, hosts[0]) result = await cql.run_async("SELECT * FROM system.view_build_status_v2") assert len(result) == 0 @@ -540,8 +536,7 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie await create_mv(cql, "vt2") await asyncio.gather(*(wait_for_view_v2(cql, "ks", "vt2", 3, host=h) for h in hosts)) - result = await cql.run_async("SELECT * FROM system.view_build_status_v2") - assert len(result) == 6 + await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0]) # Check if there is no error logs from raft topology for srv in servers: