Merge "futurize flat_mutation_reader::next_partition" from Benny

The main motivation for this patchset is to prepare
for adding a async close() method to flat_mutation_reader.

In order to close the reader before destroying it
in all paths we need to make next_partition asynchronous
so it can asynchronously close a current reader before
destoring it, e.g. by reassignment of flat_mutation_reader_opt,
as done in scanning_reader::next_partition.

Test: unit(release, debug)

* git@github.com:bhalevy/scylla.git futurize-next-partition-v1:
  flat_mutation_reader: return future from next_partition
  multishard_mutation_query: read_context: save_reader: destroy reader_meta from the calling shard
  mutation_reader: filtering_reader: fill_buffer: futurize inner loop
  flat_mutation_reader::impl: consumer_adapter: futurize handle_result
  flat_mutation_reader: consume_pausable/in_thread: futurize_invoke consumer
  flat_mutation_reader: FlatMutationReaderConsumer: support also async consumer
  flat_mutation_reader:impl: get rid of _consume_done member
This commit is contained in:
Tomasz Grabiec
2021-01-18 20:33:48 +01:00
committed by Avi Kivity
26 changed files with 198 additions and 125 deletions

View File

@@ -163,11 +163,12 @@ public:
cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete;
cache_flat_mutation_reader(cache_flat_mutation_reader&&) = delete;
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = true;
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override {
clear_buffer();

View File

@@ -275,11 +275,12 @@ future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_poi
});
}
void size_estimates_mutation_reader::next_partition() {
future<> size_estimates_mutation_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_partition_reader = std::nullopt;
}
return make_ready_future<>();
}
future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {

View File

@@ -48,7 +48,7 @@ public:
size_estimates_mutation_reader(database& db, schema_ptr, reader_permit, const dht::partition_range&, const query::partition_slice&, streamed_mutation::forwarding);
virtual future<> fill_buffer(db::timeout_clock::time_point) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override;
private:

View File

@@ -162,12 +162,13 @@ class build_progress_virtual_reader {
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
_end_of_stream = false;
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_underlying.next_partition();
return _underlying.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {

View File

@@ -168,7 +168,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
while (!_mutation_fragments.empty()) {
@@ -177,8 +177,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query
}
_range_tombstones.clear();
_partition_end = std::nullopt;
_source->next_partition();
return _source->next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
@@ -271,17 +272,21 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) {
forward_buffer_to(_current.start());
return make_ready_future<>();
}
virtual void next_partition() override {
virtual future<> next_partition() override {
_end_of_stream = false;
auto maybe_next_partition = make_ready_future<>();
if (!_next || !_next->is_partition_start()) {
_underlying.next_partition();
maybe_next_partition = _underlying.next_partition().then([this] {
_next = {};
});
}
return maybe_next_partition.then([this] {
clear_buffer_to_next_partition();
_current = {
position_in_partition(position_in_partition::partition_start_tag_t()),
position_in_partition(position_in_partition::after_static_row_tag_t())
};
});
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_end_of_stream = false;
@@ -315,11 +320,12 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par
_end_of_stream = true;
return make_ready_future<>();
}
_underlying.next_partition();
return _underlying.next_partition().then([this, timeout] {
_static_row_done = false;
return _underlying.fill_buffer(timeout).then([this] {
_end_of_stream = is_end_end_of_underlying_stream();
});
});
}
public:
reader(flat_mutation_reader r, bool single_partition)
@@ -340,12 +346,15 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
auto maybe_next_partition = make_ready_future<>();;
if (is_buffer_empty()) {
_underlying.next_partition();
maybe_next_partition = _underlying.next_partition();
}
return maybe_next_partition.then([this] {
_end_of_stream = is_end_end_of_underlying_stream();
});
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_end_of_stream = false;
@@ -360,7 +369,7 @@ class empty_flat_reader final : public flat_mutation_reader::impl {
public:
empty_flat_reader(schema_ptr s, reader_permit permit) : impl(std::move(s), std::move(permit)) { _end_of_stream = true; }
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }
virtual void next_partition() override {}
virtual future<> next_partition() override { return make_ready_future<>(); }
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); };
virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); };
};
@@ -553,7 +562,7 @@ flat_mutation_reader_from_mutations(reader_permit permit, std::vector<mutation>
do_fill_buffer(timeout);
return make_ready_future<>();
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
destroy_current_mutation();
@@ -564,6 +573,7 @@ flat_mutation_reader_from_mutations(reader_permit permit, std::vector<mutation>
start_new_partition();
}
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
@@ -647,14 +657,15 @@ public:
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual void next_partition() override {
virtual future<> next_partition() override {
if (!_reader) {
return;
return make_ready_future<>();
}
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
_reader->next_partition();
return _reader->next_partition();
}
return make_ready_future<>();
}
};
@@ -717,11 +728,12 @@ public:
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty() && !is_end_of_stream()) {
_reader.next_partition();
return _reader.next_partition();
}
return make_ready_future<>();
}
};
@@ -849,13 +861,14 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit
}
return make_ready_future<>();
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
while (!(_end_of_stream = end_of_range()) && !_fragments.front().is_partition_start()) {
_fragments.pop_front();
}
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
throw std::runtime_error("This reader can't be fast forwarded to another range.");
@@ -921,8 +934,8 @@ public:
});
});
}
virtual void next_partition() override {
throw_with_backtrace<std::bad_function_call>();
virtual future<> next_partition() override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());

