From cc5137ffe34ac4167fbe293b500fde315d69b170 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 14 Apr 2020 17:33:32 +0300 Subject: [PATCH] table: require a valid permit to be passed to most read methods Now that the most prevalent users (range scan and single partition reads) all pass valid permits we require all users to do so and propagate the permit down towards `make_sstable_reader()`. The plan is to use this permit for restricting the sstable readers, instead of the semaphore the table is configured with. The various `make_streaming_*reader()` overloads keep using the internal semaphores as but they also create the permit before the read starts and pass it to `make_sstable_reader()`. --- database.hh | 15 ++++--- db/view/build_progress_virtual_reader.hh | 5 ++- index/built_indexes_virtual_reader.hh | 5 ++- table.cc | 54 +++++++++++++----------- test/boost/database_test.cc | 3 +- test/boost/mutation_test.cc | 11 ++--- test/boost/sstable_datafile_test.cc | 4 +- test/lib/cql_test_env.cc | 3 +- test/perf/perf_fast_forward.cc | 15 ++++--- 9 files changed, 69 insertions(+), 46 deletions(-) diff --git a/database.hh b/database.hh index c1f7a4f1e7..91348b5dd4 100644 --- a/database.hh +++ b/database.hh @@ -642,6 +642,7 @@ private: // The 'range' parameter must be live as long as the reader is used. // Mutations returned by the reader will all have given schema. flat_mutation_reader make_sstable_reader(schema_ptr schema, + reader_permit permit, lw_shared_ptr sstables, const dht::partition_range& range, const query::partition_slice& slice, @@ -708,6 +709,7 @@ public: // If I/O needs to be issued to read anything in the specified range, the operations // will be scheduled under the priority class given by pc. flat_mutation_reader make_reader(schema_ptr schema, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(), @@ -715,6 +717,7 @@ public: streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; flat_mutation_reader make_reader_excluding_sstables(schema_ptr schema, + reader_permit permit, std::vector& sst, const dht::partition_range& range, const query::partition_slice& slice, @@ -723,9 +726,9 @@ public: streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const; - flat_mutation_reader make_reader(schema_ptr schema, const dht::partition_range& range = query::full_partition_range) const { + flat_mutation_reader make_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range = query::full_partition_range) const { auto& full_slice = schema->full_slice(); - return make_reader(std::move(schema), range, full_slice); + return make_reader(std::move(schema), std::move(permit), range, full_slice); } // The streaming mutation reader differs from the regular mutation reader in that: @@ -787,9 +790,9 @@ public: const schema_ptr& schema() const { return _schema; } void set_schema(schema_ptr); db::commitlog* commitlog() { return _commitlog; } - future find_partition(schema_ptr, const dht::decorated_key& key) const; - future find_partition_slow(schema_ptr, const partition_key& key) const; - future find_row(schema_ptr, const dht::decorated_key& partition_key, clustering_key clustering_key) const; + future find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const; + future find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const; + future find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const; // Applies given mutation to this column family // The mutation is always upgraded to current schema. void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {}); @@ -1085,7 +1088,7 @@ private: public: // Iterate over all partitions. Protocol is the same as std::all_of(), // so that iteration can be stopped by returning false. - future for_all_partitions_slow(schema_ptr, std::function func) const; + future for_all_partitions_slow(schema_ptr, reader_permit permit, std::function func) const; friend std::ostream& operator<<(std::ostream& out, const column_family& cf); // Testing purposes. diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh index 24cdb6965b..fee6ae945c 100644 --- a/db/view/build_progress_virtual_reader.hh +++ b/db/view/build_progress_virtual_reader.hh @@ -64,6 +64,7 @@ class build_progress_virtual_reader { build_progress_reader( schema_ptr legacy_schema, + reader_permit permit, column_family& scylla_views_build_progress, const dht::partition_range& range, const query::partition_slice& slice, @@ -80,6 +81,7 @@ class build_progress_virtual_reader { , _slice(adjust_partition_slice()) , _underlying(scylla_views_build_progress.make_reader( scylla_views_build_progress.schema(), + std::move(permit), range, slice, pc, @@ -188,7 +190,7 @@ public: flat_mutation_reader operator()( schema_ptr s, - reader_permit, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -197,6 +199,7 @@ public: mutation_reader::forwarding fwd_mr) { return flat_mutation_reader(std::make_unique( std::move(s), + std::move(permit), _db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS), range, slice, diff --git a/index/built_indexes_virtual_reader.hh b/index/built_indexes_virtual_reader.hh index b08e92712e..bbf2df0891 100644 --- a/index/built_indexes_virtual_reader.hh +++ b/index/built_indexes_virtual_reader.hh @@ -46,6 +46,7 @@ class built_indexes_virtual_reader { built_indexes_reader( database& db, schema_ptr schema, + reader_permit permit, column_family& built_views, const dht::partition_range& range, const query::partition_slice& slice, @@ -57,6 +58,7 @@ class built_indexes_virtual_reader { , _db(db) , _underlying(built_views.make_reader( built_views.schema(), + std::move(permit), range, slice, pc, @@ -118,7 +120,7 @@ public: flat_mutation_reader operator()( schema_ptr s, - reader_permit, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -128,6 +130,7 @@ public: return make_flat_mutation_reader( _db, std::move(s), + std::move(permit), _db.find_column_family(s->ks_name(), system_keyspace::v3::BUILT_VIEWS), range, slice, diff --git a/table.cc b/table.cc index df37e02fdf..a8a9e3a9f0 100644 --- a/table.cc +++ b/table.cc @@ -327,6 +327,7 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s, flat_mutation_reader table::make_sstable_reader(schema_ptr s, + reader_permit permit, lw_shared_ptr sstables, const dht::partition_range& pr, const query::partition_slice& slice, @@ -396,9 +397,9 @@ table::make_sstable_reader(schema_ptr s, // Exposed for testing, not performance critical. future -table::find_partition(schema_ptr s, const dht::decorated_key& key) const { - return do_with(dht::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) { - return do_with(this->make_reader(s, range), [s] (flat_mutation_reader& reader) { +table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const { + return do_with(dht::partition_range::make_singular(key), [s = std::move(s), permit = std::move(permit), this] (auto& range) mutable { + return do_with(this->make_reader(std::move(s), std::move(permit), range), [] (flat_mutation_reader& reader) { return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([] (mutation_opt&& mo) -> std::unique_ptr { if (!mo) { return {}; @@ -410,13 +411,13 @@ table::find_partition(schema_ptr s, const dht::decorated_key& key) const { } future -table::find_partition_slow(schema_ptr s, const partition_key& key) const { - return find_partition(s, dht::decorate_key(*s, key)); +table::find_partition_slow(schema_ptr s, reader_permit permit, const partition_key& key) const { + return find_partition(s, std::move(permit), dht::decorate_key(*s, key)); } future -table::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const { - return find_partition(s, partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) { +table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const { + return find_partition(s, std::move(permit), partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) { if (!p) { return make_ready_future(); } @@ -432,6 +433,7 @@ table::find_row(schema_ptr s, const dht::decorated_key& partition_key, clusterin flat_mutation_reader table::make_reader(schema_ptr s, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, @@ -439,7 +441,7 @@ table::make_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) const { if (_virtual_reader) { - return (*_virtual_reader).make_reader(s, no_reader_permit(), range, slice, pc, trace_state, fwd, fwd_mr); + return (*_virtual_reader).make_reader(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr); } std::vector readers; @@ -472,7 +474,7 @@ table::make_reader(schema_ptr s, if (cache_enabled() && !slice.options.contains(query::partition_slice::option::bypass_cache)) { readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); } else { - readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); } auto comb_reader = make_combined_reader(s, std::move(readers), fwd, fwd_mr); @@ -496,25 +498,27 @@ sstables::shared_sstable table::make_streaming_sstable_for_write(std::optionalmake_permit(); auto& slice = s->full_slice(); auto& pc = service::get_local_streaming_read_priority(); - auto source = mutation_source([this] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice, + auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { std::vector readers; readers.reserve(_memtables->size() + 1); for (auto&& mt : *_memtables) { readers.emplace_back(mt->make_flat_reader(s, range, slice, pc, trace_state, fwd, fwd_mr)); } - readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); return make_combined_reader(s, std::move(readers), fwd, fwd_mr); }); - return make_flat_multi_range_reader(s, no_reader_permit(), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no); + return make_flat_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no); } flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range, const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const { + auto permit = _config.streaming_read_concurrency_semaphore->make_permit(); const auto& pc = service::get_local_streaming_read_priority(); auto trace_state = tracing::trace_state_ptr(); const auto fwd = streamed_mutation::forwarding::no; @@ -524,7 +528,7 @@ flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht:: for (auto&& mt : *_memtables) { readers.emplace_back(mt->make_flat_reader(schema, range, slice, pc, trace_state, fwd, fwd_mr)); } - readers.emplace_back(make_sstable_reader(schema, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr)); return make_combined_reader(std::move(schema), std::move(readers), fwd, fwd_mr); } @@ -535,7 +539,7 @@ future> table::lock_counter_cells(const mutation& m, db // Not performance critical. Currently used for testing only. future -table::for_all_partitions_slow(schema_ptr s, std::function func) const { +table::for_all_partitions_slow(schema_ptr s, reader_permit permit, std::function func) const { struct iteration_state { flat_mutation_reader reader; std::function func; @@ -543,13 +547,14 @@ table::for_all_partitions_slow(schema_ptr s, std::function&& func) - : reader(cf.make_reader(std::move(s))) + iteration_state(schema_ptr s, reader_permit permit, const column_family& cf, + std::function&& func) + : reader(cf.make_reader(std::move(s), std::move(permit))) , func(std::move(func)) { } }; - return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) { + return do_with(iteration_state(std::move(s), std::move(permit), *this, std::move(func)), [] (iteration_state& is) { return do_until([&is] { return is.done(); }, [&is] { return read_mutation_from_flat_mutation_reader(is.reader, db::no_timeout).then([&is](mutation_opt&& mo) { if (!mo) { @@ -1559,14 +1564,14 @@ table::sstables_as_snapshot_source() { return snapshot_source([this] () { auto sst_set = _sstables; return mutation_source([this, sst_set] (schema_ptr s, - reader_permit, + reader_permit permit, const dht::partition_range& r, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return make_sstable_reader(std::move(s), sst_set, r, slice, pc, std::move(trace_state), fwd, fwd_mr); + return make_sstable_reader(std::move(s), std::move(permit), sst_set, r, slice, pc, std::move(trace_state), fwd, fwd_mr); }, [this, sst_set] { return make_partition_presence_checker(sst_set); }); @@ -2392,14 +2397,14 @@ table::query(schema_ptr s, mutation_source table::as_mutation_source() const { return mutation_source([this] (schema_ptr s, - reader_permit, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } @@ -2469,6 +2474,7 @@ table::disable_auto_compaction() { flat_mutation_reader table::make_reader_excluding_sstables(schema_ptr s, + reader_permit permit, std::vector& excluded, const dht::partition_range& range, const query::partition_slice& slice, @@ -2488,7 +2494,7 @@ table::make_reader_excluding_sstables(schema_ptr s, effective_sstables->erase(sst); } - readers.emplace_back(make_sstable_reader(s, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); + readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr)); return make_combined_reader(s, std::move(readers), fwd, fwd_mr); } @@ -2604,14 +2610,14 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou mutation_source table::as_mutation_source_excluding(std::vector& ssts) const { return mutation_source([this, &ssts] (schema_ptr s, - reader_permit, + reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return this->make_reader_excluding_sstables(std::move(s), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return this->make_reader_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index f39aa67e5e..5ea3a47e6a 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -27,6 +27,7 @@ #include "test/lib/cql_test_env.hh" #include "test/lib/result_set_assertions.hh" +#include "test/lib/reader_permit.hh" #include "database.hh" #include "partition_slice_builder.hh" @@ -150,7 +151,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - return cf.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr); + return cf.make_reader(s, tests::make_permit(), range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); }); return make_ready_future<>(); diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 06cec29bed..cb0e855579 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -42,6 +42,7 @@ #include "query-result-reader.hh" #include "partition_slice_builder.hh" #include "test/lib/tmpdir.hh" +#include "test/lib/reader_permit.hh" #include "sstables/compaction_manager.hh" #include @@ -516,7 +517,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { auto verify_row = [&] (int32_t c1, int32_t r1) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); auto p_key = dht::decorate_key(*s, key); - auto r = cf.find_row(cf.schema(), p_key, c_key).get0(); + auto r = cf.find_row(cf.schema(), tests::make_permit(), p_key, c_key).get0(); { BOOST_REQUIRE(r); auto i = r->find_cell(r1_col.id); @@ -575,13 +576,13 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) { std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator()); // Flush will happen in the middle of reading for this scanner - auto assert_that_scanner1 = assert_that(cf.make_reader(s, query::full_partition_range)); + auto assert_that_scanner1 = assert_that(cf.make_reader(s, tests::make_permit(), query::full_partition_range)); // Flush will happen before it is invoked - auto assert_that_scanner2 = assert_that(cf.make_reader(s, query::full_partition_range)); + auto assert_that_scanner2 = assert_that(cf.make_reader(s, tests::make_permit(), query::full_partition_range)); // Flush will happen after all data was read, but before EOS was consumed - auto assert_that_scanner3 = assert_that(cf.make_reader(s, query::full_partition_range)); + auto assert_that_scanner3 = assert_that(cf.make_reader(s, tests::make_permit(), query::full_partition_range)); assert_that_scanner1.produces(mutations[0]); assert_that_scanner1.produces(mutations[1]); @@ -655,7 +656,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { } return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) { - return cf.for_all_partitions_slow(s, [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) { + return cf.for_all_partitions_slow(s, tests::make_permit(), [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) { auto p1 = value_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, nonwrapping_range())) { auto c1 = value_cast(int32_type->deserialize(re.key().explode(*s)[0])); diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index b72871ddb9..4c55f1a204 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -70,7 +70,7 @@ #include #include "test/lib/test_services.hh" #include "test/lib/cql_test_env.hh" - +#include "test/lib/reader_permit.hh" #include "test/lib/sstable_utils.hh" namespace fs = std::filesystem; @@ -5577,7 +5577,7 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) { cf->set_compaction_strategy(sstables::compaction_strategy_type::null); auto is_partition_dead = [&s, &cf] (partition_key& pkey) { - column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, pkey).get0(); + column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, tests::make_permit(), pkey).get0(); return mp && bool(mp->partition_tombstone()); }; diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 1ac7e22534..d0bf8e98bf 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -45,6 +45,7 @@ #include "db/batchlog_manager.hh" #include "schema_builder.hh" #include "test/lib/tmpdir.hh" +#include "test/lib/reader_permit.hh" #include "db/query_context.hh" #include "test/lib/test_services.hh" #include "db/view/view_builder.hh" @@ -277,7 +278,7 @@ public: table_name = std::move(table_name)] (database& db) mutable { auto& cf = db.find_column_family(ks_name, table_name); auto schema = cf.schema(); - return cf.find_partition_slow(schema, pkey) + return cf.find_partition_slow(schema, tests::make_permit(), pkey) .then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) { assert(p != nullptr); auto row = p->find_row(*schema, ckey); diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index b2bdf4ef75..73d66cd585 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -26,6 +26,7 @@ #include #include #include "test/lib/cql_test_env.hh" +#include "test/lib/reader_permit.hh" #include "test/perf/perf.hh" #include #include "schema_builder.hh" @@ -747,6 +748,7 @@ static void assert_partition_start(flat_mutation_reader& rd) { // cf should belong to ks.test static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_read = 1, int n_skip = 0) { auto rd = cf.make_reader(cf.schema(), + tests::make_permit(), query::full_partition_range, cf.schema()->full_slice(), default_priority_class(), @@ -791,7 +793,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r int pk = 0; auto pr = n_skip ? dht::partition_range::make_ending_with(dht::partition_range::bound(keys[0], false)) // covering none : query::full_partition_range; - auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice()); + auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, cf.schema()->full_slice()); metrics_snapshot before; @@ -813,6 +815,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1) { auto rd = cf.make_reader(cf.schema(), + tests::make_permit(), query::full_partition_range, cf.schema()->full_slice(), default_priority_class(), @@ -842,7 +845,7 @@ static test_result slice_rows_by_ck(column_family& cf, int offset = 0, int n_rea clustering_key::from_singular(*cf.schema(), offset + n_read - 1))) .build(); auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, slice); + auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, slice); return test_reading_all(rd); } @@ -854,6 +857,7 @@ static test_result select_spread_rows(column_family& cf, int stride = 0, int n_r auto slice = sb.build(); auto rd = cf.make_reader(cf.schema(), + tests::make_permit(), query::full_partition_range, slice); @@ -867,14 +871,14 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range })) .build(); auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, slice, default_priority_class(), nullptr, + auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no); return test_reading_all(rd); } static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) { auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)); - auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); + auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no); metrics_snapshot before; assert_partition_start(rd); @@ -893,7 +897,7 @@ static test_result slice_partitions(column_family& cf, const std::vector(keys.size(), offset + n_read) - 1], true) ); - auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice()); + auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, cf.schema()->full_slice()); metrics_snapshot before; uint64_t fragments = consume_all_with_next_partition(rd); @@ -996,6 +1000,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered auto pr = single_partition ? dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)) : query::full_partition_range; auto rd = cf.make_reader(cf.schema(), + tests::make_permit(), pr, slice, default_priority_class(),