Files
scylla/test/storage/test_out_of_space_prevention.py
Botond Dénes 4ce17d20df replica/database: consolidate the two database_apply error injections
Into a single database_apply one. Add three parameters:
* ks_name and cf_name to filter the tables to be affected
* what - what to do: throw or wait

This leads to smaller footprint in the code and improved filtering for
table names at the cost of some extra error injection params in the
tests.

(cherry picked from commit f375aae257)
2026-04-24 18:31:11 +03:00

656 lines
38 KiB
Python

#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import os
import pathlib
import psutil
import pytest
import time
import uuid
from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement
from typing import Callable
from test.cluster.util import get_topology_coordinator, find_server_by_host_id, new_test_keyspace, new_test_table, reconnect_driver
from test.pylib.manager_client import ManagerClient, wait_for_cql_and_get_hosts
from test.pylib.tablets import get_tablet_count
from test.pylib.util import Host
from test.storage.conftest import space_limited_servers
logger = logging.getLogger(__name__)
def write_generator(table, size_in_kb: int):
    """Lazily produce ``size_in_kb`` INSERT statements targeting ``table``.

    Every statement uses a distinct integer ``pk`` (counting up from 0) and
    the same 1020-character text payload for the ``t`` column.
    """
    payload = 'x' * 1020
    for pk in range(size_in_kb):
        yield f"INSERT INTO {table} (pk, t) VALUES ({pk}, '{payload}')"