View File

@@ -46,6 +46,8 @@ template<typename Consumer>
concept FlatMutationReaderConsumer =
requires(Consumer c, mutation_fragment mf) {
{ c(std::move(mf)) } -> std::same_as<stop_iteration>;
} || requires(Consumer c, mutation_fragment mf) {
{ c(std::move(mf)) } -> std::same_as<future<stop_iteration>>;
};
@@ -84,7 +86,6 @@ public:
private:
tracked_buffer _buffer;
size_t _buffer_size = 0;
bool _consume_done = false;
protected:
size_t max_buffer_size_in_bytes = 8 * 1024;
bool _end_of_stream = false;
@@ -119,7 +120,7 @@ public:
impl(schema_ptr s, reader_permit permit) : _buffer(permit), _schema(std::move(s)), _permit(std::move(permit)) { }
virtual ~impl() {}
virtual future<> fill_buffer(db::timeout_clock::time_point) = 0;
virtual void next_partition() = 0;
virtual future<> next_partition() = 0;
bool is_end_of_stream() const { return _end_of_stream; }
bool is_buffer_empty() const { return _buffer.empty(); }
@@ -153,16 +154,22 @@ public:
// Stops when consumer returns stop_iteration::yes or end of stream is reached.
// Next call will start from the next mutation_fragment in the stream.
future<> consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout) {
_consume_done = false;
return do_until([this] { return (is_end_of_stream() && is_buffer_empty()) || _consume_done; },
[this, consumer = std::move(consumer), timeout] () mutable {
if (is_buffer_empty()) {
return fill_buffer(timeout);
}
return do_with(std::move(consumer), [this, timeout] (Consumer& consumer) {
return repeat([this, &consumer, timeout] {
if (is_end_of_stream() && is_buffer_empty()) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
_consume_done = consumer(pop_mutation_fragment()) == stop_iteration::yes;
if (is_buffer_empty()) {
return fill_buffer(timeout).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
}
return make_ready_future<>();
return futurize_invoke([&consumer, mf = pop_mutation_fragment()] () mutable {
return consumer(std::move(mf));
});
});
});
}
@@ -186,10 +193,16 @@ public:
}
auto mf = pop_mutation_fragment();
if (mf.is_partition_start() && !filter(mf.as_partition_start().key())) {
next_partition();
next_partition().get();
continue;
}
if (filter(mf) && (consumer(std::move(mf)) == stop_iteration::yes)) {
if (!filter(mf)) {
continue;
}
auto do_stop = futurize_invoke([&consumer, mf = std::move(mf)] () mutable {
return consumer(std::move(mf));
});
if (do_stop.get0()) {
return;
}
}
@@ -205,38 +218,42 @@ public:
: _reader(reader)
, _consumer(std::move(c))
{ }
stop_iteration operator()(mutation_fragment&& mf) {
future<stop_iteration> operator()(mutation_fragment&& mf) {
return std::move(mf).consume(*this);
}
stop_iteration consume(static_row&& sr) {
future<stop_iteration> consume(static_row&& sr) {
return handle_result(_consumer.consume(std::move(sr)));
}
stop_iteration consume(clustering_row&& cr) {
future<stop_iteration> consume(clustering_row&& cr) {
return handle_result(_consumer.consume(std::move(cr)));
}
stop_iteration consume(range_tombstone&& rt) {
future<stop_iteration> consume(range_tombstone&& rt) {
return handle_result(_consumer.consume(std::move(rt)));
}
stop_iteration consume(partition_start&& ps) {
future<stop_iteration> consume(partition_start&& ps) {
_decorated_key.emplace(std::move(ps.key()));
_consumer.consume_new_partition(*_decorated_key);
if (ps.partition_tombstone()) {
_consumer.consume(ps.partition_tombstone());
}
return stop_iteration::no;
return make_ready_future<stop_iteration>(stop_iteration::no);
}
stop_iteration consume(partition_end&& pe) {
future<stop_iteration> consume(partition_end&& pe) {
return futurize_invoke([this] {
return _consumer.consume_end_of_partition();
});
}
private:
stop_iteration handle_result(stop_iteration si) {
future<stop_iteration> handle_result(stop_iteration si) {
if (si) {
if (_consumer.consume_end_of_partition()) {
return stop_iteration::yes;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
_reader.next_partition();
return _reader.next_partition().then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
}
return stop_iteration::no;
return make_ready_future<stop_iteration>(stop_iteration::no);
}
};
public:
@@ -405,7 +422,7 @@ public:
//
// Can be used to skip over entire partitions if interleaved with
// `operator()()` calls.
void next_partition() { _impl->next_partition(); }
future<> next_partition() { return _impl->next_partition(); }
future<> fill_buffer(db::timeout_clock::time_point timeout) { return _impl->fill_buffer(timeout); }
@@ -593,11 +610,12 @@ flat_mutation_reader transform(flat_mutation_reader r, T t) {
}
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_reader.next_partition();
return _reader.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
@@ -635,12 +653,15 @@ public:
forward_buffer_to(pr.start());
return to_reference(_underlying).fast_forward_to(std::move(pr), timeout);
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
auto maybe_next_partition = make_ready_future<>();
if (is_buffer_empty()) {
to_reference(_underlying).next_partition();
maybe_next_partition = to_reference(_underlying).next_partition();
}
return maybe_next_partition.then([this] {
_end_of_stream = to_reference(_underlying).is_end_of_stream() && to_reference(_underlying).is_buffer_empty();
});
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_end_of_stream = false;

View File

@@ -92,12 +92,13 @@ class built_indexes_virtual_reader {
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
_end_of_stream = false;
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_underlying.next_partition();
return _underlying.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {

View File

@@ -460,15 +460,16 @@ public:
return is_end_of_stream() ? make_ready_future<>() : fill_buffer_from_delegate(timeout);
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
if (!_delegate_range) {
_delegate = {};
} else {
_delegate->next_partition();
return _delegate->next_partition();
}
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
_end_of_stream = false;
@@ -621,11 +622,12 @@ public:
});
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_partition_reader = std::nullopt;
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());

View File

@@ -457,19 +457,22 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de
}
future<> read_context::save_reader(shard_id shard, const dht::decorated_key& last_pkey, const std::optional<clustering_key_prefix>& last_ckey) {
return _db.invoke_on(shard, [this, shard, query_uuid = _cmd.query_uuid, query_ranges = _ranges, rm = std::exchange(_readers[shard], {}),
return do_with(std::exchange(_readers[shard], {}), [this, shard, &last_pkey, &last_ckey] (reader_meta& rm) mutable {
return _db.invoke_on(shard, [this, query_uuid = _cmd.query_uuid, query_ranges = _ranges, &rm,
&last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable {
try {
flat_mutation_reader_opt reader = try_resume(rm.rparts->permit.semaphore(), std::move(*rm.handle));
if (!reader) {
return;
return make_ready_future<>();
}
auto maybe_next_partition = make_ready_future<>();
if (rm.has_pending_next_partition) {
reader->next_partition();
maybe_next_partition = reader->next_partition();
}
return maybe_next_partition.then([this, query_uuid, query_ranges, &rm, &last_pkey, &last_ckey, gts = std::move(gts), &db, reader = std::move(reader)] () mutable {
auto& buffer = *rm.buffer;
const auto fragments = buffer.size();
const auto size_before = reader->buffer_size();
@@ -497,11 +500,13 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
db.get_stats().multishard_query_unpopped_fragments += fragments;
db.get_stats().multishard_query_unpopped_bytes += (size_after - size_before);
});
} catch (...) {
// We don't want to fail a read just because of a failure to
// save any of the readers.
mmq_log.debug("Failed to save reader: {}", std::current_exception());
++db.get_stats().multishard_query_failed_reader_saves;
return make_ready_future<>();
}
}).handle_exception([this, shard] (std::exception_ptr e) {
// We don't want to fail a read just because of a failure to
@@ -511,6 +516,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las
// know where exactly the failure happened anyway.
++_db.local().get_stats().multishard_query_failed_reader_saves;
});
});
}
future<> read_context::lookup_readers() {

View File

@@ -26,6 +26,7 @@
#include "mutation_reader.hh"
#include <seastar/core/future-util.hh>
#include <seastar/core/coroutine.hh>
#include "flat_mutation_reader.hh"
#include "schema_registry.hh"
#include "mutation_compactor.hh"
@@ -49,7 +50,7 @@ concept FragmentProducer = requires(Producer p, dht::partition_range part_range,
// The following functions have the same semantics as their
// flat_mutation_reader counterparts.
{ p.next_partition() };
{ p.next_partition() } -> std::same_as<future<>>;
{ p.fast_forward_to(part_range, timeout) } -> std::same_as<future<>>;
{ p.fast_forward_to(pos_range, timeout) } -> std::same_as<future<>>;
};
@@ -131,8 +132,8 @@ public:
});
}
void next_partition() {
_producer.next_partition();
future<> next_partition() {
return _producer.next_partition();
}
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -240,7 +241,7 @@ public:
// Produces the next batch of mutation-fragments of the same
// position.
future<mutation_fragment_batch> operator()(db::timeout_clock::time_point timeout);
void next_partition();
future<> next_partition();
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout);
future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout);
};
@@ -267,7 +268,7 @@ public:
, _fwd_sm(fwd_sm) {}
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override;
};
@@ -542,7 +543,7 @@ future<mutation_fragment_batch> mutation_reader_merger::operator()(db::timeout_c
return make_ready_future<mutation_fragment_batch>(_current);
}
void mutation_reader_merger::next_partition() {
future<> mutation_reader_merger::next_partition() {
// If the last batch of fragments returned by operator() came from partition P,
// we must forward to the partition immediately following P (as per the `next_partition`
// contract in `flat_mutation_reader`).
@@ -558,7 +559,7 @@ void mutation_reader_merger::next_partition() {
prepare_forwardable_readers();
for (auto& rk : _next) {
rk.last_kind = mutation_fragment::kind::partition_end;
rk.reader->next_partition();
co_await rk.reader->next_partition();
}
}
@@ -605,11 +606,11 @@ future<> merging_reader<P>::fill_buffer(db::timeout_clock::time_point timeout) {
}
template <FragmentProducer P>
void merging_reader<P>::next_partition() {
future<> merging_reader<P>::next_partition() {
if (_fwd_sm == streamed_mutation::forwarding::yes) {
clear_buffer();
_end_of_stream = false;
_merger.next_partition();
return _merger.next_partition();
} else {
clear_buffer_to_next_partition();
// If the buffer is empty at this point then all fragments in it
@@ -619,9 +620,10 @@ void merging_reader<P>::next_partition() {
// Thus we need to call next_partition on it (see the `next_partition` contract
// of `flat_mutation_reader`, which `FragmentProducer` follows).
if (is_buffer_empty()) {
_merger.next_partition();
return _merger.next_partition();
}
}
return make_ready_future<>();
}
template <FragmentProducer P>
@@ -749,15 +751,16 @@ public:
});
}, timeout);
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (!is_buffer_empty()) {
return;
return make_ready_future<>();
}
_end_of_stream = false;
if (auto* state = std::get_if<admitted_state>(&_state)) {
return state->reader.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
@@ -875,15 +878,18 @@ class foreign_reader : public flat_mutation_reader::impl {
timeout,
op = std::move(op)] () mutable {
auto exec_op_and_read_ahead = [=] () mutable {
auto maybe_next_partition = make_ready_future<>();
if (pending_next_partition) {
reader->next_partition();
maybe_next_partition = reader->next_partition();
}
return maybe_next_partition.then([=] () mutable {
// Not really variadic, we expect 0 (void) or 1 parameter.
return op().then([=] (auto... result) {
auto f = reader->is_end_of_stream() ? nullptr : std::make_unique<future<>>(reader->fill_buffer(timeout));
return make_ready_future<std::tuple<foreign_unique_ptr<future<>>, decltype(result)...>>(
std::tuple(make_foreign(std::move(f)), std::move(result)...));
});
});
};
if (read_ahead_future) {
return read_ahead_future->then(std::move(exec_op_and_read_ahead));
@@ -916,7 +922,7 @@ public:
foreign_reader& operator=(foreign_reader&&) = delete;
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override;
};
@@ -963,7 +969,7 @@ future<> foreign_reader::fill_buffer(db::timeout_clock::time_point timeout) {
});
}
void foreign_reader::next_partition() {
future<> foreign_reader::next_partition() {
if (_fwd_sm == streamed_mutation::forwarding::yes) {
clear_buffer();
_end_of_stream = false;
@@ -975,6 +981,7 @@ void foreign_reader::next_partition() {
_pending_next_partition = true;
}
}
return make_ready_future<>();
}
future<> foreign_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -1087,7 +1094,7 @@ public:
mutation_reader::forwarding fwd_mr);
~evictable_reader();
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override {
throw_with_backtrace<std::bad_function_call>();
@@ -1478,23 +1485,26 @@ future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) {
}
return do_with(resume_or_create_reader(),
[this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable {
auto maybe_next_partition = make_ready_future<>();
if (pending_next_partition) {
reader.next_partition();
maybe_next_partition = reader.next_partition();
}
return maybe_next_partition.then([this, timeout, &reader] {
return fill_buffer(reader, timeout).then([this, &reader] {
_end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty();
maybe_pause(std::move(reader));
});
});
});
}
void evictable_reader::next_partition() {
future<> evictable_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_pending_next_partition = true;
_next_position_in_partition = position_in_partition::for_partition_start();
}
return make_ready_future<>();
}
future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -1615,7 +1625,7 @@ public:
return buffer().front();
}
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override;
bool done() const {
@@ -1694,12 +1704,15 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
});
} else {
fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable {
auto maybe_next_partition = make_ready_future<>();
if (pending_next_partition) {
_reader->next_partition();
maybe_next_partition = _reader->next_partition();
}
return maybe_next_partition.then([this, timeout] {
return _reader->fill_buffer(timeout).then([this] {
return remote_fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream());
});
});
});
}
@@ -1721,12 +1734,12 @@ future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) {
return do_fill_buffer(timeout);
}
void shard_reader::next_partition() {
if (!_reader) {
return;
}
future<> shard_reader::next_partition() {
if (_reader) {
clear_buffer_to_next_partition();
_pending_next_partition = is_buffer_empty();
}
return make_ready_future<>();
}
future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -1810,7 +1823,7 @@ public:
multishard_combining_reader& operator=(multishard_combining_reader&&) = delete;
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override;
};
@@ -1930,11 +1943,12 @@ future<> multishard_combining_reader::fill_buffer(db::timeout_clock::time_point
});
}
void multishard_combining_reader::next_partition() {
future<> multishard_combining_reader::next_partition() {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_shard_readers[_current_shard]->next_partition();
return _shard_readers[_current_shard]->next_partition();
}
return make_ready_future<>();
}
future<> multishard_combining_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
@@ -2044,8 +2058,8 @@ public:
_full.emplace();
return _full->get_future();
}
virtual void next_partition() override {
throw_with_backtrace<std::bad_function_call>();
virtual future<> next_partition() override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
@@ -2268,14 +2282,14 @@ public:
});
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (!is_buffer_empty()) {
return;
return make_ready_future<>();
}
_end_of_stream = false;
maybe_inject_partition_end();
_reader.next_partition();
return _reader.next_partition();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
@@ -2642,7 +2656,7 @@ public:
return make_ready_future<mutation_fragment_batch>(_current_batch);
}
void next_partition() {
future<> next_partition() {
throw std::runtime_error(
"clustering_order_reader_merger::next_partition: this reader works only for single partition queries");
}

View File

@@ -87,27 +87,29 @@ public:
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this, timeout] {
return _rd.fill_buffer(timeout).then([this] {
while (!_rd.is_buffer_empty()) {
return do_until([this] { return _rd.is_buffer_empty(); }, [this] {
auto mf = _rd.pop_mutation_fragment();
if (mf.is_partition_start()) {
auto& dk = mf.as_partition_start().key();
if (!_filter(dk)) {
_rd.next_partition();
continue;
return _rd.next_partition();
}
}
push_mutation_fragment(std::move(mf));
}
_end_of_stream = _rd.is_end_of_stream();
return make_ready_future<>();
}).then([this] {
_end_of_stream = _rd.is_end_of_stream();
});
});
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = false;
_rd.next_partition();
return _rd.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();

View File

@@ -304,11 +304,12 @@ public:
return make_ready_future<>();
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = true;
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
throw std::runtime_error("This reader can't be fast forwarded to another partition.");

View File

@@ -70,8 +70,8 @@ public:
_reader = _cache.create_underlying_reader(_read_context, snap, _range);
_reader_creation_phase = phase;
}
_reader->next_partition();
return _reader->next_partition().then([this, timeout] {
if (_reader->is_end_of_stream() && _reader->is_buffer_empty()) {
return make_ready_future<mutation_fragment_opt>();
}
@@ -82,6 +82,7 @@ public:
}
return std::move(mfopt);
});
});
}
future<> fast_forward_to(dht::partition_range&& range, db::timeout_clock::time_point timeout) {
auto snapshot_and_phase = _cache.snapshot_of(dht::ring_position_view::for_range_start(_range));

View File

@@ -410,11 +410,12 @@ public:
});
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
if (_reader) {
clear_buffer();
_end_of_stream = true;
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override {
clear_buffer();
@@ -675,11 +676,12 @@ public:
}
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (_reader && is_buffer_empty()) {
_reader->next_partition();
return _reader->next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();

View File

@@ -1247,8 +1247,8 @@ class scrub_compaction final : public regular_compaction {
}
});
}
virtual void next_partition() override {
throw_with_backtrace<std::bad_function_call>();
virtual future<> next_partition() override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());

