topology/test_tls: test_upgrade_to_ssl: use new_test_keyspace

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-01-19 07:48:02 +02:00
parent 0fd1b846fe
commit a66ddb7c04

View File

@@ -6,6 +6,7 @@
from test.pylib.manager_client import ManagerClient
from cassandra.connection import ConnectionShutdown
from test.topology.util import new_test_keyspace
import asyncio
import logging
@@ -24,79 +25,77 @@ async def test_upgrade_to_ssl(manager: ManagerClient) -> None:
"all": [7001],
}
ks = 'ks'
cf = 'cf'
servers = await manager.running_servers()
cql = manager.get_cql()
await cql.run_async(f"CREATE KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}")
await cql.run_async(f"CREATE TABLE {ks}.{cf} (pk int PRIMARY KEY) WITH tombstone_gc = {{'mode': 'immediate'}}")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.{cf} (pk int PRIMARY KEY) WITH tombstone_gc = {{'mode': 'immediate'}}")
async def update_config_and_restart(mode):
for srv in servers:
# get the log and current pos
log = await manager.server_open_log(srv.server_id)
mark = await log.mark()
# stop one server
await manager.server_stop_gracefully(srv.server_id)
# change internode encryption
seo = (await manager.server_get_config(srv.server_id))['server_encryption_options']
seo['internode_encryption'] = mode
await manager.server_update_config(srv.server_id, "server_encryption_options", seo)
# restart
await manager.server_start(srv.server_id)
# now check we get the expected messaging server listening lines in log
expected_ports = mode2ports[mode]
pattern = "|".join(["port " + str(port) for port in expected_ports])
res = await log.grep(pattern, from_mark=mark)
assert len(res) == len(expected_ports), \
f"The listened ports are not same as expected! " \
f"Expected ports: {expected_ports}\nReal listened ports: {res}"
async def update_config_and_restart(mode):
for srv in servers:
# get the log and current pos
log = await manager.server_open_log(srv.server_id)
mark = await log.mark()
# stop one server
await manager.server_stop_gracefully(srv.server_id)
# change internode encryption
seo = (await manager.server_get_config(srv.server_id))['server_encryption_options']
seo['internode_encryption'] = mode
await manager.server_update_config(srv.server_id, "server_encryption_options", seo)
# restart
await manager.server_start(srv.server_id)
# now check we get the expected messaging server listening lines in log
expected_ports = mode2ports[mode]
pattern = "|".join(["port " + str(port) for port in expected_ports])
res = await log.grep(pattern, from_mark=mark)
assert len(res) == len(expected_ports), \
f"The listened ports are not same as expected! " \
f"Expected ports: {expected_ports}\nReal listened ports: {res}"
async def reconnect():
manager.driver_close()
await manager.driver_connect()
return manager.get_cql()
async def reconnect():
manager.driver_close()
await manager.driver_connect()
return manager.get_cql()
async def run_retry_async(stmt : str):
lcql = cql
while True:
try:
await lcql.run_async(stmt)
return
except ConnectionShutdown:
lcql = await reconnect();
async def run_retry_async(stmt : str):
lcql = cql
while True:
try:
await lcql.run_async(stmt)
return
except ConnectionShutdown:
lcql = await reconnect();
# iterate from none to all and back
for mode in ["none", "transitional", "all", "transitional", "none"]:
# run a bunch of inserts in background. TODO: have something akin to cassandra-stress
# we can run in separate thread/process to really guarantee parallelism.
go_on = True
# iterate from none to all and back
for mode in ["none", "transitional", "all", "transitional", "none"]:
# run a bunch of inserts in background. TODO: have something akin to cassandra-stress
# we can run in separate thread/process to really guarantee parallelism.
go_on = True
async def write_in_background():
count = 0;
while go_on:
await run_retry_async(f"INSERT INTO {ks}.{cf} (pk) VALUES ({count});")
count = count + 1
return count
async def write_in_background():
count = 0;
while go_on:
await run_retry_async(f"INSERT INTO {ks}.{cf} (pk) VALUES ({count});")
count = count + 1
return count
# f = asyncio.gather(
# *[run_retry_async(f"INSERT INTO {ks}.{cf} (pk) VALUES ({k});") for k in range(count)]
# )
f = write_in_background()
# f = asyncio.gather(
# *[run_retry_async(f"INSERT INTO {ks}.{cf} (pk) VALUES ({k});") for k in range(count)]
# )
f = write_in_background()
# do a rolling restart, updating the internode_encryption mode
await update_config_and_restart(mode)
go_on = False
# wait for the writes to finish
count = await f
cql = await reconnect()
# check writes completed even though we are so very rolling
await cql.run_async(f"SELECT COUNT(*) FROM {ks}.{cf}")
assert count == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.{cf}"))[0].count
# and drop data
await cql.run_async(f"TRUNCATE {ks}.{cf}")
# do a rolling restart, updating the internode_encryption mode
await update_config_and_restart(mode)
go_on = False
# wait for the writes to finish
count = await f
cql = await reconnect()
# check writes completed even though we are so very rolling
await cql.run_async(f"SELECT COUNT(*) FROM {ks}.{cf}")
assert count == (await cql.run_async(f"SELECT COUNT(*) FROM {ks}.{cf}"))[0].count
# and drop data
await cql.run_async(f"TRUNCATE {ks}.{cf}")
await cql.run_async(f"DROP TABLE {ks}.{cf};")
await cql.run_async(f"DROP KEYSPACE {ks};")
await cql.run_async(f"DROP TABLE {ks}.{cf};")