test: add test to check error handling during tablet draining
The test checks that the topology operation is aborted if an error happens during the tablet migration stage.
This commit is contained in:
@@ -8,9 +8,14 @@ from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
import pytest
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def inject_error_on(manager, error_name, servers):
    """Enable the error injection `error_name` on every server in `servers`.

    One enable request is created per server (addressed by its `ip_addr`),
    and all of them are then awaited together.
    """
    # NOTE(review): the trailing True is presumably the one-shot flag of
    # enable_injection -- confirm against the manager's REST API helper.
    pending = []
    for srv in servers:
        pending.append(manager.api.enable_injection(srv.ip_addr, error_name, True))
    await asyncio.gather(*pending)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_topology_streaming_failure(request, manager: ManagerClient):
|
||||
"""Fail streaming while doing a topology operation"""
|
||||
@@ -58,3 +63,30 @@ async def test_topology_streaming_failure(request, manager: ManagerClient):
|
||||
assert s not in servers
|
||||
matches = [await log.grep("storage_service - rollback.*after replacing failure to state left_token_ring", from_mark=mark) for log, mark in zip(logs, marks)]
|
||||
assert sum(len(x) for x in matches) == 1
|
||||
|
||||
@pytest.mark.asyncio
async def test_tablet_drain_failure_during_decommission(manager: ManagerClient):
    """Fail tablet draining during decommission and expect a rollback.

    The test creates a tablet-backed keyspace, populates a table, enables the
    "stream_tablet_fail_on_drain" injection on all running servers and then
    decommissions one of them. The decommission is expected to fail, and
    exactly one rollback message must show up in the servers' logs.
    """
    servers = await manager.running_servers()

    # Remember the current position in every server's log so that only
    # messages written after this point are searched below.
    logs = []
    for srv in servers:
        logs.append(await manager.server_open_log(srv.server_id))
    marks = []
    for log in logs:
        marks.append(await log.mark())

    cql = manager.get_cql()
    create_keyspace = ("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', "
                       "'replication_factor': 1, 'initial_tablets': 32};")
    await cql.run_async(create_keyspace)
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")

    logger.info("Populating table")

    # Issue all inserts up front and wait for them together.
    inserts = [cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(256)]
    await asyncio.gather(*inserts)

    await inject_error_on(manager, "stream_tablet_fail_on_drain", servers)

    # NOTE(review): indexing servers[2] assumes at least three running
    # servers -- confirm against the suite's cluster setup.
    victim = servers[2]
    await manager.decommission_node(victim.server_id, expected_error="Decommission failed. See earlier errors")

    # Count the rollback messages across all servers since the marks were
    # taken; exactly one is expected in total.
    rollback_pattern = "storage_service - rollback.*after decommissioning failure to state rollback_to_normal"
    rollback_count = 0
    for log, mark in zip(logs, marks):
        rollback_count += len(await log.grep(rollback_pattern, from_mark=mark))
    assert rollback_count == 1

    await cql.run_async("DROP KEYSPACE test;")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user