test: add tests for truncate with tablets
This patch adds the unit tests for truncate with tablets. test_truncate_while_migration() triggers a tablet migration, then runs a TRUNCATE TABLE for the table containing the tablet being migrated. test_truncate_with_concurrent_drop() starts a truncate, then attempts to drop the table while it is being truncated. test_truncate_while_node_restart() validates the case where a replica node is restarted while truncate is running. test_truncate_with_coordinator_crash() validates if truncate is correctly completed in cases where the topology coordinator has crashed or restarted after the truncate session is cleared, but before the truncate request is finalized.
This commit is contained in:
@@ -6025,6 +6025,10 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
|
||||
throw std::runtime_error(fmt::format("Cannot stream within the same node using regular migration, tablet: {}, shard {} -> {}",
|
||||
tablet, leaving_replica->shard, trinfo->pending_replica->shard));
|
||||
}
|
||||
co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) {
|
||||
rtlogger.info("migration_streaming_wait: start");
|
||||
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
|
||||
});
|
||||
auto& table = _db.local().find_column_family(tablet.table);
|
||||
std::vector<sstring> tables = {table.schema()->cf_name()};
|
||||
auto my_id = tm->get_my_id();
|
||||
|
||||
@@ -873,6 +873,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
// Release the guard to avoid blocking group0 for long periods of time while invoking RPCs
|
||||
release_guard(std::move(guard));
|
||||
|
||||
co_await utils::get_local_injector().inject("truncate_table_wait", [] (auto& handler) {
|
||||
rtlogger.info("truncate_table_wait: start");
|
||||
return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2));
|
||||
});
|
||||
|
||||
// Check if all the nodes with replicas are alive
|
||||
for (const locator::host_id& replica_host: replica_hosts) {
|
||||
if (!_gossiper.is_alive(replica_host)) {
|
||||
@@ -917,6 +922,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
}
|
||||
}
|
||||
|
||||
utils::get_local_injector().inject("truncate_crash_after_session_clear", [] {
|
||||
rtlogger.info("truncate_crash_after_session_clear hit, killing the node");
|
||||
_exit(1);
|
||||
});
|
||||
|
||||
// Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point
|
||||
if (!guard) {
|
||||
guard = co_await start_operation();
|
||||
|
||||
215
test/topology_custom/test_truncate_with_tablets.py
Normal file
215
test/topology_custom/test_truncate_with_tablets.py
Normal file
@@ -0,0 +1,215 @@
|
||||
#
|
||||
# Copyright (C) 2024-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
from cassandra.query import SimpleStatement, ConsistencyLevel
|
||||
from cassandra.protocol import InvalidRequest
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.topology.conftest import skip_mode
|
||||
from test.topology.util import get_topology_coordinator
|
||||
from test.pylib.tablets import get_all_tablet_replicas
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
import time
|
||||
import pytest
|
||||
import logging
|
||||
import asyncio
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_truncate_while_migration(manager: ManagerClient):
|
||||
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True,
|
||||
'error_injections_at_startup': ['migration_streaming_wait']
|
||||
}
|
||||
|
||||
servers = []
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
# Create a keyspace with tablets and initial_tablets == 2, then insert data
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
|
||||
await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
|
||||
|
||||
keys = range(1024)
|
||||
await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys])
|
||||
|
||||
# Add a node to the cluster. This will cause the tablet load balancer to migrate one tablet to the new node
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
# Wait for tablet streaming to start
|
||||
pending_node = servers[1]
|
||||
pending_log = await manager.server_open_log(pending_node.server_id)
|
||||
|
||||
await pending_log.wait_for('migration_streaming_wait: start')
|
||||
await manager.api.message_injection(pending_node.ip_addr, 'migration_streaming_wait')
|
||||
|
||||
# Do a TRUNCATE TABLE while the tablet is being streamed
|
||||
await cql.run_async('TRUNCATE TABLE test.test')
|
||||
|
||||
# Wait for streaming to complete
|
||||
await pending_log.wait_for('raft_topology - Streaming for tablet migration of.*successful')
|
||||
|
||||
# Check if we have any data
|
||||
row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL))
|
||||
assert row[0].count == 0
|
||||
|
||||
|
||||
async def get_raft_leader_and_log(manager: ManagerClient, servers):
|
||||
raft_leader_host_id = await get_topology_coordinator(manager)
|
||||
for s in servers:
|
||||
if raft_leader_host_id == await manager.get_host_id(s.server_id):
|
||||
raft_leader = s
|
||||
break
|
||||
raft_leader_log = await manager.server_open_log(raft_leader.server_id)
|
||||
return (raft_leader, raft_leader_log)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_truncate_with_concurrent_drop(manager: ManagerClient):
|
||||
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True,
|
||||
'error_injections_at_startup': ['truncate_table_wait']
|
||||
}
|
||||
|
||||
servers = []
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Create a keyspace with tablets and initial_tablets == 2, then insert data
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
|
||||
await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
|
||||
|
||||
keys = range(1024)
|
||||
await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys])
|
||||
|
||||
(raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers)
|
||||
|
||||
if raft_leader == servers[0]:
|
||||
trunc_host = hosts[1]
|
||||
drop_host = hosts[2]
|
||||
elif raft_leader == servers[1]:
|
||||
trunc_host = hosts[0]
|
||||
drop_host = hosts[2]
|
||||
elif raft_leader == servers[2]:
|
||||
trunc_host = hosts[0]
|
||||
drop_host = hosts[1]
|
||||
else:
|
||||
assert False, 'Unable to determine raft leader'
|
||||
|
||||
# Start a TRUNCATE in the background
|
||||
trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host)
|
||||
# Wait for the topology coordinator to reach a point wher it is about to start sending the truncate RPCs
|
||||
await raft_leader_log.wait_for('truncate_table_wait: start')
|
||||
# Execute DROP TABLE
|
||||
await cql.run_async('DROP TABLE test.test', host=drop_host)
|
||||
# Release TRUNCATE table in topology coordinator
|
||||
await manager.api.message_injection(raft_leader.ip_addr, 'truncate_table_wait')
|
||||
# Check we received an error
|
||||
with pytest.raises(InvalidRequest, match='unconfigured table test'):
|
||||
await trunc_future
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_truncate_while_node_restart(manager: ManagerClient):
|
||||
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True }
|
||||
|
||||
servers = []
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Create a keyspace with tablets and initial_tablets == 2, then insert data
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
|
||||
await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
|
||||
|
||||
keys = range(1024)
|
||||
await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys])
|
||||
|
||||
(raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers)
|
||||
|
||||
# Decide which node to restart; select a node with a replica but not the raft leader
|
||||
tablet_replicas = await get_all_tablet_replicas(manager, raft_leader, 'test', 'test')
|
||||
replica_hosts = [tr.replicas[0][0] for tr in tablet_replicas]
|
||||
for s in servers:
|
||||
if s != raft_leader:
|
||||
host_id = await manager.get_host_id(s.server_id)
|
||||
if host_id in replica_hosts:
|
||||
restart_node = s
|
||||
break
|
||||
|
||||
# Shutdown the node containing a replica
|
||||
await manager.server_stop_gracefully(restart_node.server_id)
|
||||
# Start truncating in the background
|
||||
trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=hosts[0])
|
||||
# Restart the node
|
||||
await manager.server_start(restart_node.server_id)
|
||||
# Wait for truncate to complete
|
||||
await trunc_future
|
||||
|
||||
# Check if truncate was successful
|
||||
row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL))
|
||||
assert row[0].count == 0
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="issue #21719")
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_truncate_with_coordinator_crash(manager: ManagerClient):
|
||||
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True }
|
||||
|
||||
servers = []
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
servers.append(await manager.server_add(config=cfg))
|
||||
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# Create a keyspace with tablets and initial_tablets == 2, then insert data
|
||||
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
|
||||
await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);')
|
||||
|
||||
keys = range(1024)
|
||||
await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys])
|
||||
|
||||
(raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers)
|
||||
|
||||
if raft_leader == servers[0]:
|
||||
trunc_host = hosts[1]
|
||||
else:
|
||||
trunc_host = hosts[0]
|
||||
|
||||
# Enable injection to crash the raft leader after truncate cleared the session ID
|
||||
await manager.api.enable_injection(raft_leader.ip_addr, 'truncate_crash_after_session_clear', one_shot=False)
|
||||
|
||||
# Start a TRUNCATE in the background
|
||||
trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host)
|
||||
# Wait for the topology coordinator to crash
|
||||
await raft_leader_log.wait_for('truncate_crash_after_session_clear hit, killing the node')
|
||||
await manager.server_stop(raft_leader.server_id)
|
||||
# Restart the crashed node
|
||||
await manager.server_start(raft_leader.server_id)
|
||||
# Wait for truncate to complete
|
||||
await trunc_future
|
||||
|
||||
# Check if we have any data
|
||||
row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL))
|
||||
assert row[0].count == 0
|
||||
Reference in New Issue
Block a user