Merge "Add scylla-bench datasets to perf_fast_forward" from Tomasz

"
After this series one can use perf_fast_forward to generate the data set.
It takes a lot less time this way than to use scylla-bench.
"

* 'perf-fast-forward-scylla-bench-dataset' of github.com:tgrabiec/scylla:
  tests: perf_fast_forward: Use data_source::make_ck()
  tests: perf_fast_forward: Move declaration of clustered_ds up
  tests: perf_fast_forward: Make scylla_bench_small_part_ds1 not included by default
  tests: perf_fast_forward: Add data sets which conform to scylla-bench schema
This commit is contained in:
Avi Kivity
2021-07-08 17:33:22 +03:00

View File

@@ -174,6 +174,7 @@ public:
const std::string& create_table_statement() const { return _create_table_statement; }
virtual generator_fn make_generator(schema_ptr, const table_config&) = 0;
virtual bool enabled_by_default() const { return true; }
};
// Adapts a function which accepts DataSet& as its argument to a dataset_acceptor
@@ -788,8 +789,20 @@ static void assert_partition_start(flat_mutation_reader& rd) {
assert(mfopt->is_partition_start());
}
// A dataset with one large partition with many clustered fragments.
// Partition key: pk int [0]
// Clusterint key: ck int [0 .. n_rows() - 1]
class clustered_ds {
public:
virtual int n_rows(const table_config&) = 0;
virtual clustering_key make_ck(const schema& s, int ck) {
return clustering_key::from_single_value(s, serialized(ck));
}
};
// cf should belong to ks.test
static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_read = 1, int n_skip = 0) {
static test_result scan_rows_with_stride(column_family& cf, clustered_ds& ds, int n_rows, int n_read = 1, int n_skip = 0) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto rd = cf.make_reader(cf.schema(),
semaphore.make_permit(),
@@ -808,8 +821,8 @@ static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_re
while (ck < n_rows) {
if (n_skip) {
rd.fast_forward_to(position_range(
position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck)),
position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck + n_read))
position_in_partition(position_in_partition::clustering_row_tag_t(), ds.make_ck(*cf.schema(), ck)),
position_in_partition(position_in_partition::clustering_row_tag_t(), ds.make_ck(*cf.schema(), ck + n_read))
), db::no_timeout).get();
}
fragments += consume_all(rd);
@@ -860,7 +873,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r
return {before, fragments};
}
static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1) {
static test_result slice_rows(column_family& cf, clustered_ds& ds, int offset = 0, int n_read = 1) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto rd = cf.make_reader(cf.schema(),
semaphore.make_permit(),
@@ -875,8 +888,8 @@ static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1)
assert_partition_start(rd);
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get();
position_in_partition::for_key(ds.make_ck(*cf.schema(), offset)),
position_in_partition::for_key(ds.make_ck(*cf.schema(), offset + n_read))), db::no_timeout).get();
uint64_t fragments = consume_all_with_next_partition(rd);
return {before, fragments};
@@ -887,12 +900,12 @@ static test_result test_reading_all(flat_mutation_reader& rd) {
return {before, consume_all(rd)};
}
static test_result slice_rows_by_ck(column_family& cf, int offset = 0, int n_read = 1) {
static test_result slice_rows_by_ck(column_family& cf, clustered_ds& ds, int offset = 0, int n_read = 1) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto slice = partition_slice_builder(*cf.schema())
.with_range(query::clustering_range::make(
clustering_key::from_singular(*cf.schema(), offset),
clustering_key::from_singular(*cf.schema(), offset + n_read - 1)))
ds.make_ck(*cf.schema(), offset),
ds.make_ck(*cf.schema(), offset + n_read - 1)))
.build();
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
auto rd = cf.make_reader(cf.schema(), semaphore.make_permit(), pr, slice);
@@ -901,11 +914,11 @@ static test_result slice_rows_by_ck(column_family& cf, int offset = 0, int n_rea
return test_reading_all(rd);
}
static test_result select_spread_rows(column_family& cf, int stride = 0, int n_read = 1) {
static test_result select_spread_rows(column_family& cf, clustered_ds& ds, int stride = 0, int n_read = 1) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto sb = partition_slice_builder(*cf.schema());
for (int i = 0; i < n_read; ++i) {
sb.with_range(query::clustering_range::make_singular(clustering_key::from_singular(*cf.schema(), i * stride)));
sb.with_range(query::clustering_range::make_singular(ds.make_ck(*cf.schema(), i * stride)));
}
auto slice = sb.build();
@@ -918,11 +931,11 @@ static test_result select_spread_rows(column_family& cf, int stride = 0, int n_r
return test_reading_all(rd);
}
static test_result test_slicing_using_restrictions(column_family& cf, int_range row_range) {
static test_result test_slicing_using_restrictions(column_family& cf, clustered_ds& ds, int_range row_range) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto slice = partition_slice_builder(*cf.schema())
.with_range(std::move(row_range).transform([&] (int i) -> clustering_key {
return clustering_key::from_singular(*cf.schema(), i);
return ds.make_ck(*cf.schema(), i);
}))
.build();
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
@@ -933,7 +946,7 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range
return test_reading_all(rd);
}
static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) {
static test_result slice_rows_single_key(column_family& cf, clustered_ds& ds, int offset = 0, int n_read = 1) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
auto rd = cf.make_reader(cf.schema(), semaphore.make_permit(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
@@ -942,8 +955,8 @@ static test_result slice_rows_single_key(column_family& cf, int offset = 0, int
metrics_snapshot before;
assert_partition_start(rd);
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get();
position_in_partition::for_key(ds.make_ck(*cf.schema(), offset)),
position_in_partition::for_key(ds.make_ck(*cf.schema(), offset + n_read))), db::no_timeout).get();
uint64_t fragments = consume_all_with_next_partition(rd);
return {before, fragments};
@@ -976,18 +989,6 @@ bytes make_blob(size_t blob_size) {
return big_blob;
}
// A dataset with one large partition with many clustered fragments.
// Partition key: pk int [0]
// Clusterint key: ck int [0 .. n_rows() - 1]
class clustered_ds {
public:
virtual int n_rows(const table_config&) = 0;
clustering_key make_ck(const schema& s, int ck) {
return clustering_key::from_single_value(s, serialized(ck));
}
};
// A dataset with many partitions.
// Partition key: pk int [0 .. n_partitions() - 1]
class multipart_ds {
@@ -1005,6 +1006,24 @@ public:
}
};
class scylla_bench_ds : public clustered_ds, public dataset {
public:
scylla_bench_ds(std::string name, std::string desc) : dataset(name, desc,
"create table {} (pk bigint, ck bigint, v blob, primary key (pk, ck))") {}
int n_rows(const table_config& cfg) override {
return cfg.n_rows;
}
clustering_key make_ck(const schema& s, int ck) override {
return clustering_key::from_single_value(s, serialized<int64_t>(ck));
}
bool enabled_by_default() const override {
return false;
}
};
class large_part_ds1 : public simple_large_part_ds {
public:
large_part_ds1() : simple_large_part_ds("large-part-ds1", "One large partition with many small rows") {}
@@ -1027,6 +1046,56 @@ public:
}
};
class scylla_bench_large_part_ds1 : public scylla_bench_ds {
public:
scylla_bench_large_part_ds1() : scylla_bench_ds("sb-large-part-ds1", "One large partition with many small rows, scylla-bench schema") {}
generator_fn make_generator(schema_ptr s, const table_config& cfg) override {
auto value = serialized(make_blob(cfg.value_size));
auto& value_cdef = *s->get_column_definition("v");
auto pk = partition_key::from_single_value(*s, serialized<int64_t>(0));
return [this, s, ck = 0, n_ck = n_rows(cfg), &value_cdef, value, pk] () mutable -> std::optional<mutation> {
if (ck == n_ck) {
return std::nullopt;
}
auto ts = api::new_timestamp();
mutation m(s, pk);
auto& row = m.partition().clustered_row(*s, make_ck(*s, ck));
row.cells().apply(value_cdef, atomic_cell::make_live(*value_cdef.type, ts, value));
++ck;
return m;
};
}
};
class scylla_bench_small_part_ds1 : public multipart_ds, public scylla_bench_ds {
public:
scylla_bench_small_part_ds1()
: scylla_bench_ds("sb-small-part", "Many small partitions, scylla-bench schema")
{ }
generator_fn make_generator(schema_ptr s, const table_config& cfg) override {
auto value = serialized(make_blob(cfg.value_size));
auto& value_cdef = *s->get_column_definition("v");
auto ck = make_ck(*s, 0);
return [s, ck = std::move(ck), pk = 0, n_pk = n_partitions(cfg), &value_cdef, value] () mutable -> std::optional<mutation> {
if (pk == n_pk) {
return std::nullopt;
}
auto ts = api::new_timestamp();
mutation m(s, partition_key::from_single_value(*s, serialized<int64_t>(pk)));
auto& row = m.partition().clustered_row(*s, ck);
row.cells().apply(value_cdef, atomic_cell::make_live(*value_cdef.type, ts, value));
++pk;
return m;
};
}
int n_partitions(const table_config& cfg) override {
return cfg.n_rows;
}
};
class small_part_ds1 : public multipart_ds, public dataset {
public:
small_part_ds1() : dataset("small-part", "Many small partitions with no clustering key",
@@ -1057,7 +1126,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered
tests::reader_concurrency_semaphore_wrapper semaphore;
auto first_key = ds.n_rows(cfg) / 2;
auto slice = partition_slice_builder(*cf.schema())
.with_range(query::clustering_range::make_starting_with(clustering_key::from_singular(*cf.schema(), first_key)))
.with_range(query::clustering_range::make_starting_with(ds.make_ck(*cf.schema(), first_key)))
.build();
auto pr = single_partition ? dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)) : query::full_partition_range;
@@ -1077,14 +1146,14 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered
fragments += consume_all(rd);
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 1)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 2))), db::no_timeout).get();
position_in_partition::for_key(ds.make_ck(*cf.schema(), 1)),
position_in_partition::for_key(ds.make_ck(*cf.schema(), 2))), db::no_timeout).get();
fragments += consume_all(rd);
rd.fast_forward_to(position_range(
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key - 2)),
position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key + 2))), db::no_timeout).get();
position_in_partition::for_key(ds.make_ck(*cf.schema(), first_key - 2)),
position_in_partition::for_key(ds.make_ck(*cf.schema(), first_key + 2))), db::no_timeout).get();
fragments += consume_all_with_next_partition(rd);
return {before, fragments};
@@ -1285,7 +1354,7 @@ void test_large_partition_single_key_slice(column_family& cf, clustered_ds& ds)
struct first {
};
auto test = [&](int_range range) {
auto r = test_slicing_using_restrictions(cf, range);
auto r = test_slicing_using_restrictions(cf, ds, range);
r.set_params(to_sstrings(new_test_case ? "->": 0, format("{}", range)));
check_fragment_count(r, cardinality(intersection(range, live_range)));
return r;
@@ -1407,7 +1476,7 @@ void test_large_partition_skips(column_family& cf, clustered_ds& ds) {
output_mgr->set_test_param_names({{"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names());
auto do_test = [&] (int n_read, int n_skip) {
auto r = scan_rows_with_stride(cf, n_rows, n_read, n_skip);
auto r = scan_rows_with_stride(cf, ds, n_rows, n_read, n_skip);
r.set_params(to_sstrings(n_read, n_skip));
check_fragment_count(r, count_for_skip_pattern(n_rows, n_read, n_skip));
return r;
@@ -1459,7 +1528,7 @@ void test_large_partition_slicing(column_family& cf, clustered_ds& ds) {
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&] (int offset, int read) {
run_test_case([&] {
auto r = slice_rows(cf, offset, read);
auto r = slice_rows(cf, ds, offset, read);
r.set_params(to_sstrings(offset, read));
check_fragment_count(r, std::min(n_rows - offset, read));
return r;
@@ -1483,7 +1552,7 @@ void test_large_partition_slicing_clustering_keys(column_family& cf, clustered_d
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&] (int offset, int read) {
run_test_case([&] {
auto r = slice_rows_by_ck(cf, offset, read);
auto r = slice_rows_by_ck(cf, ds, offset, read);
r.set_params(to_sstrings(offset, read));
check_fragment_count(r, std::min(n_rows - offset, read));
return r;
@@ -1507,7 +1576,7 @@ void test_large_partition_slicing_single_partition_reader(column_family& cf, clu
output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names());
auto test = [&](int offset, int read) {
run_test_case([&] {
auto r = slice_rows_single_key(cf, offset, read);
auto r = slice_rows_single_key(cf, ds, offset, read);
r.set_params(to_sstrings(offset, read));
check_fragment_count(r, std::min(n_rows - offset, read));
return r;
@@ -1531,7 +1600,7 @@ void test_large_partition_select_few_rows(column_family& cf, clustered_ds& ds) {
output_mgr->set_test_param_names({{"stride", "{:<7}"}, {"rows", "{:<7}"}}, test_result::stats_names());
auto test = [&](int stride, int read) {
run_test_case([&] {
auto r = select_spread_rows(cf, stride, read);
auto r = select_spread_rows(cf, ds, stride, read);
r.set_params(to_sstrings(stride, read));
check_fragment_count(r, read);
return r;
@@ -1657,6 +1726,8 @@ auto make_datasets() {
};
add(std::make_unique<small_part_ds1>());
add(std::make_unique<large_part_ds1>());
add(std::make_unique<scylla_bench_large_part_ds1>());
add(std::make_unique<scylla_bench_small_part_ds1>());
return dsets;
}
@@ -1818,7 +1889,11 @@ int main(int argc, char** argv) {
),
"Test groups to run")
("datasets", bpo::value<std::vector<std::string>>()->default_value(
boost::copy_range<std::vector<std::string>>(datasets | boost::adaptors::map_keys)),
boost::copy_range<std::vector<std::string>>(datasets
| boost::adaptors::filtered([] (auto&& e) {
return e.second->enabled_by_default();
})
| boost::adaptors::map_keys)),
"Use only the following datasets")
("list-tests", "Show available test groups")
("list-datasets", "Show available datasets")