topology_custom/test_tablets_removenode: use create_new_test_keyspace
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -14,12 +14,13 @@ import logging
|
||||
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.util import start_writes
|
||||
from test.topology.util import create_new_test_keyspace
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def create_keyspace(cql, name, initial_tablets, rf):
|
||||
await cql.run_async(f"CREATE KEYSPACE {name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}"
|
||||
async def create_keyspace(cql, initial_tablets, rf):
|
||||
return await create_new_test_keyspace(cql, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': {rf}}}"
|
||||
f" AND tablets = {{'initial': {initial_tablets}}};")
|
||||
|
||||
|
||||
@@ -40,25 +41,25 @@ async def test_replace(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await create_keyspace(cql, "test", 32, rf=1)
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
||||
ks1 = await create_keyspace(cql, 32, rf=1)
|
||||
await cql.run_async(f"CREATE TABLE {ks1}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# We want RF=2 table to validate that quorum reads work after replacing node finishes
|
||||
# bootstrap which indicates that bootstrap waits for rebuilt.
|
||||
# Otherwise, some reads would fail to find a quorum.
|
||||
await create_keyspace(cql, "test2", 32, rf=2)
|
||||
await cql.run_async("CREATE TABLE test2.test (pk int PRIMARY KEY, c int);")
|
||||
ks2 = await create_keyspace(cql, 32, rf=2)
|
||||
await cql.run_async(f"CREATE TABLE {ks2}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
await create_keyspace(cql, "test3", 32, rf=3)
|
||||
await cql.run_async("CREATE TABLE test3.test (pk int PRIMARY KEY, c int);")
|
||||
await cql.run_async("CREATE TABLE test3.test2 (pk int PRIMARY KEY, c int);")
|
||||
ks3 = await create_keyspace(cql, 32, rf=3)
|
||||
await cql.run_async(f"CREATE TABLE {ks3}.test (pk int PRIMARY KEY, c int);")
|
||||
await cql.run_async(f"CREATE TABLE {ks3}.test2 (pk int PRIMARY KEY, c int);")
|
||||
|
||||
logger.info("Populating table")
|
||||
|
||||
keys = range(256)
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test2.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test3.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks1}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks2}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks3}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
async def check_ks(ks):
|
||||
logger.info(f"Checking {ks}")
|
||||
@@ -71,8 +72,8 @@ async def test_replace(manager: ManagerClient):
|
||||
async def check():
|
||||
# RF=1 keyspace will experience data loss so don't check it.
|
||||
# We include it in the test only to check that the system doesn't crash.
|
||||
await check_ks("test2")
|
||||
await check_ks("test3")
|
||||
await check_ks(ks2)
|
||||
await check_ks(ks3)
|
||||
|
||||
await check()
|
||||
|
||||
@@ -80,7 +81,7 @@ async def test_replace(manager: ManagerClient):
|
||||
# See https://github.com/scylladb/scylladb/issues/16527
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
finish_writes = await start_writes(cql, "test3", "test2")
|
||||
finish_writes = await start_writes(cql, ks3, "test2")
|
||||
|
||||
logger.info('Replacing a node')
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
@@ -89,7 +90,7 @@ async def test_replace(manager: ManagerClient):
|
||||
servers = servers[1:]
|
||||
|
||||
key_count = await finish_writes()
|
||||
stmt = SimpleStatement("SELECT * FROM test3.test2;", consistency_level=ConsistencyLevel.QUORUM)
|
||||
stmt = SimpleStatement(f"SELECT * FROM {ks3}.test2;", consistency_level=ConsistencyLevel.QUORUM)
|
||||
rows = await cql.run_async(stmt, all_pages=True)
|
||||
assert len(rows) == key_count
|
||||
for r in rows:
|
||||
@@ -105,7 +106,7 @@ async def test_replace(manager: ManagerClient):
|
||||
await manager.server_not_sees_other_server(servers[1].ip_addr, servers[0].ip_addr)
|
||||
await manager.server_not_sees_other_server(servers[2].ip_addr, servers[0].ip_addr)
|
||||
|
||||
await check_ks("test3")
|
||||
await check_ks(ks3)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -119,37 +120,37 @@ async def test_removenode(manager: ManagerClient):
|
||||
cql = manager.get_cql()
|
||||
|
||||
# RF=1
|
||||
await create_keyspace(cql, "test", 32, rf=1)
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
||||
ks1 = await create_keyspace(cql, 32, rf=1)
|
||||
await cql.run_async(f"CREATE TABLE {ks1}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# RF=2
|
||||
await create_keyspace(cql, "test2", 32, rf=2)
|
||||
await cql.run_async("CREATE TABLE test2.test (pk int PRIMARY KEY, c int);")
|
||||
ks2 = await create_keyspace(cql, 32, rf=2)
|
||||
await cql.run_async(f"CREATE TABLE {ks2}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
# RF=3
|
||||
await create_keyspace(cql, "test3", 32, rf=3)
|
||||
await cql.run_async("CREATE TABLE test3.test (pk int PRIMARY KEY, c int);")
|
||||
ks3 = await create_keyspace(cql, 32, rf=3)
|
||||
await cql.run_async(f"CREATE TABLE {ks3}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
logger.info("Populating table")
|
||||
|
||||
keys = range(256)
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test2.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test3.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks1}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks2}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks3}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
async def check():
|
||||
# RF=1 table "test" will experience data loss so don't check it.
|
||||
# We include it to check that the system doesn't crash.
|
||||
|
||||
logger.info("Checking table test2")
|
||||
query = SimpleStatement("SELECT * FROM test2.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
query = SimpleStatement(f"SELECT * FROM {ks2}.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
assert r.c == r.pk
|
||||
|
||||
logger.info("Checking table test3")
|
||||
query = SimpleStatement("SELECT * FROM test3.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
query = SimpleStatement(f"SELECT * FROM {ks3}.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
@@ -182,17 +183,17 @@ async def test_removenode_with_ignored_node(manager: ManagerClient):
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await create_keyspace(cql, "test", 32, rf=3)
|
||||
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
|
||||
ks = await create_keyspace(cql, 32, rf=3)
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
|
||||
|
||||
logger.info("Populating table")
|
||||
|
||||
keys = range(512)
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
await asyncio.gather(*[run_async_cl_all(cql, f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
|
||||
|
||||
async def check():
|
||||
logger.info("Checking")
|
||||
query = SimpleStatement("SELECT * FROM test.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.ONE)
|
||||
rows = await cql.run_async(query)
|
||||
assert len(rows) == len(keys)
|
||||
for r in rows:
|
||||
|
||||
Reference in New Issue
Block a user