test: pylib: Ignore exceptions in wait_for()
ManagerClient::get_ready_cql() calls server_sees_others(), which waits
for servers to see each other as alive in gossip. If one of the
servers is still early in boot, RESTful API call to
"gossiper/endpoint/live" may fail. It throws an exception, which
currently terminates the wait_for() and propagates up, failing the test.
Fix this by ignoring errors when polling inside wait_for. In case of
timeout, we log the last exception. This should fix the problem not
only in this case, for all uses of wait_for().
Example output:
```
pred = <function ManagerClient.server_sees_others.<locals>._sees_min_others at 0x7f022af9a140>
deadline = 1775218828.9172852, period = 1.0, before_retry = None
backoff_factor = 1.5, max_period = 1.0, label = None
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 0.1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
last_exception: Exception | None = None
while True:
elapsed = time.time() - start
if time.time() >= deadline:
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
if last_exception is not None:
timeout_msg += (
f"; last exception: {type(last_exception).__name__}: {last_exception}"
)
raise AssertionError(timeout_msg) from last_exception
raise AssertionError(timeout_msg)
try:
> res = await pred()
test/pylib/util.py:80:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
async def _sees_min_others():
> raise Exception("asd")
E Exception: asd
test/pylib/manager_client.py:802: Exception
The above exception was the direct cause of the following exception:
manager = <test.pylib.manager_client.ManagerClient object at 0x7f022af7e7b0>
@pytest.mark.asyncio
async def test_auth_after_reset(manager: ManagerClient) -> None:
servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1")
> cql, _ = await manager.get_ready_cql(servers)
test/cluster/auth_cluster/test_auth_after_reset.py:33:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test/pylib/manager_client.py:137: in get_ready_cql
await self.servers_see_each_other(servers)
test/pylib/manager_client.py:820: in servers_see_each_other
await asyncio.gather(*others)
test/pylib/manager_client.py:806: in server_sees_others
await wait_for(_sees_min_others, time() + interval, period=.5)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pred = <function ManagerClient.server_sees_others.<locals>._sees_min_others at 0x7f022af9a140>
deadline = 1775218828.9172852, period = 1.0, before_retry = None
backoff_factor = 1.5, max_period = 1.0, label = None
async def wait_for(
pred: Callable[[], Awaitable[Optional[T]]],
deadline: float,
period: float = 0.1,
before_retry: Optional[Callable[[], Any]] = None,
backoff_factor: float = 1.5,
max_period: float = 1.0,
label: Optional[str] = None) -> T:
tag = label or getattr(pred, '__name__', 'unlabeled')
start = time.time()
retries = 0
last_exception: Exception | None = None
while True:
elapsed = time.time() - start
if time.time() >= deadline:
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
if last_exception is not None:
timeout_msg += (
f"; last exception: {type(last_exception).__name__}: {last_exception}"
)
> raise AssertionError(timeout_msg) from last_exception
E AssertionError: wait_for(_sees_min_others) timed out after 45.30s (46 retries); last exception: Exception: asd
test/pylib/util.py:76: AssertionError
```
Fixes a failure observed in test_auth_after_reset:
```
manager = <test.pylib.manager_client.ManagerClient object at 0x7fb3740e1630>
@pytest.mark.asyncio
async def test_auth_after_reset(manager: ManagerClient) -> None:
servers = await manager.servers_add(3, config=auth_config, auto_rack_dc="dc1")
cql, _ = await manager.get_ready_cql(servers)
await cql.run_async("ALTER ROLE cassandra WITH PASSWORD = 'forgotten_pwd'")
logging.info("Stopping cluster")
await asyncio.gather(*[manager.server_stop_gracefully(server.server_id) for server in servers])
logging.info("Deleting sstables")
for table in ["roles", "role_members", "role_attributes", "role_permissions"]:
await asyncio.gather(*[manager.server_wipe_sstables(server.server_id, "system", table) for server in servers])
logging.info("Starting cluster")
# Don't try connect to the servers yet, with deleted superuser it will be possible only after
# quorum is reached.
await asyncio.gather(*[manager.server_start(server.server_id, connect_driver=False) for server in servers])
logging.info("Waiting for CQL connection")
await repeat_until_success(lambda: manager.driver_connect(auth_provider=PlainTextAuthProvider(username="cassandra", password="cassandra")))
> await manager.get_ready_cql(servers)
test/cluster/auth_cluster/test_auth_after_reset.py:50:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
test/pylib/manager_client.py:137: in get_ready_cql
await self.servers_see_each_other(servers)
test/pylib/manager_client.py:819: in servers_see_each_other
await asyncio.gather(*others)
test/pylib/manager_client.py:805: in server_sees_others
await wait_for(_sees_min_others, time() + interval, period=.5)
test/pylib/util.py:71: in wait_for
res = await pred()
test/pylib/manager_client.py:802: in _sees_min_others
alive_nodes = await self.api.get_alive_endpoints(server_ip)
test/pylib/rest_client.py:243: in get_alive_endpoints
data = await self.client.get_json(f"/gossiper/endpoint/live", host=node_ip)
test/pylib/rest_client.py:99: in get_json
ret = await self._fetch("GET", resource_uri, response_type = "json", host = host,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <test.pylib.rest_client.TCPRESTClient object at 0x7fb2404a0650>
method = 'GET', resource = '/gossiper/endpoint/live', response_type = 'json'
host = '127.15.252.8', port = 10000, params = None, json = None, timeout = None
allow_failed = False
async def _fetch(self, method: str, resource: str, response_type: Optional[str] = None,
host: Optional[str] = None, port: Optional[int] = None,
params: Optional[Mapping[str, str]] = None,
json: Optional[Mapping] = None, timeout: Optional[float] = None, allow_failed: bool = False) -> Any:
# Can raise exception. See https://docs.aiohttp.org/en/latest/web_exceptions.html
assert method in ["GET", "POST", "PUT", "DELETE"], f"Invalid HTTP request method {method}"
assert response_type is None or response_type in ["text", "json"], \
f"Invalid response type requested {response_type} (expected 'text' or 'json')"
# Build the URI
port = port if port else self.default_port if hasattr(self, "default_port") else None
port_str = f":{port}" if port else ""
assert host is not None or hasattr(self, "default_host"), "_fetch: missing host for " \
"{method} {resource}"
host_str = host if host is not None else self.default_host
uri = self.uri_scheme + "://" + host_str + port_str + resource
logging.debug(f"RESTClient fetching {method} {uri}")
client_timeout = ClientTimeout(total = timeout if timeout is not None else 300)
async with request(method, uri,
connector = self.connector if hasattr(self, "connector") else None,
params = params, json = json, timeout = client_timeout) as resp:
if allow_failed:
return await resp.json()
if resp.status != 200:
text = await resp.text()
> raise HTTPError(uri, resp.status, params, json, text)
E test.pylib.rest_client.HTTPError: HTTP error 404, uri: http://127.15.252.8:10000/gossiper/endpoint/live, params: None, json: None, body:
E {"message": "Not found", "code": 404}
test/pylib/rest_client.py:77: HTTPError
```
Fixes: SCYLLADB-1367
Closes scylladb/scylladb#29323
This commit is contained in:
committed by
Avi Kivity
parent
8c0920202b
commit
74542be5aa
@@ -64,11 +64,25 @@ async def wait_for(
|
||||
tag = label or getattr(pred, '__name__', 'unlabeled')
|
||||
start = time.time()
|
||||
retries = 0
|
||||
last_exception: Exception | None = None
|
||||
while True:
|
||||
elapsed = time.time() - start
|
||||
assert time.time() < deadline, \
|
||||
f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||
res = await pred()
|
||||
if time.time() >= deadline:
|
||||
timeout_msg = f"wait_for({tag}) timed out after {elapsed:.2f}s ({retries} retries)"
|
||||
if last_exception is not None:
|
||||
timeout_msg += (
|
||||
f"; last exception: {type(last_exception).__name__}: {last_exception}"
|
||||
)
|
||||
raise AssertionError(timeout_msg) from last_exception
|
||||
raise AssertionError(timeout_msg)
|
||||
|
||||
try:
|
||||
res = await pred()
|
||||
last_exception = None
|
||||
except Exception as exc:
|
||||
res = None
|
||||
last_exception = exc
|
||||
|
||||
if res is not None:
|
||||
if retries > 0:
|
||||
logger.debug(f"wait_for({tag}) completed "
|
||||
|
||||
Reference in New Issue
Block a user