From d0ea8956716708bf74a59db063e960146f254993 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 25 Mar 2022 14:53:42 +0200 Subject: [PATCH] readers: move multishard reader & friends to reader/multishard.cc Since the multishard reader family weighs more than 1K SLOC, it gets its own .cc file. --- configure.py | 1 + db/view/view.cc | 1 + db/view/view_update_generator.cc | 1 + multishard_mutation_query.cc | 1 + mutation_reader.cc | 1122 ------------------------- mutation_reader.hh | 197 ----- mutation_writer/multishard_writer.cc | 1 + readers/evictable.hh | 83 ++ readers/foreign.hh | 34 + readers/multishard.cc | 1144 ++++++++++++++++++++++++++ readers/multishard.hh | 140 ++++ repair/row_level.cc | 1 + replica/database.cc | 1 + test/boost/mutation_reader_test.cc | 2 + test/boost/view_build_test.cc | 1 + test/lib/reader_lifecycle_policy.hh | 2 +- 16 files changed, 1412 insertions(+), 1320 deletions(-) create mode 100644 readers/evictable.hh create mode 100644 readers/foreign.hh create mode 100644 readers/multishard.cc create mode 100644 readers/multishard.hh diff --git a/configure.py b/configure.py index 5f7192d154..bec3cad862 100755 --- a/configure.py +++ b/configure.py @@ -700,6 +700,7 @@ scylla_core = (['replica/database.cc', 'converting_mutation_partition_applier.cc', 'mutation_reader.cc', 'readers/combined.cc', + 'readers/multishard.cc', 'readers/mutation_reader.cc', 'readers/mutation_readers.cc', 'mutation_query.cc', diff --git a/db/view/view.cc b/db/view/view.cc index 4be545214a..af4ce9de2b 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -57,6 +57,7 @@ #include "utils/fb_utilities.hh" #include "query-result-writer.hh" #include "readers/from_fragments_v2.hh" +#include "readers/evictable.hh" using namespace std::chrono_literals; diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 3d770a086f..b8ceddde79 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -13,6 +13,7 
@@ #include "utils/error_injection.hh" #include "db/view/view_updating_consumer.hh" #include "sstables/sstables.hh" +#include "readers/evictable.hh" static logging::logger vug_logger("view_update_generator"); diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index 6e0417e59d..4c4b03b5a2 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -12,6 +12,7 @@ #include "replica/database.hh" #include "db/config.hh" #include "query-result-writer.hh" +#include "readers/multishard.hh" #include diff --git a/mutation_reader.cc b/mutation_reader.cc index 7f4e7fbf25..918182b856 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -67,1128 +67,6 @@ mutation_source make_combined_mutation_source(std::vector adden }); } -namespace { - -struct remote_fill_buffer_result_v2 { - foreign_ptr> buffer; - bool end_of_stream = false; - - remote_fill_buffer_result_v2() = default; - remote_fill_buffer_result_v2(flat_mutation_reader_v2::tracked_buffer&& buffer, bool end_of_stream) - : buffer(make_foreign(std::make_unique(std::move(buffer)))) - , end_of_stream(end_of_stream) { - } -}; - -} - -/// See make_foreign_reader() for description. -class foreign_reader : public flat_mutation_reader_v2::impl { - template - using foreign_unique_ptr = foreign_ptr>; - - using fragment_buffer = flat_mutation_reader_v2::tracked_buffer; - - foreign_unique_ptr _reader; - foreign_unique_ptr> _read_ahead_future; - streamed_mutation::forwarding _fwd_sm; - - // Forward an operation to the reader on the remote shard. - // If the remote reader has an ongoing read-ahead, bring it to the - // foreground (wait on it) and execute the operation after. - // After the operation completes, kick off a new read-ahead (fill_buffer()) - // and move it to the background (save it's future but don't wait on it - // now). If all works well read-aheads complete by the next operation and - // we don't have to wait on the remote reader filling its buffer. 
- template >> - Result forward_operation(Operation op) { - reader_permit::blocked_guard bg{_permit}; - return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(), - read_ahead_future = std::exchange(_read_ahead_future, nullptr), - op = std::move(op)] () mutable { - auto exec_op_and_read_ahead = [=] () mutable { - // Not really variadic, we expect 0 (void) or 1 parameter. - return op().then([=] (auto... result) { - auto f = reader->is_end_of_stream() ? nullptr : std::make_unique>(reader->fill_buffer()); - return make_ready_future>, decltype(result)...>>( - std::tuple(make_foreign(std::move(f)), std::move(result)...)); - }); - }; - if (read_ahead_future) { - return read_ahead_future->then(std::move(exec_op_and_read_ahead)); - } else { - return exec_op_and_read_ahead(); - } - }).then([this] (auto fut_and_result) { - _read_ahead_future = std::get<0>(std::move(fut_and_result)); - static_assert(std::tuple_size::value <= 2); - if constexpr (std::tuple_size::value == 1) { - return make_ready_future<>(); - } else { - auto result = std::get<1>(std::move(fut_and_result)); - return make_ready_future(std::move(result)); - } - }).finally([bg = std::move(bg)] { }); - } -public: - foreign_reader(schema_ptr schema, - reader_permit permit, - foreign_unique_ptr reader, - streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no); - - // this is captured. 
- foreign_reader(const foreign_reader&) = delete; - foreign_reader& operator=(const foreign_reader&) = delete; - foreign_reader(foreign_reader&&) = delete; - foreign_reader& operator=(foreign_reader&&) = delete; - - virtual future<> fill_buffer() override; - virtual future<> next_partition() override; - virtual future<> fast_forward_to(const dht::partition_range& pr) override; - virtual future<> fast_forward_to(position_range pr) override; - virtual future<> close() noexcept override; -}; - -foreign_reader::foreign_reader(schema_ptr schema, - reader_permit permit, - foreign_unique_ptr reader, - streamed_mutation::forwarding fwd_sm) - : impl(std::move(schema), std::move(permit)) - , _reader(std::move(reader)) - , _fwd_sm(fwd_sm) { -} - -future<> foreign_reader::fill_buffer() { - if (_end_of_stream || is_buffer_full()) { - return make_ready_future(); - } - - return forward_operation([reader = _reader.get()] () { - auto f = reader->is_buffer_empty() ? reader->fill_buffer() : make_ready_future<>(); - return f.then([=] { - return make_ready_future(remote_fill_buffer_result_v2(reader->detach_buffer(), reader->is_end_of_stream())); - }); - }).then([this] (remote_fill_buffer_result_v2 res) mutable { - _end_of_stream = res.end_of_stream; - for (const auto& mf : *res.buffer) { - // Need a copy since the mf is on the remote shard. 
- push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf)); - } - }); -} - -future<> foreign_reader::next_partition() { - if (_fwd_sm == streamed_mutation::forwarding::yes) { - clear_buffer(); - _end_of_stream = false; - } else { - clear_buffer_to_next_partition(); - if (!is_buffer_empty()) { - co_return; - } - _end_of_stream = false; - } - co_await forward_operation([reader = _reader.get()] () { - return reader->next_partition(); - }); -} - -future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) { - clear_buffer(); - _end_of_stream = false; - return forward_operation([reader = _reader.get(), &pr] () { - return reader->fast_forward_to(pr); - }); -} - -future<> foreign_reader::fast_forward_to(position_range pr) { - forward_buffer_to(pr.start()); - _end_of_stream = false; - return forward_operation([reader = _reader.get(), pr = std::move(pr)] () { - return reader->fast_forward_to(std::move(pr)); - }); -} - -future<> foreign_reader::close() noexcept { - if (!_reader) { - if (_read_ahead_future) { - on_internal_error_noexcept(mrlog, "foreign_reader::close can't wait on read_ahead future with disengaged reader"); - } - return make_ready_future<>(); - } - return smp::submit_to(_reader.get_owner_shard(), - [reader = std::move(_reader), read_ahead_future = std::exchange(_read_ahead_future, nullptr)] () mutable { - auto read_ahead = read_ahead_future ? std::move(*read_ahead_future.get()) : make_ready_future<>(); - return read_ahead.then_wrapped([reader = std::move(reader)] (future<> f) mutable { - if (f.failed()) { - auto ex = f.get_exception(); - mrlog.warn("foreign_reader: benign read_ahead failure during close: {}. 
Ignoring.", ex); - } - return reader->close(); - }); - }); -} - -flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema, - reader_permit permit, - foreign_ptr> reader, - streamed_mutation::forwarding fwd_sm) { - if (reader.get_owner_shard() == this_shard_id()) { - return std::move(*reader); - } - return make_flat_mutation_reader_v2(std::move(schema), std::move(permit), std::move(reader), fwd_sm); -} - -template -static void require(bool condition, const char* msg, const Arg&... arg) { - if (!condition) { - on_internal_error(mrlog, format(msg, arg...)); - } -} - -// Encapsulates all data and logic that is local to the remote shard the -// reader lives on. -class evictable_reader_v2 : public flat_mutation_reader_v2::impl { -public: - using auto_pause = bool_class; - -private: - auto_pause _auto_pause; - mutation_source _ms; - const dht::partition_range* _pr; - const query::partition_slice& _ps; - const io_priority_class& _pc; - tracing::global_trace_state_ptr _trace_state; - const mutation_reader::forwarding _fwd_mr; - reader_concurrency_semaphore::inactive_read_handle _irh; - bool _reader_recreated = false; // set if reader was recreated since last operation - position_in_partition::tri_compare _tri_cmp; - - std::optional _last_pkey; - position_in_partition _next_position_in_partition = position_in_partition::for_partition_start(); - // These are used when the reader has to be recreated (after having been - // evicted while paused) and the range and/or slice it is recreated with - // differs from the original ones. 
- std::optional _range_override; - std::optional _slice_override; - - flat_mutation_reader_v2_opt _reader; - -private: - void do_pause(flat_mutation_reader_v2 reader); - void maybe_pause(flat_mutation_reader_v2 reader); - flat_mutation_reader_v2_opt try_resume(); - void update_next_position(); - void adjust_partition_slice(); - flat_mutation_reader_v2 recreate_reader(); - future resume_or_create_reader(); - void validate_partition_start(const partition_start& ps); - void validate_position_in_partition(position_in_partition_view pos) const; - void examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3); - -public: - evictable_reader_v2( - auto_pause ap, - mutation_source ms, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr); - virtual future<> fill_buffer() override; - virtual future<> next_partition() override; - virtual future<> fast_forward_to(const dht::partition_range& pr) override; - virtual future<> fast_forward_to(position_range) override { - throw_with_backtrace(); - } - virtual future<> close() noexcept override { - if (_reader) { - return _reader->close(); - } - if (auto reader_opt = try_resume()) { - return reader_opt->close(); - } - return make_ready_future<>(); - } - reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && { - return std::move(_irh); - } - void pause() { - if (_reader) { - do_pause(std::move(*_reader)); - } - } - reader_permit permit() { - return _permit; - } -}; - -void evictable_reader_v2::do_pause(flat_mutation_reader_v2 reader) { - assert(!_irh); - _irh = _permit.semaphore().register_inactive_read(std::move(reader)); -} - -void evictable_reader_v2::maybe_pause(flat_mutation_reader_v2 reader) { - if (_auto_pause) { - do_pause(std::move(reader)); - } else { - _reader = 
std::move(reader); - } -} - -flat_mutation_reader_v2_opt evictable_reader_v2::try_resume() { - if (auto reader_opt = _permit.semaphore().unregister_inactive_read(std::move(_irh))) { - return std::move(*reader_opt); - } - return {}; -} - -void evictable_reader_v2::update_next_position() { - if (is_buffer_empty()) { - return; - } - - auto rbegin = std::reverse_iterator(buffer().end()); - auto rend = std::reverse_iterator(buffer().begin()); - if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment_v2::is_partition_start)); pk_it != rend) { - _last_pkey = pk_it->as_partition_start().key(); - } - - const auto last_pos = buffer().back().position(); - switch (last_pos.region()) { - case partition_region::partition_start: - _next_position_in_partition = position_in_partition::for_static_row(); - break; - case partition_region::static_row: - _next_position_in_partition = position_in_partition::before_all_clustered_rows(); - break; - case partition_region::clustered: - if (!_reader->is_buffer_empty() && _reader->peek_buffer().is_end_of_partition()) { - push_mutation_fragment(_reader->pop_mutation_fragment()); - _next_position_in_partition = position_in_partition::for_partition_start(); - } else { - _next_position_in_partition = position_in_partition::after_key(last_pos); - } - break; - case partition_region::partition_end: - _next_position_in_partition = position_in_partition::for_partition_start(); - break; - } -} - -void evictable_reader_v2::adjust_partition_slice() { - const auto reversed = _ps.options.contains(query::partition_slice::option::reversed); - _slice_override = reversed ? 
query::legacy_reverse_slice_to_native_reverse_slice(*_schema, _ps) : _ps; - - auto ranges = _slice_override->default_row_ranges(); - query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition); - - _slice_override->clear_ranges(); - _slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges)); - - if (reversed) { - _slice_override = query::native_reverse_slice_to_legacy_reverse_slice(*_schema, std::move(*_slice_override)); - } -} - -flat_mutation_reader_v2 evictable_reader_v2::recreate_reader() { - const dht::partition_range* range = _pr; - const query::partition_slice* slice = &_ps; - - _range_override.reset(); - _slice_override.reset(); - - if (_last_pkey) { - bool partition_range_is_inclusive = true; - - switch (_next_position_in_partition.region()) { - case partition_region::partition_start: - partition_range_is_inclusive = false; - break; - case partition_region::static_row: - break; - case partition_region::clustered: - adjust_partition_slice(); - slice = &*_slice_override; - break; - case partition_region::partition_end: - partition_range_is_inclusive = false; - break; - } - - // The original range contained a single partition and we've read it - // all. We'd have to create a reader with an empty range that would - // immediately be at EOS. This is not possible so just create an empty - // reader instead. - // This should be extremely rare (who'd create a multishard reader to - // read a single partition) but still, let's make sure we handle it - // correctly. 
- if (_pr->is_singular() && !partition_range_is_inclusive) { - return make_empty_flat_reader_v2(_schema, _permit); - } - - _range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end()); - range = &*_range_override; - - _reader_recreated = true; - } - - return _ms.make_reader_v2( - _schema, - _permit, - *range, - *slice, - _pc, - _trace_state, - streamed_mutation::forwarding::no, - _fwd_mr); -} - -future evictable_reader_v2::resume_or_create_reader() { - if (_reader) { - co_return std::move(*_reader); - } - if (auto reader_opt = try_resume()) { - co_return std::move(*reader_opt); - } - // When the reader is created the first time and we are actually resuming a - // saved reader in `recreate_reader()`, we have two cases here: - // * the reader is still alive (in inactive state) - // * the reader was evicted - // We check for this below with `needs_readmission()` and it is very - // important to not allow for preemption between said check and - // `recreate_reader()`, otherwise the reader might be evicted between the - // check and `recreate_reader()` and the latter will recreate it without - // waiting for re-admission. - if (_permit.needs_readmission()) { - co_await _permit.wait_readmission(); - } - co_return recreate_reader(); -} - -void evictable_reader_v2::validate_partition_start(const partition_start& ps) { - const auto tri_cmp = dht::ring_position_comparator(*_schema); - // If we recreated the reader after fast-forwarding it we won't have - // _last_pkey set. In this case it is enough to check if the partition - // is in range. - if (_last_pkey) { - const auto cmp_res = tri_cmp(*_last_pkey, ps.key()); - if (_next_position_in_partition.region() != partition_region::partition_start) { // we expect to continue from the same partition - // We cannot assume the partition we stopped the read at is still alive - // when we recreate the reader. 
It might have been compacted away in the - // meanwhile, so allow for a larger partition too. - require( - cmp_res <= 0, - "{}(): validation failed, expected partition with key larger or equal to _last_pkey {}, but got {}", - __FUNCTION__, - *_last_pkey, - ps.key()); - // Reset next pos if we are not continuing from the same partition - if (cmp_res < 0) { - // Close previous partition, we are not going to continue it. - push_mutation_fragment(*_schema, _permit, partition_end{}); - _next_position_in_partition = position_in_partition::for_partition_start(); - } - } else { // should be a larger partition - require( - cmp_res < 0, - "{}(): validation failed, expected partition with key larger than _last_pkey {}, but got {}", - __FUNCTION__, - *_last_pkey, - ps.key()); - } - } - const auto& prange = _range_override ? *_range_override : *_pr; - require( - // TODO: somehow avoid this copy - prange.contains(ps.key(), tri_cmp), - "{}(): validation failed, expected partition with key that falls into current range {}, but got {}", - __FUNCTION__, - prange, - ps.key()); -} - -void evictable_reader_v2::validate_position_in_partition(position_in_partition_view pos) const { - require( - _tri_cmp(_next_position_in_partition, pos) <= 0, - "{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}", - __FUNCTION__, - _next_position_in_partition, - pos); - - if (_slice_override && pos.region() == partition_region::clustered) { - const auto reversed = _ps.options.contains(query::partition_slice::option::reversed); - std::optional native_slice; - if (reversed) { - native_slice = query::legacy_reverse_slice_to_native_reverse_slice(*_schema, *_slice_override); - } - auto& slice = reversed ? 
*native_slice : *_slice_override; - - const auto ranges = slice.row_ranges(*_schema, _last_pkey->key()); - const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) { - // TODO: somehow avoid this copy - auto range = position_range(cr); - // We cannot use range.contains() because that treats range as a - // [a, b) range, meaning a range tombstone change with position - // after_key(b) will be considered outside of it. Such range - // tombstone changes can be emitted however when recreating the - // reader on clustering range edge. - return _tri_cmp(range.start(), pos) <= 0 && _tri_cmp(pos, range.end()) <= 0; - }); - require( - any_contains, - "{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}", - __FUNCTION__, - slice, - pos); - } -} - -void evictable_reader_v2::examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3) { - if (!mf1) { - return; // the reader is at EOS - } - - // If engaged, the first fragment is always a partition-start. - validate_partition_start(mf1->as_partition_start()); - if (_tri_cmp(mf1->position(), _next_position_in_partition) < 0) { - mf1 = {}; // drop mf1 - } - - const auto continue_same_partition = _next_position_in_partition.region() != partition_region::partition_start; - - // If we have a first fragment, we are guaranteed to have a second one -- if not else, a partition-end. - if (mf2->is_end_of_partition()) { - return; // no further fragments, nothing to do - } - - // We want to validate the position of the first non-dropped fragment. - // If mf2 is a static row and we need to drop it, this will be mf3. 
- if (mf2->is_static_row() && _tri_cmp(mf2->position(), _next_position_in_partition) < 0) { - mf2 = {}; // drop mf2 - } else { - if (continue_same_partition) { - validate_position_in_partition(mf2->position()); - } - return; - } - - if (mf3->is_end_of_partition()) { - return; // no further fragments, nothing to do - } else if (continue_same_partition) { - validate_position_in_partition(mf3->position()); - } -} - -evictable_reader_v2::evictable_reader_v2( - auto_pause ap, - mutation_source ms, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) - : impl(std::move(schema), std::move(permit)) - , _auto_pause(ap) - , _ms(std::move(ms)) - , _pr(&pr) - , _ps(ps) - , _pc(pc) - , _trace_state(std::move(trace_state)) - , _fwd_mr(fwd_mr) - , _tri_cmp(*_schema) { -} - -future<> evictable_reader_v2::fill_buffer() { - if (is_end_of_stream()) { - co_return; - } - _reader = co_await resume_or_create_reader(); - - if (_reader_recreated) { - // Recreating the reader breaks snapshot isolation and creates all sorts - // of complications around the continuity of range tombstone changes, - // e.g. a range tombstone started by the previous reader object - // might not exist anymore with the new reader object. - // To avoid complications we reset the tombstone state on each reader - // recreation by emitting a null tombstone change, if we read at least - // one clustering fragment from the partition. 
- if (_next_position_in_partition.region() == partition_region::clustered - && _tri_cmp(_next_position_in_partition, position_in_partition::before_all_clustered_rows()) > 0) { - push_mutation_fragment(*_schema, _permit, range_tombstone_change{position_in_partition_view::before_key(_next_position_in_partition), {}}); - } - auto mf1 = co_await (*_reader)(); - auto mf2 = co_await (*_reader)(); - auto mf3 = co_await (*_reader)(); - examine_first_fragments(mf1, mf2, mf3); - if (mf3) { - _reader->unpop_mutation_fragment(std::move(*mf3)); - } - if (mf2) { - _reader->unpop_mutation_fragment(std::move(*mf2)); - } - if (mf1) { - _reader->unpop_mutation_fragment(std::move(*mf1)); - } - _reader_recreated = false; - } else { - co_await _reader->fill_buffer(); - } - - _reader->move_buffer_content_to(*this); - - // Ensure that each buffer represents forward progress. Only a concern when - // the last fragment in the buffer is range tombstone change. In this case - // ensure that: - // * buffer().back().position() > _next_position_in_partition; - // * _reader.peek()->position() > buffer().back().position(); - if (!is_buffer_empty() && buffer().back().is_range_tombstone_change()) { - auto* next_mf = co_await _reader->peek(); - - // First make sure we've made progress w.r.t. _next_position_in_partition. 
- while (next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) { - push_mutation_fragment(_reader->pop_mutation_fragment()); - next_mf = co_await _reader->peek(); - } - - const auto last_pos = position_in_partition(buffer().back().position()); - while (next_mf && _tri_cmp(last_pos, next_mf->position()) == 0) { - push_mutation_fragment(_reader->pop_mutation_fragment()); - next_mf = co_await _reader->peek(); - } - } - - update_next_position(); - _end_of_stream = _reader->is_end_of_stream(); - maybe_pause(std::move(*_reader)); -} - -future<> evictable_reader_v2::next_partition() { - _next_position_in_partition = position_in_partition::for_partition_start(); - clear_buffer_to_next_partition(); - if (!is_buffer_empty()) { - co_return; - } - auto reader = co_await resume_or_create_reader(); - co_await reader.next_partition(); - maybe_pause(std::move(reader)); -} - -future<> evictable_reader_v2::fast_forward_to(const dht::partition_range& pr) { - _pr = &pr; - _last_pkey.reset(); - _next_position_in_partition = position_in_partition::for_partition_start(); - clear_buffer(); - _end_of_stream = false; - - if (_reader) { - co_await _reader->fast_forward_to(pr); - _range_override.reset(); - co_return; - } - if (auto reader_opt = try_resume()) { - co_await reader_opt->fast_forward_to(pr); - _range_override.reset(); - maybe_pause(std::move(*reader_opt)); - } -} - -evictable_reader_handle_v2::evictable_reader_handle_v2(evictable_reader_v2& r) : _r(&r) -{ } - -void evictable_reader_handle_v2::evictable_reader_handle_v2::pause() { - _r->pause(); -} - -flat_mutation_reader_v2 make_auto_paused_evictable_reader_v2( - mutation_source ms, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) { - return make_flat_mutation_reader_v2(evictable_reader_v2::auto_pause::yes, std::move(ms), 
std::move(schema), std::move(permit), pr, ps, - pc, std::move(trace_state), fwd_mr); -} - -std::pair make_manually_paused_evictable_reader_v2( - mutation_source ms, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) { - auto reader = std::make_unique(evictable_reader_v2::auto_pause::no, std::move(ms), std::move(schema), std::move(permit), pr, ps, - pc, std::move(trace_state), fwd_mr); - auto handle = evictable_reader_handle_v2(*reader.get()); - return std::pair(flat_mutation_reader_v2(std::move(reader)), handle); -} - -namespace { - -// A special-purpose shard reader. -// -// Shard reader manages a reader located on a remote shard. It transparently -// supports read-ahead (background fill_buffer() calls). -// This reader is not for general use, it was designed to serve the -// multishard_combining_reader. -// Although it implements the flat_mutation_reader_v2:impl interface it cannot be -// wrapped into a flat_mutation_reader_v2, as it needs to be managed by a shared -// pointer. 
-class shard_reader_v2 : public flat_mutation_reader_v2::impl { -private: - shared_ptr _lifecycle_policy; - const unsigned _shard; - foreign_ptr> _pr; - const query::partition_slice& _ps; - const io_priority_class& _pc; - tracing::global_trace_state_ptr _trace_state; - const mutation_reader::forwarding _fwd_mr; - std::optional> _read_ahead; - foreign_ptr> _reader; - -private: - future<> do_fill_buffer(); - -public: - shard_reader_v2( - schema_ptr schema, - reader_permit permit, - shared_ptr lifecycle_policy, - unsigned shard, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) - : impl(std::move(schema), std::move(permit)) - , _lifecycle_policy(std::move(lifecycle_policy)) - , _shard(shard) - , _pr(make_foreign(make_lw_shared(pr))) - , _ps(ps) - , _pc(pc) - , _trace_state(std::move(trace_state)) - , _fwd_mr(fwd_mr) { - } - - shard_reader_v2(shard_reader_v2&&) = delete; - shard_reader_v2& operator=(shard_reader_v2&&) = delete; - - shard_reader_v2(const shard_reader_v2&) = delete; - shard_reader_v2& operator=(const shard_reader_v2&) = delete; - - const mutation_fragment_v2& peek_buffer() const { - return buffer().front(); - } - virtual future<> fill_buffer() override; - virtual future<> next_partition() override; - virtual future<> fast_forward_to(const dht::partition_range& pr) override; - virtual future<> fast_forward_to(position_range) override; - virtual future<> close() noexcept override; - bool done() const { - return _reader && is_buffer_empty() && is_end_of_stream(); - } - void read_ahead(); - bool is_read_ahead_in_progress() const { - return _read_ahead.has_value(); - } -}; - -future<> shard_reader_v2::close() noexcept { - if (_read_ahead) { - try { - co_await *std::exchange(_read_ahead, std::nullopt); - } catch (...) 
{ - mrlog.warn("shard_reader::close(): read_ahead on shard {} failed: {}", _shard, std::current_exception()); - } - } - - try { - co_await smp::submit_to(_shard, [this] { - if (!_reader) { - return make_ready_future<>(); - } - - auto irh = std::move(*_reader).inactive_read_handle(); - return with_closeable(flat_mutation_reader_v2(_reader.release()), [this] (flat_mutation_reader_v2& reader) mutable { - auto permit = reader.permit(); - const auto& schema = *reader.schema(); - - auto unconsumed_fragments = reader.detach_buffer(); - auto rit = std::reverse_iterator(buffer().cend()); - auto rend = std::reverse_iterator(buffer().cbegin()); - for (; rit != rend; ++rit) { - unconsumed_fragments.emplace_front(schema, permit, *rit); // we are copying from the remote shard. - } - - return unconsumed_fragments; - }).then([this, irh = std::move(irh)] (flat_mutation_reader_v2::tracked_buffer&& buf) mutable { - return _lifecycle_policy->destroy_reader({std::move(irh), std::move(buf)}); - }); - }); - } catch (...) 
{ - mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception()); - } -} - -future<> shard_reader_v2::do_fill_buffer() { - auto fill_buf_fut = make_ready_future(); - - struct reader_and_buffer_fill_result { - foreign_ptr> reader; - remote_fill_buffer_result_v2 result; - }; - - if (!_reader) { - fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema)] () -> future { - auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] ( - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr ts, - streamed_mutation::forwarding, - mutation_reader::forwarding fwd_mr) { - return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr); - }); - auto s = gs.get(); - auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout()); - auto rreader = make_foreign(std::make_unique(evictable_reader_v2::auto_pause::yes, std::move(ms), - s, std::move(permit), *_pr, _ps, _pc, _trace_state, _fwd_mr)); - - std::exception_ptr ex; - try { - tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id()); - reader_permit::used_guard ug{rreader->permit()}; - co_await rreader->fill_buffer(); - auto res = remote_fill_buffer_result_v2(rreader->detach_buffer(), rreader->is_end_of_stream()); - co_return reader_and_buffer_fill_result{std::move(rreader), std::move(res)}; - } catch (...) 
{ - ex = std::current_exception(); - } - co_await rreader->close(); - std::rethrow_exception(std::move(ex)); - }).then([this] (reader_and_buffer_fill_result res) { - _reader = std::move(res.reader); - return std::move(res.result); - }); - } else { - fill_buf_fut = smp::submit_to(_shard, [this] () mutable { - reader_permit::used_guard ug{_reader->permit()}; - return _reader->fill_buffer().then([this, ug = std::move(ug)] { - return remote_fill_buffer_result_v2(_reader->detach_buffer(), _reader->is_end_of_stream()); - }); - }); - } - - return fill_buf_fut.then([this] (remote_fill_buffer_result_v2 res) mutable { - _end_of_stream = res.end_of_stream; - for (const auto& mf : *res.buffer) { - push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf)); - } - }); -} - -future<> shard_reader_v2::fill_buffer() { - // FIXME: want to move this to the inner scopes but it makes clang miscompile the code. - reader_permit::blocked_guard guard(_permit); - if (_read_ahead) { - co_await *std::exchange(_read_ahead, std::nullopt); - co_return; - } - if (!is_buffer_empty()) { - co_return; - } - co_await do_fill_buffer(); -} - -future<> shard_reader_v2::next_partition() { - if (!_reader) { - co_return; - } - - // FIXME: want to move this to the inner scopes but it makes clang miscompile the code. - reader_permit::blocked_guard guard(_permit); - - if (_read_ahead) { - co_await *std::exchange(_read_ahead, std::nullopt); - } - clear_buffer_to_next_partition(); - if (!is_buffer_empty()) { - co_return; - } - co_return co_await smp::submit_to(_shard, [this] { - return _reader->next_partition(); - }); -} - -future<> shard_reader_v2::fast_forward_to(const dht::partition_range& pr) { - if (!_reader && !_read_ahead) { - // No need to fast-forward uncreated readers, they will be passed the new - // range when created. 
- _pr = make_foreign(make_lw_shared(pr)); - co_return; - } - - reader_permit::blocked_guard guard(_permit); - - if (_read_ahead) { - co_await *std::exchange(_read_ahead, std::nullopt); - } - _end_of_stream = false; - clear_buffer(); - - _pr = co_await smp::submit_to(_shard, [this, &pr] () -> future>> { - auto new_pr = make_lw_shared(pr); - co_await _reader->fast_forward_to(*new_pr); - _lifecycle_policy->update_read_range(new_pr); - co_return make_foreign(std::move(new_pr)); - }); -} - -future<> shard_reader_v2::fast_forward_to(position_range) { - return make_exception_future<>(make_backtraced_exception_ptr()); -} - -void shard_reader_v2::read_ahead() { - if (_read_ahead || is_end_of_stream() || !is_buffer_empty()) { - return; - } - - _read_ahead.emplace(do_fill_buffer()); -} - -} // anonymous namespace - -// See make_multishard_combining_reader() for description. -class multishard_combining_reader_v2 : public flat_mutation_reader_v2::impl { - struct shard_and_token { - shard_id shard; - dht::token token; - - bool operator<(const shard_and_token& o) const { - // Reversed, as we want a min-heap. - return token > o.token; - } - }; - - const dht::sharder& _sharder; - std::vector> _shard_readers; - // Contains the position of each shard with token granularity, organized - // into a min-heap. Used to select the shard with the smallest token each - // time a shard reader produces a new partition. 
- std::vector _shard_selection_min_heap; - unsigned _current_shard; - bool _crossed_shards; - unsigned _concurrency = 1; - - void on_partition_range_change(const dht::partition_range& pr); - bool maybe_move_to_next_shard(const dht::token* const t = nullptr); - future<> handle_empty_reader_buffer(); - -public: - multishard_combining_reader_v2( - const dht::sharder& sharder, - shared_ptr lifecycle_policy, - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr); - - // this is captured. - multishard_combining_reader_v2(const multishard_combining_reader_v2&) = delete; - multishard_combining_reader_v2& operator=(const multishard_combining_reader_v2&) = delete; - multishard_combining_reader_v2(multishard_combining_reader_v2&&) = delete; - multishard_combining_reader_v2& operator=(multishard_combining_reader_v2&&) = delete; - - virtual future<> fill_buffer() override; - virtual future<> next_partition() override; - virtual future<> fast_forward_to(const dht::partition_range& pr) override; - virtual future<> fast_forward_to(position_range pr) override; - virtual future<> close() noexcept override; -}; - -void multishard_combining_reader_v2::on_partition_range_change(const dht::partition_range& pr) { - _shard_selection_min_heap.clear(); - _shard_selection_min_heap.reserve(_sharder.shard_count()); - - auto token = pr.start() ? pr.start()->value().token() : dht::minimum_token(); - _current_shard = _sharder.shard_of(token); - - auto sharder = dht::ring_position_range_sharder(_sharder, pr); - - auto next = sharder.next(*_schema); - - // The first value of `next` is thrown away, as it is the ring range of the current shard. - // We only want to do a full round, until we get back to the shard we started from (`_current_shard`). - // We stop earlier if the sharder has no ranges for the remaining shards. 
- for (next = sharder.next(*_schema); next && next->shard != _current_shard; next = sharder.next(*_schema)) { - _shard_selection_min_heap.push_back(shard_and_token{next->shard, next->ring_range.start()->value().token()}); - boost::push_heap(_shard_selection_min_heap); - } -} - -bool multishard_combining_reader_v2::maybe_move_to_next_shard(const dht::token* const t) { - if (_shard_selection_min_heap.empty() || (t && *t < _shard_selection_min_heap.front().token)) { - return false; - } - - boost::pop_heap(_shard_selection_min_heap); - const auto next_shard = _shard_selection_min_heap.back().shard; - _shard_selection_min_heap.pop_back(); - - if (t) { - _shard_selection_min_heap.push_back(shard_and_token{_current_shard, *t}); - boost::push_heap(_shard_selection_min_heap); - } - - _crossed_shards = true; - _current_shard = next_shard; - return true; -} - -future<> multishard_combining_reader_v2::handle_empty_reader_buffer() { - auto& reader = *_shard_readers[_current_shard]; - - if (reader.is_end_of_stream()) { - if (_shard_selection_min_heap.empty()) { - _end_of_stream = true; - } else { - maybe_move_to_next_shard(); - } - return make_ready_future<>(); - } else if (reader.is_read_ahead_in_progress()) { - return reader.fill_buffer(); - } else { - // If we crossed shards and the next reader has an empty buffer we - // double concurrency so the next time we cross shards we will have - // more chances of hitting the reader's buffer. - if (_crossed_shards) { - _concurrency = std::min(_concurrency * 2, _sharder.shard_count()); - - // Read ahead shouldn't change the min selection heap so we work on a local copy. - auto shard_selection_min_heap_copy = _shard_selection_min_heap; - - // If concurrency > 1 we kick-off concurrency-1 read-aheads in the - // background. They will be brought to the foreground when we move - // to their respective shard. 
- for (unsigned i = 1; i < _concurrency && !shard_selection_min_heap_copy.empty(); ++i) { - boost::pop_heap(shard_selection_min_heap_copy); - const auto next_shard = shard_selection_min_heap_copy.back().shard; - shard_selection_min_heap_copy.pop_back(); - _shard_readers[next_shard]->read_ahead(); - } - } - return reader.fill_buffer(); - } -} - -multishard_combining_reader_v2::multishard_combining_reader_v2( - const dht::sharder& sharder, - shared_ptr lifecycle_policy, - schema_ptr s, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) - : impl(std::move(s), std::move(permit)), _sharder(sharder) { - - on_partition_range_change(pr); - - _shard_readers.reserve(_sharder.shard_count()); - for (unsigned i = 0; i < _sharder.shard_count(); ++i) { - _shard_readers.emplace_back(std::make_unique(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr)); - } -} - -future<> multishard_combining_reader_v2::fill_buffer() { - _crossed_shards = false; - return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this] { - auto& reader = *_shard_readers[_current_shard]; - - if (reader.is_buffer_empty()) { - return handle_empty_reader_buffer(); - } - - while (!reader.is_buffer_empty() && !is_buffer_full()) { - if (const auto& mf = reader.peek_buffer(); mf.is_partition_start() && maybe_move_to_next_shard(&mf.as_partition_start().key().token())) { - return make_ready_future<>(); - } - push_mutation_fragment(reader.pop_mutation_fragment()); - } - return make_ready_future<>(); - }); -} - -future<> multishard_combining_reader_v2::next_partition() { - clear_buffer_to_next_partition(); - if (is_buffer_empty()) { - return _shard_readers[_current_shard]->next_partition(); - } - return make_ready_future<>(); -} - -future<> multishard_combining_reader_v2::fast_forward_to(const dht::partition_range& pr) { - 
clear_buffer(); - _end_of_stream = false; - on_partition_range_change(pr); - return parallel_for_each(_shard_readers, [&pr] (std::unique_ptr& sr) { - return sr->fast_forward_to(pr); - }); -} - -future<> multishard_combining_reader_v2::fast_forward_to(position_range pr) { - return make_exception_future<>(make_backtraced_exception_ptr()); -} - -future<> multishard_combining_reader_v2::close() noexcept { - return parallel_for_each(_shard_readers, [] (std::unique_ptr& sr) { - return sr->close(); - }); -} - -flat_mutation_reader_v2 make_multishard_combining_reader_v2( - shared_ptr lifecycle_policy, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) { - const dht::sharder& sharder = schema->get_sharder(); - return make_flat_mutation_reader_v2(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc, - std::move(trace_state), fwd_mr); -} - -flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests( - const dht::sharder& sharder, - shared_ptr lifecycle_policy, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) { - return make_flat_mutation_reader_v2(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc, - std::move(trace_state), fwd_mr); -} - class queue_reader final : public flat_mutation_reader::impl { friend class queue_reader_handle; diff --git a/mutation_reader.hh b/mutation_reader.hh index b90b363dd5..6175d05114 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -354,203 +354,6 @@ snapshot_source make_empty_snapshot_source(); using mutation_source_opt = optimized_optional; -/// Make a foreign_reader. 
-/// -/// foreign_reader is a local representant of a reader located on a remote -/// shard. Manages its lifecycle and takes care of seamlessly transferring -/// produced fragments. Fragments are *copied* between the shards, a -/// bufferful at a time. -/// To maximize throughput read-ahead is used. After each fill_buffer() or -/// fast_forward_to() a read-ahead (a fill_buffer() on the remote reader) is -/// issued. This read-ahead runs in the background and is brough back to -/// foreground on the next fill_buffer() or fast_forward_to() call. -/// If the reader resides on this shard (the shard where make_foreign_reader() -/// is called) there is no need to wrap it in foreign_reader, just return it as -/// is. -flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema, - reader_permit permit, - foreign_ptr> reader, - streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no); - -/// Make an auto-paused evictable reader. -/// -/// The reader is paused after each use, that is after each call to any of its -/// members that cause actual reading to be done (`fill_buffer()` and -/// `fast_forward_to()`). When paused, the reader is made evictable, that it is -/// it is registered with reader concurrency semaphore as an inactive read. -/// The reader is resumed automatically on the next use. If it was evicted, it -/// will be recreated at the position it left off reading. This is all -/// transparent to its user. -/// Parameters passed by reference have to be kept alive while the reader is -/// alive. 
-flat_mutation_reader_v2 make_auto_paused_evictable_reader_v2( - mutation_source ms, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr); - -class evictable_reader_v2; - -class evictable_reader_handle_v2 { - friend std::pair make_manually_paused_evictable_reader_v2(mutation_source, schema_ptr, reader_permit, - const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, mutation_reader::forwarding); - -private: - evictable_reader_v2* _r; - -private: - explicit evictable_reader_handle_v2(evictable_reader_v2& r); - -public: - void pause(); -}; - -/// Make a manually-paused evictable reader. -/// -/// The reader can be paused via the evictable reader handle when desired. The -/// intended usage is subsequent reads done in bursts, after which the reader is -/// not used for some time. When paused, the reader is made evictable, that is, -/// it is registered with reader concurrency semaphore as an inactive read. -/// The reader is resumed automatically on the next use. If it was evicted, it -/// will be recreated at the position it left off reading. This is all -/// transparent to its user. -/// Parameters passed by reference have to be kept alive while the reader is -/// alive. -std::pair make_manually_paused_evictable_reader_v2( - mutation_source ms, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr); - -/// Reader lifecycle policy for the mulitshard combining reader. -/// -/// This policy is expected to make sure any additional resource the readers -/// might need is kept alive for the lifetime of the readers, not that -/// of the multishard reader. 
This is a very important distinction. As -/// destructors cannot return futures, the multishard reader will be -/// destroyed before all it's shard readers could stop properly. Hence it -/// is the duty of this policy to make sure all objects the shard readers -/// depend on stay alive until they are properly destroyed on their home -/// shards. Note that this also includes the passed in `range` and `slice` -/// parameters because although client code is required to keep them alive as -/// long as the top level reader lives, the shard readers might outlive the -/// multishard reader itself. -class reader_lifecycle_policy_v2 { -public: - struct stopped_reader { - reader_concurrency_semaphore::inactive_read_handle handle; - flat_mutation_reader_v2::tracked_buffer unconsumed_fragments; - }; - -public: - /// Create an appropriate reader on the shard it is called on. - /// - /// Will be called when the multishard reader visits a shard for the - /// first time or when a reader has to be recreated after having been - /// evicted (while paused). This method should also enter gates, take locks - /// or whatever is appropriate to make sure resources it is using on the - /// remote shard stay alive, during the lifetime of the created reader. - /// - /// The \c permit parameter shall be obtained via `obtain_reader_permit()` - virtual flat_mutation_reader_v2 create_reader( - schema_ptr schema, - reader_permit permit, - const dht::partition_range& range, - const query::partition_slice& slice, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state, - mutation_reader::forwarding fwd_mr) = 0; - - /// Updates the read-range of the shard reader. - /// - /// Gives the lifecycle-policy a chance to update its stored read-range (if - /// the case). Called after any modification to the read range (typically - /// after fast_forward_to()). The range is identical to the one the reader - /// holds a reference to after the modification happened. 
When this method - /// is called, it is safe to destroy the previous range instance. - /// - /// This method has to be called on the shard the reader lives on. - virtual void update_read_range(lw_shared_ptr pr) = 0; - - /// Destroy the shard reader. - /// - /// Will be called when the multishard reader is being destroyed. It will be - /// called for each of the shard readers. - /// This method is expected to do a proper cleanup, that is, leave any gates, - /// release any locks or whatever is appropriate for the shard reader. - /// - /// This method has to be called on the shard the reader lives on. - /// This method will be called from a destructor so it cannot throw. - virtual future<> destroy_reader(stopped_reader reader) noexcept = 0; - - /// Get the relevant semaphore for this read. - /// - /// The semaphore is used to register paused readers with as inactive - /// readers. The semaphore then can evict these readers when resources are - /// in-demand. - /// The multishard reader will pause and resume readers via the `pause()` - /// and `try_resume()` helper methods. Clients can resume any paused readers - /// after the multishard reader is destroyed via the same helper methods. - /// - /// This method will be called on the shard where the relevant reader lives. - virtual reader_concurrency_semaphore& semaphore() = 0; - - /// Obtain an admitted permit. - /// - /// The permit will be associated with the semaphore returned by - /// `semaphore()`. - /// - /// This method will be called on the shard where the relevant reader lives. - virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) = 0; -}; - -/// Make a multishard_combining_reader. -/// -/// multishard_combining_reader takes care of reading a range from all shards -/// that own a subrange in the range. Shard reader are created on-demand, when -/// the shard is visited for the first time. 
-/// -/// The read starts with a concurrency of one, that is the reader reads from a -/// single shard at a time. The concurrency is exponentially increased (to a -/// maximum of the number of shards) when a reader's buffer is empty after -/// moving the next shard. This condition is important as we only wan't to -/// increase concurrency for sparse tables that have little data and the reader -/// has to move between shards often. When concurrency is > 1, the reader -/// issues background read-aheads to the next shards so that by the time it -/// needs to move to them they have the data ready. -/// For dense tables (where we rarely cross shards) we rely on the -/// foreign_reader to issue sufficient read-aheads on its own to avoid blocking. -/// -/// The readers' life-cycles are managed through the supplied lifecycle policy. -flat_mutation_reader_v2 make_multishard_combining_reader_v2( - shared_ptr lifecycle_policy, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state = nullptr, - mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); - -flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests( - const dht::sharder& sharder, - shared_ptr lifecycle_policy, - schema_ptr schema, - reader_permit permit, - const dht::partition_range& pr, - const query::partition_slice& ps, - const io_priority_class& pc, - tracing::trace_state_ptr trace_state = nullptr, - mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); - class queue_reader; /// Calls to different methods cannot overlap! 
diff --git a/mutation_writer/multishard_writer.cc b/mutation_writer/multishard_writer.cc index d510b232f4..dd9c7cf49f 100644 --- a/mutation_writer/multishard_writer.cc +++ b/mutation_writer/multishard_writer.cc @@ -10,6 +10,7 @@ #include "mutation_reader.hh" #include "mutation_fragment_v2.hh" #include "schema_registry.hh" +#include "readers/foreign.hh" #include #include #include diff --git a/readers/evictable.hh b/readers/evictable.hh new file mode 100644 index 0000000000..21267dc3a6 --- /dev/null +++ b/readers/evictable.hh @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "dht/i_partitioner.hh" +#include "readers/flat_mutation_reader_fwd.hh" +#include "schema_fwd.hh" +#include "seastarx.hh" + +namespace seastar { +class io_priority_class; +} + +class reader_permit; +class mutation_source; + +namespace tracing { +class trace_state_ptr; +} + +/// Make an auto-paused evictable reader. +/// +/// The reader is paused after each use, that is after each call to any of its +/// members that cause actual reading to be done (`fill_buffer()` and +/// `fast_forward_to()`). When paused, the reader is made evictable, that is, +/// it is registered with reader concurrency semaphore as an inactive read. +/// The reader is resumed automatically on the next use. If it was evicted, it +/// will be recreated at the position it left off reading. This is all +/// transparent to its user. +/// Parameters passed by reference have to be kept alive while the reader is +/// alive.
+flat_mutation_reader_v2 make_auto_paused_evictable_reader_v2( + mutation_source ms, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr); + +class evictable_reader_v2; + +class evictable_reader_handle_v2 { + friend std::pair make_manually_paused_evictable_reader_v2(mutation_source, schema_ptr, reader_permit, + const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, mutation_reader::forwarding); + +private: + evictable_reader_v2* _r; + +private: + explicit evictable_reader_handle_v2(evictable_reader_v2& r); + +public: + void pause(); +}; + +/// Make a manually-paused evictable reader. +/// +/// The reader can be paused via the evictable reader handle when desired. The +/// intended usage is subsequent reads done in bursts, after which the reader is +/// not used for some time. When paused, the reader is made evictable, that is, +/// it is registered with reader concurrency semaphore as an inactive read. +/// The reader is resumed automatically on the next use. If it was evicted, it +/// will be recreated at the position it left off reading. This is all +/// transparent to its user. +/// Parameters passed by reference have to be kept alive while the reader is +/// alive. 
+std::pair make_manually_paused_evictable_reader_v2( + mutation_source ms, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr); diff --git a/readers/foreign.hh b/readers/foreign.hh new file mode 100644 index 0000000000..3b1a03a0fb --- /dev/null +++ b/readers/foreign.hh @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include + +#include "readers/flat_mutation_reader_fwd.hh" +#include "schema_fwd.hh" + +class reader_permit; + +/// Make a foreign_reader. +/// +/// foreign_reader is a local representant of a reader located on a remote +/// shard. Manages its lifecycle and takes care of seamlessly transferring +/// produced fragments. Fragments are *copied* between the shards, a +/// bufferful at a time. +/// To maximize throughput read-ahead is used. After each fill_buffer() or +/// fast_forward_to() a read-ahead (a fill_buffer() on the remote reader) is +/// issued. This read-ahead runs in the background and is brought back to +/// foreground on the next fill_buffer() or fast_forward_to() call. +/// If the reader resides on this shard (the shard where make_foreign_reader() +/// is called) there is no need to wrap it in foreign_reader, just return it as +/// is.
+flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema, + reader_permit permit, + foreign_ptr> reader, + streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no); diff --git a/readers/multishard.cc b/readers/multishard.cc new file mode 100644 index 0000000000..4cf1da1cf9 --- /dev/null +++ b/readers/multishard.cc @@ -0,0 +1,1144 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include +#include +#include + +#include "dht/sharder.hh" +#include "mutation_reader.hh" +#include "readers/empty_v2.hh" +#include "readers/evictable.hh" +#include "readers/flat_mutation_reader_v2.hh" +#include "readers/foreign.hh" +#include "readers/multishard.hh" +#include "schema_registry.hh" + +extern logger mrlog; + +namespace { + +struct remote_fill_buffer_result_v2 { + foreign_ptr> buffer; + bool end_of_stream = false; + + remote_fill_buffer_result_v2() = default; + remote_fill_buffer_result_v2(flat_mutation_reader_v2::tracked_buffer&& buffer, bool end_of_stream) + : buffer(make_foreign(std::make_unique(std::move(buffer)))) + , end_of_stream(end_of_stream) { + } +}; + +} + +/// See make_foreign_reader() for description. +class foreign_reader : public flat_mutation_reader_v2::impl { + template + using foreign_unique_ptr = foreign_ptr>; + + using fragment_buffer = flat_mutation_reader_v2::tracked_buffer; + + foreign_unique_ptr _reader; + foreign_unique_ptr> _read_ahead_future; + streamed_mutation::forwarding _fwd_sm; + + // Forward an operation to the reader on the remote shard. + // If the remote reader has an ongoing read-ahead, bring it to the + // foreground (wait on it) and execute the operation after. + // After the operation completes, kick off a new read-ahead (fill_buffer()) + // and move it to the background (save it's future but don't wait on it + // now). If all works well read-aheads complete by the next operation and + // we don't have to wait on the remote reader filling its buffer. 
+ template >> + Result forward_operation(Operation op) { + reader_permit::blocked_guard bg{_permit}; + return smp::submit_to(_reader.get_owner_shard(), [reader = _reader.get(), + read_ahead_future = std::exchange(_read_ahead_future, nullptr), + op = std::move(op)] () mutable { + auto exec_op_and_read_ahead = [=] () mutable { + // Not really variadic, we expect 0 (void) or 1 parameter. + return op().then([=] (auto... result) { + auto f = reader->is_end_of_stream() ? nullptr : std::make_unique>(reader->fill_buffer()); + return make_ready_future>, decltype(result)...>>( + std::tuple(make_foreign(std::move(f)), std::move(result)...)); + }); + }; + if (read_ahead_future) { + return read_ahead_future->then(std::move(exec_op_and_read_ahead)); + } else { + return exec_op_and_read_ahead(); + } + }).then([this] (auto fut_and_result) { + _read_ahead_future = std::get<0>(std::move(fut_and_result)); + static_assert(std::tuple_size::value <= 2); + if constexpr (std::tuple_size::value == 1) { + return make_ready_future<>(); + } else { + auto result = std::get<1>(std::move(fut_and_result)); + return make_ready_future(std::move(result)); + } + }).finally([bg = std::move(bg)] { }); + } +public: + foreign_reader(schema_ptr schema, + reader_permit permit, + foreign_unique_ptr reader, + streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no); + + // this is captured. 
+ foreign_reader(const foreign_reader&) = delete; + foreign_reader& operator=(const foreign_reader&) = delete; + foreign_reader(foreign_reader&&) = delete; + foreign_reader& operator=(foreign_reader&&) = delete; + + virtual future<> fill_buffer() override; + virtual future<> next_partition() override; + virtual future<> fast_forward_to(const dht::partition_range& pr) override; + virtual future<> fast_forward_to(position_range pr) override; + virtual future<> close() noexcept override; +}; + +foreign_reader::foreign_reader(schema_ptr schema, + reader_permit permit, + foreign_unique_ptr reader, + streamed_mutation::forwarding fwd_sm) + : impl(std::move(schema), std::move(permit)) + , _reader(std::move(reader)) + , _fwd_sm(fwd_sm) { +} + +future<> foreign_reader::fill_buffer() { + if (_end_of_stream || is_buffer_full()) { + return make_ready_future(); + } + + return forward_operation([reader = _reader.get()] () { + auto f = reader->is_buffer_empty() ? reader->fill_buffer() : make_ready_future<>(); + return f.then([=] { + return make_ready_future(remote_fill_buffer_result_v2(reader->detach_buffer(), reader->is_end_of_stream())); + }); + }).then([this] (remote_fill_buffer_result_v2 res) mutable { + _end_of_stream = res.end_of_stream; + for (const auto& mf : *res.buffer) { + // Need a copy since the mf is on the remote shard. 
+ push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf)); + } + }); +} + +future<> foreign_reader::next_partition() { + if (_fwd_sm == streamed_mutation::forwarding::yes) { + clear_buffer(); + _end_of_stream = false; + } else { + clear_buffer_to_next_partition(); + if (!is_buffer_empty()) { + co_return; + } + _end_of_stream = false; + } + co_await forward_operation([reader = _reader.get()] () { + return reader->next_partition(); + }); +} + +future<> foreign_reader::fast_forward_to(const dht::partition_range& pr) { + clear_buffer(); + _end_of_stream = false; + return forward_operation([reader = _reader.get(), &pr] () { + return reader->fast_forward_to(pr); + }); +} + +future<> foreign_reader::fast_forward_to(position_range pr) { + forward_buffer_to(pr.start()); + _end_of_stream = false; + return forward_operation([reader = _reader.get(), pr = std::move(pr)] () { + return reader->fast_forward_to(std::move(pr)); + }); +} + +future<> foreign_reader::close() noexcept { + if (!_reader) { + if (_read_ahead_future) { + on_internal_error_noexcept(mrlog, "foreign_reader::close can't wait on read_ahead future with disengaged reader"); + } + return make_ready_future<>(); + } + return smp::submit_to(_reader.get_owner_shard(), + [reader = std::move(_reader), read_ahead_future = std::exchange(_read_ahead_future, nullptr)] () mutable { + auto read_ahead = read_ahead_future ? std::move(*read_ahead_future.get()) : make_ready_future<>(); + return read_ahead.then_wrapped([reader = std::move(reader)] (future<> f) mutable { + if (f.failed()) { + auto ex = f.get_exception(); + mrlog.warn("foreign_reader: benign read_ahead failure during close: {}. 
Ignoring.", ex); + } + return reader->close(); + }); + }); +} + +flat_mutation_reader_v2 make_foreign_reader(schema_ptr schema, + reader_permit permit, + foreign_ptr> reader, + streamed_mutation::forwarding fwd_sm) { + if (reader.get_owner_shard() == this_shard_id()) { + return std::move(*reader); + } + return make_flat_mutation_reader_v2(std::move(schema), std::move(permit), std::move(reader), fwd_sm); +} + +template +static void require(bool condition, const char* msg, const Arg&... arg) { + if (!condition) { + on_internal_error(mrlog, format(msg, arg...)); + } +} + +// Encapsulates all data and logic that is local to the remote shard the +// reader lives on. +class evictable_reader_v2 : public flat_mutation_reader_v2::impl { +public: + using auto_pause = bool_class; + +private: + auto_pause _auto_pause; + mutation_source _ms; + const dht::partition_range* _pr; + const query::partition_slice& _ps; + const io_priority_class& _pc; + tracing::global_trace_state_ptr _trace_state; + const mutation_reader::forwarding _fwd_mr; + reader_concurrency_semaphore::inactive_read_handle _irh; + bool _reader_recreated = false; // set if reader was recreated since last operation + position_in_partition::tri_compare _tri_cmp; + + std::optional _last_pkey; + position_in_partition _next_position_in_partition = position_in_partition::for_partition_start(); + // These are used when the reader has to be recreated (after having been + // evicted while paused) and the range and/or slice it is recreated with + // differs from the original ones. 
+ std::optional _range_override; + std::optional _slice_override; + + flat_mutation_reader_v2_opt _reader; + +private: + void do_pause(flat_mutation_reader_v2 reader); + void maybe_pause(flat_mutation_reader_v2 reader); + flat_mutation_reader_v2_opt try_resume(); + void update_next_position(); + void adjust_partition_slice(); + flat_mutation_reader_v2 recreate_reader(); + future resume_or_create_reader(); + void validate_partition_start(const partition_start& ps); + void validate_position_in_partition(position_in_partition_view pos) const; + void examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3); + +public: + evictable_reader_v2( + auto_pause ap, + mutation_source ms, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr); + virtual future<> fill_buffer() override; + virtual future<> next_partition() override; + virtual future<> fast_forward_to(const dht::partition_range& pr) override; + virtual future<> fast_forward_to(position_range) override { + throw_with_backtrace(); + } + virtual future<> close() noexcept override { + if (_reader) { + return _reader->close(); + } + if (auto reader_opt = try_resume()) { + return reader_opt->close(); + } + return make_ready_future<>(); + } + reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && { + return std::move(_irh); + } + void pause() { + if (_reader) { + do_pause(std::move(*_reader)); + } + } + reader_permit permit() { + return _permit; + } +}; + +void evictable_reader_v2::do_pause(flat_mutation_reader_v2 reader) { + assert(!_irh); + _irh = _permit.semaphore().register_inactive_read(std::move(reader)); +} + +void evictable_reader_v2::maybe_pause(flat_mutation_reader_v2 reader) { + if (_auto_pause) { + do_pause(std::move(reader)); + } else { + _reader = 
std::move(reader); + } +} + +flat_mutation_reader_v2_opt evictable_reader_v2::try_resume() { + if (auto reader_opt = _permit.semaphore().unregister_inactive_read(std::move(_irh))) { + return std::move(*reader_opt); + } + return {}; +} + +void evictable_reader_v2::update_next_position() { + if (is_buffer_empty()) { + return; + } + + auto rbegin = std::reverse_iterator(buffer().end()); + auto rend = std::reverse_iterator(buffer().begin()); + if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment_v2::is_partition_start)); pk_it != rend) { + _last_pkey = pk_it->as_partition_start().key(); + } + + const auto last_pos = buffer().back().position(); + switch (last_pos.region()) { + case partition_region::partition_start: + _next_position_in_partition = position_in_partition::for_static_row(); + break; + case partition_region::static_row: + _next_position_in_partition = position_in_partition::before_all_clustered_rows(); + break; + case partition_region::clustered: + if (!_reader->is_buffer_empty() && _reader->peek_buffer().is_end_of_partition()) { + push_mutation_fragment(_reader->pop_mutation_fragment()); + _next_position_in_partition = position_in_partition::for_partition_start(); + } else { + _next_position_in_partition = position_in_partition::after_key(last_pos); + } + break; + case partition_region::partition_end: + _next_position_in_partition = position_in_partition::for_partition_start(); + break; + } +} + +void evictable_reader_v2::adjust_partition_slice() { + const auto reversed = _ps.options.contains(query::partition_slice::option::reversed); + _slice_override = reversed ? 
query::legacy_reverse_slice_to_native_reverse_slice(*_schema, _ps) : _ps; + + auto ranges = _slice_override->default_row_ranges(); + query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition); + + _slice_override->clear_ranges(); + _slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges)); + + if (reversed) { + _slice_override = query::native_reverse_slice_to_legacy_reverse_slice(*_schema, std::move(*_slice_override)); + } +} + +flat_mutation_reader_v2 evictable_reader_v2::recreate_reader() { + const dht::partition_range* range = _pr; + const query::partition_slice* slice = &_ps; + + _range_override.reset(); + _slice_override.reset(); + + if (_last_pkey) { + bool partition_range_is_inclusive = true; + + switch (_next_position_in_partition.region()) { + case partition_region::partition_start: + partition_range_is_inclusive = false; + break; + case partition_region::static_row: + break; + case partition_region::clustered: + adjust_partition_slice(); + slice = &*_slice_override; + break; + case partition_region::partition_end: + partition_range_is_inclusive = false; + break; + } + + // The original range contained a single partition and we've read it + // all. We'd have to create a reader with an empty range that would + // immediately be at EOS. This is not possible so just create an empty + // reader instead. + // This should be extremely rare (who'd create a multishard reader to + // read a single partition) but still, let's make sure we handle it + // correctly. 
+ if (_pr->is_singular() && !partition_range_is_inclusive) { + return make_empty_flat_reader_v2(_schema, _permit); + } + + _range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end()); + range = &*_range_override; + + _reader_recreated = true; + } + + return _ms.make_reader_v2( + _schema, + _permit, + *range, + *slice, + _pc, + _trace_state, + streamed_mutation::forwarding::no, + _fwd_mr); +} + +future evictable_reader_v2::resume_or_create_reader() { + if (_reader) { + co_return std::move(*_reader); + } + if (auto reader_opt = try_resume()) { + co_return std::move(*reader_opt); + } + // When the reader is created the first time and we are actually resuming a + // saved reader in `recreate_reader()`, we have two cases here: + // * the reader is still alive (in inactive state) + // * the reader was evicted + // We check for this below with `needs_readmission()` and it is very + // important to not allow for preemption between said check and + // `recreate_reader()`, otherwise the reader might be evicted between the + // check and `recreate_reader()` and the latter will recreate it without + // waiting for re-admission. + if (_permit.needs_readmission()) { + co_await _permit.wait_readmission(); + } + co_return recreate_reader(); +} + +void evictable_reader_v2::validate_partition_start(const partition_start& ps) { + const auto tri_cmp = dht::ring_position_comparator(*_schema); + // If we recreated the reader after fast-forwarding it we won't have + // _last_pkey set. In this case it is enough to check if the partition + // is in range. + if (_last_pkey) { + const auto cmp_res = tri_cmp(*_last_pkey, ps.key()); + if (_next_position_in_partition.region() != partition_region::partition_start) { // we expect to continue from the same partition + // We cannot assume the partition we stopped the read at is still alive + // when we recreate the reader. 
It might have been compacted away in the + // meanwhile, so allow for a larger partition too. + require( + cmp_res <= 0, + "{}(): validation failed, expected partition with key larger or equal to _last_pkey {}, but got {}", + __FUNCTION__, + *_last_pkey, + ps.key()); + // Reset next pos if we are not continuing from the same partition + if (cmp_res < 0) { + // Close previous partition, we are not going to continue it. + push_mutation_fragment(*_schema, _permit, partition_end{}); + _next_position_in_partition = position_in_partition::for_partition_start(); + } + } else { // should be a larger partition + require( + cmp_res < 0, + "{}(): validation failed, expected partition with key larger than _last_pkey {}, but got {}", + __FUNCTION__, + *_last_pkey, + ps.key()); + } + } + const auto& prange = _range_override ? *_range_override : *_pr; + require( + // TODO: somehow avoid this copy + prange.contains(ps.key(), tri_cmp), + "{}(): validation failed, expected partition with key that falls into current range {}, but got {}", + __FUNCTION__, + prange, + ps.key()); +} + +void evictable_reader_v2::validate_position_in_partition(position_in_partition_view pos) const { + require( + _tri_cmp(_next_position_in_partition, pos) <= 0, + "{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}", + __FUNCTION__, + _next_position_in_partition, + pos); + + if (_slice_override && pos.region() == partition_region::clustered) { + const auto reversed = _ps.options.contains(query::partition_slice::option::reversed); + std::optional native_slice; + if (reversed) { + native_slice = query::legacy_reverse_slice_to_native_reverse_slice(*_schema, *_slice_override); + } + auto& slice = reversed ? 
*native_slice : *_slice_override; + + const auto ranges = slice.row_ranges(*_schema, _last_pkey->key()); + const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) { + // TODO: somehow avoid this copy + auto range = position_range(cr); + // We cannot use range.contains() because that treats range as a + // [a, b) range, meaning a range tombstone change with position + // after_key(b) will be considered outside of it. Such range + // tombstone changes can be emitted however when recreating the + // reader on clustering range edge. + return _tri_cmp(range.start(), pos) <= 0 && _tri_cmp(pos, range.end()) <= 0; + }); + require( + any_contains, + "{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}", + __FUNCTION__, + slice, + pos); + } +} + +void evictable_reader_v2::examine_first_fragments(mutation_fragment_v2_opt& mf1, mutation_fragment_v2_opt& mf2, mutation_fragment_v2_opt& mf3) { + if (!mf1) { + return; // the reader is at EOS + } + + // If engaged, the first fragment is always a partition-start. + validate_partition_start(mf1->as_partition_start()); + if (_tri_cmp(mf1->position(), _next_position_in_partition) < 0) { + mf1 = {}; // drop mf1 + } + + const auto continue_same_partition = _next_position_in_partition.region() != partition_region::partition_start; + + // If we have a first fragment, we are guaranteed to have a second one -- if not else, a partition-end. + if (mf2->is_end_of_partition()) { + return; // no further fragments, nothing to do + } + + // We want to validate the position of the first non-dropped fragment. + // If mf2 is a static row and we need to drop it, this will be mf3. 
+ if (mf2->is_static_row() && _tri_cmp(mf2->position(), _next_position_in_partition) < 0) { + mf2 = {}; // drop mf2 + } else { + if (continue_same_partition) { + validate_position_in_partition(mf2->position()); + } + return; + } + + if (mf3->is_end_of_partition()) { + return; // no further fragments, nothing to do + } else if (continue_same_partition) { + validate_position_in_partition(mf3->position()); + } +} + +evictable_reader_v2::evictable_reader_v2( + auto_pause ap, + mutation_source ms, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) + : impl(std::move(schema), std::move(permit)) + , _auto_pause(ap) + , _ms(std::move(ms)) + , _pr(&pr) + , _ps(ps) + , _pc(pc) + , _trace_state(std::move(trace_state)) + , _fwd_mr(fwd_mr) + , _tri_cmp(*_schema) { +} + +future<> evictable_reader_v2::fill_buffer() { + if (is_end_of_stream()) { + co_return; + } + _reader = co_await resume_or_create_reader(); + + if (_reader_recreated) { + // Recreating the reader breaks snapshot isolation and creates all sorts + // of complications around the continuity of range tombstone changes, + // e.g. a range tombstone started by the previous reader object + // might not exist anymore with the new reader object. + // To avoid complications we reset the tombstone state on each reader + // recreation by emitting a null tombstone change, if we read at least + // one clustering fragment from the partition. 
+ if (_next_position_in_partition.region() == partition_region::clustered + && _tri_cmp(_next_position_in_partition, position_in_partition::before_all_clustered_rows()) > 0) { + push_mutation_fragment(*_schema, _permit, range_tombstone_change{position_in_partition_view::before_key(_next_position_in_partition), {}}); + } + auto mf1 = co_await (*_reader)(); + auto mf2 = co_await (*_reader)(); + auto mf3 = co_await (*_reader)(); + examine_first_fragments(mf1, mf2, mf3); + if (mf3) { + _reader->unpop_mutation_fragment(std::move(*mf3)); + } + if (mf2) { + _reader->unpop_mutation_fragment(std::move(*mf2)); + } + if (mf1) { + _reader->unpop_mutation_fragment(std::move(*mf1)); + } + _reader_recreated = false; + } else { + co_await _reader->fill_buffer(); + } + + _reader->move_buffer_content_to(*this); + + // Ensure that each buffer represents forward progress. Only a concern when + // the last fragment in the buffer is range tombstone change. In this case + // ensure that: + // * buffer().back().position() > _next_position_in_partition; + // * _reader.peek()->position() > buffer().back().position(); + if (!is_buffer_empty() && buffer().back().is_range_tombstone_change()) { + auto* next_mf = co_await _reader->peek(); + + // First make sure we've made progress w.r.t. _next_position_in_partition. 
+ while (next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) { + push_mutation_fragment(_reader->pop_mutation_fragment()); + next_mf = co_await _reader->peek(); + } + + const auto last_pos = position_in_partition(buffer().back().position()); + while (next_mf && _tri_cmp(last_pos, next_mf->position()) == 0) { + push_mutation_fragment(_reader->pop_mutation_fragment()); + next_mf = co_await _reader->peek(); + } + } + + update_next_position(); + _end_of_stream = _reader->is_end_of_stream(); + maybe_pause(std::move(*_reader)); +} + +future<> evictable_reader_v2::next_partition() { + _next_position_in_partition = position_in_partition::for_partition_start(); + clear_buffer_to_next_partition(); + if (!is_buffer_empty()) { + co_return; + } + auto reader = co_await resume_or_create_reader(); + co_await reader.next_partition(); + maybe_pause(std::move(reader)); +} + +future<> evictable_reader_v2::fast_forward_to(const dht::partition_range& pr) { + _pr = &pr; + _last_pkey.reset(); + _next_position_in_partition = position_in_partition::for_partition_start(); + clear_buffer(); + _end_of_stream = false; + + if (_reader) { + co_await _reader->fast_forward_to(pr); + _range_override.reset(); + co_return; + } + if (auto reader_opt = try_resume()) { + co_await reader_opt->fast_forward_to(pr); + _range_override.reset(); + maybe_pause(std::move(*reader_opt)); + } +} + +evictable_reader_handle_v2::evictable_reader_handle_v2(evictable_reader_v2& r) : _r(&r) +{ } + +void evictable_reader_handle_v2::evictable_reader_handle_v2::pause() { + _r->pause(); +} + +flat_mutation_reader_v2 make_auto_paused_evictable_reader_v2( + mutation_source ms, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + return make_flat_mutation_reader_v2(evictable_reader_v2::auto_pause::yes, std::move(ms), 
std::move(schema), std::move(permit), pr, ps, + pc, std::move(trace_state), fwd_mr); +} + +std::pair make_manually_paused_evictable_reader_v2( + mutation_source ms, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + auto reader = std::make_unique(evictable_reader_v2::auto_pause::no, std::move(ms), std::move(schema), std::move(permit), pr, ps, + pc, std::move(trace_state), fwd_mr); + auto handle = evictable_reader_handle_v2(*reader.get()); + return std::pair(flat_mutation_reader_v2(std::move(reader)), handle); +} + +namespace { + +// A special-purpose shard reader. +// +// Shard reader manages a reader located on a remote shard. It transparently +// supports read-ahead (background fill_buffer() calls). +// This reader is not for general use, it was designed to serve the +// multishard_combining_reader. +// Although it implements the flat_mutation_reader_v2:impl interface it cannot be +// wrapped into a flat_mutation_reader_v2, as it needs to be managed by a shared +// pointer. 
+class shard_reader_v2 : public flat_mutation_reader_v2::impl { +private: + shared_ptr _lifecycle_policy; + const unsigned _shard; + foreign_ptr> _pr; + const query::partition_slice& _ps; + const io_priority_class& _pc; + tracing::global_trace_state_ptr _trace_state; + const mutation_reader::forwarding _fwd_mr; + std::optional> _read_ahead; + foreign_ptr> _reader; + +private: + future<> do_fill_buffer(); + +public: + shard_reader_v2( + schema_ptr schema, + reader_permit permit, + shared_ptr lifecycle_policy, + unsigned shard, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) + : impl(std::move(schema), std::move(permit)) + , _lifecycle_policy(std::move(lifecycle_policy)) + , _shard(shard) + , _pr(make_foreign(make_lw_shared(pr))) + , _ps(ps) + , _pc(pc) + , _trace_state(std::move(trace_state)) + , _fwd_mr(fwd_mr) { + } + + shard_reader_v2(shard_reader_v2&&) = delete; + shard_reader_v2& operator=(shard_reader_v2&&) = delete; + + shard_reader_v2(const shard_reader_v2&) = delete; + shard_reader_v2& operator=(const shard_reader_v2&) = delete; + + const mutation_fragment_v2& peek_buffer() const { + return buffer().front(); + } + virtual future<> fill_buffer() override; + virtual future<> next_partition() override; + virtual future<> fast_forward_to(const dht::partition_range& pr) override; + virtual future<> fast_forward_to(position_range) override; + virtual future<> close() noexcept override; + bool done() const { + return _reader && is_buffer_empty() && is_end_of_stream(); + } + void read_ahead(); + bool is_read_ahead_in_progress() const { + return _read_ahead.has_value(); + } +}; + +future<> shard_reader_v2::close() noexcept { + if (_read_ahead) { + try { + co_await *std::exchange(_read_ahead, std::nullopt); + } catch (...) 
{ + mrlog.warn("shard_reader::close(): read_ahead on shard {} failed: {}", _shard, std::current_exception()); + } + } + + try { + co_await smp::submit_to(_shard, [this] { + if (!_reader) { + return make_ready_future<>(); + } + + auto irh = std::move(*_reader).inactive_read_handle(); + return with_closeable(flat_mutation_reader_v2(_reader.release()), [this] (flat_mutation_reader_v2& reader) mutable { + auto permit = reader.permit(); + const auto& schema = *reader.schema(); + + auto unconsumed_fragments = reader.detach_buffer(); + auto rit = std::reverse_iterator(buffer().cend()); + auto rend = std::reverse_iterator(buffer().cbegin()); + for (; rit != rend; ++rit) { + unconsumed_fragments.emplace_front(schema, permit, *rit); // we are copying from the remote shard. + } + + return unconsumed_fragments; + }).then([this, irh = std::move(irh)] (flat_mutation_reader_v2::tracked_buffer&& buf) mutable { + return _lifecycle_policy->destroy_reader({std::move(irh), std::move(buf)}); + }); + }); + } catch (...) 
{ + mrlog.error("shard_reader::close(): failed to stop reader on shard {}: {}", _shard, std::current_exception()); + } +} + +future<> shard_reader_v2::do_fill_buffer() { + auto fill_buf_fut = make_ready_future(); + + struct reader_and_buffer_fill_result { + foreign_ptr> reader; + remote_fill_buffer_result_v2 result; + }; + + if (!_reader) { + fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema)] () -> future { + auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr ts, + streamed_mutation::forwarding, + mutation_reader::forwarding fwd_mr) { + return lifecycle_policy->create_reader(std::move(s), std::move(permit), pr, ps, pc, std::move(ts), fwd_mr); + }); + auto s = gs.get(); + auto permit = co_await _lifecycle_policy->obtain_reader_permit(s, "shard-reader", timeout()); + auto rreader = make_foreign(std::make_unique(evictable_reader_v2::auto_pause::yes, std::move(ms), + s, std::move(permit), *_pr, _ps, _pc, _trace_state, _fwd_mr)); + + std::exception_ptr ex; + try { + tracing::trace(_trace_state, "Creating shard reader on shard: {}", this_shard_id()); + reader_permit::used_guard ug{rreader->permit()}; + co_await rreader->fill_buffer(); + auto res = remote_fill_buffer_result_v2(rreader->detach_buffer(), rreader->is_end_of_stream()); + co_return reader_and_buffer_fill_result{std::move(rreader), std::move(res)}; + } catch (...) 
{ + ex = std::current_exception(); + } + co_await rreader->close(); + std::rethrow_exception(std::move(ex)); + }).then([this] (reader_and_buffer_fill_result res) { + _reader = std::move(res.reader); + return std::move(res.result); + }); + } else { + fill_buf_fut = smp::submit_to(_shard, [this] () mutable { + reader_permit::used_guard ug{_reader->permit()}; + return _reader->fill_buffer().then([this, ug = std::move(ug)] { + return remote_fill_buffer_result_v2(_reader->detach_buffer(), _reader->is_end_of_stream()); + }); + }); + } + + return fill_buf_fut.then([this] (remote_fill_buffer_result_v2 res) mutable { + _end_of_stream = res.end_of_stream; + for (const auto& mf : *res.buffer) { + push_mutation_fragment(mutation_fragment_v2(*_schema, _permit, mf)); + } + }); +} + +future<> shard_reader_v2::fill_buffer() { + // FIXME: want to move this to the inner scopes but it makes clang miscompile the code. + reader_permit::blocked_guard guard(_permit); + if (_read_ahead) { + co_await *std::exchange(_read_ahead, std::nullopt); + co_return; + } + if (!is_buffer_empty()) { + co_return; + } + co_await do_fill_buffer(); +} + +future<> shard_reader_v2::next_partition() { + if (!_reader) { + co_return; + } + + // FIXME: want to move this to the inner scopes but it makes clang miscompile the code. + reader_permit::blocked_guard guard(_permit); + + if (_read_ahead) { + co_await *std::exchange(_read_ahead, std::nullopt); + } + clear_buffer_to_next_partition(); + if (!is_buffer_empty()) { + co_return; + } + co_return co_await smp::submit_to(_shard, [this] { + return _reader->next_partition(); + }); +} + +future<> shard_reader_v2::fast_forward_to(const dht::partition_range& pr) { + if (!_reader && !_read_ahead) { + // No need to fast-forward uncreated readers, they will be passed the new + // range when created. 
+ _pr = make_foreign(make_lw_shared(pr)); + co_return; + } + + reader_permit::blocked_guard guard(_permit); + + if (_read_ahead) { + co_await *std::exchange(_read_ahead, std::nullopt); + } + _end_of_stream = false; + clear_buffer(); + + _pr = co_await smp::submit_to(_shard, [this, &pr] () -> future>> { + auto new_pr = make_lw_shared(pr); + co_await _reader->fast_forward_to(*new_pr); + _lifecycle_policy->update_read_range(new_pr); + co_return make_foreign(std::move(new_pr)); + }); +} + +future<> shard_reader_v2::fast_forward_to(position_range) { + return make_exception_future<>(make_backtraced_exception_ptr()); +} + +void shard_reader_v2::read_ahead() { + if (_read_ahead || is_end_of_stream() || !is_buffer_empty()) { + return; + } + + _read_ahead.emplace(do_fill_buffer()); +} + +} // anonymous namespace + +// See make_multishard_combining_reader() for description. +class multishard_combining_reader_v2 : public flat_mutation_reader_v2::impl { + struct shard_and_token { + shard_id shard; + dht::token token; + + bool operator<(const shard_and_token& o) const { + // Reversed, as we want a min-heap. + return token > o.token; + } + }; + + const dht::sharder& _sharder; + std::vector> _shard_readers; + // Contains the position of each shard with token granularity, organized + // into a min-heap. Used to select the shard with the smallest token each + // time a shard reader produces a new partition. 
+ std::vector _shard_selection_min_heap; + unsigned _current_shard; + bool _crossed_shards; + unsigned _concurrency = 1; + + void on_partition_range_change(const dht::partition_range& pr); + bool maybe_move_to_next_shard(const dht::token* const t = nullptr); + future<> handle_empty_reader_buffer(); + +public: + multishard_combining_reader_v2( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr); + + // this is captured. + multishard_combining_reader_v2(const multishard_combining_reader_v2&) = delete; + multishard_combining_reader_v2& operator=(const multishard_combining_reader_v2&) = delete; + multishard_combining_reader_v2(multishard_combining_reader_v2&&) = delete; + multishard_combining_reader_v2& operator=(multishard_combining_reader_v2&&) = delete; + + virtual future<> fill_buffer() override; + virtual future<> next_partition() override; + virtual future<> fast_forward_to(const dht::partition_range& pr) override; + virtual future<> fast_forward_to(position_range pr) override; + virtual future<> close() noexcept override; +}; + +void multishard_combining_reader_v2::on_partition_range_change(const dht::partition_range& pr) { + _shard_selection_min_heap.clear(); + _shard_selection_min_heap.reserve(_sharder.shard_count()); + + auto token = pr.start() ? pr.start()->value().token() : dht::minimum_token(); + _current_shard = _sharder.shard_of(token); + + auto sharder = dht::ring_position_range_sharder(_sharder, pr); + + auto next = sharder.next(*_schema); + + // The first value of `next` is thrown away, as it is the ring range of the current shard. + // We only want to do a full round, until we get back to the shard we started from (`_current_shard`). + // We stop earlier if the sharder has no ranges for the remaining shards. 
+ for (next = sharder.next(*_schema); next && next->shard != _current_shard; next = sharder.next(*_schema)) { + _shard_selection_min_heap.push_back(shard_and_token{next->shard, next->ring_range.start()->value().token()}); + boost::push_heap(_shard_selection_min_heap); + } +} + +bool multishard_combining_reader_v2::maybe_move_to_next_shard(const dht::token* const t) { + if (_shard_selection_min_heap.empty() || (t && *t < _shard_selection_min_heap.front().token)) { + return false; + } + + boost::pop_heap(_shard_selection_min_heap); + const auto next_shard = _shard_selection_min_heap.back().shard; + _shard_selection_min_heap.pop_back(); + + if (t) { + _shard_selection_min_heap.push_back(shard_and_token{_current_shard, *t}); + boost::push_heap(_shard_selection_min_heap); + } + + _crossed_shards = true; + _current_shard = next_shard; + return true; +} + +future<> multishard_combining_reader_v2::handle_empty_reader_buffer() { + auto& reader = *_shard_readers[_current_shard]; + + if (reader.is_end_of_stream()) { + if (_shard_selection_min_heap.empty()) { + _end_of_stream = true; + } else { + maybe_move_to_next_shard(); + } + return make_ready_future<>(); + } else if (reader.is_read_ahead_in_progress()) { + return reader.fill_buffer(); + } else { + // If we crossed shards and the next reader has an empty buffer we + // double concurrency so the next time we cross shards we will have + // more chances of hitting the reader's buffer. + if (_crossed_shards) { + _concurrency = std::min(_concurrency * 2, _sharder.shard_count()); + + // Read ahead shouldn't change the min selection heap so we work on a local copy. + auto shard_selection_min_heap_copy = _shard_selection_min_heap; + + // If concurrency > 1 we kick-off concurrency-1 read-aheads in the + // background. They will be brought to the foreground when we move + // to their respective shard. 
+ for (unsigned i = 1; i < _concurrency && !shard_selection_min_heap_copy.empty(); ++i) { + boost::pop_heap(shard_selection_min_heap_copy); + const auto next_shard = shard_selection_min_heap_copy.back().shard; + shard_selection_min_heap_copy.pop_back(); + _shard_readers[next_shard]->read_ahead(); + } + } + return reader.fill_buffer(); + } +} + +multishard_combining_reader_v2::multishard_combining_reader_v2( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) + : impl(std::move(s), std::move(permit)), _sharder(sharder) { + + on_partition_range_change(pr); + + _shard_readers.reserve(_sharder.shard_count()); + for (unsigned i = 0; i < _sharder.shard_count(); ++i) { + _shard_readers.emplace_back(std::make_unique(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr)); + } +} + +future<> multishard_combining_reader_v2::fill_buffer() { + _crossed_shards = false; + return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this] { + auto& reader = *_shard_readers[_current_shard]; + + if (reader.is_buffer_empty()) { + return handle_empty_reader_buffer(); + } + + while (!reader.is_buffer_empty() && !is_buffer_full()) { + if (const auto& mf = reader.peek_buffer(); mf.is_partition_start() && maybe_move_to_next_shard(&mf.as_partition_start().key().token())) { + return make_ready_future<>(); + } + push_mutation_fragment(reader.pop_mutation_fragment()); + } + return make_ready_future<>(); + }); +} + +future<> multishard_combining_reader_v2::next_partition() { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + return _shard_readers[_current_shard]->next_partition(); + } + return make_ready_future<>(); +} + +future<> multishard_combining_reader_v2::fast_forward_to(const dht::partition_range& pr) { + 
clear_buffer(); + _end_of_stream = false; + on_partition_range_change(pr); + return parallel_for_each(_shard_readers, [&pr] (std::unique_ptr& sr) { + return sr->fast_forward_to(pr); + }); +} + +future<> multishard_combining_reader_v2::fast_forward_to(position_range pr) { + return make_exception_future<>(make_backtraced_exception_ptr()); +} + +future<> multishard_combining_reader_v2::close() noexcept { + return parallel_for_each(_shard_readers, [] (std::unique_ptr& sr) { + return sr->close(); + }); +} + +flat_mutation_reader_v2 make_multishard_combining_reader_v2( + shared_ptr lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + const dht::sharder& sharder = schema->get_sharder(); + return make_flat_mutation_reader_v2(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc, + std::move(trace_state), fwd_mr); +} + +flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + return make_flat_mutation_reader_v2(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc, + std::move(trace_state), fwd_mr); +} diff --git a/readers/multishard.hh b/readers/multishard.hh new file mode 100644 index 0000000000..4428a8698e --- /dev/null +++ b/readers/multishard.hh @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2022-present ScyllaDB + */ + +/* + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#pragma once + +#include "reader_concurrency_semaphore.hh" +#include "readers/flat_mutation_reader_fwd.hh" +#include "tracing/trace_state.hh" 
+#include "seastarx.hh" + +namespace seastar { +class io_priority_class; +} + +/// Reader lifecycle policy for the multishard combining reader. +/// +/// This policy is expected to make sure any additional resource the readers +/// might need is kept alive for the lifetime of the readers, not that +/// of the multishard reader. This is a very important distinction. As +/// destructors cannot return futures, the multishard reader will be +/// destroyed before all its shard readers could stop properly. Hence it +/// is the duty of this policy to make sure all objects the shard readers +/// depend on stay alive until they are properly destroyed on their home +/// shards. Note that this also includes the passed in `range` and `slice` +/// parameters because although client code is required to keep them alive as +/// long as the top level reader lives, the shard readers might outlive the +/// multishard reader itself. +class reader_lifecycle_policy_v2 { +public: + struct stopped_reader { + reader_concurrency_semaphore::inactive_read_handle handle; + flat_mutation_reader_v2::tracked_buffer unconsumed_fragments; + }; + +public: + /// Create an appropriate reader on the shard it is called on. + /// + /// Will be called when the multishard reader visits a shard for the + /// first time or when a reader has to be recreated after having been + /// evicted (while paused). This method should also enter gates, take locks + /// or whatever is appropriate to make sure resources it is using on the + /// remote shard stay alive, during the lifetime of the created reader. + /// + /// The \c permit parameter shall be obtained via `obtain_reader_permit()` + virtual flat_mutation_reader_v2 create_reader( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) = 0; + + /// Updates the read-range of the shard reader. 
+ /// + /// Gives the lifecycle-policy a chance to update its stored read-range (if + /// the case). Called after any modification to the read range (typically + /// after fast_forward_to()). The range is identical to the one the reader + /// holds a reference to after the modification happened. When this method + /// is called, it is safe to destroy the previous range instance. + /// + /// This method has to be called on the shard the reader lives on. + virtual void update_read_range(lw_shared_ptr pr) = 0; + + /// Destroy the shard reader. + /// + /// Will be called when the multishard reader is being destroyed. It will be + /// called for each of the shard readers. + /// This method is expected to do a proper cleanup, that is, leave any gates, + /// release any locks or whatever is appropriate for the shard reader. + /// + /// This method has to be called on the shard the reader lives on. + /// This method will be called from a destructor so it cannot throw. + virtual future<> destroy_reader(stopped_reader reader) noexcept = 0; + + /// Get the relevant semaphore for this read. + /// + /// The semaphore is used to register paused readers with as inactive + /// readers. The semaphore then can evict these readers when resources are + /// in-demand. + /// The multishard reader will pause and resume readers via the `pause()` + /// and `try_resume()` helper methods. Clients can resume any paused readers + /// after the multishard reader is destroyed via the same helper methods. + /// + /// This method will be called on the shard where the relevant reader lives. + virtual reader_concurrency_semaphore& semaphore() = 0; + + /// Obtain an admitted permit. + /// + /// The permit will be associated with the semaphore returned by + /// `semaphore()`. + /// + /// This method will be called on the shard where the relevant reader lives. 
+ virtual future obtain_reader_permit(schema_ptr schema, const char* const description, db::timeout_clock::time_point timeout) = 0; +}; + +/// Make a multishard_combining_reader. +/// +/// multishard_combining_reader takes care of reading a range from all shards +/// that own a subrange in the range. Shard reader are created on-demand, when +/// the shard is visited for the first time. +/// +/// The read starts with a concurrency of one, that is the reader reads from a +/// single shard at a time. The concurrency is exponentially increased (to a +/// maximum of the number of shards) when a reader's buffer is empty after +/// moving the next shard. This condition is important as we only wan't to +/// increase concurrency for sparse tables that have little data and the reader +/// has to move between shards often. When concurrency is > 1, the reader +/// issues background read-aheads to the next shards so that by the time it +/// needs to move to them they have the data ready. +/// For dense tables (where we rarely cross shards) we rely on the +/// foreign_reader to issue sufficient read-aheads on its own to avoid blocking. +/// +/// The readers' life-cycles are managed through the supplied lifecycle policy. 
+flat_mutation_reader_v2 make_multishard_combining_reader_v2( + shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state = nullptr, + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); + +flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests( + const dht::sharder& sharder, + shared_ptr<reader_lifecycle_policy_v2> lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state = nullptr, + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); + diff --git a/repair/row_level.cc b/repair/row_level.cc index 3ce0b49832..3a1f750829 100644 --- a/repair/row_level.cc +++ b/repair/row_level.cc @@ -47,6 +47,7 @@ #include "cql3/untyped_result_set.hh" #include "idl/partition_checksum.dist.hh" #include "readers/empty.hh" +#include "readers/evictable.hh" extern logging::logger rlogger; diff --git a/replica/database.cc b/replica/database.cc index b931d7d0f3..414354cad4 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -60,6 +60,7 @@ #include "replica/data_dictionary_impl.hh" #include "readers/multi_range.hh" +#include "readers/multishard.hh" using namespace std::chrono_literals; using namespace db; diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 4badc11b7e..a46991c6ec 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -57,6 +57,8 @@ #include "readers/empty_v2.hh" #include "readers/next_partition_adaptor.hh" #include "readers/combined.hh" +#include "readers/foreign.hh" +#include "readers/evictable.hh" static schema_ptr make_schema() { return schema_builder("ks", "cf") diff --git 
a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index e23cf2303c..3be4514f7e 
100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -32,6 +32,7 @@ #include "utils/ranges.hh" #include "readers/from_mutations_v2.hh" +#include "readers/evictable.hh" using namespace std::literals::chrono_literals; diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 7764801959..d32012d710 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -8,7 +8,7 @@ #pragma once -#include "mutation_reader.hh" +#include "readers/multishard.hh" #include class test_reader_lifecycle_policy