diff --git a/test/cluster/auth_cluster/test_raft_service_levels.py b/test/cluster/auth_cluster/test_raft_service_levels.py index 890b7c9406..77fc6c3bb8 100644 --- a/test/cluster/auth_cluster/test_raft_service_levels.py +++ b/test/cluster/auth_cluster/test_raft_service_levels.py @@ -11,12 +11,14 @@ from test.pylib.rest_client import get_host_api_address, read_barrier from test.pylib.util import unique_name, wait_for_cql_and_get_hosts from test.pylib.manager_client import ManagerClient from test.cluster.util import trigger_snapshot, wait_until_topology_upgrade_finishes, enter_recovery_state, reconnect_driver, \ - delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, wait_for_token_ring_and_group0_consistency, wait_until_driver_service_level_created, get_topology_coordinator, find_server_by_host_id + delete_raft_topology_state, delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, \ + wait_for_token_ring_and_group0_consistency, wait_until_driver_service_level_created, get_topology_coordinator, \ + find_server_by_host_id from test.cluster.conftest import skip_mode from test.cqlpy.test_service_levels import MAX_USER_SERVICE_LEVELS from cassandra import ConsistencyLevel from cassandra.query import SimpleStatement -from cassandra.protocol import InvalidRequest +from cassandra.protocol import InvalidRequest, QueryMessage from cassandra.auth import PlainTextAuthProvider from test.cluster.auth_cluster import extra_scylla_config_options as auth_config @@ -575,3 +577,75 @@ async def test_driver_service_creation_failure(manager: ManagerClient) -> None: service_levels = await cql.run_async("LIST ALL SERVICE LEVELS", host=host) service_level_names = [sl.service_level for sl in service_levels] assert "driver" not in service_level_names + +def get_processed_tasks_for_group(metrics, group): + res = metrics.get("scylla_scheduler_tasks_processed", {'group': group}) + if res is None: + return 0 + return res + +@pytest.mark.asyncio +async def _verify_tasks_processed_metrics(manager, server, used_group, unused_group, func): + number_of_requests = 1000 + + def get_processed_tasks_for_group(metrics, group): + res = metrics.get("scylla_scheduler_tasks_processed", {'group': group}) + if res is None: + return 0 + return res + + metrics = await manager.metrics.query(server.ip_addr) + initial_tasks_processed_by_used_group = get_processed_tasks_for_group(metrics, used_group) + initial_tasks_processed_by_unused_group = get_processed_tasks_for_group(metrics, unused_group) + + await asyncio.gather(*[asyncio.to_thread(func) for i in range(number_of_requests)]) + + metrics = await manager.metrics.query(server.ip_addr) + assert get_processed_tasks_for_group(metrics, used_group) - initial_tasks_processed_by_used_group > number_of_requests + assert get_processed_tasks_for_group(metrics, unused_group) - initial_tasks_processed_by_unused_group < number_of_requests + +@pytest.mark.asyncio +async def test_driver_service_level_not_used_for_user_queries(manager: ManagerClient) -> None: + server = await manager.server_add(config=auth_config) + + cql = manager.get_cql() + [h] = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + + func = lambda: cql.execute(f"SELECT * from system.peers") + await _verify_tasks_processed_metrics(manager, server, 'sl:default', 'sl:driver', func) + + await cql.run_async(f"CREATE SERVICE LEVEL test", host=h) + await cql.run_async(f"ATTACH SERVICE LEVEL test TO cassandra", host=h) + + func = lambda: cql.execute(f"SELECT * from system.peers") + await _verify_tasks_processed_metrics(manager, server, 'sl:test', 'sl:driver', func) + +@pytest.mark.asyncio +async def test_driver_service_level_used_for_driver_queries(manager: ManagerClient) -> None: + server = await manager.server_add(config=auth_config) + + cql = manager.get_cql() + [h] = await wait_for_cql_and_get_hosts(cql, [server], time.time() + 60) + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30) + + await cql.run_async(f"CREATE SERVICE LEVEL test", host=h) + await cql.run_async(f"ATTACH SERVICE LEVEL test TO cassandra", host=h) + + async def get_control_connection_query_function(manager): + await manager.driver_connect() # restart control connection + cql = manager.get_cql() + control_connection = cql.cluster.control_connection._connection + query = QueryMessage("select * from system.peers", 1) + return cql, lambda: control_connection.wait_for_response(query) + + cql, func = await get_control_connection_query_function(manager) + await _verify_tasks_processed_metrics(manager, server, 'sl:driver', 'sl:test', func) + + await cql.run_async(f"DROP SERVICE LEVEL driver", host=h) + cql, func = await get_control_connection_query_function(manager) + await _verify_tasks_processed_metrics(manager, server, 'sl:test', 'sl:driver', func) + + await cql.run_async(f"CREATE SERVICE LEVEL driver", host=h) + cql, func = await get_control_connection_query_function(manager) + await _verify_tasks_processed_metrics(manager, server, 'sl:driver', 'sl:test', func)