flat_mutation_reader: flat_multi_range_reader: add reader_permit parameter

Mutation sources will soon require a valid permit so make sure we have
one and pass it to the mutation sources when creating the underlying
readers.
For now, pass no_reader_permit() on call sites, deferring the obtaining
of a valid permit to later patches.
This commit is contained in:
Botond Dénes
2020-05-13 15:15:43 +03:00
parent 97af2d98d2
commit 0b4ec62332
6 changed files with 25 additions and 16 deletions

View File

@@ -2057,7 +2057,7 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
std::move(trace_state), fwd_mr);
});
auto&& full_slice = schema->full_slice();
return make_flat_multi_range_reader(std::move(schema), std::move(ms), std::move(range_generator), std::move(full_slice),
return make_flat_multi_range_reader(std::move(schema), no_reader_permit(), std::move(ms), std::move(range_generator), std::move(full_slice),
service::get_local_streaming_read_priority(), {}, mutation_reader::forwarding::no);
}

View File

@@ -597,6 +597,7 @@ flat_mutation_reader_from_mutations(std::vector<mutation> mutations, const dht::
/// Delays the creation of the underlying reader until it is first
/// fast-forwarded and thus a range is available.
class forwardable_empty_mutation_reader : public flat_mutation_reader::impl {
reader_permit _permit;
mutation_source _source;
const query::partition_slice& _slice;
const io_priority_class& _pc;
@@ -604,11 +605,13 @@ class forwardable_empty_mutation_reader : public flat_mutation_reader::impl {
flat_mutation_reader_opt _reader;
public:
forwardable_empty_mutation_reader(schema_ptr s,
reader_permit permit,
mutation_source source,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state)
: impl(s)
, _permit(std::move(permit))
, _source(std::move(source))
, _slice(slice)
, _pc(pc)
@@ -632,7 +635,7 @@ public:
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
if (!_reader) {
_reader = _source.make_reader(_schema, no_reader_permit(), pr, _slice, _pc, std::move(_trace_state), streamed_mutation::forwarding::no,
_reader = _source.make_reader(_schema, _permit, pr, _slice, _pc, std::move(_trace_state), streamed_mutation::forwarding::no,
mutation_reader::forwarding::yes);
_end_of_stream = false;
return make_ready_future<>();
@@ -674,6 +677,7 @@ class flat_multi_range_mutation_reader : public flat_mutation_reader::impl {
public:
flat_multi_range_mutation_reader(
schema_ptr s,
reader_permit permit,
mutation_source source,
const dht::partition_range& first_range,
Generator generator,
@@ -682,7 +686,7 @@ public:
tracing::trace_state_ptr trace_state)
: impl(s)
, _generator(std::move(generator))
, _reader(source.make_reader(s, no_reader_permit(), first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
, _reader(source.make_reader(s, std::move(permit), first_range, slice, pc, trace_state, streamed_mutation::forwarding::no, mutation_reader::forwarding::yes))
{
}
@@ -729,7 +733,7 @@ public:
};
flat_mutation_reader
make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::partition_range_vector& ranges,
make_flat_multi_range_reader(schema_ptr s, reader_permit permit, mutation_source source, const dht::partition_range_vector& ranges,
const query::partition_slice& slice, const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
@@ -751,14 +755,15 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
if (ranges.empty()) {
if (fwd_mr) {
return make_flat_mutation_reader<forwardable_empty_mutation_reader>(std::move(s), std::move(source), slice, pc, std::move(trace_state));
return make_flat_mutation_reader<forwardable_empty_mutation_reader>(std::move(s), std::move(permit), std::move(source), slice, pc,
std::move(trace_state));
} else {
return make_empty_flat_reader(std::move(s));
}
} else if (ranges.size() == 1) {
return source.make_reader(std::move(s), no_reader_permit(), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
return source.make_reader(std::move(s), std::move(permit), ranges.front(), slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
} else {
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(source),
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(permit), std::move(source),
ranges.front(), adapter(std::next(ranges.cbegin()), ranges.cend()), slice, pc, std::move(trace_state));
}
}
@@ -766,6 +771,7 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
flat_mutation_reader
make_flat_multi_range_reader(
schema_ptr s,
reader_permit permit,
mutation_source source,
std::function<std::optional<dht::partition_range>()> generator,
const query::partition_slice& slice,
@@ -798,12 +804,12 @@ make_flat_multi_range_reader(
auto* first_range = adapted_generator();
if (!first_range) {
if (fwd_mr) {
return make_flat_mutation_reader<forwardable_empty_mutation_reader>(std::move(s), std::move(source), slice, pc, std::move(trace_state));
return make_flat_mutation_reader<forwardable_empty_mutation_reader>(std::move(s), std::move(permit), std::move(source), slice, pc, std::move(trace_state));
} else {
return make_empty_flat_reader(std::move(s));
}
} else {
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(source),
return make_flat_mutation_reader<flat_multi_range_mutation_reader<adapter>>(std::move(s), std::move(permit), std::move(source),
*first_range, std::move(adapted_generator), slice, pc, std::move(trace_state));
}
}

View File

@@ -39,6 +39,7 @@
using seastar::future;
class mutation_source;
class reader_permit;
GCC6_CONCEPT(
template<typename Consumer>
@@ -677,7 +678,7 @@ flat_mutation_reader_from_mutations(std::vector<mutation> ms,
/// ranges. Otherwise the reader is created with
/// mutation_reader::forwarding::yes.
flat_mutation_reader
make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::partition_range_vector& ranges,
make_flat_multi_range_reader(schema_ptr s, reader_permit permit, mutation_source source, const dht::partition_range_vector& ranges,
const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(),
tracing::trace_state_ptr trace_state = nullptr,
flat_mutation_reader::partition_range_forwarding fwd_mr = flat_mutation_reader::partition_range_forwarding::yes);
@@ -689,6 +690,7 @@ make_flat_multi_range_reader(schema_ptr s, mutation_source source, const dht::pa
flat_mutation_reader
make_flat_multi_range_reader(
schema_ptr s,
reader_permit permit,
mutation_source source,
std::function<std::optional<dht::partition_range>()> generator,
const query::partition_slice& slice,

View File

@@ -599,7 +599,7 @@ static future<reconcilable_result> do_query_mutations(
mutation_reader::forwarding fwd_mr) {
return make_multishard_combining_reader(ctx, std::move(s), pr, ps, pc, std::move(trace_state), fwd_mr);
});
auto reader = make_flat_multi_range_reader(s, std::move(ms), ranges, cmd.slice,
auto reader = make_flat_multi_range_reader(s, no_reader_permit(), std::move(ms), ranges, cmd.slice,
service::get_local_sstable_query_read_priority(), trace_state, mutation_reader::forwarding::no);
auto compaction_state = make_lw_shared<compact_for_mutation_query_state>(*s, cmd.timestamp, cmd.slice, cmd.row_limit,

View File

@@ -510,7 +510,7 @@ table::make_streaming_reader(schema_ptr s,
return make_combined_reader(s, std::move(readers), fwd, fwd_mr);
});
return make_flat_multi_range_reader(s, std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
return make_flat_multi_range_reader(s, no_reader_permit(), std::move(source), ranges, slice, pc, nullptr, mutation_reader::forwarding::no);
}
flat_mutation_reader table::make_streaming_reader(schema_ptr schema, const dht::partition_range& range,

View File

@@ -39,6 +39,7 @@
#include "test/lib/simple_schema.hh"
#include "test/lib/flat_mutation_reader_assertions.hh"
#include "test/lib/log.hh"
#include "test/lib/reader_permit.hh"
struct mock_consumer {
struct result {
@@ -441,14 +442,14 @@ SEASTAR_TEST_CASE(test_multi_range_reader) {
// Generator ranges are single pass, so we need a new range each time they are used.
auto run_test = [&] (auto make_empty_ranges, auto make_single_ranges, auto make_multiple_ranges) {
testlog.info("empty ranges");
assert_that(make_flat_multi_range_reader(s.schema(), source, make_empty_ranges(), s.schema()->full_slice()))
assert_that(make_flat_multi_range_reader(s.schema(), tests::make_permit(), source, make_empty_ranges(), s.schema()->full_slice()))
.produces_end_of_stream()
.fast_forward_to(fft_range)
.produces(ms[9])
.produces_end_of_stream();
testlog.info("single range");
assert_that(make_flat_multi_range_reader(s.schema(), source, make_single_ranges(), s.schema()->full_slice()))
assert_that(make_flat_multi_range_reader(s.schema(), tests::make_permit(), source, make_single_ranges(), s.schema()->full_slice()))
.produces(ms[1])
.produces(ms[2])
.produces_end_of_stream()
@@ -457,7 +458,7 @@ SEASTAR_TEST_CASE(test_multi_range_reader) {
.produces_end_of_stream();
testlog.info("read full partitions and fast forward");
assert_that(make_flat_multi_range_reader(s.schema(), source, make_multiple_ranges(), s.schema()->full_slice()))
assert_that(make_flat_multi_range_reader(s.schema(), tests::make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
.produces(ms[1])
.produces(ms[2])
.produces(ms[4])
@@ -467,7 +468,7 @@ SEASTAR_TEST_CASE(test_multi_range_reader) {
.produces_end_of_stream();
testlog.info("read, skip partitions and fast forward");
assert_that(make_flat_multi_range_reader(s.schema(), source, make_multiple_ranges(), s.schema()->full_slice()))
assert_that(make_flat_multi_range_reader(s.schema(), tests::make_permit(), source, make_multiple_ranges(), s.schema()->full_slice()))
.produces_partition_start(keys[1])
.next_partition()
.produces_partition_start(keys[2])