class random_content_file:
    """Context manager that occupies disk space with a file of random bytes.

    On entry a file of ``size_in_bytes`` random bytes is written; on exit the
    file is removed again.

    Parameters:
        path: either an existing file (which is overwritten and removed on
            exit) or a directory, in which case a new file with a random
            UUID name is created inside it.
        size_in_bytes: number of bytes to write; non-positive values are
            treated as 0 and produce an empty file.
    """

    # Write in bounded chunks so the whole payload never has to be held in memory.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self, path: str, size_in_bytes: int):
        path = pathlib.Path(path)
        self.filename = path if path.is_file() else path / str(uuid.uuid4())
        self.size = size_in_bytes if size_in_bytes > 0 else 0

    def __enter__(self):
        try:
            with open(self.filename, 'wb') as fh:
                remaining = self.size
                while remaining > 0:
                    chunk = min(remaining, self._CHUNK_SIZE)
                    fh.write(os.urandom(chunk))
                    remaining -= chunk
        except BaseException:
            # __exit__ is not called when __enter__ raises, so a partially
            # written file (e.g. after running out of space) must be removed
            # here or it would keep occupying the volume.
            try:
                os.unlink(self.filename)
            except FileNotFoundError:
                # The file was never created; nothing to clean up.
                pass
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        os.unlink(self.filename)
# Command-line options shared by the tests that run on 100M volumes.
# The commitlog segment sizes are reduced because with volumes this small
# the default segments would make the nodes hit out of space.
global_cmdline = [
    token
    for option in (
        ("--disk-space-monitor-normal-polling-interval-in-seconds", "1"),
        ("--critical-disk-utilization-level", "0.8"),
        ("--commitlog-segment-size-in-mb", "2"),
        ("--schema-commitlog-segment-size-in-mb", "4"),
    )
    for token in option
]
@pytest.mark.asyncio
async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that a replica above the critical disk utilization level does not accept user writes.

    The volume of the first server is filled to 85% (the critical level is
    configured to 0.8). While it is full, QUORUM writes must succeed but the
    written partition must be absent from the first host; this must also hold
    after the node is restarted with the disk still full. Once the filler
    file is removed, a write with consistency level ALL must succeed.
    """
    async def validate_data_existence(cql, successful_hosts: list[Host], failed_hosts: list[Host], cf: str, pk: int) -> None:
        # Query each host individually (CL=ONE, explicit host) so that the
        # presence of the partition is checked per replica.
        stmt = SimpleStatement(f"SELECT * from MUTATION_FRAGMENTS({cf}) where pk = {pk};", consistency_level=ConsistencyLevel.ONE)
        for host in successful_hosts:
            res = await cql.run_async(stmt, host=host)
            assert res, f"Data not found on {host}"
        for host in failed_hosts:
            res = await cql.run_async(stmt, host=host)
            assert not res, f"Data found on {host} but it shouldn't be there"

    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline) as servers:
        cql, hosts = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(servers[0].server_id)
        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
            for server in servers:
                await manager.api.disable_autocompaction(server.ip_addr, ks)

            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", " WITH speculative_retry = 'NONE'") as cf:
                # One statement per phase: disk full, disk full after restart, disk freed.
                wgen = write_generator(cf, 3)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                disk_info = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
                    mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
                    # The message is awaited twice - presumably once per shard; TODO confirm.
                    for _ in range(2):
                        mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)

                    logger.info("Write data and verify it did not reach the target node")
                    await cql.run_async(SimpleStatement(next(wgen), consistency_level=ConsistencyLevel.QUORUM))
                    await validate_data_existence(cql, hosts[1:], [hosts[0]], cf, 0)

                    logger.info("Restart the node")
                    mark = await log.mark()
                    await manager.server_restart(servers[0].server_id)
                    mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
                    cql = await reconnect_driver(manager)
                    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
                    for _ in range(2):
                        mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)

                    # Let the cluster run for a sec to grep for potential errors.
                    # Must be an awaitable sleep: a blocking time.sleep() would
                    # stall the event loop for the whole test process.
                    await asyncio.sleep(1)
                    assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []

                    logger.info("Write more data and verify it did not reach the target node")
                    await cql.run_async(SimpleStatement(next(wgen), consistency_level=ConsistencyLevel.QUORUM))
                    await validate_data_existence(cql, hosts[1:], [hosts[0]], cf, 1)

                logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
                mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
                for _ in range(2):
                    mark, _ = await log.wait_for("database - Set critical disk utilization mode: false", from_mark=mark)

                logger.info("Write more data and expect it to succeed")
                await cql.run_async(SimpleStatement(next(wgen), consistency_level=ConsistencyLevel.ALL))
@pytest.mark.asyncio
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that compaction is drained at critical disk utilization and enabled again afterwards.

    Everything is observed through the log of the first server:
    - fill its volume to 85% and wait for two "compaction_manager - Drained" messages,
    - restart it while the volume is still full and expect the same messages again,
    - remove the filler file, wait for two "compaction_manager - Enabled" messages,
      then request a major compaction and wait for its completion message.
    """
    critical_reached_msg = "Reached the critical disk utilization level"
    drained_msg = "compaction_manager - Drained"
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}"
    node_cmdline = global_cmdline + ["--logger-log-level", "compaction=debug"]

    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=node_cmdline) as servers:
        target = servers[0]
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(target.server_id)
        node_log = await manager.server_open_log(target.server_id)
        pos = await node_log.mark()

        async with new_test_keyspace(manager, keyspace_opts) as ks:
            for server in servers:
                await manager.api.disable_autocompaction(server.ip_addr, ks)

            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                table = cf.rsplit('.', 1)[-1]

                # Three rounds of write + flush on the target node.
                for _ in range(3):
                    await asyncio.gather(*(cql.run_async(stmt) for stmt in write_generator(cf, 10)))
                    await manager.api.flush_keyspace(target.ip_addr, ks)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                usage = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(usage.total*0.85) - usage.used):
                    pos, _ = await node_log.wait_for(critical_reached_msg, from_mark=pos)
                    for _ in range(2):
                        pos, _ = await node_log.wait_for(drained_msg, from_mark=pos)

                    logger.info("Restart the node")
                    pos = await node_log.mark()
                    await manager.server_restart(target.server_id)
                    await reconnect_driver(manager)
                    pos, _ = await node_log.wait_for(critical_reached_msg, from_mark=pos)
                    for _ in range(2):
                        pos, _ = await node_log.wait_for(drained_msg, from_mark=pos)

                logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
                pos, _ = await node_log.wait_for("Dropped below the critical disk utilization level", from_mark=pos)
                for _ in range(2):
                    pos, _ = await node_log.wait_for("compaction_manager - Enabled", from_mark=pos)

                # With compaction enabled again a major compaction must run to completion.
                await manager.api.keyspace_compaction(target.ip_addr, ks)
                await node_log.wait_for(rf"Major {ks}\.{table} .* Compacted .* sstables to .*", from_mark=pos)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_critical_utilization_during_decommission(manager: ManagerClient, volumes_factory: Callable) -> None:
    """
    Decommission must fail if the receiving node reaches the critical disk utilization level while streaming.

    Steps:
    - Start two space-limited nodes in the same dc and rack.
    - Create a table with 8 initial tablets and write some rows.
    - Quiesce the topology and arm the "stream_tablet_wait" injection on the first node,
      so that only migrations caused by the decommission get trapped.
    - Start decommissioning the second node in the background.
    - Once a tablet is about to enter the streaming stage, fill the first node's volume to 85%.
    - After a table load stats refresh has been logged, release the injection.
    - Expect the decommission to fail with an error mentioning critical disk utilization.
    """
    node_cmdline = global_cmdline + [
        "--logger-log-level", "load_balancer=debug",
        "--logger-log-level", "debug_error_injection=debug",
    ]
    node_config = {'tablet_load_stats_refresh_interval_in_seconds': 1}
    rack_props = [{"dc": "dc1", "rack": "r1"}]*2

    async with space_limited_servers(manager, volumes_factory, ["100M"]*2, config=node_config, cmdline=node_cmdline,
                                     property_file=rack_props) as servers:
        receiver, leaver = servers[0], servers[1]
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(receiver.server_id)
        node_log = await manager.server_open_log(receiver.server_id)

        keyspace_opts = (f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['{leaver.rack}'] }}"
                         " AND tablets = {'initial': 8}")
        async with new_test_keyspace(manager, keyspace_opts) as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                await asyncio.gather(*(cql.run_async(stmt) for stmt in write_generator(cf, 10)))

                # We want to trap only migrations which happened during decommission
                await manager.api.quiesce_topology(receiver.ip_addr)
                await manager.api.enable_injection(receiver.ip_addr, "stream_tablet_wait", one_shot=False)

                pos = await node_log.mark()
                decommission = asyncio.create_task(manager.decommission_node(leaver.server_id))
                pos, _ = await node_log.wait_for("Will set .* stage to streaming", from_mark=pos)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                usage = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(usage.total*0.85) - usage.used):
                    # Wait for the critical level and a full load stats refresh before
                    # letting the trapped streaming continue.
                    for expected in ("Reached the critical disk utilization level",
                                     "Refreshing table load stats",
                                     "Refreshed table load stats"):
                        pos, _ = await node_log.wait_for(expected, from_mark=pos)

                    await manager.api.disable_injection(receiver.ip_addr, "stream_tablet_wait")

                    with pytest.raises(Exception, match="Decommission failed.*critical disk utilization"):
                        await decommission
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_reject_split_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that a split compaction in progress is stopped when the node reaches critical disk utilization.

    The split is triggered by raising the keyspace's initial tablet count and
    is held on the first server by the "split_sstable_rewrite" injection.
    The volume is then filled to 85% and the log must report the split task
    being stopped due to drain.
    """
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}"

    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline) as servers:
        target = servers[0]
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(target.server_id)
        node_log = await manager.server_open_log(target.server_id)
        pos = await node_log.mark()

        async with new_test_keyspace(manager, keyspace_opts) as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                # 30 rounds of write + flush on the target node.
                for _ in range(30):
                    await asyncio.gather(*(cql.run_async(stmt) for stmt in write_generator(cf, 100)))
                    await manager.api.flush_keyspace(target.ip_addr, ks)

                logger.info("Trigger split compaction")
                await manager.api.enable_injection(target.ip_addr, "split_sstable_rewrite", one_shot=False)
                # The returned future is not awaited; the test only waits for the injection to be hit.
                cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
                pos, _ = await node_log.wait_for("split_sstable_rewrite: waiting", from_mark=pos)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                usage = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(usage.total*0.85) - usage.used):
                    pos, _ = await node_log.wait_for("Reached the critical disk utilization level", from_mark=pos)
                    await node_log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain", from_mark=pos)
@pytest.mark.asyncio
async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that a node at critical disk utilization does not start a split compaction.

    The first server's volume is filled to 85% before the split is requested.
    The second server is used as the reference: once its log shows the split
    compaction, the first server's log must contain no such message.
    """
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}"
    node_cmdline = global_cmdline + ["--logger-log-level", "compaction=debug"]

    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=node_cmdline) as servers:
        full_node, healthy_node = servers[0], servers[1]
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(full_node.server_id)
        full_log = await manager.server_open_log(full_node.server_id)
        full_pos = await full_log.mark()
        healthy_log = await manager.server_open_log(healthy_node.server_id)

        async with new_test_keyspace(manager, keyspace_opts) as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                # 30 rounds of write + flush on the node that will be filled up.
                for _ in range(30):
                    await asyncio.gather(*(cql.run_async(stmt) for stmt in write_generator(cf, 100)))
                    await manager.api.flush_keyspace(full_node.ip_addr, ks)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                usage = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(usage.total*0.85) - usage.used):
                    full_pos, _ = await full_log.wait_for("Reached the critical disk utilization level", from_mark=full_pos)
                    for _ in range(2):
                        full_pos, _ = await full_log.wait_for("compaction_manager - Drained", from_mark=full_pos)

                    logger.info("Trigger split compaction")
                    healthy_pos = await healthy_log.mark()
                    # The returned future is not awaited; the logs are used to observe the outcome.
                    cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")

                    split_pattern = f"compaction.*Split {cf}"
                    await healthy_log.wait_for(split_pattern, from_mark=healthy_pos)
                    assert await full_log.grep(split_pattern, from_mark=full_pos) == []
