topology_custom/test_tablets_cql: use new_test_keyspace
And create_new_test_keyspace when we need drop to be explicit. Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
@@ -12,7 +12,7 @@ from cassandra.protocol import InvalidRequest
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import disable_schema_agreement_wait
|
||||
from test.topology.util import disable_schema_agreement_wait, create_new_test_keyspace, new_test_keyspace
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -30,10 +30,10 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
|
||||
logger.info("starting a second node (the follower)")
|
||||
servers += [await manager.server_add(config=config)]
|
||||
|
||||
await manager.get_cql().run_async("create keyspace ks with "
|
||||
ks = await create_new_test_keyspace(manager.get_cql(), "with "
|
||||
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "
|
||||
"tablets = {'enabled': true}")
|
||||
await manager.get_cql().run_async("create table ks.t (pk int primary key)")
|
||||
await manager.get_cql().run_async(f"create table {ks}.t (pk int primary key)")
|
||||
|
||||
logger.info(f"injecting wait-after-topology-coordinator-gets-event into the leader node {servers[0]}")
|
||||
injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr,
|
||||
@@ -43,7 +43,7 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
|
||||
res = await manager.get_cql().run_async("select data_center from system.local")
|
||||
# ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag
|
||||
this_dc = res[0].data_center
|
||||
await manager.get_cql().run_async("alter keyspace ks "
|
||||
await manager.get_cql().run_async(f"alter keyspace {ks} "
|
||||
f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 1}}")
|
||||
|
||||
# by creating a task this way we ensure it's immediately executed, but we won't wait until it's completed
|
||||
@@ -56,16 +56,16 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
|
||||
logger.info(f"dropping KS from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
|
||||
f"wakes up with the drop applied")
|
||||
host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr)
|
||||
await manager.get_cql().run_async("drop keyspace ks", host=host)
|
||||
await manager.get_cql().run_async(f"drop keyspace {ks}", host=host)
|
||||
|
||||
logger.info("Waking up the leader to continue processing ALTER with KS that doesn't exist (has been just dropped)")
|
||||
await injection_handler.message()
|
||||
|
||||
matches = await leader_log_file.grep("topology change coordinator fiber got error "
|
||||
"data_dictionary::no_such_keyspace \(Can't find a keyspace ks\)")
|
||||
f"data_dictionary::no_such_keyspace \(Can't find a keyspace {ks}\)")
|
||||
assert not matches
|
||||
|
||||
with pytest.raises(InvalidRequest, match="Can't ALTER keyspace ks, keyspace doesn't exist") as e:
|
||||
with pytest.raises(InvalidRequest, match=f"Can't ALTER keyspace {ks}, keyspace doesn't exist") as e:
|
||||
await task
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -81,51 +81,53 @@ async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerCl
|
||||
logger.info("starting a second node (the follower)")
|
||||
servers += [await manager.server_add(config=config)]
|
||||
|
||||
await manager.get_cql().run_async("create keyspace ks with "
|
||||
async with new_test_keyspace(manager, "with "
|
||||
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "
|
||||
"tablets = {'initial': 2}")
|
||||
await manager.get_cql().run_async("create table ks.t (pk int primary key)")
|
||||
"tablets = {'initial': 2}") as ks:
|
||||
await manager.get_cql().run_async(f"create table {ks}.t (pk int primary key)")
|
||||
|
||||
logger.info(f"injecting wait-before-committing-rf-change-event into the leader node {servers[0]}")
|
||||
injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr,
|
||||
'wait-before-committing-rf-change-event')
|
||||
logger.info(f"injecting wait-before-committing-rf-change-event into the leader node {servers[0]}")
|
||||
injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr,
|
||||
'wait-before-committing-rf-change-event')
|
||||
|
||||
# ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag
|
||||
res = await manager.get_cql().run_async("select data_center from system.local")
|
||||
this_dc = res[0].data_center
|
||||
# ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag
|
||||
res = await manager.get_cql().run_async("select data_center from system.local")
|
||||
this_dc = res[0].data_center
|
||||
|
||||
async def alter_tablets_ks_without_waiting_to_complete():
|
||||
logger.info("scheduling ALTER KS to change the RF from 1 to 2")
|
||||
await manager.get_cql().run_async("alter keyspace ks "
|
||||
f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
|
||||
async def alter_tablets_ks_without_waiting_to_complete():
|
||||
logger.info("scheduling ALTER KS to change the RF from 1 to 2")
|
||||
await manager.get_cql().run_async(f"alter keyspace {ks} "
|
||||
f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
|
||||
|
||||
# by creating a task this way we ensure it's immediately executed,
|
||||
# but we don't want to wait until the task is completed here,
|
||||
# because we want to do something else in the meantime
|
||||
task = asyncio.create_task(alter_tablets_ks_without_waiting_to_complete())
|
||||
# by creating a task this way we ensure it's immediately executed,
|
||||
# but we don't want to wait until the task is completed here,
|
||||
# because we want to do something else in the meantime
|
||||
task = asyncio.create_task(alter_tablets_ks_without_waiting_to_complete())
|
||||
|
||||
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
|
||||
leader_log_file = await manager.server_open_log(servers[0].server_id)
|
||||
await leader_log_file.wait_for("wait-before-committing-rf-change-event: waiting", timeout=10)
|
||||
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
|
||||
leader_log_file = await manager.server_open_log(servers[0].server_id)
|
||||
await leader_log_file.wait_for("wait-before-committing-rf-change-event: waiting", timeout=10)
|
||||
|
||||
logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
|
||||
f"wakes up with a changed schema")
|
||||
host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr)
|
||||
with disable_schema_agreement_wait(manager.get_cql()):
|
||||
await manager.get_cql().run_async("create keyspace ks2 with "
|
||||
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
|
||||
"and tablets = {'enabled': true}", host=host)
|
||||
logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
|
||||
f"wakes up with a changed schema")
|
||||
host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr)
|
||||
with disable_schema_agreement_wait(manager.get_cql()):
|
||||
ks2 = await create_new_test_keyspace(manager.get_cql(), "with "
|
||||
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
|
||||
"and tablets = {'enabled': true}", host=host)
|
||||
|
||||
logger.info("waking up the leader to continue processing ALTER on a changed schema, which should cause a retry")
|
||||
await injection_handler.message()
|
||||
logger.info("waking up the leader to continue processing ALTER on a changed schema, which should cause a retry")
|
||||
await injection_handler.message()
|
||||
|
||||
logger.info("waiting for ALTER to complete")
|
||||
await task
|
||||
logger.info("waiting for ALTER to complete")
|
||||
await task
|
||||
|
||||
# ensure that the concurrent modification error really did take place
|
||||
matches = await leader_log_file.grep("topology change coordinator fiber got group0_concurrent_modification")
|
||||
assert matches
|
||||
# ensure that the concurrent modification error really did take place
|
||||
matches = await leader_log_file.grep("topology change coordinator fiber got group0_concurrent_modification")
|
||||
assert matches
|
||||
|
||||
# ensure that the ALTER has eventually succeeded and we changed RF from 1 to 2
|
||||
res = manager.get_cql().execute(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
|
||||
assert res[0].replication[this_dc] == '2'
|
||||
# ensure that the ALTER has eventually succeeded and we changed RF from 1 to 2
|
||||
res = manager.get_cql().execute(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
|
||||
assert res[0].replication[this_dc] == '2'
|
||||
|
||||
await manager.get_cql().run_async(f"drop keyspace {ks2}")
|
||||
|
||||
Reference in New Issue
Block a user