diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index 6d0ff6b42b..1c26cb5a72 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -174,6 +174,7 @@ public: const std::string& create_table_statement() const { return _create_table_statement; } virtual generator_fn make_generator(schema_ptr, const table_config&) = 0; + virtual bool enabled_by_default() const { return true; } }; // Adapts a function which accepts DataSet& as its argument to a dataset_acceptor @@ -788,8 +789,20 @@ static void assert_partition_start(flat_mutation_reader& rd) { assert(mfopt->is_partition_start()); } +// A dataset with one large partition with many clustered fragments. +// Partition key: pk int [0] +// Clusterint key: ck int [0 .. n_rows() - 1] +class clustered_ds { +public: + virtual int n_rows(const table_config&) = 0; + + virtual clustering_key make_ck(const schema& s, int ck) { + return clustering_key::from_single_value(s, serialized(ck)); + } +}; + // cf should belong to ks.test -static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_read = 1, int n_skip = 0) { +static test_result scan_rows_with_stride(column_family& cf, clustered_ds& ds, int n_rows, int n_read = 1, int n_skip = 0) { tests::reader_concurrency_semaphore_wrapper semaphore; auto rd = cf.make_reader(cf.schema(), semaphore.make_permit(), @@ -808,8 +821,8 @@ static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_re while (ck < n_rows) { if (n_skip) { rd.fast_forward_to(position_range( - position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck)), - position_in_partition(position_in_partition::clustering_row_tag_t(), clustering_key::from_singular(*cf.schema(), ck + n_read)) + position_in_partition(position_in_partition::clustering_row_tag_t(), ds.make_ck(*cf.schema(), ck)), + position_in_partition(position_in_partition::clustering_row_tag_t(), ds.make_ck(*cf.schema(), ck + n_read)) ), db::no_timeout).get(); } fragments += consume_all(rd); @@ -860,7 +873,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r return {before, fragments}; } -static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1) { +static test_result slice_rows(column_family& cf, clustered_ds& ds, int offset = 0, int n_read = 1) { tests::reader_concurrency_semaphore_wrapper semaphore; auto rd = cf.make_reader(cf.schema(), semaphore.make_permit(), @@ -875,8 +888,8 @@ static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1) assert_partition_start(rd); rd.fast_forward_to(position_range( - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get(); + position_in_partition::for_key(ds.make_ck(*cf.schema(), offset)), + position_in_partition::for_key(ds.make_ck(*cf.schema(), offset + n_read))), db::no_timeout).get(); uint64_t fragments = consume_all_with_next_partition(rd); return {before, fragments}; @@ -887,12 +900,12 @@ static test_result test_reading_all(flat_mutation_reader& rd) { return {before, consume_all(rd)}; } -static test_result slice_rows_by_ck(column_family& cf, int offset = 0, int n_read = 1) { +static test_result slice_rows_by_ck(column_family& cf, clustered_ds& ds, int offset = 0, int n_read = 1) { tests::reader_concurrency_semaphore_wrapper semaphore; auto slice = partition_slice_builder(*cf.schema()) .with_range(query::clustering_range::make( - clustering_key::from_singular(*cf.schema(), offset), - clustering_key::from_singular(*cf.schema(), offset + n_read - 1))) + ds.make_ck(*cf.schema(), offset), + ds.make_ck(*cf.schema(), offset + n_read - 1))) .build(); auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); auto rd = cf.make_reader(cf.schema(), semaphore.make_permit(), pr, slice); @@ -901,11 +914,11 @@ static test_result slice_rows_by_ck(column_family& cf, int offset = 0, int n_rea return test_reading_all(rd); } -static test_result select_spread_rows(column_family& cf, int stride = 0, int n_read = 1) { +static test_result select_spread_rows(column_family& cf, clustered_ds& ds, int stride = 0, int n_read = 1) { tests::reader_concurrency_semaphore_wrapper semaphore; auto sb = partition_slice_builder(*cf.schema()); for (int i = 0; i < n_read; ++i) { - sb.with_range(query::clustering_range::make_singular(clustering_key::from_singular(*cf.schema(), i * stride))); + sb.with_range(query::clustering_range::make_singular(ds.make_ck(*cf.schema(), i * stride))); } auto slice = sb.build(); @@ -918,11 +931,11 @@ static test_result select_spread_rows(column_family& cf, int stride = 0, int n_r return test_reading_all(rd); } -static test_result test_slicing_using_restrictions(column_family& cf, int_range row_range) { +static test_result test_slicing_using_restrictions(column_family& cf, clustered_ds& ds, int_range row_range) { tests::reader_concurrency_semaphore_wrapper semaphore; auto slice = partition_slice_builder(*cf.schema()) .with_range(std::move(row_range).transform([&] (int i) -> clustering_key { - return clustering_key::from_singular(*cf.schema(), i); + return ds.make_ck(*cf.schema(), i); })) .build(); auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); @@ -933,7 +946,7 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range return test_reading_all(rd); } -static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) { +static test_result slice_rows_single_key(column_family& cf, clustered_ds& ds, int offset = 0, int n_read = 1) { tests::reader_concurrency_semaphore_wrapper semaphore; auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); auto rd = cf.make_reader(cf.schema(), semaphore.make_permit(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); @@ -942,8 +955,8 @@ static test_result slice_rows_single_key(column_family& cf, int offset = 0, int metrics_snapshot before; assert_partition_start(rd); rd.fast_forward_to(position_range( - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), offset + n_read))), db::no_timeout).get(); + position_in_partition::for_key(ds.make_ck(*cf.schema(), offset)), + position_in_partition::for_key(ds.make_ck(*cf.schema(), offset + n_read))), db::no_timeout).get(); uint64_t fragments = consume_all_with_next_partition(rd); return {before, fragments}; @@ -976,18 +989,6 @@ bytes make_blob(size_t blob_size) { return big_blob; } -// A dataset with one large partition with many clustered fragments. -// Partition key: pk int [0] -// Clusterint key: ck int [0 .. n_rows() - 1] -class clustered_ds { -public: - virtual int n_rows(const table_config&) = 0; - - clustering_key make_ck(const schema& s, int ck) { - return clustering_key::from_single_value(s, serialized(ck)); - } -}; - // A dataset with many partitions. // Partition key: pk int [0 .. n_partitions() - 1] class multipart_ds { @@ -1005,6 +1006,24 @@ public: } }; +class scylla_bench_ds : public clustered_ds, public dataset { +public: + scylla_bench_ds(std::string name, std::string desc) : dataset(name, desc, + "create table {} (pk bigint, ck bigint, v blob, primary key (pk, ck))") {} + + int n_rows(const table_config& cfg) override { + return cfg.n_rows; + } + + clustering_key make_ck(const schema& s, int ck) override { + return clustering_key::from_single_value(s, serialized(ck)); + } + + bool enabled_by_default() const override { + return false; + } +}; + class large_part_ds1 : public simple_large_part_ds { public: large_part_ds1() : simple_large_part_ds("large-part-ds1", "One large partition with many small rows") {} @@ -1027,6 +1046,56 @@ public: } }; +class scylla_bench_large_part_ds1 : public scylla_bench_ds { +public: + scylla_bench_large_part_ds1() : scylla_bench_ds("sb-large-part-ds1", "One large partition with many small rows, scylla-bench schema") {} + + generator_fn make_generator(schema_ptr s, const table_config& cfg) override { + auto value = serialized(make_blob(cfg.value_size)); + auto& value_cdef = *s->get_column_definition("v"); + auto pk = partition_key::from_single_value(*s, serialized(0)); + return [this, s, ck = 0, n_ck = n_rows(cfg), &value_cdef, value, pk] () mutable -> std::optional { + if (ck == n_ck) { + return std::nullopt; + } + auto ts = api::new_timestamp(); + mutation m(s, pk); + auto& row = m.partition().clustered_row(*s, make_ck(*s, ck)); + row.cells().apply(value_cdef, atomic_cell::make_live(*value_cdef.type, ts, value)); + ++ck; + return m; + }; + } +}; + +class scylla_bench_small_part_ds1 : public multipart_ds, public scylla_bench_ds { +public: + scylla_bench_small_part_ds1() + : scylla_bench_ds("sb-small-part", "Many small partitions, scylla-bench schema") + { } + + generator_fn make_generator(schema_ptr s, const table_config& cfg) override { + auto value = serialized(make_blob(cfg.value_size)); + auto& value_cdef = *s->get_column_definition("v"); + auto ck = make_ck(*s, 0); + return [s, ck = std::move(ck), pk = 0, n_pk = n_partitions(cfg), &value_cdef, value] () mutable -> std::optional { + if (pk == n_pk) { + return std::nullopt; + } + auto ts = api::new_timestamp(); + mutation m(s, partition_key::from_single_value(*s, serialized(pk))); + auto& row = m.partition().clustered_row(*s, ck); + row.cells().apply(value_cdef, atomic_cell::make_live(*value_cdef.type, ts, value)); + ++pk; + return m; + }; + } + + int n_partitions(const table_config& cfg) override { + return cfg.n_rows; + } +}; + class small_part_ds1 : public multipart_ds, public dataset { public: small_part_ds1() : dataset("small-part", "Many small partitions with no clustering key", @@ -1057,7 +1126,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered tests::reader_concurrency_semaphore_wrapper semaphore; auto first_key = ds.n_rows(cfg) / 2; auto slice = partition_slice_builder(*cf.schema()) - .with_range(query::clustering_range::make_starting_with(clustering_key::from_singular(*cf.schema(), first_key))) + .with_range(query::clustering_range::make_starting_with(ds.make_ck(*cf.schema(), first_key))) .build(); auto pr = single_partition ? dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)) : query::full_partition_range; @@ -1077,14 +1146,14 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered fragments += consume_all(rd); rd.fast_forward_to(position_range( - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 1)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), 2))), db::no_timeout).get(); + position_in_partition::for_key(ds.make_ck(*cf.schema(), 1)), + position_in_partition::for_key(ds.make_ck(*cf.schema(), 2))), db::no_timeout).get(); fragments += consume_all(rd); rd.fast_forward_to(position_range( - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key - 2)), - position_in_partition::for_key(clustering_key::from_singular(*cf.schema(), first_key + 2))), db::no_timeout).get(); + position_in_partition::for_key(ds.make_ck(*cf.schema(), first_key - 2)), + position_in_partition::for_key(ds.make_ck(*cf.schema(), first_key + 2))), db::no_timeout).get(); fragments += consume_all_with_next_partition(rd); return {before, fragments}; @@ -1285,7 +1354,7 @@ void test_large_partition_single_key_slice(column_family& cf, clustered_ds& ds) struct first { }; auto test = [&](int_range range) { - auto r = test_slicing_using_restrictions(cf, range); + auto r = test_slicing_using_restrictions(cf, ds, range); r.set_params(to_sstrings(new_test_case ? "->": 0, format("{}", range))); check_fragment_count(r, cardinality(intersection(range, live_range))); return r; @@ -1407,7 +1476,7 @@ void test_large_partition_skips(column_family& cf, clustered_ds& ds) { output_mgr->set_test_param_names({{"read", "{:<7}"}, {"skip", "{:<7}"}}, test_result::stats_names()); auto do_test = [&] (int n_read, int n_skip) { - auto r = scan_rows_with_stride(cf, n_rows, n_read, n_skip); + auto r = scan_rows_with_stride(cf, ds, n_rows, n_read, n_skip); r.set_params(to_sstrings(n_read, n_skip)); check_fragment_count(r, count_for_skip_pattern(n_rows, n_read, n_skip)); return r; @@ -1459,7 +1528,7 @@ void test_large_partition_slicing(column_family& cf, clustered_ds& ds) { output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&] (int offset, int read) { run_test_case([&] { - auto r = slice_rows(cf, offset, read); + auto r = slice_rows(cf, ds, offset, read); r.set_params(to_sstrings(offset, read)); check_fragment_count(r, std::min(n_rows - offset, read)); return r; @@ -1483,7 +1552,7 @@ void test_large_partition_slicing_clustering_keys(column_family& cf, clustered_d output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&] (int offset, int read) { run_test_case([&] { - auto r = slice_rows_by_ck(cf, offset, read); + auto r = slice_rows_by_ck(cf, ds, offset, read); r.set_params(to_sstrings(offset, read)); check_fragment_count(r, std::min(n_rows - offset, read)); return r; @@ -1507,7 +1576,7 @@ void test_large_partition_slicing_single_partition_reader(column_family& cf, clu output_mgr->set_test_param_names({{"offset", "{:<7}"}, {"read", "{:<7}"}}, test_result::stats_names()); auto test = [&](int offset, int read) { run_test_case([&] { - auto r = slice_rows_single_key(cf, offset, read); + auto r = slice_rows_single_key(cf, ds, offset, read); r.set_params(to_sstrings(offset, read)); check_fragment_count(r, std::min(n_rows - offset, read)); return r; @@ -1531,7 +1600,7 @@ void test_large_partition_select_few_rows(column_family& cf, clustered_ds& ds) { output_mgr->set_test_param_names({{"stride", "{:<7}"}, {"rows", "{:<7}"}}, test_result::stats_names()); auto test = [&](int stride, int read) { run_test_case([&] { - auto r = select_spread_rows(cf, stride, read); + auto r = select_spread_rows(cf, ds, stride, read); r.set_params(to_sstrings(stride, read)); check_fragment_count(r, read); return r; @@ -1657,6 +1726,8 @@ auto make_datasets() { }; add(std::make_unique()); add(std::make_unique()); + add(std::make_unique()); + add(std::make_unique()); return dsets; } @@ -1818,7 +1889,11 @@ int main(int argc, char** argv) { ), "Test groups to run") ("datasets", bpo::value>()->default_value( - boost::copy_range>(datasets | boost::adaptors::map_keys)), + boost::copy_range>(datasets + | boost::adaptors::filtered([] (auto&& e) { + return e.second->enabled_by_default(); + }) + | boost::adaptors::map_keys)), "Use only the following datasets") ("list-tests", "Show available test groups") ("list-datasets", "Show available datasets")