@pytest.mark.asyncio
async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that tablet repair fails while a replica is at critical disk utilization and succeeds afterwards.

    The sstables of the test table are wiped on the first server so that
    repair has work to do, then the server's volume is filled to 85%. Every
    tablet repair initiated by the topology coordinator is expected to fail
    with an error that depends on which host runs it. The node is restarted
    with the disk still full, and after the filler file is removed the
    scheduled repair task is awaited.
    """
    critical_reached_msg = "Reached the critical disk utilization level"
    repair_drained_msg = "repair - Drained"
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 4}"
    node_config = {'tablet_load_stats_refresh_interval_in_seconds': 1}

    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=node_config) as servers:
        target = servers[0]
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(target.server_id)
        node_log = await manager.server_open_log(target.server_id)
        host = await manager.get_host_id(target.server_id)
        pos = await node_log.mark()

        async with new_test_keyspace(manager, keyspace_opts) as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                table = cf.rsplit('.', 1)[-1]

                for _ in range(2):
                    await asyncio.gather(*(cql.run_async(stmt) for stmt in write_generator(cf, 100)))
                    await manager.api.flush_keyspace(target.ip_addr, ks)

                # Drop the table's sstables on the target node while it is down.
                await manager.server_stop_gracefully(target.server_id)
                await manager.server_wipe_sstables(target.server_id, ks, table)
                await manager.server_start(target.server_id)

                cql = await reconnect_driver(manager)
                await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                usage = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(usage.total*0.85) - usage.used):
                    pos, _ = await node_log.wait_for(critical_reached_msg, from_mark=pos)
                    for _ in range(2):
                        pos, _ = await node_log.wait_for(repair_drained_msg, from_mark=pos)

                    coord = await get_topology_coordinator(manager)
                    coord_serv = await find_server_by_host_id(manager, servers, coord)
                    coord_log = await manager.server_open_log(coord_serv.server_id)
                    coord_pos = await coord_log.mark()

                    logger.info("Schedule tablet repair")
                    response = await manager.api.tablet_repair(target.ip_addr, ks, table, "all", await_completion=False)
                    task_id = response['tablet_task_id']

                    # One "Initiating tablet repair" message is expected per tablet.
                    tablet_count = await get_tablet_count(manager, servers[1], ks, table)
                    for _ in range(tablet_count):
                        coord_pos, found = await coord_log.wait_for("Initiating tablet repair host=(?P<host>.*) tablet=(?P<tablet>.*)", from_mark=coord_pos)
                        initiated = found[0][1]
                        dst_host = initiated.group("host")
                        tablet = initiated.group("tablet")
                        if host == dst_host:
                            # Tablet repair is triggered on the node with disk utilization above the critical level.
                            # A local tablet repair task is refused to be created and the tablet repair fails.
                            error = "Repair service is disabled. No repairs will be started until it's re-enabled"
                        else:
                            # Tablet repair is triggered on the node with disk utilization below the critical level.
                            # A local tablet repair task is created and the row-level repair is executed. It will try
                            # to send missing rows to the node with critical disk utilization that are rejected.
                            error = f"put_row_diff: Repair follower={host} failed in put_row_diff handler"
                        await coord_log.wait_for(f"repair for tablet {tablet} failed: seastar::rpc::remote_verb_error.*{error}", from_mark=coord_pos)

                    logger.info("Restart the node")
                    pos = await node_log.mark()
                    await manager.server_restart(target.server_id, wait_others=2)
                    await reconnect_driver(manager)
                    pos, _ = await node_log.wait_for(critical_reached_msg, from_mark=pos)
                    for _ in range(2):
                        pos, _ = await node_log.wait_for(repair_drained_msg, from_mark=pos)

                logger.info("With blob file removed, wait for the tablet repair to succeed")
                pos, _ = await node_log.wait_for("Dropped below the critical disk utilization level", from_mark=pos)
                await manager.api.wait_task(target.ip_addr, task_id)
@pytest.mark.asyncio
async def test_autotoggle_reject_incoming_migrations(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that a node rejects incoming tablet migrations only while at critical disk utilization.

    With tablet balancing disabled, the single tablet of an RF=1 table is
    moved to a node that does not hold it. While that node's volume is filled
    to 85% the streaming must fail; once the filler file is removed the same
    move must succeed.
    """
    keyspace_opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}"
    node_config = {'tablet_load_stats_refresh_interval_in_seconds': 1}

    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=node_config) as servers:
        await manager.disable_tablet_balancing()
        cql, _ = await manager.get_ready_cql(servers)

        async with new_test_keyspace(manager, keyspace_opts) as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                table = cf.rsplit('.', 1)[-1]
                await asyncio.gather(*(cql.run_async(stmt) for stmt in write_generator(cf, 10)))

                logger.info("Get tablet to migrate")
                id_rows = await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{table}'")
                table_id = id_rows[0].id

                tablet_infos = list(await cql.run_async(f"SELECT last_token, replicas FROM system.tablets WHERE table_id = {table_id}"))
                assert len(tablet_infos) == 1
                tablet_info = tablet_infos[0]
                assert len(tablet_info.replicas) == 1

                # Map host id -> server, then drop the current replica so that
                # any remaining entry is a valid migration target.
                candidates = {}
                for server in servers:
                    candidates[await manager.get_host_id(server.server_id)] = server

                source_host, source_shard = tablet_info.replicas[0]
                del candidates[str(source_host)]
                target_host, target_server = list(candidates.items())[0]
                target_shard = source_shard

                logger.info(f"Tablet to migrate: {tablet_info.last_token} from {source_host} to {target_host}")

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                workdir = await manager.server_get_workdir(target_server.server_id)
                node_log = await manager.server_open_log(target_server.server_id)
                pos = await node_log.mark()

                # Both migration attempts use exactly the same arguments.
                move_args = dict(node_ip=servers[0].ip_addr, ks=ks, table=table, src_host=source_host,
                                 src_shard=source_shard, dst_host=target_host, dst_shard=target_shard,
                                 token=tablet_info.last_token)

                usage = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(usage.total*0.85) - usage.used):
                    pos, _ = await node_log.wait_for("Reached the critical disk utilization level", from_mark=pos)
                    for _ in range(2):
                        pos, _ = await node_log.wait_for("database - Set critical disk utilization mode: true", from_mark=pos)

                    logger.info("Migrate a tablet to the target node and expect a failure")
                    await manager.api.move_tablet(**move_args)
                    pos, _ = await node_log.wait_for("Streaming for tablet migration .* failed", from_mark=pos)

                logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
                pos, _ = await node_log.wait_for("Dropped below the critical disk utilization level", from_mark=pos)
                for _ in range(2):
                    pos, _ = await node_log.wait_for("database - Set critical disk utilization mode: false", from_mark=pos)

                logger.info("Migrate a tablet to the target node and expect a success")
                await manager.api.move_tablet(**move_args)
                pos, _ = await node_log.wait_for("Streaming for tablet migration .* successful", from_mark=pos)
