topology: disable force-gossip-topology-changes option

The patch marks force-gossip-topology-changes as deprecated and removes
the tests that use it. One test (test_different_group0_ids) is marked as
xfail instead, since it looks like gossiper mode was used there only as
an easy way to reach a certain state; more investigation is needed to
determine whether the test can be rewritten to use raft mode instead.

Closes scylladb/scylladb#28383
This commit is contained in:
Gleb Natapov
2026-01-27 12:05:56 +02:00
committed by Patryk Jędrzejczak
parent ceec703bb7
commit 08268eee3f
34 changed files with 27 additions and 1830 deletions

View File

@@ -814,8 +814,7 @@ generation_service::generation_service(
config cfg, gms::gossiper& g, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::system_keyspace>& sys_ks,
abort_source& abort_src, const locator::shared_token_metadata& stm, gms::feature_service& f,
replica::database& db,
std::function<bool()> raft_topology_change_enabled)
replica::database& db)
: _cfg(std::move(cfg))
, _gossiper(g)
, _sys_dist_ks(sys_dist_ks)
@@ -824,7 +823,6 @@ generation_service::generation_service(
, _token_metadata(stm)
, _feature_service(f)
, _db(db)
, _raft_topology_change_enabled(std::move(raft_topology_change_enabled))
{
}
@@ -878,16 +876,7 @@ future<> generation_service::on_join(gms::inet_address ep, locator::host_id id,
future<> generation_service::on_change(gms::inet_address ep, locator::host_id id, const gms::application_state_map& states, gms::permit_id pid) {
assert_shard_zero(__PRETTY_FUNCTION__);
if (_raft_topology_change_enabled()) {
return make_ready_future<>();
}
return on_application_state_change(ep, id, states, gms::application_state::CDC_GENERATION_ID, pid, [this] (gms::inet_address ep, locator::host_id id, const gms::versioned_value& v, gms::permit_id) {
auto gen_id = gms::versioned_value::cdc_generation_id_from_string(v.value());
cdc_log.debug("Endpoint: {}, CDC generation ID change: {}", ep, gen_id);
return legacy_handle_cdc_generation(gen_id);
});
return make_ready_future<>();
}
future<> generation_service::check_and_repair_cdc_streams() {

View File

@@ -79,17 +79,12 @@ private:
std::optional<cdc::generation_id> _gen_id;
future<> _cdc_streams_rewrite_complete = make_ready_future<>();
/* Returns true if raft topology changes are enabled.
* Can only be called from shard 0.
*/
std::function<bool()> _raft_topology_change_enabled;
public:
generation_service(config cfg, gms::gossiper&,
sharded<db::system_distributed_keyspace>&,
sharded<db::system_keyspace>& sys_ks,
abort_source&, const locator::shared_token_metadata&,
gms::feature_service&, replica::database& db,
std::function<bool()> raft_topology_change_enabled);
gms::feature_service&, replica::database& db);
future<> stop();
~generation_service();

View File

@@ -1498,7 +1498,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, index_cache_fraction(this, "index_cache_fraction", liveness::LiveUpdate, value_status::Used, 0.2,
"The maximum fraction of cache memory permitted for use by index cache. Clamped to the [0.0; 1.0] range. Must be small enough to not deprive the row cache of memory, but should be big enough to fit a large fraction of the index. The default value 0.2 means that at least 80\% of cache memory is reserved for the row cache, while at most 20\% is usable by the index cache.")
, consistent_cluster_management(this, "consistent_cluster_management", value_status::Deprecated, true, "Use RAFT for cluster management and DDL.")
, force_gossip_topology_changes(this, "force_gossip_topology_changes", value_status::Used, false, "Force gossip-based topology operations in a fresh cluster. Only the first node in the cluster must use it. The rest will fall back to gossip-based operations anyway. This option should be used only for testing. Note: gossip topology changes are incompatible with tablets.")
, force_gossip_topology_changes(this, "force_gossip_topology_changes", value_status::Deprecated, false, "Force gossip-based topology operations in a fresh cluster. Only the first node in the cluster must use it. The rest will fall back to gossip-based operations anyway. This option should be used only for testing. Note: gossip topology changes are incompatible with tablets.")
, recovery_leader(this, "recovery_leader", liveness::LiveUpdate, value_status::Used, utils::null_uuid(), "Host ID of the node restarted first while performing the Manual Raft-based Recovery Procedure. Warning: this option disables some guardrails for the needs of the Manual Raft-based Recovery Procedure. Make sure you unset it at the end of the procedure.")
, wasm_cache_memory_fraction(this, "wasm_cache_memory_fraction", value_status::Used, 0.01, "Maximum total size of all WASM instances stored in the cache as fraction of total shard memory.")
, wasm_cache_timeout_in_ms(this, "wasm_cache_timeout_in_ms", value_status::Used, 5000, "Time after which an instance is evicted from the cache.")

View File

@@ -102,13 +102,6 @@ std::set<sstring> get_disabled_features_from_db_config(const db::config& cfg, st
if (!cfg.check_experimental(db::experimental_features_t::feature::STRONGLY_CONSISTENT_TABLES)) {
disabled.insert("STRONGLY_CONSISTENT_TABLES"s);
}
if (cfg.force_gossip_topology_changes()) {
if (cfg.enable_tablets_by_default()) {
throw std::runtime_error("Tablets cannot be enabled with gossip topology changes. Use either --tablets-mode-for-new-keyspaces=enabled|enforced or --force-gossip-topology-changes, but not both.");
}
startlog.warn("The tablets feature is disabled due to forced gossip topology changes");
disabled.insert("TABLETS"s);
}
if (!cfg.table_digest_insensitive_to_expiry()) {
disabled.insert("TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"s);
}

View File

@@ -2041,8 +2041,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
cdc_config.ring_delay = std::chrono::milliseconds(cfg->ring_delay_ms());
cdc_config.dont_rewrite_streams = cfg->cdc_dont_rewrite_streams();
cdc_generation_service.start(std::move(cdc_config), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(feature_service), std::ref(db),
[&ss] () -> bool { return ss.local().raft_topology_change_enabled(); }).get();
std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(feature_service), std::ref(db)).get();
auto stop_cdc_generation_service = defer_verbose_shutdown("CDC Generation Management service", [] {
cdc_generation_service.stop().get();
});

View File

@@ -3189,9 +3189,6 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
throw std::runtime_error(
"Cannot start in the Raft-based recovery procedure - Raft-based topology has not been enabled");
}
if (_db.local().get_config().force_gossip_topology_changes()) {
throw std::runtime_error("Cannot force gossip topology changes in the Raft-based recovery procedure");
}
}
}
@@ -3215,9 +3212,6 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
} else if (_group0->joined_group0()) {
// We are a part of group 0.
set_topology_change_kind(upgrade_state_to_topology_op_kind(_topology_state_machine._topology.upgrade_state));
if (_db.local().get_config().force_gossip_topology_changes() && raft_topology_change_enabled()) {
throw std::runtime_error("Cannot force gossip topology changes - the cluster is using raft-based topology");
}
slogger.info("The node is already in group 0 and will restart in {} mode", raft_topology_change_enabled() ? "raft" : "legacy");
} else if (_sys_ks.local().bootstrap_complete()) {
if (co_await _sys_ks.local().load_topology_features_state()) {
@@ -3238,13 +3232,8 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
if (_group0->load_my_id() == g0_info.id) {
// We're creating the group 0.
if (_db.local().get_config().force_gossip_topology_changes()) {
slogger.info("We are creating the group 0. Start in legacy topology operations mode by force");
set_topology_change_kind(topology_change_kind::legacy);
} else {
slogger.info("We are creating the group 0. Start in raft topology operations mode");
set_topology_change_kind(topology_change_kind::raft);
}
slogger.info("We are creating the group 0. Start in raft topology operations mode");
set_topology_change_kind(topology_change_kind::raft);
} else {
// Ask the current member of the raft group about which mode to use
auto params = join_node_query_params {};
@@ -3252,9 +3241,6 @@ future<> storage_service::join_cluster(sharded<service::storage_proxy>& proxy,
&_messaging.local(), netw::msg_addr(g0_info.ip_addr), g0_info.id, std::move(params));
switch (result.topo_mode) {
case join_node_query_result::topology_mode::raft:
if (_db.local().get_config().force_gossip_topology_changes()) {
throw std::runtime_error("Cannot force gossip topology changes - joining the cluster that is using raft-based topology");
}
slogger.info("Will join existing cluster in raft topology operations mode");
set_topology_change_kind(topology_change_kind::raft);
break;

View File

@@ -14,15 +14,11 @@ from test.pylib.manager_client import ManagerClient
from test.cluster.auth_cluster import extra_scylla_config_options as auth_config
@pytest.mark.asyncio
async def __test_attach_service_level_to_user(request, manager: ManagerClient, is_raft: bool):
async def test_attach_service_level_to_user(request, manager: ManagerClient):
user = f"test_user_{unique_name()}"
# Start nodes with correct topology
if is_raft:
servers = await manager.servers_add(3, config=auth_config)
else:
conf = {**auth_config, 'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
servers = [await manager.server_add(config=conf) for _ in range(3)]
servers = await manager.servers_add(3, config=auth_config)
cql = manager.get_cql()
logging.info("Waiting until driver connects to every server")
@@ -46,28 +42,9 @@ async def __test_attach_service_level_to_user(request, manager: ManagerClient, i
for sl in sls:
await cql.run_async(f"ATTACH SERVICE LEVEL {sl} TO {user}")
#if we are not using raft we have to switch the tenant and wait for it to take effect
if not is_raft:
for ip in ips:
await manager.api.client.post('/service_levels/switch_tenants', host=ip)
# Switching tenants may be blocked if a connection is waiting for a request (see 'generic_server::connection::process_until_tenant_switch()').
# Execute enough cheap statements so that the connection on each shard processes at least one statement and updates its tenant.
for _ in range(100):
read_barrier(manager.api, ip)
assert verify_service_level(sl), f"All connections should be in {sl} service level"
await cql.run_async(f"DETACH SERVICE LEVEL FROM {user}")
await cql.run_async(f"DROP ROLE {user}")
for sl in sls:
await cql.run_async(f"DROP SERVICE LEVEL {sl}")
@pytest.mark.asyncio
async def test_attach_service_level_with_raft(request, manager: ManagerClient):
await __test_attach_service_level_to_user(request, manager, is_raft=True)
@pytest.mark.asyncio
async def test_attach_service_level_with_gossip(request, manager: ManagerClient):
await __test_attach_service_level_to_user(request, manager, is_raft=False)
await cql.run_async(f"DROP SERVICE LEVEL {sl}")

View File

@@ -146,47 +146,6 @@ async def check_auth_v2_works(manager: ManagerClient, hosts):
await asyncio.gather(*(cql.run_async(f"LIST ROLES OF {username}", host=host) for host in hosts))
await cql.run_async(f"DROP ROLE {username}")
@pytest.mark.asyncio
async def test_auth_v2_migration(request, manager: ManagerClient):
# First, force the first node to start in legacy mode
cfg = {**auth_config, 'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
servers = [await manager.server_add(config=cfg)]
# Enable raft-based node operations for subsequent nodes - they should fall back to
# using gossiper-based node operations
cfg.pop('force_gossip_topology_changes')
servers += [await manager.server_add(config=cfg) for _ in range(2)]
cql = manager.cql
assert(cql)
logging.info("Waiting until driver connects to every server")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)
logging.info("Checking the upgrade state on all nodes")
for host in hosts:
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
await populate_auth_v1_data(manager)
await warmup_v1_static_values(manager, hosts)
logging.info("Triggering upgrade to raft topology")
await manager.api.upgrade_to_raft_topology(hosts[0].address)
logging.info("Waiting until upgrade finishes")
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
logging.info("Checking migrated data in system")
await check_auth_v2_data_migration(manager, hosts)
logging.info("Checking auth statements after migration")
await check_auth_v2_works(manager, hosts)
@pytest.mark.asyncio
async def test_auth_v2_during_recovery(manager: ManagerClient):
# FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded.

View File

@@ -62,57 +62,6 @@ async def test_service_levels_snapshot(manager: ManagerClient):
assert set([sl.service_level for sl in result]) == set([sl.service_level for sl in new_result])
@pytest.mark.asyncio
async def test_service_levels_upgrade(request, manager: ManagerClient, build_mode: str):
# First, force the first node to start in legacy mode
cfg = {**auth_config, 'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
servers = [await manager.server_add(config=cfg)]
# Enable raft-based node operations for subsequent nodes - they should fall back to
# using gossiper-based node operations
cfg.pop('force_gossip_topology_changes')
servers += [await manager.server_add(config=cfg) for _ in range(2)]
cql = manager.get_cql()
assert(cql)
logging.info("Waiting until driver connects to every server")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info("Checking the upgrade state on all nodes")
for host in hosts:
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
sls = ["sl" + unique_name() for _ in range(5)]
for sl in sls:
await cql.run_async(f"CREATE SERVICE LEVEL {sl}")
result = await cql.run_async("SELECT service_level FROM system_distributed.service_levels")
assert set([sl.service_level for sl in result]) == set(sls)
if build_mode in ("debug", "dev"):
# See scylladb/scylladb/#24963 for more details
logging.info("Enabling an error injection in legacy role manager, to check that we don't query auth in system_auth")
await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, "standard_role_manager_fail_legacy_query", one_shot=False) for s in servers))
logging.info("Triggering upgrade to raft topology")
await manager.api.upgrade_to_raft_topology(hosts[0].address)
logging.info("Waiting until upgrade finishes")
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
await wait_until_driver_service_level_created(manager, time.time() + 60)
result_v2 = await cql.run_async("SELECT service_level FROM system.service_levels_v2")
assert set([sl.service_level for sl in result_v2]) == set(sls + [DRIVER_SL_NAME])
sl_v2 = "sl" + unique_name()
await cql.run_async(f"CREATE SERVICE LEVEL {sl_v2}")
await asyncio.gather(*(read_barrier(manager.api, get_host_api_address(host)) for host in hosts))
result_with_sl_v2 = await cql.run_async(f"SELECT service_level FROM system.service_levels_v2")
assert set([sl.service_level for sl in result_with_sl_v2]) == set(sls + [DRIVER_SL_NAME] + [sl_v2])
@pytest.mark.asyncio
async def test_service_levels_work_during_recovery(manager: ManagerClient):
# FIXME: move this test to the Raft-based recovery procedure or remove it if unneeded.
@@ -384,50 +333,6 @@ async def test_shares_check(manager: ManagerClient):
await cql.run_async(f"CREATE SERVICE LEVEL {sl2} WITH shares=500")
await cql.run_async(f"ALTER SERVICE LEVEL {sl1} WITH shares=100")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injection is not supported in release mode')
async def test_workload_prioritization_upgrade(manager: ManagerClient):
# This test simulates OSS->enterprise upgrade in v1 service levels.
# Using error injection, the test disables WORKLOAD_PRIORITIZATION feature
# and removes `shares` column from system_distributed.service_levels table.
config = {
**auth_config,
'authenticator': 'AllowAllAuthenticator',
'authorizer': 'AllowAllAuthorizer',
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled',
'error_injections_at_startup': [
{
'name': 'suppress_features',
'value': 'WORKLOAD_PRIORITIZATION'
},
{
'name': 'service_levels_v1_table_without_shares'
}
]
}
servers = [await manager.server_add(config=config) for _ in range(3)]
cql = manager.get_cql()
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
# Validate that service levels' table has no `shares` column
sl_schema = await cql.run_async("DESC TABLE system_distributed.service_levels")
assert "shares int" not in sl_schema[0].create_statement
with pytest.raises(InvalidRequest):
await cql.run_async("CREATE SERVICE LEVEL sl1 WITH shares = 100")
# Do rolling restart of the cluster and remove error injections
for server in servers:
await manager.server_update_config(server.server_id, 'error_injections_at_startup', [])
await manager.rolling_restart(servers)
# Validate that `shares` column was added
logs = [await manager.server_open_log(server.server_id) for server in servers]
await logs[0].wait_for("Workload prioritization v1 started|Workload prioritization v1 is already started", timeout=10)
sl_schema_upgraded = await cql.run_async("DESC TABLE system_distributed.service_levels")
assert "shares int" in sl_schema_upgraded[0].create_statement
await cql.run_async("CREATE SERVICE LEVEL sl2 WITH shares = 100")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injection is disabled in release mode')
async def test_service_levels_over_limit(manager: ManagerClient):

View File

@@ -13,7 +13,6 @@ class DTestConfig:
self.num_tokens = -1
self.experimental_features = []
self.tablets = False
self.force_gossip_topology_changes = False
self.scylla_features = set()
def setup(self, request):
@@ -21,7 +20,6 @@ class DTestConfig:
self.num_tokens = request.config.getoption("--num-tokens")
self.experimental_features = request.config.getoption("--experimental-features") or set()
self.tablets = request.config.getoption("--tablets", default=False)
self.force_gossip_topology_changes = request.config.getoption("--force-gossip-topology-changes", default=False)
self.scylla_features = request.config.scylla_features
@property

View File

@@ -526,10 +526,6 @@ class DTestSetup:
experimental_features.append(f)
self.scylla_features |= set(values.get("experimental_features", []))
if self.dtest_config.force_gossip_topology_changes:
logger.debug("Forcing gossip topology changes")
values["force_gossip_topology_changes"] = True
logger.debug("Setting 'enable_tablets' to %s", self.dtest_config.tablets)
values["enable_tablets"] = self.dtest_config.tablets
values["tablets_mode_for_new_keyspaces"] = "enabled" if self.dtest_config.tablets else "disabled"

View File

@@ -33,8 +33,7 @@ logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_mv_topology_change(manager: ManagerClient):
cfg = {'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled',
cfg = {'tablets_mode_for_new_keyspaces': 'disabled',
'error_injections_at_startup': ['delay_before_get_view_natural_endpoint']}
servers = [await manager.server_add(config=cfg) for _ in range(3)]

View File

@@ -9,8 +9,6 @@ extra_scylla_config_options:
rf_rack_valid_keyspaces: True
tablets_mode_for_new_keyspaces: enabled
run_first:
- test_raft_recovery_stuck
- test_raft_recovery_basic
- test_group0_schema_versioning
- test_tablets_migration
- test_zero_token_nodes_topology_ops
@@ -39,7 +37,6 @@ run_in_dev:
- test_raft_ignore_nodes
- test_group0_schema_versioning
- test_different_group0_ids
- test_replace_ignore_nodes
- test_zero_token_nodes_no_replication
- test_not_enough_token_owners
- test_replace_alive_node

View File

@@ -1,52 +0,0 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import time
import pytest
import logging
from test.pylib.manager_client import ManagerClient
from test.cluster.util import wait_for_token_ring_and_group0_consistency
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_boot_after_ip_change(manager: ManagerClient) -> None:
    """Check that a fresh node can bootstrap after an existing node moved to a new IP.

    Regression test for #14468. Does not apply to Raft-topology mode.
    """
    cfg = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }

    logger.info(f"Booting initial cluster")
    servers = []
    for _ in range(2):
        servers.append(await manager.server_add(config=cfg))
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

    moved = servers[1]
    logger.info(f"Stopping server {moved}")
    await manager.server_stop_gracefully(moved.server_id)

    logger.info(f"Changing IP of server {moved}")
    new_ip = await manager.server_change_ip(moved.server_id)
    moved = moved._replace(ip_addr = new_ip)
    servers[1] = moved
    logger.info(f"New IP: {new_ip}")

    logger.info(f"Restarting server {moved}")
    await manager.server_start(moved.server_id)

    # This wait has to happen before the new node is booted. Without it the
    # booting node could reach servers[0] before servers[0] has learned the new
    # IP of servers[1]; it would then wait for servers[1] to become alive under
    # its old IP and eventually time out.
    #
    # The test remains a valid regression test for #14468: there, a booting
    # node kept waiting for the old IP of servers[0] even though every existing
    # server had already seen the IP change.
    witness = servers[0]
    logger.info(f"Wait until {witness} sees the new IP of {moved}")
    await manager.server_sees_other_server(witness.ip_addr, moved.ip_addr)

    logger.info(f"Booting new node")
    await manager.server_add(config=cfg)

View File

@@ -13,6 +13,7 @@ from test.pylib.util import wait_for_first_completed
@pytest.mark.asyncio
@pytest.mark.xfail(reason="gossiper topology mode is no longer supported, need to rewrite the test using raft topology")
async def test_different_group0_ids(manager: ManagerClient):
"""
The reproducer for #14448.

View File

@@ -1,21 +0,0 @@
import pytest
from test.pylib.manager_client import ManagerClient
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_gossip_boot(manager: ManagerClient):
    """
    Regression test for scylladb/scylladb#17493.

    Boots three nodes one after another with the 'gossiper_replicate_sleep'
    injection enabled at startup, then waits until every node's log contains a
    'handle_state_normal ... finished' line for each of the three nodes.
    """
    cfg = {
        'error_injections_at_startup': ['gossiper_replicate_sleep'],
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }

    servers = []
    for _ in range(3):
        servers.append(await manager.server_add(config=cfg, timeout=60))

    logs = []
    for srv in servers:
        logs.append(await manager.server_open_log(srv.server_id))

    for log in logs:
        for srv in servers:
            await log.wait_for(f'handle_state_normal for {srv.ip_addr}.*finished', timeout=60)

View File

@@ -1,358 +0,0 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import time
import pytest
import logging
import re
from uuid import UUID
from cassandra.cluster import Session, ConsistencyLevel # type: ignore
from cassandra.query import SimpleStatement # type: ignore
from cassandra.pool import Host # type: ignore
from test.pylib.manager_client import ManagerClient, ServerInfo
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.pylib.log_browsing import ScyllaLogFile
from test.cluster.util import reconnect_driver, wait_until_upgrade_finishes, \
enter_recovery_state, delete_raft_data_and_upgrade_state, new_test_keyspace
logger = logging.getLogger(__name__)
async def get_local_schema_version(cql: Session, h: Host) -> UUID:
    """Return the `schema_version` stored in the `system.local` row (key 'local') on host `h`.

    Asserts that the query returned at least one row.
    """
    rows = await cql.run_async("select schema_version from system.local where key = 'local'", host=h)
    assert rows
    first_row = rows[0]
    return first_row.schema_version
async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None:
    """Return the 'group0_schema_version' value from `system.scylla_local` on host `h`.

    The stored value is parsed into a UUID. Returns None when the query yields no rows.
    """
    rows = await cql.run_async("select value from system.scylla_local where key = 'group0_schema_version'", host=h)
    return UUID(rows[0].value) if rows else None
async def get_scylla_tables_versions(cql: Session, h: Host) -> list[tuple[str, str, UUID | None]]:
    """Return one (keyspace_name, table_name, version) tuple per row of
    `system_schema.scylla_tables` as seen by host `h`.
    """
    rows = await cql.run_async("select keyspace_name, table_name, version from system_schema.scylla_tables", host=h)
    entries = []
    for row in rows:
        entries.append((row.keyspace_name, row.table_name, row.version))
    return entries
async def get_scylla_tables_version(cql: Session, h: Host, keyspace_name: str, table_name: str) -> UUID | None:
    """Return the `version` column of the `system_schema.scylla_tables` row for
    `keyspace_name`.`table_name`, queried on host `h`.

    Fails the test through `pytest.fail` when no such row exists.
    """
    query = (
        f"select version from system_schema.scylla_tables"
        f" where keyspace_name = '{keyspace_name}' and table_name = '{table_name}'"
    )
    rows = await cql.run_async(query, host=h)
    if rows:
        return rows[0].version
    # pytest.fail raises, so nothing is returned on this path.
    pytest.fail(f"No scylla_tables row found for {keyspace_name}.{table_name}")
async def verify_local_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Wait until every host in `hs` reports the same `system.local` schema version.

    The comparison is handed to `wait_for` with a deadline 5 seconds from now
    and a period of 1 second.
    """
    async def check():
        # Hosts are queried one at a time, in the order given by `hs`.
        versions = {}
        for host in hs:
            versions[host] = await get_local_schema_version(cql, host)
        logger.info(f"system.local schema_versions: {versions}")

        # The first host's version is the reference for all comparisons.
        h1, v1 = next(iter(versions.items()))
        for h, v in versions.items():
            if v == v1:
                continue
            logger.info(f"{h1}'s system.local schema_version {v1} is different than {h}'s version {v}; retrying")
            # NOTE(review): presumably `wait_for` treats None as "try again" -- confirm against its definition.
            return None
        return True

    await wait_for(check, deadline=time.time() + 5.0, period=1.0)
async def verify_group0_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Check once that all hosts in `hs` store the same group 0 schema version.

    Any host whose value differs from the first host's fails the test through
    `pytest.fail`; there is no retry.
    """
    versions = {}
    for host in hs:
        versions[host] = await get_group0_schema_version(cql, host)
    logger.info(f"system.scylla_local group0_schema_versions: {versions}")

    # The first host's version is the reference for all comparisons.
    h1, v1 = next(iter(versions.items()))
    for h, v in versions.items():
        if v == v1:
            continue
        pytest.fail(f"{h1}'s system.scylla_local group0_schema_version {v1} is different than {h}'s version {v}")
async def verify_scylla_tables_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool) -> None:
    """Check once that `system_schema.scylla_tables` has the same contents on all hosts in `hs`.

    Each host's rows are collected into a set and compared with the first
    host's set. When `ignore_system_tables` is true, differing entries whose
    keyspace is "system" are ignored. Any remaining difference fails the test
    through `pytest.fail`.
    """
    versions = {}
    for host in hs:
        versions[host] = set(await get_scylla_tables_versions(cql, host))
    logger.info(f"system_schema.scylla_tables: {versions}")

    # The first host's set is the reference for all comparisons.
    h1, v1 = next(iter(versions.items()))
    for h, v in versions.items():
        diff = v ^ v1
        if ignore_system_tables:
            # Entries are (keyspace_name, table_name, version) tuples.
            diff = {entry for entry in diff if entry[0] != "system"}
        if diff:
            pytest.fail(f"{h1}'s system_schema.scylla_tables contents is different than {h}'s, symmetric diff: {diff}")
async def verify_table_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool = False) -> None:
    """Run the three table-backed schema version checks on hosts `hs`, in order:
    group 0 schema version, `system.local` schema version, then the contents of
    `system_schema.scylla_tables`.

    `ignore_system_tables` is forwarded to the `scylla_tables` check only.
    """
    logger.info("Verifying that versions stored in tables are in sync")
    await verify_group0_schema_versions_synced(cql, hs)
    await verify_local_schema_versions_synced(cql, hs)
    await verify_scylla_tables_versions_synced(
        cql,
        hs,
        ignore_system_tables,
    )
async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int], table):
    """
    Check that every server logged the same version in its `Altering {table}...` message.

    Preconditions:
    - `logs` are the log files of `srvs`, in the same order;
    - `marks` are log markers (obtained by `ScyllaLogFile.mark()`) for `logs`, in the same order;
    - an 'alter table {table} ...' statement was executed after `marks` were taken.

    Fails the test through `pytest.fail` if a server has no matching log line or
    if the versions differ between servers.
    """
    logger.info("Verifying that in-memory table schema versions are in sync")

    # Search each log from its own mark, one server at a time.
    pattern = f"Altering {table}.*version=(.*)"
    matches = []
    for log, mark in zip(logs, marks):
        matches.append(await log.grep(pattern, from_mark=mark))

    def get_version(srv: ServerInfo, found: list[tuple[str, re.Match[str]]]):
        if not found:
            pytest.fail(f"Server {srv} didn't log 'Altering' message")
        # Only the first matching line is used.
        _line, hit = found[0]
        return UUID(hit.group(1))

    versions = {}
    for srv, found in zip(srvs, matches):
        versions[srv] = get_version(srv, found)
    logger.info(f"In-memory table versions: {versions}")

    # The first server's version is the reference for all comparisons.
    s1, v1 = next(iter(versions.items()))
    for s, v in versions.items():
        if v == v1:
            continue
        pytest.fail(f"{s1}'s in-memory table version {v1} is different than {s}'s version {v}")
@pytest.mark.asyncio
async def test_schema_versioning_with_recovery(manager: ManagerClient):
    """
    Perform schema changes while mixing nodes in RECOVERY mode with nodes in group 0 mode.
    Schema changes originating from RECOVERY node use digest-based schema versioning.
    Schema changes originating from group 0 nodes use persisted versions committed through group 0.
    Verify that schema versions are in sync after each schema change.
    """
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}

    logger.info("Booting cluster")
    # Must bootstrap sequentially because of gossip topology changes
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    # The rest of the test runs inside the keyspace context because it keeps using `table`.
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks_name:
        await verify_table_versions_synced(cql, hosts)

        table_name = "t"
        table = f"{ks_name}.{table_name}"
        await cql.run_async(f"create table {table} (pk int primary key)")

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)
        ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert ks_t_version

        # Phase 1: schema change with every node in group 0 mode.
        logs = [await manager.server_open_log(srv.server_id) for srv in servers]
        marks = [await log.mark() for log in logs]

        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        assert new_ks_t_version != ks_t_version
        ks_t_version = new_ks_t_version

        # Phase 2: put the first node into RECOVERY mode.
        # We still have a group 0 majority, don't do this at home.
        srv1 = servers[0]
        logger.info(f"Rebooting {srv1} in RECOVERY mode")
        h1 = next(h for h in hosts if h.address == srv1.ip_addr)
        await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1)
        await manager.server_restart(srv1.server_id)

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode.
        # Don't do this at home.
        #
        # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything:
        # the RECOVERY node is operating using the old schema change procedure, which means
        # that it pushes the schema mutations to other nodes directly with RPC, modifying
        # the group 0 state machine on other two nodes.
        #
        # There is one problem with this however. If the RECOVERY node considers some other node
        # as DOWN, it will silently *not* push the schema change, completing the operation
        # "successfully" nevertheless (it will return to the driver without error).
        # Usually in this case we rely on eventual convergence of schema through gossip,
        # which will not happen here, because the group 0 nodes are not doing schema pulls!
        # So we need to make sure that the RECOVERY node sees the other nodes as UP before
        # we perform the schema change, so it pushes the mutations to them.
        logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP")
        await manager.server_sees_others(srv1.server_id, 2)

        marks = [await log.mark() for log in logs]
        logger.info(f"Altering table on RECOVERY node ({srv1})")
        await cql.run_async(f"alter table {table} with comment = ''", host=h1)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        # A change made by the RECOVERY node is expected to leave no persisted
        # version (digest-based versioning, see the docstring).
        assert not new_ks_t_version
        ks_t_version = new_ks_t_version

        # Phase 3: change schema through group 0 while the RECOVERY node is down.
        logger.info(f"Stopping {srv1} gracefully")
        await manager.server_stop_gracefully(srv1.server_id)
        srv2 = servers[1]
        logger.info(f"Waiting until {srv2} sees {srv1} as dead")
        await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # Now we modify schema through group 0 while the RECOVERY node is dead.
        # Don't do this at home.
        marks = [await log.mark() for log in logs]
        h2 = next(h for h in hosts if h.address == srv2.ip_addr)
        logger.info(f"Altering table on group 0 node {srv2}")
        await cql.run_async(f"alter table {table} with comment = ''", host=h2)

        await manager.server_start(srv1.server_id)
        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info(f"Waiting until {srv2} sees {srv1} as UP")
        await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # The RECOVERY node will pull schema when it gets a write.
        # The other group 0 node will do a barrier so it will also sync schema before the write returns.
        logger.info("Forcing schema sync through CL=ALL INSERT")
        await cql.run_async(SimpleStatement(f"insert into {table} (pk) values (0)", consistency_level=ConsistencyLevel.ALL),
                            host=h2)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        ks_t_version = new_ks_t_version

        # Phase 4: move the remaining nodes to RECOVERY, wipe Raft data and let
        # the upgrade procedure run again on all nodes.
        srv3 = servers[2]
        h3 = next(h for h in hosts if h.address == srv3.ip_addr)

        logger.info("Finishing recovery")
        for h in [h2, h3]:
            await cql.run_async(
                "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h)
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3]))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        for h in [h1, h2, h3]:
            await delete_raft_data_and_upgrade_state(cql, h)

        logger.info("Restarting servers")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # Use the module logger here, like the rest of this test.
        logger.info("Waiting until upgrade finishes")
        for h in [h1, h2, h3]:
            await wait_until_upgrade_finishes(cql, h, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        # Phase 5: after the upgrade every change must produce a new persisted version.
        for change in [
                f"alter table {table} with comment = ''",
                f"alter table {table} add v int",
                f"alter table {table} alter v type blob"]:

            marks = [await log.mark() for log in logs]
            logger.info(f"Altering table with \"{change}\"")
            await cql.run_async(change)

            new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
            assert new_ks_t_version
            assert new_ks_t_version != ks_t_version
            ks_t_version = new_ks_t_version

            await verify_table_versions_synced(cql, hosts)
            await verify_in_memory_table_versions(servers, logs, marks, table)
@pytest.mark.asyncio
async def test_upgrade(manager: ManagerClient):
    """
    This test uses the gossip-based recovery procedure.

    While Raft is disabled, we use digest-based schema versioning.
    Once Raft upgrade is complete, we use persisted versions committed through group 0.
    """
    # Raft upgrade tests had to be replaced with recovery tests (scylladb/scylladb#16192)
    # as prerequisite for getting rid of `consistent_cluster_management` flag.
    # So we do the same here: start a cluster in Raft mode, then enter recovery
    # to simulate a non-Raft cluster.
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}

    logger.info("Booting cluster")
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()

    logger.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info(f"Setting recovery state on {hosts} and restarting")
    await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
    await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
    cql = await reconnect_driver(manager)

    logger.info("Waiting until driver connects to every server")
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    # The rest of the test runs inside the keyspace context because it keeps using `table`.
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks_name:
        table = f"{ks_name}.t"
        await verify_table_versions_synced(cql, hosts)
        await cql.run_async(f"create table {table} (pk int primary key)")

        # Leave recovery: wipe Raft data and restart so the upgrade procedure runs.
        logger.info(f"Deleting Raft data and upgrade state on {hosts}")
        await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

        logger.info(f"Restarting {servers}")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
        cql = await reconnect_driver(manager)

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info("Waiting until Raft upgrade procedure finishes")
        await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]
        marks = [await log.mark() for log in logs]

        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # `group0_schema_version` should be present
        # and the version column for `{table}` should be non-null.
        for h in hosts:
            logger.info(f"Checking that `group0_schema_version` is set on {h}")
            assert (await get_group0_schema_version(cql, h)) is not None

        for h in hosts:
            logger.info(f"Checking that `version` column for `{table}` is set on {h}")
            versions = await get_scylla_tables_versions(cql, h)
            for ks, _, v in versions:
                # Compare against the generated keyspace name; a literal "ks"
                # never matches and would make this check vacuous.
                if ks == ks_name:
                    assert v is not None

View File

@@ -21,13 +21,6 @@ async def test_create_keyspace_with_default_replication_factor(manager: ManagerC
def get_pf(dc: str, rack: str) -> dict:
return {'dc': dc, 'rack': rack}
logging.info("Trying to add a zero-token server in the gossip-based topology")
await manager.server_add(config={'join_ring': False,
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled'},
property_file={'dc': 'dc1', 'rack': 'rz'},
expected_error='the raft-based topology is disabled')
normal_cfg = {
'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled',
'rf_rack_valid_keyspaces': rf_rack_valid_keyspaces

View File

@@ -83,51 +83,4 @@ async def test_cannot_disable_cluster_feature_after_all_declare_support(manager:
# Nodes should start supporting the feature
cql = cql = manager.get_cql()
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
await asyncio.gather(*(wait_for_feature('TEST_ONLY_FEATURE', cql, h, time.time() + 60) for h in hosts))
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_simulate_upgrade_legacy_to_raft_listener_registration(manager: ManagerClient):
    """
    We simulate an upgrade from legacy mode to Raft. Our goal is
    to make sure that the cluster successfully reaches the state
    where it can start the upgrade procedure.

    This test effectively reproduces the problem described
    in scylladb/scylladb#18049.
    """
    # We need this so that the first logs we wait for appear.
    cmdline = ["--logger-log-level", "raft_topology=debug"]
    # Tablets and legacy mode are incompatible with each other.
    config = {"force_gossip_topology_changes": True,
              "tablets_mode_for_new_keyspaces": "disabled"}

    # Both nodes start with the feature suppressed, as if running an older version.
    error_injection = { "name": "suppress_features", "value": "SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"}
    bad_config = config | {"error_injections_at_startup": [error_injection]}

    # We need to bootstrap the nodes one-by-one.
    # We can't do it concurrently without Raft.
    s1 = await manager.server_add(cmdline=cmdline, config=bad_config)
    s2 = await manager.server_add(cmdline=cmdline, config=bad_config)

    # Simulate upgrading node 1.
    await manager.server_stop_gracefully(s1.server_id)
    await manager.server_update_config(s1.server_id, "error_injections_at_startup", [])

    log = await manager.server_open_log(s1.server_id)
    mark = await log.mark()

    await manager.server_start(s1.server_id)

    # The node should block after this.
    await log.wait_for("Waiting for cluster feature `SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES`", from_mark=mark)
    mark = await log.mark()

    # Simulate upgrading node 2.
    await manager.server_stop_gracefully(s2.server_id)
    await manager.server_update_config(s2.server_id, "error_injections_at_startup", [])
    await manager.server_start(s2.server_id)

    # If everything went smoothly, we'll get to this.
    # Search only from the mark taken after node 1 started waiting, so the
    # message is tied to the upgrade of node 2.
    await log.wait_for("The cluster is ready to start upgrade to the raft topology", from_mark=mark)
await asyncio.gather(*(wait_for_feature('TEST_ONLY_FEATURE', cql, h, time.time() + 60) for h in hosts))

View File

@@ -1,81 +0,0 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import pytest
import time
import logging
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import reconnect_driver, enter_recovery_state, \
delete_raft_data_and_upgrade_state, wait_until_upgrade_finishes, \
wait_for_token_ring_and_group0_consistency, new_test_keyspace
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_raft_fix_broken_snapshot(manager: ManagerClient):
    """Reproducer for scylladb/scylladb#16683.

    This test uses the gossip-based recovery procedure.

    Simulate upgrade-to-Raft in old cluster (which doesn't have ff386e7a445)
    using RECOVERY mode and error injection.
    Then bootstrap a new server.
    Thanks to the new logic we will detect lack of snapshot and create one,
    which the new server will receive, resulting in correct schema transfer.
    """
    # The same config (including the startup error injection) is used for the
    # first server and for the server added at the end.
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled',
           'error_injections_at_startup': ['raft_sys_table_storage::bootstrap/init_index_0']}
    srv = await manager.server_add(config=cfg)
    cql = manager.get_cql()
    h = (await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60))[0]

    # Enter RECOVERY mode, create a keyspace, leave RECOVERY to create new group 0
    # but with error injection that causes the snapshot to have index 0 (as in ScyllaDB 5.2).

    logger.info(f"Entering recovery state on {srv}")
    await enter_recovery_state(cql, h)
    await manager.server_restart(srv.server_id)
    cql = await reconnect_driver(manager)
    await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60)

    logger.info(f"Creating keyspace")
    # The rest of the test runs inside the keyspace context because it keeps using `ks`.
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
        # This table is created while in RECOVERY, i.e. before the new group 0 exists.
        await cql.run_async(f"create table {ks}.t (pk int primary key)")

        logger.info(f"Leaving recovery state")
        await delete_raft_data_and_upgrade_state(cql, h)
        await manager.server_stop_gracefully(srv.server_id)
        await manager.server_start(srv.server_id)
        cql = await reconnect_driver(manager)
        await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60)

        logger.info(f"Waiting for group 0 upgrade to finish")
        await wait_until_upgrade_finishes(cql, h, time.time() + 60)

        # The Raft log will only contain this change,
        # older schema changes can only be obtained through snapshot transfer.
        await cql.run_async(f"create table {ks}.t2 (pk int primary key)")

        # Restarting the server should trigger snapshot creation.
        await manager.server_restart(srv.server_id)
        cql = await reconnect_driver(manager)
        await wait_for_cql_and_get_hosts(cql, [srv], time.time() + 60)

        # Bootstrap the second server and wait until the first one sees it.
        await manager.server_add(config=cfg)
        await manager.server_sees_others(srv.server_id, 1)
        await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)

        # This would fail if snapshot creation wasn't triggered,
        # second node reporting 'Failed to apply mutation ... no_such_column_family`
        await cql.run_async(f"insert into {ks}.t (pk) values (0)", host=h)

View File

@@ -1,68 +0,0 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
from test.cluster.util import reconnect_driver, enter_recovery_state, \
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
@pytest.mark.asyncio
@log_run_time
async def test_raft_recovery_basic(request, manager: ManagerClient):
    """Take a 3-node cluster through the gossip-based recovery procedure.

    All nodes enter RECOVERY, Raft data and upgrade state are deleted, the
    nodes restart and the upgrade procedure is awaited. Afterwards a table is
    created, `system.group0_history` is checked for it, and a new node is booted.
    """
    node_config = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    node_cmdline = ['--logger-log-level', 'raft=trace']

    servers = []
    for _ in range(3):
        servers.append(await manager.server_add(config=node_config, cmdline=node_cmdline))

    async def restart_all_and_reconnect():
        # Restart every server concurrently, then hand back a fresh driver session.
        await asyncio.gather(*[manager.server_restart(srv.server_id) for srv in servers])
        return await reconnect_driver(manager)

    cql = manager.cql
    assert cql

    logging.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info(f"Setting recovery state on {hosts}")
    await asyncio.gather(*[enter_recovery_state(cql, h) for h in hosts])
    cql = await restart_all_and_reconnect()

    logging.info("Cluster restarted, waiting until driver reconnects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logging.info(f"Driver reconnected, hosts: {hosts}")

    logging.info(f"Deleting Raft data and upgrade state on {hosts}")
    await asyncio.gather(*[delete_raft_data_and_upgrade_state(cql, h) for h in hosts])

    logging.info(f"Restarting {servers}")
    cql = await restart_all_and_reconnect()

    logging.info(f"Cluster restarted, waiting until driver reconnects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info(f"Driver reconnected, hosts: {hosts}. Waiting until upgrade finishes")
    upgrade_deadline_waits = [wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts]
    await asyncio.gather(*upgrade_deadline_waits)

    logging.info("Upgrade finished. Creating a new table")
    random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
    table = await random_tables.add_table(ncolumns=5)

    logging.info("Checking group0_history")
    rs = await cql.run_async("select * from system.group0_history")
    assert rs
    # The first returned row is expected to describe the table creation above.
    first_description = rs[0].description
    logging.info(f"group0_history entry description: '{first_description}'")
    assert table.full_name in first_description

    logging.info("Booting new node")
    await manager.server_add(config=node_config, cmdline=node_cmdline)

View File

@@ -8,6 +8,9 @@ import logging
import time
import pytest
from cassandra.cluster import Session
from cassandra.pool import Host
from uuid import UUID
from test.pylib.internal_types import ServerInfo
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import get_host_api_address, read_barrier
@@ -15,8 +18,17 @@ from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import check_system_topology_and_cdc_generations_v3_consistency, \
check_token_ring_and_group0_consistency, delete_discovery_state_and_group0_id, delete_raft_group_data, \
reconnect_driver, wait_for_cdc_generations_publishing
from test.cluster.test_group0_schema_versioning import get_group0_schema_version, get_local_schema_version
async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None:
    """Read `group0_schema_version` from `system.scylla_local` on host `h`.

    Returns the stored value parsed as a UUID, or None when the query
    returns no rows.
    """
    rows = await cql.run_async("select value from system.scylla_local where key = 'group0_schema_version'", host=h)
    return UUID(rows[0].value) if rows else None
async def get_local_schema_version(cql: Session, h: Host) -> UUID:
    """Return the `schema_version` column of the `system.local` row on host `h`.

    Asserts that the query returned at least one row.
    """
    rows = await cql.run_async("select schema_version from system.local where key = 'local'", host=h)
    assert rows
    first_row = rows[0]
    return first_row.schema_version
@pytest.mark.asyncio
async def test_raft_recovery_entry_loss(manager: ManagerClient):

View File

@@ -1,84 +0,0 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
from test.cluster.util import reconnect_driver, enter_recovery_state, \
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
@pytest.mark.asyncio
@log_run_time
async def test_recovery_after_majority_loss(request, manager: ManagerClient):
    """
    This test uses the gossip-based recovery procedure.

    Every initial server except one goes down, leaving group 0 without a majority.
    A new group 0 is built on the survivor: it enters RECOVERY, the other servers
    are removed with `removenode`, Raft data is cleared and the server restarts.
    The Raft upgrade procedure then establishes a single-node group 0. The test
    also verifies that schema changes made through the old group 0 are still present.

    Note: in general nothing guarantees that all schema changes survive; the minority
    used for recovery might have missed them. Here the driver waits for schema
    agreement before proceeding, so every server is known to have learned about
    the schema changes.
    """
    node_cfg = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }
    servers = []
    for _ in range(3):
        servers.append(await manager.server_add(config=node_cfg))

    logging.info("Waiting until driver connects to every server")
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Creating a bunch of tables")
    random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
    tables = await asyncio.gather(*[random_tables.add_table(ncolumns=5) for _ in range(5)])

    # The first server survives; the remaining ones are stopped and later removed.
    srv1 = servers[0]
    others = servers[1:]

    logging.info(f"Killing all nodes except {srv1}")
    await asyncio.gather(*[manager.server_stop_gracefully(srv.server_id) for srv in others])

    logging.info(f"Entering recovery state on {srv1}")
    host1 = next(h for h in hosts if h.address == srv1.ip_addr)
    await enter_recovery_state(cql, host1)
    await manager.server_restart(srv1.server_id)
    cql = await reconnect_driver(manager)

    logging.info("Node restarted, waiting until driver connects")
    ready_hosts = await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60)
    host1 = ready_hosts[0]

    # Remove the stopped servers one at a time; the ones not removed yet are
    # passed as the ignore-dead list.
    for position, to_remove in enumerate(others):
        ignore_dead_ips = [srv.ip_addr for srv in others[position + 1:]]
        logging.info(f"Removing {to_remove} using {srv1} with ignore_dead: {ignore_dead_ips}")
        await manager.remove_node(srv1.server_id, to_remove.server_id, ignore_dead_ips)

    logging.info(f"Deleting old Raft data and upgrade state on {host1} and restarting")
    await delete_raft_data_and_upgrade_state(cql, host1)
    await manager.server_restart(srv1.server_id)
    cql = await reconnect_driver(manager)

    logging.info("Node restarted, waiting until driver connects")
    ready_hosts = await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60)
    host1 = ready_hosts[0]

    logging.info(f"Driver reconnected, host: {host1}. Waiting until upgrade finishes.")
    await wait_until_upgrade_finishes(cql, host1, time.time() + 60)

    logging.info("Checking if previously created tables still exist")
    await asyncio.gather(*[cql.run_async(f"select * from {t.full_name}") for t in tables])

    logging.info("Creating another table")
    await random_tables.add_table(ncolumns=5)

    logging.info("Booting new node")
    await manager.server_add(config=node_cfg)

View File

@@ -1,136 +0,0 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import pytest
import logging
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.random_tables import RandomTables
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
from test.cluster.util import (delete_raft_data_and_upgrade_state, enter_recovery_state, log_run_time,
reconnect_driver, wait_for_upgrade_state, wait_until_upgrade_finishes)
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@log_run_time
async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
    """
    This test uses the gossip-based recovery procedure.

    1. Create a cluster,
    2. Enter RECOVERY state on every server.
    3. Delete the Raft data and the upgrade state on all servers.
    4. Restart them and the upgrade procedure starts.
    5. Start the first node with a group 0 upgrade error injected to it, so it fails.
    6. Start the rest of the nodes in the cluster, they enter 'synchronize' state.
    We assume the failed server cannot be recovered. We cannot just remove it at this point;
    it's already part of group 0, `remove_from_group0` will wait until upgrade procedure
    finishes - but the procedure is stuck. To proceed we:
    7. Enter RECOVERY state on the other servers,
    8. Remove the failed node, and
    9. Clear existing Raft data.
    10. After leaving RECOVERY, the remaining nodes will restart the procedure, establish a new
        group 0 and finish upgrade.
    """
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    servers = [await manager.server_add(config=cfg) for _ in range(3)]
    # `srv1` is the server that will fail the upgrade; `others` complete the recovery.
    srv1, *others = servers

    logging.info("Waiting until driver connects to every server")
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info(f"Setting recovery state on {hosts}")
    await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
    await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
    cql = await reconnect_driver(manager)

    # All servers were restarted and all of them are awaited here, so the
    # message names `servers` rather than `others`.
    logging.info(f"Cluster restarted, waiting until driver reconnects to {servers}")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    logging.info(f"Driver reconnected, hosts: {hosts}")

    logging.info(f"Deleting Raft data and upgrade state on {hosts}")
    await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

    logging.info(f"Stopping {servers}")
    await asyncio.gather(*(manager.server_stop_gracefully(srv.server_id) for srv in servers))

    logging.info(f"Starting {srv1} with injected group 0 upgrade error")
    await manager.server_update_config(srv1.server_id, 'error_injections_at_startup', ['group0_upgrade_before_synchronize'])
    await manager.server_start(srv1.server_id)

    logging.info(f"Starting {others}")
    await asyncio.gather(*(manager.server_start(srv.server_id) for srv in others))

    logging.info(f"Waiting until {servers} see each other as alive")
    await manager.servers_see_each_other(servers)

    cql = await reconnect_driver(manager)

    logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
    hosts = await wait_for_cql_and_get_hosts(cql, others, time.time() + 60)
    logging.info(f"Driver reconnected, hosts: {hosts}")

    logging.info(f"Waiting until {hosts} enter 'synchronize' state")
    await asyncio.gather(*(wait_for_upgrade_state('synchronize', cql, h, time.time() + 60) for h in hosts))
    logging.info(f"{hosts} entered synchronize")

    log_file1 = await manager.server_open_log(srv1.server_id)
    logging.info(f"Checking if Raft upgrade procedure failed on {srv1}")
    await log_file1.wait_for("error injection before group 0 upgrade enters synchronize")

    # From here on `hosts` refers only to the hosts of `others`.
    logging.info(f"Setting recovery state on {hosts}")
    await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))

    logging.info(f"Restarting {others}")
    await manager.rolling_restart(others)

    # Prevent scylladb/scylladb#21724
    logging.info("Wait until everyone sees everyone as alive")
    await manager.servers_see_each_other(servers)

    # The returned session is not needed: `get_ready_cql` below provides one.
    await reconnect_driver(manager)

    logging.info(f"{others} restarted, waiting until driver reconnects to them")
    cql, hosts = await manager.get_ready_cql(others)

    logging.info(f"Checking if {hosts} are in recovery state")
    for host in hosts:
        rs = await cql.run_async(
            "select value from system.scylla_local where key = 'group0_upgrade_state'",
            host=host)
        assert rs[0].value == 'recovery'

    logging.info("Creating a table while in recovery state")
    random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
    table = await random_tables.add_table(ncolumns=5)

    logging.info(f"Stopping {srv1}")
    await manager.server_stop_gracefully(srv1.server_id)

    logging.info(f"Removing {srv1} using {others[0]}")
    await manager.remove_node(others[0].server_id, srv1.server_id)

    logging.info(f"Deleting Raft data and upgrade state on {hosts}")
    await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

    logging.info(f"Restarting {others}")
    await manager.rolling_restart(others)

    await reconnect_driver(manager)

    logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
    cql, hosts = await manager.get_ready_cql(others)

    logging.info(f"Driver reconnected, hosts: {hosts}, waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))

    logging.info("Checking if previously created table still exists")
    await cql.run_async(f"select * from {table.full_name}")

View File

@@ -1,58 +0,0 @@
#
# Copyright (C) 2023-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import time
import pytest
import logging
from test.pylib.internal_types import IPAddress, HostID
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.manager_client import ManagerClient
from test.cluster.util import wait_for_token_ring_and_group0_consistency
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_replace_ignore_nodes(manager: ManagerClient) -> None:
    """Replace a node in presence of multiple dead nodes.

    Regression test for #14487. Does not apply to Raft-topology mode.

    This is a slow test with a 7 node cluster and 3 replace operations,
    we don't want to run it in debug mode.
    Preferably run it only in one mode e.g. dev.
    """
    node_cfg = {
        'enable_user_defined_functions': False,
        'force_gossip_topology_changes': True,
        'tablets_mode_for_new_keyspaces': 'disabled',
    }

    logger.info("Booting initial cluster")
    servers = []
    for _ in range(7):
        servers.append(await manager.server_add(config=node_cfg))

    # Fetch the host ID of the third server while it is still running.
    third_host_id = await manager.get_host_id(servers[2].server_id)

    logger.info(f"Stopping servers {servers[:3]}")
    first, second, third = servers[0], servers[1], servers[2]
    await manager.server_stop(first.server_id)
    await manager.server_stop(second.server_id)
    await manager.server_stop_gracefully(third.server_id)

    # The parameter accepts both IP addrs and host IDs.
    # We must be able to resolve them in both ways.
    ignore_dead: list[IPAddress | HostID] = [servers[1].ip_addr, third_host_id]
    logger.info(f"Replacing {servers[0]}, ignore_dead_nodes = {ignore_dead}")
    replacement = ReplaceConfig(
        replaced_id=first.server_id,
        reuse_ip_addr=False,
        use_host_id=False,
        ignore_dead_nodes=ignore_dead,
    )
    await manager.server_add(replace_cfg=replacement, config=node_cfg)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

    # Second replace: only the third server is still dead.
    ignore_dead = [servers[2].ip_addr]
    logger.info(f"Replacing {servers[1]}, ignore_dead_nodes = {ignore_dead}")
    replacement = ReplaceConfig(
        replaced_id=second.server_id,
        reuse_ip_addr=False,
        use_host_id=False,
        ignore_dead_nodes=ignore_dead,
    )
    await manager.server_add(replace_cfg=replacement, config=node_cfg)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

    # Third replace: no ignore list is passed.
    logger.info(f"Replacing {servers[2]}")
    replacement = ReplaceConfig(
        replaced_id=third.server_id,
        reuse_ip_addr=False,
        use_host_id=False,
    )
    await manager.server_add(replace_cfg=replacement, config=node_cfg)
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 30)

View File

@@ -9,7 +9,6 @@ async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient)
'error_injections_at_startup': ['stream_mutation_fragments_table_dropped'],
'enable_repair_based_node_ops': False,
'enable_user_defined_functions': False,
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled'
}) for _ in range(2)]

View File

@@ -832,27 +832,6 @@ async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, wi
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
assert res is None
@pytest.mark.asyncio
async def test_tablets_and_gossip_topology_changes_are_incompatible(manager: ManagerClient):
    """Adding a server with tablets enabled and gossip topology changes forced must fail."""
    conflicting_cfg = {
        "tablets_mode_for_new_keyspaces": "enabled",
        "force_gossip_topology_changes": True,
    }
    expect_add_failure = pytest.raises(Exception, match="Failed to add server")
    with expect_add_failure:
        await manager.server_add(config=conflicting_cfg)
@pytest.mark.asyncio
async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerClient):
    """With tablets disabled and gossip topology changes forced, a server starts,
    a keyspace can be created, and the `tablets` keyspace option is rejected
    as a CQL syntax error for both `enabled` values.
    """
    node_cfg = {
        "tablets_mode_for_new_keyspaces": "disabled",
        "force_gossip_topology_changes": True,
    }
    await manager.server_add(config=node_cfg)
    cql = manager.get_cql()

    replication = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
    async with new_test_keyspace(manager, replication) as ks_name:
        # The row is only logged, not asserted on.
        row = cql.execute(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks_name}'").one()
        logger.info(row)

    # The expected message does not depend on the loop variable.
    expected = r"Error from server: code=2000 \[Syntax error in CQL query\] message=\"line 1:126 no viable alternative at input 'tablets'\""
    for enabled in ("false", "true"):
        with pytest.raises(SyntaxException, match=expected):
            ks_name = unique_name()
            await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} AND tablets {{'enabled': {enabled}}};")
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):

View File

@@ -1,139 +0,0 @@
#
# Copyright (C) 2022-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
"""
Test removenode of a node that is no longer a token ring member
"""
import logging
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import get_token_ring_host_ids, get_current_group0_config, \
check_token_ring_and_group0_consistency, wait_for_token_ring_and_group0_consistency
import time
import pytest
logger = logging.getLogger(__name__)
async def test_remove_garbage_group0_members(manager: ManagerClient):
"""
Verify that failing to leave group 0 or remove a node from group 0 in removenode/decommission
can be handled by executing removenode (which should clear the 'garbage' group 0 member),
even though the node is no longer a token ring member. Does not apply to Raft-topology mode.
"""
# 4 servers, one dead
cfg = {'enable_user_defined_functions': False,
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled'}
servers = [await manager.server_add(config=cfg) for _ in range(4)]
# Make sure that the driver has connected to all nodes, and they see each other as NORMAL
# (otherwise the driver may remove connection to some host, even after it manages to connect to it,
# because the node that it has control connection to considers that host as not NORMAL yet).
# This ensures that after we stop/remove some nodes in the test, the driver will still
# be able to connect to the remaining nodes. See scylladb/scylladb#16373
await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)
await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 60)
# Remember the host ID before stopping the node; it is compared with the garbage member below.
removed_host_id = await manager.get_host_id(servers[0].server_id)
await manager.server_stop_gracefully(servers[0].server_id)
logging.info(f'removenode {servers[0]} using {servers[1]}')
# removenode will fail after removing the server from the token ring,
# but before removing it from group 0
await inject_error_one_shot(manager.api, servers[1].ip_addr,
'removenode_fail_before_remove_from_group0')
try:
await manager.remove_node(servers[1].server_id, servers[0].server_id)
except Exception:
# Note: the exception returned here is only '500 internal server error',
# need to look in test.py log for the actual message coming from Scylla.
logging.info(f'expected exception during injection')
# Query the storage_service/host_id endpoint to calculate a list of known token ring members' Host IDs
# (internally, this endpoint uses token_metadata)
token_ring_ids = await get_token_ring_host_ids(manager, servers[1])
logging.info(f'token ring members: {token_ring_ids}')
group0_members = await get_current_group0_config(manager, servers[1])
logging.info(f'group 0 members: {group0_members}')
# NOTE(review): each group 0 member is presumably a (host_id, is_voter) pair — m[0] is compared
# with host IDs and m[1] is used as the voter flag below; confirm against get_current_group0_config.
group0_ids = {m[0] for m in group0_members}
# Token ring members should currently be a subset of group 0 members
assert token_ring_ids <= group0_ids
garbage_members = group0_ids - token_ring_ids
logging.info(f'garbage members: {garbage_members}')
assert len(garbage_members) == 1
garbage_member = next(iter(garbage_members))
# The garbage member is the one that we failed to remove
assert garbage_member == removed_host_id
# Verify that at least it's a non-voter.
assert garbage_member in {m[0] for m in group0_members if not m[1]}
logging.info(f'removenode {servers[0]} using {servers[1]} again')
# Retry removenode. It should skip the token ring removal step and remove the server from group 0.
await manager.remove_node(servers[1].server_id, servers[0].server_id)
group0_members = await get_current_group0_config(manager, servers[1])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
# Token ring members and group 0 members should now be the same.
assert token_ring_ids == group0_ids
# Verify that availability is not reduced.
# Stop one of the 3 remaining servers and try to remove it. It should succeed with only 2 servers.
logging.info(f'stop {servers[1]}')
await manager.server_stop_gracefully(servers[1].server_id)
logging.info(f'removenode {servers[1]} using {servers[2]}')
await manager.remove_node(servers[2].server_id, servers[1].server_id)
# Perform a similar scenario with decommission. One of the node fails to decommission fully,
# but it manages to leave the token ring. We observe the leftovers using the same APIs as above
# and remove the leftovers.
# We can do this with only 2 nodes because during decommission we become a non-voter before
# leaving the token ring, thus the remaining single node will become a voting majority
# and will be able to perform removenode alone.
decommissioned_host_id = await manager.get_host_id(servers[2].server_id)
await manager.api.enable_injection(
servers[2].ip_addr, 'decommission_fail_before_leave_group0', one_shot=True)
logging.info(f'decommission {servers[2]}')
try:
await manager.decommission_node(servers[2].server_id)
except Exception:
logging.info(f'expected exception during injection')
logging.info(f'stop {servers[2]}')
await manager.server_stop_gracefully(servers[2].server_id)
# servers[0], servers[1] and servers[2] have all been stopped by now, so servers[3] is queried.
token_ring_ids = await get_token_ring_host_ids(manager, servers[3])
logging.info(f'token ring members: {token_ring_ids}')
group0_members = await get_current_group0_config(manager, servers[3])
logging.info(f'group 0 members: {group0_members}')
group0_ids = {m[0] for m in group0_members}
assert token_ring_ids <= group0_ids
garbage_members = group0_ids - token_ring_ids
logging.info(f'garbage members: {garbage_members}')
assert len(garbage_members) == 1
garbage_member = next(iter(garbage_members))
assert garbage_member == decommissioned_host_id
assert garbage_member in {m[0] for m in group0_members if not m[1]}
logging.info(f'removenode {servers[2]} using {servers[3]}')
await manager.remove_node(servers[3].server_id, servers[2].server_id)
await check_token_ring_and_group0_consistency(manager)

View File

@@ -1,81 +0,0 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import pytest
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import log_run_time, wait_until_last_generation_is_in_use, wait_until_topology_upgrade_finishes, \
wait_for_cdc_generations_publishing, check_system_topology_and_cdc_generations_v3_consistency, \
start_writes_to_cdc_table
@pytest.mark.asyncio
@log_run_time
async def test_topology_upgrade_basic(request, build_mode: str, manager: ManagerClient):
# Boots a three-node cluster whose first node is forced into legacy (gossip) topology mode,
# triggers the upgrade to raft topology while writes to a CDC table are running, and checks
# system.topology / CDC generation consistency both before and after adding a fourth node.
# First, force the first node to start in legacy mode
cfg = {
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled',
# Debug builds get a longer ring delay than the other build modes.
'ring_delay_ms': 15000 if build_mode == 'debug' else 5000,
}
servers = [await manager.server_add(config=cfg)]
# Enable raft-based node operations for subsequent nodes - they should fall back to
# using gossiper-based node operations
del cfg['force_gossip_topology_changes']
servers += [await manager.server_add(config=cfg) for _ in range(2)]
cql = manager.cql
assert(cql)
logging.info("Waiting until driver connects to every server")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info("Checking the upgrade state on all nodes")
for host in hosts:
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
# The second returned value is awaited at the very end of the test.
# NOTE(review): presumably it stops the background writes and validates them — confirm in start_writes_to_cdc_table.
_, stop_writes_and_verify = await start_writes_to_cdc_table(cql)
logging.info("Triggering upgrade to raft topology")
await manager.api.upgrade_to_raft_topology(hosts[0].address)
logging.info("Check that triggering upgrade is idempotent")
await manager.api.upgrade_to_raft_topology(hosts[0].address)
logging.info("Waiting until upgrade finishes")
await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
logging.info("Waiting for CDC generations publishing")
await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)
logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
# The fourth node joins after the upgrade, using the config without the gossip override.
logging.info("Booting new node")
servers.append(await manager.server_add(config=cfg))
logging.info("Waiting until driver connects to every server")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info("Waiting for the new CDC generation publishing")
await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)
logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts)
await wait_until_last_generation_is_in_use(cql)
logging.debug("Sleeping for 1 second to make sure there are writes to the CDC table in the last generation")
await asyncio.sleep(1)
logging.info("Checking correctness of data in system_distributed.cdc_streams_descriptions_v2")
await stop_writes_and_verify()

View File

@@ -1,63 +0,0 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import pytest
import time
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.util import wait_for_cql_and_get_hosts
from test.cluster.util import wait_until_topology_upgrade_finishes, \
wait_for_cdc_generations_publishing, \
check_system_topology_and_cdc_generations_v3_consistency
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_topology_upgrade_not_stuck_after_recent_removal(request, manager: ManagerClient):
"""
Regression test for https://github.com/scylladb/scylladb/issues/18198.
1. Create a two node cluster in legacy mode
2. Remove one of the nodes
3. Upgrade the cluster to raft topology.
4. Verify that the upgrade went OK and it did not get stuck.
"""
# First, force the nodes to start in legacy mode via the force_gossip_topology_changes option
cfg = {
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled',
}
logging.info("Creating a two node cluster")
servers = [await manager.server_add(config=cfg) for _ in range(2)]
cql = manager.cql
assert(cql)
srv1, srv2 = servers
# The "removal" in step 2 is performed as a decommission of the second node.
logging.info("Removing the second node in the cluster")
await manager.decommission_node(srv2.server_id)
logging.info("Waiting until driver connects to the only server")
host1 = (await wait_for_cql_and_get_hosts(cql, [srv1], time.time() + 60))[0]
logging.info("Checking the upgrade state")
status = await manager.api.raft_topology_upgrade_status(host1.address)
assert status == "not_upgraded"
logging.info("Triggering upgrade to raft topology")
await manager.api.upgrade_to_raft_topology(host1.address)
# A stuck upgrade shows up here as a timeout: the wait is bounded by a 60 second deadline.
logging.info("Waiting until upgrade finishes")
await wait_until_topology_upgrade_finishes(manager, host1.address, time.time() + 60)
logging.info("Waiting for CDC generations publishing")
await wait_for_cdc_generations_publishing(cql, [host1], time.time() + 60)
logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
await check_system_topology_and_cdc_generations_v3_consistency(manager, [host1])

View File

@@ -1,158 +0,0 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import pytest
import time
from typing import List
from test.pylib.log_browsing import ScyllaLogFile
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import gather_safely
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_first_completed
from test.cluster.util import reconnect_driver, enter_recovery_state, \
delete_raft_data_and_upgrade_state, log_run_time, wait_until_upgrade_finishes as wait_until_schema_upgrade_finishes, \
wait_until_topology_upgrade_finishes, delete_raft_topology_state, wait_for_cdc_generations_publishing, \
check_system_topology_and_cdc_generations_v3_consistency
async def wait_for_log_on_any_node(logs: List[ScyllaLogFile], marks: List[int], pattern: str):
    """
    Waits until a given line appears on any node in the cluster.

    `logs` and `marks` are parallel lists (their lengths must match): the search
    in `logs[i]` starts from `marks[i]`. Waiting for the first match is delegated
    to wait_for_first_completed.
    """
    assert len(logs) == len(marks)
    # Pass each log's own mark to wait_for; otherwise `marks` is never used and every
    # search falls back to wait_for's default starting position.
    await wait_for_first_completed([log.wait_for(pattern, from_mark=mark) for log, mark in zip(logs, marks)])
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
@pytest.mark.skip_mode(mode='debug', reason='test performs many topology changes')
@log_run_time
async def test_topology_upgrade_stuck(request, manager: ManagerClient):
"""
Simulates a situation where upgrade procedure gets stuck due to majority
loss: we have one upgraded node, one not upgraded node, and three nodes
permanently down. Then, it verifies that it's possible to perform recovery
procedure and redo the upgrade after the issue is resolved.
"""
# First, start all five nodes in legacy mode (every node gets the same config)
cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
servers = [await manager.server_add(config=cfg) for _ in range(5)]
to_be_upgraded_node, to_be_isolated_node, *to_be_shutdown_nodes = servers
logging.info("Waiting until driver connects to every server")
cql, hosts = await manager.get_ready_cql(servers)
# These hosts are passed as ignored_hosts to the consistency checks at the end of the test.
# NOTE(review): presumably `hosts` is ordered like `servers`, so hosts[2:] are the nodes shut down below — confirm.
removed_hosts = hosts[2:]
logging.info("Checking the upgrade state on all nodes")
for host in hosts:
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
logging.info("Enabling error injection which will cause the topology coordinator to get stuck")
await gather_safely(*(manager.api.enable_injection(s.ip_addr, "topology_coordinator_fail_to_build_state_during_upgrade", one_shot=False) for s in servers))
logging.info("Triggering upgrade to raft topology")
await manager.api.upgrade_to_raft_topology(hosts[0].address)
logging.info("Waiting until upgrade gets stuck due to error injection")
logs = await gather_safely(*(manager.server_open_log(s.server_id) for s in servers))
marks = await gather_safely(*(l.mark() for l in logs))
await wait_for_log_on_any_node(logs, marks, "failed to build topology coordinator state due to error injection")
logging.info("Isolate one of the nodes via error injection")
await manager.api.enable_injection(to_be_isolated_node.ip_addr, "raft_drop_incoming_append_entries", one_shot=False)
logging.info("Disable the error injection that causes upgrade to get stuck")
# Fresh marks are taken before the injection is disabled, ahead of the next log wait.
marks = await gather_safely(*(l.mark() for l in logs))
await gather_safely(*(manager.api.disable_injection(s.ip_addr, "topology_coordinator_fail_to_build_state_during_upgrade") for s in servers))
logging.info("Wait for the topology coordinator to observe upgrade as finished")
await wait_for_log_on_any_node(logs, marks, "upgrade to raft topology has finished")
logging.info("Shut down three nodes to simulate quorum loss")
await gather_safely(*(manager.server_stop(s.server_id) for s in to_be_shutdown_nodes))
logging.info("Disable the error injection that causes node to be isolated")
await manager.api.disable_injection(to_be_isolated_node.ip_addr, "raft_drop_incoming_append_entries")
logging.info("Checking that not all nodes finished upgrade")
upgraded_count = 0
for s in [to_be_upgraded_node, to_be_isolated_node]:
status = await manager.api.raft_topology_upgrade_status(s.ip_addr)
if status == "done":
upgraded_count += 1
assert upgraded_count != 2
logging.info(f"Only {upgraded_count}/2 nodes finished upgrade, which was expected")
# From here on `servers` holds only the two surviving nodes; `others` are the stopped nodes to remove.
servers, others = [to_be_upgraded_node, to_be_isolated_node], to_be_shutdown_nodes
logging.info(f"Obtaining hosts for nodes {servers}")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info(f"Restarting hosts {hosts} in recovery mode")
await gather_safely(*(enter_recovery_state(cql, h) for h in hosts))
await manager.rolling_restart(servers)
cql = await reconnect_driver(manager)
await manager.servers_see_each_other(servers)
logging.info("Cluster restarted, waiting until driver reconnects to every server")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info(f"Driver reconnected, hosts: {hosts}")
# Remove the stopped nodes one at a time; the ones not yet removed are listed as ignore_dead.
for i in range(len(others)):
to_remove = others[i]
ignore_dead_ips = [srv.ip_addr for srv in others[i+1:]]
logging.info(f"Removing {to_remove} using {servers[0]} with ignore_dead: {ignore_dead_ips}")
await manager.remove_node(servers[0].server_id, to_remove.server_id, ignore_dead_ips)
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
await gather_safely(*(delete_raft_topology_state(cql, h) for h in hosts))
await gather_safely(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
logging.info(f"Restarting hosts {hosts}")
await manager.rolling_restart(servers)
cql = await reconnect_driver(manager)
logging.info("Cluster restarted, waiting until driver reconnects to every server")
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info("Waiting until upgrade to raft schema finishes")
await gather_safely(*(wait_until_schema_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
# After the Raft topology state was deleted, the nodes must report "not_upgraded" again.
logging.info("Checking the topology upgrade state on all nodes")
for host in hosts:
status = await manager.api.raft_topology_upgrade_status(host.address)
assert status == "not_upgraded"
logging.info("Waiting until all nodes see others as alive")
await manager.servers_see_each_other(servers)
logging.info("Triggering upgrade to raft topology")
await manager.api.upgrade_to_raft_topology(hosts[0].address)
logging.info("Waiting until upgrade finishes")
await gather_safely(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
logging.info("Waiting for CDC generations publishing")
await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)
logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, ignored_hosts=removed_hosts)
# The three replacement nodes are added without `cfg`, i.e. with the default configuration.
logging.info("Booting three new nodes")
servers += await gather_safely(*(manager.server_add() for _ in range(3)))
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
logging.info("Waiting for the new CDC generation publishing")
await wait_for_cdc_generations_publishing(cql, hosts, time.time() + 60)
logging.info("Checking consistency of data in system.topology and system.cdc_generations_v3")
await check_system_topology_and_cdc_generations_v3_consistency(manager, hosts, ignored_hosts=removed_hosts)

View File

@@ -201,165 +201,6 @@ async def test_view_build_status_snapshot(manager: ManagerClient):
await wait_for_view(cql, "vt1", 4)
await wait_for_view(cql, "vt2", 4)
# Start cluster in view_builder v1 mode and migrate to v2.
# Verify the migration copies the v1 data, and the new v2 table
# is used after the migration.
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2(request, manager: ManagerClient):
    """
    Start a three-node cluster in view_builder v1 mode, upgrade it to raft topology
    and check that every node switches to v2 and that new view build status rows
    are written to system.view_build_status_v2.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']
    servers += [await manager.server_add(config=cfg) for _ in range(2)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    ks = await create_keyspace(cql)
    await create_table(cql, ks)
    await create_mv(cql, ks, "vt1")

    # Verify we're using v1 now
    v = await get_view_builder_version(cql)
    assert v == 1

    await wait_for_row_count(cql, "system_distributed.view_build_status", 3, hosts[0])
    result = await cql.run_async("SELECT * FROM system.view_build_status_v2")
    assert len(result) == 0

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Checking migrated data in system")
    # Bind `h` as a default argument: the generator is fully consumed before any predicate
    # runs, so a plain closure over `h` would make every predicate poll the last host only.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))

    # Check that new writes are written to the v2 table
    await create_mv(cql, ks, "vt2")
    await asyncio.gather(*(wait_for_view_v2(cql, ks, "vt2", 3, host=h) for h in hosts))
    await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0])
# Migrate the view_build_status table to v2 and write to the table during the migration.
# The migration process goes through an intermediate stage where it writes to
# both the old and new table, so the write should not be lost.
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2_with_write_during_migration(request, manager: ManagerClient):
    """
    Migrate the view_build_status table to v2 while a view build status write is
    held back by an error injection, and check that the write for 'vt1' is visible
    in the v2 table on every node after the migration.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']
    servers += [await manager.server_add(config=cfg) for _ in range(2)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    ks = await create_keyspace(cql)
    await create_table(cql, ks)

    inj_insert = "view_builder_pause_add_new_view"
    await manager.api.enable_injection(servers[1].ip_addr, inj_insert, one_shot=True)
    await create_mv(cql, ks, "vt1")

    # pause the migration between reading the old table and writing to the new table, so we have
    # a time window where new writes may be lost.
    # we don't know who the coordinator is so inject in all nodes.
    inj_upgrade = "view_builder_pause_in_migrate_v2"
    for s in servers:
        await manager.api.enable_injection(s.ip_addr, inj_upgrade, one_shot=True)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Checking migrated data in system")
    # Now that the upgrade is paused, write the new view.
    await manager.api.message_injection(servers[1].ip_addr, inj_insert)
    await asyncio.sleep(1)

    # continue the migration
    for s in servers:
        await manager.api.message_injection(s.ip_addr, inj_upgrade)

    # Bind `h` as a default argument: the generator is fully consumed before any predicate
    # runs, so a plain closure over `h` would make every predicate poll the last host only.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
    await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt1', 3, host=h) for h in hosts))
# Migrate the view_build_status table to v2 while there is an 'old' write operation in progress.
# The migration should wait for the old operations to complete before continuing, otherwise
# these writes may be lost.
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2_barrier(request, manager: ManagerClient):
    """
    Migrate the view_build_status table to v2 while an 'old' write for 'vt1' is
    held back by an error injection, and check that the write is visible in the
    v2 table on every node once it is released.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']
    servers += [await manager.server_add(config=cfg) for _ in range(2)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    ks = await create_keyspace(cql)
    await create_table(cql, ks)

    # Create MV and delay the write operation to the old table
    inj_insert = "view_builder_pause_add_new_view"
    await manager.api.enable_injection(servers[1].ip_addr, inj_insert, one_shot=True)
    await create_mv(cql, ks, "vt1")

    # The upgrade should perform a barrier and wait for the delayed operation to complete before continuing.
    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    # the upgrade should now be waiting for the insert to complete.
    # unpause the insert
    await asyncio.sleep(1)
    await manager.api.message_injection(servers[1].ip_addr, inj_insert)

    logging.info("Checking migrated data in system")
    # Bind `h` as a default argument: the generator is fully consumed before any predicate
    # runs, so a plain closure over `h` would make every predicate poll the last host only.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
    await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt1', 3, host=h) for h in hosts))
# Test that when removing a node from the cluster, we clean its rows from
# the view build status table.
@pytest.mark.asyncio
@@ -428,74 +269,6 @@ async def test_view_build_status_with_replace_node(manager: ManagerClient):
await wait_for(node_rows_replaced, time.time() + 60)
# Start with view_build_status v1 mode, and create entries such that
# some of them correspond to removed nodes or non-existent views.
# Then migrate to v2 table and verify that only valid entries belonging to known nodes
# and views are migrated to the new table.
@pytest.mark.asyncio
async def test_view_build_status_migration_to_v2_with_cleanup(request, manager: ManagerClient):
    """
    Start in view_build_status v1 mode with entries that belong to a removed node
    and to a non-existent view, migrate to v2 and check that only the three rows
    for known nodes and existing views end up in system.view_build_status_v2.
    """
    # First, force the first node to start in legacy mode
    cfg = {'force_gossip_topology_changes': True, 'tablets_mode_for_new_keyspaces': 'disabled'}
    servers = [await manager.server_add(config=cfg)]
    # Enable raft-based node operations for subsequent nodes - they should fall back to
    # using gossiper-based node operations
    del cfg['force_gossip_topology_changes']
    # We start with total 4 nodes and we will remove one of them before the migration.
    servers += [await manager.server_add(config=cfg) for _ in range(3)]

    logging.info("Waiting until driver connects to every server")
    cql, hosts = await manager.get_ready_cql(servers)

    logging.info("Checking the upgrade state on all nodes")
    for host in hosts:
        status = await manager.api.raft_topology_upgrade_status(host.address)
        assert status == "not_upgraded"

    # Create a view. This will insert 4 entries to the view build status table, one for each node.
    ks_name = await create_keyspace(cql)
    await create_table(cql, ks_name)
    await create_mv(cql, ks_name, "vt1")
    await wait_for_view_v1(cql, "vt1", 4)
    await wait_for_row_count(cql, "system_distributed.view_build_status", 4, hosts[0])

    # Insert a row that doesn't correspond to an existing view, but does correspond to a known host.
    # This row should get cleaned during migration.
    s0_host_id = await manager.get_host_id(servers[0].server_id)
    await cql.run_async(
        "INSERT INTO system_distributed.view_build_status(keyspace_name, view_name, host_id, status) "
        f"VALUES ('{ks_name}', 'view_doesnt_exist', {s0_host_id}, 'SUCCESS')")

    # Remove the last node. the entry for this node in the view build status remains and it
    # corresponds now to an unknown node. The migration should remove it.
    logging.info("Removing last node")
    await manager.server_stop_gracefully(servers[-1].server_id)
    await manager.remove_node(servers[0].server_id, servers[-1].server_id)
    servers = servers[:-1]
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logging.info("Triggering upgrade to raft topology")
    await manager.api.upgrade_to_raft_topology(hosts[0].address)

    logging.info("Waiting until upgrade finishes")
    await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))

    logging.info("Checking migrated data in system")
    # Wait for migration and upgrade to view build status v2.
    # Bind `h` as a default argument: the generator is fully consumed before any predicate
    # runs, so a plain closure over `h` would make every predicate poll the last host only.
    await asyncio.gather(*(wait_for(lambda h=h: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
    await wait_for_view_v2(cql, ks_name, "vt1", 3)

    # Verify that after migration we kept only the entries for the known nodes and views.
    async def rows_migrated():
        result = await cql.run_async("SELECT * FROM system.view_build_status_v2", host=hosts[0])
        # Return None (rather than False) while the row count is not yet 3.
        return (len(result) == 3) or None

    await wait_for(rows_migrated, time.time() + 60)
# Reproduces scylladb/scylladb#20754
# View build status migration is doing read with CL=ALL, so it requires all nodes to be up.
# Before the fix, the migration was triggered too early, causing unavailable exception in topology coordinator.

View File

@@ -28,13 +28,6 @@ async def test_zero_token_nodes_topology_ops(manager: ManagerClient, tablets_ena
def get_pf(rack: str) -> dict[str, str]:
return {"dc": "dc1", "rack": rack}
logging.info('Trying to add a zero-token server in the gossip-based topology')
await manager.server_add(config={'join_ring': False,
'force_gossip_topology_changes': True,
'tablets_mode_for_new_keyspaces': 'disabled'},
property_file=get_pf("rz"),
expected_error='the raft-based topology is disabled')
normal_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled'}
zero_token_cfg = {'tablets_mode_for_new_keyspaces': 'enabled' if tablets_enabled else 'disabled', 'join_ring': False}

View File

@@ -1096,7 +1096,7 @@ private:
* and would only slow down tests (by having them wait).
*/
cdc_config.ring_delay = std::chrono::milliseconds(0);
_cdc_generation_service.start(std::ref(cdc_config), std::ref(_gossiper), std::ref(_sys_dist_ks), std::ref(_sys_ks), std::ref(abort_sources), std::ref(_token_metadata), std::ref(_feature_service), std::ref(_db), [&] { return !cfg->force_gossip_topology_changes(); }).get();
_cdc_generation_service.start(std::ref(cdc_config), std::ref(_gossiper), std::ref(_sys_dist_ks), std::ref(_sys_ks), std::ref(abort_sources), std::ref(_token_metadata), std::ref(_feature_service), std::ref(_db)).get();
auto stop_cdc_generation_service = defer_verbose_shutdown("CDC generation service", [this] {
_cdc_generation_service.stop().get();
});