diff --git a/mutation_reader.cc b/mutation_reader.cc index 6cfee34612..633efc359c 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -2714,6 +2714,226 @@ flat_mutation_reader make_multishard_combining_reader_for_tests( std::move(trace_state), fwd_mr); } +// See make_multishard_combining_reader() for description. +class multishard_combining_reader_v2 : public flat_mutation_reader_v2::impl { + struct shard_and_token { + shard_id shard; + dht::token token; + + bool operator<(const shard_and_token& o) const { + // Reversed, as we want a min-heap. + return token > o.token; + } + }; + + const dht::sharder& _sharder; + std::vector> _shard_readers; + // Contains the position of each shard with token granularity, organized + // into a min-heap. Used to select the shard with the smallest token each + // time a shard reader produces a new partition. + std::vector _shard_selection_min_heap; + unsigned _current_shard; + bool _crossed_shards; + unsigned _concurrency = 1; + + void on_partition_range_change(const dht::partition_range& pr); + bool maybe_move_to_next_shard(const dht::token* const t = nullptr); + future<> handle_empty_reader_buffer(); + +public: + multishard_combining_reader_v2( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr); + + // this is captured. + multishard_combining_reader_v2(const multishard_combining_reader_v2&) = delete; + multishard_combining_reader_v2& operator=(const multishard_combining_reader_v2&) = delete; + multishard_combining_reader_v2(multishard_combining_reader_v2&&) = delete; + multishard_combining_reader_v2& operator=(multishard_combining_reader_v2&&) = delete; + + virtual future<> fill_buffer() override; + virtual future<> next_partition() override; + virtual future<> fast_forward_to(const dht::partition_range& pr) override; + virtual future<> fast_forward_to(position_range pr) override; + virtual future<> close() noexcept override; +}; + +void multishard_combining_reader_v2::on_partition_range_change(const dht::partition_range& pr) { + _shard_selection_min_heap.clear(); + _shard_selection_min_heap.reserve(_sharder.shard_count()); + + auto token = pr.start() ? pr.start()->value().token() : dht::minimum_token(); + _current_shard = _sharder.shard_of(token); + + auto sharder = dht::ring_position_range_sharder(_sharder, pr); + + auto next = sharder.next(*_schema); + + // The first value of `next` is thrown away, as it is the ring range of the current shard. + // We only want to do a full round, until we get back to the shard we started from (`_current_shard`). + // We stop earlier if the sharder has no ranges for the remaining shards. + for (next = sharder.next(*_schema); next && next->shard != _current_shard; next = sharder.next(*_schema)) { + _shard_selection_min_heap.push_back(shard_and_token{next->shard, next->ring_range.start()->value().token()}); + boost::push_heap(_shard_selection_min_heap); + } +} + +bool multishard_combining_reader_v2::maybe_move_to_next_shard(const dht::token* const t) { + if (_shard_selection_min_heap.empty() || (t && *t < _shard_selection_min_heap.front().token)) { + return false; + } + + boost::pop_heap(_shard_selection_min_heap); + const auto next_shard = _shard_selection_min_heap.back().shard; + _shard_selection_min_heap.pop_back(); + + if (t) { + _shard_selection_min_heap.push_back(shard_and_token{_current_shard, *t}); + boost::push_heap(_shard_selection_min_heap); + } + + _crossed_shards = true; + _current_shard = next_shard; + return true; +} + +future<> multishard_combining_reader_v2::handle_empty_reader_buffer() { + auto& reader = *_shard_readers[_current_shard]; + + if (reader.is_end_of_stream()) { + if (_shard_selection_min_heap.empty()) { + _end_of_stream = true; + } else { + maybe_move_to_next_shard(); + } + return make_ready_future<>(); + } else if (reader.is_read_ahead_in_progress()) { + return reader.fill_buffer(); + } else { + // If we crossed shards and the next reader has an empty buffer we + // double concurrency so the next time we cross shards we will have + // more chances of hitting the reader's buffer. + if (_crossed_shards) { + _concurrency = std::min(_concurrency * 2, _sharder.shard_count()); + + // Read ahead shouldn't change the min selection heap so we work on a local copy. + auto shard_selection_min_heap_copy = _shard_selection_min_heap; + + // If concurrency > 1 we kick-off concurrency-1 read-aheads in the + // background. They will be brought to the foreground when we move + // to their respective shard. + for (unsigned i = 1; i < _concurrency && !shard_selection_min_heap_copy.empty(); ++i) { + boost::pop_heap(shard_selection_min_heap_copy); + const auto next_shard = shard_selection_min_heap_copy.back().shard; + shard_selection_min_heap_copy.pop_back(); + _shard_readers[next_shard]->read_ahead(); + } + } + return reader.fill_buffer(); + } +} + +multishard_combining_reader_v2::multishard_combining_reader_v2( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) + : impl(std::move(s), std::move(permit)), _sharder(sharder) { + + on_partition_range_change(pr); + + _shard_readers.reserve(_sharder.shard_count()); + for (unsigned i = 0; i < _sharder.shard_count(); ++i) { + _shard_readers.emplace_back(std::make_unique(_schema, _permit, lifecycle_policy, i, pr, ps, pc, trace_state, fwd_mr)); + } +} + +future<> multishard_combining_reader_v2::fill_buffer() { + _crossed_shards = false; + return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this] { + auto& reader = *_shard_readers[_current_shard]; + + if (reader.is_buffer_empty()) { + return handle_empty_reader_buffer(); + } + + while (!reader.is_buffer_empty() && !is_buffer_full()) { + if (const auto& mf = reader.peek_buffer(); mf.is_partition_start() && maybe_move_to_next_shard(&mf.as_partition_start().key().token())) { + return make_ready_future<>(); + } + push_mutation_fragment(reader.pop_mutation_fragment()); + } + return make_ready_future<>(); + }); +} + +future<> multishard_combining_reader_v2::next_partition() { + clear_buffer_to_next_partition(); + if (is_buffer_empty()) { + return _shard_readers[_current_shard]->next_partition(); + } + return make_ready_future<>(); +} + +future<> multishard_combining_reader_v2::fast_forward_to(const dht::partition_range& pr) { + clear_buffer(); + _end_of_stream = false; + on_partition_range_change(pr); + return parallel_for_each(_shard_readers, [&pr] (std::unique_ptr& sr) { + return sr->fast_forward_to(pr); + }); +} + +future<> multishard_combining_reader_v2::fast_forward_to(position_range pr) { + return make_exception_future<>(make_backtraced_exception_ptr()); +} + +future<> multishard_combining_reader_v2::close() noexcept { + return parallel_for_each(_shard_readers, [] (std::unique_ptr& sr) { + return sr->close(); + }); +} + +flat_mutation_reader_v2 make_multishard_combining_reader_v2( + shared_ptr lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + const dht::sharder& sharder = schema->get_sharder(); + return make_flat_mutation_reader_v2(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc, + std::move(trace_state), fwd_mr); +} + +flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + mutation_reader::forwarding fwd_mr) { + return make_flat_mutation_reader_v2(sharder, std::move(lifecycle_policy), std::move(schema), std::move(permit), pr, ps, pc, + std::move(trace_state), fwd_mr); +} + class queue_reader final : public flat_mutation_reader::impl { friend class queue_reader_handle; diff --git a/mutation_reader.hh b/mutation_reader.hh index 6509822651..ab7acc9528 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -716,6 +716,45 @@ flat_mutation_reader make_multishard_combining_reader_for_tests( tracing::trace_state_ptr trace_state = nullptr, mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); +/// Make a multishard_combining_reader. +/// +/// multishard_combining_reader takes care of reading a range from all shards +/// that own a subrange in the range. Shard reader are created on-demand, when +/// the shard is visited for the first time. +/// +/// The read starts with a concurrency of one, that is the reader reads from a +/// single shard at a time. The concurrency is exponentially increased (to a +/// maximum of the number of shards) when a reader's buffer is empty after +/// moving the next shard. This condition is important as we only wan't to +/// increase concurrency for sparse tables that have little data and the reader +/// has to move between shards often. When concurrency is > 1, the reader +/// issues background read-aheads to the next shards so that by the time it +/// needs to move to them they have the data ready. +/// For dense tables (where we rarely cross shards) we rely on the +/// foreign_reader to issue sufficient read-aheads on its own to avoid blocking. +/// +/// The readers' life-cycles are managed through the supplied lifecycle policy. +flat_mutation_reader_v2 make_multishard_combining_reader_v2( + shared_ptr lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state = nullptr, + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); + +flat_mutation_reader_v2 make_multishard_combining_reader_v2_for_tests( + const dht::sharder& sharder, + shared_ptr lifecycle_policy, + schema_ptr schema, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state = nullptr, + mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no); + class queue_reader; /// Calls to different methods cannot overlap!