@pytest.mark.asyncio
async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that a pending tablet split survives a restart of a node at critical disk utilization.

    The first server's volume is filled to 85%, a split of the test table is
    requested and the server is restarted. While the disk is full, the resize
    task info must be present and no split compaction may appear in the
    server's log. After the filler file is removed, the split from 1 to 2
    tablets must be detected and the resize task info cleared.
    """
    cfg = {
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmd = [*global_cmdline,
           "--logger-log-level", "compaction=debug"]
    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmd, config=cfg) as servers:
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(servers[0].server_id)
        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        logger.info("Create and populate test table")
        async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                table = cf.split('.')[-1]
                table_id = (await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{table}'"))[0].id
                await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 64)])
                await manager.api.flush_keyspace(servers[0].ip_addr, ks)

                # Ensure that the topology state update reaches the node. We should never reach 5s timeout here.
                # But one call to retrieve resize_task_info may not be enough as the entry to system.tablets might
                # not be there yet.
                async def assert_resize_task_info(table_id, cb):
                    """Poll system.tablets until ``cb(rows)`` is true; time out after 5 seconds."""
                    async with asyncio.timeout(5):
                        while True:
                            response = await cql.run_async(f"SELECT resize_task_info from system.tablets where table_id = {table_id};")
                            # An empty result means the entry is not there yet: keep
                            # polling instead of returning without having checked anything.
                            if response and cb(response):
                                break
                            await asyncio.sleep(0.1)

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                disk_info = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
                    mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
                    for _ in range(2):
                        mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)

                    logger.info("Trigger split in table and restart the node")
                    coord = await get_topology_coordinator(manager)
                    logger.info(f"Topology coordinator is {coord}")
                    coord_serv = await find_server_by_host_id(manager, servers, coord)
                    coord_log = await manager.server_open_log(coord_serv.server_id)

                    await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
                    await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")

                    mark = await log.mark()
                    await manager.server_restart(servers[0].server_id, wait_others=2)
                    # The helper above reads `cql` from this scope, so it uses the new session from here on.
                    cql = await reconnect_driver(manager)
                    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
                    mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)

                    logger.info("Check if tablet split happened")
                    await assert_resize_task_info(table_id, lambda response: len(response) == 1 and response[0].resize_task_info is not None)

                    # Let the cluster run for a sec to grep for potential errors.
                    # Must be an awaitable sleep: a blocking time.sleep() would
                    # stall the event loop for the whole test process.
                    await asyncio.sleep(1)
                    assert await log.grep(f"compaction.*Split {cf}", from_mark=mark) == []

                logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
                mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
                for _ in range(2):
                    mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)

                mark, _ = await log.wait_for(f"Detected tablet split for table {cf}, increasing from 1 to 2 tablets", from_mark=mark)
                await assert_resize_task_info(table_id, lambda response: len(response) == 2 and all(r.resize_task_info is None for r in response))
# Verify that new sstable produced by repair cannot be split, if disk utilization level is critical.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes_factory: Callable) -> None:
    """Check that repair fails when its new sstables cannot be split due to critical disk utilization.

    Writes are made to fail on the first server through the "database_apply"
    injection so that repair has rows to bring in. A tablet repair is started
    and held at the "maybe_split_new_sstable_wait" injection while a split
    decision is generated. The server's volume is then filled to 85% and the
    injection released: the repair must fail and no split compaction may be
    logged. After the filler file is removed, the repair task is awaited and
    the tablet split must be detected.
    """
    cfg = {
        'tablet_load_stats_refresh_interval_in_seconds': 1,
    }
    cmd = [*global_cmdline,
           "--logger-log-level", "compaction=debug"]
    async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmd, config=cfg) as servers:
        cql, _ = await manager.get_ready_cql(servers)
        workdir = await manager.server_get_workdir(servers[0].server_id)
        log = await manager.server_open_log(servers[0].server_id)
        mark = await log.mark()

        logger.info("Create and populate test table")
        async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 2}") as ks:
            async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
                table = cf.split('.')[-1]
                table_id = (await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{table}'"))[0].id
                await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 64)])
                await manager.api.flush_keyspace(servers[0].ip_addr, ks)

                coord = await get_topology_coordinator(manager)
                coord_serv = await find_server_by_host_id(manager, servers, coord)
                coord_log = await manager.server_open_log(coord_serv.server_id)

                async def run_split():
                    """Request a split of the test table and wait for the coordinator's resize decision."""
                    await manager.api.enable_injection(coord_serv.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
                    # force split on the test table
                    await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 4}}")
                    await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")

                async def generate_repair_work():
                    """Write rows at CL=ONE while the "database_apply" injection throws on the first server."""
                    insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)")
                    insert_stmt.consistency_level = ConsistencyLevel.ONE
                    await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False, parameters={"ks_name": ks, "cf_name": table, "what": "throw"})
                    pks = range(256, 512)
                    await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks])
                    await manager.api.disable_injection(servers[0].ip_addr, "database_apply")

                await generate_repair_work()

                await manager.api.enable_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait", one_shot=True)

                token = 'all'
                repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, table, token))

                # Emit split decision during repair.
                await run_split()
                await log.wait_for("maybe_split_new_sstable_wait: waiting", from_mark=mark)
                await manager.api.disable_injection(coord_serv.ip_addr, "tablet_resize_finalization_postpone")

                logger.info("Create a big file on the target node to reach critical disk utilization level")
                disk_info = psutil.disk_usage(workdir)
                with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
                    mark, _ = await log.wait_for("Reached the critical disk utilization level", from_mark=mark)
                    for _ in range(2):
                        mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)

                    await manager.api.message_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait")

                    # Expect repair to fail when splitting new sstables
                    await log.wait_for("Repair for tablet migration of .* failed", from_mark=mark)
                    # Raw string: the pattern contains regex escapes (\( and \)) that are
                    # invalid escape sequences in a regular string literal.
                    await log.wait_for(r"Failed to load SSTable.*\(critical disk utilization\)", from_mark=mark)
                    assert await log.grep(f"compaction.*Split {cf}", from_mark=mark) == []

                logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
                mark, _ = await log.wait_for("Dropped below the critical disk utilization level", from_mark=mark)
                for _ in range(2):
                    mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)

                await repair_task
                mark, _ = await log.wait_for(f"Detected tablet split for table {cf}", from_mark=mark)
