test: add test to verify use of sl:driver
`sl:driver` is expected to be used for new and control connections, but other connections that run user load should not use it after the user is authenticated. Refs: scylladb/scylladb#24411
This commit is contained in:
@@ -11,12 +11,14 @@ from test.pylib.rest_client import get_host_api_address, read_barrier
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import trigger_snapshot, wait_until_topology_upgrade_finishes, enter_recovery_state, reconnect_driver, \
|
||||
delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, wait_for_token_ring_and_group0_consistency, wait_until_driver_service_level_created, get_topology_coordinator, find_server_by_host_id
|
||||
delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, \
|
||||
wait_for_token_ring_and_group0_consistency, wait_until_driver_service_level_created, get_topology_coordinator, \
|
||||
find_server_by_host_id
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cqlpy.test_service_levels import MAX_USER_SERVICE_LEVELS
|
||||
from cassandra import ConsistencyLevel
|
||||
from cassandra.query import SimpleStatement
|
||||
from cassandra.protocol import InvalidRequest
|
||||
from cassandra.protocol import InvalidRequest, QueryMessage
|
||||
from cassandra.auth import PlainTextAuthProvider
|
||||
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
|
||||
|
||||
@@ -575,3 +577,75 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None:
|
||||
service_levels = await cql.run_async("LIST ALL SERVICE LEVELS", host=host)
|
||||
service_level_names = [sl.service_level for sl in service_levels]
|
||||
assert "driver" not in service_level_names
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func):
|
||||
number_of_requests = 1000
|
||||
|
||||
def get_processed_tasks_for_group(metrics, group):
|
||||
res = metrics.get("scylla_scheduler_tasks_processed", {'group': group})
|
||||
if res is None:
|
||||
return 0
|
||||
return res
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
initial_tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group)
|
||||
initial_tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group)
|
||||
|
||||
await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)])
|
||||
|
||||
metrics = await manager.metrics.query(server.ip_addr)
|
||||
assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests
|
||||
assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None:
|
||||
server = await manager.server_add(config=auth_config)
|
||||
|
||||
cql = manager.get_cql()
|
||||
[h] = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
|
||||
func = lambda: cql.execute(f"SELECT * from system.peers")
|
||||
await _verify_tasks_processed_metrics(manager, server, 'sl:default', 'sl:driver', func)
|
||||
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL test", host=h)
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL test TO cassandra", host=h)
|
||||
|
||||
func = lambda: cql.execute(f"SELECT * from system.peers")
|
||||
await _verify_tasks_processed_metrics(manager, server, 'sl:test', 'sl:driver', func)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_driver_service_level_used_for_driver_queries(manager: ManagerClient) -> None:
|
||||
server = await manager.server_add(config=auth_config)
|
||||
|
||||
cql = manager.get_cql()
|
||||
[h] = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60)
|
||||
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
|
||||
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL test", host=h)
|
||||
await cql.run_async(f"ATTACH SERVICE LEVEL test TO cassandra", host=h)
|
||||
|
||||
async def get_control_connection_query_function(manager):
|
||||
await manager.driver_connect() # restart control connection
|
||||
cql = manager.get_cql()
|
||||
control_connection = cql.cluster.control_connection._connection
|
||||
query = QueryMessage("select * from system.peers", 1)
|
||||
return cql, lambda: control_connection.wait_for_response(query)
|
||||
|
||||
cql, func = await get_control_connection_query_function(manager)
|
||||
await _verify_tasks_processed_metrics(manager, server, 'sl:driver', 'sl:test', func)
|
||||
|
||||
await cql.run_async(f"DROP SERVICE LEVEL driver", host=h)
|
||||
cql, func = await get_control_connection_query_function(manager)
|
||||
await _verify_tasks_processed_metrics(manager, server, 'sl:test', 'sl:driver', func)
|
||||
|
||||
await cql.run_async(f"CREATE SERVICE LEVEL driver", host=h)
|
||||
cql, func = await get_control_connection_query_function(manager)
|
||||
await _verify_tasks_processed_metrics(manager, server, 'sl:driver', 'sl:test', func)
|
||||
|
||||
Reference in New Issue
Block a user