topology_custom/test_view_build_status: use new_test_keyspace

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-01-19 08:52:50 +02:00
parent 2d4af01281
commit b810791fbb

View File

@@ -12,7 +12,7 @@ from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.internal_types import ServerInfo
from test.topology.util import trigger_snapshot, wait_until_topology_upgrade_finishes, enter_recovery_state, reconnect_driver, \
delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, wait_for
delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, wait_for, create_new_test_keyspace
from test.topology.conftest import skip_mode
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
@@ -22,15 +22,13 @@ from cassandra.protocol import InvalidRequest
logger = logging.getLogger(__name__)
async def create_keyspace(cql):
ks_name = 'ks'
await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}")
return ks_name
return await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")
async def create_table(cql):
await cql.run_async(f"CREATE TABLE ks.t (p int, c int, PRIMARY KEY (p, c))")
async def create_table(cql, ks):
await cql.run_async(f"CREATE TABLE {ks}.t (p int, c int, PRIMARY KEY (p, c))")
async def create_mv(cql, view_name):
await cql.run_async(f"CREATE MATERIALIZED VIEW ks.{view_name} AS SELECT * FROM ks.t WHERE c IS NOT NULL and p IS NOT NULL PRIMARY KEY (c, p)")
async def create_mv(cql, ks, view_name):
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.{view_name} AS SELECT * FROM {ks}.t WHERE c IS NOT NULL and p IS NOT NULL PRIMARY KEY (c, p)")
async def get_view_builder_version(cql, **kwargs):
result = await cql.run_async("SELECT value FROM system.scylla_local WHERE key='view_builder_version'", **kwargs)
@@ -72,11 +70,11 @@ async def test_view_build_status_v2_table(manager: ManagerClient):
v = await get_view_builder_version(cql, host=h)
assert v == 2
await create_keyspace(cql)
await create_table(cql)
await create_mv(cql, "vt")
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt")
await asyncio.gather(*(wait_for_view_v2(cql, 'ks', 'vt', node_count, host=h) for h in hosts))
await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt', node_count, host=h) for h in hosts))
# The table system_distributed.view_build_status is set to be a virtual table reading
# from system.view_build_status_v2, so verify that reading from each of them provides
@@ -107,16 +105,16 @@ async def test_view_build_status_virtual_table(manager: ManagerClient):
await wait_for(view_is_built, deadline)
ks_name = await create_keyspace(cql)
await create_table(cql)
await create_table(cql, ks_name)
await assert_v1_eq_v2()
await create_mv(cql, 'vt1')
await create_mv(cql, ks_name, 'vt1')
await asyncio.gather(*(wait_for_view_on_host(cql, 'vt1', node_count, h) for h in hosts))
await assert_v1_eq_v2()
assert len(await select_v2()) == node_count
await create_mv(cql, 'vt2')
await create_mv(cql, ks_name, 'vt2')
await asyncio.gather(*(wait_for_view_on_host(cql, 'vt2', node_count, h) for h in hosts))
await assert_v1_eq_v2()
assert len(await select_v2()) == node_count * 2
@@ -163,11 +161,11 @@ async def test_view_build_status_snapshot(manager: ManagerClient):
servers = await manager.servers_add(3)
cql, _ = await manager.get_ready_cql(servers)
await create_keyspace(cql)
await create_table(cql)
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, "vt1")
await create_mv(cql, "vt2")
await create_mv(cql, ks, "vt1")
await create_mv(cql, ks, "vt2")
for s in servers:
await manager.driver_connect(server=s)
@@ -215,9 +213,9 @@ async def test_view_build_status_migration_to_v2(request, manager: ManagerClient
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
await create_keyspace(cql)
await create_table(cql)
await create_mv(cql, "vt1")
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
# Verify we're using v1 now
v = await get_view_builder_version(cql)
@@ -239,8 +237,8 @@ async def test_view_build_status_migration_to_v2(request, manager: ManagerClient
await asyncio.gather(*(wait_for(lambda: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
# Check that new writes are written to the v2 table
await create_mv(cql, "vt2")
await asyncio.gather(*(wait_for_view_v2(cql, "ks", "vt2", 3, host=h) for h in hosts))
await create_mv(cql, ks, "vt2")
await asyncio.gather(*(wait_for_view_v2(cql, ks, "vt2", 3, host=h) for h in hosts))
await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0])
@@ -267,13 +265,13 @@ async def test_view_build_status_migration_to_v2_with_write_during_migration(req
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
await create_keyspace(cql)
await create_table(cql)
ks = await create_keyspace(cql)
await create_table(cql, ks)
inj_insert = "view_builder_pause_add_new_view"
await manager.api.enable_injection(servers[1].ip_addr, inj_insert, one_shot=True)
await create_mv(cql, "vt1")
await create_mv(cql, ks, "vt1")
# pause the migration between reading the old table and writing to the new table, so we have
# a time window where new writes may be lost.
@@ -300,7 +298,7 @@ async def test_view_build_status_migration_to_v2_with_write_during_migration(req
await asyncio.gather(*(wait_for(lambda: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
await asyncio.gather(*(wait_for_view_v2(cql, 'ks', 'vt1', 3, host=h) for h in hosts))
await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt1', 3, host=h) for h in hosts))
# Migrate the view_build_status table to v2 while there is an 'old' write operation in progress.
# The migration should wait for the old operations to complete before continuing, otherwise
@@ -325,13 +323,13 @@ async def test_view_build_status_migration_to_v2_barrier(request, manager: Manag
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
await create_keyspace(cql)
await create_table(cql)
ks = await create_keyspace(cql)
await create_table(cql, ks)
# Create MV and delay the write operation to the old table
inj_insert = "view_builder_pause_add_new_view"
await manager.api.enable_injection(servers[1].ip_addr, inj_insert, one_shot=True)
await create_mv(cql, "vt1")
await create_mv(cql, ks, "vt1")
# The upgrade should perform a barrier and wait for the delayed operation to complete before continuing.
logging.info("Triggering upgrade to raft topology")
@@ -349,7 +347,7 @@ async def test_view_build_status_migration_to_v2_barrier(request, manager: Manag
await asyncio.gather(*(wait_for(lambda: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
await asyncio.gather(*(wait_for_view_v2(cql, 'ks', 'vt1', 3, host=h) for h in hosts))
await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt1', 3, host=h) for h in hosts))
# Test that when removing a node from the cluster, we clean its rows from
# the view build status table.
@@ -359,10 +357,10 @@ async def test_view_build_status_cleanup_on_remove_node(manager: ManagerClient):
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
await create_keyspace(cql)
await create_table(cql)
await create_mv(cql, "vt1")
await create_mv(cql, "vt2")
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await create_mv(cql, ks, "vt2")
await wait_for_row_count(cql, "system.view_build_status_v2", node_count*2, hosts[0])
@@ -383,10 +381,10 @@ async def test_view_build_status_with_replace_node(manager: ManagerClient):
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
await create_keyspace(cql)
await create_table(cql)
await create_mv(cql, "vt1")
await create_mv(cql, "vt2")
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await create_mv(cql, ks, "vt2")
await wait_for_row_count(cql, "system.view_build_status_v2", node_count*2, hosts[1])
@@ -446,8 +444,8 @@ async def test_view_build_status_migration_to_v2_with_cleanup(request, manager:
# Create a view. This will insert 4 entries to the view build status table, one for each node.
ks_name = await create_keyspace(cql)
await create_table(cql)
await create_mv(cql, "vt1")
await create_table(cql, ks_name)
await create_mv(cql, ks_name, "vt1")
await wait_for_view_v1(cql, "vt1", 4)
@@ -457,7 +455,7 @@ async def test_view_build_status_migration_to_v2_with_cleanup(request, manager:
# This row should get cleaned during migration.
s0_host_id = await manager.get_host_id(servers[0].server_id)
await cql.run_async(f"INSERT INTO system_distributed.view_build_status(keyspace_name, view_name, host_id, status) \
VALUES ('ks', 'view_doesnt_exist', {s0_host_id}, 'SUCCESS')")
VALUES ('{ks_name}', 'view_doesnt_exist', {s0_host_id}, 'SUCCESS')")
# Remove the last node. the entry for this node in the view build status remains and it
# corresponds now to an unknown node. The migration should remove it.
@@ -509,9 +507,9 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie
logging.info("Waiting until driver connects to every server")
cql, hosts = await manager.get_ready_cql(servers)
await create_keyspace(cql)
await create_table(cql)
await create_mv(cql, "vt1")
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
# Verify we're using v1 now
v = await get_view_builder_version(cql)
@@ -533,8 +531,8 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie
await asyncio.gather(*(wait_for(lambda: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
# Check that new writes are written to the v2 table
await create_mv(cql, "vt2")
await asyncio.gather(*(wait_for_view_v2(cql, "ks", "vt2", 3, host=h) for h in hosts))
await create_mv(cql, ks, "vt2")
await asyncio.gather(*(wait_for_view_v2(cql, ks, "vt2", 3, host=h) for h in hosts))
await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0])