View File

@@ -440,7 +440,7 @@ public:
}
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
if (is_initialized()) {
if (_fwd == streamed_mutation::forwarding::yes) {
clear_buffer();
@@ -453,6 +453,7 @@ public:
}
}
}
return make_ready_future<>();
// If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet.
}
virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override {

View File

@@ -334,7 +334,7 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) {
struct dummy_reader_impl : public flat_mutation_reader::impl {
using flat_mutation_reader::impl::impl;
virtual future<> fill_buffer(db::timeout_clock::time_point) override { return make_ready_future<>(); }
virtual void next_partition() { }
virtual future<> next_partition() { return make_ready_future<>(); }
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override { return make_ready_future<>(); }
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override { return make_ready_future<>(); }
};

View File

@@ -933,12 +933,13 @@ public:
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
_end_of_stream = false;
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_reader.next_partition();
return _reader.next_partition();
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
@@ -2084,7 +2085,7 @@ public:
}
abort();
}
virtual void next_partition() override { }
virtual future<> next_partition() override { return make_ready_future<>(); }
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point) override {
++_ctrl.fast_forward_to;
clear_buffer();
@@ -2983,13 +2984,13 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source)
maybe_pause();
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (!is_buffer_empty()) {
return;
return make_ready_future<>();
}
_end_of_stream = false;
_reader.next_partition();
return _reader.next_partition();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
clear_buffer();
@@ -3786,7 +3787,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
}
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
_end_of_stream = false;
if (is_buffer_empty()) {
@@ -3803,6 +3804,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
// either no previously fetched fragment or must have come from before _it. Nothing to do
}
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {

View File

@@ -195,10 +195,10 @@ public:
}
return delegating_reader<flat_mutation_reader>::fill_buffer(timeout);
}
virtual void next_partition() override {
virtual future<> next_partition() override {
_count_fill_buffer = false;
++_counter;
delegating_reader<flat_mutation_reader>::next_partition();
return delegating_reader<flat_mutation_reader>::next_partition();
}
};

