topology_custom/test_tablets: use new_test_keyspace

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2025-01-19 08:52:50 +02:00
parent 005ceb77d3
commit 649e68c6db

View File

@@ -12,7 +12,7 @@ from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
from test.pylib.util import unique_name
from test.topology.conftest import skip_mode
from test.topology.util import wait_for_cql_and_get_hosts
from test.topology.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver
from contextlib import nullcontext as does_not_raise
import time
import pytest
@@ -37,12 +37,12 @@ async def test_tablet_replication_factor_enough_nodes(manager: ManagerClient):
res = await cql.run_async("SELECT data_center FROM system.local")
this_dc = res[0].data_center
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 3}}")
with pytest.raises(ConfigurationException, match=f"Datacenter {this_dc} doesn't have enough token-owning nodes"):
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 3}}") as ks:
with pytest.raises(ConfigurationException, match=f"Datacenter {this_dc} doesn't have enough token-owning nodes"):
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"ALTER KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
@pytest.mark.asyncio
@@ -53,27 +53,27 @@ async def test_tablet_cannot_decommision_below_replication_factor(manager: Manag
logger.info("Creating table")
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
logger.info("Populating table")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])
logger.info("Populating table")
keys = range(256)
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in keys])
logger.info("Decommission some node")
await manager.decommission_node(servers[0].server_id)
logger.info("Decommission some node")
await manager.decommission_node(servers[0].server_id)
with pytest.raises(HTTPError, match="Decommission failed"):
logger.info("Decommission another node")
await manager.decommission_node(servers[1].server_id)
with pytest.raises(HTTPError, match="Decommission failed"):
logger.info("Decommission another node")
await manager.decommission_node(servers[1].server_id)
# Three nodes should still provide CL=3
logger.info("Checking table")
query = SimpleStatement("SELECT * FROM test.test;", consistency_level=ConsistencyLevel.THREE)
rows = await cql.run_async(query)
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
# Three nodes should still provide CL=3
logger.info("Checking table")
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.THREE)
rows = await cql.run_async(query)
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
async def test_reshape_with_tablets(manager: ManagerClient):
logger.info("Bootstrapping cluster")
@@ -83,31 +83,32 @@ async def test_reshape_with_tablets(manager: ManagerClient):
logger.info("Creating table")
cql = manager.get_cql()
number_of_tablets = 2
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': {number_of_tablets} }}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}} and tablets = {{'initial': {number_of_tablets} }}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
logger.info("Disabling autocompaction for the table")
await manager.api.disable_autocompaction(server.ip_addr, "test", "test")
logger.info("Disabling autocompaction for the table")
await manager.api.disable_autocompaction(server.ip_addr, ks, "test")
logger.info("Populating table")
loop_count = 32
for _ in range(loop_count):
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(64)])
await manager.api.keyspace_flush(server.ip_addr, "test", "test")
# After populating the table, expect loop_count number of sstables per tablet
sstable_info = await manager.api.get_sstable_info(server.ip_addr, "test", "test")
assert len(sstable_info[0]['sstables']) == number_of_tablets * loop_count
logger.info("Populating table")
loop_count = 32
for _ in range(loop_count):
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(64)])
await manager.api.keyspace_flush(server.ip_addr, ks, "test")
# After populating the table, expect loop_count number of sstables per tablet
sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
assert len(sstable_info[0]['sstables']) == number_of_tablets * loop_count
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
log = await manager.server_open_log(server.server_id)
mark = await log.mark()
# Restart the server and verify that the sstables have been reshaped down to one sstable per tablet
logger.info("Restart the server")
await manager.server_restart(server.server_id)
# Restart the server and verify that the sstables have been reshaped down to one sstable per tablet
logger.info("Restart the server")
await manager.server_restart(server.server_id)
await reconnect_driver(manager)
await log.wait_for("Reshape test.test .* Reshaped 32 sstables to .*", mark, 30)
sstable_info = await manager.api.get_sstable_info(server.ip_addr, "test", "test")
assert len(sstable_info[0]['sstables']) == number_of_tablets
await log.wait_for(f"Reshape {ks}.test .* Reshaped 32 sstables to .*", mark, 30)
sstable_info = await manager.api.get_sstable_info(server.ip_addr, ks, "test")
assert len(sstable_info[0]['sstables']) == number_of_tablets
@pytest.mark.parametrize("direction", ["up", "down", "none"])
@@ -132,47 +133,47 @@ async def test_tablet_rf_change(manager: ManagerClient, direction):
rf_from = 2
rf_to = 2
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from}}}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await cql.run_async("CREATE MATERIALIZED VIEW test.test_mv AS SELECT pk FROM test.test WHERE pk IS NOT NULL PRIMARY KEY (pk)")
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_from}}}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.test_mv AS SELECT pk FROM {ks}.test WHERE pk IS NOT NULL PRIMARY KEY (pk)")
logger.info("Populating table")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(128)])
logger.info("Populating table")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(128)])
async def check_allocated_replica(expected: int):
replicas = await get_all_tablet_replicas(manager, servers[0], 'test', 'test')
replicas = replicas + await get_all_tablet_replicas(manager, servers[0], 'test', 'test_mv', is_view=True)
for r in replicas:
logger.info(f"{r.replicas}")
assert len(r.replicas) == expected
async def check_allocated_replica(expected: int):
replicas = await get_all_tablet_replicas(manager, servers[0], ks, 'test')
replicas = replicas + await get_all_tablet_replicas(manager, servers[0], ks, 'test_mv', is_view=True)
for r in replicas:
logger.info(f"{r.replicas}")
assert len(r.replicas) == expected
logger.info(f"Checking {rf_from} allocated replicas")
await check_allocated_replica(rf_from)
logger.info(f"Checking {rf_from} allocated replicas")
await check_allocated_replica(rf_from)
logger.info(f"Altering RF {rf_from} -> {rf_to}")
await cql.run_async(f"ALTER KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_to}}}")
logger.info(f"Altering RF {rf_from} -> {rf_to}")
await cql.run_async(f"ALTER KEYSPACE {ks} WITH replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': {rf_to}}}")
logger.info(f"Checking {rf_to} re-allocated replicas")
await check_allocated_replica(rf_to)
logger.info(f"Checking {rf_to} re-allocated replicas")
await check_allocated_replica(rf_to)
if direction != 'up':
# Don't check fragments for up/none changes, scylla crashes when checking nodes
# that (validly) miss the replica, see scylladb/scylladb#18786
return
if direction != 'up':
# Don't check fragments for up/none changes, scylla crashes when checking nodes
# that (validly) miss the replica, see scylladb/scylladb#18786
return
fragments = { pk: set() for pk in random.sample(range(128), 17) }
for s in servers:
host_id = await manager.get_host_id(s.server_id)
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
await read_barrier(manager.api, s.ip_addr) # scylladb/scylladb#18199
fragments = { pk: set() for pk in random.sample(range(128), 17) }
for s in servers:
host_id = await manager.get_host_id(s.server_id)
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
await read_barrier(manager.api, s.ip_addr) # scylladb/scylladb#18199
for k in fragments:
res = await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=host[0])
for fragment in res:
if fragment.partition_region == 0: # partition start
fragments[k].add(host_id)
logger.info("Checking fragments")
for k in fragments:
res = await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0])
for fragment in res:
if fragment.partition_region == 0: # partition start
fragments[k].add(host_id)
logger.info("Checking fragments")
for k in fragments:
assert len(fragments[k]) == rf_to, f"Found mutations for {k} key on {fragments[k]} hosts, but expected only {rf_to} of them"
assert len(fragments[k]) == rf_to, f"Found mutations for {k} key on {fragments[k]} hosts, but expected only {rf_to} of them"
@pytest.mark.asyncio
@@ -185,17 +186,17 @@ async def test_tablet_mutation_fragments_unowned_partition(manager: ManagerClien
cql = manager.get_cql()
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
logger.info("Populating table")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in range(4)])
logger.info("Populating table")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in range(4)])
for s in servers:
host_id = await manager.get_host_id(s.server_id)
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
for k in range(4):
await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS(test.test) WHERE pk={k}", host=host[0])
for s in servers:
host_id = await manager.get_host_id(s.server_id)
host = await wait_for_cql_and_get_hosts(cql, [s], time.time() + 30)
for k in range(4):
await cql.run_async(f"SELECT partition_region FROM MUTATION_FRAGMENTS({ks}.test) WHERE pk={k}", host=host[0])
# ALTER tablets KS cannot change RF of any DC by more than 1 at a time.
@@ -213,37 +214,37 @@ async def test_multidc_alter_tablets_rf(request: pytest.FixtureRequest, manager:
await manager.servers_add(2, config=config, property_file={'dc': f'dc2', 'rack': 'myrack'})
cql = manager.get_cql()
await cql.run_async("create keyspace if not exists ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}")
# need to create a table to not change only the schema, but also tablets replicas
await cql.run_async("create table ks.t (pk int primary key)")
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
# changing RF of dc2 from 0 to 2 should fail
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 2}")
async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1}") as ks:
# need to create a table to not change only the schema, but also tablets replicas
await cql.run_async(f"create table {ks}.t (pk int primary key)")
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
# changing RF of dc2 from 0 to 2 should fail
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc2': 2}}")
# changing RF of dc2 from 0 to 1 should succeed
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 1}")
# ensure that RFs of both DCs are equal to 1 now, i.e. that omitting dc1 in above command didn't change it
res = await cql.run_async("SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
assert res[0].replication['dc1'] == '1'
assert res[0].replication['dc2'] == '1'
# changing RF of dc2 from 0 to 1 should succeed
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc2': 1}}")
# ensure that RFs of both DCs are equal to 1 now, i.e. that omitting dc1 in above command didn't change it
res = await cql.run_async(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = '{ks}'")
assert res[0].replication['dc1'] == '1'
assert res[0].replication['dc2'] == '1'
# incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}")
# as above, but decrementing
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}")
# as above, but decrement 1 RF and increment the other
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 0}")
# as above, but RFs are swapped
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 2}")
# incrementing RF of 2 DCs at once should NOT succeed, because it'd leave 2 pending tablets replicas
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 2}}")
# as above, but decrementing
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 0}}")
# as above, but decrement 1 RF and increment the other
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 0}}")
# as above, but RFs are swapped
with pytest.raises(InvalidRequest, match="Only one DC's RF can be changed at a time and not by more than 1"):
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 0, 'dc2': 2}}")
# check that we can remove all replicas from dc2 by changing RF from 1 to 0
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc2': 0}")
# check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
await cql.run_async("alter keyspace ks with replication = {'class': 'NetworkTopologyStrategy', 'dc1': 0}")
# check that we can remove all replicas from dc2 by changing RF from 1 to 0
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc2': 0}}")
# check that we can remove all replicas from the cluster, i.e. change RF of dc1 from 1 to 0 as well:
await cql.run_async(f"alter keyspace {ks} with replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 0}}")
# Reproducer for https://github.com/scylladb/scylladb/issues/18110
@@ -260,55 +261,55 @@ async def test_saved_readers_tablet_migration(manager: ManagerClient, build_mode
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH"
async with new_test_keyspace(manager, "WITH"
" replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
" and tablets = {'initial': 1}")
await cql.run_async("CREATE TABLE test.test (pk int, ck int, c int, PRIMARY KEY (pk, ck));")
" and tablets = {'initial': 1}") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int, ck int, c int, PRIMARY KEY (pk, ck));")
logger.info("Populating table")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, ck, c) VALUES (0, {k}, 0);") for k in range(128)])
logger.info("Populating table")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, ck, c) VALUES (0, {k}, 0);") for k in range(128)])
statement = SimpleStatement("SELECT * FROM test.test WHERE pk = 0", fetch_size=10)
cql.execute(statement)
statement = SimpleStatement(f"SELECT * FROM {ks}.test WHERE pk = 0", fetch_size=10)
cql.execute(statement)
def get_querier_cache_population(server):
metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
pattern = re.compile("^scylla_database_querier_cache_population")
for metric in metrics.split('\n'):
if pattern.match(metric) is not None:
return int(float(metric.split()[1]))
def get_querier_cache_population(server):
metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
pattern = re.compile("^scylla_database_querier_cache_population")
for metric in metrics.split('\n'):
if pattern.match(metric) is not None:
return int(float(metric.split()[1]))
assert any(map(lambda x: x > 0, [get_querier_cache_population(server) for server in servers]))
assert any(map(lambda x: x > 0, [get_querier_cache_population(server) for server in servers]))
table_id = await cql.run_async("SELECT id FROM system_schema.tables WHERE keyspace_name = 'test' AND table_name = 'test'")
table_id = table_id[0].id
table_id = await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = 'test'")
table_id = table_id[0].id
tablet_infos = await cql.run_async(f"SELECT last_token, replicas FROM system.tablets WHERE table_id = {table_id}")
tablet_infos = list(tablet_infos)
tablet_infos = await cql.run_async(f"SELECT last_token, replicas FROM system.tablets WHERE table_id = {table_id}")
tablet_infos = list(tablet_infos)
assert len(tablet_infos) == 1
tablet_info = tablet_infos[0]
assert len(tablet_info.replicas) == 1
assert len(tablet_infos) == 1
tablet_info = tablet_infos[0]
assert len(tablet_info.replicas) == 1
hosts = {await manager.get_host_id(server.server_id) for server in servers}
print(f"HOSTS: {hosts}")
source_host, source_shard = tablet_info.replicas[0]
hosts = {await manager.get_host_id(server.server_id) for server in servers}
print(f"HOSTS: {hosts}")
source_host, source_shard = tablet_info.replicas[0]
hosts.remove(str(source_host))
target_host, target_shard = list(hosts)[0], source_shard
hosts.remove(str(source_host))
target_host, target_shard = list(hosts)[0], source_shard
await manager.api.move_tablet(
node_ip=servers[0].ip_addr,
ks="test",
table="test",
src_host=source_host,
src_shard=source_shard,
dst_host=target_host,
dst_shard=target_shard,
token=tablet_info.last_token)
await manager.api.move_tablet(
node_ip=servers[0].ip_addr,
ks=ks,
table="test",
src_host=source_host,
src_shard=source_shard,
dst_host=target_host,
dst_shard=target_shard,
token=tablet_info.last_token)
# The tablet move should have evicted the cached reader.
assert all(map(lambda x: x == 0, [get_querier_cache_population(server) for server in servers]))
# The tablet move should have evicted the cached reader.
assert all(map(lambda x: x == 0, [get_querier_cache_population(server) for server in servers]))
# Reproducer for https://github.com/scylladb/scylladb/issues/19052
# 1) table A has N tablets and views
@@ -334,53 +335,53 @@ async def test_read_of_pending_replica_during_migration(manager: ManagerClient,
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await cql.run_async("CREATE MATERIALIZED VIEW test.mv1 AS \
SELECT * FROM test.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
key = 7 # Whatever
tablet_token = 0 # Doesn't matter since there is one tablet
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
s0_host_id = await manager.get_host_id(servers[0].server_id)
s1_host_id = await manager.get_host_id(servers[1].server_id)
dst_shard = 0
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
await manager.api.enable_injection(servers[1].ip_addr, "stream_mutation_fragments", one_shot=True)
s1_log = await manager.server_open_log(servers[1].server_id)
s1_mark = await s1_log.mark()
# Drop cache to remove dummy entry indicating that underlying mutation source is empty
await manager.api.drop_sstable_caches(servers[1].ip_addr)
# Drop cache to remove dummy entry indicating that underlying mutation source is empty
await manager.api.drop_sstable_caches(servers[1].ip_addr)
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
migration_task = asyncio.create_task(
manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, dst_shard, tablet_token))
await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
s1_mark = await s1_log.mark()
await s1_log.wait_for('stream_mutation_fragments: waiting', from_mark=s1_mark)
s1_mark = await s1_log.mark()
await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 1)")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 1)")
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
# Release abandoned streaming
await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)
# Release abandoned streaming
await manager.api.message_injection(servers[1].ip_addr, "stream_mutation_fragments")
await s1_log.wait_for('stream_mutation_fragments: done', from_mark=s1_mark)
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
logger.info("Waiting for migration to finish")
await migration_task
logger.info("Migration done")
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == 1
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == 1
# This test checks that --enable-tablets option and the TABLETS parameters of the CQL CREATE KEYSPACE
@@ -400,12 +401,12 @@ async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, wi
# First, check if a kesypace is able to be created with default CQL statement that
# doesn't contain tablets parameters. When possible, tablets should be activated
await cql.run_async(f"CREATE KEYSPACE test_d WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}};")
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = 'test_d'").one()
if tablets_enabled_by_default:
assert res.initial_tablets == 0
else:
assert res is None
async with new_test_keyspace(manager, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}}") as ks:
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
if tablets_enabled_by_default:
assert res.initial_tablets == 0
else:
assert res is None
# Next, check that explicit CQL request for enabling tablets can only be satisfied when
# tablets are possible. Tablets must be activated in this case
@@ -414,15 +415,16 @@ async def test_keyspace_creation_cql_vs_config_sanity(manager: ManagerClient, wi
else:
expectation = pytest.raises(ConfigurationException)
with expectation:
await cql.run_async(f"CREATE KEYSPACE test_y WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': true}};")
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = 'test_y'").one()
ks = await create_new_test_keyspace(cql, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': true}}")
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
assert res.initial_tablets == 0
await cql.run_async(f"drop keyspace {ks}")
# Finally, check that explicitly disabling tablets in CQL results in vnode-based keyspace
# whenever tablets are enabled or not in config
await cql.run_async(f"CREATE KEYSPACE test_n WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': false}};")
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = 'test_n'").one()
assert res is None
async with new_test_keyspace(manager, f"WITH replication = {{'class': '{replication_strategy}', 'replication_factor': 1}} AND TABLETS = {{'enabled': false}}") as ks:
res = cql.execute(f"SELECT initial_tablets FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks}'").one()
assert res is None
@pytest.mark.asyncio
async def test_tablets_and_gossip_topology_changes_are_incompatible(manager: ManagerClient):
@@ -435,11 +437,9 @@ async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerCli
cfg = {"enable_tablets": False, "force_gossip_topology_changes": True}
await manager.server_add(config=cfg)
cql = manager.get_cql()
ks_name = unique_name()
await cql.run_async(f"CREATE KEYSPACE {ks_name} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}};")
res = cql.execute(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks_name}'").one()
logger.info(res)
await cql.run_async(f"DROP KEYSPACE {ks_name}")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks_name:
res = cql.execute(f"SELECT * FROM system_schema.scylla_keyspaces WHERE keyspace_name = '{ks_name}'").one()
logger.info(res)
for enabled in ["false", "true"]:
expected = r"Error from server: code=2000 \[Syntax error in CQL query\] message=\"line 1:126 no viable alternative at input 'tablets'\""
@@ -469,37 +469,37 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
logger.info("Create table, populate it and flush the table to disk")
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
num_of_rows = 64
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(num_of_rows)])
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
num_of_rows = 64
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(num_of_rows)])
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
logger.info("Starting Node 2")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Starting Node 2")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Inject error to make view generator pause before processing the sstable")
injection_name = "view_builder_pause_add_new_view"
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
logger.info("Inject error to make view generator pause before processing the sstable")
injection_name = "view_builder_pause_add_new_view"
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
logger.info("Create view")
await cql.run_async("CREATE MATERIALIZED VIEW test.mv1 AS \
SELECT * FROM test.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
logger.info("Create view")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
logger.info("Migrate the tablet to node 2")
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
logger.info("Migrate the tablet to node 2")
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
# Verify the table has expected number of rows
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == num_of_rows
# Verify that the view has the expected number of rows
rows = await cql.run_async("SELECT c from test.mv1")
assert len(list(rows)) == num_of_rows
# Verify the table has expected number of rows
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == num_of_rows
# Verify that the view has the expected number of rows
rows = await cql.run_async(f"SELECT c from {ks}.mv1")
assert len(list(rows)) == num_of_rows
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
@@ -525,55 +525,55 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
logger.info("Create the test table, populate few rows and flush to disk")
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(64)])
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
logger.info("Create view")
await cql.run_async("CREATE MATERIALIZED VIEW test.mv1 AS \
SELECT * FROM test.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
logger.info("Create view")
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
logger.info("Generate an sstable and move it to upload directory of test table")
# create an sstable using a dummy table
await cql.run_async("CREATE TABLE test.dummy (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "dummy")
node_workdir = await manager.server_get_workdir(servers[0].server_id)
dummy_table_dir = glob.glob(os.path.join(node_workdir, "data", "test", "dummy-*"))[0]
test_table_upload_dir = glob.glob(os.path.join(node_workdir, "data", "test", "test-*", "upload"))[0]
for src_path in glob.glob(os.path.join(dummy_table_dir, "me-*")):
dst_path = os.path.join(test_table_upload_dir, os.path.basename(src_path))
os.rename(src_path, dst_path)
await cql.run_async("DROP TABLE test.dummy;")
logger.info("Generate an sstable and move it to upload directory of test table")
# create an sstable using a dummy table
await cql.run_async("CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy")
node_workdir = await manager.server_get_workdir(servers[0].server_id)
dummy_table_dir = glob.glob(os.path.join(node_workdir, "data", ks, "dummy-*"))[0]
test_table_upload_dir = glob.glob(os.path.join(node_workdir, "data", ks, "test-*", "upload"))[0]
for src_path in glob.glob(os.path.join(dummy_table_dir, "me-*")):
dst_path = os.path.join(test_table_upload_dir, os.path.basename(src_path))
os.rename(src_path, dst_path)
await cql.run_async(f"DROP TABLE {ks}.dummy;")
logger.info("Starting Node 2")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Starting Node 2")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Inject error to prevent view generator from processing staged sstables")
injection_name = "view_update_generator_consume_staging_sstable"
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
logger.info("Inject error to prevent view generator from processing staged sstables")
injection_name = "view_update_generator_consume_staging_sstable"
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
logger.info("Load the sstables from upload directory")
await manager.api.load_new_sstables(servers[0].ip_addr, "test", "test")
logger.info("Load the sstables from upload directory")
await manager.api.load_new_sstables(servers[0].ip_addr, ks, "test")
# The table now has both staged and unstaged sstables.
# Verify that tablet migration handles them both without causing any base-view inconsistencies.
logger.info("Migrate the tablet to node 2")
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
# The table now has both staged and unstaged sstables.
# Verify that tablet migration handles them both without causing any base-view inconsistencies.
logger.info("Migrate the tablet to node 2")
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
expected_num_of_rows = 128
# Verify the table has expected number of rows
rows = await cql.run_async("SELECT pk from test.test")
assert len(list(rows)) == expected_num_of_rows
# Verify that the view has the expected number of rows
rows = await cql.run_async("SELECT c from test.mv1")
assert len(list(rows)) == expected_num_of_rows
expected_num_of_rows = 128
# Verify the table has expected number of rows
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == expected_num_of_rows
# Verify that the view has the expected number of rows
rows = await cql.run_async(f"SELECT c from {ks}.mv1")
assert len(list(rows)) == expected_num_of_rows
@pytest.mark.asyncio
async def test_orphaned_sstables_on_startup(manager: ManagerClient):
@@ -598,24 +598,24 @@ async def test_orphaned_sstables_on_startup(manager: ManagerClient):
logger.info("Create the test table, populate few rows and flush to disk")
cql = manager.get_cql()
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k%3});") for k in range(256)])
await manager.api.keyspace_flush(servers[0].ip_addr, "test", "test")
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k%3});") for k in range(256)])
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
node0_workdir = await manager.server_get_workdir(servers[0].server_id)
node0_table_dir = glob.glob(os.path.join(node0_workdir, "data", "test", "test-*"))[0]
node0_table_dir = glob.glob(os.path.join(node0_workdir, "data", ks, "test-*"))[0]
logger.info("Start Node 2")
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
await manager.api.disable_tablet_balancing(servers[1].ip_addr)
node1_workdir = await manager.server_get_workdir(servers[1].server_id)
node1_table_dir = glob.glob(os.path.join(node1_workdir, "data", "test", "test-*"))[0]
node1_table_dir = glob.glob(os.path.join(node1_workdir, "data", ks, "test-*"))[0]
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Migrate the tablet from node1 to node2")
tablet_token = 0 # Doesn't matter since there is one tablet
replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
await manager.api.move_tablet(servers[0].ip_addr, "test", "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
logger.info("Stop node1 and copy the sstables from node2")
@@ -626,7 +626,7 @@ async def test_orphaned_sstables_on_startup(manager: ManagerClient):
# try starting the server again
logger.info("Start node1 with the orphaned sstables and expect it to fail")
# Error thrown is of format : "Unable to load SSTable {sstable_name} : Storage wasn't found for tablet {tablet_id} of table test.test"
# Error thrown is of format : "Unable to load SSTable {sstable_name} : Storage wasn't found for tablet {tablet_id} of table {ks}.test"
await manager.server_start(servers[0].server_id, expected_error="Storage wasn't found for tablet")
@pytest.mark.asyncio
@@ -649,25 +649,25 @@ async def test_remove_failure_with_no_normal_token_owners_in_dc(manager: Manager
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{ 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 }} AND tablets = {{ 'initial': 1 }}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
node_to_remove = servers['dc1'][0]
node_to_replace = servers['dc1'][1]
replaced_host_id = await manager.get_host_id(node_to_replace.server_id)
initiator_node = servers['dc2'][0]
node_to_remove = servers['dc1'][0]
node_to_replace = servers['dc1'][1]
replaced_host_id = await manager.get_host_id(node_to_replace.server_id)
initiator_node = servers['dc2'][0]
# Stop both token owners in dc1 to leave no token owners in the datacenter
await manager.server_stop_gracefully(node_to_remove.server_id)
await manager.server_stop_gracefully(node_to_replace.server_id)
# Stop both token owners in dc1 to leave no token owners in the datacenter
await manager.server_stop_gracefully(node_to_remove.server_id)
await manager.server_stop_gracefully(node_to_replace.server_id)
logger.info("Attempting removenode - expected to fail")
await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id, ignore_dead=[replaced_host_id],
expected_error="Removenode failed. See earlier errors (Rolled back: Failed to drain tablets: std::runtime_error (There are nodes with tablets to drain")
logger.info("Attempting removenode - expected to fail")
await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id, ignore_dead=[replaced_host_id],
expected_error="Removenode failed. See earlier errors (Rolled back: Failed to drain tablets: std::runtime_error (There are nodes with tablets to drain")
logger.info(f"Replacing {node_to_replace} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': f'rack1'})
logger.info(f"Replacing {node_to_replace} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
@@ -686,21 +686,21 @@ async def test_remove_failure_then_replace(manager: ManagerClient, with_zero_tok
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{ 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 }} AND tablets = {{ 'initial': 1 }}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
node_to_remove = servers['dc1'][0]
initiator_node = servers['dc2'][0]
node_to_remove = servers['dc1'][0]
initiator_node = servers['dc2'][0]
await manager.server_stop_gracefully(node_to_remove.server_id)
await manager.server_stop_gracefully(node_to_remove.server_id)
logger.info("Attempting removenode - expected to fail")
await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id,
expected_error="Removenode failed. See earlier errors (Rolled back: Failed to drain tablets: std::runtime_error (Unable to find new replica for tablet")
logger.info("Attempting removenode - expected to fail")
await manager.remove_node(initiator_node.server_id, server_id=node_to_remove.server_id,
expected_error="Removenode failed. See earlier errors (Rolled back: Failed to drain tablets: std::runtime_error (Unable to find new replica for tablet")
logger.info(f"Replacing {node_to_remove} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': f'rack1'})
logger.info(f"Replacing {node_to_remove} with a new node")
replace_cfg = ReplaceConfig(replaced_id=node_to_remove.server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
@pytest.mark.asyncio
@pytest.mark.parametrize("with_zero_token_node", [False, True])
@@ -722,38 +722,41 @@ async def test_replace_with_no_normal_token_owners_in_dc(manager: ManagerClient,
servers['dc3'] = [await manager.server_add(config={'join_ring': False}, property_file={'dc': 'dc3', 'rack': 'rack3'})]
cql = manager.get_cql()
await cql.run_async(f"CREATE KEYSPACE test WITH replication = {{ 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 }} AND tablets = {{ 'initial': 1 }}")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
async with new_test_keyspace(manager, "WITH replication = { 'class': 'NetworkTopologyStrategy', 'dc1': 2, 'dc2': 1 } AND tablets = { 'initial': 1 }") as ks:
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
stmt = cql.prepare("INSERT INTO test.test (pk, c) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ALL
keys = range(256)
await asyncio.gather(*[cql.run_async(stmt, [k, k]) for k in keys])
stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
stmt.consistency_level = ConsistencyLevel.ALL
keys = range(256)
await asyncio.gather(*[cql.run_async(stmt, [k, k]) for k in keys])
nodes_to_replace = servers['dc1'][0:2]
replaced_host_id = await manager.get_host_id(nodes_to_replace[1].server_id)
nodes_to_replace = servers['dc1'][0:2]
replaced_host_id = await manager.get_host_id(nodes_to_replace[1].server_id)
# Stop both token owners in dc1 to leave no token owners in the datacenter
for node in nodes_to_replace:
await manager.server_stop_gracefully(node.server_id)
# Stop both token owners in dc1 to leave no token owners in the datacenter
for node in nodes_to_replace:
await manager.server_stop_gracefully(node.server_id)
logger.info(f"Replacing {nodes_to_replace[0]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[0].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
ignore_dead_nodes=[replaced_host_id])
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': f'rack1'})
logger.info(f"Replacing {nodes_to_replace[0]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[0].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True,
ignore_dead_nodes=[replaced_host_id])
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
logger.info(f"Replacing {nodes_to_replace[1]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[1].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': f'rack1'})
logger.info(f"Replacing {nodes_to_replace[1]} with a new node")
replace_cfg = ReplaceConfig(replaced_id=nodes_to_replace[1].server_id, reuse_ip_addr = False, use_host_id=True, wait_replaced_dead=True)
await manager.server_add(replace_cfg=replace_cfg, property_file={'dc': 'dc1', 'rack': 'rack1'})
logger.info("Verifying data")
for node in servers['dc2']:
await manager.server_stop_gracefully(node.server_id)
query = SimpleStatement("SELECT * FROM test.test;", consistency_level=ConsistencyLevel.ONE)
rows = await cql.run_async(query)
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
logger.info("Verifying data")
for node in servers['dc2']:
await manager.server_stop_gracefully(node.server_id)
query = SimpleStatement(f"SELECT * FROM {ks}.test;", consistency_level=ConsistencyLevel.ONE)
rows = await cql.run_async(query)
assert len(rows) == len(keys)
for r in rows:
assert r.c == r.pk
# For dropping the keyspace
await asyncio.gather(*[manager.server_start(node.server_id) for node in servers['dc2']])
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
@@ -777,14 +780,14 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
# create a table so that it has at least 2 tablets (and storage groups) per shard
await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 4};")
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
await manager.api.disable_autocompaction(servers[0].ip_addr, 'test')
await manager.api.disable_autocompaction(servers[0].ip_addr, ks)
keys = range(2048)
await asyncio.gather(*[cql.run_async(f'INSERT INTO test.test (pk, c) VALUES ({k}, {k});') for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, 'test')
await asyncio.gather(*[cql.run_async(f'INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});') for k in keys])
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
await manager.api.enable_injection(servers[0].ip_addr, 'truncate_compaction_disabled_wait', one_shot=False)
await manager.api.enable_injection(servers[0].ip_addr, 'split_storage_groups_wait', one_shot=False)
@@ -796,7 +799,7 @@ async def test_drop_keyspace_while_split(manager: ManagerClient):
await s0_log.wait_for('split_storage_groups_wait: wait')
# start a DROP and wait for it to disable compaction
drop_ks_task = cql.run_async('DROP KEYSPACE test;')
drop_ks_task = cql.run_async(f'DROP KEYSPACE {ks};')
await s0_log.wait_for('truncate_compaction_disabled_wait: wait')
# release split