diff --git a/cache_flat_mutation_reader.hh b/cache_flat_mutation_reader.hh index 828c1b73b1..647d2ae1ba 100644 --- a/cache_flat_mutation_reader.hh +++ b/cache_flat_mutation_reader.hh @@ -163,11 +163,12 @@ public: cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete; cache_flat_mutation_reader(cache_flat_mutation_reader&&) = delete; virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { _end_of_stream = true; } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override { clear_buffer(); diff --git a/db/size_estimates_virtual_reader.cc b/db/size_estimates_virtual_reader.cc index 366fd20e46..0d4d78c1ed 100644 --- a/db/size_estimates_virtual_reader.cc +++ b/db/size_estimates_virtual_reader.cc @@ -275,11 +275,12 @@ future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_poi }); } -void size_estimates_mutation_reader::next_partition() { +future<> size_estimates_mutation_reader::next_partition() { clear_buffer_to_next_partition(); if (is_buffer_empty()) { _partition_reader = std::nullopt; } + return make_ready_future<>(); } future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { diff --git a/db/size_estimates_virtual_reader.hh b/db/size_estimates_virtual_reader.hh index 4e2b15709d..0656f7934b 100644 --- a/db/size_estimates_virtual_reader.hh +++ b/db/size_estimates_virtual_reader.hh @@ -48,7 +48,7 @@ public: size_estimates_mutation_reader(database& db, schema_ptr, reader_permit, const dht::partition_range&, const query::partition_slice&, streamed_mutation::forwarding); virtual future<> fill_buffer(db::timeout_clock::time_point) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override; virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override; private: diff --git a/db/view/build_progress_virtual_reader.hh b/db/view/build_progress_virtual_reader.hh index 0260d346bc..1fce9bf3c5 100644 --- a/db/view/build_progress_virtual_reader.hh +++ b/db/view/build_progress_virtual_reader.hh @@ -162,12 +162,13 @@ class build_progress_virtual_reader { }); } - virtual void next_partition() override { + virtual future<> next_partition() override { _end_of_stream = false; clear_buffer_to_next_partition(); if (is_buffer_empty()) { - _underlying.next_partition(); + return _underlying.next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { diff --git a/flat_mutation_reader.cc b/flat_mutation_reader.cc index 5ec5b03b5f..81318455ab 100644 --- a/flat_mutation_reader.cc +++ b/flat_mutation_reader.cc @@ -168,7 +168,7 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty() && !is_end_of_stream()) { while (!_mutation_fragments.empty()) { @@ -177,8 +177,9 @@ flat_mutation_reader make_reversing_reader(flat_mutation_reader& original, query } _range_tombstones.clear(); _partition_end = std::nullopt; - _source->next_partition(); + return _source->next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override { @@ -271,17 +272,21 @@ flat_mutation_reader make_forwardable(flat_mutation_reader m) { forward_buffer_to(_current.start()); return make_ready_future<>(); } - virtual void next_partition() override { + virtual future<> next_partition() override { _end_of_stream = false; + auto maybe_next_partition = make_ready_future<>(); if (!_next || !_next->is_partition_start()) { - _underlying.next_partition(); + maybe_next_partition = _underlying.next_partition().then([this] { _next = {}; + }); } + return maybe_next_partition.then([this] { clear_buffer_to_next_partition(); _current = { position_in_partition(position_in_partition::partition_start_tag_t()), position_in_partition(position_in_partition::after_static_row_tag_t()) }; + }); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { _end_of_stream = false; @@ -315,11 +320,12 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par _end_of_stream = true; return make_ready_future<>(); } - _underlying.next_partition(); + return _underlying.next_partition().then([this, timeout] { _static_row_done = false; return _underlying.fill_buffer(timeout).then([this] { _end_of_stream = is_end_end_of_underlying_stream(); }); + }); } public: reader(flat_mutation_reader r, bool single_partition) @@ -340,12 +346,15 @@ flat_mutation_reader make_nonforwardable(flat_mutation_reader r, bool single_par virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { return make_exception_future<>(make_backtraced_exception_ptr()); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); + auto maybe_next_partition = make_ready_future<>();; if (is_buffer_empty()) { - _underlying.next_partition(); + maybe_next_partition = _underlying.next_partition(); } + return maybe_next_partition.then([this] { _end_of_stream = is_end_end_of_underlying_stream(); + }); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { _end_of_stream = false; @@ -360,7 +369,7 @@ class empty_flat_reader final : public flat_mutation_reader::impl { public: empty_flat_reader(schema_ptr s, reader_permit permit) : impl(std::move(s), std::move(permit)) { _end_of_stream = true; } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { return make_ready_future<>(); } - virtual void next_partition() override {} + virtual future<> next_partition() override { return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }; virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override { return make_ready_future<>(); }; }; @@ -553,7 +562,7 @@ flat_mutation_reader_from_mutations(reader_permit permit, std::vector do_fill_buffer(timeout); return make_ready_future<>(); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty() && !is_end_of_stream()) { destroy_current_mutation(); @@ -564,6 +573,7 @@ flat_mutation_reader_from_mutations(reader_permit permit, std::vector start_new_partition(); } } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); @@ -647,14 +657,15 @@ public: virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { return make_exception_future<>(make_backtraced_exception_ptr()); } - virtual void next_partition() override { + virtual future<> next_partition() override { if (!_reader) { - return; + return make_ready_future<>(); } clear_buffer_to_next_partition(); if (is_buffer_empty() && !is_end_of_stream()) { - _reader->next_partition(); + return _reader->next_partition(); } + return make_ready_future<>(); } }; @@ -717,11 +728,12 @@ public: return make_exception_future<>(make_backtraced_exception_ptr()); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty() && !is_end_of_stream()) { - _reader.next_partition(); + return _reader.next_partition(); } + return make_ready_future<>(); } }; @@ -849,13 +861,14 @@ make_flat_mutation_reader_from_fragments(schema_ptr schema, reader_permit permit } return make_ready_future<>(); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { while (!(_end_of_stream = end_of_range()) && !_fragments.front().is_partition_start()) { _fragments.pop_front(); } } + return make_ready_future<>(); } virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { throw std::runtime_error("This reader can't be fast forwarded to another range."); @@ -921,8 +934,8 @@ public: }); }); } - virtual void next_partition() override { - throw_with_backtrace(); + virtual future<> next_partition() override { + return make_exception_future<>(make_backtraced_exception_ptr()); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override { return make_exception_future<>(make_backtraced_exception_ptr()); diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index 7a73d53556..3d23ce0b65 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -46,6 +46,8 @@ template concept FlatMutationReaderConsumer = requires(Consumer c, mutation_fragment mf) { { c(std::move(mf)) } -> std::same_as; + } || requires(Consumer c, mutation_fragment mf) { + { c(std::move(mf)) } -> std::same_as>; }; @@ -84,7 +86,6 @@ public: private: tracked_buffer _buffer; size_t _buffer_size = 0; - bool _consume_done = false; protected: size_t max_buffer_size_in_bytes = 8 * 1024; bool _end_of_stream = false; @@ -119,7 +120,7 @@ public: impl(schema_ptr s, reader_permit permit) : _buffer(permit), _schema(std::move(s)), _permit(std::move(permit)) { } virtual ~impl() {} virtual future<> fill_buffer(db::timeout_clock::time_point) = 0; - virtual void next_partition() = 0; + virtual future<> next_partition() = 0; bool is_end_of_stream() const { return _end_of_stream; } bool is_buffer_empty() const { return _buffer.empty(); } @@ -153,16 +154,22 @@ public: // Stops when consumer returns stop_iteration::yes or end of stream is reached. // Next call will start from the next mutation_fragment in the stream. future<> consume_pausable(Consumer consumer, db::timeout_clock::time_point timeout) { - _consume_done = false; - return do_until([this] { return (is_end_of_stream() && is_buffer_empty()) || _consume_done; }, - [this, consumer = std::move(consumer), timeout] () mutable { - if (is_buffer_empty()) { - return fill_buffer(timeout); - } + return do_with(std::move(consumer), [this, timeout] (Consumer& consumer) { + return repeat([this, &consumer, timeout] { + if (is_end_of_stream() && is_buffer_empty()) { + return make_ready_future(stop_iteration::yes); + } - _consume_done = consumer(pop_mutation_fragment()) == stop_iteration::yes; + if (is_buffer_empty()) { + return fill_buffer(timeout).then([] { + return make_ready_future(stop_iteration::no); + }); + } - return make_ready_future<>(); + return futurize_invoke([&consumer, mf = pop_mutation_fragment()] () mutable { + return consumer(std::move(mf)); + }); + }); }); } @@ -186,10 +193,16 @@ public: } auto mf = pop_mutation_fragment(); if (mf.is_partition_start() && !filter(mf.as_partition_start().key())) { - next_partition(); + next_partition().get(); continue; } - if (filter(mf) && (consumer(std::move(mf)) == stop_iteration::yes)) { + if (!filter(mf)) { + continue; + } + auto do_stop = futurize_invoke([&consumer, mf = std::move(mf)] () mutable { + return consumer(std::move(mf)); + }); + if (do_stop.get0()) { return; } } @@ -205,38 +218,42 @@ public: : _reader(reader) , _consumer(std::move(c)) { } - stop_iteration operator()(mutation_fragment&& mf) { + future operator()(mutation_fragment&& mf) { return std::move(mf).consume(*this); } - stop_iteration consume(static_row&& sr) { + future consume(static_row&& sr) { return handle_result(_consumer.consume(std::move(sr))); } - stop_iteration consume(clustering_row&& cr) { + future consume(clustering_row&& cr) { return handle_result(_consumer.consume(std::move(cr))); } - stop_iteration consume(range_tombstone&& rt) { + future consume(range_tombstone&& rt) { return handle_result(_consumer.consume(std::move(rt))); } - stop_iteration consume(partition_start&& ps) { + future consume(partition_start&& ps) { _decorated_key.emplace(std::move(ps.key())); _consumer.consume_new_partition(*_decorated_key); if (ps.partition_tombstone()) { _consumer.consume(ps.partition_tombstone()); } - return stop_iteration::no; + return make_ready_future(stop_iteration::no); } - stop_iteration consume(partition_end&& pe) { + future consume(partition_end&& pe) { + return futurize_invoke([this] { return _consumer.consume_end_of_partition(); + }); } private: - stop_iteration handle_result(stop_iteration si) { + future handle_result(stop_iteration si) { if (si) { if (_consumer.consume_end_of_partition()) { - return stop_iteration::yes; + return make_ready_future(stop_iteration::yes); } - _reader.next_partition(); + return _reader.next_partition().then([] { + return make_ready_future(stop_iteration::no); + }); } - return stop_iteration::no; + return make_ready_future(stop_iteration::no); } }; public: @@ -405,7 +422,7 @@ public: // // Can be used to skip over entire partitions if interleaved with // `operator()()` calls. - void next_partition() { _impl->next_partition(); } + future<> next_partition() { return _impl->next_partition(); } future<> fill_buffer(db::timeout_clock::time_point timeout) { return _impl->fill_buffer(timeout); } @@ -593,11 +610,12 @@ flat_mutation_reader transform(flat_mutation_reader r, T t) { } }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { - _reader.next_partition(); + return _reader.next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); @@ -635,12 +653,15 @@ public: forward_buffer_to(pr.start()); return to_reference(_underlying).fast_forward_to(std::move(pr), timeout); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); + auto maybe_next_partition = make_ready_future<>(); if (is_buffer_empty()) { - to_reference(_underlying).next_partition(); + maybe_next_partition = to_reference(_underlying).next_partition(); } + return maybe_next_partition.then([this] { _end_of_stream = to_reference(_underlying).is_end_of_stream() && to_reference(_underlying).is_buffer_empty(); + }); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { _end_of_stream = false; diff --git a/index/built_indexes_virtual_reader.hh b/index/built_indexes_virtual_reader.hh index 09e8c8bfd6..340ddb8929 100644 --- a/index/built_indexes_virtual_reader.hh +++ b/index/built_indexes_virtual_reader.hh @@ -92,12 +92,13 @@ class built_indexes_virtual_reader { }); } - virtual void next_partition() override { + virtual future<> next_partition() override { _end_of_stream = false; clear_buffer_to_next_partition(); if (is_buffer_empty()) { - _underlying.next_partition(); + return _underlying.next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { diff --git a/memtable.cc b/memtable.cc index bf454a52b1..04787cc656 100644 --- a/memtable.cc +++ b/memtable.cc @@ -460,15 +460,16 @@ public: return is_end_of_stream() ? make_ready_future<>() : fill_buffer_from_delegate(timeout); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { if (!_delegate_range) { _delegate = {}; } else { - _delegate->next_partition(); + return _delegate->next_partition(); } } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { _end_of_stream = false; @@ -621,11 +622,12 @@ public: }); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { _partition_reader = std::nullopt; } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override { return make_exception_future<>(make_backtraced_exception_ptr()); diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index e1f54cb52a..cb35f015c2 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -457,19 +457,22 @@ read_context::dismantle_buffer_stats read_context::dismantle_compaction_state(de } future<> read_context::save_reader(shard_id shard, const dht::decorated_key& last_pkey, const std::optional& last_ckey) { - return _db.invoke_on(shard, [this, shard, query_uuid = _cmd.query_uuid, query_ranges = _ranges, rm = std::exchange(_readers[shard], {}), + return do_with(std::exchange(_readers[shard], {}), [this, shard, &last_pkey, &last_ckey] (reader_meta& rm) mutable { + return _db.invoke_on(shard, [this, query_uuid = _cmd.query_uuid, query_ranges = _ranges, &rm, &last_pkey, &last_ckey, gts = tracing::global_trace_state_ptr(_trace_state)] (database& db) mutable { try { flat_mutation_reader_opt reader = try_resume(rm.rparts->permit.semaphore(), std::move(*rm.handle)); if (!reader) { - return; + return make_ready_future<>(); } + auto maybe_next_partition = make_ready_future<>(); if (rm.has_pending_next_partition) { - reader->next_partition(); + maybe_next_partition = reader->next_partition(); } + return maybe_next_partition.then([this, query_uuid, query_ranges, &rm, &last_pkey, &last_ckey, gts = std::move(gts), &db, reader = std::move(reader)] () mutable { auto& buffer = *rm.buffer; const auto fragments = buffer.size(); const auto size_before = reader->buffer_size(); @@ -497,11 +500,13 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las db.get_stats().multishard_query_unpopped_fragments += fragments; db.get_stats().multishard_query_unpopped_bytes += (size_after - size_before); + }); } catch (...) { // We don't want to fail a read just because of a failure to // save any of the readers. mmq_log.debug("Failed to save reader: {}", std::current_exception()); ++db.get_stats().multishard_query_failed_reader_saves; + return make_ready_future<>(); } }).handle_exception([this, shard] (std::exception_ptr e) { // We don't want to fail a read just because of a failure to @@ -511,6 +516,7 @@ future<> read_context::save_reader(shard_id shard, const dht::decorated_key& las // know where exactly the failure happened anyway. ++_db.local().get_stats().multishard_query_failed_reader_saves; }); + }); } future<> read_context::lookup_readers() { diff --git a/mutation_reader.cc b/mutation_reader.cc index 0643a3c73f..f36901f8de 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -26,6 +26,7 @@ #include "mutation_reader.hh" #include +#include #include "flat_mutation_reader.hh" #include "schema_registry.hh" #include "mutation_compactor.hh" @@ -49,7 +50,7 @@ concept FragmentProducer = requires(Producer p, dht::partition_range part_range, // The following functions have the same semantics as their // flat_mutation_reader counterparts. - { p.next_partition() }; + { p.next_partition() } -> std::same_as>; { p.fast_forward_to(part_range, timeout) } -> std::same_as>; { p.fast_forward_to(pos_range, timeout) } -> std::same_as>; }; @@ -131,8 +132,8 @@ public: }); } - void next_partition() { - _producer.next_partition(); + future<> next_partition() { + return _producer.next_partition(); } future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -240,7 +241,7 @@ public: // Produces the next batch of mutation-fragments of the same // position. future operator()(db::timeout_clock::time_point timeout); - void next_partition(); + future<> next_partition(); future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout); future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout); }; @@ -267,7 +268,7 @@ public: , _fwd_sm(fwd_sm) {} virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override; }; @@ -542,7 +543,7 @@ future mutation_reader_merger::operator()(db::timeout_c return make_ready_future(_current); } -void mutation_reader_merger::next_partition() { +future<> mutation_reader_merger::next_partition() { // If the last batch of fragments returned by operator() came from partition P, // we must forward to the partition immediately following P (as per the `next_partition` // contract in `flat_mutation_reader`). @@ -558,7 +559,7 @@ void mutation_reader_merger::next_partition() { prepare_forwardable_readers(); for (auto& rk : _next) { rk.last_kind = mutation_fragment::kind::partition_end; - rk.reader->next_partition(); + co_await rk.reader->next_partition(); } } @@ -605,11 +606,11 @@ future<> merging_reader

::fill_buffer(db::timeout_clock::time_point timeout) { } template -void merging_reader

::next_partition() { +future<> merging_reader

::next_partition() { if (_fwd_sm == streamed_mutation::forwarding::yes) { clear_buffer(); _end_of_stream = false; - _merger.next_partition(); + return _merger.next_partition(); } else { clear_buffer_to_next_partition(); // If the buffer is empty at this point then all fragments in it @@ -619,9 +620,10 @@ void merging_reader

::next_partition() { // Thus we need to call next_partition on it (see the `next_partition` contract // of `flat_mutation_reader`, which `FragmentProducer` follows). if (is_buffer_empty()) { - _merger.next_partition(); + return _merger.next_partition(); } } + return make_ready_future<>(); } template @@ -749,15 +751,16 @@ public: }); }, timeout); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (!is_buffer_empty()) { - return; + return make_ready_future<>(); } _end_of_stream = false; if (auto* state = std::get_if(&_state)) { return state->reader.next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); @@ -875,15 +878,18 @@ class foreign_reader : public flat_mutation_reader::impl { timeout, op = std::move(op)] () mutable { auto exec_op_and_read_ahead = [=] () mutable { + auto maybe_next_partition = make_ready_future<>(); if (pending_next_partition) { - reader->next_partition(); + maybe_next_partition = reader->next_partition(); } + return maybe_next_partition.then([=] () mutable { // Not really variadic, we expect 0 (void) or 1 parameter. return op().then([=] (auto... result) { auto f = reader->is_end_of_stream() ? nullptr : std::make_unique>(reader->fill_buffer(timeout)); return make_ready_future>, decltype(result)...>>( std::tuple(make_foreign(std::move(f)), std::move(result)...)); }); + }); }; if (read_ahead_future) { return read_ahead_future->then(std::move(exec_op_and_read_ahead)); @@ -916,7 +922,7 @@ public: foreign_reader& operator=(foreign_reader&&) = delete; virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override; }; @@ -963,7 +969,7 @@ future<> foreign_reader::fill_buffer(db::timeout_clock::time_point timeout) { }); } -void foreign_reader::next_partition() { +future<> foreign_reader::next_partition() { if (_fwd_sm == streamed_mutation::forwarding::yes) { clear_buffer(); _end_of_stream = false; @@ -975,6 +981,7 @@ void foreign_reader::next_partition() { _pending_next_partition = true; } } + return make_ready_future<>(); } future<> foreign_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -1087,7 +1094,7 @@ public: mutation_reader::forwarding fwd_mr); ~evictable_reader(); virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override { throw_with_backtrace(); @@ -1478,23 +1485,26 @@ future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) { } return do_with(resume_or_create_reader(), [this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable { + auto maybe_next_partition = make_ready_future<>(); if (pending_next_partition) { - reader.next_partition(); + maybe_next_partition = reader.next_partition(); } - + return maybe_next_partition.then([this, timeout, &reader] { return fill_buffer(reader, timeout).then([this, &reader] { _end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty(); maybe_pause(std::move(reader)); }); + }); }); } -void evictable_reader::next_partition() { +future<> evictable_reader::next_partition() { clear_buffer_to_next_partition(); if (is_buffer_empty()) { _pending_next_partition = true; _next_position_in_partition = position_in_partition::for_partition_start(); } + return make_ready_future<>(); } future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -1615,7 +1625,7 @@ public: return buffer().front(); } virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override; bool done() const { @@ -1694,12 +1704,15 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) { }); } else { fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable { + auto maybe_next_partition = make_ready_future<>(); if (pending_next_partition) { - _reader->next_partition(); + maybe_next_partition = _reader->next_partition(); } + return maybe_next_partition.then([this, timeout] { return _reader->fill_buffer(timeout).then([this] { return remote_fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream()); }); + }); }); } @@ -1721,12 +1734,12 @@ future<> shard_reader::fill_buffer(db::timeout_clock::time_point timeout) { return do_fill_buffer(timeout); } -void shard_reader::next_partition() { - if (!_reader) { - return; - } +future<> shard_reader::next_partition() { + if (_reader) { clear_buffer_to_next_partition(); _pending_next_partition = is_buffer_empty(); + } + return make_ready_future<>(); } future<> shard_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -1810,7 +1823,7 @@ public: multishard_combining_reader& operator=(multishard_combining_reader&&) = delete; virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override; }; @@ -1930,11 +1943,12 @@ future<> multishard_combining_reader::fill_buffer(db::timeout_clock::time_point }); } -void multishard_combining_reader::next_partition() { +future<> multishard_combining_reader::next_partition() { clear_buffer_to_next_partition(); if (is_buffer_empty()) { - _shard_readers[_current_shard]->next_partition(); + return _shard_readers[_current_shard]->next_partition(); } + return make_ready_future<>(); } future<> multishard_combining_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) { @@ -2044,8 +2058,8 @@ public: _full.emplace(); return _full->get_future(); } - virtual void next_partition() override { - throw_with_backtrace(); + virtual future<> next_partition() override { + return make_exception_future<>(make_backtraced_exception_ptr()); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override { return make_exception_future<>(make_backtraced_exception_ptr()); @@ -2268,14 +2282,14 @@ public: }); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (!is_buffer_empty()) { - return; + return make_ready_future<>(); } _end_of_stream = false; maybe_inject_partition_end(); - _reader.next_partition(); + return _reader.next_partition(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); @@ -2642,7 +2656,7 @@ public: return make_ready_future(_current_batch); } - void next_partition() { + future<> next_partition() { throw std::runtime_error( "clustering_order_reader_merger::next_partition: this reader works only for single partition queries"); } diff --git a/mutation_reader.hh b/mutation_reader.hh index 3a3dd50b25..40245744af 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -87,27 +87,29 @@ public: virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override { return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this, timeout] { return _rd.fill_buffer(timeout).then([this] { - while (!_rd.is_buffer_empty()) { + return do_until([this] { return _rd.is_buffer_empty(); }, [this] { auto mf = _rd.pop_mutation_fragment(); if (mf.is_partition_start()) { auto& dk = mf.as_partition_start().key(); if (!_filter(dk)) { - _rd.next_partition(); - continue; + return _rd.next_partition(); } } push_mutation_fragment(std::move(mf)); - } - _end_of_stream = _rd.is_end_of_stream(); + return make_ready_future<>(); + }).then([this] { + _end_of_stream = _rd.is_end_of_stream(); + }); }); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { _end_of_stream = false; - _rd.next_partition(); + return _rd.next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh index 1c6ed84072..b493106288 100644 --- a/partition_snapshot_reader.hh +++ b/partition_snapshot_reader.hh @@ -304,11 +304,12 @@ public: return make_ready_future<>(); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (is_buffer_empty()) { _end_of_stream = true; } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { throw std::runtime_error("This reader can't be fast forwarded to another partition."); diff --git a/read_context.hh b/read_context.hh index a6eb8a1c13..94e78a6d5c 100644 --- a/read_context.hh +++ b/read_context.hh @@ -70,8 +70,8 @@ public: _reader = _cache.create_underlying_reader(_read_context, snap, _range); _reader_creation_phase = phase; } - _reader->next_partition(); + return _reader->next_partition().then([this, timeout] { if (_reader->is_end_of_stream() && _reader->is_buffer_empty()) { return make_ready_future(); } @@ -82,6 +82,7 @@ public: } return std::move(mfopt); }); + }); } future<> fast_forward_to(dht::partition_range&& range, db::timeout_clock::time_point timeout) { auto snapshot_and_phase = _cache.snapshot_of(dht::ring_position_view::for_range_start(_range)); diff --git a/row_cache.cc b/row_cache.cc index 17069eb01b..5a9f21869e 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -410,11 +410,12 @@ public: }); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { if (_reader) { clear_buffer(); _end_of_stream = true; } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override { clear_buffer(); @@ -675,11 +676,12 @@ public: } }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (_reader && is_buffer_empty()) { - _reader->next_partition(); + return _reader->next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 2b8dcac9c8..3328c37197 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -1247,8 +1247,8 @@ class scrub_compaction final : public regular_compaction { } }); } - virtual void next_partition() override { - throw_with_backtrace(); + virtual future<> next_partition() override { + return make_exception_future<>(make_backtraced_exception_ptr()); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { return make_exception_future<>(make_backtraced_exception_ptr()); diff --git a/sstables/partition.cc b/sstables/partition.cc index 367859d1ac..e8b3d0417c 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -440,7 +440,7 @@ public: } }); } - virtual void next_partition() override { + virtual future<> next_partition() override { if (is_initialized()) { if (_fwd == streamed_mutation::forwarding::yes) { clear_buffer(); @@ -453,6 +453,7 @@ public: } } } + return make_ready_future<>(); // If _ds is not created then next_partition() has no effect because there was no partition_start emitted yet. } virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override { diff --git a/test/boost/flat_mutation_reader_test.cc b/test/boost/flat_mutation_reader_test.cc index 0b9c21279e..3b93c4b293 100644 --- a/test/boost/flat_mutation_reader_test.cc +++ b/test/boost/flat_mutation_reader_test.cc @@ -334,7 +334,7 @@ SEASTAR_THREAD_TEST_CASE(test_flat_mutation_reader_move_buffer_content_to) { struct dummy_reader_impl : public flat_mutation_reader::impl { using flat_mutation_reader::impl::impl; virtual future<> fill_buffer(db::timeout_clock::time_point) override { return make_ready_future<>(); } - virtual void next_partition() { } + virtual future<> next_partition() { return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override { return make_ready_future<>(); } virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override { return make_ready_future<>(); } }; diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 70ec160b5c..3f279c52ec 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -933,12 +933,13 @@ public: }); } - virtual void next_partition() override { + virtual future<> next_partition() override { _end_of_stream = false; clear_buffer_to_next_partition(); if (is_buffer_empty()) { - _reader.next_partition(); + return _reader.next_partition(); } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { @@ -2084,7 +2085,7 @@ public: } abort(); } - virtual void next_partition() override { } + virtual future<> next_partition() override { return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point) override { ++_ctrl.fast_forward_to; clear_buffer(); @@ -2983,13 +2984,13 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) maybe_pause(); }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); if (!is_buffer_empty()) { - return; + return make_ready_future<>(); } _end_of_stream = false; - _reader.next_partition(); + return _reader.next_partition(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { clear_buffer(); @@ -3786,7 +3787,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) { } } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer_to_next_partition(); _end_of_stream = false; if (is_buffer_empty()) { @@ -3803,6 +3804,7 @@ SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) { // either no previously fetched fragment or must have come from before _it. Nothing to do } } + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { diff --git a/test/boost/row_cache_test.cc b/test/boost/row_cache_test.cc index d188111811..f89b079daa 100644 --- a/test/boost/row_cache_test.cc +++ b/test/boost/row_cache_test.cc @@ -195,10 +195,10 @@ public: } return delegating_reader::fill_buffer(timeout); } - virtual void next_partition() override { + virtual future<> next_partition() override { _count_fill_buffer = false; ++_counter; - delegating_reader::next_partition(); + return delegating_reader::next_partition(); } }; diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 46f56d9042..24c4fa97ac 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -2504,7 +2504,8 @@ SEASTAR_TEST_CASE(sstable_rewrite) { BOOST_REQUIRE(m->is_partition_start()); auto pkey = partition_key::from_exploded(*s, {to_bytes(key)}); BOOST_REQUIRE(m->as_partition_start().key().key().equal(*s, pkey)); - reader->next_partition(); + return reader->next_partition(); + }).then([reader] { return (*reader)(db::no_timeout); }).then([reader] (mutation_fragment_opt m) { BOOST_REQUIRE(!m); diff --git a/test/boost/sstable_mutation_test.cc b/test/boost/sstable_mutation_test.cc index 37c3c1a380..ab5910bc55 100644 --- a/test/boost/sstable_mutation_test.cc +++ b/test/boost/sstable_mutation_test.cc @@ -358,9 +358,10 @@ future<> test_range_reads(sstables::test_env& env, const dht::token& min, const BOOST_REQUIRE(std::vector({expected.back()}) == mfopt->as_partition_start().key().key().explode()); expected.pop_back(); (*count)++; - mutations->next_partition(); + return mutations->next_partition(); } else { *stop = true; + return make_ready_future<>(); } }); }).then([count, expected_size] { diff --git a/test/lib/flat_mutation_reader_assertions.hh b/test/lib/flat_mutation_reader_assertions.hh index 8d576358e2..4e22acf7c8 100644 --- a/test/lib/flat_mutation_reader_assertions.hh +++ b/test/lib/flat_mutation_reader_assertions.hh @@ -421,7 +421,7 @@ public: flat_reader_assertions& next_partition() { testlog.trace("Skip to next partition"); - _reader.next_partition(); + _reader.next_partition().get(); return *this; } diff --git a/test/lib/normalizing_reader.cc b/test/lib/normalizing_reader.cc index 47288dcf75..9d1e41389b 100644 --- a/test/lib/normalizing_reader.cc +++ b/test/lib/normalizing_reader.cc @@ -70,13 +70,14 @@ future<> normalizing_reader::fill_buffer(db::timeout_clock::time_point timeout) }); } -void normalizing_reader::next_partition() { +future<> normalizing_reader::next_partition() { _range_tombstones.reset(); clear_buffer_to_next_partition(); if (is_buffer_empty()) { _end_of_stream = false; - _rd.next_partition(); + return _rd.next_partition(); } + return make_ready_future<>(); } future<> normalizing_reader::fast_forward_to( const dht::partition_range& pr, db::timeout_clock::time_point timeout) { diff --git a/test/lib/normalizing_reader.hh b/test/lib/normalizing_reader.hh index 6213b00bc6..f682e1e33d 100644 --- a/test/lib/normalizing_reader.hh +++ b/test/lib/normalizing_reader.hh @@ -41,7 +41,7 @@ public: virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override; - virtual void next_partition() override; + virtual future<> next_partition() override; virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override; diff --git a/test/manual/enormous_table_scan_test.cc b/test/manual/enormous_table_scan_test.cc index 12391f859b..2ba3ea7da2 100644 --- a/test/manual/enormous_table_scan_test.cc +++ b/test/manual/enormous_table_scan_test.cc @@ -118,9 +118,10 @@ public: }); } - virtual void next_partition() override { + virtual future<> next_partition() override { clear_buffer(); _end_of_stream = true; + return make_ready_future<>(); } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { diff --git a/test/perf/perf_fast_forward.cc b/test/perf/perf_fast_forward.cc index 92b8072a58..ae638ead5b 100644 --- a/test/perf/perf_fast_forward.cc +++ b/test/perf/perf_fast_forward.cc @@ -733,7 +733,7 @@ uint64_t consume_all_with_next_partition(flat_mutation_reader& rd) { uint64_t fragments = 0; do { fragments += consume_all(rd); - rd.next_partition(); + rd.next_partition().get(); rd.fill_buffer(db::no_timeout).get(); } while(!rd.is_end_of_stream() || !rd.is_buffer_empty()); return fragments;