test: make create_dataset async and refactor so it's configurable
with num_keys and min_tablet_count split from bhalevy/load-balance-primary-replica Signed-off-by: Robert Bindar <robert.bindar@scylladb.com>
This commit is contained in:
@@ -554,26 +554,24 @@ async def create_cluster(topology, rf_rack_valid_keyspaces, manager, logger, obj
|
||||
|
||||
return servers,host_ids
|
||||
|
||||
def create_dataset(manager, ks, cf, topology, logger, dcs_num=None):
|
||||
async def create_dataset(manager, ks, cf, topology, logger, num_keys=256, min_tablet_count=None, schema=None):
|
||||
cql = manager.get_cql()
|
||||
logger.info(f'Create keyspace, rf={topology.rf}')
|
||||
keys = range(256)
|
||||
logger.info(f'Create keyspace, {topology=}')
|
||||
keys = range(num_keys)
|
||||
replication_opts = {'class': 'NetworkTopologyStrategy'}
|
||||
if dcs_num is not None:
|
||||
for dc in range(dcs_num):
|
||||
replication_opts[f'dc{dc}'] = int(topology.rf / dcs_num)
|
||||
else:
|
||||
replication_opts['replication_factor'] = f'{topology.rf}'
|
||||
replication_opts['replication_factor'] = f'{topology.rf}'
|
||||
replication_opts = format_tuples(replication_opts)
|
||||
|
||||
print(replication_opts)
|
||||
|
||||
cql.execute((f"CREATE KEYSPACE {ks} WITH REPLICATION = {replication_opts};"))
|
||||
|
||||
schema = f"CREATE TABLE {ks}.{cf} ( pk int primary key, value text );"
|
||||
schema = f"CREATE TABLE {ks}.{cf} ( pk text primary key, value int )"
|
||||
if min_tablet_count:
|
||||
schema += f"WITH tablets = {{'min_tablet_count': {min_tablet_count}}}"
|
||||
schema += ';'
|
||||
cql.execute(schema)
|
||||
for k in keys:
|
||||
cql.execute(f"INSERT INTO {ks}.{cf} ( pk, value ) VALUES ({k}, '{k}');")
|
||||
await asyncio.gather(*(cql.run_async(f"INSERT INTO {ks}.{cf} ( pk, value ) VALUES ('{k}', {k});") for k in keys))
|
||||
|
||||
return schema, keys, replication_opts
|
||||
|
||||
@@ -702,7 +700,7 @@ async def test_restore_with_streaming_scopes(manager: ManagerClient, object_stor
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
|
||||
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger)
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger, num_keys=10000, min_tablet_count=512)
|
||||
|
||||
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
@@ -854,7 +852,7 @@ async def test_restore_primary_replica_same_rack_scope_rack(manager: ManagerClie
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger)
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
@@ -905,7 +903,7 @@ async def test_restore_primary_replica_different_rack_scope_dc(manager: ManagerC
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger)
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
@@ -948,7 +946,7 @@ async def test_restore_primary_replica_same_dc_scope_dc(manager: ManagerClient,
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger)
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
@@ -989,7 +987,7 @@ async def test_restore_primary_replica_different_dc_scope_all(manager: ManagerCl
|
||||
The test also checks that the logs of each restoring node shows streaming to two nodes because cross-dc streaming is allowed
|
||||
and eventually one node, depending on tablet_id of mutations, will end up choosing either of the two nodes as primary replica.'''
|
||||
|
||||
topology = topo(rf = 2, nodes = 2, racks = 2, dcs = 2)
|
||||
topology = topo(rf = 1, nodes = 2, racks = 1, dcs = 2)
|
||||
scope = "all"
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
@@ -999,7 +997,7 @@ async def test_restore_primary_replica_different_dc_scope_all(manager: ManagerCl
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
cql = manager.get_cql()
|
||||
|
||||
schema, keys, replication_opts = create_dataset(manager, ks, cf, topology, logger, dcs_num=2)
|
||||
schema, keys, replication_opts = await create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
snap_name, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
prefix = f'{cf}/{snap_name}'
|
||||
|
||||
@@ -127,7 +127,7 @@ async def test_refresh_deletes_uploaded_sstables(manager: ManagerClient):
|
||||
|
||||
ks = 'ks'
|
||||
cf = 'cf'
|
||||
_, keys, _ = create_dataset(manager, ks, cf, topology, logger)
|
||||
_, keys, _ = await create_dataset(manager, ks, cf, topology, logger)
|
||||
|
||||
_, sstables = await take_snapshot(ks, servers, manager, logger)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user