test_user_writes_rejection: Fix test flakiness caused by typo and non-local CL=ONE reads

The current code:
```
try:
   cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
except Exception:
   pass
```

contains a typo: `host=host[0]` which throws an exception becase Host
object is not subscriptable. The test does not fail because the except
block is too broad and suppresses all exceptions.

Fixing the typo alone is insufficient. The write still succeeds because
the remaining nodes are UP and the query uses CL=ONE, so no failure
should be expected.

Another source of flakiness is data verification:
```
SELECT * FROM {cf} WHERE pk = 0;
```

Even when a coordinator is explicitly provided, using CL=ONE does not
guarantee a local read. The coordinator may forward the read request to
another replica, causing the verification to fail nondeterministically.

This patch rewrites the tests to address these issues:
- Fix the typo: `host[0]` to `hosts[0]`
- Verify data using `MUTATION_FRAGMENTS({cf})` which guarantees a local
  read on the coordinator node
- Reconnect the driver after node restart

Fixes https://github.com/scylladb/scylladb/issues/27933

Closes scylladb/scylladb#27934

(cherry picked from commit 7bf26ece4d)

Closes scylladb/scylladb#28094
This commit is contained in:
Łukasz Paszkowski
2025-12-31 11:10:24 +01:00
committed by Dawid Mędrek
parent 7feafe9a62
commit d70c049e07

View File

@@ -11,13 +11,15 @@ import psutil
import pytest
import time
import uuid
from cassandra.cluster import ConsistencyLevel, EXEC_PROFILE_DEFAULT
from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement
from typing import Callable
from test.cluster.conftest import skip_mode
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace, new_test_table
from test.pylib.manager_client import ManagerClient
from test.pylib.tablets import get_tablet_count
from test.pylib.util import Host
from test.storage.conftest import space_limited_servers
logger = logging.getLogger(__name__)
@@ -53,6 +55,15 @@ global_cmdline = ["--disk-space-monitor-normal-polling-interval-in-seconds", "1"
@pytest.mark.asyncio
async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Callable) -> None:
async def validate_data_existence(cql, successful_hosts: list[Host], failed_hosts: list[Host], cf: str, pk: int) -> None:
stmt = SimpleStatement(f"SELECT * from MUTATION_FRAGMENTS({cf}) where pk = {pk};", consistency_level=ConsistencyLevel.ONE)
for host in successful_hosts:
res = await cql.run_async(stmt, host=host)
assert res, f"Data not found on {host}"
for host in failed_hosts:
res = await cql.run_async(stmt, host=host)
assert not res, f"Data found on {host} but it shouldn't be there"
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline) as servers:
cql, hosts = await manager.get_ready_cql(servers)
@@ -65,6 +76,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
await manager.api.disable_autocompaction(server.ip_addr, ks)
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
wgen = write_generator(cf, 3)
logger.info("Create a big file on the target node to reach critical disk utilization level")
disk_info = psutil.disk_usage(workdir)
@@ -73,41 +85,30 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
mark, _ = await log.wait_for("database - Setting critical disk utilization mode: true", from_mark=mark)
logger.info("Write data and verify it did not reach the target node")
query = next(write_generator(cf, 1))
cl_quorum_profile = cql.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, consistency_level = ConsistencyLevel.QUORUM)
await cql.run_async(query, execution_profile=cl_quorum_profile)
cl_one_profile = cql.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, consistency_level = ConsistencyLevel.ONE)
res = cql.execute(f"SELECT * from {cf} where pk = 0;", host=hosts[0], execution_profile=cl_one_profile)
assert res.one() is None
for host in hosts[1:]:
res = cql.execute(f"SELECT * from {cf} where pk = 0;", host=host, execution_profile=cl_one_profile)
assert res.one()
await cql.run_async(SimpleStatement(next(wgen), consistency_level=ConsistencyLevel.QUORUM))
await validate_data_existence(cql, hosts[1:], [hosts[0]], cf, 0)
logger.info("Restart the node")
mark = await log.mark()
await manager.server_restart(servers[0].server_id)
await manager.driver_connect()
cql = manager.get_cql()
for _ in range(2):
mark, _ = await log.wait_for("database - Setting critical disk utilization mode: true", from_mark=mark)
time.sleep(1) # Let the cluster run for a sec to grep for potential errors
assert await log.grep("database - Setting critical disk utilization mode: false", from_mark=mark) == []
try:
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
except Exception:
pass
else:
pytest.fail("Expected to fail due to critical disk utilization level")
logger.info("Write more data and verify it did not reach the target node")
await cql.run_async(SimpleStatement(next(wgen), consistency_level=ConsistencyLevel.QUORUM))
await validate_data_existence(cql, hosts[1:], [hosts[0]], cf, 1)
logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
for _ in range(2):
mark, _ = await log.wait_for("database - Setting critical disk utilization mode: false", from_mark=mark)
logger.info("Write more data and expect it to succeed")
cl_all_profile = cql.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, consistency_level = ConsistencyLevel.ALL)
cql.execute(next(write_generator(cf, 1)), execution_profile=cl_all_profile)
await cql.run_async(SimpleStatement(next(wgen), consistency_level=ConsistencyLevel.ALL))
@pytest.mark.asyncio