From e65a235fd5a6f77912982ca3f67fa8a4a845efff Mon Sep 17 00:00:00 2001 From: Ferenc Szili Date: Tue, 3 Dec 2024 18:18:54 +0100 Subject: [PATCH] test: add tests for truncate with tablets This patch adds the unit tests for truncate with tablets. test_truncate_while_migration() triggers a tablet migration, then runs a TRUNCATE TABLE for the table containing the tablet being migrated. test_truncate_with_concurrent_drop() starts a truncate, then attempts to drop the table while it is being truncated. test_truncate_while_node_restart() validates the case where a replica node is restarted while truncate is running. test_truncate_with_coordinator_crash() validates if truncate is correctly completed in cases where the topology coordinator has crashed or restarted after the truncate session is cleared, but before the truncate request is finalized. --- service/storage_service.cc | 4 + service/topology_coordinator.cc | 10 + .../test_truncate_with_tablets.py | 215 ++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 test/topology_custom/test_truncate_with_tablets.py diff --git a/service/storage_service.cc b/service/storage_service.cc index a1e8fc6220..0d1c8130ff 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6025,6 +6025,10 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) { throw std::runtime_error(fmt::format("Cannot stream within the same node using regular migration, tablet: {}, shard {} -> {}", tablet, leaving_replica->shard, trinfo->pending_replica->shard)); } + co_await utils::get_local_injector().inject("migration_streaming_wait", [] (auto& handler) { + rtlogger.info("migration_streaming_wait: start"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); + }); auto& table = _db.local().find_column_family(tablet.table); std::vector tables = {table.schema()->cf_name()}; auto my_id = tm->get_my_id(); diff --git a/service/topology_coordinator.cc 
b/service/topology_coordinator.cc index 6273f04f91..251bddc032 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -873,6 +873,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { // Release the guard to avoid blocking group0 for long periods of time while invoking RPCs release_guard(std::move(guard)); + co_await utils::get_local_injector().inject("truncate_table_wait", [] (auto& handler) { + rtlogger.info("truncate_table_wait: start"); + return handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(2)); + }); + // Check if all the nodes with replicas are alive for (const locator::host_id& replica_host: replica_hosts) { if (!_gossiper.is_alive(replica_host)) { @@ -917,6 +922,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { } } + utils::get_local_injector().inject("truncate_crash_after_session_clear", [] { + rtlogger.info("truncate_crash_after_session_clear hit, killing the node"); + _exit(1); + }); + // Execute a barrier to ensure the TRUNCATE RPC can't run on any nodes after this point if (!guard) { guard = co_await start_operation(); diff --git a/test/topology_custom/test_truncate_with_tablets.py b/test/topology_custom/test_truncate_with_tablets.py new file mode 100644 index 0000000000..da500c7515 --- /dev/null +++ b/test/topology_custom/test_truncate_with_tablets.py @@ -0,0 +1,215 @@ +# +# Copyright (C) 2024-present ScyllaDB +# +# SPDX-License-Identifier: AGPL-3.0-or-later +# +from cassandra.query import SimpleStatement, ConsistencyLevel +from cassandra.protocol import InvalidRequest +from test.pylib.manager_client import ManagerClient +from test.topology.conftest import skip_mode +from test.topology.util import get_topology_coordinator +from test.pylib.tablets import get_all_tablet_replicas +from test.pylib.util import wait_for_cql_and_get_hosts +import time +import pytest +import logging +import asyncio + +logger = logging.getLogger(__name__) + 
+@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_while_migration(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True, + 'error_injections_at_startup': ['migration_streaming_wait'] + } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + # Add a node to the cluster. This will cause the tablet load balancer to migrate one tablet to the new node + servers.append(await manager.server_add(config=cfg)) + + # Wait for tablet streaming to reach the migration_streaming_wait injection point, then release it + pending_node = servers[1] + pending_log = await manager.server_open_log(pending_node.server_id) + + await pending_log.wait_for('migration_streaming_wait: start') + await manager.api.message_injection(pending_node.ip_addr, 'migration_streaming_wait') + + # Do a TRUNCATE TABLE while the tablet is being streamed + await cql.run_async('TRUNCATE TABLE test.test') + + # Wait for streaming to complete + await pending_log.wait_for('raft_topology - Streaming for tablet migration of.*successful') + + # Check that the table is empty after the truncate + row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0 + + +async def get_raft_leader_and_log(manager: ManagerClient, servers): + raft_leader_host_id = await get_topology_coordinator(manager) + for s in servers: + if raft_leader_host_id == await manager.get_host_id(s.server_id): + raft_leader = s + break + raft_leader_log 
= await manager.server_open_log(raft_leader.server_id) + return (raft_leader, raft_leader_log) + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_with_concurrent_drop(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True, + 'error_injections_at_startup': ['truncate_table_wait'] + } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers) + + if raft_leader == servers[0]: + trunc_host = hosts[1] + drop_host = hosts[2] + elif raft_leader == servers[1]: + trunc_host = hosts[0] + drop_host = hosts[2] + elif raft_leader == servers[2]: + trunc_host = hosts[0] + drop_host = hosts[1] + else: + assert False, 'Unable to determine raft leader' + + # Start a TRUNCATE in the background + trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host) + # Wait for the topology coordinator to reach a point where it is about to start sending the truncate RPCs + await raft_leader_log.wait_for('truncate_table_wait: start') + # Execute DROP TABLE + await cql.run_async('DROP TABLE test.test', host=drop_host) + # Release TRUNCATE table in topology coordinator + await 
manager.api.message_injection(raft_leader.ip_addr, 'truncate_table_wait') + # Check we received an error + with pytest.raises(InvalidRequest, match='unconfigured table test'): + await trunc_future + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_while_node_restart(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers) + + # Decide which node to restart; select a node with a replica but not the raft leader + tablet_replicas = await get_all_tablet_replicas(manager, raft_leader, 'test', 'test') + replica_hosts = [tr.replicas[0][0] for tr in tablet_replicas] + for s in servers: + if s != raft_leader: + host_id = await manager.get_host_id(s.server_id) + if host_id in replica_hosts: + restart_node = s + break + + # Shut down the node containing a replica + await manager.server_stop_gracefully(restart_node.server_id) + # Start truncating in the background + trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=hosts[0]) + # Restart the node + await manager.server_start(restart_node.server_id) + # Wait for truncate to complete + await 
trunc_future + + # Check if truncate was successful + row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0 + + +@pytest.mark.xfail(reason="issue #21719") +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_truncate_with_coordinator_crash(manager: ManagerClient): + + logger.info('Bootstrapping cluster') + cfg = { 'enable_tablets': True } + + servers = [] + servers.append(await manager.server_add(config=cfg)) + servers.append(await manager.server_add(config=cfg)) + + cql = manager.get_cql() + hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) + + # Create a keyspace with tablets and initial_tablets == 2, then insert data + await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") + await cql.run_async('CREATE TABLE test.test (pk int PRIMARY KEY, c int);') + + keys = range(1024) + await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys]) + + (raft_leader, raft_leader_log) = await get_raft_leader_and_log(manager, servers) + + if raft_leader == servers[0]: + trunc_host = hosts[1] + else: + trunc_host = hosts[0] + + # Enable injection to crash the raft leader after truncate cleared the session ID + await manager.api.enable_injection(raft_leader.ip_addr, 'truncate_crash_after_session_clear', one_shot=False) + + # Start a TRUNCATE in the background + trunc_future = cql.run_async('TRUNCATE TABLE test.test', host=trunc_host) + # Wait for the topology coordinator to crash + await raft_leader_log.wait_for('truncate_crash_after_session_clear hit, killing the node') + await manager.server_stop(raft_leader.server_id) + # Restart the crashed node + await manager.server_start(raft_leader.server_id) + # Wait for truncate to complete + await trunc_future + + # Check if 
we have any data + row = await cql.run_async(SimpleStatement('SELECT COUNT(*) FROM test.test', consistency_level=ConsistencyLevel.ALL)) + assert row[0].count == 0