# Command-line options for tests that run on 20M volumes with the critical
# disk utilization level set to 1.0. The commitlog segment sizes are reduced
# because with volumes this small the nodes would otherwise hit out of space.
global_cmdline_with_disabled_monitor = [
    token
    for option in (
        ("--disk-space-monitor-normal-polling-interval-in-seconds", "1"),
        ("--critical-disk-utilization-level", "1.0"),
        ("--commitlog-segment-size-in-mb", "2"),
        ("--schema-commitlog-segment-size-in-mb", "4"),
        ("--tablet-load-stats-refresh-interval-in-seconds", "1"),
    )
    for token in option
]
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_sstables_incrementally_released_during_streaming(manager: ManagerClient, volumes_factory: Callable) -> None:
"""
Test that the source node does not run out of space if major compaction rewrites the sstables being streamed.
Expects that both file streaming and major compaction release sstables incrementally, reducing the chances
of 2x space amplification.
Scenario:
- Create a 2-node cluster with limited disk space.
- Create a table with 2 tablets, one in each node
- Write 20% of node capacity to each tablet.
- Start decommissioning one node.
- During streaming, create a large file on the source node to push it over 85%
- Run major expecting the file streaming released the sstables incrementally. Had it not, source node runs out of space.
- Unblock streaming
- Verify that the decommission operation succeeds.
"""
# NOTE(review): the indentation of this function was lost in this copy of the file, so the
# extent of the `with random_content_file(...)` block below cannot be read from the text.
cmdline = [*global_cmdline_with_disabled_monitor,
"--logger-log-level", "load_balancer=debug",
"--logger-log-level", "debug_error_injection=debug"
]
# the coordinator needs more space, so creating a 40M volume for it.
async with space_limited_servers(manager, volumes_factory, ["40M", "20M"], cmdline=cmdline,
property_file=[{"dc": "dc1", "rack": "r1"}]*2) as servers:
cql, _ = await manager.get_ready_cql(servers)
# The second server is the one being decommissioned; its workdir and log are observed.
workdir = await manager.server_get_workdir(servers[1].server_id)
log = await manager.server_open_log(servers[1].server_id)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': ['{servers[1].rack}'] }}"
" AND tablets = {'initial': 2}") as ks:
await manager.disable_tablet_balancing()
# Needs 1mb fragments in order to stress incremental release in file streaming
extra_table_param = "WITH compaction = {'class' : 'IncrementalCompactionStrategy', 'sstable_size_in_mb' : '1'} and compression = {}"
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text", extra_table_param) as cf:
# Disk usage is sampled before and after the writes to measure how much space they take.
before_disk_info = psutil.disk_usage(workdir)
# About 4mb per tablet
await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 8000)])
# split data into 1mb fragments
await manager.api.keyspace_flush(servers[1].ip_addr, ks)
await manager.api.keyspace_compaction(servers[1].ip_addr, ks)
after_disk_info = psutil.disk_usage(workdir)
percent_by_writes = after_disk_info.percent - before_disk_info.percent
logger.info(f"Percent taken by writes {percent_by_writes}")
# assert sstable data content account for more than 20% of node's storage.
assert percent_by_writes > 20
# We want to trap only migrations which happened during decommission
await manager.api.quiesce_topology(servers[0].ip_addr)
# One-shot injection on the node being decommissioned; the test waits for it below.
await manager.api.enable_injection(servers[1].ip_addr, "tablet_stream_files_end_wait", one_shot=True)
mark = await log.mark()
logger.info(f"Workdir {workdir}")
# Run the decommission in the background so the test can act while streaming is held.
decomm_task = asyncio.create_task(manager.decommission_node(servers[1].server_id))
await manager.enable_tablet_balancing()
mark, _ = await log.wait_for("tablet_stream_files_end_wait: waiting", from_mark=mark)
disk_info = psutil.disk_usage(workdir)
# Fill the volume up to 85% of its total size.
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
disk_info = psutil.disk_usage(workdir)
logger.info(f"Percent used before major {disk_info.percent}")
# Run major in order to try to reproduce 2x space amplification if files aren't released
# incrementally by streamer.
await manager.api.keyspace_compaction(servers[1].ip_addr, ks)
await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 100)])
disk_info = psutil.disk_usage(workdir)
logger.info(f"Percent used after major {disk_info.percent}")
# Release the injection and wait for the decommission to finish; an error in it is raised here.
await manager.api.message_injection(servers[1].ip_addr, "tablet_stream_files_end_wait")
await decomm_task