View File

@@ -2504,7 +2504,8 @@ SEASTAR_TEST_CASE(sstable_rewrite) {
BOOST_REQUIRE(m->is_partition_start());
auto pkey = partition_key::from_exploded(*s, {to_bytes(key)});
BOOST_REQUIRE(m->as_partition_start().key().key().equal(*s, pkey));
reader->next_partition();
return reader->next_partition();
}).then([reader] {
return (*reader)(db::no_timeout);
}).then([reader] (mutation_fragment_opt m) {
BOOST_REQUIRE(!m);

View File

@@ -358,9 +358,10 @@ future<> test_range_reads(sstables::test_env& env, const dht::token& min, const
BOOST_REQUIRE(std::vector<bytes>({expected.back()}) == mfopt->as_partition_start().key().key().explode());
expected.pop_back();
(*count)++;
mutations->next_partition();
return mutations->next_partition();
} else {
*stop = true;
return make_ready_future<>();
}
});
}).then([count, expected_size] {

View File

@@ -421,7 +421,7 @@ public:
flat_reader_assertions& next_partition() {
testlog.trace("Skip to next partition");
_reader.next_partition();
_reader.next_partition().get();
return *this;
}

View File

@@ -70,13 +70,14 @@ future<> normalizing_reader::fill_buffer(db::timeout_clock::time_point timeout)
});
}
void normalizing_reader::next_partition() {
future<> normalizing_reader::next_partition() {
_range_tombstones.reset();
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = false;
_rd.next_partition();
return _rd.next_partition();
}
return make_ready_future<>();
}
future<> normalizing_reader::fast_forward_to(
const dht::partition_range& pr, db::timeout_clock::time_point timeout) {

View File

@@ -41,7 +41,7 @@ public:
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual void next_partition() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;

View File

@@ -118,9 +118,10 @@ public:
});
}
virtual void next_partition() override {
virtual future<> next_partition() override {
clear_buffer();
_end_of_stream = true;
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {

View File

@@ -733,7 +733,7 @@ uint64_t consume_all_with_next_partition(flat_mutation_reader& rd) {
uint64_t fragments = 0;
do {
fragments += consume_all(rd);
rd.next_partition();
rd.next_partition().get();
rd.fill_buffer(db::no_timeout).get();
} while(!rd.is_end_of_stream() || !rd.is_buffer_empty());
return fragments;