From 74542be5aa27e268bca1db3a566e29dc4aa25e26 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 3 Apr 2026 00:55:31 +0200 Subject: [PATCH] test: pylib: Ignore exceptions in wait_for() ManagerClient::get_ready_cql() calls server_sees_others(), which waits for servers to see each other as alive in gossip. If one of the servers is still early in boot, RESTful API call to "gossiper/endpoint/live" may fail. It throws an exception, which currently terminates the wait_for() and propagates up, failing the test. Fix this by ignoring errors when polling inside wait_for. In case of timeout, we log the last exception. This should fix the problem not only in this case, for all uses of wait_for(). Example output: ``` pred = ._sees_min_others at 0x7f022af9a140> deadline = 1775218828.9172852, period = 1.0, before_retry = None backoff_factor = 1.5, max_period = 1.0, label = None async def wait_for( pred: Callable[[], Awaitable[Optional[T]]], deadline: float, period: float = 0.1, before_retry: Optional[Callable[[], Any]] = None, backoff_factor: float = 1.5, max_period: float = 1.0, label: Optional[str] = None) -> T: tag = label or getattr(pred, '__name__', 'unlabeled') start = time.time() retries = 0 last_exception: Exception | None = None while True: elapsed = time.time() - start if time.time() >= deadline: timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)" if last_exception is not None: timeout_msg += ( f"; last exception: {type(last_exception).__name__}: {last_exception}" ) raise AssertionError(timeout_msg) from last_exception raise AssertionError(timeout_msg) try: > res = await pred() test/pylib/util.py:80: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ async def _sees_min_others(): > raise Exception("asd") E Exception: asd test/pylib/manager_client.py:802: Exception The above exception was the direct cause of the following exception: manager = @pytest.mark.asyncio async def test_auth_after_reset(manager: ManagerClient) -> None: servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1") > cql, _ = await manager.get_ready_cql(servers) test/cluster/auth_cluster/test_auth_after_reset.py:33: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ test/pylib/manager_client.py:137: in get_ready_cql await self.servers_see_each_other(servers) test/pylib/manager_client.py:820: in servers_see_each_other await asyncio.gather(*others) test/pylib/manager_client.py:806: in server_sees_others await wait_for(_sees_min_others, time() + interval, period=.5) _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ pred = ._sees_min_others at 0x7f022af9a140> deadline = 1775218828.9172852, period = 1.0, before_retry = None backoff_factor = 1.5, max_period = 1.0, label = None async def wait_for( pred: Callable[[], Awaitable[Optional[T]]], deadline: float, period: float = 0.1, before_retry: Optional[Callable[[], Any]] = None, backoff_factor: float = 1.5, max_period: float = 1.0, label: Optional[str] = None) -> T: tag = label or getattr(pred, '__name__', 'unlabeled') start = time.time() retries = 0 last_exception: Exception | None = None while True: elapsed = time.time() - start if time.time() >= deadline: timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)" if last_exception is not None: timeout_msg += ( f"; last exception: {type(last_exception).__name__}: {last_exception}" ) > raise AssertionError(timeout_msg) from last_exception E AssertionError: wait_for(_sees_min_others) timed out after 45.30s (46 retries); last exception: Exception: asd test/pylib/util.py:76: AssertionError ``` Fixes a failure observed in test_auth_after_reset: ``` manager = @pytest.mark.asyncio async def test_auth_after_reset(manager: ManagerClient) -> None: servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1") cql, _ = await manager.get_ready_cql(servers) await cql.run_async("ALTER ROLE cassandra WITH PASSWORD = 'forgotten_pwd'") logging.info("Stopping cluster") await asyncio.gather(*[manager.server_stop_gracefully(server.server_id) for server in servers]) logging.info("Deleting sstables") for table in ["roles", "role_members", "role_attributes", "role_permissions"]: await asyncio.gather(*[manager.server_wipe_sstables(server.server_id, "system", table) for server in servers]) logging.info("Starting cluster") # Don't try connect to the servers yet, with deleted superuser it will be possible only after # quorum is reached. await asyncio.gather(*[manager.server_start(server.server_id, connect_driver=False) for server in servers]) logging.info("Waiting for CQL connection") await repeat_until_success(lambda: manager.driver_connect(auth_provider=PlainTextAuthProvider(username="cassandra", password="cassandra"))) > await manager.get_ready_cql(servers) test/cluster/auth_cluster/test_auth_after_reset.py:50: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ test/pylib/manager_client.py:137: in get_ready_cql await self.servers_see_each_other(servers) test/pylib/manager_client.py:819: in servers_see_each_other await asyncio.gather(*others) test/pylib/manager_client.py:805: in server_sees_others await wait_for(_sees_min_others, time() + interval, period=.5) test/pylib/util.py:71: in wait_for res = await pred() test/pylib/manager_client.py:802: in _sees_min_others alive_nodes = await self.api.get_alive_endpoints(server_ip) test/pylib/rest_client.py:243: in get_alive_endpoints data = await self.client.get_json(f"/gossiper/endpoint/live", host=node_ip) test/pylib/rest_client.py:99: in get_json ret = await self._fetch("GET", resource_uri, response_type = "json", host = host, _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = method = 'GET', resource = '/gossiper/endpoint/live', response_type = 'json' host = '127.15.252.8', port = 10000, params = None, json = None, timeout = None allow_failed = False async def _fetch(self, method: str, resource: str, response_type: Optional[str] = None, host: Optional[str] = None, port: Optional[int] = None, params: Optional[Mapping[str, str]] = None, json: Optional[Mapping] = None, timeout: Optional[float] = None, allow_failed: bool = False) -> Any: # Can raise exception. See https://docs.aiohttp.org/en/latest/web_exceptions.html assert method in ["GET", "POST", "PUT", "DELETE"], f"Invalid HTTP request method {method}" assert response_type is None or response_type in ["text", "json"], \ f"Invalid response type requested {response_type} (expected 'text' or 'json')" # Build the URI port = port if port else self.default_port if hasattr(self, "default_port") else None port_str = f":{port}" if port else "" assert host is not None or hasattr(self, "default_host"), "_fetch: missing host for " \ "{method} {resource}" host_str = host if host is not None else self.default_host uri = self.uri_scheme + "://" + host_str + port_str + resource logging.debug(f"RESTClient fetching {method} {uri}") client_timeout = ClientTimeout(total = timeout if timeout is not None else 300) async with request(method, uri, connector = self.connector if hasattr(self, "connector") else None, params = params, json = json, timeout = client_timeout) as resp: if allow_failed: return await resp.json() if resp.status != 200: text = await resp.text() > raise HTTPError(uri, resp.status, params, json, text) E test.pylib.rest_client.HTTPError: HTTP error 404, uri: http://127.15.252.8:10000/gossiper/endpoint/live, params: None, json: None, body: E {"message": "Not found", "code": 404} test/pylib/rest_client.py:77: HTTPError ``` Fixes: SCYLLADB-1367 Closes scylladb/scylladb#29323 --- test/pylib/util.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/test/pylib/util.py b/test/pylib/util.py index a6f5853254..23117f0178 100644 --- a/test/pylib/util.py +++ b/test/pylib/util.py @@ -64,11 +64,25 @@ async def wait_for( tag = label or getattr(pred, '__name__', 'unlabeled') start = time.time() retries = 0 + last_exception: Exception | None = None while True: elapsed = time.time() - start - assert time.time() < deadline, \ - f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)" - res = await pred() + if time.time() >= deadline: + timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)" + if last_exception is not None: + timeout_msg += ( + f"; last exception: {type(last_exception).__name__}: {last_exception}" + ) + raise AssertionError(timeout_msg) from last_exception + raise AssertionError(timeout_msg) + + try: + res = await pred() + last_exception = None + except Exception as exc: + res = None + last_exception = exc + if res is not None: if retries > 0: logger.debug(f"wait_for({tag}) completed "