Merge 'test: delete topology_raft_disabled suite' from Patryk Jędrzejczak
This PR is a necessary step to fix #15854 -- making consistent cluster management mandatory on master. Before making consistent cluster management mandatory, we have to get rid of all tests that depend on the `consistent_cluster_management=false` config. These are the tests in the `topology_raft_disabled` suite. There's the internal Raft upgrade procedure, which is the bulk of the upgrade logic. Then, there are two thin "layers" around it that invoke it underneath: recovery procedure and enable-raft-in-the-cluster procedure. We're getting rid of the second one by making Raft always enabled, so we naturally have to get rid of tests that depend on it. The idea is to replace every necessary enable-raft-in-the-cluster procedure in these tests with the recovery procedure. Then, we will still be testing the internal Raft upgrade procedure in the in-tree tests. The enable-raft-in-the-cluster procedure is already tested by QA tests, so we don't need to worry about these changes. Unfortunately, we cannot adapt `test_raft_upgrade_no_schema`. After making consistent cluster management mandatory on master, schema commitlog will also become mandatory because `consistent_cluster_management: True`, `force_schema_commit_log: False` is considered a bad configuration. These changes will make `test_raft_upgrade_no_schema` unimplementable in the Scylla repo. Therefore, we remove this test. If we want to keep it, we must rewrite it as an upgrade dtest. After making all tests in `topology_raft_disabled` use consistent cluster management, there is no point in keeping this suite. Therefore, we delete it and move all the tests to `topology_custom`. Closes scylladb/scylladb#16192 * github.com:scylladb/scylladb: test: delete topology_raft_disabled suite test: topology_raft_disabled: move tests to topology_custom suite test: topology_raft_disabled: move utils to topology suite test: topology_raft_disabled: use consistent cluster management test: topology_raft_disabled: add new util functions test: topology_raft_disabled: delete test_raft_upgrade_no_schema
This commit is contained in:
@@ -7,9 +7,11 @@
|
||||
Test consistency of schema changes with topology changes.
|
||||
"""
|
||||
import logging
|
||||
import functools
|
||||
import pytest
|
||||
import time
|
||||
from cassandra.cluster import Session # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from test.pylib.internal_types import ServerInfo
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, read_barrier
|
||||
@@ -92,3 +94,66 @@ async def wait_for_token_ring_and_group0_consistency(manager: ManagerClient, dea
|
||||
return None
|
||||
return True
|
||||
await wait_for(token_ring_matches, deadline, period=.5)
|
||||
|
||||
|
||||
async def restart(manager: ManagerClient, server: ServerInfo) -> None:
|
||||
logging.info(f"Stopping {server} gracefully")
|
||||
await manager.server_stop_gracefully(server.server_id)
|
||||
logging.info(f"Restarting {server}")
|
||||
await manager.server_start(server.server_id)
|
||||
logging.info(f"{server} restarted")
|
||||
|
||||
|
||||
async def wait_for_upgrade_state(state: str, cql: Session, host: Host, deadline: float) -> None:
|
||||
"""Wait until group 0 upgrade state reaches `state` on `host`, using `cql` to query it. Warning: if the
|
||||
upgrade procedure may progress beyond `state` this function may not notice when it entered `state` and
|
||||
then time out. Use it only if either `state` is the last state or the conditions of the test don't allow
|
||||
the upgrade procedure to progress beyond `state` (e.g. a dead node causing the procedure to be stuck).
|
||||
"""
|
||||
async def reached_state():
|
||||
rs = await cql.run_async("select value from system.scylla_local where key = 'group0_upgrade_state'", host=host)
|
||||
if rs:
|
||||
value = rs[0].value
|
||||
if value == state:
|
||||
return True
|
||||
else:
|
||||
logging.info(f"Upgrade not yet in state {state} on server {host}, state: {value}")
|
||||
else:
|
||||
logging.info(f"Upgrade not yet in state {state} on server {host}, no state was written")
|
||||
return None
|
||||
await wait_for(reached_state, deadline)
|
||||
|
||||
|
||||
async def wait_until_upgrade_finishes(cql: Session, host: Host, deadline: float) -> None:
|
||||
await wait_for_upgrade_state('use_post_raft_procedures', cql, host, deadline)
|
||||
|
||||
|
||||
async def enter_recovery_state(cql: Session, host: Host) -> None:
|
||||
await cql.run_async(
|
||||
"update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'",
|
||||
host=host)
|
||||
|
||||
|
||||
async def delete_raft_data(cql: Session, host: Host) -> None:
|
||||
await cql.run_async("truncate table system.discovery", host=host)
|
||||
await cql.run_async("truncate table system.group0_history", host=host)
|
||||
await cql.run_async("delete value from system.scylla_local where key = 'raft_group0_id'", host=host)
|
||||
|
||||
|
||||
async def delete_upgrade_state(cql: Session, host: Host) -> None:
|
||||
await cql.run_async("delete from system.scylla_local where key = 'group0_upgrade_state'", host=host)
|
||||
|
||||
|
||||
async def delete_raft_data_and_upgrade_state(cql: Session, host: Host) -> None:
|
||||
await delete_raft_data(cql, host)
|
||||
await delete_upgrade_state(cql, host)
|
||||
|
||||
|
||||
def log_run_time(f):
|
||||
@functools.wraps(f)
|
||||
async def wrapped(*args, **kwargs):
|
||||
start = time.time()
|
||||
res = await f(*args, **kwargs)
|
||||
logging.info(f"{f.__name__} took {int(time.time() - start)} seconds.")
|
||||
return res
|
||||
return wrapped
|
||||
|
||||
@@ -6,9 +6,12 @@ extra_scylla_config_options:
|
||||
authenticator: AllowAllAuthenticator
|
||||
authorizer: AllowAllAuthorizer
|
||||
run_first:
|
||||
- test_raft_recovery_majority_loss
|
||||
- test_raft_recovery_stuck
|
||||
- test_read_repair
|
||||
- test_replace
|
||||
skip_in_release:
|
||||
- test_raft_recovery_stuck
|
||||
- test_shutdown_hang
|
||||
- test_replace_ignore_nodes
|
||||
- test_old_ip_notification_repro
|
||||
|
||||
64
test/topology_custom/test_raft_recovery_basic.py
Normal file
64
test/topology_custom/test_raft_recovery_basic.py
Normal file
@@ -0,0 +1,64 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.topology.util import reconnect_driver, restart, enter_recovery_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
async def test_raft_recovery_basic(request, manager: ManagerClient):
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'experimental_features': list[str]()}
|
||||
servers = [await manager.server_add(config=cfg) for _ in range(3)]
|
||||
cql = manager.cql
|
||||
assert(cql)
|
||||
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Setting recovery state on {hosts}")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(restart(manager, srv) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}")
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {servers}")
|
||||
await asyncio.gather(*(restart(manager, srv) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}. Waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Upgrade finished. Creating a new table")
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info("Checking group0_history")
|
||||
rs = await cql.run_async("select * from system.group0_history")
|
||||
assert(rs)
|
||||
logging.info(f"group0_history entry description: '{rs[0].description}'")
|
||||
assert(table.full_name in rs[0].description)
|
||||
|
||||
logging.info("Booting new node")
|
||||
await manager.server_add(config=cfg)
|
||||
@@ -10,40 +10,34 @@ import logging
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.topology.util import reconnect_driver
|
||||
from test.topology_raft_disabled.util import restart, enable_raft_and_restart, \
|
||||
wait_until_upgrade_finishes, delete_raft_data, log_run_time
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.topology.util import reconnect_driver, restart, enter_recovery_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
@pytest.mark.replication_factor(1)
|
||||
async def test_recovery_after_majority_loss(manager: ManagerClient, random_tables: RandomTables):
|
||||
async def test_recovery_after_majority_loss(request, manager: ManagerClient):
|
||||
"""
|
||||
We successfully upgrade a cluster. Eventually however all servers but one fail - group 0
|
||||
is left without a majority. We create a new group 0 by entering RECOVERY, using `removenode`
|
||||
to get rid of the other servers, clearing Raft data and restarting. The Raft upgrade procedure
|
||||
runs again to establish a single-node group 0. We also verify that schema changes performed
|
||||
using the old group 0 are still there.
|
||||
All initial servers but one fail - group 0 is left without a majority. We create a new group
|
||||
0 by entering RECOVERY, using `removenode` to get rid of the other servers, clearing Raft
|
||||
data and restarting. The Raft upgrade procedure runs to establish a single-node group 0. We
|
||||
also verify that schema changes performed using the old group 0 are still there.
|
||||
Note: in general there's no guarantee that all schema changes will be present; the minority
|
||||
used to recover group 0 might have missed them. However in this test the driver waits
|
||||
for schema agreement to complete before proceeding, so we know that every server learned
|
||||
about the schema changes.
|
||||
"""
|
||||
servers = await manager.running_servers()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'experimental_features': list[str]()}
|
||||
servers = [await manager.server_add(config=cfg) for _ in range(3)]
|
||||
|
||||
logging.info(f"Enabling Raft on {servers} and restarting")
|
||||
await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}. Waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Upgrade finished. Creating a bunch of tables")
|
||||
logging.info("Creating a bunch of tables")
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
|
||||
tables = await asyncio.gather(*(random_tables.add_table(ncolumns=5) for _ in range(5)))
|
||||
|
||||
srv1, *others = servers
|
||||
@@ -53,7 +47,7 @@ async def test_recovery_after_majority_loss(manager: ManagerClient, random_table
|
||||
|
||||
logging.info(f"Entering recovery state on {srv1}")
|
||||
host1 = next(h for h in hosts if h.address == srv1.ip_addr)
|
||||
await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=host1)
|
||||
await enter_recovery_state(cql, host1)
|
||||
await restart(manager, srv1)
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
@@ -67,8 +61,7 @@ async def test_recovery_after_majority_loss(manager: ManagerClient, random_table
|
||||
await manager.remove_node(srv1.server_id, to_remove.server_id, ignore_dead_ips)
|
||||
|
||||
logging.info(f"Deleting old Raft data and upgrade state on {host1} and restarting")
|
||||
await delete_raft_data(cql, host1)
|
||||
await cql.run_async("delete from system.scylla_local where key = 'group0_upgrade_state'", host=host1)
|
||||
await delete_raft_data_and_upgrade_state(cql, host1)
|
||||
await restart(manager, srv1)
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
@@ -85,6 +78,4 @@ async def test_recovery_after_majority_loss(manager: ManagerClient, random_table
|
||||
await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info("Booting new node")
|
||||
await manager.server_add(config={
|
||||
'consistent_cluster_management': True
|
||||
})
|
||||
await manager.server_add(config=cfg)
|
||||
@@ -10,38 +10,53 @@ import logging
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts
|
||||
from test.topology.util import reconnect_driver
|
||||
from test.topology_raft_disabled.util import restart, enable_raft_and_restart, \
|
||||
wait_for_upgrade_state, wait_until_upgrade_finishes, delete_raft_data, log_run_time
|
||||
from test.pylib.util import unique_name, wait_for_cql_and_get_hosts
|
||||
from test.topology.util import reconnect_driver, restart, enter_recovery_state, wait_for_upgrade_state, \
|
||||
wait_until_upgrade_finishes, delete_raft_data_and_upgrade_state, log_run_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
@pytest.mark.replication_factor(1)
|
||||
async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables: RandomTables):
|
||||
async def test_recover_stuck_raft_recovery(request, manager: ManagerClient):
|
||||
"""
|
||||
We enable Raft on every server and the upgrade procedure starts. All servers join group 0. Then one
|
||||
of them fails, the rest enter 'synchronize' state. We assume the failed server cannot be recovered.
|
||||
After creating a cluster, we enter RECOVERY state on every server. Then, we delete the Raft data
|
||||
and the upgrade state on all servers. We restart them and the upgrade procedure starts. One of the
|
||||
servers fails, the rest enter 'synchronize' state. We assume the failed server cannot be recovered.
|
||||
We cannot just remove it at this point; it's already part of group 0, `remove_from_group0` will wait
|
||||
until upgrade procedure finishes - but the procedure is stuck. To proceed we enter RECOVERY state on
|
||||
the other servers, remove the failed one, and clear existing Raft data. After leaving RECOVERY the
|
||||
remaining nodes will restart the procedure, establish a new group 0 and finish upgrade.
|
||||
|
||||
kbr-: the test takes about 26 seconds in dev mode on my laptop.
|
||||
"""
|
||||
servers = await manager.running_servers()
|
||||
cfg = {'enable_user_defined_functions': False,
|
||||
'experimental_features': list[str]()}
|
||||
servers = [await manager.server_add(config=cfg) for _ in range(3)]
|
||||
srv1, *others = servers
|
||||
|
||||
logging.info(f"Enabling Raft on {srv1} and restarting")
|
||||
await enable_raft_and_restart(manager, srv1)
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
cql = manager.get_cql()
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# TODO error injection should probably be done through ScyllaClusterManager (we may need to mark the cluster as dirty).
|
||||
# In this test the cluster is dirty anyway due to a restart so it's safe.
|
||||
await inject_error_one_shot(manager.api, srv1.ip_addr, 'group0_upgrade_before_synchronize')
|
||||
logging.info(f"Enabling Raft on {others} and restarting")
|
||||
await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in others))
|
||||
logging.info(f"Setting recovery state on {hosts}")
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
await asyncio.gather(*(restart(manager, srv) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}")
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts}")
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Stopping {servers}")
|
||||
await asyncio.gather(*(manager.server_stop_gracefully(srv.server_id) for srv in servers))
|
||||
|
||||
logging.info(f"Starting {srv1} with injected group 0 upgrade error")
|
||||
await manager.server_update_config(srv1.server_id, 'error_injections_at_startup', ['group0_upgrade_before_synchronize'])
|
||||
await manager.server_start(srv1.server_id)
|
||||
|
||||
logging.info(f"Starting {others}")
|
||||
await asyncio.gather(*(manager.server_start(srv.server_id) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info(f"Cluster restarted, waiting until driver reconnects to {others}")
|
||||
@@ -56,10 +71,7 @@ async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables:
|
||||
# '[shard 0] raft_group0_upgrade - Raft upgrade failed: std::runtime_error (error injection before group 0 upgrade enters synchronize).'
|
||||
|
||||
logging.info(f"Setting recovery state on {hosts}")
|
||||
for host in hosts:
|
||||
await cql.run_async(
|
||||
"update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'",
|
||||
host=host)
|
||||
await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
|
||||
|
||||
logging.info(f"Restarting {others}")
|
||||
await asyncio.gather(*(restart(manager, srv) for srv in others))
|
||||
@@ -76,6 +88,7 @@ async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables:
|
||||
assert rs[0].value == 'recovery'
|
||||
|
||||
logging.info("Creating a table while in recovery state")
|
||||
random_tables = RandomTables(request.node.name, manager, unique_name(), 1)
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info(f"Stopping {srv1}")
|
||||
@@ -85,9 +98,7 @@ async def test_recover_stuck_raft_upgrade(manager: ManagerClient, random_tables:
|
||||
await manager.remove_node(others[0].server_id, srv1.server_id)
|
||||
|
||||
logging.info(f"Deleting Raft data and upgrade state on {hosts} and restarting")
|
||||
for host in hosts:
|
||||
await delete_raft_data(cql, host)
|
||||
await cql.run_async("delete from system.scylla_local where key = 'group0_upgrade_state'", host=host)
|
||||
await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))
|
||||
|
||||
await asyncio.gather(*(restart(manager, srv) for srv in others))
|
||||
cql = await reconnect_driver(manager)
|
||||
@@ -1,3 +0,0 @@
|
||||
This suite is similar to the `topology` suite except Raft is disabled
|
||||
initially. It can still be enabled during a test.
|
||||
The original purpose of this suite is to test the Raft upgrade procedure.
|
||||
@@ -1,9 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
# This file configures pytest for all tests in this directory, and also
|
||||
# defines common test fixtures for all of them to use
|
||||
|
||||
from test.topology.conftest import *
|
||||
@@ -1,12 +0,0 @@
|
||||
# Pytest configuration file. If we don't have one in this directory,
|
||||
# pytest will look for one in our ancestor directories, and may find
|
||||
# something irrelevant. So we should have one here, even if empty.
|
||||
[pytest]
|
||||
asyncio_mode = auto
|
||||
|
||||
log_cli = true
|
||||
log_format = %(asctime)s.%(msecs)03d %(levelname)s> %(message)s
|
||||
log_date_format = %H:%M:%S
|
||||
|
||||
markers =
|
||||
replication_factor: replication factor for RandomTables
|
||||
@@ -1,13 +0,0 @@
|
||||
type: Topology
|
||||
pool_size: 4
|
||||
cluster:
|
||||
initial_size: 3
|
||||
extra_scylla_config_options:
|
||||
authenticator: AllowAllAuthenticator
|
||||
authorizer: AllowAllAuthorizer
|
||||
enable_user_defined_functions: False
|
||||
experimental_features: []
|
||||
consistent_cluster_management: False
|
||||
run_first:
|
||||
- test_raft_upgrade_stuck
|
||||
- test_raft_upgrade_majority_loss
|
||||
@@ -1,57 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
from typing import Callable, Awaitable, Optional, TypeVar, Generic
|
||||
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, wait_for_feature
|
||||
from test.topology.util import reconnect_driver
|
||||
from test.topology_raft_disabled.util import restart, enable_raft, \
|
||||
enable_raft_and_restart, wait_for_upgrade_state, wait_until_upgrade_finishes, \
|
||||
delete_raft_data, log_run_time
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
@pytest.mark.replication_factor(1)
|
||||
async def test_raft_upgrade_basic(manager: ManagerClient, random_tables: RandomTables):
|
||||
servers = await manager.running_servers()
|
||||
cql = manager.cql
|
||||
assert(cql)
|
||||
|
||||
# system.group0_history should either not exist or there should be no entries in it before upgrade.
|
||||
if await cql.run_async("select * from system_schema.tables where keyspace_name = 'system' and table_name = 'group0_history'"):
|
||||
assert(not (await cql.run_async("select * from system.group0_history")))
|
||||
|
||||
logging.info(f"Enabling Raft on {servers} and restarting")
|
||||
await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
|
||||
logging.info("Cluster restarted, waiting until driver reconnects to every server")
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
logging.info(f"Driver reconnected, hosts: {hosts}. Waiting until upgrade finishes")
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
logging.info("Upgrade finished. Creating a new table")
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
|
||||
logging.info("Checking group0_history")
|
||||
rs = await cql.run_async("select * from system.group0_history")
|
||||
assert(rs)
|
||||
logging.info(f"group0_history entry description: '{rs[0].description}'")
|
||||
assert(table.full_name in rs[0].description)
|
||||
|
||||
logging.info("Booting new node")
|
||||
await manager.server_add(config={
|
||||
'consistent_cluster_management': True
|
||||
})
|
||||
@@ -1,74 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2022-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import pytest
|
||||
import logging
|
||||
import time
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_feature
|
||||
from test.topology.util import reconnect_driver
|
||||
from test.topology_raft_disabled.util import restart, enable_raft, \
|
||||
enable_raft_and_restart, wait_for_upgrade_state, wait_until_upgrade_finishes, \
|
||||
delete_raft_data, log_run_time
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
async def test_durability_with_no_schema_commitlog (manager: ManagerClient):
|
||||
new_server_info = await manager.server_add(config={
|
||||
'consistent_cluster_management': False,
|
||||
'force_schema_commit_log': False,
|
||||
'flush_schema_tables_after_modification': True
|
||||
})
|
||||
await manager.server_stop(new_server_info.server_id)
|
||||
await manager.server_start(new_server_info.server_id)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@log_run_time
|
||||
async def test_upgrade_with_no_schema_commitlog(manager: ManagerClient, random_tables: RandomTables):
|
||||
# RAFT without schema_commit_log should produce error at node startup
|
||||
# We can't test this on the already running nodes, since schema commitlog feature
|
||||
# may have already been enabled on them
|
||||
new_server_info = await manager.server_add(start=False, config={
|
||||
'consistent_cluster_management': 'True',
|
||||
'force_schema_commit_log': 'False'
|
||||
})
|
||||
await manager.server_start(new_server_info.server_id,
|
||||
"Bad configuration: consistent_cluster_management "
|
||||
"requires schema commit log to be enabled")
|
||||
|
||||
# Rollback RAFT and restart new node
|
||||
# Now it's the same as initially running nodes, except with force_schema_commit_log = False
|
||||
await manager.server_update_config(new_server_info.server_id, 'consistent_cluster_management', False)
|
||||
await manager.server_stop_gracefully(new_server_info.server_id)
|
||||
await manager.server_start(new_server_info.server_id)
|
||||
cql = await reconnect_driver(manager)
|
||||
servers = await manager.running_servers()
|
||||
|
||||
# Wait for schema commitlog feature on all nodes, including the new one.
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
await asyncio.gather(*(wait_for_feature("SCHEMA_COMMITLOG", cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
# restart all the nodes with RAFT enabled
|
||||
await asyncio.gather(*(enable_raft_and_restart(manager, srv) for srv in servers))
|
||||
cql = await reconnect_driver(manager)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
|
||||
# wait until upgrade finishes
|
||||
await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))
|
||||
|
||||
# upgrade finished, try to create a new table
|
||||
table = await random_tables.add_table(ncolumns=5)
|
||||
|
||||
# check group history
|
||||
rs = await cql.run_async("select * from system.group0_history")
|
||||
assert(rs)
|
||||
logging.info(f"group0_history entry description: '{rs[0].description}'")
|
||||
assert(table.full_name in rs[0].description)
|
||||
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
#
|
||||
# Copyright (C) 2023-present ScyllaDB
|
||||
#
|
||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
#
|
||||
"""
|
||||
Utilities for Raft upgrade tests
|
||||
"""
|
||||
import logging
|
||||
import functools
|
||||
import time
|
||||
from typing import Optional
|
||||
from cassandra.cluster import NoHostAvailable, Session # type: ignore # pylint: disable=no-name-in-module
|
||||
from cassandra.pool import Host # type: ignore # pylint: disable=no-name-in-module
|
||||
from test.pylib.rest_client import ScyllaRESTAPIClient
|
||||
from test.pylib.manager_client import ManagerClient, IPAddress, ServerInfo
|
||||
from test.pylib.util import wait_for
|
||||
|
||||
|
||||
async def restart(manager: ManagerClient, server: ServerInfo) -> None:
|
||||
logging.info(f"Stopping {server} gracefully")
|
||||
await manager.server_stop_gracefully(server.server_id)
|
||||
logging.info(f"Restarting {server}")
|
||||
await manager.server_start(server.server_id)
|
||||
logging.info(f"{server} restarted")
|
||||
|
||||
|
||||
async def enable_raft(manager: ManagerClient, server: ServerInfo) -> None:
|
||||
config = await manager.server_get_config(server.server_id)
|
||||
logging.info(f"Updating config of server {server}")
|
||||
await manager.server_update_config(server.server_id, 'consistent_cluster_management', 'True')
|
||||
|
||||
|
||||
async def enable_raft_and_restart(manager: ManagerClient, server: ServerInfo) -> None:
|
||||
await enable_raft(manager, server)
|
||||
await restart(manager, server)
|
||||
|
||||
|
||||
async def wait_for_upgrade_state(state: str, cql: Session, host: Host, deadline: float) -> None:
|
||||
"""Wait until group 0 upgrade state reaches `state` on `host`, using `cql` to query it. Warning: if the
|
||||
upgrade procedure may progress beyond `state` this function may not notice when it entered `state` and
|
||||
then time out. Use it only if either `state` is the last state or the conditions of the test don't allow
|
||||
the upgrade procedure to progress beyond `state` (e.g. a dead node causing the procedure to be stuck).
|
||||
"""
|
||||
async def reached_state():
|
||||
rs = await cql.run_async("select value from system.scylla_local where key = 'group0_upgrade_state'", host=host)
|
||||
if rs:
|
||||
value = rs[0].value
|
||||
if value == state:
|
||||
return True
|
||||
else:
|
||||
logging.info(f"Upgrade not yet in state {state} on server {host}, state: {value}")
|
||||
else:
|
||||
logging.info(f"Upgrade not yet in state {state} on server {host}, no state was written")
|
||||
return None
|
||||
await wait_for(reached_state, deadline)
|
||||
|
||||
|
||||
async def wait_until_upgrade_finishes(cql: Session, host: Host, deadline: float) -> None:
|
||||
await wait_for_upgrade_state('use_post_raft_procedures', cql, host, deadline)
|
||||
|
||||
|
||||
async def wait_for_gossip_gen_increase(api: ScyllaRESTAPIClient, gen: int, node_ip: IPAddress,
|
||||
target_ip: IPAddress, deadline: float):
|
||||
"""Wait until the generation number of `target_ip` increases above `gen` from the point of view of `node_ip`.
|
||||
Can be used to wait until `node_ip` gossips with `target_ip` after `target_ip` was restarted
|
||||
by saving the generation number of `target_ip` before restarting it and then calling this function
|
||||
(nodes increase their generation numbers when they restart).
|
||||
"""
|
||||
async def gen_increased() -> Optional[int]:
|
||||
curr_gen = await api.get_gossip_generation_number(node_ip, target_ip)
|
||||
if curr_gen <= gen:
|
||||
logging.info(f"Gossip generation number of {target_ip} is {curr_gen} <= {gen} according to {node_ip}")
|
||||
return None
|
||||
return curr_gen
|
||||
gen = await wait_for(gen_increased, deadline)
|
||||
logging.info(f"Gossip generation number of {target_ip} is reached {gen} according to {node_ip}")
|
||||
|
||||
|
||||
async def delete_raft_data(cql: Session, host: Host) -> None:
|
||||
await cql.run_async("truncate table system.discovery", host=host)
|
||||
await cql.run_async("truncate table system.group0_history", host=host)
|
||||
await cql.run_async("delete value from system.scylla_local where key = 'raft_group0_id'", host=host)
|
||||
|
||||
|
||||
def log_run_time(f):
|
||||
@functools.wraps(f)
|
||||
async def wrapped(*args, **kwargs):
|
||||
start = time.time()
|
||||
res = await f(*args, **kwargs)
|
||||
logging.info(f"{f.__name__} took {int(time.time() - start)} seconds.")
|
||||
return res
|
||||
return wrapped
|
||||
Reference in New Issue
Block a user