table: require a valid permit to be passed to most read methods
Now that the most prevalent users (range scan and single partition reads) all pass valid permits we require all users to do so and propagate the permit down towards `make_sstable_reader()`. The plan is to use this permit for restricting the sstable readers, instead of the semaphore the table is configured with. The various `make_streaming_*reader()` overloads keep using the internal semaphores as but they also create the permit before the read starts and pass it to `make_sstable_reader()`.
This commit is contained in:
15
database.hh
15
database.hh
@@ -642,6 +642,7 @@ private:
|
||||
// The 'range' parameter must be live as long as the reader is used.
|
||||
// Mutations returned by the reader will all have given schema.
|
||||
flat_mutation_reader make_sstable_reader(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -708,6 +709,7 @@ public:
|
||||
// If I/O needs to be issued to read anything in the specified range, the operations
|
||||
// will be scheduled under the priority class given by pc.
|
||||
flat_mutation_reader make_reader(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
@@ -715,6 +717,7 @@ public:
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
|
||||
flat_mutation_reader make_reader_excluding_sstables(schema_ptr schema,
|
||||
reader_permit permit,
|
||||
std::vector<sstables::shared_sstable>& sst,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -723,9 +726,9 @@ public:
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
|
||||
|
||||
flat_mutation_reader make_reader(schema_ptr schema, const dht::partition_range& range = query::full_partition_range) const {
|
||||
flat_mutation_reader make_reader(schema_ptr schema, reader_permit permit, const dht::partition_range& range = query::full_partition_range) const {
|
||||
auto& full_slice = schema->full_slice();
|
||||
return make_reader(std::move(schema), range, full_slice);
|
||||
return make_reader(std::move(schema), std::move(permit), range, full_slice);
|
||||
}
|
||||
|
||||
// The streaming mutation reader differs from the regular mutation reader in that:
|
||||
@@ -787,9 +790,9 @@ public:
|
||||
const schema_ptr& schema() const { return _schema; }
|
||||
void set_schema(schema_ptr);
|
||||
db::commitlog* commitlog() { return _commitlog; }
|
||||
future<const_mutation_partition_ptr> find_partition(schema_ptr, const dht::decorated_key& key) const;
|
||||
future<const_mutation_partition_ptr> find_partition_slow(schema_ptr, const partition_key& key) const;
|
||||
future<const_row_ptr> find_row(schema_ptr, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
|
||||
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
|
||||
future<const_mutation_partition_ptr> find_partition_slow(schema_ptr, reader_permit permit, const partition_key& key) const;
|
||||
future<const_row_ptr> find_row(schema_ptr, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const;
|
||||
// Applies given mutation to this column family
|
||||
// The mutation is always upgraded to current schema.
|
||||
void apply(const frozen_mutation& m, const schema_ptr& m_schema, db::rp_handle&& = {});
|
||||
@@ -1085,7 +1088,7 @@ private:
|
||||
public:
|
||||
// Iterate over all partitions. Protocol is the same as std::all_of(),
|
||||
// so that iteration can be stopped by returning false.
|
||||
future<bool> for_all_partitions_slow(schema_ptr, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
|
||||
future<bool> for_all_partitions_slow(schema_ptr, reader_permit permit, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const;
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const column_family& cf);
|
||||
// Testing purposes.
|
||||
|
||||
@@ -64,6 +64,7 @@ class build_progress_virtual_reader {
|
||||
|
||||
build_progress_reader(
|
||||
schema_ptr legacy_schema,
|
||||
reader_permit permit,
|
||||
column_family& scylla_views_build_progress,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -80,6 +81,7 @@ class build_progress_virtual_reader {
|
||||
, _slice(adjust_partition_slice())
|
||||
, _underlying(scylla_views_build_progress.make_reader(
|
||||
scylla_views_build_progress.schema(),
|
||||
std::move(permit),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
@@ -188,7 +190,7 @@ public:
|
||||
|
||||
flat_mutation_reader operator()(
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -197,6 +199,7 @@ public:
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return flat_mutation_reader(std::make_unique<build_progress_reader>(
|
||||
std::move(s),
|
||||
std::move(permit),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::v3::SCYLLA_VIEWS_BUILDS_IN_PROGRESS),
|
||||
range,
|
||||
slice,
|
||||
|
||||
@@ -46,6 +46,7 @@ class built_indexes_virtual_reader {
|
||||
built_indexes_reader(
|
||||
database& db,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
column_family& built_views,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -57,6 +58,7 @@ class built_indexes_virtual_reader {
|
||||
, _db(db)
|
||||
, _underlying(built_views.make_reader(
|
||||
built_views.schema(),
|
||||
std::move(permit),
|
||||
range,
|
||||
slice,
|
||||
pc,
|
||||
@@ -118,7 +120,7 @@ public:
|
||||
|
||||
flat_mutation_reader operator()(
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -128,6 +130,7 @@ public:
|
||||
return make_flat_mutation_reader<built_indexes_reader>(
|
||||
_db,
|
||||
std::move(s),
|
||||
std::move(permit),
|
||||
_db.find_column_family(s->ks_name(), system_keyspace::v3::BUILT_VIEWS),
|
||||
range,
|
||||
slice,
|
||||
|
||||
54
table.cc
54
table.cc
@@ -327,6 +327,7 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
@@ -396,9 +397,9 @@ table::make_sstable_reader(schema_ptr s,
|
||||
|
||||
// Exposed for testing, not performance critical.
|
||||
future<table::const_mutation_partition_ptr>
|
||||
table::find_partition(schema_ptr s, const dht::decorated_key& key) const {
|
||||
return do_with(dht::partition_range::make_singular(key), [s = std::move(s), this] (auto& range) {
|
||||
return do_with(this->make_reader(s, range), [s] (flat_mutation_reader& reader) {
|
||||
table::find_partition(schema_ptr s, reader_permit permit, const dht::decorated_key& key) const {
|
||||
return do_with(dht::partition_range::make_singular(key), [s = std::move(s), permit = std::move(permit), this] (auto& range) mutable {
|
||||
return do_with(this->make_reader(std::move(s), std::move(permit), range), [] (flat_mutation_reader& reader) {
|
||||
return read_mutation_from_flat_mutation_reader(reader, db::no_timeout).then([] (mutation_opt&& mo) -> std::unique_ptr<const mutation_partition> {
|
||||
if (!mo) {
|
||||
return {};
|
||||
@@ -410,13 +411,13 @@ table::find_partition(schema_ptr s, const dht::decorated_key& key) const {
|
||||
}
|
||||
|
||||
future<table::const_mutation_partition_ptr>
|
||||
table::find_partition_slow(schema_ptr s, const partition_key& key) const {
|
||||
return find_partition(s, dht::decorate_key(*s, key));
|
||||
table::find_partition_slow(schema_ptr s, reader_permit permit, const partition_key& key) const {
|
||||
return find_partition(s, std::move(permit), dht::decorate_key(*s, key));
|
||||
}
|
||||
|
||||
future<table::const_row_ptr>
|
||||
table::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
|
||||
return find_partition(s, partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) {
|
||||
table::find_row(schema_ptr s, reader_permit permit, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
|
||||
return find_partition(s, std::move(permit), partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) {
|
||||
if (!p) {
|
||||
return make_ready_future<const_row_ptr>();
|
||||
}
|
||||
@@ -432,6 +433,7 @@ table::find_row(schema_ptr s, const dht::decorated_key& partition_key, clusterin
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
@@ -439,7 +441,7 @@ table::make_reader(schema_ptr s,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
if (_virtual_reader) {
|
||||
return (*_virtual_reader).make_reader(s, no_reader_permit(), range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
return (*_virtual_reader).make_reader(s, std::move(permit), range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
}
|
||||
|
||||
std::vector<flat_mutation_reader> readers;
|
||||
@@ -472,7 +474,7 @@ table::make_reader(schema_ptr s,
|
||||
if (cache_enabled() && !slice.options.contains(query::partition_slice::option::bypass_cache)) {
|
||||
readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
} else {
|
||||
readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
}
|
||||
|
||||
auto comb_reader = make_combined_reader(s, std::move(readers), fwd, fwd_mr);
|
||||
@@ -496,25 +498,27 @@ sstables::shared_sstable table::make_streaming_sstable_for_write(std::optional<s
|
||||
flat_mutation_reader
|
||||
table::make_streaming_reader(schema_ptr s,
|
||||
const dht::partition_range_vector& ranges) const {
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit();
|
||||
auto& slice = s->full_slice();
|
||||
auto& pc = service::get_local_streaming_read_priority();
|
||||
|
||||
auto source = mutation_source([this] (schema_ptr s, reader_permit, const dht::partition_range& range, const query::partition_slice& slice,
|
||||
auto source = mutation_source([this] (schema_ptr s, reader_permit permit, const dht::partition_range& range, const query::partition_slice& slice,
|
||||
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
|
||||
std::vector<flat_mutation_reader> readers;
|
||||
readers.reserve(_memtables->size() + 1);
|
||||
for (auto&& mt : *_memtables) {
|
||||
readers.emplace_back(mt->make_flat_reader(s, range, slice, pc, trace_state, fwd, fwd_mr));
|
||||
}
|
||||
readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
readers.emplace_back(make_sstable_reader(s, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
return make_combined_reader(s, std::move(readers), fwd, fwd_mr);
|
||||
});
|
||||
|
||||
return make_flat_multi_range_reader(s, no_reader_permit(), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
|
||||
return make_flat_multi_range_reader(s, std::move(permit), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,
|
||||
const query::partition_slice& slice, mutation_reader::forwarding fwd_mr) const {
|
||||
auto permit = _config.streaming_read_concurrency_semaphore->make_permit();
|
||||
const auto& pc = service::get_local_streaming_read_priority();
|
||||
auto trace_state = tracing::trace_state_ptr();
|
||||
const auto fwd = streamed_mutation::forwarding::no;
|
||||
@@ -524,7 +528,7 @@ flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::
|
||||
for (auto&& mt : *_memtables) {
|
||||
readers.emplace_back(mt->make_flat_reader(schema, range, slice, pc, trace_state, fwd, fwd_mr));
|
||||
}
|
||||
readers.emplace_back(make_sstable_reader(schema, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
readers.emplace_back(make_sstable_reader(schema, permit, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
return make_combined_reader(std::move(schema), std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
@@ -535,7 +539,7 @@ future<std::vector<locked_cell>> table::lock_counter_cells(const mutation& m, db
|
||||
|
||||
// Not performance critical. Currently used for testing only.
|
||||
future<bool>
|
||||
table::for_all_partitions_slow(schema_ptr s, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
|
||||
table::for_all_partitions_slow(schema_ptr s, reader_permit permit, std::function<bool (const dht::decorated_key&, const mutation_partition&)> func) const {
|
||||
struct iteration_state {
|
||||
flat_mutation_reader reader;
|
||||
std::function<bool (const dht::decorated_key&, const mutation_partition&)> func;
|
||||
@@ -543,13 +547,14 @@ table::for_all_partitions_slow(schema_ptr s, std::function<bool (const dht::deco
|
||||
bool empty = false;
|
||||
public:
|
||||
bool done() const { return !ok || empty; }
|
||||
iteration_state(schema_ptr s, const column_family& cf, std::function<bool (const dht::decorated_key&, const mutation_partition&)>&& func)
|
||||
: reader(cf.make_reader(std::move(s)))
|
||||
iteration_state(schema_ptr s, reader_permit permit, const column_family& cf,
|
||||
std::function<bool (const dht::decorated_key&, const mutation_partition&)>&& func)
|
||||
: reader(cf.make_reader(std::move(s), std::move(permit)))
|
||||
, func(std::move(func))
|
||||
{ }
|
||||
};
|
||||
|
||||
return do_with(iteration_state(std::move(s), *this, std::move(func)), [] (iteration_state& is) {
|
||||
return do_with(iteration_state(std::move(s), std::move(permit), *this, std::move(func)), [] (iteration_state& is) {
|
||||
return do_until([&is] { return is.done(); }, [&is] {
|
||||
return read_mutation_from_flat_mutation_reader(is.reader, db::no_timeout).then([&is](mutation_opt&& mo) {
|
||||
if (!mo) {
|
||||
@@ -1559,14 +1564,14 @@ table::sstables_as_snapshot_source() {
|
||||
return snapshot_source([this] () {
|
||||
auto sst_set = _sstables;
|
||||
return mutation_source([this, sst_set] (schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& r,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_sstable_reader(std::move(s), sst_set, r, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return make_sstable_reader(std::move(s), std::move(permit), sst_set, r, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}, [this, sst_set] {
|
||||
return make_partition_presence_checker(sst_set);
|
||||
});
|
||||
@@ -2392,14 +2397,14 @@ table::query(schema_ptr s,
|
||||
mutation_source
|
||||
table::as_mutation_source() const {
|
||||
return mutation_source([this] (schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return this->make_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return this->make_reader(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2469,6 +2474,7 @@ table::disable_auto_compaction() {
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_reader_excluding_sstables(schema_ptr s,
|
||||
reader_permit permit,
|
||||
std::vector<sstables::shared_sstable>& excluded,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
@@ -2488,7 +2494,7 @@ table::make_reader_excluding_sstables(schema_ptr s,
|
||||
effective_sstables->erase(sst);
|
||||
}
|
||||
|
||||
readers.emplace_back(make_sstable_reader(s, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
readers.emplace_back(make_sstable_reader(s, permit, std::move(effective_sstables), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
return make_combined_reader(s, std::move(readers), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
@@ -2604,14 +2610,14 @@ table::stream_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeou
|
||||
mutation_source
|
||||
table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts) const {
|
||||
return mutation_source([this, &ssts] (schema_ptr s,
|
||||
reader_permit,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return this->make_reader_excluding_sstables(std::move(s), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return this->make_reader_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/result_set_assertions.hh"
|
||||
#include "test/lib/reader_permit.hh"
|
||||
|
||||
#include "database.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
@@ -150,7 +151,7 @@ SEASTAR_THREAD_TEST_CASE(test_database_with_data_in_sstables_is_a_mutation_sourc
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return cf.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
return cf.make_reader(s, tests::make_permit(), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
});
|
||||
return make_ready_future<>();
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
#include "query-result-reader.hh"
|
||||
#include "partition_slice_builder.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/reader_permit.hh"
|
||||
#include "sstables/compaction_manager.hh"
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
@@ -516,7 +517,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
auto verify_row = [&] (int32_t c1, int32_t r1) {
|
||||
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
|
||||
auto p_key = dht::decorate_key(*s, key);
|
||||
auto r = cf.find_row(cf.schema(), p_key, c_key).get0();
|
||||
auto r = cf.find_row(cf.schema(), tests::make_permit(), p_key, c_key).get0();
|
||||
{
|
||||
BOOST_REQUIRE(r);
|
||||
auto i = r->find_cell(r1_col.id);
|
||||
@@ -575,13 +576,13 @@ SEASTAR_TEST_CASE(test_flush_in_the_middle_of_a_scan) {
|
||||
std::sort(mutations.begin(), mutations.end(), mutation_decorated_key_less_comparator());
|
||||
|
||||
// Flush will happen in the middle of reading for this scanner
|
||||
auto assert_that_scanner1 = assert_that(cf.make_reader(s, query::full_partition_range));
|
||||
auto assert_that_scanner1 = assert_that(cf.make_reader(s, tests::make_permit(), query::full_partition_range));
|
||||
|
||||
// Flush will happen before it is invoked
|
||||
auto assert_that_scanner2 = assert_that(cf.make_reader(s, query::full_partition_range));
|
||||
auto assert_that_scanner2 = assert_that(cf.make_reader(s, tests::make_permit(), query::full_partition_range));
|
||||
|
||||
// Flush will happen after all data was read, but before EOS was consumed
|
||||
auto assert_that_scanner3 = assert_that(cf.make_reader(s, query::full_partition_range));
|
||||
auto assert_that_scanner3 = assert_that(cf.make_reader(s, tests::make_permit(), query::full_partition_range));
|
||||
|
||||
assert_that_scanner1.produces(mutations[0]);
|
||||
assert_that_scanner1.produces(mutations[1]);
|
||||
@@ -655,7 +656,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
|
||||
}
|
||||
|
||||
return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) {
|
||||
return cf.for_all_partitions_slow(s, [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) {
|
||||
return cf.for_all_partitions_slow(s, tests::make_permit(), [&, s] (const dht::decorated_key& pk, const mutation_partition& mp) {
|
||||
auto p1 = value_cast<int32_t>(int32_type->deserialize(pk._key.explode(*s)[0]));
|
||||
for (const rows_entry& re : mp.range(*s, nonwrapping_range<clustering_key_prefix>())) {
|
||||
auto c1 = value_cast<int32_t>(int32_type->deserialize(re.key().explode(*s)[0]));
|
||||
|
||||
@@ -70,7 +70,7 @@
|
||||
#include <boost/icl/interval_map.hpp>
|
||||
#include "test/lib/test_services.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
|
||||
#include "test/lib/reader_permit.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
@@ -5577,7 +5577,7 @@ SEASTAR_TEST_CASE(incremental_compaction_data_resurrection_test) {
|
||||
cf->set_compaction_strategy(sstables::compaction_strategy_type::null);
|
||||
|
||||
auto is_partition_dead = [&s, &cf] (partition_key& pkey) {
|
||||
column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, pkey).get0();
|
||||
column_family::const_mutation_partition_ptr mp = cf->find_partition_slow(s, tests::make_permit(), pkey).get0();
|
||||
return mp && bool(mp->partition_tombstone());
|
||||
};
|
||||
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
#include "db/batchlog_manager.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "test/lib/tmpdir.hh"
|
||||
#include "test/lib/reader_permit.hh"
|
||||
#include "db/query_context.hh"
|
||||
#include "test/lib/test_services.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
@@ -277,7 +278,7 @@ public:
|
||||
table_name = std::move(table_name)] (database& db) mutable {
|
||||
auto& cf = db.find_column_family(ks_name, table_name);
|
||||
auto schema = cf.schema();
|
||||
return cf.find_partition_slow(schema, pkey)
|
||||
return cf.find_partition_slow(schema, tests::make_permit(), pkey)
|
||||
.then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
|
||||
assert(p != nullptr);
|
||||
auto row = p->find_row(*schema, ckey);
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <boost/range/adaptors.hpp>
|
||||
#include <json/json.h>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/reader_permit.hh"
|
||||
#include "test/perf/perf.hh"
|
||||
#include <seastar/core/app-template.hh>
|
||||
#include "schema_builder.hh"
|
||||
@@ -747,6 +748,7 @@ static void assert_partition_start(flat_mutation_reader& rd) {
|
||||
// cf should belong to ks.test
|
||||
static test_result scan_rows_with_stride(column_family& cf, int n_rows, int n_read = 1, int n_skip = 0) {
|
||||
auto rd = cf.make_reader(cf.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
cf.schema()->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -791,7 +793,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r
|
||||
int pk = 0;
|
||||
auto pr = n_skip ? dht::partition_range::make_ending_with(dht::partition_range::bound(keys[0], false)) // covering none
|
||||
: query::full_partition_range;
|
||||
auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice());
|
||||
auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, cf.schema()->full_slice());
|
||||
|
||||
metrics_snapshot before;
|
||||
|
||||
@@ -813,6 +815,7 @@ static test_result scan_with_stride_partitions(column_family& cf, int n, int n_r
|
||||
|
||||
static test_result slice_rows(column_family& cf, int offset = 0, int n_read = 1) {
|
||||
auto rd = cf.make_reader(cf.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
cf.schema()->full_slice(),
|
||||
default_priority_class(),
|
||||
@@ -842,7 +845,7 @@ static test_result slice_rows_by_ck(column_family& cf, int offset = 0, int n_rea
|
||||
clustering_key::from_singular(*cf.schema(), offset + n_read - 1)))
|
||||
.build();
|
||||
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
|
||||
auto rd = cf.make_reader(cf.schema(), pr, slice);
|
||||
auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, slice);
|
||||
return test_reading_all(rd);
|
||||
}
|
||||
|
||||
@@ -854,6 +857,7 @@ static test_result select_spread_rows(column_family& cf, int stride = 0, int n_r
|
||||
|
||||
auto slice = sb.build();
|
||||
auto rd = cf.make_reader(cf.schema(),
|
||||
tests::make_permit(),
|
||||
query::full_partition_range,
|
||||
slice);
|
||||
|
||||
@@ -867,14 +871,14 @@ static test_result test_slicing_using_restrictions(column_family& cf, int_range
|
||||
}))
|
||||
.build();
|
||||
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
|
||||
auto rd = cf.make_reader(cf.schema(), pr, slice, default_priority_class(), nullptr,
|
||||
auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, slice, default_priority_class(), nullptr,
|
||||
streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
return test_reading_all(rd);
|
||||
}
|
||||
|
||||
static test_result slice_rows_single_key(column_family& cf, int offset = 0, int n_read = 1) {
|
||||
auto pr = dht::partition_range::make_singular(make_pkey(*cf.schema(), 0));
|
||||
auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
|
||||
auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, cf.schema()->full_slice(), default_priority_class(), nullptr, streamed_mutation::forwarding::yes, mutation_reader::forwarding::no);
|
||||
|
||||
metrics_snapshot before;
|
||||
assert_partition_start(rd);
|
||||
@@ -893,7 +897,7 @@ static test_result slice_partitions(column_family& cf, const std::vector<dht::de
|
||||
dht::partition_range::bound(keys[std::min<size_t>(keys.size(), offset + n_read) - 1], true)
|
||||
);
|
||||
|
||||
auto rd = cf.make_reader(cf.schema(), pr, cf.schema()->full_slice());
|
||||
auto rd = cf.make_reader(cf.schema(), tests::make_permit(), pr, cf.schema()->full_slice());
|
||||
metrics_snapshot before;
|
||||
|
||||
uint64_t fragments = consume_all_with_next_partition(rd);
|
||||
@@ -996,6 +1000,7 @@ static test_result test_forwarding_with_restriction(column_family& cf, clustered
|
||||
|
||||
auto pr = single_partition ? dht::partition_range::make_singular(make_pkey(*cf.schema(), 0)) : query::full_partition_range;
|
||||
auto rd = cf.make_reader(cf.schema(),
|
||||
tests::make_permit(),
|
||||
pr,
|
||||
slice,
|
||||
default_priority_class(),
|
||||
|
||||
Reference in New Issue
Block a user