pgo: add a repair workload

This workload is added to teach PGO about repair.
Tests are inconclusive about its alignment with existing workloads,
because repair doesn't seem utilize 100% of the reactor.
This commit is contained in:
Michał Chojnowski
2023-02-15 03:37:54 +01:00
committed by Kefu Chai
parent 1c9ce0a9ee
commit 95c8d88b96
2 changed files with 115 additions and 0 deletions

92
pgo/conf/repair.yaml Normal file
View File

@@ -0,0 +1,92 @@
#
# This is an example YAML profile for cassandra-stress
#
# insert data
# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1)
#
# read, using query simple1:
# cassandra-stress profile=/home/jake/stress1.yaml ops(simple1=1)
#
# mixed workload (90/10)
# cassandra-stress user profile=/home/jake/stress1.yaml ops(insert=1,simple1=9)
#
# Keyspace info
#
keyspace: ks
#
# The CQL for creating a keyspace (optional if it already exists)
#
keyspace_definition: |
CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};
#
# Table info
#
table: standard1
#
# The CQL for creating a table you wish to stress (optional if it already exists)
#
table_definition: |
CREATE TABLE standard1 (
name text,
choice boolean,
date timestamp,
address inet,
dbl double,
lval bigint,
ival int,
uid timeuuid,
value blob,
PRIMARY KEY((name,choice), date, address, dbl, lval, ival, uid)
);
#
# Optional meta information on the generated columns in the above table
# The min and max only apply to text and blob types
# The distribution field represents the total unique population
# distribution of that column across rows. Supported types are
#
# EXP(min..max) An exponential distribution over the range [min..max]
# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
# GAUSSIAN(min..max,mean,stdev) A gaussian/normal distribution, with explicitly defined mean and stdev
# UNIFORM(min..max) A uniform distribution over the range [min, max]
# FIXED(val) A fixed distribution, always returning the same value
# Aliases: extr, gauss, normal, norm, weibull
#
# If preceded by ~, the distribution is inverted
#
# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
#
columnspec:
- name: name
size: uniform(1..10)
population: uniform(1..4M) # the range of unique values to select for the field (default is 100Billion)
- name: date
cluster: uniform(20..40)
- name: lval
population: gaussian(1..1000)
cluster: uniform(1..4)
insert:
partitions: uniform(1..50) # number of unique partitions to update in a single operation
# if batchcount > 1, multiple batches will be used but all partitions will
# occur in all batches (unless they finish early); only the row counts will vary
batchtype: UNLOGGED # type of batch to use
select: uniform(1..10)/10 # uniform chance any single generated CQL row will be visited in a partition;
# generated for each partition independently, each time we visit it
#
# A list of queries you wish to run against the schema
#
queries:
simple1:
cql: select * from standard1 where name = ? and choice = ? LIMIT 100
fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
range1:
cql: select * from standard1 where name = ? and choice = ? and date >= ? LIMIT 100
fields: multirow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)

View File

@@ -697,6 +697,29 @@ async def train_counters(executable: PathLike, workdir: PathLike) -> None:
#trainers["counters"] = ("counters_dataset", train_counters)
populators["counters_dataset"] = populate_counters
# REPAIR ==================================================
async def populate_repair(executable: PathLike, workdir: PathLike) -> None:
async with with_cs_populate(executable=executable, workdir=workdir) as server:
await cs(cmd=["user", "profile=./conf/repair.yaml", "ops(insert=1)"], n=5000000, cl="local_quorum", node=server)
await cs(cmd=["write"], n=1000000, cl="local_quorum", schema=RF3_SCHEMA, node=server)
async def train_repair(executable: PathLike, workdir: PathLike) -> None:
# The idea is to remove some user data sstables from the node (in an offline cluster),
# start the cluster, and run repair on the affected node.
# I don't know if it's a good PGO workload.
# Does this cover repair codepaths reasonably?
addr = cluster_metadata(workdir)["subnet"] + ".2"
await bash(fr"rm -rf {workdir}/{addr}/data/ks/*")
async with with_cluster(executable=executable, workdir=workdir) as (addrs, procs):
await asyncio.sleep(5) # FIXME: artificial gossip sleep, get rid of it.
repair_id = (await query(["curl", "--silent", "-X", "POST", fr"http://{addr}:10000/storage_service/repair_async/ks"])).decode()
await query(["curl", "--silent", fr"http://{addr}:10000/storage_service/repair_status/?id={repair_id}"])
await merge_profraw(workdir)
trainers["repair"] = ("repair_dataset", train_repair)
populators["repair_dataset"] = populate_repair
################################################################################
# Training procedures